Stream Agent
User Intent
"I want real-time streaming responses from AI with tool calling support"
Operation
SDK Method: graphlit.streamAgent()
GraphQL: N/A (uses streaming protocol)
Entity Type: Conversation
Common Use Cases: Chat UI with streaming, real-time AI responses, tool calling, agentic workflows
TypeScript (Canonical)
import { Graphlit } from 'graphlit-client';
import { Types, AgentStreamEvent } from 'graphlit-client/dist/generated/graphql-types';

const graphlit = new Graphlit();

await graphlit.streamAgent(
  'What are the key findings in the uploaded documents?', // prompt
  async (event: AgentStreamEvent) => {
    // Handle different event types
    switch (event.type) {
      case 'conversation_started':
        console.log(`Conversation ID: ${event.conversationId}`);
        break;
      case 'message_update':
        // Streaming message chunks
        process.stdout.write(event.message.message);
        if (!event.isStreaming) {
          console.log('\n[Message complete]');
        }
        break;
      case 'tool_update':
        console.log(`Tool: ${event.toolCall.name} - ${event.status}`);
        break;
      case 'conversation_completed':
        console.log(`\nTotal tokens: ${event.usage?.tokens || 0}`);
        break;
    }
  },
  undefined, // conversationId (optional - creates new if omitted)
  undefined, // specification (optional - uses project default)
  [], // tools (optional - for function calling)
  {} // toolHandlers (optional - tool implementations)
);
Parameters
streamAgent
prompt (string): User's question or message
eventHandler (function): Callback for streaming events
  - Called for each event (message chunks, tool calls, completion)
  - Must be an async function
conversationId (string): Optional conversation ID
  - If omitted: creates a new conversation
  - If provided: continues the existing conversation
specification (EntityReferenceInput): Optional LLM configuration
tools (ToolDefinitionInput[]): Tools for function calling
toolHandlers (Record<string, Function>): Tool implementations
Response (via Events)
Stream events sent to eventHandler:
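Taken together, these payloads form a discriminated union on type, so a switch or if on event.type narrows which fields are available. A rough sketch of the shape (field names mirror the schemas below; the SDK's exported AgentStreamEvent type may differ in detail):
// Sketch only: derived from the event schemas documented below,
// not copied from the SDK's type definitions.
type StreamEvent =
  | { type: 'conversation_started'; conversationId: string }
  | {
      type: 'message_update';
      message: {
        message: string;
        isStreaming: boolean;
        role: 'ASSISTANT';
        tokens?: number;
        model?: string;
        throughput?: number;
      };
      isStreaming: boolean;
    }
  | {
      type: 'tool_update';
      toolCall: { id: string; name: string; arguments: any };
      status: 'preparing' | 'executing' | 'completed' | 'failed';
      result?: any;
      error?: string;
    }
  | {
      type: 'conversation_completed';
      conversationId: string;
      usage?: { tokens: number; completionTokens: number; promptTokens: number; cost?: number };
    };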
conversation_started
{
  type: 'conversation_started',
  conversationId: string // ID of conversation (new or existing)
}
message_update
{
  type: 'message_update',
  message: {
    message: string, // Message text (chunk or full)
    isStreaming: boolean, // true = chunk, false = complete
    role: 'ASSISTANT',
    tokens?: number, // Token count (when complete)
    model?: string, // Model used
    throughput?: number // Tokens per second
  },
  isStreaming: boolean // Same as message.isStreaming
}
tool_update
{
  type: 'tool_update',
  toolCall: {
    id: string,
    name: string, // Tool name
    arguments: any // Tool parameters
  },
  status: 'preparing' | 'executing' | 'completed' | 'failed',
  result?: any, // Tool result (when completed)
  error?: string // Error message (when failed)
}
conversation_completed
{
  type: 'conversation_completed',
  conversationId: string,
  usage?: {
    tokens: number, // Total tokens
    completionTokens: number,
    promptTokens: number,
    cost?: number // USD cost
  }
}
Developer Hints
Streaming vs Non-Streaming
                 streamAgent               promptConversation
Real-time        Token-by-token            Wait for complete
Tool calling     Supported                 Use promptAgent
Use case         Chat UI, streaming        Simple Q&A
Complexity       Higher (event handling)   Lower (single response)
// Use streamAgent when:
// - Building chat UI with streaming
// - Need tool calling
// - Want real-time progress
// Use promptConversation when:
// - Simple Q&A
// - No tool calling needed
// - Don't need streaming
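For comparison, a minimal non-streaming sketch (assuming promptConversation accepts a prompt and returns the completed message; check the SDK reference for its exact signature):
// Hedged sketch: promptConversation's parameters and response shape
// may differ from what is shown here.
const response = await graphlit.promptConversation(
  'What is machine learning?' // prompt
);
console.log(response.promptConversation?.message?.message);
Event Handler Must Be Async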
// Correct: async function
await graphlit.streamAgent(prompt, async (event) => {
  // Can await operations
  await saveEventToDatabase(event);
});

// Wrong: sync function
await graphlit.streamAgent(prompt, (event) => { // Missing async
  // Cannot await
});
Message Streaming Pattern
Messages stream in chunks, then complete:
await graphlit.streamAgent(prompt, async (event) => {
  if (event.type === 'message_update') {
    if (event.isStreaming) {
      // Streaming chunk
      process.stdout.write(event.message.message);
    } else {
      // Final complete message
      console.log('\n[Complete]');
      console.log(`Total tokens: ${event.message.tokens}`);
    }
  }
});
🛠 Tool Calling Lifecycle
Tools go through: preparing → executing → completed/failed:
await graphlit.streamAgent(
  'Search for information about AI',
  async (event) => {
    if (event.type === 'tool_update') {
      switch (event.status) {
        case 'preparing':
          console.log(`Preparing tool: ${event.toolCall.name}`);
          break;
        case 'executing':
          console.log(`Executing tool: ${event.toolCall.name}`);
          break;
        case 'completed':
          console.log(`Tool complete: ${event.toolCall.name}`);
          break;
        case 'failed':
          console.log(`Tool failed: ${event.error}`);
          break;
      }
    }
  },
  undefined,
  undefined,
  tools, // Tool definitions
  toolHandlers // Tool implementations
);
Variations
1. Basic Streaming Chat
Simple streaming without tools:
await graphlit.streamAgent(
  'Explain quantum computing in simple terms',
  async (event) => {
    if (event.type === 'message_update') {
      // Write each chunk as it arrives
      process.stdout.write(event.message.message);
      if (!event.isStreaming) {
        console.log('\n--- End of message ---');
      }
    }
  }
);
2. Multi-Turn Streaming Conversation
Continue conversation across multiple prompts:
let conversationId: string | undefined;

// First turn
await graphlit.streamAgent(
  'What is machine learning?',
  async (event) => {
    if (event.type === 'conversation_started') {
      conversationId = event.conversationId;
      console.log(`Started conversation: ${conversationId}`);
    }
    if (event.type === 'message_update' && !event.isStreaming) {
      console.log(event.message.message);
    }
  }
);

// Second turn (with context from first)
await graphlit.streamAgent(
  'Can you give an example?',
  async (event) => {
    if (event.type === 'message_update' && !event.isStreaming) {
      console.log(event.message.message);
    }
  },
  conversationId // Same conversation for context
);
3. Streaming with Tool Calling
Implement tools for function calling:
// Define tools (ToolDefinitionInput comes from the generated graphql-types, imported above)
const tools: ToolDefinitionInput[] = [
  {
    name: 'searchDocuments',
    description: 'Search through ingested documents',
    schema: JSON.stringify({
      type: 'object',
      properties: {
        query: { type: 'string', description: 'Search query' }
      },
      required: ['query']
    })
  }
];

// Implement tool handlers
const toolHandlers: Record<string, Function> = {
  searchDocuments: async (args: { query: string }) => {
    const results = await graphlit.queryContents({
      search: args.query,
      limit: 5
    });
    return {
      results: results.contents.results.map(c => ({
        name: c.name,
        summary: c.summary
      }))
    };
  }
};

// Stream with tools
await graphlit.streamAgent(
  'Find information about API rate limiting',
  async (event) => {
    if (event.type === 'tool_update') {
      console.log(`[Tool] ${event.toolCall.name}: ${event.status}`);
    }
    if (event.type === 'message_update' && !event.isStreaming) {
      console.log(`[AI] ${event.message.message}`);
    }
  },
  undefined,
  undefined,
  tools,
  toolHandlers
);
4. Streaming with Custom Model
Use specific LLM:
// Create specification (SpecificationTypes, ModelServiceTypes and
// OpenAIModels come from the generated graphql-types, imported above)
const specResponse = await graphlit.createSpecification({
  name: 'GPT-4 Turbo',
  type: SpecificationTypes.Completion,
  serviceType: ModelServiceTypes.OpenAI,
  openAI: {
    model: OpenAIModels.Gpt_4_Turbo
  }
});

// Stream with custom model
await graphlit.streamAgent(
  'Write a detailed analysis',
  async (event) => {
    if (event.type === 'message_update') {
      process.stdout.write(event.message.message);
    }
  },
  undefined,
  { id: specResponse.createSpecification.id } // Use custom model
);
5. Collect Full Message from Stream
Buffer chunks to get complete message:
let fullMessage = '';

await graphlit.streamAgent(
  'Summarize the document',
  async (event) => {
    if (event.type === 'message_update') {
      if (event.isStreaming) {
        fullMessage += event.message.message;
      } else {
        // Final message
        console.log('Complete message:', fullMessage);
        console.log('Tokens used:', event.message.tokens);
      }
    }
  }
);
6. Track Streaming Metrics
Monitor performance during streaming:
const startTime = Date.now();
let firstTokenTime: number | null = null;
let tokenCount = 0;

await graphlit.streamAgent(
  'Explain AI ethics',
  async (event) => {
    if (event.type === 'message_update') {
      if (event.isStreaming && !firstTokenTime) {
        firstTokenTime = Date.now();
        const ttft = firstTokenTime - startTime;
        console.log(`Time to first token: ${ttft}ms`);
      }
      if (!event.isStreaming) {
        const totalTime = Date.now() - startTime;
        tokenCount = event.message.tokens || 0;
        const throughput = (tokenCount / (totalTime / 1000)).toFixed(2);
        console.log(`\nMetrics:`);
        console.log(`  Total time: ${totalTime}ms`);
        console.log(`  Tokens: ${tokenCount}`);
        console.log(`  Throughput: ${throughput} tokens/sec`);
      }
    }
  }
);
Common Issues
Issue: Events not firing / no response
Solution: Ensure eventHandler is an async function. Check for errors in the console/logs.
Issue: Messages streaming but not in order
Solution: This shouldn't happen; if it does, make sure your event handler isn't mutating shared state.
Issue: Tool calls not executing
Solution: Verify tools and toolHandlers are provided. Check that each tool schema is valid JSON (see the snippet after this list).
Issue: "Streaming not supported" error
Solution: Some models don't support streaming; the SDK falls back to promptAgent automatically.
Issue: Conversation ID not available immediately
Solution: Wait for conversation_started event to get conversation ID.
Issue: Multiple message_update events with same text
Solution: Check isStreaming flag. Final message comes with isStreaming: false.
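One way to catch the invalid-schema case before streaming starts (the helper below is illustrative, not part of the SDK):
// Illustrative helper: confirm each tool's schema string parses as JSON
function validateToolSchemas(tools: { name: string; schema?: string }[]): void {
  for (const tool of tools) {
    try {
      if (tool.schema) JSON.parse(tool.schema);
    } catch (e) {
      throw new Error(`Tool '${tool.name}' has an invalid JSON schema: ${e}`);
    }
  }
}

validateToolSchemas(tools); // Throws before calling streamAgent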
Production Example
SSE (Server-Sent Events) pattern:
// Server-side streaming to client. sendSSE, conversationId, specificationId,
// tools and toolHandlers come from the host application (a sendSSE sketch follows below).
await graphlit.streamAgent(
  userPrompt,
  async (event: AgentStreamEvent) => {
    switch (event.type) {
      case 'conversation_started':
        conversationId = event.conversationId;
        // Send SSE event to client
        sendSSE({ type: 'started', conversationId: event.conversationId });
        break;
      case 'message_update':
        // Stream message chunks to client
        sendSSE({
          type: 'message',
          content: event.message.message,
          isComplete: !event.isStreaming
        });
        break;
      case 'tool_update':
        // Send tool status to client
        sendSSE({
          type: 'tool',
          toolName: event.toolCall.name,
          status: event.status,
          result: event.result
        });
        break;
      case 'conversation_completed':
        // Send completion event
        sendSSE({
          type: 'completed',
          tokens: event.usage?.tokens
        });
        break;
    }
  },
  conversationId,
  { id: specificationId }, // specification expects an EntityReferenceInput
  tools,
  toolHandlers
);
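The sendSSE helper is left to the host framework. A minimal sketch with Express (the route, helper name, and wiring are illustrative assumptions, not part of the Graphlit SDK):
import express from 'express';

const app = express();
app.use(express.json());

app.post('/chat', async (req, res) => {
  // Standard SSE response headers
  res.setHeader('Content-Type', 'text/event-stream');
  res.setHeader('Cache-Control', 'no-cache');
  res.setHeader('Connection', 'keep-alive');

  // Serialize each event as one SSE frame
  const sendSSE = (data: unknown) => {
    res.write(`data: ${JSON.stringify(data)}\n\n`);
  };

  // ... call graphlit.streamAgent(...) as shown above,
  // forwarding each event through sendSSE ...

  res.end(); // Close the stream once the agent completes
});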