A Pipeline encapsulates your entire data processing task, from start to finish. It represents a directed acyclic graph (DAG) of data transformations.
What is a Pipeline?
A Pipeline manages the execution of data processing operations. According to the source code:
Java (Pipeline.java:74-82)
Python (pipeline.py:18-28)
Go (pipeline.go:72-76)
/**
* A {@link Pipeline} manages a directed acyclic graph of {@link PTransform PTransforms}, and the
* {@link PCollection PCollections} that the {@link PTransform PTransforms} consume and produce.
*
* Each {@link Pipeline} is self-contained and isolated from any other {@link Pipeline}. The
* {@link PValue PValues} that are inputs and outputs of each of a {@link Pipeline Pipeline's}
* {@link PTransform PTransforms} are also owned by that {@link Pipeline}.
*/
Creating a Pipeline
Basic Pipeline Creation
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
// Create default options
PipelineOptions options = PipelineOptionsFactory.create();

// Create the pipeline
Pipeline p = Pipeline.create(options);
Pipeline Options
Pipeline options configure pipeline execution, including runner selection and runtime parameters.
Standard Pipeline Options
import org.apache.beam.sdk.options.*;
import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;

// Parse from command line
PipelineOptions options = PipelineOptionsFactory
    .fromArgs(args)
    .withValidation()
    .create();

// Configure for specific runner (e.g., Dataflow)
DataflowPipelineOptions dataflowOptions =
    options.as(DataflowPipelineOptions.class);
dataflowOptions.setRunner(DataflowRunner.class);
dataflowOptions.setProject("my-project-id");
dataflowOptions.setRegion("us-central1");
dataflowOptions.setTempLocation("gs://my-bucket/temp");

Pipeline p = Pipeline.create(dataflowOptions);
Custom Pipeline Options
Define custom options for application-specific parameters:
import org.apache.beam.sdk.options.*;

public interface MyOptions extends PipelineOptions {
  @Description("Input file path or pattern")
  @Default.String("gs://my-bucket/input/*.txt")
  String getInputFile();
  void setInputFile(String value);

  @Description("Output directory")
  @Validation.Required
  String getOutput();
  void setOutput(String value);

  @Description("Minimum word length to count")
  @Default.Integer(5)
  Integer getMinWordLength();
  void setMinWordLength(Integer value);
}

// Usage
MyOptions options = PipelineOptionsFactory
    .fromArgs(args)
    .withValidation()
    .as(MyOptions.class);
Pipeline p = Pipeline.create(options);

// Access custom options
String inputFile = options.getInputFile();
Integer minLength = options.getMinWordLength();
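Beam serves these getters at runtime from a dynamic proxy: PipelineOptionsFactory generates an implementation of your interface that returns parsed command-line values, falling back to the declared defaults. The following is a simplified, hypothetical sketch of that proxy mechanism using plain JDK reflection and a stand-in @DefaultString annotation; it is not Beam's actual implementation.

```java
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.reflect.Proxy;

public class OptionsProxyDemo {

  // Hypothetical stand-in for Beam's @Default.String annotation.
  @Retention(RetentionPolicy.RUNTIME)
  @interface DefaultString {
    String value();
  }

  // A toy options interface, mirroring the shape of a Beam options interface.
  interface MyOptions {
    @DefaultString("gs://my-bucket/input/*.txt")
    String getInputFile();
  }

  @SuppressWarnings("unchecked")
  static <T> T create(Class<T> iface) {
    return (T) Proxy.newProxyInstance(
        iface.getClassLoader(),
        new Class<?>[] {iface},
        (proxy, method, methodArgs) -> {
          // No value was set explicitly, so fall back to the annotated default.
          DefaultString def = method.getAnnotation(DefaultString.class);
          return def != null ? def.value() : null;
        });
  }

  public static void main(String[] args) {
    MyOptions options = create(MyOptions.class);
    // Prints the annotated default value.
    System.out.println(options.getInputFile());
  }
}
```

This is why options are declared as interfaces rather than classes: the factory can synthesize the implementation, validate setters, and serialize the values for distribution to workers.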
Running a Pipeline
Direct Execution
Pipeline p = Pipeline.create(options);

// Build pipeline
p.apply(...);

// Run and wait for completion
PipelineResult result = p.run();
result.waitUntilFinish();
Running with Different Runners
The DirectRunner executes pipelines locally for development and testing.

# Java
mvn compile exec:java -Dexec.mainClass=com.example.MyPipeline \
  -Pdirect-runner
# Python
python my_pipeline.py --runner=DirectRunner
# Go
go run my_pipeline.go --runner=direct
The DataflowRunner executes on Google Cloud Dataflow.

# Java
mvn compile exec:java -Dexec.mainClass=com.example.MyPipeline \
  -Pdataflow-runner \
  -Dexec.args="--project=my-project \
  --region=us-central1 \
  --tempLocation=gs://my-bucket/temp"
# Python
python my_pipeline.py \
--runner=DataflowRunner \
--project=my-project \
--region=us-central1 \
--temp_location=gs://my-bucket/temp
# Go
go run my_pipeline.go \
--runner=dataflow \
--project=my-project \
--region=us-central1 \
--temp_location=gs://my-bucket/temp
The FlinkRunner executes on Apache Flink.

# Java
mvn compile exec:java -Dexec.mainClass=com.example.MyPipeline \
  -Pflink-runner \
  -Dexec.args="--runner=FlinkRunner \
  --flinkMaster=localhost:8081"
# Python
python my_pipeline.py \
--runner=FlinkRunner \
--flink_master=localhost:8081
# Go
go run my_pipeline.go \
--runner=flink \
--flink_master=localhost:8081
The SparkRunner executes on Apache Spark.

# Java
mvn compile exec:java -Dexec.mainClass=com.example.MyPipeline \
  -Pspark-runner \
  -Dexec.args="--runner=SparkRunner \
  --sparkMaster=spark://host:port"
# Python
python my_pipeline.py \
--runner=SparkRunner \
--spark_master_url=spark://host:port
Pipeline Lifecycle
1. Construction Phase
During construction, you build the pipeline DAG by applying transforms:
Pipeline p = Pipeline.create(options);

// Each apply() adds nodes to the DAG
PCollection<String> lines = p.apply("Read", TextIO.read().from("input.txt"));
PCollection<String> words = lines.apply("Split", new SplitWords());
PCollection<KV<String, Long>> counts = words.apply("Count", Count.perElement());

// No actual data processing happens yet
2. Validation Phase
Before execution, the pipeline is validated:
Type checking ensures transforms are compatible
Graph structure is verified (no cycles, etc.)
Runner capabilities are checked
3. Optimization Phase
The runner may optimize the pipeline:
Fusing compatible transforms
Reordering operations
Eliminating redundant work
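Fusion can be pictured with ordinary function composition: instead of materializing an intermediate collection between two element-wise transforms, the runner applies both in a single pass over each element. The following plain-Java analogy illustrates the idea; it is not Beam's actual optimizer.

```java
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

public class FusionDemo {
  public static void main(String[] args) {
    // Two logical element-wise transforms...
    Function<String, String> trim = String::trim;
    Function<String, Integer> length = String::length;

    // ...fused into one function, so each element is touched once
    // and no intermediate collection of trimmed strings is built.
    Function<String, Integer> fused = trim.andThen(length);

    List<Integer> out = List.of("  foo ", "barbaz ").stream()
        .map(fused)
        .collect(Collectors.toList());

    // Lengths after trimming: [3, 6]
    System.out.println(out);
  }
}
```

Runners apply the same principle to chains of ParDo/Map transforms, which is why splitting logic across many small transforms usually costs nothing at execution time.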
4. Execution Phase
PipelineResult result = p.run();
The runner executes the pipeline:
Distributes work across workers
Processes data elements
Manages state and checkpointing
Handles failures and retries
5. Monitoring and Completion
// Wait for completion
result.waitUntilFinish();

// Or monitor state
PipelineResult.State state = result.getState();
if (state == PipelineResult.State.DONE) {
  // Pipeline completed successfully
}
Complete Example: Word Count Pipeline
Here’s a complete pipeline from the Apache Beam examples:
Java (MinimalWordCount.java)
Python (wordcount_minimal.py)
Go (wordcount.go)
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.transforms.FlatMapElements;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TypeDescriptors;
import java.util.Arrays;
public class MinimalWordCount {
  public static void main(String[] args) {
    // Create pipeline options
    PipelineOptions options = PipelineOptionsFactory.create();

    // Create the pipeline
    Pipeline p = Pipeline.create(options);

    // Read input
    p.apply(TextIO.read().from("gs://apache-beam-samples/shakespeare/kinglear.txt"))
        // Split into words
        .apply(FlatMapElements.into(TypeDescriptors.strings())
            .via((String line) -> Arrays.asList(line.split("[^\\p{L}]+"))))
        // Filter empty strings
        .apply(Filter.by((String word) -> !word.isEmpty()))
        // Count occurrences
        .apply(Count.perElement())
        // Format output
        .apply(MapElements.into(TypeDescriptors.strings())
            .via((KV<String, Long> wordCount) ->
                wordCount.getKey() + ": " + wordCount.getValue()))
        // Write output
        .apply(TextIO.write().to("wordcounts"));

    // Run the pipeline
    p.run().waitUntilFinish();
  }
}
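The word-splitting regex in the pipeline above is plain Java and can be sanity-checked in isolation, without Beam. The example sentence below is arbitrary; `[^\p{L}]+` splits on runs of any non-letter characters:

```java
import java.util.Arrays;
import java.util.List;

public class SplitCheck {
  public static void main(String[] args) {
    // Same regex as the pipeline's FlatMapElements step.
    String line = "To be, or not to be: that is the question.";
    List<String> words = Arrays.asList(line.split("[^\\p{L}]+"));
    // Punctuation and whitespace are consumed by the split.
    System.out.println(words);
  }
}
```

Note that when a line *begins* with a non-letter (e.g. a quotation mark), split() emits a leading empty string, which is why the pipeline follows the split with a Filter step that drops empty words.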
Best Practices
Immutability: Pipelines are immutable after construction. You cannot modify a pipeline after calling run().
Resource Management: Always clean up resources properly. Use try-with-resources (Java) or context managers (Python) when appropriate.
Testing: Use TestPipeline for unit testing. It provides the same API but runs in-memory for fast testing.
Minimize I/O: Read and write efficiently.
Avoid shuffles: Reduce GroupByKey operations when possible.
Use appropriate runners: Choose the right runner for your use case.
Monitor metrics: Track pipeline performance and optimize bottlenecks.
Next Steps
PCollections: Learn about distributed data collections
Transforms: Explore data transformation operations