Skip to main content

Overview

Apache Beam provides a rich ecosystem of built-in I/O connectors for interacting with various data sources and sinks. This guide covers the most commonly used connectors with practical examples.

File-Based I/O Connectors

TextIO

Read and write plain text files line-by-line.

Reading Text Files

import apache_beam as beam
from apache_beam.io import ReadFromText

with beam.Pipeline() as p:
    lines = p | ReadFromText('input.txt')
    
    # With glob pattern
    lines = p | ReadFromText('data/*.txt')
    
    # Skip header lines
    lines = p | ReadFromText('data.csv', skip_header_lines=1)
    
    # With compression
    from apache_beam.io.filesystem import CompressionTypes
    lines = p | ReadFromText(
        'input.txt.gz',
        compression_type=CompressionTypes.GZIP
    )

Writing Text Files

import apache_beam as beam
from apache_beam.io import WriteToText

with beam.Pipeline() as p:
    (p 
     | beam.Create(['line1', 'line2', 'line3'])
     | WriteToText('output.txt'))
    
    # With custom sharding
    (p 
     | beam.Create(data)
     | WriteToText(
           'output',
           file_name_suffix='.txt',
           num_shards=5))
    
    # With compression
    from apache_beam.io.filesystem import CompressionTypes
    (p 
     | beam.Create(data)
     | WriteToText(
           'output.txt',
           compression_type=CompressionTypes.GZIP))

TextIO Configuration

file_pattern
string
required
File path or glob pattern (supports * and ? wildcards)
compression_type
CompressionTypes
default:"AUTO"
Compression format: AUTO, GZIP, BZIP2, DEFLATE, ZSTD, UNCOMPRESSED
skip_header_lines
int
default:"0"
Number of header lines to skip when reading
validate
boolean
default:"true"
Whether to validate file existence during pipeline construction
num_shards
int
Number of output shards (files). If not specified, runner decides automatically
shard_name_template
string
Template for naming shards (e.g., -SSSSS-of-NNNNN)

AvroIO

Read and write Apache Avro files with schema support.
import apache_beam as beam
from apache_beam.io import ReadFromAvro, WriteToAvro

# Define schema
schema = {
    'namespace': 'example.avro',
    'type': 'record',
    'name': 'User',
    'fields': [
        {'name': 'name', 'type': 'string'},
        {'name': 'age', 'type': 'int'},
        {'name': 'email', 'type': ['string', 'null']}
    ]
}

# Reading Avro files
with beam.Pipeline() as p:
    records = p | ReadFromAvro('users.avro')
    # Returns dictionaries: {'name': 'Alice', 'age': 30, 'email': 'alice@example.com'}
    
    # Reading with schema
    records = p | ReadFromAvro(
        'users.avro',
        use_fastavro=True  # Use fastavro for better performance
    )

# Writing Avro files
with beam.Pipeline() as p:
    users = [
        {'name': 'Alice', 'age': 30, 'email': 'alice@example.com'},
        {'name': 'Bob', 'age': 25, 'email': 'bob@example.com'}
    ]
    
    (p 
     | beam.Create(users)
     | WriteToAvro(
           'output.avro',
           schema=schema,
           use_fastavro=True))

AvroIO Configuration

file_pattern
string
required
File path or glob pattern for Avro files
schema
dict/Schema
required
Avro schema definition (for writing)
use_fastavro
boolean
default:"true"
Use fastavro library for better performance (Python)
codec
string
default:"deflate"
Compression codec: null, deflate, snappy, bzip2, zstandard

ParquetIO

Read and write Apache Parquet files for columnar storage.
import apache_beam as beam
from apache_beam.io import ReadFromParquet, WriteToParquet
import pyarrow as pa

# Define PyArrow schema
schema = pa.schema([
    ('name', pa.string()),
    ('age', pa.int64()),
    ('salary', pa.float64()),
])

# Reading Parquet
with beam.Pipeline() as p:
    records = p | ReadFromParquet('data.parquet')
    # Returns dictionaries
    
    # Reading with columns selection
    records = p | ReadFromParquet(
        'data.parquet',
        columns=['name', 'age']  # Read only specific columns
    )

# Writing Parquet
with beam.Pipeline() as p:
    data = [
        {'name': 'Alice', 'age': 30, 'salary': 75000.0},
        {'name': 'Bob', 'age': 25, 'salary': 65000.0}
    ]
    
    (p 
     | beam.Create(data)
     | WriteToParquet(
           'output.parquet',
           schema=schema,
           codec='snappy'))

ParquetIO Configuration

file_pattern
string
required
File path or glob pattern
schema
PyArrow Schema
Schema definition (required for writing)
columns
list[string]
List of columns to read (enables column pruning)
codec
string
default:"snappy"
Compression codec: none, snappy, gzip, lzo, brotli, lz4, zstd

Cloud Storage I/O Connectors

BigQueryIO

Read from and write to Google BigQuery.
import apache_beam as beam
from apache_beam.io.gcp.bigquery import ReadFromBigQuery, WriteToBigQuery

# Reading from BigQuery
with beam.Pipeline() as p:
    # Read from table
    rows = p | ReadFromBigQuery(
        table='project:dataset.table'
    )
    
    # Read with SQL query
    rows = p | ReadFromBigQuery(
        query='SELECT name, age FROM `project.dataset.table` WHERE age > 25'
    )
    
    # Read with table reference
    rows = p | ReadFromBigQuery(
        project='my-project',
        dataset='my-dataset',
        table='my-table'
    )

# Writing to BigQuery
with beam.Pipeline() as p:
    # Define schema
    schema = {
        'fields': [
            {'name': 'name', 'type': 'STRING', 'mode': 'REQUIRED'},
            {'name': 'age', 'type': 'INTEGER', 'mode': 'NULLABLE'},
            {'name': 'email', 'type': 'STRING', 'mode': 'NULLABLE'}
        ]
    }
    
    data = [
        {'name': 'Alice', 'age': 30, 'email': 'alice@example.com'},
        {'name': 'Bob', 'age': 25, 'email': 'bob@example.com'}
    ]
    
    (p 
     | beam.Create(data)
     | WriteToBigQuery(
           'project:dataset.table',
           schema=schema,
           create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
           write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))

BigQueryIO Configuration

table
string
Table reference in format project:dataset.table or dataset.table
query
string
SQL query to execute (alternative to table)
schema
dict/TableSchema
Table schema (required for writing if table doesn’t exist)
create_disposition
CreateDisposition
default:"CREATE_IF_NEEDED"
  • CREATE_IF_NEEDED: Create table if it doesn’t exist
  • CREATE_NEVER: Fail if table doesn’t exist
write_disposition
WriteDisposition
default:"WRITE_APPEND"
  • WRITE_APPEND: Append to existing data
  • WRITE_TRUNCATE: Overwrite existing data
  • WRITE_EMPTY: Fail if table is not empty
method
string
default:"DEFAULT"
Write method: DEFAULT, FILE_LOADS, STREAMING_INSERTS, STORAGE_WRITE_API

PubSubIO

Read from and write to Google Cloud Pub/Sub (streaming).
import apache_beam as beam
from apache_beam.io.gcp.pubsub import ReadFromPubSub, WriteToPubSub
import json

# Reading from Pub/Sub
with beam.Pipeline() as p:
    # Read messages as bytes
    messages = p | ReadFromPubSub(
        topic='projects/my-project/topics/my-topic'
    )
    
    # Read from subscription
    messages = p | ReadFromPubSub(
        subscription='projects/my-project/subscriptions/my-sub'
    )
    
    # Read with attributes
    messages = p | ReadFromPubSub(
        topic='projects/my-project/topics/my-topic',
        with_attributes=True
    )
    # Returns PubsubMessage(data=b'...', attributes={...})

# Writing to Pub/Sub
with beam.Pipeline() as p:
    (p 
     | beam.Create(['message1', 'message2'])
     | beam.Map(lambda x: x.encode('utf-8'))
     | WriteToPubSub(
           topic='projects/my-project/topics/output-topic'))
    
    # Write with attributes
    def add_attributes(element):
        from apache_beam.io.gcp.pubsub import PubsubMessage
        return PubsubMessage(
            data=json.dumps(element).encode('utf-8'),
            attributes={'timestamp': str(element['time'])}
        )
    
    (p 
     | beam.Create(data)
     | beam.Map(add_attributes)
     | WriteToPubSub(
           topic='projects/my-project/topics/output',
           with_attributes=True))

PubSubIO Configuration

topic
string
Topic name in format projects/{project}/topics/{topic}
subscription
string
Subscription name in format projects/{project}/subscriptions/{subscription}
with_attributes
boolean
default:"false"
Whether to include message attributes
id_label
string
Attribute key for message deduplication
timestamp_attribute
string
Attribute key containing message timestamp

Message Queue I/O Connectors

KafkaIO

Read from and write to Apache Kafka.
import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka, WriteToKafka

# Reading from Kafka
with beam.Pipeline() as p:
    messages = p | ReadFromKafka(
        consumer_config={
            'bootstrap.servers': 'localhost:9092',
            'group.id': 'my-group'
        },
        topics=['my-topic'],
        max_num_records=1000,  # For bounded read
        start_read_time=0  # Epoch timestamp in milliseconds
    )
    # Returns KV pairs: (key, value)

# Writing to Kafka
with beam.Pipeline() as p:
    (p 
     | beam.Create([('key1', b'value1'), ('key2', b'value2')])
     | WriteToKafka(
           producer_config={
               'bootstrap.servers': 'localhost:9092'
           },
           topic='output-topic'))

KafkaIO Configuration

bootstrap.servers
string
required
Kafka broker addresses (comma-separated)
topics
list[string]
required
List of topics to read from
group.id
string
required
Consumer group ID
max_num_records
int
Maximum records to read (makes source bounded)
start_read_time
int
Start reading from this timestamp (epoch milliseconds)

Database I/O Connectors

JdbcIO

Read from and write to JDBC-compatible databases.
import apache_beam as beam
from apache_beam.io.jdbc import ReadFromJdbc, WriteToJdbc

# Reading from JDBC
with beam.Pipeline() as p:
    rows = p | ReadFromJdbc(
        table_name='users',
        driver_class_name='org.postgresql.Driver',
        jdbc_url='jdbc:postgresql://localhost:5432/mydb',
        username='user',
        password='password'
    )
    
    # Read with custom query
    rows = p | ReadFromJdbc(
        query='SELECT * FROM users WHERE age > 25',
        driver_class_name='org.postgresql.Driver',
        jdbc_url='jdbc:postgresql://localhost:5432/mydb',
        username='user',
        password='password'
    )

# Writing to JDBC
with beam.Pipeline() as p:
    (p 
     | beam.Create([
           beam.Row(name='Alice', age=30),
           beam.Row(name='Bob', age=25)
       ])
     | WriteToJdbc(
           table_name='users',
           driver_class_name='org.postgresql.Driver',
           jdbc_url='jdbc:postgresql://localhost:5432/mydb',
           username='user',
           password='password'))

JdbcIO Configuration

driver_class_name
string
required
JDBC driver class (e.g., org.postgresql.Driver, com.mysql.jdbc.Driver)
jdbc_url
string
required
JDBC connection URL
username
string
Database username
password
string
Database password
table_name
string
Table name to read/write
query
string
Custom SQL query (alternative to table_name)
statement
string
SQL statement for writing (e.g., INSERT, UPDATE)

Complete Example: Multi-Source Pipeline

import json

import apache_beam as beam
from apache_beam.io import ReadFromText, WriteToText
from apache_beam.io import ReadFromAvro, WriteToParquet
from apache_beam.io.gcp.bigquery import WriteToBigQuery
import pyarrow as pa

def transform_record(record):
    """Transform record with business logic."""
    return {
        'name': record['name'].upper(),
        'value': record['value'] * 2,
        'timestamp': beam.utils.timestamp.Timestamp.now().to_utc_datetime()
    }

# A PyArrow schema is required when writing Parquet
output_schema = pa.schema([
    ('name', pa.string()),
    ('value', pa.int64()),
    ('timestamp', pa.timestamp('us')),
])

with beam.Pipeline() as p:
    # Read from multiple sources. ReadFromText yields raw strings, so parse
    # each line into a dict before merging with the Avro records (which are
    # already dicts).
    text_data = (p
                 | 'ReadText' >> ReadFromText('input.jsonl')
                 | 'ParseJson' >> beam.Map(json.loads))
    avro_data = p | 'ReadAvro' >> ReadFromAvro('data.avro')
    
    # Transform and combine
    results = (
        (text_data, avro_data)
        | 'Flatten' >> beam.Flatten()
        | 'Transform' >> beam.Map(transform_record)
    )
    
    # Write to multiple sinks
    results | 'WriteParquet' >> WriteToParquet('output.parquet', schema=output_schema)
    results | 'WriteBQ' >> WriteToBigQuery('project:dataset.table')

Best Practices

Performance Optimization

  1. Use appropriate file formats:
    • Parquet for analytics (columnar)
    • Avro for schema evolution
    • Text for simplicity
  2. Enable compression:
    • Reduces I/O and storage costs
    • Balance CPU usage vs. bandwidth
  3. Configure sharding:
    • More shards = better parallelism
    • Fewer shards = fewer output files

Error Handling

import apache_beam as beam
from apache_beam.io import ReadFromText, WriteToText

with beam.Pipeline() as p:
    results = (
        p 
        | ReadFromText('input.txt')
        | beam.Map(lambda x: x.split(',')).with_exception_handling()
    )
    
    # with_exception_handling() splits the output into two PCollections:
    # .good holds successfully processed elements, .bad holds information
    # about the elements that raised an exception.
    results.good | 'WriteSuccess' >> WriteToText('success.txt')
    results.bad | 'FormatErrors' >> beam.Map(str) | 'WriteErrors' >> WriteToText('errors.txt')

Next Steps

Custom I/O Transforms

Learn how to create custom sources and sinks for your specific needs