Click a tab to see the full streaming pipeline code for each framework. Same task: lightweight retrieval (BM25 or an in-memory index, depending on the framework) plus streaming LLM output consumed as tokens arrive.
```python
from synapsekit import RAG

rag = RAG(model="gpt-4o-mini", api_key=KEY, provider="openai")
await rag.add_documents(DOCS)

async for token in rag.stream(QUERY):
    print(token, end="", flush=True)
```
rag.stream() is a single async generator on the RAG object. Retrieve, prompt, LLM call, and token yield are all fused into one call. Limitation: async-only, with no sync .stream() path; outside an event loop you need asyncio.run(), as in the sketch below.
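A minimal sketch of that wrapper, reusing the rag and QUERY objects from the snippet above (the main name is just a placeholder):

```python
import asyncio

async def main() -> None:
    # Drive the async-only streaming API from a synchronous entry point.
    async for token in rag.stream(QUERY):
        print(token, end="", flush=True)

asyncio.run(main())  # only valid when no event loop is already running
```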
```python
from langchain_community.retrievers import BM25Retriever
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough

retriever = BM25Retriever.from_texts(DOCS, k=3)
prompt = ChatPromptTemplate.from_template("Context: {ctx}\n\nQ: {q}")
llm = ChatOpenAI(model="gpt-4o-mini", streaming=True)
chain = {"ctx": retriever, "q": RunnablePassthrough()} | prompt | llm

for chunk in chain.stream(QUERY):
    print(chunk.content, end="", flush=True)
```
The chain ({ctx, q} | prompt | llm) makes every step explicit. .stream() traverses the chain synchronously; .astream() is the async version. Strength: callback handlers let you bind stream output to UI progress bars or multi-consumer fan-out (see the sketch below). Most flexible runtime of the three.
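A rough sketch of the callback route: a handler's on_llm_new_token hook receives each token as the chain runs. The TokenPrinter name and the print body are placeholders, not part of the example above; swap them for whatever consumer you're fanning out to.

```python
from langchain_core.callbacks import BaseCallbackHandler

class TokenPrinter(BaseCallbackHandler):
    """Receives each streamed token; replace print with a UI update or queue put."""

    def on_llm_new_token(self, token: str, **kwargs) -> None:
        print(token, end="", flush=True)

# Attach the handler per call via the runnable config.
chain.invoke(QUERY, config={"callbacks": [TokenPrinter()]})
```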
```python
from llama_index.core import Document, VectorStoreIndex, Settings
from llama_index.llms.openai import OpenAI

Settings.llm = OpenAI(model="gpt-4o-mini")
index = VectorStoreIndex.from_documents([Document(text=d) for d in DOCS])
engine = index.as_query_engine(streaming=True)

response = engine.query(QUERY)
for chunk in response.response_gen:
    print(chunk, end="", flush=True)
```
Set streaming=True on the query engine; response.response_gen is a native Python generator. Clean. Weakness: no native async stream on the query engine. If you're on an async stack, you wrap the sync generator in a thread (sketch below) or reach for the lower-level LLM API.
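One way that thread-wrapping workaround might look, using only the standard library (asyncio.to_thread needs Python 3.9+; engine and QUERY come from the snippet above, and astream_answer is just an illustrative name):

```python
import asyncio

_DONE = object()  # sentinel marking generator exhaustion

async def astream_answer(engine, query):
    # Run the blocking query, then pull each chunk from the sync
    # generator in a worker thread so the event loop stays responsive.
    response = await asyncio.to_thread(engine.query, query)
    gen = iter(response.response_gen)
    while True:
        chunk = await asyncio.to_thread(next, gen, _DONE)
        if chunk is _DONE:
            break
        yield chunk

# Usage from an async stack:
# async for chunk in astream_answer(engine, QUERY):
#     print(chunk, end="", flush=True)
```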