The WordCount examples demonstrate core Beam concepts through progressively more complex implementations. Each example builds on the previous one and introduces new features and best practices.
Overview
WordCount is the “Hello World” of data processing pipelines. These examples show you how to:
Read text data from files
Transform and process data using Beam transforms
Count and aggregate results
Write output to files
Configure and run pipelines on different runners
Start with MinimalWordCount: Begin with the simplest implementation to understand the basic pipeline structure.
Explore WordCount: Learn best practices, including custom transforms, pipeline options, and metrics.
Debug with DebuggingWordCount: Add logging, metrics, and testing to your pipelines.
Handle streaming with WindowedWordCount: Process unbounded data with windowing and timestamps.
MinimalWordCount
The simplest WordCount implementation focuses on the core pipeline structure without additional complexity.
Java
```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.transforms.FlatMapElements;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TypeDescriptors;
import java.util.Arrays;

public class MinimalWordCount {
  public static void main(String[] args) {
    // Create pipeline options
    PipelineOptions options = PipelineOptionsFactory.create();
    Pipeline p = Pipeline.create(options);

    p.apply(TextIO.read().from("gs://apache-beam-samples/shakespeare/kinglear.txt"))
        // Split lines into words on runs of non-letter characters
        .apply(FlatMapElements
            .into(TypeDescriptors.strings())
            .via((String line) -> Arrays.asList(line.split("[^\\p{L}]+"))))
        // Filter out empty words
        .apply(Filter.by((String word) -> !word.isEmpty()))
        // Count occurrences of each word
        .apply(Count.perElement())
        // Format each KV<word, count> as text
        .apply(MapElements
            .into(TypeDescriptors.strings())
            .via((KV<String, Long> wordCount) ->
                wordCount.getKey() + ": " + wordCount.getValue()))
        // Write results
        .apply(TextIO.write().to("wordcounts"));

    p.run().waitUntilFinish();
  }
}
```
Key concepts:
TextIO.read(): Reads text files line by line
FlatMapElements: Splits each line into multiple words
Filter: Removes empty strings
Count.perElement(): Counts occurrences of each unique word
MapElements: Formats output as “word: count”
TextIO.write(): Writes results to files
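The Filter step exists because splitting on non-letters can produce empty tokens. For example, a line that starts with a space or punctuation yields a leading empty string. The plain-Python sketch below (no Beam) mimics the Java tokenizer to show this. It assumes Python's `[\W\d_]+` is an acceptable stand-in for Java's `[^\p{L}]+`; the two agree on ASCII text but may differ on some Unicode input. The helper names are hypothetical. Unlike Python's `re.split`, Java's `String.split` also drops trailing empty strings, but the leading one remains in both, so the filter is still needed.

```python
import re

# Approximates Java's "[^\\p{L}]+": a run of characters that are not letters.
# Assumption: exact for ASCII input; Unicode edge cases may differ from Java.
NON_LETTERS = re.compile(r"[\W\d_]+")


def split_words(line):
    """Split a line on runs of non-letters, like the FlatMapElements step."""
    return NON_LETTERS.split(line)


def extract_words(line):
    """Split, then drop empty tokens, like Filter.by(word -> !word.isEmpty())."""
    return [word for word in split_words(line) if word]


print(split_words("  King Lear, act 1."))    # ['', 'King', 'Lear', 'act', '']
print(extract_words("  King Lear, act 1."))  # ['King', 'Lear', 'act']
```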
Python
```python
import re

import apache_beam as beam
from apache_beam.io import ReadFromText, WriteToText
from apache_beam.options.pipeline_options import PipelineOptions


def main():
    # Create pipeline options
    pipeline_options = PipelineOptions()

    with beam.Pipeline(options=pipeline_options) as p:
        # Read input file
        lines = p | ReadFromText('gs://dataflow-samples/shakespeare/kinglear.txt')

        # Count words
        counts = (
            lines
            | 'Split' >> beam.FlatMap(lambda x: re.findall(r"[A-Za-z']+", x))
            | 'PairWithOne' >> beam.Map(lambda x: (x, 1))
            | 'GroupAndSum' >> beam.CombinePerKey(sum))

        # Format output
        output = counts | 'Format' >> beam.Map(
            lambda word_count: '%s: %s' % (word_count[0], word_count[1]))

        # Write results
        output | WriteToText('wordcounts')


if __name__ == '__main__':
    main()
```
Key concepts:
ReadFromText: Reads text files
FlatMap: Splits lines into words using regex
Map: Pairs each word with 1
CombinePerKey: Sums counts for each word
WriteToText: Writes formatted results
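Pairing each word with 1 and then summing per key gives the same result as counting occurrences. The plain-Python sketch below shows what `CombinePerKey(sum)` computes on a small in-memory input. It does not show how Beam executes the step: a runner combines partial sums across bundles and workers. The function names are hypothetical.

```python
import re
from collections import defaultdict

# Same pattern as the 'Split' step in the Python MinimalWordCount.
WORD_RE = re.compile(r"[A-Za-z']+")


def count_words(lines):
    """Emulate FlatMap -> Map (pair with 1) -> CombinePerKey(sum) on in-memory lines."""
    pairs = ((word, 1) for line in lines for word in WORD_RE.findall(line))
    totals = defaultdict(int)
    for word, one in pairs:
        totals[word] += one  # add up the 1s for each key
    return dict(totals)


def format_counts(counts):
    """Emulate the 'Format' step: one 'word: count' string per word, sorted by word."""
    return ["%s: %s" % (word, n) for word, n in sorted(counts.items())]


print(format_counts(count_words(["the king", "the fool"])))
# ['fool: 1', 'king: 1', 'the: 2']
```

The sketch sorts only to make the printed result deterministic; Beam does not guarantee any output order. Counting is case-sensitive, so "The" and "the" are separate keys.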
Go
```go
package main

import (
	"context"
	"fmt"
	"regexp"

	"github.com/apache/beam/sdks/v2/go/pkg/beam"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/io/textio"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/stats"
)

var wordRE = regexp.MustCompile(`[a-zA-Z]+('[a-z])?`)

// splitWords emits every word found in a line.
func splitWords(line string, emit func(string)) {
	for _, word := range wordRE.FindAllString(line, -1) {
		emit(word)
	}
}

// formatCounts renders one word/count pair as "word: count".
func formatCounts(w string, c int) string {
	return fmt.Sprintf("%s: %v", w, c)
}

func init() {
	// Register functions and emitter types for portable execution.
	register.Function2x0(splitWords)
	register.Function2x1(formatCounts)
	register.Emitter1[string]()
}

func main() {
	beam.Init()

	p := beam.NewPipeline()
	s := p.Root()

	// Read input
	lines := textio.Read(s, "gs://apache-beam-samples/shakespeare/kinglear.txt")
	// Split into words
	words := beam.ParDo(s, splitWords, lines)
	// Count words
	counted := stats.Count(s, words)
	// Format output
	formatted := beam.ParDo(s, formatCounts, counted)
	// Write results
	textio.Write(s, "wordcounts.txt", formatted)

	prism.Execute(context.Background(), p)
}
```
Key concepts:
textio.Read: Reads text files
beam.ParDo: Applies a function to each element (here, splitWords to split lines and formatCounts to format results)
stats.Count: Counts occurrences
register: Registers functions for portable execution
textio.Write: Writes results
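The three MinimalWordCount variants tokenize text differently, so they can produce different counts for the same input. The plain-Python sketch below compares the Python pattern with the Go pattern on one line. Python's `re.findall` returns only the captured group when a pattern contains one. To get whole matches like Go's `FindAllString`, the sketch rewrites Go's `('[a-z])?` as the non-capturing `(?:'[a-z])?`. The Java variant differs again: splitting on non-letters turns `Lear's` into `Lear` and `s`.

```python
import re

# Python MinimalWordCount: letters and apostrophes anywhere in the token.
PY_WORD = re.compile(r"[A-Za-z']+")
# Go MinimalWordCount: letters, optionally followed by one apostrophe and a
# lowercase letter. The group is non-capturing so findall returns whole matches.
GO_WORD = re.compile(r"[a-zA-Z]+(?:'[a-z])?")

line = "'Tis Lear's, isn't it?"
print(PY_WORD.findall(line))  # ["'Tis", "Lear's", "isn't", 'it']
print(GO_WORD.findall(line))  # ['Tis', "Lear's", "isn't", 'it']
```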
WordCount with Best Practices
The full WordCount example demonstrates production-ready patterns:
```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Distribution;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.*;
import org.apache.beam.sdk.transforms.*;
import org.apache.beam.sdk.values.*;
import java.util.regex.Pattern;
import java.util.stream.Stream;

public class WordCount {

  // Custom DoFn with metrics
  static class ExtractWordsFn extends DoFn<String, String> {
    private final Counter emptyLines =
        Metrics.counter(ExtractWordsFn.class, "emptyLines");
    private final Distribution lineLenDist =
        Metrics.distribution(ExtractWordsFn.class, "lineLenDistro");
    private final Pattern splitPattern = Pattern.compile("[^\\p{L}]+");

    @ProcessElement
    public void processElement(@Element String element, OutputReceiver<String> receiver) {
      lineLenDist.update(element.length());
      if (element.trim().isEmpty()) {
        emptyLines.inc();
      }
      // Split on runs of non-letters and emit each non-empty word
      Stream<String> stream = splitPattern.splitAsStream(element);
      stream.forEach(word -> {
        if (!word.isEmpty()) {
          receiver.output(word);
        }
      });
    }
  }

  // Formats a word and its count as "word: count"
  public static class FormatAsTextFn extends SimpleFunction<KV<String, Long>, String> {
    @Override
    public String apply(KV<String, Long> input) {
      return input.getKey() + ": " + input.getValue();
    }
  }

  // Composite transform for reusability
  public static class CountWords
      extends PTransform<PCollection<String>, PCollection<KV<String, Long>>> {
    @Override
    public PCollection<KV<String, Long>> expand(PCollection<String> lines) {
      PCollection<String> words = lines.apply(ParDo.of(new ExtractWordsFn()));
      PCollection<KV<String, Long>> wordCounts = words.apply(Count.perElement());
      return wordCounts;
    }
  }

  // Custom pipeline options
  public interface WordCountOptions extends PipelineOptions {
    @Description("Path of the file to read from")
    @Default.String("gs://apache-beam-samples/shakespeare/kinglear.txt")
    String getInputFile();
    void setInputFile(String value);

    @Description("Path of the file to write to")
    @Validation.Required
    String getOutput();
    void setOutput(String value);
  }

  static void runWordCount(WordCountOptions options) {
    Pipeline p = Pipeline.create(options);
    p.apply("ReadLines", TextIO.read().from(options.getInputFile()))
        .apply(new CountWords())
        .apply(MapElements.via(new FormatAsTextFn()))
        .apply("WriteCounts", TextIO.write().to(options.getOutput()));
    p.run().waitUntilFinish();
  }

  public static void main(String[] args) {
    WordCountOptions options = PipelineOptionsFactory
        .fromArgs(args)
        .withValidation()
        .as(WordCountOptions.class);
    runWordCount(options);
  }
}
```
New concepts:
Custom DoFn classes: Define reusable processing logic
Metrics: Track pipeline behavior with counters and distributions
Composite transforms: Bundle multiple transforms into one reusable transform, such as CountWords
Pipeline options: Configure pipelines via command-line arguments
Named transforms: Step names such as "ReadLines" and "WriteCounts" identify steps in logs and monitoring tools, which makes debugging easier
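The two metrics in `ExtractWordsFn` record different things. `emptyLines` is a counter incremented once per blank line. `lineLenDistro` is a distribution over line lengths, which Beam reports as count, sum, min, and max. The plain-Python sketch below reproduces what those two metrics would hold after a few lines. `CounterSketch`, `DistributionSketch`, and `process_line` are hypothetical stand-ins, not Beam classes. A real runner aggregates metric updates across bundles and workers rather than in a single object.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class CounterSketch:
    """Hypothetical stand-in for a Beam Counter: a running total."""
    value: int = 0

    def inc(self, n: int = 1) -> None:
        self.value += n


@dataclass
class DistributionSketch:
    """Hypothetical stand-in for a Beam Distribution: count, sum, min, max."""
    count: int = 0
    total: int = 0
    minimum: Optional[int] = None
    maximum: Optional[int] = None

    def update(self, value: int) -> None:
        self.count += 1
        self.total += value
        self.minimum = value if self.minimum is None else min(self.minimum, value)
        self.maximum = value if self.maximum is None else max(self.maximum, value)

    @property
    def mean(self) -> float:
        return self.total / self.count if self.count else 0.0


def process_line(line: str, empty_lines: CounterSketch, line_lens: DistributionSketch) -> None:
    """Mirror the metric updates in ExtractWordsFn.processElement (word output omitted)."""
    line_lens.update(len(line))
    if not line.strip():  # close to Java's element.trim().isEmpty()
        empty_lines.inc()


empty_lines = CounterSketch()
line_lens = DistributionSketch()
for line in ["King Lear", "", "   ", "Act I"]:
    process_line(line, empty_lines, line_lens)

print(empty_lines.value)  # 2
print(line_lens.count, line_lens.total, line_lens.minimum, line_lens.maximum)  # 4 17 0 9
```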
Running WordCount
Local Execution
When no --runner flag is given, the pipeline runs on the DirectRunner, which is intended for local testing:
```shell
# Java (Maven)
mvn compile exec:java \
  -Dexec.mainClass=org.apache.beam.examples.WordCount \
  -Dexec.args="--output=./wordcounts"

# Python
python wordcount.py --output ./wordcounts
```
Running on Cloud Dataflow
```shell
# Java (Maven)
mvn compile exec:java \
  -Dexec.mainClass=org.apache.beam.examples.WordCount \
  -Dexec.args="--runner=DataflowRunner \
    --project=YOUR_PROJECT_ID \
    --region=us-central1 \
    --tempLocation=gs://YOUR_BUCKET/temp/ \
    --output=gs://YOUR_BUCKET/output/wordcounts"

# Python
python wordcount.py \
  --runner DataflowRunner \
  --project YOUR_PROJECT_ID \
  --region us-central1 \
  --temp_location gs://YOUR_BUCKET/temp/ \
  --output gs://YOUR_BUCKET/output/wordcounts
```
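In the Python commands, the script itself consumes only `--output`. The other flags (`--runner`, `--project`, `--region`, `--temp_location`) configure the runner. Beam's Python wordcount example separates the two with `argparse.ArgumentParser.parse_known_args`, which returns the recognized flags plus a list of leftovers that can be passed to `PipelineOptions`. The sketch below shows only that split and does not import Beam. The `split_args` helper is hypothetical, and the `--input` default is copied from the Python MinimalWordCount above.

```python
import argparse


def split_args(argv):
    """Separate script-level flags from runner flags (sketch; Beam not imported)."""
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--input", default="gs://dataflow-samples/shakespeare/kinglear.txt")
    parser.add_argument("--output", required=True)
    # Unrecognized flags are returned in a list instead of raising an error.
    known_args, pipeline_args = parser.parse_known_args(argv)
    return known_args, pipeline_args


argv = [
    "--runner", "DataflowRunner",
    "--project", "YOUR_PROJECT_ID",
    "--region", "us-central1",
    "--temp_location", "gs://YOUR_BUCKET/temp/",
    "--output", "gs://YOUR_BUCKET/output/wordcounts",
]
known_args, pipeline_args = split_args(argv)
print(known_args.output)  # gs://YOUR_BUCKET/output/wordcounts
print(pipeline_args)      # runner flags, to be handed to PipelineOptions in a real script
```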
Running on Apache Flink
```shell
# Java (Maven)
mvn compile exec:java \
  -Dexec.mainClass=org.apache.beam.examples.WordCount \
  -Dexec.args="--runner=FlinkRunner \
    --flinkMaster=localhost:8081 \
    --output=./wordcounts"

# Python
python wordcount.py \
  --runner FlinkRunner \
  --flink_master localhost:8081 \
  --output ./wordcounts
```
WindowedWordCount
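The windowed example shows how to assign event timestamps to elements and group them into fixed, time-based windows. This excerpt reads a bounded text file, but the same windowing code applies to streaming (unbounded) sources: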
```java
// Excerpt from WindowedWordCount. AddTimestampFn, WriteOneFilePerWindow, minTimestamp,
// maxTimestamp, and output are defined in the full example and are not shown here.

// Read lines and assign each one an event timestamp
PCollection<String> input = pipeline
    .apply(TextIO.read().from(options.getInputFile()))
    .apply(ParDo.of(new AddTimestampFn(minTimestamp, maxTimestamp)));

// Apply fixed windows that are options.getWindowSize() minutes long
PCollection<String> windowedWords = input.apply(
    Window.into(FixedWindows.of(Duration.standardMinutes(options.getWindowSize()))));

// Reuse the CountWords transform; counts are computed separately for each window
PCollection<KV<String, Long>> wordCounts =
    windowedWords.apply(new WordCount.CountWords());

// Format the counts and write one file per window
wordCounts
    .apply(MapElements.via(new WordCount.FormatAsTextFn()))
    .apply(new WriteOneFilePerWindow(output, options.getNumShards()));
```
Windowing concepts:
Timestamps: Associate each element with an event time
Windows: Group elements into time-based buckets
FixedWindows: Non-overlapping windows of equal duration
Per-window processing: Aggregations such as CountWords produce a separate result for each window
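Fixed windows assign each element to exactly one window by rounding its timestamp down to a multiple of the window size. With zero offset and non-negative timestamps, an element at time `t` falls in `[t - t mod size, t - t mod size + size)`. The plain-Python sketch below applies that rule and then counts words per window, which is the effect of reusing `CountWords` after `Window.into(FixedWindows.of(...))`. Timestamps here are integer seconds for readability, whereas Beam uses `Instant` values. The function names are hypothetical.

```python
from collections import Counter


def window_start(ts, size):
    """Start of the fixed window containing ts (zero offset, integer seconds)."""
    return ts - ts % size


def count_per_window(timestamped_words, size):
    """Count words separately within each fixed window, keyed by (start, end)."""
    per_window = {}
    for ts, word in timestamped_words:
        start = window_start(ts, size)
        per_window.setdefault((start, start + size), Counter())[word] += 1
    return per_window


ten_minutes = 10 * 60
events = [(0, "king"), (599, "king"), (600, "king"), (1250, "fool")]
for window, counts in sorted(count_per_window(events, ten_minutes).items()):
    print(window, dict(counts))
# (0, 600) {'king': 2}
# (600, 1200) {'king': 1}
# (1200, 1800) {'fool': 1}
```

"king" appears in two windows, so it is reported once per window (2 and 1) rather than as a single global total of 3.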
Key Takeaways
Start simple: Begin with MinimalWordCount to understand core concepts before adding complexity.
Add metrics: Use counters and distributions to monitor pipeline behavior in production.
Create composite transforms: Bundle related transforms for reuse and easier testing.
Use pipeline options: Make pipelines configurable via command-line arguments instead of hardcoding values.
Next Steps
Cookbook Examples: Explore common patterns like filtering, joining, and combining data.
Transforms Guide: Learn about all available Beam transforms.
I/O Connectors: Read from and write to various data sources.
Runners: Execute pipelines on different distributed processing backends.