Running Pipelines

If you recall, after running the Rails generator you’ll have a new binstub at bin/ductwork. This executable launches the supervisor process, which orchestrates your entire pipeline infrastructure—forking a single pipeline advancer and a job worker for each configured pipeline. Both the pipeline advancer and job worker create multiple threads to handle concurrent work.

The supervisor continuously monitors its child processes via heartbeat checks. When a child process misses heartbeats for 5 minutes, which indicates a crash or hang, the supervisor spawns a replacement process to minimize interruption to pipeline execution.

bin/ductwork
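
The heartbeat check described above can be pictured with a small, self-contained sketch. This is not Ductwork's supervisor code: the `HeartbeatMonitor` class, its method names, and the child names are hypothetical, and only the 5-minute window comes from the description above.

```ruby
# Hypothetical sketch of heartbeat-based liveness tracking.
# Not Ductwork's actual implementation; all names here are assumptions.
HEARTBEAT_TIMEOUT = 5 * 60 # seconds; the 5-minute window described above

class HeartbeatMonitor
  def initialize(timeout: HEARTBEAT_TIMEOUT)
    @timeout = timeout
    @last_seen = {}
  end

  # Record a heartbeat from the child identified by +name+.
  def beat(name, at: Time.now)
    @last_seen[name] = at
  end

  # Names of children whose last heartbeat is older than the timeout.
  def stale(now: Time.now)
    @last_seen.select { |_name, seen_at| now - seen_at > @timeout }.keys
  end
end

monitor = HeartbeatMonitor.new
start = Time.at(0)
monitor.beat(:pipeline_advancer, at: start)
monitor.beat(:job_worker, at: start + 200)

# 301 seconds after start, only the advancer has exceeded the window.
stale_children = monitor.stale(now: start + 301)
```

A supervisor built along these lines would periodically call `stale` and start a replacement for each name it returns.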

Use the -c or --config flag to specify a custom YAML configuration file. This lets you run multiple Ductwork instances with different pipeline configurations and scaling settings:

bin/ductwork -c config/ductwork.yml
bin/ductwork --config config/ductwork.0.yml

Pro tip: Run separate Ductwork instances with different configurations to isolate high-priority pipelines or scale specific workloads independently.

With Ductwork running, you can trigger your pipelines from anywhere in your Rails application. The .trigger method takes a single argument, which is passed to the first step. It enqueues the pipeline and returns a Ductwork::Pipeline instance that you can query for status, progress, or results. For example, you can trigger a pipeline from a Rake task, a controller, or a scheduled job:

# Rake task
task enrich_user_data: :environment do
  pipeline = EnrichAllUsersDataPipeline.trigger(7)
  puts "Pipeline #{pipeline.id} started!"
end

# Controller
class DataEnrichmentController < ApplicationController
  def create
    # Request params arrive as strings, so convert to an integer
    # to match the default value of 7.
    days_outdated = (params[:days_outdated] || 7).to_i
    pipeline = EnrichAllUsersDataPipeline.trigger(days_outdated)

    render json: {
      pipeline_id: pipeline.id,
      status: pipeline.status
    }
  end
end

# config/schedule.rb (using whenever gem)
every 1.day, at: '3:00 am' do
  runner "EnrichAllUsersDataPipeline.trigger(7)"
end

The returned pipeline instance gives you real-time access to pipeline state:

pipeline = EnrichAllUsersDataPipeline.trigger(7)
pipeline.id #=> "123"
pipeline.status #=> "in_progress", "completed", "failed", etc.
pipeline.steps #=> Collection of all steps in the pipeline
pipeline.created_at #=> Timestamp
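
If you need to wait for a pipeline to finish, one option is to poll its status. The helper below is a minimal sketch rather than part of Ductwork: `wait_for_terminal_status` and `TERMINAL_STATUSES` are hypothetical names, and the set of terminal statuses is assumed from the examples above ("completed" and "failed"). A plain array stands in for a real pipeline so the snippet runs on its own.

```ruby
# Hypothetical helper, not part of Ductwork.
# Assumes "completed" and "failed" are the terminal statuses.
TERMINAL_STATUSES = %w[completed failed].freeze

# Calls the block repeatedly until it returns a terminal status.
# Returns that status, or nil if the timeout (in seconds) elapses first.
def wait_for_terminal_status(timeout: 60, interval: 1)
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
  loop do
    status = yield
    return status if TERMINAL_STATUSES.include?(status)
    return nil if Process.clock_gettime(Process::CLOCK_MONOTONIC) >= deadline

    sleep interval
  end
end

# Stand-in for a real pipeline: "in_progress" twice, then "completed".
statuses = %w[in_progress in_progress completed]
final_status = wait_for_terminal_status(timeout: 5, interval: 0) { statuses.shift }
```

In an application, the block would return the pipeline's current status instead, for example by re-reading the pipeline before calling `pipeline.status`. How to refresh a Ductwork::Pipeline instance is not covered here, so treat that part as an assumption to verify.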

That’s it! Your pipelines are now running and processing work.