Documentation index: fetch the complete documentation index at https://mintlify.com/apache/beam/llms.txt.
Use this file to discover all available pages before exploring further.
The Apache Beam Python SDK enables you to create powerful batch and streaming data pipelines using Python’s expressive syntax and rich ecosystem of libraries.
Installation
Install Python
Ensure you have a supported Python 3 version installed (current Beam releases require Python 3.9 or later); check with `python --version`.
Create a virtual environment (recommended)
# Create an isolated environment so Beam's dependencies don't clash with system packages
python -m venv beam-env
source beam-env/bin/activate # On Windows: beam-env\Scripts\activate
Install Apache Beam
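Install with pip: `pip install apache-beam`. To include the extras for specific services, such as Google Cloud, use `pip install 'apache-beam[gcp]'`. Alternatively, add `apache-beam` (with any extras you need) to your requirements.txt and run `pip install -r requirements.txt`.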
Verify installation
# Prints the installed Beam version if the import succeeds
python -c "import apache_beam as beam; print(beam.__version__)"
Quick Start
Here’s a simple word count example; save it as `wordcount.py` so you can run it with the command below:
import argparse
import re

import apache_beam as beam
from apache_beam.io import ReadFromText, WriteToText
from apache_beam.options.pipeline_options import PipelineOptions


class WordExtractingDoFn(beam.DoFn):
    """Parse each line of input text into words."""

    def process(self, element):
        """Return the words (runs of word characters and apostrophes) in this line."""
        return re.findall(r'[\w\']+', element, re.UNICODE)


def run(argv=None):
    """Build and run the word count pipeline.

    Args:
        argv: Command-line arguments; ``sys.argv`` is used when None.
            ``--input`` and ``--output`` are consumed here; every other flag
            (``--runner``, ``--project``, ...) is passed on to the pipeline.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--input',
        # Reading gs:// paths requires the GCP extras: apache-beam[gcp].
        default='gs://dataflow-samples/shakespeare/kinglear.txt',
        help='File or file pattern to read.')
    parser.add_argument(
        '--output',
        default='output/wordcount',
        help='Output path prefix; WriteToText appends shard suffixes.')
    known_args, pipeline_args = parser.parse_known_args(argv)

    # Remaining flags configure the pipeline itself (runner, project, ...).
    pipeline_options = PipelineOptions(pipeline_args)

    # Leaving the `with` block runs the pipeline and waits for it to finish.
    with beam.Pipeline(options=pipeline_options) as pipeline:
        (
            pipeline
            | 'Read' >> ReadFromText(known_args.input)
            | 'Split' >> beam.ParDo(WordExtractingDoFn())
            | 'PairWithOne' >> beam.Map(lambda x: (x, 1))
            | 'GroupAndSum' >> beam.CombinePerKey(sum)
            | 'Format' >> beam.MapTuple(lambda word, count: f'{word}: {count}')
            | 'Write' >> WriteToText(known_args.output)
        )


if __name__ == '__main__':
    run()
Run the pipeline:
# Run the Quick Start script locally (DirectRunner is the default runner)
python wordcount.py --output output.txt
Core Concepts
Pipeline
A pipeline encapsulates your data processing workflow:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
# With no arguments, PipelineOptions reads its flags from the command line.
options = PipelineOptions()
# Leaving the `with` block runs the pipeline and waits for it to finish.
with beam.Pipeline(options=options) as pipeline:
    # Build your pipeline here
    pass
PCollection
A PCollection represents a distributed dataset:
# Create from memory
data = pipeline | beam.Create(['Hello', 'World', 'Beam'])
# Read from file (one element per line)
lines = pipeline | beam.io.ReadFromText('input.txt')
# PCollections are immutable: applying a transform yields a new PCollection
# and leaves `data` unchanged.
processed = data | beam.Map(str.upper)
Transforms
Transforms process data in your pipeline:
# `lines` is a PCollection of text lines, such as the one read from
# input.txt in the PCollection example.

# FlatMap: 1-to-many transformation (one line -> many words)
words = lines | beam.FlatMap(lambda line: line.split())
# Map: 1-to-1 transformation
upper = words | beam.Map(str.upper)
# Filter: Keep elements matching condition
long_words = words | beam.Filter(lambda word: len(word) > 5)
# CombinePerKey: Aggregate values by key; its input must be (key, value) pairs
pairs = words | beam.Map(lambda word: (word, 1))
sums = pairs | beam.CombinePerKey(sum)
DoFn (Do Functions)
A DoFn, applied with ParDo, allows for more complex per-element processing:
class SplitWords(beam.DoFn):
    def process(self, element):
        """Emit every whitespace-separated word of a line as its own element."""
        yield from element.split()


words = lines | beam.ParDo(SplitWords())
Python-Specific Features
Type Hints
Improve pipeline validation with type hints:
import apache_beam as beam
# TypeCheckError is the exception Beam raises for element-type mismatches.
from apache_beam.typehints import TypeCheckError
# Beam can use these annotations (str in, int out) to check that
# connected transforms agree on element types.
def process_data(element: str) -> int:
    return len(element)
results = (
    pipeline
    | beam.Create(['a', 'bb', 'ccc'])
    | beam.Map(process_data).with_output_types(int)  # explicit hint, matching the annotation
)
Lambda Functions
Use Python lambdas for simple transformations:
results = (
    data
    | beam.Map(lambda x: x * 2)           # double each element
    | beam.Filter(lambda x: x > 10)       # keep only values greater than 10
    | beam.FlatMap(lambda x: [x, x + 1])  # emit each value followed by x + 1
)
DataFrame API
Use familiar pandas-like operations:
import apache_beam as beam
from apache_beam.dataframe.convert import to_dataframe, to_pcollection

with beam.Pipeline() as pipeline:
    # Create a PCollection with a schema: to_dataframe needs one, and
    # beam.Row provides it (plain dicts carry no schema).
    data = pipeline | beam.Create([
        beam.Row(word='hello', count=3),
        beam.Row(word='world', count=5),
    ])
    # Convert to a (deferred) DataFrame
    df = to_dataframe(data)
    # Use pandas operations
    df['count'] = df['count'] * 2
    result_df = df.groupby('word').sum()
    # Convert back to a PCollection. groupby moved 'word' into the index,
    # so include_indexes=True keeps it in the output rows.
    result = to_pcollection(result_df, pipeline=pipeline, include_indexes=True)
Machine Learning with RunInference
Integrate ML models directly into your pipeline:
import apache_beam as beam
from apache_beam.ml.inference.base import RunInference
from apache_beam.ml.inference.sklearn_inference import SklearnModelHandlerNumpy

# Point the handler at your trained, pickled scikit-learn model; the handler
# loads the model itself, so no manual unpickling is needed here.
model_handler = SklearnModelHandlerNumpy(
    model_uri='gs://my-bucket/model.pkl'
)

# preprocess_fn and postprocess_fn are placeholders for your own functions:
# preprocess_fn should turn each element into a numpy array for the model, and
# postprocess_fn receives the PredictionResult objects emitted by RunInference.
predictions = (
    data
    | 'Preprocess' >> beam.Map(preprocess_fn)
    | 'RunInference' >> RunInference(model_handler)
    | 'Postprocess' >> beam.Map(postprocess_fn)
)
Streaming Pipelines
Process unbounded data streams:
import json

import apache_beam as beam
from apache_beam import window
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions

options = PipelineOptions()
options.view_as(StandardOptions).streaming = True

with beam.Pipeline(options=options) as pipeline:
    (
        pipeline
        | 'Read from Pub/Sub' >> beam.io.ReadFromPubSub(
            subscription='projects/myproject/subscriptions/mysub'
        )
        # Messages arrive as bytes; json.loads accepts bytes directly.
        | 'Parse JSON' >> beam.Map(json.loads)
        # CombinePerKey needs (key, value) pairs. This assumes each message
        # has a 'key' field -- adapt it to your message schema.
        | 'Key by field' >> beam.Map(lambda event: (event['key'], 1))
        | 'Window' >> beam.WindowInto(window.FixedWindows(60))  # 1-minute windows
        | 'Count per key' >> beam.CombinePerKey(sum)
        # WriteToBigQuery expects one dict per table row.
        | 'To BigQuery row' >> beam.MapTuple(
            lambda key, count: {'key': key, 'count': count})
        | 'Write to BigQuery' >> beam.io.WriteToBigQuery(
            'myproject:mydataset.mytable',
            # Needed if the table has to be created.
            schema='key:STRING,count:INTEGER',
        )
    )
Windowing
Group streaming data into windows:
from apache_beam import window
# Windows are assigned from each element's timestamp; aggregations applied
# after WindowInto are computed separately for every window.
# Fixed time windows
windowed_data = (
    data
    | beam.WindowInto(window.FixedWindows(60))  # 60 seconds
)
# Sliding windows (overlapping: with size 60 and period 30, each element
# falls into 60 / 30 = 2 windows)
sliding_windows = (
    data
    | beam.WindowInto(window.SlidingWindows(60, 30))  # 60s windows every 30s
)
# Session windows (a new session starts after 600 seconds without activity)
session_windows = (
    data
    | beam.WindowInto(window.Sessions(600))  # 10-minute gap
)
I/O Connectors
The Python SDK supports various data sources:
Files
# Text files
lines = pipeline | beam.io.ReadFromText('input.txt')
# The write path is a prefix: shard suffixes are appended,
# e.g. output.txt-00000-of-00001.
lines | beam.io.WriteToText('output.txt')
# Avro
records = pipeline | beam.io.ReadFromAvro('data.avro')
data | beam.io.WriteToAvro('output.avro', schema=avro_schema)
# Parquet: reads yield one dict per row (a PCollection, not a DataFrame)
parquet_rows = pipeline | beam.io.ReadFromParquet('data.parquet')
# parquet_schema is a pyarrow.Schema describing the rows; it is required
# when the PCollection carries no Beam schema of its own.
data | beam.io.WriteToParquet('output.parquet', schema=parquet_schema)
# BigQuery
rows = pipeline | beam.io.ReadFromBigQuery(
    table='project:dataset.table'
)
data | beam.io.WriteToBigQuery(
    'project:dataset.table',
    schema={'fields': [{'name': 'field1', 'type': 'STRING'}]}
)
# Cloud Pub/Sub: the source is unbounded, so use it in a streaming pipeline
messages = pipeline | beam.io.ReadFromPubSub(
    topic='projects/myproject/topics/mytopic'
)
# WriteToPubSub expects bytes elements (e.g. encode strings with str.encode)
data | beam.io.WriteToPubSub(
    topic='projects/myproject/topics/output'
)
# Cloud Storage
files = pipeline | beam.io.ReadFromText('gs://bucket/path/*.txt')
Databases
# MongoDB
from apache_beam.io.mongodbio import ReadFromMongoDB
data = pipeline | ReadFromMongoDB(
    uri='mongodb://localhost:27017',
    db='mydb',
    coll='mycollection'
)
# JDBC (via Java)
# NOTE(review): this is a Java-backed cross-language transform, so a Java
# runtime is presumably required to start its expansion service -- confirm.
# Avoid hard-coding real credentials; pass them in at launch time instead.
from apache_beam.io.jdbc import ReadFromJdbc
rows = pipeline | ReadFromJdbc(
    table_name='my_table',
    driver_class_name='org.postgresql.Driver',
    jdbc_url='jdbc:postgresql://localhost:5432/mydb',
    username='user',
    password='pass'
)
Running Pipelines
Direct Runner (Local)
# Run locally on the DirectRunner (useful for development and testing)
python my_pipeline.py \
    --runner DirectRunner \
    --output output.txt
Google Cloud Dataflow
# Submit to Google Cloud Dataflow; temp/staging locations must be gs:// paths
python my_pipeline.py \
    --runner DataflowRunner \
    --project YOUR_PROJECT_ID \
    --region us-central1 \
    --temp_location gs://YOUR_BUCKET/temp \
    --staging_location gs://YOUR_BUCKET/staging
Apache Flink
# Submit to a Flink cluster; the SDK harness runs in a Docker container.
# NOTE(review): pick an SDK image whose Python and Beam versions match your
# local installation (e.g. a versioned tag rather than latest) -- confirm the tag exists.
python my_pipeline.py \
    --runner FlinkRunner \
    --flink_master localhost:8081 \
    --environment_type=DOCKER \
    --environment_config=apache/beam_python3.9_sdk:latest
Best Practices
Define configurable options for your pipeline: from apache_beam.options.pipeline_options import PipelineOptions
import apache_beam as beam


class MyOptions(PipelineOptions):
    """Pipeline options that add required --input and --output flags."""

    @classmethod
    def _add_argparse_args(cls, parser):
        # Called by PipelineOptions to register these custom flags.
        parser.add_argument('--input', required=True, help='Input file pattern.')
        parser.add_argument('--output', required=True, help='Output path prefix.')


# Without explicit flags, PipelineOptions parses sys.argv.
options = MyOptions()
with beam.Pipeline(options=options) as pipeline:
    (
        pipeline
        | beam.io.ReadFromText(options.input)
        | beam.io.WriteToText(options.output)
    )
Manage pipeline dependencies properly: # Use setup.py for custom dependencies
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions

options = PipelineOptions(
    setup_file='./setup.py'
)
# Alternatively (or additionally), give workers a requirements file to pip-install
options.view_as(SetupOptions).requirements_file = 'requirements.txt'
Efficient DoFn Implementation
# Use lifecycle methods for expensive operations:
class ProcessWithResource(beam.DoFn):
    """Reuse one expensive client for every element a DoFn instance processes."""

    def setup(self):
        # Initialize expensive resources once per DoFn instance, not per element.
        # create_client is a placeholder for your own client factory.
        self.client = create_client()

    def process(self, element):
        # Process using self.client
        result = self.client.query(element)
        yield result

    def teardown(self):
        # Clean up resources. Teardown is best-effort (not guaranteed to run),
        # and the client may be missing if setup() failed, so guard for that.
        client = getattr(self, 'client', None)
        if client is not None:
            client.close()
Use Combine for Aggregations
Prefer Combine transforms for better performance: import apache_beam as beam
from apache_beam import combiners
# Good: Uses Combine for efficient aggregation of (key, value) pairs
totals = pairs | beam.CombinePerKey(sum)
# Also good: Built-in combiners -- here, the global mean of a numeric PCollection
stats = numbers | beam.CombineGlobally(
    combiners.MeanCombineFn()
)
Interactive Beam
Develop and debug pipelines in Jupyter notebooks:
import apache_beam as beam
from apache_beam.runners.interactive import interactive_runner
import apache_beam.runners.interactive.interactive_beam as ib
# Create pipeline (InteractiveRunner lets a notebook inspect PCollections)
p = beam.Pipeline(interactive_runner.InteractiveRunner())
data = p | beam.Create([1, 2, 3, 4, 5])
results = data | beam.Map(lambda x: x * 2)
# Show results interactively: computes `results` (2, 4, 6, 8, 10) and
# renders them in the notebook
ib.show(results)
Testing Pipelines
Test your pipelines effectively:
import unittest
import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to
class MyPipelineTest(unittest.TestCase):
    def test_word_count(self):
        # The pipeline runs when the `with` block exits; assert_that adds a
        # checking step that fails the run if `result` differs from `expected`.
        with TestPipeline() as p:
            input_data = ['hello world', 'hello beam']
            expected = [('hello', 2), ('world', 1), ('beam', 1)]
            result = (
                p
                | beam.Create(input_data)
                | beam.FlatMap(lambda x: x.split())
                | beam.Map(lambda x: (x, 1))
                | beam.CombinePerKey(sum)
            )
            # equal_to compares contents regardless of element order.
            assert_that(result, equal_to(expected))
Resources
API Reference Complete Python API documentation
Code Examples Example pipelines and patterns
ML Guide Machine learning with Beam
Interactive Beam Jupyter notebook development
Next Steps