Skip to main content
The Apache Beam Python SDK enables you to create powerful batch and streaming data pipelines using Python’s expressive syntax and rich ecosystem of libraries.

Installation

1

Install Python

Ensure you have Python 3.8 or later installed:
python --version
2

Create a virtual environment (recommended)

python -m venv beam-env
source beam-env/bin/activate  # On Windows: beam-env\Scripts\activate
3

Install Apache Beam

pip install apache-beam
4

Verify installation

python -c "import apache_beam as beam; print(beam.__version__)"

Quick Start

Here’s a simple word count example:
import argparse
import re

import apache_beam as beam
from apache_beam.io import ReadFromText, WriteToText
from apache_beam.options.pipeline_options import PipelineOptions

class WordExtractingDoFn(beam.DoFn):
    """Extract the words contained in a single line of input text."""

    def process(self, element):
        """Yield each word (letters, digits, underscores, apostrophes) in *element*."""
        for match in re.finditer(r'[\w\']+', element, re.UNICODE):
            yield match.group(0)

def run(argv=None):
    """Build and run the word-count pipeline.

    The input and output locations are configurable from the command line
    (``--input`` / ``--output``) and default to the values this example
    previously hard-coded, so ``python wordcount.py`` with no flags behaves
    exactly as before.

    Args:
        argv: Optional list of command-line arguments; defaults to
            ``sys.argv`` when ``None``. Flags not recognized here (e.g.
            ``--runner``) are forwarded to ``PipelineOptions``.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--input',
        default='gs://dataflow-samples/shakespeare/kinglear.txt',
        help='Text file (or file pattern) to read.')
    parser.add_argument(
        '--output',
        default='output/wordcount',
        help='Path prefix for the output files.')
    known_args, pipeline_args = parser.parse_known_args(argv)

    # Unrecognized flags become runner/pipeline configuration.
    pipeline_options = PipelineOptions(pipeline_args)

    # Create the pipeline
    with beam.Pipeline(options=pipeline_options) as pipeline:
        (
            pipeline
            | 'Read' >> ReadFromText(known_args.input)
            | 'Split' >> beam.ParDo(WordExtractingDoFn())
            # Pair every word with 1, then sum the 1s per distinct word.
            | 'PairWithOne' >> beam.Map(lambda x: (x, 1))
            | 'GroupAndSum' >> beam.CombinePerKey(sum)
            | 'Format' >> beam.MapTuple(lambda word, count: f'{word}: {count}')
            | 'Write' >> WriteToText(known_args.output)
        )

if __name__ == '__main__':
    run()
Run the pipeline:
python wordcount.py

Core Concepts

Pipeline

A pipeline encapsulates your data processing workflow:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions()
with beam.Pipeline(options=options) as pipeline:
    # Build your pipeline here
    pass

PCollection

PCollection represents a distributed dataset:
# Create from memory
data = pipeline | beam.Create(['Hello', 'World', 'Beam'])

# Read from file
lines = pipeline | beam.io.ReadFromText('input.txt')

# PCollections are immutable
processed = data | beam.Map(str.upper)

Transforms

Transforms process data in your pipeline:
# Map: 1-to-1 transformation
upper = words | beam.Map(str.upper)

# FlatMap: 1-to-many transformation
words = lines | beam.FlatMap(lambda line: line.split())

# Filter: Keep elements matching condition
long_words = words | beam.Filter(lambda word: len(word) > 5)

# CombinePerKey: Aggregate values by key
sums = pairs | beam.CombinePerKey(sum)

DoFn (Do Functions)

DoFn allows for more complex processing:
class SplitWords(beam.DoFn):
    """Split each incoming line into its whitespace-separated words."""

    def process(self, element):
        """Yield the individual words of *element*."""
        yield from element.split()

words = lines | beam.ParDo(SplitWords())

Python-Specific Features

Type Hints

Improve pipeline validation with type hints:
import apache_beam as beam
from apache_beam.typehints import TypeCheckError

def process_data(element: str) -> int:
    """Return the number of characters in *element*."""
    return len(element)

results = (
    pipeline
    | beam.Create(['a', 'bb', 'ccc'])
    | beam.Map(process_data).with_output_types(int)
)

Lambda Functions

Use Python lambdas for simple transformations:
results = (
    data
    | beam.Map(lambda x: x * 2)
    | beam.Filter(lambda x: x > 10)
    | beam.FlatMap(lambda x: [x, x + 1])
)

DataFrame API

Use familiar pandas-like operations:
import apache_beam as beam
from apache_beam.dataframe.convert import to_dataframe, to_pcollection

with beam.Pipeline() as pipeline:
    # Create PCollection
    data = pipeline | beam.Create([
        {'word': 'hello', 'count': 3},
        {'word': 'world', 'count': 5},
    ])
    
    # Convert to DataFrame
    df = to_dataframe(data)
    
    # Use pandas operations
    df['count'] = df['count'] * 2
    result_df = df.groupby('word').sum()
    
    # Convert back to PCollection
    result = to_pcollection(result_df, pipeline=pipeline)

Machine Learning with RunInference

Integrate ML models directly into your pipeline:
from apache_beam.ml.inference.base import RunInference
from apache_beam.ml.inference.sklearn_inference import SklearnModelHandlerNumpy
import pickle

# Load your trained model
model_handler = SklearnModelHandlerNumpy(
    model_uri='gs://my-bucket/model.pkl'
)

predictions = (
    data
    | 'Preprocess' >> beam.Map(preprocess_fn)
    | 'RunInference' >> RunInference(model_handler)
    | 'Postprocess' >> beam.Map(postprocess_fn)
)

Streaming Pipelines

Process unbounded data streams:
import json

import apache_beam as beam
from apache_beam import window
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions

# Streaming must be enabled explicitly for unbounded sources.
options = PipelineOptions()
options.view_as(StandardOptions).streaming = True

with beam.Pipeline(options=options) as pipeline:
    (
        pipeline
        # Unbounded read from a Pub/Sub subscription.
        | 'Read from Pub/Sub' >> beam.io.ReadFromPubSub(
            subscription='projects/myproject/subscriptions/mysub'
        )
        | 'Parse JSON' >> beam.Map(json.loads)
        | 'Window' >> beam.WindowInto(window.FixedWindows(60))  # 1-minute windows
        | 'Count per key' >> beam.CombinePerKey(sum)
        | 'Write to BigQuery' >> beam.io.WriteToBigQuery(
            'myproject:mydataset.mytable'
        )
    )

Windowing

Group streaming data into windows:
from apache_beam import window

# Fixed time windows
windowed_data = (
    data
    | beam.WindowInto(window.FixedWindows(60))  # 60 seconds
)

# Sliding windows
sliding_windows = (
    data
    | beam.WindowInto(window.SlidingWindows(60, 30))  # 60s windows every 30s
)

# Session windows
session_windows = (
    data
    | beam.WindowInto(window.Sessions(600))  # 10-minute gap
)

I/O Connectors

The Python SDK supports various data sources:

Files

# Text files
lines = pipeline | beam.io.ReadFromText('input.txt')
lines | beam.io.WriteToText('output.txt')

# Avro
records = pipeline | beam.io.ReadFromAvro('data.avro')
data | beam.io.WriteToAvro('output.avro', schema=avro_schema)

# Parquet
df = pipeline | beam.io.ReadFromParquet('data.parquet')
data | beam.io.WriteToParquet('output.parquet')

Google Cloud Platform

# BigQuery
rows = pipeline | beam.io.ReadFromBigQuery(
    table='project:dataset.table'
)
data | beam.io.WriteToBigQuery(
    'project:dataset.table',
    schema={'fields': [{'name': 'field1', 'type': 'STRING'}]}
)

# Cloud Pub/Sub
messages = pipeline | beam.io.ReadFromPubSub(
    topic='projects/myproject/topics/mytopic'
)
data | beam.io.WriteToPubSub(
    topic='projects/myproject/topics/output'
)

# Cloud Storage
files = pipeline | beam.io.ReadFromText('gs://bucket/path/*.txt')

Databases

# MongoDB
from apache_beam.io.mongodbio import ReadFromMongoDB

data = pipeline | ReadFromMongoDB(
    uri='mongodb://localhost:27017',
    db='mydb',
    coll='mycollection'
)

# JDBC (via Java)
from apache_beam.io.jdbc import ReadFromJdbc

rows = pipeline | ReadFromJdbc(
    table_name='my_table',
    driver_class_name='org.postgresql.Driver',
    jdbc_url='jdbc:postgresql://localhost:5432/mydb',
    username='user',
    password='pass'
)

Running Pipelines

Direct Runner (Local)

python my_pipeline.py \
  --runner DirectRunner \
  --output output.txt

Google Cloud Dataflow

python my_pipeline.py \
  --runner DataflowRunner \
  --project YOUR_PROJECT_ID \
  --region us-central1 \
  --temp_location gs://YOUR_BUCKET/temp \
  --staging_location gs://YOUR_BUCKET/staging
Apache Flink

python my_pipeline.py \
  --runner FlinkRunner \
  --flink_master localhost:8081 \
  --environment_type=DOCKER \
  --environment_config=apache/beam_python3.9_sdk:latest

Best Practices

Define configurable options for your pipeline:
from apache_beam.options.pipeline_options import PipelineOptions

class MyOptions(PipelineOptions):
    """Custom pipeline options exposing required --input/--output flags."""

    @classmethod
    def _add_argparse_args(cls, parser):
        # Both flags must be supplied on the command line.
        for flag in ('--input', '--output'):
            parser.add_argument(flag, required=True)

options = MyOptions()
with beam.Pipeline(options=options) as pipeline:
    (
        pipeline
        | beam.io.ReadFromText(options.input)
        | beam.io.WriteToText(options.output)
    )
Manage pipeline dependencies properly:
# Use setup.py for custom dependencies
options = PipelineOptions(
    setup_file='./setup.py'
)

# Or specify requirements
options.view_as(SetupOptions).requirements_file = 'requirements.txt'
Use lifecycle methods for expensive operations:
class ProcessWithResource(beam.DoFn):
    """DoFn that shares one expensive client across a DoFn instance's lifetime."""

    def setup(self):
        """Create the client once, instead of once per element."""
        self.client = create_client()

    def process(self, element):
        """Query the shared client for *element* and emit the result."""
        yield self.client.query(element)

    def teardown(self):
        """Release the client when the instance is discarded."""
        self.client.close()
Prefer Combine transforms for better performance:
import apache_beam as beam
from apache_beam import combiners

# Good: Uses Combine for efficient aggregation
totals = data | beam.CombinePerKey(sum)

# Also good: Built-in combiners
stats = data | beam.CombineGlobally(
    combiners.MeanCombineFn()
)

Interactive Beam

Develop and debug pipelines in Jupyter notebooks:
import apache_beam as beam
from apache_beam.runners.interactive import interactive_runner
import apache_beam.runners.interactive.interactive_beam as ib

# Create pipeline
p = beam.Pipeline(interactive_runner.InteractiveRunner())

data = p | beam.Create([1, 2, 3, 4, 5])
results = data | beam.Map(lambda x: x * 2)

# Show results interactively
ib.show(results)

Testing Pipelines

Test your pipelines effectively:
import unittest
import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to

class MyPipelineTest(unittest.TestCase):
    """Unit test for the word-count transform chain."""

    def test_word_count(self):
        lines = ['hello world', 'hello beam']
        expected_counts = [('hello', 2), ('world', 1), ('beam', 1)]

        with TestPipeline() as p:
            counts = (
                p
                | beam.Create(lines)
                | beam.FlatMap(str.split)
                | beam.Map(lambda word: (word, 1))
                | beam.CombinePerKey(sum)
            )

            # assert_that is itself a transform; it is checked when p runs.
            assert_that(counts, equal_to(expected_counts))

Resources

API Reference

Complete Python API documentation

Code Examples

Example pipelines and patterns

ML Guide

Machine learning with Beam

Interactive Beam

Jupyter notebook development

Next Steps