DSPy.rb + Sidekiq: Non-blocking LLM Processing in Production
How DSPy.rb's async architecture enables efficient background processing with Sidekiq, avoiding thread blocking during LLM API calls
Vicente Reig
Fractional Engineering Lead
LLM API calls take 2-5 seconds each. In a Sidekiq worker processing hundreds of jobs, this can quickly exhaust your thread pool and create bottlenecks. DSPy.rb’s async architecture solves this by using Ruby’s async gem for non-blocking I/O operations.
How DSPy.rb Handles Async Operations
LM#chat Uses Sync Blocks Internally
Every DSPy predictor call uses Sync blocks for non-blocking HTTP requests:
# From lib/dspy/lm.rb - DSPy's internal implementation
def chat(inference_module, input_values, &block)
  Sync do # Non-blocking fiber context
    signature_class = inference_module.signature_class

    # Build messages from inference module
    messages = build_messages(inference_module, input_values)

    # Execute with instrumentation - HTTP calls don't block threads
    response = instrument_lm_request(messages, signature_class.name) do
      chat_with_strategy(messages, signature_class, &block)
    end

    # Parse response
    parse_response(response, input_values, signature_class)
  end
end
When you call predictor.call(), DSPy automatically wraps the HTTP request in a Sync block. This means:
- HTTP requests don’t block threads - other work can proceed
- Fibers yield control during I/O operations
- Concurrent operations are possible within the same thread (see the sketch below)
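As a quick illustration of that last point (the Classify and Summarize signatures here are hypothetical stand-ins for your own DSPy::Signature classes), two predictor calls launched as child tasks overlap on a single thread:
require 'async'

# Hypothetical signatures - replace with your own DSPy::Signature classes
classifier = DSPy::Predict.new(Classify)
summarizer = DSPy::Predict.new(Summarize)

Async do |task|
  # Both HTTP requests are in flight at once; each fiber yields during I/O
  classification_task = task.async { classifier.call(text: 'Quarterly results beat estimates') }
  summary_task = task.async { summarizer.call(content: 'A long document...') }

  classification_task.wait
  summary_task.wait
end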
The Sidekiq Threading Problem
Blocking Approach - Inefficient Thread Usage
# ❌ Blocking approach - ties up worker thread during API calls
class BlockingLLMProcessor
  include Sidekiq::Worker
  # Concurrency is a server-wide Sidekiq setting (e.g. in sidekiq.yml), not a
  # sidekiq_options key; assume the server runs with concurrency: 5 here

  def perform(task_id)
    # Each call blocks the worker thread for 2-5 seconds
    classification = classifier.call(text: input_text)  # Blocks ~2s
    summary = summarizer.call(content: long_content)    # Blocks ~3s
    analysis = analyzer.call(data: classification)      # Blocks ~2s

    # Total: ~7s of blocked thread time per job
    # With 5 threads, at most 5 jobs can process concurrently
    # Throughput: ~43 jobs/minute (5 threads * 60s / 7s per job)
    save_results(classification, summary, analysis)
  end
end
Non-blocking Approach - Efficient Resource Utilization
# ✅ Non-blocking approach - efficient thread utilization
class AsyncLLMProcessor
  include Sidekiq::Worker
  # Same server-wide concurrency of 5 assumed as above

  def perform(task_id)
    Async do |task|
      # Independent LLM calls run concurrently; fibers yield during I/O
      classification_task = task.async { classifier.call(text: input_text) }
      summary_task = task.async { summarizer.call(content: long_content) }

      # Wait for dependencies
      classification = classification_task.wait
      analysis_task = task.async { analyzer.call(data: classification) }

      # Collect results
      summary = summary_task.wait
      analysis = analysis_task.wait

      # Total wall-clock time: ~4s - the critical path (classification 2s +
      # dependent analysis 2s) - instead of the ~7s sequential sum
      # Throughput: ~75 jobs/minute (5 threads * 60s / 4s per job)
      save_results(classification, summary, analysis)
    end.wait # Ensure completion before the worker finishes
  end
end
Real-World Example: Document Processing Pipeline
Here’s a complete worker that processes documents with multiple LLM operations:
require 'sidekiq'

class DocumentProcessor
  include Sidekiq::Worker
  sidekiq_options queue: 'document_processing', retry: 2

  def perform(document_id)
    document = Document.find(document_id)
    document.update!(status: :processing)

    # Process with concurrent LLM operations
    result = Async do |task|
      # Stage 1: Parallel extraction (independent operations)
      title_task = task.async { title_extractor.call(content: document.content) }
      keywords_task = task.async { keyword_extractor.call(content: document.content) }
      category_task = task.async { categorizer.call(content: document.content) }

      # Stage 2: Wait for extraction results
      title = title_task.wait
      keywords = keywords_task.wait
      category = category_task.wait

      # Stage 3: Dependent operations using extraction results
      summary_task = task.async do
        summarizer.call(
          content: document.content,
          title: title.title,
          keywords: keywords.keywords
        )
      end

      quality_task = task.async do
        quality_checker.call(
          title: title.title,
          category: category.category,
          content: document.content
        )
      end

      # Wait for all results
      summary = summary_task.wait
      quality = quality_task.wait

      {
        title: title,
        keywords: keywords,
        category: category,
        summary: summary,
        quality_score: quality.score
      }
    end.wait

    # Save results and update status
    document.update!(
      processed_title: result[:title].title,
      extracted_keywords: result[:keywords].keywords.join(', '),
      category: result[:category].category,
      summary: result[:summary].summary,
      quality_score: result[:quality_score],
      status: :completed
    )
  rescue => e
    document.update!(status: :failed, error_message: e.message)
    raise # Let Sidekiq handle retry
  end

  private

  # Memoize DSPy predictors so each job builds them only once
  def title_extractor
    @title_extractor ||= DSPy::Predict.new(TitleExtractor)
  end

  def keyword_extractor
    @keyword_extractor ||= DSPy::Predict.new(KeywordExtractor)
  end

  def categorizer
    @categorizer ||= DSPy::Predict.new(DocumentCategorizer)
  end

  def summarizer
    @summarizer ||= DSPy::ChainOfThought.new(DocumentSummarizer)
  end

  def quality_checker
    @quality_checker ||= DSPy::Predict.new(QualityChecker)
  end
end
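The worker above assumes signatures like TitleExtractor are defined elsewhere. As a minimal sketch following DSPy.rb's signature DSL (the description and field names are assumptions for illustration), one might look like:
class TitleExtractor < DSPy::Signature
  description "Extract a concise title from document content"

  input do
    const :content, String
  end

  output do
    const :title, String
  end
end
Jobs are then enqueued as usual with DocumentProcessor.perform_async(document.id).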
Performance Comparison
Sequential vs Concurrent Processing
# Sequential approach (blocking)
start_time = Time.now

title = title_extractor.call(content: content)             # 2s
keywords = keyword_extractor.call(content: content)        # 2s
category = categorizer.call(content: content)              # 2s
summary = summarizer.call(title: title, content: content)  # 3s

puts "Sequential: #{Time.now - start_time}s" # Total: ~9 seconds wall-clock time

# Concurrent approach (non-blocking)
start_time = Time.now

result = Async do |task|
  # Stage 1: Independent operations (parallel)
  title_task = task.async { title_extractor.call(content: content) }
  keywords_task = task.async { keyword_extractor.call(content: content) }
  category_task = task.async { categorizer.call(content: content) }

  # Stage 2: Dependent operation (waits for title)
  title = title_task.wait
  summary_task = task.async { summarizer.call(title: title, content: content) }

  # Collect results
  {
    title: title,
    keywords: keywords_task.wait,
    category: category_task.wait,
    summary: summary_task.wait
  }
end.wait

puts "Concurrent: #{Time.now - start_time}s"
# Total: ~5 seconds wall-clock time - the critical path (title 2s + dependent
# summary 3s), not the ~9s sum of all four calls
Sidekiq Configuration for DSPy.rb
Optimal Worker Configuration
# config/initializers/sidekiq.rb
Sidekiq.configure_server do |config|
  # LLM jobs are I/O-bound, so a higher thread count is safe
  config.concurrency = 10
end

# Separate queues by priority and resource requirements by routing each
# worker with sidekiq_options (see the sidekiq.yml sketch below for weights)
class LLMProcessor
  include Sidekiq::Worker
  sidekiq_options queue: 'llm_processing', # LLM operations
                  retry: 3,
                  backtrace: true
end

class FastProcessor
  include Sidekiq::Worker
  sidekiq_options queue: 'fast_processing', # Quick operations
                  retry: 5
end
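If you drive queue priorities from sidekiq.yml, a weighted setup (the weights here are illustrative assumptions, not recommendations) might look like:
# config/sidekiq.yml
concurrency: 10
queues:
  - [fast_processing, 5]
  - [llm_processing, 2]
  - [default, 1]
With weighted queues, Sidekiq checks fast_processing most often, so quick jobs aren't starved behind long-running LLM work.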
Monitoring Async Performance
# Add timing instrumentation to measure async benefits
class InstrumentedProcessor
  include Sidekiq::Worker

  def perform(task_id)
    start_time = Time.now
    operations = [] # Declared outside the Async block so it's visible for logging below

    Async do |task|
      # Track individual operation times
      title_task = task.async do
        op_start = Time.now
        result = title_extractor.call(content: content)
        operations << { operation: 'title_extraction', duration: Time.now - op_start }
        result
      end
      # ... other operations

      title_task.wait
    end.wait

    total_time = Time.now - start_time

    # Log performance metrics
    Sidekiq.logger.info("Processed #{task_id} in #{total_time}s with #{operations.length} LLM calls")
    operations.each do |op|
      Sidekiq.logger.info("  #{op[:operation]}: #{op[:duration]}s")
    end
  end
end
Best Practices
1. Design for Concurrency
Structure your DSPy pipelines to maximize concurrent operations:
# ✅ Good: Independent operations can run in parallel
Async do |task|
  extract_task = task.async { extract_entities(document) }
  classify_task = task.async { classify_document(document) }

  entities = extract_task.wait
  classification = classify_task.wait
end

# ❌ Less efficient: Sequential dependencies
classification = classify_document(document)            # Must finish first
entities = extract_entities(document, classification)   # Depends on classification
2. Memoize DSPy Objects
Create DSPy predictors once per job and reuse them across calls. Note that Sidekiq instantiates a fresh worker object for every job, so instance-level memoization is scoped to a single job:
class EfficientProcessor
  include Sidekiq::Worker

  private

  # ✅ Good: Memoized predictor - built once, reused for every call in the job
  def summarizer
    @summarizer ||= DSPy::ChainOfThought.new(Summarizer)
  end

  # ❌ Bad: Building a new instance on every call
  # (shown for contrast - in real code define only one of these methods)
  def summarizer
    DSPy::ChainOfThought.new(Summarizer) # Expensive recreation
  end
end
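To share predictors across jobs as well, cache them at the class level. A minimal sketch, assuming the predictor object is safe to reuse concurrently across Sidekiq threads (verify this for your DSPy.rb version):
class SharedPredictorProcessor
  include Sidekiq::Worker

  # Built once at load time and shared by every job in the process.
  # Assumption: the predictor can be reused safely across threads.
  SUMMARIZER = DSPy::ChainOfThought.new(Summarizer)

  def perform(document_id)
    document = Document.find(document_id)
    summary = SUMMARIZER.call(content: document.content)
    document.update!(summary: summary.summary)
  end
end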
3. Handle Failures Gracefully
def perform(document_id)
  Async do |task|
    operations = [
      task.async { safe_llm_call { title_extractor.call(content: content) } },
      task.async { safe_llm_call { categorizer.call(content: content) } }
    ]

    # Wait for all, handle partial failures
    results = operations.map do |op_task|
      begin
        op_task.wait
      rescue => e
        Sidekiq.logger.warn("LLM operation failed: #{e.message}")
        nil # Partial failure, continue processing
      end
    end

    # Process non-nil results
    results.compact.each { |result| save_result(result) }
  end.wait
end

private

def safe_llm_call(&block)
  retries = 0
  begin
    yield
  rescue => e
    retries += 1
    if retries < 3
      sleep(0.5 * (2 ** retries)) # Exponential backoff: 1s, then 2s
      # Note: these in-process retries stack with Sidekiq's job-level retries
      retry
    else
      raise
    end
  end
end
Key Takeaways
DSPy.rb’s async architecture enables efficient background processing:
- Non-blocking I/O: fibers yield during LLM API waits instead of blocking on each call
- Concurrent operations: Multiple LLM calls can run simultaneously
- Better throughput: Significantly higher jobs/minute with proper async usage
- Resource efficiency: More work with the same thread pool size
Understanding these patterns is crucial for production DSPy.rb applications that need to process high volumes of LLM operations efficiently.
For more DSPy.rb production patterns, check out our production guide and observability documentation.