Skip to content

Commit edd5a99

Browse files
authored
update async pipeline example (#10042)
1 parent ef33cca commit edd5a99

File tree

1 file changed

+64
-16
lines changed

1 file changed

+64
-16
lines changed

docs-website/docs/concepts/pipelines/asyncpipeline.mdx

Lines changed: 64 additions & 16 deletions
```diff
@@ -52,37 +52,85 @@ You can find more details in our [API Reference](/reference/pipeline-api#asyncpipeline)

 ```python
 import asyncio
-from haystack import AsyncPipeline
-from haystack.components.embedders import SentenceTransformersTextEmbedder
-from haystack.components.retrievers import InMemoryEmbeddingRetriever, InMemoryBM25Retriever
-from haystack.components.joiners import DocumentJoiner
+
+from haystack import AsyncPipeline, Document
 from haystack.components.builders import ChatPromptBuilder
+from haystack.components.embedders import (
+    SentenceTransformersDocumentEmbedder,
+    SentenceTransformersTextEmbedder,
+)
 from haystack.components.generators.chat import OpenAIChatGenerator
+from haystack.components.joiners import DocumentJoiner
+from haystack.components.retrievers import InMemoryBM25Retriever, InMemoryEmbeddingRetriever
+from haystack.dataclasses import ChatMessage
+from haystack.document_stores.in_memory import InMemoryDocumentStore
+
+documents = [
+    Document(content="Khufu is the largest pyramid."),
+    Document(content="Khafre is the middle pyramid."),
+    Document(content="Menkaure is the smallest pyramid."),
+]
+
+docs_embedder = SentenceTransformersDocumentEmbedder()
+docs_embedder.warm_up()
+
+document_store = InMemoryDocumentStore()
+document_store.write_documents(docs_embedder.run(documents=documents)["documents"])
+
+prompt_template = [
+    ChatMessage.from_system(
+        """
+You are a precise, factual QA assistant.
+According to the following documents:
+{% for document in documents %}
+{{document.content}}
+{% endfor %}
+
+If an answer cannot be deduced from the documents, say "I don't know based on these documents".
+
+When answering:
+- be concise
+- list the documents that support your answer
+
+Answer the given question.
+"""
+    ),
+    ChatMessage.from_user("{{query}}"),
+    ChatMessage.from_system("Answer:"),
+]

 hybrid_rag_retrieval = AsyncPipeline()
 hybrid_rag_retrieval.add_component("text_embedder", SentenceTransformersTextEmbedder())
-hybrid_rag_retrieval.add_component("embedding_retriever", InMemoryEmbeddingRetriever(document_store=document_store))
-hybrid_rag_retrieval.add_component("bm25_retriever", InMemoryBM25Retriever(document_store=document_store))
+hybrid_rag_retrieval.add_component(
+    "embedding_retriever", InMemoryEmbeddingRetriever(document_store=document_store, top_k=3)
+)
+hybrid_rag_retrieval.add_component("bm25_retriever", InMemoryBM25Retriever(document_store=document_store, top_k=3))
 hybrid_rag_retrieval.add_component("document_joiner", DocumentJoiner())
 hybrid_rag_retrieval.add_component("prompt_builder", ChatPromptBuilder(template=prompt_template))
 hybrid_rag_retrieval.add_component("llm", OpenAIChatGenerator())

-hybrid_rag_retrieval.connect("text_embedder", "embedding_retriever")
-hybrid_rag_retrieval.connect("bm25_retriever", "document_joiner")
-hybrid_rag_retrieval.connect("embedding_retriever", "document_joiner")
-hybrid_rag_retrieval.connect("document_joiner", "prompt_builder.documents")
-hybrid_rag_retrieval.connect("prompt_builder", "llm")
+hybrid_rag_retrieval.connect("text_embedder.embedding", "embedding_retriever.query_embedding")
+hybrid_rag_retrieval.connect("bm25_retriever.documents", "document_joiner.documents")
+hybrid_rag_retrieval.connect("embedding_retriever.documents", "document_joiner.documents")
+hybrid_rag_retrieval.connect("document_joiner.documents", "prompt_builder.documents")
+hybrid_rag_retrieval.connect("prompt_builder.prompt", "llm.messages")
+
+question = "Which pyramid is neither the smallest nor the biggest?"
+
+data = {
+    "prompt_builder": {"query": question},
+    "text_embedder": {"text": question},
+    "bm25_retriever": {"query": question},
+}

 async def process_results():
     async for partial_output in hybrid_rag_retrieval.run_async_generator(
-        data=data,
-        include_outputs_from={"document_joiner", "llm"}
+        data=data, include_outputs_from={"document_joiner", "llm"}
     ):
-        # Each partial_output contains the results from a completed component
-        if "retriever" in partial_output:
+        if "document_joiner" in partial_output:
            print("Retrieved documents:", len(partial_output["document_joiner"]["documents"]))
         if "llm" in partial_output:
             print("Generated answer:", partial_output["llm"]["replies"][0])

 asyncio.run(process_results())
-```
+```
```

0 commit comments

Comments
 (0)