The Beam programming model provides a structured approach to building data processing pipelines that work across batch and streaming data sources.
Pipeline Construction
A Beam pipeline follows a clear pattern:
Create a Pipeline object
Read data from one or more sources into PCollections
Apply transforms to process the data in those PCollections
Write results to one or more sinks
Run the pipeline on a runner
Basic Pipeline Pattern
import java.util.Arrays;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.*;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;

public class BasicPipeline {
  public static void main(String[] args) {
    // 1. Create pipeline options
    PipelineOptions options = PipelineOptionsFactory.create();

    // 2. Create the pipeline
    Pipeline p = Pipeline.create(options);

    // 3. Read data (creates a PCollection)
    PCollection<String> lines = p.apply(
        TextIO.read().from("gs://apache-beam-samples/shakespeare/kinglear.txt"));

    // 4. Transform data
    PCollection<String> words = lines.apply(
        FlatMapElements.into(TypeDescriptors.strings())
            .via((String line) -> Arrays.asList(line.split("[^\\p{L}]+"))));

    // 5. Write results
    words.apply(TextIO.write().to("output"));

    // 6. Run the pipeline
    p.run().waitUntilFinish();
  }
}
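The tokenization in step 4 is plain Java, so it can be checked outside Beam: the lambda passed to via splits each line on runs of non-letter characters. A standalone sketch (the tokenize helper name is ours, not part of the Beam API):

```java
import java.util.Arrays;
import java.util.List;

public class TokenizeSketch {
  // Same logic as the FlatMapElements lambda above:
  // split a line on runs of non-letter characters.
  static List<String> tokenize(String line) {
    return Arrays.asList(line.split("[^\\p{L}]+"));
  }

  public static void main(String[] args) {
    System.out.println(tokenize("King Lear, Act I"));
    // [King, Lear, Act, I]
  }
}
```

Keeping per-element logic this simple makes it easy to verify before wiring it into a pipeline.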
Pipeline Options
Pipeline options configure how your pipeline executes. They specify:
Which runner to use (DirectRunner, DataflowRunner, FlinkRunner, etc.)
Runner-specific settings (project ID, staging location, etc.)
Custom application parameters
import org.apache.beam.sdk.options.*;

public interface MyOptions extends PipelineOptions {
  @Description("Input file pattern")
  @Default.String("gs://my-bucket/input/*.txt")
  String getInputFile();
  void setInputFile(String value);

  @Description("Output path")
  @Validation.Required
  String getOutputPath();
  void setOutputPath(String value);
}

// Parse options from command line
MyOptions options = PipelineOptionsFactory
    .fromArgs(args)
    .withValidation()
    .as(MyOptions.class);
Pipeline p = Pipeline.create(options);
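Conceptually, PipelineOptionsFactory.fromArgs turns --name=value command-line flags into values served through your options interface. The mechanics can be illustrated with a toy parser; this is our simplification for illustration, not Beam's actual implementation:

```java
import java.util.HashMap;
import java.util.Map;

public class ArgsSketch {
  // Toy version of what fromArgs does conceptually: turn
  // "--runner=DataflowRunner" style flags into a name -> value map.
  static Map<String, String> parseArgs(String[] args) {
    Map<String, String> opts = new HashMap<>();
    for (String arg : args) {
      if (arg.startsWith("--") && arg.contains("=")) {
        int eq = arg.indexOf('=');
        opts.put(arg.substring(2, eq), arg.substring(eq + 1));
      }
    }
    return opts;
  }

  public static void main(String[] args) {
    Map<String, String> opts = parseArgs(
        new String[] {"--runner=DataflowRunner", "--project=my-project"});
    System.out.println(opts.get("runner")); // DataflowRunner
  }
}
```

Beam adds type conversion, validation, and proxy generation on top of this idea, which is why withValidation() can reject a missing @Validation.Required option before the pipeline runs.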
Chaining Transforms
Transforms can be chained to create a processing pipeline:
PCollection<String> results = input
    .apply("Step1", ParDo.of(new TransformFn1()))
    .apply("Step2", ParDo.of(new TransformFn2()))
    .apply("Step3", Combine.globally(new CombineFn()));
Branching Pipelines
A single PCollection can be used as input to multiple transforms:
PCollection<String> input = p.apply(TextIO.read().from("input.txt"));

// Branch 1: Count words
PCollection<KV<String, Long>> wordCounts = input
    .apply("CountWords", new CountWords());

// Branch 2: Find long lines
PCollection<String> longLines = input
    .apply("FilterLongLines", Filter.by(line -> line.length() > 100));

// Each branch processes independently
wordCounts.apply("WriteCount", TextIO.write().to("counts"));
longLines.apply("WriteLong", TextIO.write().to("long-lines"));
Composite Transforms
Create reusable transform components by combining multiple transforms:
static class CountWords
    extends PTransform<PCollection<String>, PCollection<KV<String, Long>>> {
  @Override
  public PCollection<KV<String, Long>> expand(PCollection<String> lines) {
    return lines
        .apply("ExtractWords",
            FlatMapElements.into(TypeDescriptors.strings())
                .via(line -> Arrays.asList(line.split("\\s+"))))
        .apply("FilterEmpty",
            Filter.by(word -> !word.isEmpty()))
        .apply("CountPerElement",
            Count.perElement());
  }
}

// Usage
PCollection<String> lines = ...;
PCollection<KV<String, Long>> counts = lines.apply(new CountWords());
Composite transforms improve code organization, enable reuse, and make pipelines easier to understand and maintain.
Multiple Outputs (Tagged)
// Define output tags
// Define output tags
final TupleTag<String> upperTag = new TupleTag<String>(){};
final TupleTag<String> lowerTag = new TupleTag<String>(){};

PCollectionTuple results = input.apply(
    ParDo.of(new DoFn<String, String>() {
      @ProcessElement
      public void processElement(@Element String word, MultiOutputReceiver out) {
        if (word.equals(word.toUpperCase())) {
          out.get(upperTag).output(word);
        } else {
          out.get(lowerTag).output(word);
        }
      }
    }).withOutputTags(upperTag, TupleTagList.of(lowerTag)));

PCollection<String> upperWords = results.get(upperTag);
PCollection<String> lowerWords = results.get(lowerTag);
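The routing predicate is plain Java and worth checking on its own: a string with no letters at all (digits, empty string) equals its own upper-case form and therefore lands on upperTag. A standalone sketch (the helper name is ours):

```java
public class CaseCheck {
  // Same predicate as the DoFn above: true when the word contains
  // no lower-case letters -- all-caps words, but also digits and "".
  static boolean isUpper(String word) {
    return word.equals(word.toUpperCase());
  }

  public static void main(String[] args) {
    System.out.println(isUpper("HELLO")); // true
    System.out.println(isUpper("Hello")); // false
    System.out.println(isUpper("1234"));  // true
  }
}
```

If digit-only tokens should go to the other branch, the predicate would need an explicit letter check rather than the equality test.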
Merging PCollections
Multiple PCollections of the same element type can be merged into one:
PCollection<String> pc1 = ...;
PCollection<String> pc2 = ...;
PCollection<String> pc3 = ...;

// Flatten multiple PCollections
PCollection<String> merged = PCollectionList
    .of(pc1).and(pc2).and(pc3)
    .apply(Flatten.pCollections());
Best Practices
Naming transforms: Always give transforms meaningful names. This helps with monitoring, debugging, and understanding the pipeline structure.
Avoid side effects: PTransforms should be deterministic and free of side effects. The same input should always produce the same output, regardless of how many times the transform is executed.
Start simple: Begin with a minimal pipeline and add complexity gradually. Test each transform independently before combining them.
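One practical way to follow the last two points is to keep per-element logic in a plain, deterministic static method that a DoFn or MapElements lambda merely delegates to; the method can then be unit-tested without running any pipeline. A sketch (class and method names are ours):

```java
public class LineStats {
  // Deterministic, side-effect-free logic: the same input always
  // produces the same output, no matter how often it is called.
  // A Filter.by or DoFn in the pipeline can delegate to this method.
  static boolean isLongLine(String line) {
    return line.length() > 100;
  }

  public static void main(String[] args) {
    System.out.println(isLongLine("short line")); // false
  }
}
```

Because the logic lives outside the pipeline, a regular unit test covers it directly, and the Beam-level test only needs to verify the wiring.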
Testing Pipelines
Beam provides testing utilities to validate pipeline logic:
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.junit.Rule;
import org.junit.Test;

@Rule
public final transient TestPipeline p = TestPipeline.create();

@Test
public void testPipeline() {
  PCollection<String> input = p.apply(Create.of("hello", "world"));

  PCollection<Integer> lengths = input.apply(
      MapElements.into(TypeDescriptors.integers())
          .via(String::length));

  PAssert.that(lengths).containsInAnyOrder(5, 5);
  p.run();
}
Next Steps
Pipelines: deep dive into pipeline creation and configuration
PCollections: working with distributed data
Transforms: the full range of available transforms
Windowing: handling unbounded data with windows