Running Pipelines
Starting the Ductwork Process
As you may recall, running the Rails generator creates a new binstub at bin/ductwork. This executable launches the supervisor process, which orchestrates your entire pipeline infrastructure by forking a single pipeline advancer and a job worker for each configured pipeline. Both the pipeline advancer and the job worker create multiple threads to handle concurrent work.
The supervisor continuously monitors its child processes via heartbeat checks. When a child process misses heartbeats for 5 minutes, which indicates a crash or hang, the supervisor immediately spawns a replacement process to minimize interruption to pipeline execution.
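To make the heartbeat idea concrete, here is a minimal sketch of timeout-based liveness tracking. It is not Ductwork's actual implementation: the HeartbeatMonitor class, its method names, and the HEARTBEAT_TIMEOUT constant are hypothetical, and only the 5-minute window comes from the description above.

```ruby
# Hypothetical sketch of heartbeat-based liveness checks; not Ductwork's code.
HEARTBEAT_TIMEOUT = 5 * 60 # seconds; the 5-minute window described above

# Tracks the most recent heartbeat seen for each child process.
class HeartbeatMonitor
  def initialize(timeout: HEARTBEAT_TIMEOUT)
    @timeout = timeout
    @last_seen = {}
  end

  # Record that a child reported in at the given time.
  def beat(child, at: Time.now)
    @last_seen[child] = at
  end

  # Children whose most recent heartbeat is older than the timeout.
  def stale(now: Time.now)
    @last_seen.select { |_child, seen| now - seen > @timeout }.keys
  end
end
```

A supervisor loop built on this would periodically call stale and replace each child it returns.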
Basic Usage
    bin/ductwork
Custom Configuration
Use the -c or --config flag to specify a custom YAML configuration file. This lets you run multiple Ductwork instances with different pipeline configurations and scaling settings:
    bin/ductwork -c config/ductwork.yml
    bin/ductwork --config config/ductwork.0.yml
Pro tip: Run separate Ductwork instances with different configurations to isolate high-priority pipelines or scale specific workloads independently.
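If you want to start several instances from one script, a small launcher could look like the sketch below. Only bin/ductwork and the --config flag come from this page. The ductwork_command and launch_instances helpers are hypothetical, and in production you would more likely hand each instance to your process manager.

```ruby
# Hypothetical launcher: starts one Ductwork supervisor per config file.
# Only bin/ductwork and --config come from the docs; the rest is illustrative.

# Build the argument list for one Ductwork instance.
def ductwork_command(config_path)
  ["bin/ductwork", "--config", config_path]
end

# Spawn one instance per config file and return the child PIDs.
def launch_instances(config_paths)
  config_paths.map { |path| Process.spawn(*ductwork_command(path)) }
end

# Example usage (run from the Rails root):
#   pids = launch_instances(["config/ductwork.yml", "config/ductwork.0.yml"])
#   pids.each { |pid| Process.wait(pid) }
```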
Triggering Pipelines from Your Code
With Ductwork running, you can trigger your pipelines from anywhere in your Rails application. The .trigger method takes a single argument, which is passed to the pipeline's first step. It enqueues the pipeline and returns a Ductwork::Pipeline instance that you can query for status, progress, or results.
Example: Rake Task
    task enrich_user_data: :environment do
      pipeline = EnrichAllUsersDataPipeline.trigger(7)
      puts "Pipeline #{pipeline.id} started!"
    end
Example: Controller Action
    class DataEnrichmentController < ApplicationController
      def create
        # Request params arrive as strings, so coerce to an integer
        # to match the integer default.
        days_outdated = (params[:days_outdated] || 7).to_i
        pipeline = EnrichAllUsersDataPipeline.trigger(days_outdated)

        render json: { pipeline_id: pipeline.id, status: pipeline.status }
      end
    end
Example: Scheduled Task
    # config/schedule.rb (using the whenever gem)
    every 1.day, at: '3:00 am' do
      runner "EnrichAllUsersDataPipeline.trigger(7)"
    end
Monitoring Your Pipeline
The returned pipeline instance gives you real-time access to pipeline state:
    pipeline = EnrichAllUsersDataPipeline.trigger(7)

    pipeline.id         #=> "123"
    pipeline.status     #=> "in_progress", "completed", "failed", etc.
    pipeline.steps      #=> Collection of all steps in the pipeline
    pipeline.created_at #=> Timestamp
That’s it! Your pipelines are now running and processing work.
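For scripts that need to block until a pipeline finishes, polling its status could be sketched as follows. The wait_for_pipeline helper is hypothetical and not part of Ductwork. Treating "completed" and "failed" as the only terminal statuses is also an assumption, since the list above ends in "etc.".

```ruby
# Hypothetical helper: poll until a pipeline reaches a terminal status.
# "completed" and "failed" appear in the docs; treating them as the full
# set of terminal statuses is an assumption.
TERMINAL_STATUSES = %w[completed failed].freeze

# Calls the block to fetch the current status on each attempt. Returns the
# first terminal status seen, or nil if max_attempts polls pass without one.
def wait_for_pipeline(max_attempts: 60, interval: 5)
  max_attempts.times do |attempt|
    status = yield
    return status if TERMINAL_STATUSES.include?(status)

    # Pause between polls, but not after the final attempt.
    sleep(interval) if attempt < max_attempts - 1
  end
  nil
end
```

The block should return a fresh status on every call. Whether that means re-fetching the pipeline record first depends on how Ductwork::Pipeline is implemented, so check that before relying on a cached instance.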