Files
Max dml 7e5abd504f feat: add DBOS skills for TypeScript, Python, and Go (#94)
Add three DBOS SDK skills with reference documentation for building
reliable, fault-tolerant applications with durable workflows.

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-17 23:26:51 +01:00

2.3 KiB

title, impact, impactDescription, tags
title impact impactDescription tags
Use Streams for Real-Time Data MEDIUM Enables streaming results from long-running workflows communication, stream, real-time, channel

Use Streams for Real-Time Data

Workflows can stream data to clients in real-time using dbos.WriteStream, dbos.CloseStream, and dbos.ReadStream/dbos.ReadStreamAsync. Useful for LLM output streaming or progress reporting.

Incorrect (accumulating results then returning at end):

func processWorkflow(ctx dbos.DBOSContext, items []string) ([]string, error) {
	var results []string
	for _, item := range items {
		result, _ := dbos.RunAsStep(ctx, func(ctx context.Context) (string, error) {
			return processItem(item)
		}, dbos.WithStepName("process"))
		results = append(results, result)
	}
	return results, nil // Client must wait for entire workflow to complete
}

Correct (streaming results as they become available):

func processWorkflow(ctx dbos.DBOSContext, items []string) (string, error) {
	for _, item := range items {
		result, err := dbos.RunAsStep(ctx, func(ctx context.Context) (string, error) {
			return processItem(item)
		}, dbos.WithStepName("process"))
		if err != nil {
			return "", err
		}
		dbos.WriteStream(ctx, "results", result)
	}
	dbos.CloseStream(ctx, "results") // Signal completion
	return "done", nil
}

// Read the stream synchronously (blocks until closed)
handle, _ := dbos.RunWorkflow(ctx, processWorkflow, items)
values, closed, err := dbos.ReadStream[string](ctx, handle.GetWorkflowID(), "results")

Async stream reading with channels:

ch, err := dbos.ReadStreamAsync[string](ctx, handle.GetWorkflowID(), "results")
if err != nil {
	log.Fatal(err)
}
for sv := range ch {
	if sv.Err != nil {
		log.Fatal(sv.Err)
	}
	if sv.Closed {
		break
	}
	fmt.Println("Received:", sv.Value)
}

Key behaviors:

  • A workflow may have any number of streams, each identified by a unique key
  • Streams are immutable and append-only
  • Writes from workflows happen exactly-once
  • Streams are automatically closed when the workflow terminates
  • ReadStream blocks until the workflow is inactive or the stream is closed
  • ReadStreamAsync returns a channel of StreamValue[R] for non-blocking reads

Reference: Workflow Streaming