# Streaming Responses

## What You'll Build

A prompt that streams responses chunk by chunk — ideal for chat UIs where users see tokens appear in real time. Prompty wraps the raw SDK stream in a tracing-aware `PromptyStream` so you get full observability without losing any data.
## Step 1: Enable Streaming

Set `stream: true` in the model's `additionalProperties` — either in the `.prompty` file or at runtime.

### In the `.prompty` File

```
---
name: streaming-chat
description: A chat prompt with streaming enabled
model:
  id: gpt-4o-mini
  provider: openai
  apiType: chat
  connection:
    kind: key
    apiKey: ${env:OPENAI_API_KEY}
  options:
    temperature: 0.7
    additionalProperties:
      stream: true
inputs:
  - name: question
    kind: string
    default: Tell me a joke
---
system:
You are a helpful assistant.

user:
{{question}}
```

### At Runtime

If your `.prompty` file doesn't have streaming enabled, you can toggle it before execution:

```python
from prompty import load

agent = load("chat.prompty")
agent.model.options.additionalProperties["stream"] = True
```

## Step 2: Consume the Stream

Use `run()` with `raw=True` to get the unprocessed `PromptyStream`, then pass it to `process()`, which yields text chunks.
**Python (sync)**

```python
from prompty import load, prepare, run, process

agent = load("chat.prompty")
agent.model.options.additionalProperties["stream"] = True

# prepare() renders the template and parses it into messages
messages = prepare(agent, inputs={"question": "Tell me a joke"})

# run() with raw=True returns the PromptyStream
stream = run(agent, messages, raw=True)

# process() yields text chunks
for chunk in process(agent, stream):
    print(chunk, end="", flush=True)
print()  # newline after stream completes
```

**Python (async)**

```python
import asyncio
from prompty import load_async, prepare_async, run_async, process_async

async def main():
    agent = await load_async("chat.prompty")
    agent.model.options.additionalProperties["stream"] = True

    messages = await prepare_async(
        agent, inputs={"question": "Tell me a joke"}
    )
    stream = await run_async(agent, messages, raw=True)

    async for chunk in process_async(agent, stream):
        print(chunk, end="", flush=True)
    print()

asyncio.run(main())
```

**TypeScript**

```typescript
import { load, prepare, run, process as processResponse } from "@prompty/core";
import "@prompty/openai"; // registers provider

const agent = await load("chat.prompty");
agent.model.options.additionalProperties.stream = true;

const messages = await prepare(agent, { question: "Tell me a joke" });
const stream = await run(agent, messages, { raw: true });

for await (const chunk of processResponse(agent, stream)) {
  process.stdout.write(chunk);
}
console.log();
```

**C#**

```csharp
using Prompty.Core;

var agent = PromptyLoader.Load("chat.prompty");
var messages = await Pipeline.PrepareAsync(
    agent, new() { ["question"] = "Tell me a joke" });

// raw: true returns a PromptyStream (IAsyncEnumerable<object>)
var raw = await Pipeline.RunAsync(agent, messages, raw: true);
if (raw is PromptyStream stream)
{
    await foreach (var chunk in stream)
    {
        Console.Write(chunk);
    }
}
Console.WriteLine();
```

**Rust**

```rust
use prompty::{StreamChunk, consume_stream_chunks};
use serde_json::json;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    prompty::register_defaults();
    prompty_openai::register();

    let agent = prompty::load("chat.prompty")?;
    let inputs = json!({ "question": "Tell me a joke" });
    let messages = prompty::prepare(&agent, Some(&inputs)).await?;
    let result = prompty::run(&agent, &messages).await?;

    let stream = prompty::from_structured_value::<prompty::PromptyStream>(&result)?;
    consume_stream_chunks(stream, |chunk| match chunk {
        StreamChunk::Text(t) => print!("{t}"),
        StreamChunk::Thinking(_t) => { /* reasoning tokens */ }
        StreamChunk::Tool(_tc) => { /* tool call */ }
        StreamChunk::Error(e) => eprintln!("{e}"),
    }).await;
    println!();
    Ok(())
}
```

Each chunk is a string — the processor extracts `delta.content` from the raw API response objects so you don't handle the wire format yourself.
## How Streaming + Tracing Works

A common concern with streaming is losing observability — if chunks are consumed lazily, when does the trace fire? Prompty's `PromptyStream` wrapper solves this:

1. The executor wraps the raw SDK iterator in a `PromptyStream`.
2. As you iterate, each chunk is forwarded to your code and appended to an internal accumulator.
3. When the iterator is exhausted (`StopIteration`), the wrapper flushes the complete accumulated response to the active tracer.
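The accumulate-then-flush pattern above can be sketched in a few lines. This is a hypothetical illustration of the idea, not Prompty's actual implementation — the class name `TracingStream` and the `on_complete` callback are invented for this sketch:

```python
# Sketch of the accumulate-then-flush wrapper pattern (illustrative only).
from typing import Callable, Iterable, Iterator


class TracingStream:
    """Wraps a chunk iterator; flushes the full response when exhausted."""

    def __init__(self, inner: Iterable[str], on_complete: Callable[[str], None]):
        self._inner = iter(inner)
        self._chunks: list[str] = []
        self._on_complete = on_complete

    def __iter__(self) -> Iterator[str]:
        return self

    def __next__(self) -> str:
        try:
            chunk = next(self._inner)
        except StopIteration:
            # Stream exhausted: hand the accumulated response to the tracer.
            self._on_complete("".join(self._chunks))
            raise
        self._chunks.append(chunk)  # accumulate for the trace
        return chunk                # forward to the caller unchanged


traced = []
stream = TracingStream(["Hel", "lo", "!"], on_complete=traced.append)
print("".join(stream))  # prints "Hello!" — caller sees every chunk
print(traced)           # ['Hello!'] — full response captured for tracing
```

Because the flush happens inside `__next__` when the inner iterator raises `StopIteration`, the caller pays no extra cost and cannot forget to close the trace.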
```text
iterate chunk 1 → yield + accumulate
iterate chunk 2 → yield + accumulate
iterate chunk 3 → yield + accumulate
...
StopIteration   → flush accumulated data to tracer ✓
```

## What the Processor Handles

The streaming processor does more than forward raw chunks:
| Scenario | Behavior |
|---|---|
| Content deltas | `delta.content` strings are yielded directly to the caller |
| Tool-call deltas | Argument fragments are accumulated; a complete `ToolCall` is yielded when the stream ends |
| Refusal | If `delta.refusal` is present, the processor raises a `ValueError` |
| Empty / heartbeat chunks | Chunks with no content or tool-call data are silently skipped |
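The table above can be condensed into a short sketch. The dict-shaped chunks and the `process_chunks` helper below are hypothetical simplifications, not Prompty's real wire objects or API:

```python
# Simplified sketch of the processing rules in the table (illustrative only).
def process_chunks(chunks):
    """Yield content strings; accumulate tool calls; raise on refusal."""
    tool_name, tool_args = None, []
    for chunk in chunks:
        delta = chunk.get("delta", {})
        if delta.get("refusal"):            # refusal → error
            raise ValueError(delta["refusal"])
        if delta.get("content"):            # content delta → yield to caller
            yield delta["content"]
        elif tc := delta.get("tool_call"):  # tool-call delta → accumulate
            tool_name = tc.get("name", tool_name)
            tool_args.append(tc.get("arguments", ""))
        # chunks with neither content nor tool data are silently skipped
    if tool_name is not None:               # complete call once stream ends
        yield {"name": tool_name, "arguments": "".join(tool_args)}


chunks = [
    {"delta": {"content": "Hi"}},
    {"delta": {}},                          # heartbeat — skipped
    {"delta": {"content": " there"}},
]
print(list(process_chunks(chunks)))  # ['Hi', ' there']
```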
## Complete Example: Streaming Chat App

Here's a self-contained example you can copy and run:

```
---
name: stream-demo
model:
  id: gpt-4o-mini
  provider: openai
  apiType: chat
  connection:
    kind: key
    apiKey: ${env:OPENAI_API_KEY}
  options:
    temperature: 0.9
    additionalProperties:
      stream: true
inputs:
  - name: topic
    kind: string
    default: space exploration
---
system:
You are a creative storyteller.

user:
Write a short story about {{topic}}.
```

```python
from prompty import load, prepare, run, process

agent = load("stream-demo.prompty")
messages = prepare(agent, inputs={"topic": "a robot learning to paint"})
stream = run(agent, messages, raw=True)

print("Story: ", end="")
for chunk in process(agent, stream):
    print(chunk, end="", flush=True)
print("\n--- Done ---")
```

## Complete Tested Example

A full, tested example you can copy and run:
"""Streaming chat completion.
This example shows how to consume streaming responses.Used in: how-to/streaming.mdx"""from __future__ import annotations
from prompty import invoke, load
agent = load("streaming-chat.prompty")for chunk in invoke(agent, inputs={"question": "Tell me a short story"}): print(chunk, end="", flush=True)print()/** * Streaming chat completion — consume response chunks as they arrive. * * The streaming-chat.prompty file sets `stream: true` in model options. * The executor wraps the response in a PromptyStream, and the * processor yields content strings from each chunk. * * @example * ```bash * OPENAI_API_KEY=sk-... npx tsx examples/streaming.ts * ``` */import "@prompty/openai";import { invoke } from "@prompty/core";import { resolve } from "node:path";
const promptyFile = resolve(import.meta.dirname, "../../prompts/streaming-chat.prompty");
export async function streamingChat(question?: string): Promise<string> { // invoke() returns a PromptyStream when the prompty has stream: true const stream = await invoke(promptyFile, { question: question ?? "Tell me a short story", });
// If the result is an async iterable, consume chunks if (stream && typeof stream === "object" && Symbol.asyncIterator in stream) { const chunks: string[] = []; for await (const chunk of stream as AsyncIterable<string>) { process.stdout.write(String(chunk)); chunks.push(String(chunk)); } console.log(); // newline after streaming return chunks.join(""); }
// Non-streaming fallback return stream as string;}
// Run directlyconst response = await streamingChat();console.log("\nComplete response length:", response.length);// Copyright (c) Microsoft. All rights reserved.
using Prompty.Core;using Prompty.OpenAI;
namespace DocsExamples.Examples;
/// <summary>/// Streaming chat completion — consume response chunks as they arrive./// </summary>public static class Streaming{ /// <summary> /// Invokes a streaming .prompty and iterates over the PromptyStream chunks. /// The streaming-chat.prompty sets stream:true via additionalProperties. /// </summary> public static async Task<List<object>> RunAsync( string promptyPath, Dictionary<string, object?>? inputs = null) { // One-time setup new PromptyBuilder() .AddOpenAI();
// Load the agent — stream: true is set in the prompty's additionalProperties var agent = PromptyLoader.Load(promptyPath);
// Set streaming flag in metadata (the executor checks this) agent.Metadata ??= new Dictionary<string, object>(); agent.Metadata["stream"] = true;
// Prepare messages var messages = await Pipeline.PrepareAsync(agent, inputs);
// Execute returns a PromptyStream when streaming is enabled var response = await Pipeline.ExecuteAsync(agent, messages);
var chunks = new List<object>(); if (response is PromptyStream stream) { await foreach (var chunk in stream) { chunks.Add(chunk); } }
return chunks; }}use prompty::{StreamChunk, consume_stream_chunks};use serde_json::json;
#[tokio::main]async fn main() -> Result<(), Box<dyn std::error::Error>> { prompty::register_defaults(); prompty_openai::register();
let agent = prompty::load("chat.prompty")?; let inputs = json!({ "question": "Tell me a short joke" }); let messages = prompty::prepare(&agent, Some(&inputs)).await?; let result = prompty::run(&agent, &messages).await?;
let stream = prompty::from_structured_value::<prompty::PromptyStream>(&result)?; consume_stream_chunks(stream, |chunk| match chunk { StreamChunk::Text(t) => print!("{t}"), StreamChunk::Thinking(_) => {}, StreamChunk::Tool(_) => {}, StreamChunk::Error(e) => eprintln!("{e}"), }).await; println!(); Ok(())}Further Reading
- Streaming concept — architecture details on `PromptyStream` and `AsyncPromptyStream`
- Tracing — how traces capture streaming data
- Troubleshooting — common issues and debugging tips