Defining Pipelines
A pipeline consists of steps and the transitions between them. Ductwork provides a simple DSL for defining these steps and transitions in a clear, readable way.
Creating a Pipeline Class
Pipeline classes must live under app/pipelines to be discovered correctly. While class names don’t require a “Pipeline” suffix, adding one can help prevent naming collisions depending on your conventions.
Create a pipeline by inheriting from Ductwork::Pipeline:
```ruby
class EnrichAllUsersData < Ductwork::Pipeline
end

# Or with a suffix for clarity
# app/pipelines/enrich_all_users_data_pipeline.rb
class EnrichAllUsersDataPipeline < Ductwork::Pipeline
end
```

Each pipeline class automatically gets a default scope, allowing you to query for pipelines of that specific type:

```ruby
:001> EnrichAllUsersDataPipeline.in_progress.to_sql
#=> "SELECT \"ductwork_pipelines\".* FROM \"ductwork_pipelines\"
# WHERE \"ductwork_pipelines\".\"klass\" = 'EnrichAllUsersDataPipeline'
# AND \"ductwork_pipelines\".\"status\" = 'in_progress'"
```

Defining Steps
Steps are Ruby objects that inherit from Ductwork::Step with a simple interface. They must live under app/steps and implement an #execute instance method that takes no arguments.
A step’s initializer receives one of the following:
- The arguments passed when the pipeline is triggered (for the first step)
- The return value from the previous step (for subsequent steps)
This simple interface makes steps highly testable without external dependencies. Like pipelines, step class names don’t require a “Step” suffix, though it may help with organization.
Example Step:
```ruby
class QueryUsersRequiringEnrichment < Ductwork::Step
  def initialize(days_outdated)
    @days_outdated = days_outdated
  end

  def execute
    ids = User.where("data_last_refreshed_at < ?", @days_outdated.days.ago).ids
    Rails.logger.info("Enriching #{ids.length} users' data")

    # Return the collection of IDs to pass to the next step
    ids
  end
end
```

Design philosophy: Steps represent checkpoints in your pipeline and should complete quickly. Break down long-running jobs into smaller work units by chaining steps together and passing data between them. The examples throughout this page demonstrate this approach.
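Because a step is only an initializer plus #execute, it can be exercised directly in a plain unit test. The sketch below uses a hypothetical ChunkUserIds step that splits IDs into small batches (one way to break long-running work into quick units). It defines an empty stand-in for Ductwork::Step only so the file runs without the gem; that stand-in is an assumption, and in a real app you would load Ductwork instead.

```ruby
# Empty stand-in so this file runs without the ductwork gem (assumption: in a
# real app you would load Ductwork instead of defining this).
module Ductwork
  class Step; end
end unless defined?(Ductwork::Step)

# Hypothetical step that splits user IDs into small batches, so each downstream
# step handles a quick, bounded unit of work.
class ChunkUserIds < Ductwork::Step
  BATCH_SIZE = 2

  def initialize(ids)
    @ids = ids
  end

  def execute
    # each_slice returns an Enumerator; to_a turns it into a plain,
    # JSON-serializable array of arrays.
    @ids.each_slice(BATCH_SIZE).to_a
  end
end

# Exercise the step directly: no pipeline, database, or job backend involved.
p ChunkUserIds.new([1, 2, 3, 4, 5]).execute # => [[1, 2], [3, 4], [5]]
```

In a Rails app the same call and comparison would typically live inside a Minitest or RSpec example.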
Understanding Transitions
Transitions connect steps together. The key principle: the return value of one step becomes the input to the next step. This means each step’s initializer arity must line up with what the previous step returns.
Important considerations:
- Align parameter counts between connected steps
- Use the splat operator (*args) if you prefer treating everything as an array
- Return values must be JSON-serializable
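Because return values must be JSON-serializable, it can help to check what a value looks like after a JSON round trip before returning it from a step. The sketch below uses Ruby's standard json library and assumes the hand-off between steps behaves like JSON.generate followed by JSON.parse; the helper names are hypothetical. Note that symbols do not survive unchanged.

```ruby
require "json"

# Serializes a value and reads it back, approximating what a JSON hand-off
# between two steps does to it (assumption: the real hand-off behaves this way).
def json_round_trip(value)
  JSON.parse(JSON.generate(value))
end

# True when the next step would receive exactly what this step returned.
def survives_round_trip?(value)
  json_round_trip(value) == value
end

puts survives_round_trip?([1, 2, 3])           # true
puts survives_round_trip?({ "ids" => [1, 2] }) # true
puts survives_round_trip?({ ids: [1, 2] })     # false: the :ids key comes back as "ids"
puts survives_round_trip?(:premium)            # false: symbols come back as strings
```

Returning string keys and string values from steps avoids surprises in the receiving step's initializer.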
The Ductwork DSL uses a fluent interface pattern, enabling clean method chaining.
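In a fluent interface, each call returns an object that accepts the next call, so a chain of transitions reads top to bottom as one expression. The toy builder below only illustrates that pattern; ToyDefinition and the tuples it records are hypothetical and are not Ductwork's internal definition builder.

```ruby
# Hypothetical builder illustrating the fluent pattern: every transition
# method records what it was given and returns self so calls can be chained.
class ToyDefinition
  attr_reader :transitions

  def initialize
    @transitions = []
  end

  def start(step)
    record(:start, step)
  end

  def expand(to:)
    record(:expand, to)
  end

  def chain(to:)
    record(:chain, to)
  end

  private

  def record(kind, target)
    @transitions << [kind, target]
    self # returning the receiver is what makes the next call possible
  end
end

definition = ToyDefinition.new
  .start(:query_users)
  .expand(to: :load_user_data)
  .chain(to: :update_user_data)

p definition.transitions
# => [[:start, :query_users], [:expand, :load_user_data], [:chain, :update_user_data]]
```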
Transition Types
start - Define the First Step
The start transition defines your pipeline’s entry point. This step receives the arguments passed to the .trigger method (see Running Pipelines).
```ruby
class EnrichAllUsersDataPipeline < Ductwork::Pipeline
  define do |pipeline|
    pipeline.start(QueryUsersRequiringEnrichment)
  end
end
```

expand - Fan Out to Multiple Steps
The expand transition acts like a “foreach” loop. It takes the return value from the previous step (which must be a collection) and creates a new step instance for each element. All expanded steps may run concurrently, depending on your job worker scaling.
The return value must be JSON-serializable and respond to #each.
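Conceptually, expand maps one step over a collection. The plain-Ruby model below is only a sketch of that data flow, not how Ductwork schedules jobs: it represents steps as lambdas (an assumption made for brevity) and runs them one after another, whereas real expanded steps may run concurrently on separate workers. The helper name model_expand is hypothetical.

```ruby
# Plain-Ruby model of `expand`: each element of the previous step's return
# value becomes the input of its own step.
def model_expand(collection, step)
  unless collection.respond_to?(:each)
    raise ArgumentError, "expand needs a collection, got #{collection.inspect}"
  end

  # Returned as an array only so the sketch can inspect the results; in a
  # pipeline each result continues down its own branch.
  results = []
  collection.each { |element| results << step.call(element) }
  results
end

# Lambdas stand in for the step classes (hypothetical data).
query_users = ->(_days_outdated) { [101, 102, 103] } # QueryUsersRequiringEnrichment
load_user_data = ->(id) { { "id" => id } }           # LoadUserData

ids = query_users.call(30)
p model_expand(ids, load_user_data).length # => 3
```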
Syntax: expand(to: StepClass)
```ruby
# Remember in our step class that we returned an array of IDs. Each scalar ID
# will get passed to a new `LoadUserData` step.
class EnrichAllUsersDataPipeline < Ductwork::Pipeline
  define do |pipeline|
    pipeline.start(QueryUsersRequiringEnrichment)
      .expand(to: LoadUserData)
  end
end
```

divide - Split Into Parallel Branches
The divide transition passes a copy of the return value to multiple steps simultaneously. Each step receives identical input and may run concurrently.
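Where expand fans out over the elements of a collection, divide fans out over steps: every branch gets the same value. The plain-Ruby model below treats steps as lambdas (an assumption) and runs branches sequentially for simplicity; the deep copy reflects the documented "copy of the return value", not Ductwork's actual mechanism. The helper name model_divide is hypothetical.

```ruby
# Plain-Ruby model of `divide`: every branch receives its own copy of the same
# return value. Branches run one after another here, unlike a real pipeline.
def model_divide(value, branches)
  # Marshal round trip = deep copy, so one branch mutating its input cannot
  # leak into another branch's input.
  branches.map { |branch| branch.call(Marshal.load(Marshal.dump(value))) }
end

# Lambdas stand in for FetchDataFromSourceA / FetchDataFromSourceB. They
# mutate their input on purpose to show the copies are independent.
fetch_from_a = ->(user) { user["source_a"] = true; user }
fetch_from_b = ->(user) { user["source_b"] = true; user }

user = { "id" => 101 }
results = model_divide(user, [fetch_from_a, fetch_from_b])
# results[0] only has "source_a", results[1] only has "source_b",
# and `user` itself is unchanged.
```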
Syntax: divide(to: [StepClass1, StepClass2, ...])
```ruby
class EnrichAllUsersDataPipeline < Ductwork::Pipeline
  define do |pipeline|
    pipeline.start(QueryUsersRequiringEnrichment)
      .expand(to: LoadUserData)
      .divide(to: [FetchDataFromSourceA, FetchDataFromSourceB])
  end
end
```

Block syntax: The divide transition accepts an optional block, yielding a branch for each step. This allows you to chain different transitions onto each branch independently:
```ruby
class EnrichAllUsersDataPipeline < Ductwork::Pipeline
  define do |pipeline|
    pipeline.start(StepA)
      .divide(to: [StepB, StepC, StepD]) do |branch1, branch2, branch3|
        branch1.chain(to: StepE)
        branch2.expand(to: StepF)
        branch3.chain(to: StepG).chain(to: StepH)
      end
  end
end
```

combine - Merge Branches Back Together
The combine transition is the inverse of divide. It merges multiple branches into a single step, collecting the return values from all branches into an array. The combined step waits for every branch to complete before starting.
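To make "waits for every branch to complete" concrete, the sketch below models divide followed by combine with Ruby threads: branches start together, and the combined step runs only after all of them finish, receiving their results as one array. Thread#value blocks until its thread finishes. Representing steps as lambdas and ordering the array by branch are assumptions of this model, not documented Ductwork behavior; model_divide_and_combine is a hypothetical name.

```ruby
# Plain-Ruby model of `divide` followed by `combine`.
def model_divide_and_combine(value, branches, combined_step)
  # Start every branch at once, each with the same input.
  threads = branches.map { |branch| Thread.new(value) { |input| branch.call(input) } }

  # Thread#value joins its thread, so this waits for *all* branches.
  results = threads.map(&:value)

  # The combined step receives the branch results as a single array
  # (in branch order in this model).
  combined_step.call(results)
end

# Lambdas stand in for FetchDataFromSourceA, FetchDataFromSourceB, and
# CollateUserData. Source A is deliberately slower.
fetch_from_a = lambda do |id|
  sleep 0.05
  { "id" => id, "a" => "done" }
end
fetch_from_b = ->(id) { { "id" => id, "b" => "done" } }
collate = ->(results) { results.reduce({}) { |merged, r| merged.merge(r) } }

collated = model_divide_and_combine(101, [fetch_from_a, fetch_from_b], collate)
# collated has "id", "a", and "b", even though source A finished last.
```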
Syntax: combine(into: StepClass)
```ruby
class EnrichAllUsersDataPipeline < Ductwork::Pipeline
  define do |pipeline|
    pipeline.start(QueryUsersRequiringEnrichment)
      .expand(to: LoadUserData)
      .divide(to: [FetchDataFromSourceA, FetchDataFromSourceB])
      .combine(into: CollateUserData)
  end
end
```

Block syntax: For improved readability, you can call combine on any branch and pass the other branches as arguments:
```ruby
class EnrichAllUsersDataPipeline < Ductwork::Pipeline
  define do |pipeline|
    pipeline.start(StepA).divide(to: [StepB, StepC, StepD]) do |branch1, branch2, branch3|
      branch1.combine(branch2, branch3, into: StepE)
    end
  end
end
```

Note: Calling combine on a pipeline that hasn’t been divided raises a Ductwork::DSL::DefinitionBuilder::CombineError.
chain - Connect Steps Sequentially
The chain transition is the simplest transition. It connects two steps sequentially: the second step runs only if the first succeeds. Chaining involves no concurrency.
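The sequencing rule can be modeled as a fold: each step's return value feeds the next, and a failure stops the fold so later steps never start. In this plain-Ruby sketch, steps are lambdas and a raised exception stands in for a failed step (both assumptions); how Ductwork records, retries, or halts on failures is not modeled. The helper name model_chain is hypothetical.

```ruby
# Plain-Ruby model of `chain`: each step's return value feeds the next step,
# and an exception stops the sequence so later steps never start.
def model_chain(input, steps)
  steps.reduce(input) { |previous_result, step| step.call(previous_result) }
end

ran = []
# Hypothetical stand-ins for CollateUserData, UpdateUserData, and a
# reporting step; the second one fails.
collate = ->(users) { ran << :collate; users }
update = ->(_users) { ran << :update; raise "update failed" }
report = ->(users) { ran << :report; users.length }

begin
  model_chain([{ "id" => 101 }], [collate, update, report])
rescue RuntimeError => e
  puts "stopped: #{e.message}" # prints "stopped: update failed"
end
p ran # => [:collate, :update]
```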
Syntax: chain(to: StepClass)
```ruby
class EnrichAllUsersDataPipeline < Ductwork::Pipeline
  define do |pipeline|
    pipeline.start(QueryUsersRequiringEnrichment)
      .expand(to: LoadUserData)
      .divide(to: [FetchDataFromSourceA, FetchDataFromSourceB])
      .combine(into: CollateUserData)
      .chain(to: UpdateUserData)
  end
end
```

collapse - Gather Expanded Steps
The collapse transition is the inverse of expand. It gathers all steps from an expanded branch back into a single step, combining their return values into an array. Like combine, the collapsed step waits for all expanded steps to complete.
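An expand paired with a collapse forms a fan-out/fan-in. The plain-Ruby model below runs one thread per element, waits for all of them, and passes their results to a single step as an array. Steps as lambdas and results arriving in element order are assumptions of this sketch; the documentation does not specify the order. The helper name model_expand_then_collapse is hypothetical.

```ruby
# Plain-Ruby model of `expand` ... `collapse`.
def model_expand_then_collapse(collection, per_element_step, collapsed_step)
  # Fan out: one concurrent unit of work per element.
  threads = collection.map { |element| Thread.new(element) { |e| per_element_step.call(e) } }

  # Gather: wait for every expanded step, then pass all results, as one
  # array, to a single step.
  collapsed_step.call(threads.map(&:value))
end

# Hypothetical stand-ins: per-user work, then ReportUserEnrichmentSuccess.
update_user = ->(id) { { "id" => id, "updated" => id.odd? } }
report = lambda do |results|
  "#{results.count { |r| r["updated"] }} of #{results.length} users updated"
end

puts model_expand_then_collapse([101, 102, 103], update_user, report)
# prints "2 of 3 users updated"
```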
Syntax: collapse(into: StepClass)
```ruby
class EnrichAllUsersDataPipeline < Ductwork::Pipeline
  define do |pipeline|
    pipeline.start(QueryUsersRequiringEnrichment)
      .expand(to: LoadUserData)
      .divide(to: [FetchDataFromSourceA, FetchDataFromSourceB])
      .combine(into: CollateUserData)
      .chain(to: UpdateUserData)
      .collapse(into: ReportUserEnrichmentSuccess)
  end
end
```

Note: Calling collapse on a pipeline that hasn’t been expanded raises a Ductwork::DSL::DefinitionBuilder::CollapseError.
divert - Conditional Branching
The divert transition routes pipeline execution to different steps based on the return value of the previous step, much like a case/switch statement. Unlike divide, which sends the same input to all branches in parallel, divert selects only one branch to execute. The return value from the previous step is converted to a string and matched against the hash keys.
An otherwise key is always required as a fallback when no keys match.
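The matching rule can be sketched in a few lines: stringify the previous return value, compare it with the string form of each key, and fall back to otherwise. This plain-Ruby model follows the description above but is not Ductwork's code. Treating otherwise purely as a fallback (never as a match target) is an assumption, and where Ductwork raises Ductwork::DSL::DefinitionBuilder::DivertError for a missing otherwise key, the model uses ArgumentError. Strings stand in for step classes.

```ruby
# Plain-Ruby model of how `divert` picks a branch.
def model_divert(return_value, branches)
  unless branches.key?(:otherwise)
    raise ArgumentError, "divert requires an :otherwise branch"
  end

  wanted = return_value.to_s
  # Compare string forms, skipping the fallback key (assumption).
  match = branches.find { |key, _step| key != :otherwise && key.to_s == wanted }
  match ? match.last : branches[:otherwise]
end

# Class names as strings stand in for the step classes.
branches = {
  premium: "EnrichFromPremiumSource",
  standard: "EnrichFromStandardSource",
  otherwise: "EnrichFromDefaultSource"
}

p model_divert("premium", branches) # => "EnrichFromPremiumSource"
p model_divert(:standard, branches) # => "EnrichFromStandardSource"
p model_divert("trial", branches)   # => "EnrichFromDefaultSource"
p model_divert(nil, branches)       # => "EnrichFromDefaultSource" (nil.to_s is "")
```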
Syntax: divert(to: { key1: StepClass1, key2: StepClass2, ..., otherwise: FallbackStepClass })
```ruby
class EnrichAllUsersDataPipeline < Ductwork::Pipeline
  define do |pipeline|
    pipeline.start(QueryUsersRequiringEnrichment)
      .expand(to: LoadUserData)
      .divert(to: {
        premium: EnrichFromPremiumSource,
        standard: EnrichFromStandardSource,
        otherwise: EnrichFromDefaultSource
      })
  end
end
```

Block syntax: Like divide, the divert transition accepts an optional block, yielding a branch for each step. This allows you to chain different transitions onto each branch independently:
```ruby
class EnrichAllUsersDataPipeline < Ductwork::Pipeline
  define do |pipeline|
    pipeline.start(StepA)
      .divert(to: { bar: StepB, otherwise: StepC }) do |branch1, branch2|
        branch1.chain(to: StepD)
        branch2.chain(to: StepE).chain(to: StepF)
      end
  end
end
```

Note: Omitting the otherwise key raises a Ductwork::DSL::DefinitionBuilder::DivertError. If at runtime the return value doesn’t match any key and no otherwise branch exists, the pipeline will halt.
converge - Merge Diverted Branches
The converge transition is the inverse of divert. It merges diverted branches back into a single step. The converged step receives the return value from whichever branch was selected at runtime.
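Unlike combine and collapse, converge has nothing to gather into an array: only one diverted branch ran, so the converged step receives that single return value. The plain-Ruby model below assumes steps are lambdas, that otherwise acts only as a fallback, and that the selected branch receives the same value used for routing; none of these details are taken from Ductwork's implementation, and model_divert_then_converge is a hypothetical name.

```ruby
# Plain-Ruby model of `divert` ... `converge`.
def model_divert_then_converge(return_value, branches, converged_step)
  wanted = return_value.to_s
  match = branches.find { |key, _step| key != :otherwise && key.to_s == wanted }
  selected = match ? match.last : branches.fetch(:otherwise)

  # Only the selected branch runs; its return value goes straight to the
  # converged step, unwrapped (no array, because there is nothing to merge).
  converged_step.call(selected.call(return_value))
end

# Lambdas stand in for EnrichFromPremiumSource, EnrichFromStandardSource,
# EnrichFromDefaultSource, and UpdateUserData (hypothetical data).
branches = {
  premium: ->(tier) { { "tier" => tier, "fields" => 12 } },
  standard: ->(tier) { { "tier" => tier, "fields" => 5 } },
  otherwise: ->(_tier) { { "tier" => "default", "fields" => 1 } }
}
update_user_data = ->(enriched) { "#{enriched["tier"]}:#{enriched["fields"]}" }

puts model_divert_then_converge("premium", branches, update_user_data) # prints "premium:12"
puts model_divert_then_converge("trial", branches, update_user_data)   # prints "default:1"
```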
Syntax: converge(into: StepClass)
```ruby
class EnrichAllUsersDataPipeline < Ductwork::Pipeline
  define do |pipeline|
    pipeline.start(QueryUsersRequiringEnrichment)
      .expand(to: LoadUserData)
      .divert(to: {
        premium: EnrichFromPremiumSource,
        standard: EnrichFromStandardSource,
        otherwise: EnrichFromDefaultSource
      })
      .converge(into: UpdateUserData)
  end
end
```

Block syntax: For improved readability, you can call converge on any branch and pass the other branches as arguments:
```ruby
class EnrichAllUsersDataPipeline < Ductwork::Pipeline
  define do |pipeline|
    pipeline.start(StepA)
      .divert(to: { bar: StepB, otherwise: StepC }) do |branch1, branch2|
        branch1.converge(branch2, into: StepD)
      end
  end
end
```

Note: Calling converge on a pipeline that hasn’t been diverted raises a Ductwork::DSL::DefinitionBuilder::ConvergeError.
Next, we’re ready to run the ductwork processes.