Stream Agent

User Intent

"I want real-time streaming responses from AI with tool calling support"

Operation

  • SDK Method: graphlit.streamAgent()

  • GraphQL: N/A (uses streaming protocol)

  • Entity Type: Conversation

  • Common Use Cases: Chat UI with streaming, real-time AI responses, tool calling, agentic workflows

TypeScript (Canonical)

import { Graphlit } from 'graphlit-client';
import { Types, AgentStreamEvent } from 'graphlit-client/dist/generated/graphql-types';

const graphlit = new Graphlit();

await graphlit.streamAgent(
  'What are the key findings in the uploaded documents?',  // prompt
  async (event: AgentStreamEvent) => {
    // Handle different event types
    switch (event.type) {
      case 'conversation_started':
        console.log(`Conversation ID: ${event.conversationId}`);
        break;
      
      case 'message_update':
        // Streaming message chunks
        process.stdout.write(event.message.message);
        if (!event.isStreaming) {
          console.log('\n[Message complete]');
        }
        break;
      
      case 'tool_update':
        console.log(`Tool: ${event.toolCall.name} - ${event.status}`);
        break;
      
      case 'conversation_completed':
        console.log(`\nTotal tokens: ${event.usage?.tokens || 0}`);
        break;
    }
  },
  undefined,  // conversationId (optional - creates new if omitted)
  undefined,  // specification (optional - uses project default)
  [],         // tools (optional - for function calling)
  {}          // toolHandlers (optional - tool implementations)
);

Parameters

streamAgent

  • prompt (string): User's question or message

  • eventHandler (function): Callback for streaming events

    • Called for each event (message chunks, tool calls, completion)

    • Must be async function

  • conversationId (string): Optional conversation ID

    • If omitted: Creates new conversation

    • If provided: Continues existing conversation

  • specification (EntityReferenceInput): Optional LLM configuration

  • tools (ToolDefinitionInput[]): Tools for function calling

  • toolHandlers (Record<string, Function>): Tool implementations

Response (via Events)

Stream events sent to eventHandler:

conversation_started

{
  type: 'conversation_started',
  conversationId: string  // ID of conversation (new or existing)
}

message_update

{
  type: 'message_update',
  message: {
    message: string;      // Message text (chunk or full)
    isStreaming: boolean; // true = chunk, false = complete
    role: 'ASSISTANT',
    tokens?: number,      // Token count (when complete)
    model?: string,       // Model used
    throughput?: number   // Tokens per second
  },
  isStreaming: boolean    // Same as message.isStreaming
}

tool_update

{
  type: 'tool_update',
  toolCall: {
    id: string,
    name: string,       // Tool name
    arguments: any      // Tool parameters
  },
  status: 'preparing' | 'executing' | 'completed' | 'failed',
  result?: any,         // Tool result (when completed)
  error?: string        // Error message (when failed)
}

conversation_completed

{
  type: 'conversation_completed',
  conversationId: string,
  usage?: {
    tokens: number,          // Total tokens
    completionTokens: number,
    promptTokens: number,
    cost?: number           // USD cost
  }
}

Developer Hints

Streaming vs Non-Streaming

Feature
streamAgent
promptConversation

Real-time

Token-by-token

Wait for complete

Tool calling

Supported

Use promptAgent

Use case

Chat UI, streaming

Simple Q&A

Complexity

Higher (event handling)

Lower (single response)

// Use streamAgent when:
// - Building chat UI with streaming
// - Need tool calling
// - Want real-time progress

// Use promptConversation when:
// - Simple Q&A
// - No tool calling needed
// - Don't need streaming

Event Handler Must Be Async

// Correct: async function
await graphlit.streamAgent(prompt, async (event) => {
  // Can await operations
  await saveEventToDatabase(event);
});

// Wrong: sync function
await graphlit.streamAgent(prompt, (event) => {  // Missing async
  // Cannot await
});

Message Streaming Pattern

Messages stream in chunks, then complete:

await graphlit.streamAgent(prompt, async (event) => {
  if (event.type === 'message_update') {
    if (event.isStreaming) {
      // Streaming chunk
      process.stdout.write(event.message.message);
    } else {
      // Final complete message
      console.log('\n[Complete]');
      console.log(`Total tokens: ${event.message.tokens}`);
    }
  }
});

🛠 Tool Calling Lifecycle

Tools go through: preparing → executing → completed/failed:

await graphlit.streamAgent(
  'Search for information about AI',
  async (event) => {
    if (event.type === 'tool_update') {
      switch (event.status) {
        case 'preparing':
          console.log(` Preparing tool: ${event.toolCall.name}`);
          break;
        case 'executing':
          console.log(`⚙ Executing tool: ${event.toolCall.name}`);
          break;
        case 'completed':
          console.log(` Tool complete: ${event.toolCall.name}`);
          break;
        case 'failed':
          console.log(` Tool failed: ${event.error}`);
          break;
      }
    }
  },
  undefined,
  undefined,
  tools,         // Tool definitions
  toolHandlers   // Tool implementations
);

Variations

1. Basic Streaming Chat

Simple streaming without tools:

await graphlit.streamAgent(
  'Explain quantum computing in simple terms',
  async (event) => {
    if (event.type === 'message_update') {
      // Write each chunk as it arrives
      process.stdout.write(event.message.message);
      
      if (!event.isStreaming) {
        console.log('\n--- End of message ---');
      }
    }
  }
);

2. Multi-Turn Streaming Conversation

Continue conversation across multiple prompts:

let conversationId: string | undefined;

// First turn
await graphlit.streamAgent(
  'What is machine learning?',
  async (event) => {
    if (event.type === 'conversation_started') {
      conversationId = event.conversationId;
      console.log(`Started conversation: ${conversationId}`);
    }
    if (event.type === 'message_update' && !event.isStreaming) {
      console.log(event.message.message);
    }
  }
);

// Second turn (with context from first)
await graphlit.streamAgent(
  'Can you give an example?',
  async (event) => {
    if (event.type === 'message_update' && !event.isStreaming) {
      console.log(event.message.message);
    }
  },
  conversationId  // Same conversation for context
);

3. Streaming with Tool Calling

Implement tools for function calling:

// Define tools
const tools: ToolDefinitionInput[] = [
  {
    name: 'searchDocuments',
    description: 'Search through ingested documents',
    schema: JSON.stringify({
      type: 'object',
      properties: {
        query: { type: 'string', description: 'Search query' }
      },
      required: ['query']
    })
  }
];

// Implement tool handlers
const toolHandlers: Record<string, Function> = {
  searchDocuments: async (args: { query: string }) => {
    const results = await graphlit.queryContents({
      search: args.query,
      limit: 5
    });
    
    return {
      results: results.contents.results.map(c => ({
        name: c.name,
        summary: c.summary
      }))
    };
  }
};

// Stream with tools
await graphlit.streamAgent(
  'Find information about API rate limiting',
  async (event) => {
    if (event.type === 'tool_update') {
      console.log(`[Tool] ${event.toolCall.name}: ${event.status}`);
    }
    if (event.type === 'message_update' && !event.isStreaming) {
      console.log(`[AI] ${event.message.message}`);
    }
  },
  undefined,
  undefined,
  tools,
  toolHandlers
);

4. Streaming with Custom Model

Use specific LLM:

// Create specification
const specResponse = await graphlit.createSpecification({
  name: 'GPT-4 Turbo',
  type: SpecificationTypes.Completion,
  serviceType: ModelServiceTypes.OpenAI,
  openAI: {
    model: OpenAIModels.Gpt_4_Turbo
  }
});

// Stream with custom model
await graphlit.streamAgent(
  'Write a detailed analysis',
  async (event) => {
    if (event.type === 'message_update') {
      process.stdout.write(event.message.message);
    }
  },
  undefined,
  { id: specResponse.createSpecification.id }  // Use custom model
);

5. Collect Full Message from Stream

Buffer chunks to get complete message:

let fullMessage = '';

await graphlit.streamAgent(
  'Summarize the document',
  async (event) => {
    if (event.type === 'message_update') {
      if (event.isStreaming) {
        fullMessage += event.message.message;
      } else {
        // Final message
        console.log('Complete message:', fullMessage);
        console.log('Tokens used:', event.message.tokens);
      }
    }
  }
);

6. Track Streaming Metrics

Monitor performance during streaming:

let startTime = Date.now();
let firstTokenTime: number | null = null;
let tokenCount = 0;

await graphlit.streamAgent(
  'Explain AI ethics',
  async (event) => {
    if (event.type === 'message_update') {
      if (event.isStreaming && !firstTokenTime) {
        firstTokenTime = Date.now();
        const ttft = firstTokenTime - startTime;
        console.log(`Time to first token: ${ttft}ms`);
      }
      
      if (!event.isStreaming) {
        const totalTime = Date.now() - startTime;
        tokenCount = event.message.tokens || 0;
        const throughput = (tokenCount / (totalTime / 1000)).toFixed(2);
        
        console.log(`\nMetrics:`);
        console.log(`  Total time: ${totalTime}ms`);
        console.log(`  Tokens: ${tokenCount}`);
        console.log(`  Throughput: ${throughput} tokens/sec`);
      }
    }
  }
);

Common Issues

Issue: Events not firing / no response Solution: Ensure eventHandler is async. Check for errors in console/logs.

Issue: Messages streaming but not in order Solution: This shouldn't happen. If it does, ensure you're not modifying state incorrectly in event handler.

Issue: Tool calls not executing Solution: Verify tools and toolHandlers are provided. Check tool schema is valid JSON.

Issue: "Streaming not supported" error Solution: Some models don't support streaming. Falls back to promptAgent automatically.

Issue: Conversation ID not available immediately Solution: Wait for conversation_started event to get conversation ID.

Issue: Multiple message_update events with same text Solution: Check isStreaming flag. Final message comes with isStreaming: false.

Production Example

SSE (Server-Sent Events) pattern:

// Server-side streaming to client
await graphlit.streamAgent(
  userPrompt,
  async (event: AgentStreamEvent) => {
    switch (event.type) {
      case 'conversation_started':
        conversationId = event.conversationId;
        // Send SSE event to client
        sendSSE({ type: 'started', conversationId: event.conversationId });
        break;
      
      case 'message_update':
        // Stream message chunks to client
        sendSSE({
          type: 'message',
          content: event.message.message,
          isComplete: !event.isStreaming
        });
        break;
      
      case 'tool_update':
        // Send tool status to client
        sendSSE({
          type: 'tool',
          toolName: event.toolCall.name,
          status: event.status,
          result: event.result
        });
        break;
      
      case 'conversation_completed':
        // Send completion event
        sendSSE({
          type: 'completed',
          tokens: event.usage?.tokens
        });
        break;
    }
  },
  conversationId,
  specificationId,
  tools,
  toolHandlers
);

Last updated

Was this helpful?