Transforms are the operations that process data in your Beam pipeline. A PTransform takes one or more PCollections as input and produces one or more PCollections as output.
From the Apache Beam Java SDK source (PTransform.java, lines 45-78; the Python SDK's counterpart lives in ptransform.py):
/**
* A {@code PTransform<InputT, OutputT>} is an operation that takes an {@code InputT}
* (some subtype of {@link PInput}) and produces an {@code OutputT} (some subtype of
* {@link POutput}).
*
* PTransforms include root PTransforms like TextIO.Read, processing and conversion
* operations like ParDo, GroupByKey, Combine, and Count, and outputting PTransforms
* like TextIO.Write.
*
 * Example usage:
 * PCollection<T1> pc1 = ...;
 * PCollection<T2> pc2 =
 *     pc1.apply(ParDo.of(new MyDoFn<T1, KV<K, V>>()))
 *        .apply(GroupByKey.<K, V>create())
 *        .apply(Combine.perKey(new MyKeyedCombineFn<K, V>()))
 *        .apply(ParDo.of(new MyDoFn2<KV<K, V>, T2>()));
*/
Beam provides several categories of built-in transforms:
Element-wise transforms (ParDo, MapElements, FlatMapElements, Filter): Process each element independently.
Aggregation transforms (GroupByKey, Combine, Count): Combine multiple elements into aggregated results.
Composite transforms: Combine multiple transforms into reusable components.
I/O transforms (TextIO and friends): Read from and write to external systems.
ParDo
ParDo is the most general transform: it processes each element with a user-defined function (a DoFn).
Basic ParDo
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;

static class ComputeLengthFn extends DoFn<String, Integer> {
  @ProcessElement
  public void processElement(@Element String word, OutputReceiver<Integer> out) {
    out.output(word.length());
  }
}

PCollection<String> words = ...;
PCollection<Integer> lengths = words.apply(ParDo.of(new ComputeLengthFn()));
DoFn Lifecycle
DoFns have a lifecycle with several methods:
class MyDoFn extends DoFn<String, String> {
  @Setup
  public void setup() {
    // Called once per DoFn instance before processing
    // Initialize expensive resources
  }

  @StartBundle
  public void startBundle() {
    // Called at the start of each bundle
  }

  @ProcessElement
  public void processElement(@Element String element, OutputReceiver<String> out) {
    // Called for each element
    out.output(element.toUpperCase());
  }

  @FinishBundle
  public void finishBundle(FinishBundleContext context) {
    // Called at the end of each bundle
  }

  @Teardown
  public void teardown() {
    // Called once per DoFn instance after processing
    // Clean up resources
  }
}
DoFn restrictions: DoFns must be serializable and thread-safe. Avoid mutable shared state.
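As an illustration of these restrictions, here is a sketch (the ParseFn name and its StringBuilder "resource" are hypothetical stand-ins for something expensive, such as a parser or client) that keeps per-instance state initialized in @Setup rather than in a static field shared across DoFns:

```java
import org.apache.beam.sdk.transforms.DoFn;

static class ParseFn extends DoFn<String, String> {
  // transient: not serialized with the DoFn; rebuilt on each worker by @Setup
  private transient StringBuilder scratch;

  @Setup
  public void setup() {
    scratch = new StringBuilder(); // per-instance state, never static/shared
  }

  @ProcessElement
  public void processElement(@Element String line, OutputReceiver<String> out) {
    scratch.setLength(0);          // safe: only this instance touches it
    scratch.append(line.trim());
    out.output(scratch.toString());
  }
}
```

The transient field keeps the DoFn serializable while still letting each worker hold a live resource between elements.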
Multiple Outputs
DoFns can output to multiple PCollections:
final TupleTag<Integer> evenTag = new TupleTag<Integer>(){};
final TupleTag<Integer> oddTag = new TupleTag<Integer>(){};

PCollectionTuple results = numbers.apply(
    ParDo.of(new DoFn<Integer, Integer>() {
      @ProcessElement
      public void processElement(@Element Integer number, MultiOutputReceiver out) {
        if (number % 2 == 0) {
          out.get(evenTag).output(number);
        } else {
          out.get(oddTag).output(number);
        }
      }
    }).withOutputTags(evenTag, TupleTagList.of(oddTag)));

PCollection<Integer> evens = results.get(evenTag);
PCollection<Integer> odds = results.get(oddTag);
Map
Apply a simple function to each element:
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptors;

// Using a method reference
PCollection<Integer> lengths = words.apply(
    MapElements.into(TypeDescriptors.integers())
        .via(String::length));

// Using a lambda
PCollection<String> upper = words.apply(
    MapElements.into(TypeDescriptors.strings())
        .via(word -> word.toUpperCase()));
FlatMap
Produce zero or more outputs per input:
import org.apache.beam.sdk.transforms.FlatMapElements;
import java.util.Arrays;

PCollection<String> words = lines.apply(
    FlatMapElements.into(TypeDescriptors.strings())
        .via((String line) -> Arrays.asList(line.split("\\s+"))));
Filter
Keep only elements that match a predicate:
import org.apache.beam.sdk.transforms.Filter;

// Keep words longer than 5 characters
PCollection<String> longWords = words.apply(
    Filter.by(word -> word.length() > 5));

// Remove empty strings
PCollection<String> nonEmpty = words.apply(
    Filter.by(word -> !word.isEmpty()));
GroupByKey
Group values by their key:
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.values.KV;

// Input: [("cat", 1), ("dog", 5), ("cat", 2), ("dog", 1)]
PCollection<KV<String, Integer>> pairs = ...;

// Group by key
PCollection<KV<String, Iterable<Integer>>> grouped =
    pairs.apply(GroupByKey.create());
// Output: [("cat", [1, 2]), ("dog", [5, 1])]
GroupByKey triggers a shuffle operation, which can be expensive. Use it judiciously.
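When the grouped values feed straight into an aggregate, a per-key combine is usually the better choice: runners can pre-combine values before the shuffle (combiner lifting), so far less data crosses the network. A sketch using the same pairs collection:

```java
import org.apache.beam.sdk.transforms.Sum;

// Input: [("cat", 1), ("dog", 5), ("cat", 2), ("dog", 1)]
// Instead of GroupByKey followed by a summing ParDo:
PCollection<KV<String, Integer>> totals = pairs.apply(Sum.integersPerKey());
// Output: [("cat", 3), ("dog", 6)] -- partial sums happen before the shuffle
```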
CoGroupByKey
Join multiple PCollections by key:
import org.apache.beam.sdk.transforms.join.CoGbkResult;
import org.apache.beam.sdk.transforms.join.CoGroupByKey;
import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;

final TupleTag<String> emailsTag = new TupleTag<>();
final TupleTag<String> phonesTag = new TupleTag<>();

PCollection<KV<String, String>> emails = ...; // (name, email)
PCollection<KV<String, String>> phones = ...; // (name, phone)

PCollection<KV<String, CoGbkResult>> joined =
    KeyedPCollectionTuple.of(emailsTag, emails)
        .and(phonesTag, phones)
        .apply(CoGroupByKey.create());
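The join result can then be unpacked per key with CoGbkResult.getAll(tag); the output string format below is illustrative:

```java
PCollection<String> contactInfo = joined.apply(ParDo.of(
    new DoFn<KV<String, CoGbkResult>, String>() {
      @ProcessElement
      public void processElement(
          @Element KV<String, CoGbkResult> e, OutputReceiver<String> out) {
        String name = e.getKey();
        // All emails and phones joined under this name (either may be empty)
        Iterable<String> emails = e.getValue().getAll(emailsTag);
        Iterable<String> phones = e.getValue().getAll(phonesTag);
        out.output(name + ": emails=" + emails + ", phones=" + phones);
      }
    }));
```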
Combine
Aggregate values:
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Max;
import org.apache.beam.sdk.transforms.Sum;

// Count globally
PCollection<Long> count = words.apply(Count.globally());

// Count per element
PCollection<KV<String, Long>> wordCounts = words.apply(Count.perElement());

// Sum integers
PCollection<Integer> sum = numbers.apply(Sum.integersGlobally());

// Sum per key
PCollection<KV<String, Integer>> sums = pairs.apply(Sum.integersPerKey());

// Maximum
PCollection<Integer> max = numbers.apply(Max.integersGlobally());
Custom CombineFn
Create custom aggregation logic:
import org.apache.beam.sdk.transforms.Combine.CombineFn;

static class AverageFn extends CombineFn<Integer, AverageFn.Accum, Double> {
  static class Accum {
    int sum = 0;
    int count = 0;
  }

  @Override
  public Accum createAccumulator() {
    return new Accum();
  }

  @Override
  public Accum addInput(Accum accum, Integer input) {
    accum.sum += input;
    accum.count++;
    return accum;
  }

  @Override
  public Accum mergeAccumulators(Iterable<Accum> accums) {
    Accum merged = createAccumulator();
    for (Accum accum : accums) {
      merged.sum += accum.sum;
      merged.count += accum.count;
    }
    return merged;
  }

  @Override
  public Double extractOutput(Accum accum) {
    return accum.count == 0 ? 0.0 : (double) accum.sum / accum.count;
  }
}

PCollection<Double> average = numbers.apply(Combine.globally(new AverageFn()));
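The same CombineFn also works per key. Assuming a hypothetical scores collection of KV<String, Integer> pairs:

```java
PCollection<KV<String, Integer>> scores = ...;

// One average per key, e.g. ("alice", [3, 7]) -> ("alice", 5.0)
PCollection<KV<String, Double>> averagePerKey =
    scores.apply(Combine.perKey(new AverageFn()));
```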
Flatten
Merge multiple PCollections of the same type:
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.values.PCollectionList;

PCollection<String> pc1 = ...;
PCollection<String> pc2 = ...;
PCollection<String> pc3 = ...;

PCollection<String> merged = PCollectionList
    .of(pc1).and(pc2).and(pc3)
    .apply(Flatten.pCollections());
Partition
Split a PCollection into multiple partitions:
import org.apache.beam.sdk.transforms.Partition;
import org.apache.beam.sdk.values.PCollectionList;

PCollectionList<Integer> partitioned = numbers.apply(
    Partition.of(3, (Integer num, int numPartitions) -> num % numPartitions));

PCollection<Integer> partition0 = partitioned.get(0);
PCollection<Integer> partition1 = partitioned.get(1);
PCollection<Integer> partition2 = partitioned.get(2);
Composite Transforms
Create reusable transforms by composing existing ones:
static class CountWords
    extends PTransform<PCollection<String>, PCollection<KV<String, Long>>> {
  @Override
  public PCollection<KV<String, Long>> expand(PCollection<String> lines) {
    return lines
        .apply("ExtractWords", FlatMapElements
            .into(TypeDescriptors.strings())
            .via(line -> Arrays.asList(line.split("\\s+"))))
        .apply("FilterEmpty", Filter.by(word -> !word.isEmpty()))
        .apply("CountPerElement", Count.perElement());
  }
}

// Usage
PCollection<String> lines = ...;
PCollection<KV<String, Long>> counts = lines.apply(new CountWords());
Composite transforms improve code reusability and make your pipeline structure clearer.
Best Practices
Name your transforms: Always provide descriptive names to help with debugging and monitoring.
Avoid non-deterministic operations: Transforms should produce the same output for the same input to support fault tolerance.
Next Steps
Windowing: Learn how to divide data into time-based windows
Triggers: Control when results are emitted