Custom Toolsets
Custom toolsets allow you to extend agents with specialized capabilities by grouping related operations into cohesive tool collections. This guide covers advanced patterns for building production-ready toolsets.
Toolset Architecture
Base Toolset Structure
All custom toolsets inherit from DSPy::Tools::Toolset:
class MyCustomToolset < DSPy::Tools::Toolset
extend T::Sig
# Set the toolset name (used as prefix for tool names)
toolset_name "my_custom"
# Declare tools with descriptions
tool :operation_one, description: "Performs the first operation"
tool :operation_two, description: "Performs the second operation"
# Initialize any required state
def initialize
@state = {}
end
# Implement tool methods with Sorbet signatures
sig { params(input: String).returns(String) }
def operation_one(input:)
# Implementation
end
sig { params(value: Integer, multiplier: T.nilable(Integer)).returns(Integer) }
def operation_two(value:, multiplier: nil)
# Implementation
end
end
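Once defined, a toolset is converted into individual tools and handed to an agent. The sketch below mirrors the pattern used in the Integration Testing section later in this guide; MySignature is a hypothetical DSPy::Signature and the iteration count is arbitrary.
toolset = MyCustomToolset.new
tools = toolset.class.to_tools    # same conversion used in the Integration Testing section below
agent = DSPy::ReAct.new(
  MySignature,                    # hypothetical signature describing the agent's task
  tools: tools,
  max_iterations: 5
)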
Tool Method Requirements
- Keyword Arguments: All parameters must be keyword arguments
- Sorbet Signatures: Required for automatic schema generation
- Return Values: Must return serializable values (String, Integer, Hash, Array); the examples in this guide return a String, typically JSON-encoded
- Error Handling: Rescue exceptions inside the tool and return a readable error message rather than letting them bubble up (see the sketch below)
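Taken together, a compliant tool method looks like the following sketch (the format_greeting name and parameters are illustrative):
sig { params(name: String, shout: T.nilable(T::Boolean)).returns(String) }
def format_greeting(name:, shout: nil)
  greeting = "Hello, #{name}!"
  greeting = greeting.upcase if shout
  greeting                                     # plain String, safe to serialize
rescue => e
  "Error formatting greeting: #{e.message}"    # errors become readable messages, not exceptions
end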
Production Toolset Examples
1. Simple Data Storage Toolset
class DataStorageToolset < DSPy::Tools::Toolset
extend T::Sig
toolset_name "data"
tool :store_data, description: "Store data with a key"
tool :retrieve_data, description: "Retrieve data by key"
tool :list_keys, description: "List all stored keys"
tool :delete_data, description: "Delete data by key"
def initialize
@data_store = {}
end
sig { params(key: String, value: String).returns(String) }
def store_data(key:, value:)
@data_store[key] = {
value: value,
stored_at: Time.now.iso8601
}
"Data stored successfully for key: #{key}"
rescue => e
"Error storing data: #{e.message}"
end
sig { params(key: String).returns(String) }
def retrieve_data(key:)
data = @data_store[key]
if data
{
key: key,
value: data[:value],
stored_at: data[:stored_at]
}.to_json
else
"No data found for key: #{key}"
end
rescue => e
"Error retrieving data: #{e.message}"
end
sig { returns(String) }
def list_keys
keys = @data_store.keys
{
keys: keys,
count: keys.length
}.to_json
rescue => e
"Error listing keys: #{e.message}"
end
sig { params(key: String).returns(String) }
def delete_data(key:)
if @data_store.delete(key)
"Data deleted successfully for key: #{key}"
else
"No data found for key: #{key}"
end
rescue => e
"Error deleting data: #{e.message}"
end
end
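Called directly (outside an agent), the toolset behaves like an ordinary Ruby object, which is also how you would exercise it in a console session:
storage = DataStorageToolset.new
puts storage.store_data(key: "greeting", value: "hello")   # => "Data stored successfully for key: greeting"
puts storage.retrieve_data(key: "greeting")                # => JSON with key, value, stored_at
puts storage.list_keys                                     # => {"keys":["greeting"],"count":1}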
2. File System Operations Toolset
require 'fileutils'
class FileSystemToolset < DSPy::Tools::Toolset
extend T::Sig
toolset_name "fs"
tool :read_file, description: "Read contents of a file"
tool :write_file, description: "Write content to a file"
tool :list_directory, description: "List files in a directory"
tool :file_exists, description: "Check if a file exists"
tool :create_directory, description: "Create a directory"
tool :delete_file, description: "Delete a file"
tool :file_info, description: "Get file information"
def initialize(base_path:, allowed_extensions: [])
@base_path = File.expand_path(base_path)
@allowed_extensions = allowed_extensions
end
sig { params(file_path: String).returns(String) }
def read_file(file_path:)
full_path = validate_and_resolve_path(file_path)
raise "File does not exist" unless File.exist?(full_path)
raise "Path is not a file" unless File.file?(full_path)
File.read(full_path)
rescue => e
"Error reading file: #{e.message}"
end
sig { params(file_path: String, content: String).returns(String) }
def write_file(file_path:, content:)
full_path = validate_and_resolve_path(file_path)
validate_file_extension(full_path)
# Create directory if it doesn't exist
FileUtils.mkdir_p(File.dirname(full_path))
File.write(full_path, content)
"File written successfully: #{file_path}"
rescue => e
"Error writing file: #{e.message}"
end
sig { params(directory_path: String).returns(String) }
def list_directory(directory_path:)
full_path = validate_and_resolve_path(directory_path)
raise "Directory does not exist" unless File.exist?(full_path)
raise "Path is not a directory" unless File.directory?(full_path)
entries = Dir.entries(full_path).reject { |entry| entry.start_with?('.') }
file_info = entries.map do |entry|
entry_path = File.join(full_path, entry)
{
name: entry,
type: File.directory?(entry_path) ? "directory" : "file",
size: File.directory?(entry_path) ? nil : File.size(entry_path)
}
end
file_info.to_json
rescue => e
"Error listing directory: #{e.message}"
end
sig { params(file_path: String).returns(String) }
def file_exists(file_path:)
full_path = validate_and_resolve_path(file_path)
File.exist?(full_path) ? "true" : "false"
rescue => e
"Error checking file existence: #{e.message}"
end
sig { params(directory_path: String).returns(String) }
def create_directory(directory_path:)
full_path = validate_and_resolve_path(directory_path)
FileUtils.mkdir_p(full_path)
"Directory created: #{directory_path}"
rescue => e
"Error creating directory: #{e.message}"
end
sig { params(file_path: String).returns(String) }
def delete_file(file_path:)
full_path = validate_and_resolve_path(file_path)
raise "File does not exist" unless File.exist?(full_path)
File.delete(full_path)
"File deleted: #{file_path}"
rescue => e
"Error deleting file: #{e.message}"
end
sig { params(file_path: String).returns(String) }
def file_info(file_path:)
full_path = validate_and_resolve_path(file_path)
raise "File does not exist" unless File.exist?(full_path)
stat = File.stat(full_path)
info = {
path: file_path,
size: stat.size,
modified: stat.mtime.iso8601,
type: File.directory?(full_path) ? "directory" : "file",
permissions: stat.mode.to_s(8)
}
info.to_json
rescue => e
"Error getting file info: #{e.message}"
end
private
def validate_and_resolve_path(path)
# Resolve relative to base path
full_path = File.expand_path(path, @base_path)
# Security check: ensure the resolved path stays inside the base path
# (compare against "#{@base_path}/" so that e.g. "/tmp/test-other" is not accepted for "/tmp/test")
unless full_path == @base_path || full_path.start_with?(@base_path + File::SEPARATOR)
raise "Path outside allowed directory"
end
full_path
end
def validate_file_extension(full_path)
return if @allowed_extensions.empty?
extension = File.extname(full_path).downcase
unless @allowed_extensions.include?(extension)
raise "File extension not allowed: #{extension}"
end
end
end
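A quick usage sketch; the base path and extension whitelist below are illustrative:
fs = FileSystemToolset.new(base_path: "/tmp/agent_workspace", allowed_extensions: [".txt", ".md"])
puts fs.write_file(file_path: "notes/todo.txt", content: "ship the release")
puts fs.read_file(file_path: "notes/todo.txt")             # => "ship the release"
puts fs.file_exists(file_path: "notes/missing.txt")        # => "false"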
3. HTTP API Client Toolset
require 'net/http'
require 'uri'
class HttpApiToolset < DSPy::Tools::Toolset
extend T::Sig
toolset_name "http"
tool :get_request, description: "Make a GET request to an API endpoint"
tool :post_request, description: "Make a POST request with JSON data"
tool :put_request, description: "Make a PUT request with JSON data"
tool :delete_request, description: "Make a DELETE request"
tool :head_request, description: "Make a HEAD request to check endpoint status"
def initialize(base_url:, api_key: nil, timeout: 30)
@base_url = base_url.chomp('/')
@api_key = api_key
@timeout = timeout
end
sig { params(endpoint: String, params: T::Hash[String, T.untyped]).returns(String) }
def get_request(endpoint:, params: {})
url = build_url(endpoint, params)
response = make_request(:get, url)
format_response(response)
rescue => e
"Error making GET request: #{e.message}"
end
sig { params(endpoint: String, data: T::Hash[String, T.untyped]).returns(String) }
def post_request(endpoint:, data:)
url = build_url(endpoint)
response = make_request(:post, url, data)
format_response(response)
rescue => e
"Error making POST request: #{e.message}"
end
sig { params(endpoint: String, data: T::Hash[String, T.untyped]).returns(String) }
def put_request(endpoint:, data:)
url = build_url(endpoint)
response = make_request(:put, url, data)
format_response(response)
rescue => e
"Error making PUT request: #{e.message}"
end
sig { params(endpoint: String).returns(String) }
def delete_request(endpoint:)
url = build_url(endpoint)
response = make_request(:delete, url)
format_response(response)
rescue => e
"Error making DELETE request: #{e.message}"
end
sig { params(endpoint: String).returns(String) }
def head_request(endpoint:)
url = build_url(endpoint)
response = make_request(:head, url)
{
status: response.code.to_i,
headers: response.to_hash,
content_length: response['Content-Length']
}.to_json
rescue => e
"Error making HEAD request: #{e.message}"
end
private
def build_url(endpoint, params = {})
url = "#{@base_url}#{endpoint}"
unless params.empty?
query_string = params.map { |k, v| "#{k}=#{URI.encode_www_form_component(v)}" }.join('&')
url += "?#{query_string}"
end
url
end
def make_request(method, url, data = nil)
uri = URI(url)
http = Net::HTTP.new(uri.host, uri.port)
http.use_ssl = uri.scheme == 'https'
http.read_timeout = @timeout
request = case method
when :get
Net::HTTP::Get.new(uri)
when :post
req = Net::HTTP::Post.new(uri)
req.body = data.to_json if data
req['Content-Type'] = 'application/json'
req
when :put
req = Net::HTTP::Put.new(uri)
req.body = data.to_json if data
req['Content-Type'] = 'application/json'
req
when :delete
Net::HTTP::Delete.new(uri)
when :head
Net::HTTP::Head.new(uri)
end
# Add authentication if available
request['Authorization'] = "Bearer #{@api_key}" if @api_key
response = http.request(request)
unless response.is_a?(Net::HTTPSuccess)
raise "HTTP #{response.code}: #{response.message}"
end
response
end
def format_response(response)
{
status: response.code.to_i,
headers: response.to_hash,
body: parse_response_body(response.body)
}.to_json
end
def parse_response_body(body)
return nil if body.nil? || body.empty?
JSON.parse(body)
rescue JSON::ParserError
body
end
end
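Example wiring against a hypothetical JSON API (the base URL, endpoints, and environment variable are placeholders):
api = HttpApiToolset.new(base_url: "https://api.example.com", api_key: ENV["EXAMPLE_API_KEY"], timeout: 10)
puts api.get_request(endpoint: "/users", params: { "page" => "1" })
puts api.post_request(endpoint: "/users", data: { "name" => "Ada" })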
4. Text Processing Toolset
class TextProcessingToolset < DSPy::Tools::Toolset
extend T::Sig
toolset_name "text"
tool :extract_keywords, description: "Extract keywords from text"
tool :summarize_text, description: "Create a summary of text"
tool :count_words, description: "Count words in text"
tool :sentiment_analysis, description: "Analyze sentiment of text"
tool :extract_entities, description: "Extract named entities from text"
tool :clean_text, description: "Clean and normalize text"
def initialize
# Initialize any required processing tools
end
sig { params(text: String, max_keywords: Integer).returns(String) }
def extract_keywords(text:, max_keywords: 10)
words = text.downcase.split(/\W+/)
# Simple keyword extraction (frequency-based)
word_counts = words.each_with_object(Hash.new(0)) { |word, hash| hash[word] += 1 }
# Filter out common words (basic stopwords)
stopwords = %w[the and or but in on at to for of with by from]
keywords = word_counts.reject { |word, _| stopwords.include?(word) || word.length < 3 }
# Get top keywords
top_keywords = keywords.sort_by { |_, count| -count }.first(max_keywords)
{
keywords: top_keywords.map { |word, count| { word: word, frequency: count } },
total_words: words.length
}.to_json
rescue => e
"Error extracting keywords: #{e.message}"
end
sig { params(text: String, max_sentences: Integer).returns(String) }
def summarize_text(text:, max_sentences: 3)
sentences = text.split(/[.!?]+/).map(&:strip).reject(&:empty?)
return text if sentences.length <= max_sentences
# Simple extractive summarization (first, middle, last sentences)
summary_sentences = []
summary_sentences << sentences.first
if max_sentences > 2 && sentences.length > 2
middle_index = sentences.length / 2
summary_sentences << sentences[middle_index]
end
if max_sentences > 1 && sentences.length > 1
summary_sentences << sentences.last
end
{
summary: summary_sentences.join('. ') + '.',
original_sentences: sentences.length,
summary_sentences: summary_sentences.length
}.to_json
rescue => e
"Error summarizing text: #{e.message}"
end
sig { params(text: String).returns(String) }
def count_words(text:)
words = text.split(/\s+/)
characters = text.length
sentences = text.split(/[.!?]+/).length
paragraphs = text.split(/\n\s*\n/).length
{
words: words.length,
characters: characters,
sentences: sentences,
paragraphs: paragraphs
}.to_json
rescue => e
"Error counting words: #{e.message}"
end
sig { params(text: String).returns(String) }
def sentiment_analysis(text:)
# Simple rule-based sentiment analysis
positive_words = %w[good great excellent amazing wonderful fantastic happy love like]
negative_words = %w[bad terrible awful horrible hate dislike sad angry worse]
words = text.downcase.split(/\W+/)
positive_count = words.count { |word| positive_words.include?(word) }
negative_count = words.count { |word| negative_words.include?(word) }
score = positive_count - negative_count
sentiment = if score > 0
"positive"
elsif score < 0
"negative"
else
"neutral"
end
{
sentiment: sentiment,
score: score,
positive_words: positive_count,
negative_words: negative_count
}.to_json
rescue => e
"Error analyzing sentiment: #{e.message}"
end
sig { params(text: String).returns(String) }
def extract_entities(text:)
# Simple entity extraction using patterns
# Email pattern
emails = text.scan(/\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b/)
# URL pattern
urls = text.scan(/https?:\/\/[^\s]+/)
# Phone number pattern (basic)
phones = text.scan(/\b\d{3}-\d{3}-\d{4}\b/)
# Date pattern (basic)
dates = text.scan(/\b\d{1,2}\/\d{1,2}\/\d{4}\b/)
{
emails: emails,
urls: urls,
phone_numbers: phones,
dates: dates
}.to_json
rescue => e
"Error extracting entities: #{e.message}"
end
sig { params(text: String).returns(String) }
def clean_text(text:)
# Basic text cleaning
cleaned = text.dup
# Remove extra whitespace
cleaned.gsub!(/\s+/, ' ')
# Remove leading/trailing whitespace
cleaned.strip!
# Remove special characters (keep basic punctuation)
cleaned.gsub!(/[^\w\s.,!?;:'"()-]/, '')
{
original_length: text.length,
cleaned_length: cleaned.length,
cleaned_text: cleaned
}.to_json
rescue => e
"Error cleaning text: #{e.message}"
end
end
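These heuristics are intentionally simple; a quick smoke test looks like this:
text_tools = TextProcessingToolset.new
puts text_tools.count_words(text: "DSPy.rb agents love good tools. They really do!")
puts text_tools.sentiment_analysis(text: "This toolset is great, I love it")
puts text_tools.extract_keywords(text: "Ruby tools for Ruby agents building Ruby toolsets", max_keywords: 3)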
Advanced Patterns
1. Stateful Toolsets
class StatefulToolset < DSPy::Tools::Toolset
extend T::Sig
toolset_name "stateful"
tool :start_session, description: "Start a new session"
tool :add_to_session, description: "Add data to current session"
tool :get_session_data, description: "Get all session data"
tool :end_session, description: "End current session"
def initialize
@sessions = {}
end
sig { params(session_id: String).returns(String) }
def start_session(session_id:)
@sessions[session_id] = {
started_at: Time.now,
data: {},
operations: []
}
"Session #{session_id} started"
end
sig { params(session_id: String, key: String, value: String).returns(String) }
def add_to_session(session_id:, key:, value:)
session = @sessions[session_id]
return "Session not found" unless session
session[:data][key] = value
session[:operations] << {
operation: "add",
key: key,
value: value,
timestamp: Time.now
}
"Added #{key} to session #{session_id}"
end
sig { params(session_id: String).returns(String) }
def get_session_data(session_id:)
session = @sessions[session_id]
return "Session not found" unless session
{
session_id: session_id,
started_at: session[:started_at],
data: session[:data],
operations_count: session[:operations].length
}.to_json
end
sig { params(session_id: String).returns(String) }
def end_session(session_id:)
session = @sessions.delete(session_id)
return "Session not found" unless session
"Session #{session_id} ended"
end
end
2. Async Toolsets
class AsyncToolset < DSPy::Tools::Toolset
extend T::Sig
toolset_name "async"
tool :start_background_task, description: "Start a background task"
tool :check_task_status, description: "Check the status of a background task"
tool :get_task_result, description: "Get the result of a completed task"
def initialize
@tasks = {}
end
sig { params(task_id: String, operation: String, data: T::Hash[String, T.untyped]).returns(String) }
def start_background_task(task_id:, operation:, data:)
# Register the task before starting the thread so the worker can update it without racing the setup
@tasks[task_id] = {
status: "running",
started_at: Time.now
}
@tasks[task_id][:thread] = Thread.new do
begin
# Simulate some work
sleep(2)
result = case operation
when "process_data"
{ processed: data, timestamp: Time.now }
when "calculate"
{ result: data.values.sum, timestamp: Time.now }
else
{ error: "Unknown operation" }
end
@tasks[task_id][:status] = "completed"
@tasks[task_id][:result] = result
@tasks[task_id][:completed_at] = Time.now
rescue => e
@tasks[task_id][:status] = "failed"
@tasks[task_id][:error] = e.message
end
end
"Task #{task_id} started"
end
sig { params(task_id: String).returns(String) }
def check_task_status(task_id:)
task = @tasks[task_id]
return "Task not found" unless task
{
task_id: task_id,
status: task[:status],
started_at: task[:started_at],
completed_at: task[:completed_at]
}.to_json
end
sig { params(task_id: String).returns(String) }
def get_task_result(task_id:)
task = @tasks[task_id]
return "Task not found" unless task
case task[:status]
when "running"
"Task is still running"
when "completed"
task[:result].to_json
when "failed"
"Task failed: #{task[:error]}"
else
"Unknown task status"
end
end
end
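The intended flow is start, poll, then fetch. A small sketch (with the simulated two-second sleep above, the first status check normally reports "running"):
async = AsyncToolset.new
async.start_background_task(task_id: "t1", operation: "calculate", data: { "a" => 2, "b" => 3 })
puts async.check_task_status(task_id: "t1")    # => status "running" at first
sleep(3)                                       # wait past the simulated work
puts async.get_task_result(task_id: "t1")      # => {"result":5,...}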
3. Validation and Security
require 'base64'
class SecureToolset < DSPy::Tools::Toolset
extend T::Sig
toolset_name "secure"
tool :validate_input, description: "Validate input according to rules"
tool :sanitize_data, description: "Sanitize data for safe processing"
tool :encrypt_data, description: "Encrypt sensitive data"
tool :decrypt_data, description: "Decrypt encrypted data"
def initialize(encryption_key:)
@encryption_key = encryption_key
end
sig { params(data: String, rules: T::Array[String]).returns(String) }
def validate_input(data:, rules:)
errors = []
rules.each do |rule|
case rule
when "not_empty"
errors << "Data cannot be empty" if data.strip.empty?
when "max_length_100"
errors << "Data too long (max 100 chars)" if data.length > 100
when "alphanumeric"
errors << "Data must be alphanumeric" unless data.match?(/\A[a-zA-Z0-9\s]+\z/)
when "email"
errors << "Invalid email format" unless data.match?(/\A[^@\s]+@[^@\s]+\.[^@\s]+\z/)
end
end
{
valid: errors.empty?,
errors: errors,
data: data
}.to_json
end
sig { params(data: String).returns(String) }
def sanitize_data(data:)
sanitized = data.dup
# Remove potentially dangerous characters
sanitized.gsub!(/[<>]/, '')
# Escape special characters as HTML entities
sanitized.gsub!(/&/, '&amp;')
sanitized.gsub!(/"/, '&quot;')
sanitized.gsub!(/'/, '&#39;')
# Limit length
sanitized = sanitized[0, 1000] if sanitized.length > 1000
{
original: data,
sanitized: sanitized,
changes_made: data != sanitized
}.to_json
end
sig { params(data: String).returns(String) }
def encrypt_data(data:)
# Simple XOR obfuscation against the key bytes (in production, use real encryption such as OpenSSL)
key_bytes = @encryption_key.to_s.bytes
encrypted = Base64.strict_encode64(data.bytes.each_with_index.map { |b, i| b ^ key_bytes[i % key_bytes.length] }.pack('C*'))
{
encrypted: encrypted.strip,
original_length: data.length
}.to_json
rescue => e
"Error encrypting data: #{e.message}"
end
sig { params(encrypted_data: String).returns(String) }
def decrypt_data(encrypted_data:)
# Reverse the XOR obfuscation using the same key bytes (in production, use real decryption)
key_bytes = @encryption_key.to_s.bytes
decoded = Base64.decode64(encrypted_data)
decrypted = decoded.bytes.each_with_index.map { |b, i| b ^ key_bytes[i % key_bytes.length] }.pack('C*').force_encoding(Encoding::UTF_8)
{
decrypted: decrypted,
decrypted_length: decrypted.length
}.to_json
rescue => e
"Error decrypting data: #{e.message}"
end
end
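A roundtrip check of the XOR-based example (remember this is illustrative obfuscation, not real cryptography; the key string is a placeholder):
secure = SecureToolset.new(encryption_key: "s3cret")
encrypted = JSON.parse(secure.encrypt_data(data: "api-token-123"))["encrypted"]
puts secure.decrypt_data(encrypted_data: encrypted)    # => {"decrypted":"api-token-123",...}
puts secure.validate_input(data: "user@example.com", rules: ["not_empty", "email"])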
Testing Custom Toolsets
Unit Testing
RSpec.describe DataStorageToolset do
let(:toolset) { described_class.new }
describe '#store_data' do
it 'stores data successfully' do
result = toolset.store_data(key: "test_key", value: "test_value")
expect(result).to include("stored successfully")
end
end
describe '#retrieve_data' do
it 'retrieves stored data' do
toolset.store_data(key: "test_key", value: "test_value")
result = toolset.retrieve_data(key: "test_key")
parsed = JSON.parse(result)
expect(parsed["key"]).to eq("test_key")
expect(parsed["value"]).to eq("test_value")
end
it 'handles missing keys gracefully' do
result = toolset.retrieve_data(key: "missing_key")
expect(result).to include("No data found")
end
end
describe '#list_keys' do
it 'lists all stored keys' do
toolset.store_data(key: "key1", value: "value1")
toolset.store_data(key: "key2", value: "value2")
result = toolset.list_keys
parsed = JSON.parse(result)
expect(parsed["keys"]).to contain_exactly("key1", "key2")
expect(parsed["count"]).to eq(2)
end
end
end
Integration Testing
RSpec.describe "Custom Toolset Integration" do
let(:toolset) { FileSystemToolset.new(base_path: "/tmp/test") }
let(:tools) { toolset.class.to_tools }
let(:agent) do
DSPy::ReAct.new(
TestSignature,
tools: tools,
max_iterations: 3
)
end
before do
FileUtils.mkdir_p("/tmp/test")
end
after do
FileUtils.rm_rf("/tmp/test")
end
it 'can use file operations in agent workflow' do
result = agent.call(
query: "Create a file called 'test.txt' with content 'Hello World'"
)
expect(result.answer).to include("file created")
expect(File.exist?("/tmp/test/test.txt")).to be true
end
end
Best Practices
1. Error Handling
# Good: Return error messages instead of raising exceptions
def risky_operation(data:)
validate_data(data)
result = perform_operation(data)
"Success: #{result}"
rescue ValidationError => e
"Validation failed: #{e.message}"
rescue => e
"Operation failed: #{e.message}"
end
# Bad: Let exceptions bubble up
def risky_operation(data:)
validate_data(data) # Could raise exception
perform_operation(data) # Could raise exception
end
2. Input Validation
# Good: Validate all inputs
def process_data(data:, options: {})
raise ArgumentError, "Data cannot be empty" if data.nil? || data.empty?
raise ArgumentError, "Options must be a hash" unless options.is_a?(Hash)
# Process data...
end
# Bad: Assume inputs are valid
def process_data(data:, options: {})
# Process data without validation
end
3. Resource Management
# Good: Clean up resources
def initialize(database_url:)
@connection = establish_connection(database_url)
end
def cleanup
@connection&.close
end
# Bad: Leave resources open
def initialize(database_url:)
@connection = establish_connection(database_url)
# No cleanup mechanism
end
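When a toolset holds a connection as in the good example above, one option is to scope its lifetime in the caller and release it in an ensure block. MyDbToolset below is a hypothetical toolset built with that initializer and cleanup method:
db_tools = MyDbToolset.new(database_url: ENV["DATABASE_URL"])   # hypothetical toolset using the pattern above
begin
  # ... hand db_tools (or its converted tools) to the agent and run it ...
ensure
  db_tools.cleanup   # always release the connection, even if the agent run raises
end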
4. Documentation
# Good: Clear tool descriptions
tool :complex_operation, description: "Performs complex data transformation on the input dataset"
# Bad: Vague descriptions
tool :complex_operation, description: "Does stuff"
Performance Considerations
1. Lazy Loading
class OptimizedToolset < DSPy::Tools::Toolset
def initialize
@expensive_resource = nil
end
private
def expensive_resource
@expensive_resource ||= create_expensive_resource
end
def create_expensive_resource
# Expensive initialization
end
end
2. Connection Pooling
class DatabaseToolset < DSPy::Tools::Toolset
def initialize(connection_string:, pool_size: 5)
@connection_pool = ConnectionPool.new(size: pool_size) do  # ConnectionPool comes from the connection_pool gem
establish_connection(connection_string)
end
end
def query(sql:, params: {})
@connection_pool.with do |connection|
connection.execute(sql, params)
end
end
end
3. Caching
require 'digest'
class CachedToolset < DSPy::Tools::Toolset
def initialize
@cache = {}
end
def expensive_operation(data:)
cache_key = generate_cache_key(data)
@cache[cache_key] ||= perform_expensive_operation(data)
end
private
def generate_cache_key(data)
Digest::MD5.hexdigest(data.to_s)
end
end
Custom toolsets allow you to extend agents with domain-specific capabilities by grouping related operations together. By following these patterns and best practices, you can build robust, secure, and performant toolsets that integrate with DSPy.rb’s agent system.