
Commit c3b3c28

Update information in streaming chunks and fix conversion of streaming chunks to a chat message when handling reasoning content
1 parent a85b61b commit c3b3c28
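
The reasoning fix comes down to collecting every chunk that carries a reasoning delta, keeping the first reasoning item id, and joining the streamed summary text into a single ReasoningContent. The sketch below illustrates that accumulation logic in isolation; the dataclasses and the merge_reasoning helper are simplified stand-ins for illustration, not the real Haystack types or a function from this file.

# Illustrative sketch only: stand-in dataclasses, not the Haystack API.
from dataclasses import dataclass, field
from typing import Any, Optional


@dataclass
class ReasoningContent:
    reasoning_text: str
    extra: dict[str, Any] = field(default_factory=dict)


@dataclass
class StreamingChunk:
    content: str = ""
    reasoning: Optional[ReasoningContent] = None


def merge_reasoning(chunks: list[StreamingChunk]) -> Optional[ReasoningContent]:
    # Keep only chunks that carry reasoning, take the first reasoning item id,
    # and concatenate the streamed summary text.
    reasoning_chunks = [c for c in chunks if c.reasoning]
    ids = [c.reasoning.extra.get("id") for c in reasoning_chunks if c.reasoning.extra.get("id")]
    reasoning_id = ids[0] if ids else None
    reasoning_text = "".join(c.reasoning.reasoning_text for c in reasoning_chunks)
    if reasoning_id and reasoning_text:
        return ReasoningContent(reasoning_text=reasoning_text, extra={"id": reasoning_id, "type": "reasoning"})
    return None


# The "response.output_item.added" chunk carries the reasoning id with empty text;
# each "response.reasoning_summary_text.delta" chunk carries a piece of the summary.
# The id "rs_123" is a made-up example value.
chunks = [
    StreamingChunk(reasoning=ReasoningContent("", {"id": "rs_123", "type": "reasoning"})),
    StreamingChunk(reasoning=ReasoningContent("Weighing both options ", {})),
    StreamingChunk(reasoning=ReasoningContent("before answering.", {})),
]
print(merge_reasoning(chunks))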

File tree

2 files changed: +126 -112 lines changed


haystack/components/generators/chat/openai_responses.py

Lines changed: 68 additions & 44 deletions
@@ -488,9 +488,8 @@ def _handle_stream_response(self, responses: Stream, callback: SyncStreamingCall
             chunk_delta = _convert_response_chunk_to_streaming_chunk(
                 chunk=openai_chunk, previous_chunks=chunks, component_info=component_info
             )
-            if chunk_delta:
-                chunks.append(chunk_delta)
-                callback(chunk_delta)
+            chunks.append(chunk_delta)
+            callback(chunk_delta)
         chat_message = _convert_streaming_chunks_to_chat_message(chunks=chunks)
         return [chat_message]
 
@@ -503,9 +502,8 @@ async def _handle_async_stream_response(
             chunk_delta = _convert_response_chunk_to_streaming_chunk(
                 chunk=openai_chunk, previous_chunks=chunks, component_info=component_info
             )
-            if chunk_delta:
-                chunks.append(chunk_delta)
-                await callback(chunk_delta)
+            chunks.append(chunk_delta)
+            await callback(chunk_delta)
         chat_message = _convert_streaming_chunks_to_chat_message(chunks=chunks)
         return [chat_message]
 
@@ -581,19 +579,28 @@ def _convert_response_chunk_to_streaming_chunk( # pylint: disable=too-many-retu
     if chunk.type == "response.output_item.added":
         # Responses API always returns reasoning chunks even if there is no summary
         if chunk.item.type == "reasoning":
-            extra = chunk.item.to_dict()
-            reasoning = ReasoningContent(reasoning_text="", extra=extra)
+            reasoning = ReasoningContent(reasoning_text="", extra=chunk.item.to_dict())
             return StreamingChunk(
-                content="", component_info=component_info, index=chunk.output_index, reasoning=reasoning, start=True
+                content="",
+                component_info=component_info,
+                index=chunk.output_index,
+                reasoning=reasoning,
+                start=True,
+                meta={"received_at": datetime.now().isoformat()},
             )
 
         # the function name is only streamed at the start and end of the function call
         if chunk.item.type == "function_call":
-            function = chunk.item.name
-            extra = {"type": "function_call", "call_id": chunk.item.call_id, "status": chunk.item.status}
-            tool_call = ToolCallDelta(index=chunk.output_index, id=chunk.item.id, tool_name=function, extra=extra)
+            tool_call = ToolCallDelta(
+                index=chunk.output_index, id=chunk.item.id, tool_name=chunk.item.name, extra=chunk.item.to_dict()
+            )
             return StreamingChunk(
-                content="", component_info=component_info, index=chunk.output_index, tool_calls=[tool_call], start=True
+                content="",
+                component_info=component_info,
+                index=chunk.output_index,
+                tool_calls=[tool_call],
+                start=True,
+                meta={"received_at": datetime.now().isoformat()},
             )
 
     elif chunk.type == "response.completed":
@@ -604,14 +611,10 @@ def _convert_response_chunk_to_streaming_chunk( # pylint: disable=too-many-retu
             content="",
             component_info=component_info,
             finish_reason="tool_calls" if any(o.type == "function_call" for o in chunk.response.output) else "stop",
-            meta=chunk.to_dict(),
+            meta={**chunk.to_dict(), "received_at": datetime.now().isoformat()},
         )
 
     elif chunk.type == "response.output_text.delta":
-        # if item is a ResponseTextDeltaEvent
-        meta = chunk.to_dict()
-        meta["received_at"] = datetime.now().isoformat()
-
         # Start is determined by checking if this is the first text delta event of a new output_index
         # 1) Check if all previous chunks have different output_index
         # 2) If any chunks do have the same output_index, check if they have content
@@ -620,17 +623,25 @@ def _convert_response_chunk_to_streaming_chunk( # pylint: disable=too-many-retu
             c.content == "" for c in previous_chunks if c.index == chunk.output_index
         )
         return StreamingChunk(
-            content=chunk.delta, component_info=component_info, index=chunk.output_index, meta=meta, start=start
+            content=chunk.delta,
+            component_info=component_info,
+            index=chunk.output_index,
+            start=start,
+            meta={**chunk.to_dict(), "received_at": datetime.now().isoformat()},
         )
 
     elif chunk.type == "response.reasoning_summary_text.delta":
-        # we remove the delta from the extra because it is already in the reasoning_text
-        # rest of the information needs to be saved for chat message
+        # We remove the delta from the extra because it is already in the reasoning_text
+        # Remaining information needs to be saved for chat message
         extra = chunk.to_dict()
         extra.pop("delta")
         reasoning = ReasoningContent(reasoning_text=chunk.delta, extra=extra)
         return StreamingChunk(
-            content="", component_info=component_info, index=chunk.output_index, reasoning=reasoning, start=False
+            content="",
+            component_info=component_info,
+            index=chunk.output_index,
+            reasoning=reasoning,
+            meta={"received_at": datetime.now().isoformat()},
         )
 
     # the function arguments are streamed in parts
@@ -639,16 +650,22 @@ def _convert_response_chunk_to_streaming_chunk( # pylint: disable=too-many-retu
         arguments = chunk.delta
         extra = chunk.to_dict()
         extra.pop("delta")
-        # in delta of tool calls there is no call_id
-        # so we use the item_id which is the function call id
+        # in delta of tool calls there is no call_id so we use the item_id which is the function call id
         tool_call = ToolCallDelta(index=chunk.output_index, id=chunk.item_id, arguments=arguments, extra=extra)
         return StreamingChunk(
-            content="", component_info=component_info, index=chunk.output_index, tool_calls=[tool_call], start=False
+            content="",
+            component_info=component_info,
+            index=chunk.output_index,
+            tool_calls=[tool_call],
+            meta={"received_at": datetime.now().isoformat()},
        )
 
     # we return rest of the chunk as is
     chunk_message = StreamingChunk(
-        content="", component_info=component_info, index=getattr(chunk, "output_index", None), meta=chunk.to_dict()
+        content="",
+        component_info=component_info,
+        index=getattr(chunk, "output_index", None),
+        meta={**chunk.to_dict(), "received_at": datetime.now().isoformat()},
     )
     return chunk_message
 
@@ -661,9 +678,14 @@ def _convert_streaming_chunks_to_chat_message(chunks: list[StreamingChunk]) -> C
 
     :returns: The ChatMessage.
     """
+    # Get the full text by concatenating all text chunks
     text = "".join([chunk.content for chunk in chunks])
-    reasoning = None
-    tool_calls = []
+
+    # Gather reasoning information if present
+    reasoning_chunks = [chunk for chunk in chunks if chunk.reasoning]
+    reasoning_ids = [chunk.reasoning.extra.get("id") for chunk in reasoning_chunks if chunk.reasoning.extra.get("id")]
+    reasoning_id = reasoning_ids[0] if reasoning_ids else None
+    reasoning_text = "".join([chunk.reasoning.reasoning_text for chunk in reasoning_chunks]) if reasoning_chunks else ""
 
     # Process tool calls if present in any chunk
     tool_call_data: dict[str, dict[str, Any]] = {}  # Track tool calls by id
@@ -676,18 +698,19 @@ def _convert_streaming_chunks_to_chat_message(chunks: list[StreamingChunk]) -> C
                 if tool_call.id not in tool_call_data:
                     tool_call_data[tool_call.id] = {"name": "", "arguments": ""}
 
-                if tool_call.tool_name is not None:
-                    # we dont need to append the tool name as it is passed once in the start of the function call
-                    tool_call_data[tool_call.id]["name"] = tool_call.tool_name
                 if tool_call.arguments is not None:
                     tool_call_data[tool_call.id]["arguments"] += tool_call.arguments
-                if tool_call.extra is not None and tool_call.extra.get("type") == "function_call":
-                    tool_call_data[tool_call.id]["extra"] = tool_call.extra
 
-        if chunk.reasoning:
-            reasoning = chunk.reasoning
+                # We capture the tool name from one of the chunks
+                if tool_call.tool_name is not None:
+                    tool_call_data[tool_call.id]["name"] = tool_call.tool_name
+
+                # We capture the call_id from one of the chunks
+                if tool_call.extra and "call_id" in tool_call.extra:
+                    tool_call_data[tool_call.id]["extra"] = {"call_id": tool_call.extra["call_id"]}
 
     # Convert accumulated tool call data into ToolCall objects
+    tool_calls = []
     sorted_keys = sorted(tool_call_data.keys())
     for key in sorted_keys:
         tool_call_dict = tool_call_data[key]
@@ -705,16 +728,17 @@ def _convert_streaming_chunks_to_chat_message(chunks: list[StreamingChunk]) -> C
                _arguments=tool_call_dict["arguments"],
            )
 
-    # the final response is the last chunk with the response metadata
+    # We dump the entire final response into meta to be consistent with non-streaming response
     final_response = chunks[-1].meta.get("response")
-    meta: dict[str, Any] = {
-        "model": final_response.get("model") if final_response else None,
-        "index": 0,
-        "response_start_time": final_response.get("created_at") if final_response else None,
-        "usage": final_response.get("usage") if final_response else None,
-    }
-
-    return ChatMessage.from_assistant(text=text or None, tool_calls=tool_calls, meta=meta, reasoning=reasoning)
+
+    # Add reasoning content if both id and text are available
+    reasoning = None
+    if reasoning_id and reasoning_text:
+        reasoning = ReasoningContent(reasoning_text=reasoning_text, extra={"id": reasoning_id, "type": "reasoning"})
+
+    return ChatMessage.from_assistant(
+        text=text or None, tool_calls=tool_calls, meta=final_response, reasoning=reasoning
+    )
 
 
 def _convert_chat_message_to_responses_api_format(message: ChatMessage) -> list[dict[str, Any]]: