Observability

DSPy.rb provides comprehensive observability capabilities for production environments, including distributed tracing, metrics collection, logging, and integration with popular monitoring platforms.

Overview

The observability system offers:

  • Distributed Tracing: Track requests across DSPy modules and external services
  • Metrics Collection: Performance, accuracy, and business metrics
  • Structured Logging: Detailed operation logs with context
  • Multi-Platform Integration: OpenTelemetry, New Relic, Langfuse support
  • Custom Instrumentation: Add domain-specific observability
  • Real-time Monitoring: Live dashboards and alerting

Basic Configuration

Enable Instrumentation

DSPy.configure do |config|
  # Enable instrumentation
  config.instrumentation.enabled = true
  
  # Configure subscribers
  config.instrumentation.subscribers = [
    :logger,      # Structured logging
    :otel,        # OpenTelemetry tracing
    :newrelic,    # New Relic APM
    :langfuse     # LLM-specific observability
  ]
  
  # Sampling configuration
  config.instrumentation.sampling_rate = 1.0  # 100% in development
  
  # Timestamp format for OpenTelemetry compliance
  config.instrumentation.timestamp_format = DSPy::TimestampFormat::ISO8601
end

Production Configuration

DSPy.configure do |config|
  config.instrumentation.enabled = true
  
  # Production subscribers
  config.instrumentation.subscribers = [:otel, :newrelic, :custom_metrics]
  
  # Sampling for performance
  config.instrumentation.sampling_rate = 0.1  # 10% sampling in production
  
  # Performance settings
  config.instrumentation.async_processing = true
  config.instrumentation.buffer_size = 1000
  config.instrumentation.flush_interval = 30.seconds
  
  # Error handling
  config.instrumentation.error_reporting = true
  config.instrumentation.error_service = :sentry
  
  # Timestamp format for production monitoring
  config.instrumentation.timestamp_format = DSPy::TimestampFormat::UNIX_NANO
end

Smart Event Consolidation

DSPy.rb automatically reduces instrumentation noise by detecting when you’re using higher-level modules like ChainOfThought or ReAct. Instead of emitting redundant nested events, it only logs the top-level operation.

Real Examples

Direct Predict call emits detailed events:

event=lm_request timestamp=2025-07-04T13:46:24+02:00 provider=openai model=gpt-4o-mini status=success duration_ms=3.82
event=lm_tokens timestamp=2025-07-04T13:46:24+02:00 provider=openai model=gpt-4o-mini input_tokens=290 output_tokens=23 total_tokens=313
event=prediction timestamp=2025-07-04T13:46:24+02:00 signature=TestEventConsolidation status=success duration_ms=5.13

ChainOfThought call emits only the consolidated event:

event=chain_of_thought timestamp=2025-07-04T13:46:24+02:00 signature=TestQuestionAnswering status=success duration_ms=2.44

This cuts instrumentation noise significantly for nested operations while keeping full detail for direct calls.
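The event streams above come from calls like these (a minimal sketch; the TestEventConsolidation and TestQuestionAnswering signatures and their question: input field are assumptions for illustration):

# Direct Predict call: emits lm_request, lm_tokens, and prediction events
predictor = DSPy::Predict.new(TestEventConsolidation)
predictor.call(question: "What does event consolidation do?")

# ChainOfThought call: emits only the consolidated chain_of_thought event
cot = DSPy::ChainOfThought.new(TestQuestionAnswering)
cot.call(question: "What does event consolidation do?")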

Token Reporting Standardization

Both OpenAI and Anthropic providers report tokens using consistent field names:

event=lm_tokens timestamp=2025-07-04T13:46:24+02:00 provider=openai model=gpt-4o-mini input_tokens=290 output_tokens=23 total_tokens=313

The token fields are standardized as input_tokens, output_tokens, and total_tokens regardless of provider.

Timestamp Formats

Configure timestamp formats for different monitoring platforms:

# ISO8601 format (default)
config.instrumentation.timestamp_format = DSPy::TimestampFormat::ISO8601
# Example: timestamp=2025-07-04T13:46:24+02:00

# RFC3339 with nanosecond precision
config.instrumentation.timestamp_format = DSPy::TimestampFormat::RFC3339_NANO
# Example: timestamp=2025-07-04T13:46:24.367503000+0200

# Unix nanoseconds for high-precision monitoring
config.instrumentation.timestamp_format = DSPy::TimestampFormat::UNIX_NANO
# Example: timestamp_ns=1751629584365840896

Distributed Tracing

Automatic Instrumentation

DSPy automatically instruments all core operations:

# This code is automatically instrumented
classifier = DSPy::Predict.new(ClassifyText)
result = classifier.call(text: "Sample text")

# Generated trace includes:
# - dspy.predict.call
#   - dspy.lm.request
#   - dspy.validation.check
#   - dspy.result.format

Manual Instrumentation

class CustomProcessor < DSPy::Module
  def call(input)
    DSPy.tracer.in_span('custom_processor.call') do |span|
      started_at = Time.now

      # Add custom attributes
      span.set_attribute('input.length', input.length)
      span.set_attribute('processor.version', '2.1.0')
      
      # Process with nested spans
      validated_input = DSPy.tracer.in_span('validation') do
        validate_input(input)
      end
      
      result = DSPy.tracer.in_span('core_processing') do |core_span|
        core_span.set_attribute('complexity', assess_complexity(validated_input))
        process_core_logic(validated_input)
      end
      
      # Record custom metrics
      span.add_event('processing_completed', {
        'result.confidence' => result.confidence,
        'processing.duration' => Time.now - started_at
      })
      
      result
    end
  end
end

Correlation IDs

# Automatic correlation ID generation
DSPy.configure do |config|
  config.instrumentation.correlation_id.enabled = true
  config.instrumentation.correlation_id.header = 'X-Correlation-ID'
  config.instrumentation.correlation_id.generator = -> { SecureRandom.uuid }
end

# Manual correlation ID
DSPy.with_correlation_id('user-request-12345') do
  result = classifier.call(text: "User feedback text")
  # All nested operations will include this correlation ID
end

Metrics Collection

Built-in Metrics

DSPy automatically collects performance and accuracy metrics:

# Performance metrics
- dspy.prediction.duration
- dspy.prediction.token_usage
- dspy.prediction.cost
- dspy.lm.request.duration
- dspy.lm.request.input_tokens
- dspy.lm.request.output_tokens
- dspy.lm.request.total_tokens

# Accuracy metrics (when ground truth available)
- dspy.prediction.accuracy
- dspy.prediction.confidence_accuracy_correlation
- dspy.signature.field_accuracy

# Error metrics
- dspy.prediction.errors_total
- dspy.lm.errors_total
- dspy.validation.errors_total

Custom Metrics

class BusinessMetricsCollector
  include DSPy::Instrumentation::Metrics
  
  def initialize
    # Define custom metrics
    @user_satisfaction = histogram(
      'dspy.business.user_satisfaction',
      description: 'User satisfaction scores',
      buckets: [1, 2, 3, 4, 5]
    )
    
    @prediction_confidence = histogram(
      'dspy.prediction.confidence_distribution',
      description: 'Distribution of prediction confidence scores',
      buckets: [0.0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0]
    )
    
    @business_impact = counter(
      'dspy.business.decisions_automated',
      description: 'Number of business decisions automated by DSPy'
    )
  end
  
  def record_prediction_result(result, context: {})
    # Record confidence distribution
    @prediction_confidence.record(result.confidence)
    
    # Record business impact
    if context[:automated_decision]
      @business_impact.increment(
        tags: {
          decision_type: context[:decision_type],
          confidence_level: confidence_bucket(result.confidence)
        }
      )
    end
    
    # Record user satisfaction if available
    if context[:user_feedback]
      @user_satisfaction.record(
        context[:user_feedback][:satisfaction_score],
        tags: {
          prediction_category: result.category,
          confidence_level: confidence_bucket(result.confidence)
        }
      )
    end
  end
  
  private
  
  def confidence_bucket(confidence)
    case confidence
    when 0.0...0.3 then 'low'
    when 0.3...0.7 then 'medium'
    else 'high'
    end
  end
end

# Usage
metrics_collector = BusinessMetricsCollector.new

# In your application
result = classifier.call(text: "Customer feedback")
metrics_collector.record_prediction_result(
  result,
  context: {
    automated_decision: true,
    decision_type: 'customer_routing',
    user_feedback: { satisfaction_score: 4 }
  }
)

Platform Integrations

OpenTelemetry

# OpenTelemetry configuration
require 'opentelemetry/sdk'
require 'opentelemetry/exporter/otlp'

OpenTelemetry::SDK.configure do |c|
  c.service_name = 'dspy-application'
  c.service_version = '1.0.0'
  
  c.add_span_processor(
    OpenTelemetry::SDK::Trace::Export::BatchSpanProcessor.new(
      OpenTelemetry::Exporter::OTLP::Exporter.new(
        endpoint: ENV['OTEL_EXPORTER_OTLP_ENDPOINT']
      )
    )
  )
end

# DSPy will automatically use the configured tracer
DSPy.configure do |config|
  config.instrumentation.subscribers = [:otel]
  config.instrumentation.otel.tracer_name = 'dspy-ruby'
end

New Relic

# New Relic configuration
DSPy.configure do |config|
  config.instrumentation.subscribers = [:newrelic]
  
  config.instrumentation.newrelic.app_name = 'DSPy Application'
  config.instrumentation.newrelic.license_key = ENV['NEW_RELIC_LICENSE_KEY']
  
  # Custom attributes
  config.instrumentation.newrelic.custom_attributes = {
    'dspy.version' => DSPy::VERSION,
    'deployment.environment' => Rails.env
  }
end

# Custom New Relic events
class NewRelicDSPySubscriber < DSPy::Subscribers::NewRelicSubscriber
  def prediction_completed(event)
    super
    
    # Record custom New Relic event
    NewRelic::Agent.record_custom_event('DSPyPrediction', {
      signature: event.payload[:signature],
      confidence: event.payload[:result][:confidence],
      processing_time: event.payload[:duration],
      success: event.payload[:success]
    })
    
    # Add custom attributes to transaction
    NewRelic::Agent.add_custom_attributes({
      'dspy.prediction.confidence' => event.payload[:result][:confidence],
      'dspy.signature.name' => event.payload[:signature]
    })
  end
end

Langfuse (LLM Observability)

# Langfuse configuration for LLM-specific observability
DSPy.configure do |config|
  config.instrumentation.subscribers = [:langfuse]
  
  config.instrumentation.langfuse.public_key = ENV['LANGFUSE_PUBLIC_KEY']
  config.instrumentation.langfuse.secret_key = ENV['LANGFUSE_SECRET_KEY']
  config.instrumentation.langfuse.host = ENV['LANGFUSE_HOST']
  
  # LLM-specific tracking
  config.instrumentation.langfuse.track_tokens = true
  config.instrumentation.langfuse.track_costs = true
  config.instrumentation.langfuse.track_prompts = true
end

# Custom Langfuse traces
class LangfuseDSPySubscriber < DSPy::Subscribers::LangfuseSubscriber
  def lm_request_started(event)
    super
    
    # Create Langfuse generation
    @current_generation = @langfuse.generation(
      name: "dspy_#{event.payload[:signature]}_prediction",
      input: event.payload[:prompt],
      model: event.payload[:model],
      start_time: event.payload[:start_time]
    )
  end
  
  def lm_request_completed(event)
    super
    
    # Update Langfuse generation
    @current_generation.end(
      output: event.payload[:response],
      end_time: event.payload[:end_time],
      usage: {
        input_tokens: event.payload[:input_tokens],
        output_tokens: event.payload[:output_tokens],
        total_tokens: event.payload[:total_tokens]
      },
      level: event.payload[:success] ? 'INFO' : 'ERROR'
    )
  end
end

Logging

Structured Logging

# Configure structured logging
DSPy.configure do |config|
  config.logger = ActiveSupport::Logger.new(STDOUT)
  config.logger.formatter = DSPy::Logging::StructuredFormatter.new
  
  config.instrumentation.logger.level = :info
  config.instrumentation.logger.include_payloads = true
  config.instrumentation.logger.correlation_id = true
end

# Example log output
{
  "timestamp": "2024-01-15T10:30:45.123Z",
  "level": "INFO",
  "event": "dspy.prediction.completed",
  "correlation_id": "req-12345",
  "signature": "ClassifyText",
  "duration_ms": 245,
  "success": true,
  "result": {
    "confidence": 0.92,
    "category": "positive"
  },
  "metadata": {
    "model": "gpt-4o-mini",
    "tokens_used": 45,
    "cost": 0.001
  }
}

Log Sampling

# Configure log sampling for high-volume applications
DSPy.configure do |config|
  config.instrumentation.logger.sampling = {
    prediction_events: 0.1,     # Sample 10% of predictions
    lm_request_events: 0.05,    # Sample 5% of LM requests
    error_events: 1.0,          # Log all errors
    slow_requests: 1.0          # Log all slow requests (>2s)
  }
  
  # Conditional sampling
  config.instrumentation.logger.sampling_conditions = {
    low_confidence: ->(event) { 
      event.payload.dig(:result, :confidence) < 0.7 
    },
    high_value_users: ->(event) {
      event.payload.dig(:context, :user_tier) == 'premium'
    }
  }
end

Real-time Monitoring

Dashboards

class DSPyDashboard
  def initialize(metrics_store)
    @metrics = metrics_store
  end
  
  def generate_dashboard_data(time_range: 1.hour)
    {
      overview: {
        total_predictions: @metrics.count('dspy.prediction.total', time_range),
        average_accuracy: @metrics.average('dspy.prediction.accuracy', time_range),
        average_latency: @metrics.average('dspy.prediction.duration', time_range),
        error_rate: @metrics.rate('dspy.prediction.errors', time_range)
      },
      
      performance_trends: {
        accuracy_trend: @metrics.trend('dspy.prediction.accuracy', time_range, interval: 5.minutes),
        latency_trend: @metrics.trend('dspy.prediction.duration', time_range, interval: 5.minutes),
        throughput_trend: @metrics.trend('dspy.prediction.rate', time_range, interval: 5.minutes)
      },
      
      signature_breakdown: @metrics.group_by('dspy.prediction.accuracy', 'signature', time_range),
      
      model_usage: @metrics.group_by('dspy.lm.request.total', 'model', time_range),
      
      cost_analysis: {
        total_cost: @metrics.sum('dspy.prediction.cost', time_range),
        cost_by_model: @metrics.group_by('dspy.prediction.cost', 'model', time_range),
        cost_trend: @metrics.trend('dspy.prediction.cost', time_range, interval: 1.hour)
      }
    }
  end
end
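A usage sketch, assuming a metrics_store object that responds to the count/average/rate/trend/group_by/sum calls used above:

dashboard = DSPyDashboard.new(metrics_store)
data = dashboard.generate_dashboard_data(time_range: 15.minutes)

puts "Error rate: #{(data[:overview][:error_rate] * 100).round(2)}%"
puts "Total cost: $#{data[:cost_analysis][:total_cost].round(4)}"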

Alerting

class DSPyAlerting
  def initialize(metrics_store, notification_service)
    @metrics = metrics_store
    @notifications = notification_service
    @alert_rules = []
  end
  
  def add_alert_rule(name, condition, notification_config)
    @alert_rules << {
      name: name,
      condition: condition,
      notification: notification_config,
      last_triggered: nil,
      cooldown: notification_config[:cooldown] || 10.minutes
    }
  end
  
  def check_alerts
    @alert_rules.each do |rule|
      next if in_cooldown?(rule)
      
      if rule[:condition].call(@metrics)
        trigger_alert(rule)
        rule[:last_triggered] = Time.current
      end
    end
  end
  
  def setup_default_alerts
    # High error rate alert
    add_alert_rule(
      'high_error_rate',
      ->(metrics) { 
        metrics.rate('dspy.prediction.errors', 5.minutes) > 0.05 
      },
      {
        channels: [:slack, :pagerduty],
        severity: :high,
        cooldown: 15.minutes
      }
    )
    
    # Low accuracy alert
    add_alert_rule(
      'accuracy_degradation',
      ->(metrics) {
        current = metrics.average('dspy.prediction.accuracy', 10.minutes)
        baseline = metrics.average('dspy.prediction.accuracy', 24.hours)
        current < baseline * 0.9  # 10% degradation
      },
      {
        channels: [:slack, :email],
        severity: :medium,
        cooldown: 30.minutes
      }
    )
    
    # High latency alert
    add_alert_rule(
      'high_latency',
      ->(metrics) {
        metrics.percentile('dspy.prediction.duration', 95, 5.minutes) > 2000  # 2 seconds
      },
      {
        channels: [:slack],
        severity: :medium,
        cooldown: 10.minutes
      }
    )
  end
  
  private
  
  def trigger_alert(rule)
    alert_data = {
      rule_name: rule[:name],
      severity: rule[:notification][:severity],
      triggered_at: Time.current,
      metrics_snapshot: capture_metrics_snapshot
    }
    
    rule[:notification][:channels].each do |channel|
      @notifications.send_alert(channel, alert_data)
    end
  end

  def in_cooldown?(rule)
    rule[:last_triggered] && Time.current - rule[:last_triggered] < rule[:cooldown]
  end

  def capture_metrics_snapshot
    {
      error_rate: @metrics.rate('dspy.prediction.errors', 5.minutes),
      average_accuracy: @metrics.average('dspy.prediction.accuracy', 10.minutes),
      p95_latency: @metrics.percentile('dspy.prediction.duration', 95, 5.minutes)
    }
  end
end
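To wire this up, instantiate the alerting class with your metrics store and notifier, register the default rules, and run check_alerts on a schedule (a sketch; metrics_store and notification_service are assumed to expose the calls used above):

alerting = DSPyAlerting.new(metrics_store, notification_service)
alerting.setup_default_alerts

# e.g. from a recurring background job or clock process, once a minute
alerting.check_alerts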

Performance Monitoring

Custom Performance Tracking

class PerformanceTracker
  def initialize
    @performance_data = {}
    @benchmarks = {}
  end
  
  def track_operation(operation_name, &block)
    start_time = Time.current
    start_memory = memory_usage
    
    result = yield
    
    end_time = Time.current
    end_memory = memory_usage
    
    record_performance(operation_name, {
      duration: end_time - start_time,
      memory_delta: end_memory - start_memory,
      timestamp: start_time,
      success: !result.nil?
    })
    
    result
  rescue StandardError => e
    record_performance(operation_name, {
      duration: Time.current - start_time,
      memory_delta: memory_usage - start_memory,
      timestamp: start_time,
      success: false,
      error: e.class.name
    })
    
    raise
  end
  
  def benchmark_against_baseline(operation_name, baseline_percentile: 95)
    recent_performance = @performance_data[operation_name]&.last(100) || []
    return nil if recent_performance.empty?
    
    baseline = @benchmarks[operation_name]
    return nil unless baseline
    
    current_p95 = percentile(recent_performance.map { |p| p[:duration] }, baseline_percentile)
    baseline_p95 = baseline[:p95_duration]
    
    {
      current_p95: current_p95,
      baseline_p95: baseline_p95,
      performance_ratio: current_p95 / baseline_p95,
      regression: current_p95 > baseline_p95 * 1.2  # 20% regression threshold
    }
  end
  
  def establish_baseline(operation_name, sample_size: 100)
    recent_data = @performance_data[operation_name]&.last(sample_size) || []
    return false if recent_data.size < sample_size
    
    durations = recent_data.map { |p| p[:duration] }
    
    @benchmarks[operation_name] = {
      established_at: Time.current,
      sample_size: sample_size,
      mean_duration: durations.sum / durations.size,
      p50_duration: percentile(durations, 50),
      p95_duration: percentile(durations, 95),
      p99_duration: percentile(durations, 99)
    }
    
    true
  end

  private

  # Append a performance entry for an operation
  def record_performance(operation_name, entry)
    (@performance_data[operation_name] ||= []) << entry
  end

  # Resident set size in KB (Unix-only; substitute your preferred memory probe)
  def memory_usage
    `ps -o rss= -p #{Process.pid}`.to_i
  end

  def percentile(values, pct)
    sorted = values.sort
    sorted[((pct / 100.0) * (sorted.size - 1)).round]
  end
end
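A usage sketch (the classifier from earlier and the 'text_classification' operation name are illustrative):

tracker = PerformanceTracker.new

result = tracker.track_operation('text_classification') do
  classifier.call(text: "Sample text")
end

# Once enough samples have accumulated, freeze a baseline and watch for regressions
tracker.establish_baseline('text_classification')
report = tracker.benchmark_against_baseline('text_classification')
warn "Latency regression detected" if report && report[:regression]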

Configuration Management

Environment-Specific Observability

# Development configuration
if Rails.env.development?
  DSPy.configure do |config|
    config.instrumentation.enabled = true
    config.instrumentation.subscribers = [:logger]
    config.instrumentation.logger.level = :debug
    config.instrumentation.sampling_rate = 1.0
  end
end

# Staging configuration
if Rails.env.staging?
  DSPy.configure do |config|
    config.instrumentation.enabled = true
    config.instrumentation.subscribers = [:logger, :otel]
    config.instrumentation.logger.level = :info
    config.instrumentation.sampling_rate = 0.5
  end
end

# Production configuration
if Rails.env.production?
  DSPy.configure do |config|
    config.instrumentation.enabled = true
    config.instrumentation.subscribers = [:otel, :newrelic, :langfuse]
    config.instrumentation.logger.level = :warn
    config.instrumentation.sampling_rate = 0.1
    config.instrumentation.async_processing = true
    config.instrumentation.error_reporting = true
  end
end

Best Practices

1. Sampling Strategy

# Use intelligent sampling
DSPy.configure do |config|
  config.instrumentation.sampling_strategy = :intelligent
  config.instrumentation.sampling_rules = {
    # Always sample errors
    error_events: 1.0,
    
    # Always sample slow requests
    slow_requests: { threshold: 2.seconds, rate: 1.0 },
    
    # Sample based on confidence
    low_confidence: { 
      condition: ->(event) { event.payload.dig(:result, :confidence) < 0.7 },
      rate: 0.5
    },
    
    # Sample high-value operations more
    important_signatures: {
      condition: ->(event) { ['CriticalClassifier', 'SecurityCheck'].include?(event.payload[:signature]) },
      rate: 0.3
    },
    
    # Default sampling
    default: 0.1
  }
end

2. Context Preservation

# Preserve context across async operations (uses the async gem;
# run inside a reactor, e.g. Sync { processor.process_batch(items) })
require 'async'

class AsyncProcessor
  def process_batch(items)
    current_context = DSPy.current_trace_context
    
    items.map do |item|
      Async do
        DSPy.with_trace_context(current_context) do
          process_item(item)
        end
      end
    end.map(&:wait)
  end
end

3. Custom Metrics for Business Value

# Track business outcomes, not just technical metrics
class BusinessOutcomeTracker
  def track_automation_success(prediction, actual_outcome)
    # Track prediction accuracy
    accuracy = prediction.matches?(actual_outcome) ? 1.0 : 0.0
    
    DSPy.metrics.histogram('business.automation.accuracy').record(accuracy)
    
    # Track business impact
    if prediction.automated_decision?
      DSPy.metrics.counter('business.decisions.automated').increment
      
      if accuracy == 1.0
        DSPy.metrics.counter('business.decisions.successful').increment
      end
    end
    
    # Track cost savings
    if prediction.replaced_human_decision?
      estimated_savings = calculate_cost_savings(prediction)
      DSPy.metrics.histogram('business.cost_savings').record(estimated_savings)
    end
  end
end

4. Proactive Monitoring

# Set up proactive health checks
class DSPyHealthChecker
  def run_health_checks
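    # Each check_* helper (implementations not shown) is assumed to return
    # a hash like { name: 'error_rates', healthy: true }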
    checks = [
      check_model_availability,
      check_prediction_accuracy,
      check_response_times,
      check_error_rates
    ]
    
    overall_health = checks.all? { |check| check[:healthy] }
    
    DSPy.metrics.gauge('dspy.health.overall').set(overall_health ? 1 : 0)
    
    checks.each do |check|
      DSPy.metrics.gauge("dspy.health.#{check[:name]}").set(check[:healthy] ? 1 : 0)
    end
    
    overall_health
  end
end

Comprehensive observability is essential for production DSPy applications. Use these tools and patterns to maintain visibility into your system’s performance, accuracy, and business impact.