Multi-stage Pipelines
DSPy.rb supports building complex, multi-stage pipelines by composing multiple DSPy modules. You can create sequential workflows, conditional processing, and reusable pipeline components for sophisticated LLM applications.
Overview
DSPy.rb enables pipeline creation through:
- Module Composition: Combine multiple DSPy::Module instances
- Sequential Processing: Chain operations in order
- Data Flow: Pass results between pipeline stages
- Error Handling: Graceful failure management
- Reusable Components: Build modular, testable pipelines
Basic Pipeline Concepts
Module-Based Architecture
# Individual modules for each stage
class DocumentClassificationSignature < DSPy::Signature
description "Classify document type"
input { const :content, String }
output { const :document_type, String }
end
class DocumentClassifier < DSPy::Module
def initialize
super
@predictor = DSPy::Predict.new(DocumentClassificationSignature)
end
def forward(content:)
@predictor.call(content: content)
end
end
class SummaryGenerationSignature < DSPy::Signature
description "Generate document summary"
input { const :content, String }
output { const :summary, String }
end
class SummaryGenerator < DSPy::Module
def initialize
super
@predictor = DSPy::Predict.new(SummaryGenerationSignature)
end
def forward(content:)
@predictor.call(content: content)
end
end
Sequential Pipeline
class DocumentProcessor < DSPy::Module
def initialize
super
@classifier = DocumentClassifier.new
@summarizer = SummaryGenerator.new
end
def forward(content:)
# Stage 1: Classify document
classification = @classifier.call(content: content)
# Stage 2: Generate summary
summary = @summarizer.call(content: content)
# Return combined results
{
document_type: classification.document_type,
summary: summary.summary,
original_length: content.length,
summary_length: summary.summary.length
}
end
end
# Usage
processor = DocumentProcessor.new
result = processor.call(content: "Long document content...")
puts "Type: #{result[:document_type]}"
puts "Summary: #{result[:summary]}"
Advanced Pipeline Patterns
Conditional Processing
class AdaptiveDocumentProcessor < DSPy::Module
def initialize
super
@classifier = DocumentClassifier.new
@technical_summarizer = TechnicalSummarizer.new
@general_summarizer = GeneralSummarizer.new
@legal_analyzer = LegalAnalyzer.new
end
def forward(content:)
# Stage 1: Classify
classification = @classifier.call(content: content)
doc_type = classification.document_type
# Stage 2: Conditional processing based on type
case doc_type.downcase
when 'technical'
summary = @technical_summarizer.call(content: content)
analysis = nil
when 'legal'
summary = @general_summarizer.call(content: content)
analysis = @legal_analyzer.call(content: content)
else
summary = @general_summarizer.call(content: content)
analysis = nil
end
# Combine results
result = {
document_type: doc_type,
summary: summary.summary
}
result[:legal_analysis] = analysis if analysis
result
end
end
Pipeline with Error Handling
class RobustPipeline < DSPy::Module
def initialize
super
@stages = [
DocumentClassifier.new,
SummaryGenerator.new,
KeywordExtractor.new
]
end
def forward(content:)
results = { input_length: content.length }
current_content = content
@stages.each_with_index do |stage, index|
begin
case index
when 0 # Classification
result = stage.call(content: current_content)
results[:document_type] = result.document_type
when 1 # Summarization
result = stage.call(content: current_content)
results[:summary] = result.summary
current_content = result.summary # Use summary for next stage
when 2 # Keyword extraction
result = stage.call(content: current_content)
results[:keywords] = result.keywords
end
rescue => e
results[:errors] ||= []
results[:errors] << {
stage: index,
stage_name: stage.class.name,
error: e.message
}
# Continue with original content if stage fails
current_content = content
end
end
results
end
end
Data Transformation Pipeline
class EmailProcessor < DSPy::Module
def initialize
super
@spam_detector = SpamDetector.new
@sentiment_analyzer = SentimentAnalyzer.new
@priority_classifier = PriorityClassifier.new
@response_generator = ResponseGenerator.new
end
def forward(email_content:, sender:)
pipeline_data = {
content: email_content,
sender: sender,
processing_steps: []
}
# Stage 1: Spam detection
spam_result = @spam_detector.call(content: email_content)
pipeline_data[:is_spam] = spam_result.is_spam
pipeline_data[:processing_steps] << "spam_detection"
# Skip further processing if spam
return pipeline_data if pipeline_data[:is_spam]
# Stage 2: Sentiment analysis
sentiment_result = @sentiment_analyzer.call(content: email_content)
pipeline_data[:sentiment] = sentiment_result.sentiment
pipeline_data[:processing_steps] << "sentiment_analysis"
# Stage 3: Priority classification
priority_result = @priority_classifier.call(
content: email_content,
sentiment: pipeline_data[:sentiment]
)
pipeline_data[:priority] = priority_result.priority
pipeline_data[:processing_steps] << "priority_classification"
# Stage 4: Generate response if high priority
if priority_result.priority == "high"
response_result = @response_generator.call(
content: email_content,
sentiment: pipeline_data[:sentiment]
)
pipeline_data[:suggested_response] = response_result.response
pipeline_data[:processing_steps] << "response_generation"
end
pipeline_data
end
end
Parallel Processing Simulation
class ParallelAnalysisPipeline < DSPy::Module
def initialize
super
@analyzers = {
sentiment: SentimentAnalyzer.new,
topics: TopicExtractor.new,
entities: EntityExtractor.new,
readability: ReadabilityAnalyzer.new
}
end
def forward(content:)
# Simulate parallel processing with sequential calls.
# In a real implementation, you might use threads or async processing
# (see the threaded sketch after this example).
results = {}
errors = {}
@analyzers.each do |name, analyzer|
begin
start_time = Time.now
result = analyzer.call(content: content)
duration = Time.now - start_time
results[name] = {
result: result,
processing_time: duration
}
rescue => e
errors[name] = e.message
end
end
{
content_length: content.length,
analysis_results: results,
errors: errors,
total_analyzers: @analyzers.size,
successful_analyzers: results.size,
failed_analyzers: errors.size
}
end
end
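If the analyzer modules and your LM client are safe to call from multiple threads (worth verifying for your adapter), the loop above can run concurrently with plain Ruby threads. A minimal sketch; ThreadedAnalysisPipeline is an illustrative name, and the analyzer classes are the same ones assumed above:
class ThreadedAnalysisPipeline < DSPy::Module
  def initialize
    super
    @analyzers = {
      sentiment: SentimentAnalyzer.new,
      topics: TopicExtractor.new
    }
  end

  def forward(content:)
    results = {}
    errors = {}
    mutex = Mutex.new

    # One thread per analyzer; each records its result or error under the mutex
    threads = @analyzers.map do |name, analyzer|
      Thread.new do
        result = analyzer.call(content: content)
        mutex.synchronize { results[name] = result }
      rescue => e
        mutex.synchronize { errors[name] = e.message }
      end
    end
    threads.each(&:join)

    { analysis_results: results, errors: errors }
  end
end
A thread per analyzer is the simplest option; with many stages or strict rate limits, a bounded thread pool or an async HTTP client is a better fit.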
Pipeline Optimization
Caching Pipeline Results
require 'digest'

class CachedPipeline < DSPy::Module
def initialize(base_pipeline)
super()
@base_pipeline = base_pipeline
@cache = {}
end
def forward(**inputs)
cache_key = generate_cache_key(inputs)
if @cache.key?(cache_key)
puts "Cache hit for #{cache_key[0..10]}..."
return @cache[cache_key]
end
puts "Cache miss, processing..."
result = @base_pipeline.call(**inputs)
# Store in cache (in production, consider cache size limits;
# see the bounded-cache sketch after the usage example below)
@cache[cache_key] = result
result
end
private
def generate_cache_key(inputs)
# Simple cache key generation
Digest::SHA256.hexdigest(inputs.to_s)
end
end
# Usage
base_processor = DocumentProcessor.new
cached_processor = CachedPipeline.new(base_processor)
# First call processes normally
result1 = cached_processor.call(content: document)
# Second call with same content uses cache
result2 = cached_processor.call(content: document)
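The in-memory hash above grows without bound. One way to act on the size-limit note in the comment is simple first-in, first-out eviction; BoundedCachedPipeline and MAX_ENTRIES below are illustrative names, not part of DSPy.rb:
class BoundedCachedPipeline < CachedPipeline
  MAX_ENTRIES = 100

  def forward(**inputs)
    result = super
    # Ruby hashes preserve insertion order, so shift drops the oldest entry
    @cache.shift while @cache.size > MAX_ENTRIES
    result
  end
end
For real workloads an LRU cache or an external store such as Redis, keyed by the same digest, is usually a better choice.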
Performance Monitoring
class MonitoredPipeline < DSPy::Module
def initialize(base_pipeline)
super()
@base_pipeline = base_pipeline
@metrics = {
total_calls: 0,
total_time: 0.0,
errors: 0
}
end
def forward(**inputs)
@metrics[:total_calls] += 1
start_time = Time.now
begin
result = @base_pipeline.call(**inputs)
# Add performance metadata
duration = Time.now - start_time
@metrics[:total_time] += duration
result_with_metrics = result.dup
result_with_metrics[:performance] = {
processing_time: duration,
average_time: @metrics[:total_time] / @metrics[:total_calls],
call_number: @metrics[:total_calls]
}
result_with_metrics
rescue => e
@metrics[:errors] += 1
duration = Time.now - start_time
@metrics[:total_time] += duration
raise e
end
end
def stats
{
total_calls: @metrics[:total_calls],
total_time: @metrics[:total_time],
average_time: @metrics[:total_calls] > 0 ? @metrics[:total_time] / @metrics[:total_calls] : 0,
error_rate: @metrics[:total_calls] > 0 ? @metrics[:errors].to_f / @metrics[:total_calls] : 0,
errors: @metrics[:errors]
}
end
end
# Usage
base_pipeline = DocumentProcessor.new
monitored_pipeline = MonitoredPipeline.new(base_pipeline)
# Process documents
results = documents.map { |doc| monitored_pipeline.call(content: doc) }
# Check performance
puts "Pipeline stats: #{monitored_pipeline.stats}"
Complex Pipeline Example
class ContentAnalysisPipeline < DSPy::Module
def initialize
super
# Initialize all pipeline stages
@content_classifier = ContentClassifier.new
@language_detector = LanguageDetector.new
@sentiment_analyzer = SentimentAnalyzer.new
@topic_extractor = TopicExtractor.new
@summarizer = ContentSummarizer.new
@quality_assessor = QualityAssessor.new
end
def forward(content:, metadata: {})
analysis = {
timestamp: Time.now,
input_metadata: metadata,
processing_chain: []
}
# Stage 1: Basic content analysis
begin
classification = @content_classifier.call(content: content)
analysis[:content_type] = classification.content_type
analysis[:processing_chain] << :content_classification
rescue => e
analysis[:errors] ||= []
analysis[:errors] << { stage: :content_classification, error: e.message }
end
# Stage 2: Language detection
begin
language = @language_detector.call(content: content)
analysis[:language] = language.language
analysis[:language_confidence] = language.confidence
analysis[:processing_chain] << :language_detection
rescue => e
analysis[:errors] ||= []
analysis[:errors] << { stage: :language_detection, error: e.message }
analysis[:language] = 'unknown'
end
# Stage 3: Sentiment analysis (only for certain content types)
if ['article', 'review', 'social_post'].include?(analysis[:content_type])
begin
sentiment = @sentiment_analyzer.call(content: content)
analysis[:sentiment] = sentiment.sentiment
analysis[:sentiment_score] = sentiment.confidence
analysis[:processing_chain] << :sentiment_analysis
rescue => e
analysis[:errors] ||= []
analysis[:errors] << { stage: :sentiment_analysis, error: e.message }
end
end
# Stage 4: Topic extraction
begin
topics = @topic_extractor.call(content: content)
analysis[:topics] = topics.topics
analysis[:topic_confidence] = topics.confidence
analysis[:processing_chain] << :topic_extraction
rescue => e
analysis[:errors] ||= []
analysis[:errors] << { stage: :topic_extraction, error: e.message }
end
# Stage 5: Summarization (for long content)
if content.length > 1000
begin
summary = @summarizer.call(content: content)
analysis[:summary] = summary.summary
analysis[:summary_ratio] = summary.summary.length.to_f / content.length
analysis[:processing_chain] << :summarization
rescue => e
analysis[:errors] ||= []
analysis[:errors] << { stage: :summarization, error: e.message }
end
end
# Stage 6: Quality assessment
begin
quality = @quality_assessor.call(
content: content,
content_type: analysis[:content_type],
language: analysis[:language]
)
analysis[:quality_score] = quality.score
analysis[:quality_issues] = quality.issues
analysis[:processing_chain] << :quality_assessment
rescue => e
analysis[:errors] ||= []
analysis[:errors] << { stage: :quality_assessment, error: e.message }
end
# Add processing summary
analysis[:processing_summary] = {
total_stages: 6,
completed_stages: analysis[:processing_chain].size,
error_count: analysis[:errors]&.size || 0,
processing_time: Time.now - analysis[:timestamp]
}
analysis
end
end
# Usage
pipeline = ContentAnalysisPipeline.new
# Process a single document
result = pipeline.call(
content: "Long article content...",
metadata: { source: 'web', author: 'John Doe' }
)
puts "Content type: #{result[:content_type]}"
puts "Language: #{result[:language]}"
puts "Topics: #{result[:topics]}"
puts "Processing completed #{result[:processing_summary][:completed_stages]}/#{result[:processing_summary][:total_stages]} stages"
Best Practices
1. Modular Design
# Good: Each stage is a separate, testable module
class EmailTriagePipeline < DSPy::Module
def initialize
super
@spam_filter = SpamFilter.new # Focused responsibility
@categorizer = EmailCategorizer.new # Single concern
@prioritizer = PriorityAssigner.new # Clear purpose
end
end
# Good: Reusable components
urgent_filter = PriorityAssigner.new
customer_emails = urgent_filter.call(emails: customer_emails)
support_emails = urgent_filter.call(emails: support_emails)
2. Error Recovery
# `advanced_analysis`, `basic_analysis`, and `AdvancedAnalysisError` are
# placeholders for your own processing methods and error class
# (a concrete sketch using this page's classes follows below).
def forward_with_fallback(content:)
begin
# Try primary processing
advanced_analysis(content)
rescue AdvancedAnalysisError => e
# Fall back to basic processing
puts "Advanced analysis failed, using basic: #{e.message}"
basic_analysis(content)
rescue => e
# Final fallback
puts "All analysis failed: #{e.message}"
{ error: e.message, content_length: content.length }
end
end
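A concrete version of the same pattern, built only from classes defined earlier on this page, wraps the adaptive processor and degrades to a plain summary when any stage raises:
class FallbackDocumentProcessor < DSPy::Module
  def initialize
    super
    @advanced = AdaptiveDocumentProcessor.new
    @basic = SummaryGenerator.new
  end

  def forward(content:)
    @advanced.call(content: content)
  rescue => e
    # Degrade gracefully: return a basic summary plus the failure reason
    summary = @basic.call(content: content)
    { summary: summary.summary, fallback_reason: e.message }
  end
end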
3. Pipeline Testing
# Test individual stages
describe DocumentClassifier do
it "classifies technical documents" do
classifier = DocumentClassifier.new
result = classifier.call(content: "API documentation...")
expect(result.document_type).to eq("technical")
end
end
# Test full pipeline
describe DocumentProcessor do
it "processes documents end-to-end" do
processor = DocumentProcessor.new
result = processor.call(content: sample_document)
expect(result).to have_key(:document_type)
expect(result).to have_key(:summary)
expect(result[:summary]).not_to be_empty
end
end
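End-to-end specs like the one above call the LM, which is slow and non-deterministic. One option, sketched here with plain RSpec doubles (no DSPy-specific test helpers assumed, and assuming stage construction itself does not hit the LM), is to swap the pipeline's stages for stubs:
describe DocumentProcessor do
  it "combines stage results without calling the LM" do
    classifier = double("DocumentClassifier", call: double(document_type: "technical"))
    summarizer = double("SummaryGenerator", call: double(summary: "Short summary"))

    processor = DocumentProcessor.new
    processor.instance_variable_set(:@classifier, classifier)
    processor.instance_variable_set(:@summarizer, summarizer)

    result = processor.call(content: "Some document")
    expect(result[:document_type]).to eq("technical")
    expect(result[:summary]).to eq("Short summary")
  end
end
Accepting the stages as constructor arguments instead of hard-coding them makes this kind of substitution cleaner than instance_variable_set.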
4. Performance Considerations
# Avoid reprocessing
class EfficientPipeline < DSPy::Module
def forward(content:)
# Process once, use multiple times.
# `analyze_content` and the *_from_analysis helpers are placeholders for your
# own shared-analysis logic (a single-signature variant follows below).
base_analysis = analyze_content(content)
{
classification: classify_from_analysis(base_analysis),
sentiment: sentiment_from_analysis(base_analysis),
topics: topics_from_analysis(base_analysis)
}
end
end
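One concrete way to "process once" in DSPy.rb is to put several outputs on a single signature, so one LM call covers what would otherwise be three separate predictions. A hedged sketch; the signature and class names are illustrative, and it assumes a T::Array[String] output is accepted the same way the String outputs above are:
class CombinedAnalysisSignature < DSPy::Signature
  description "Classify content, score sentiment, and extract topics in one pass"

  input do
    const :content, String
  end

  output do
    const :classification, String
    const :sentiment, String
    const :topics, T::Array[String]
  end
end

class SinglePassPipeline < DSPy::Module
  def initialize
    super
    @analyzer = DSPy::Predict.new(CombinedAnalysisSignature)
  end

  def forward(content:)
    analysis = @analyzer.call(content: content)
    {
      classification: analysis.classification,
      sentiment: analysis.sentiment,
      topics: analysis.topics
    }
  end
end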