
Introduction to I/O in Apache Beam

Apache Beam provides a rich set of I/O connectors for reading from and writing to various data sources and sinks. I/O transforms enable your pipelines to interact with external storage systems, databases, message queues, and more.

Core I/O Concepts

Sources and Sinks

Beam pipelines typically follow this pattern:
  1. Read data from a source
  2. Transform the data
  3. Write results to a sink
import apache_beam as beam

with beam.Pipeline() as pipeline:
    (pipeline
     | 'Read' >> beam.io.ReadFromText('input.txt')
     | 'Transform' >> beam.Map(lambda x: x.upper())
     | 'Write' >> beam.io.WriteToText('output.txt'))

Reading Data

Beam provides two primary patterns for reading data:
import apache_beam as beam
from apache_beam.io import ReadFromText

# Pattern 1: Direct read transform
with beam.Pipeline() as p:
    lines = p | ReadFromText('gs://bucket/file.txt')

# Pattern 2: Wrapping a source object with Read (mainly for custom sources;
# built-in transforms like ReadFromText wrap their internal sources for you)
from apache_beam.io import iobase

with beam.Pipeline() as p:
    lines = p | beam.io.Read(custom_source)  # any iobase.BoundedSource subclass

Writing Data

Writing data follows a similar pattern:
import apache_beam as beam

with beam.Pipeline() as p:
    (p 
     | beam.Create(['Hello', 'World'])
     | beam.io.WriteToText('output'))

Bounded vs Unbounded Data

Bounded Sources (Batch)

Bounded sources read a finite amount of data, such as files or database tables. Characteristics:
  • Fixed dataset size
  • Processing completes when all data is read
  • Examples: Files (text, Avro, Parquet), database snapshots
import apache_beam as beam

# Reading bounded data from files
with beam.Pipeline() as p:
    data = p | beam.io.ReadFromText('input/*.txt')
    # Process data...

Unbounded Sources (Streaming)

Unbounded sources continuously read data from streams. Characteristics:
  • Infinite or ongoing data streams
  • Processing continues indefinitely
  • Requires windowing for aggregations
  • Examples: Pub/Sub, Kafka, Kinesis
import apache_beam as beam
from apache_beam.io.gcp.pubsub import ReadFromPubSub

# Reading unbounded data from Pub/Sub
with beam.Pipeline() as p:
    messages = p | ReadFromPubSub(topic='projects/my-project/topics/my-topic')
    # Apply windowing and process...

File-Based vs Streaming I/O

File-Based I/O

File-based I/O connectors read and write data to file systems:
  • Local files: /path/to/file.txt
  • Cloud Storage: gs://, s3://, wasb://
  • HDFS: hdfs://namenode:port/path
Common file formats:
| Format   | Connector  | Use Case                          |
|----------|------------|-----------------------------------|
| Text     | TextIO     | Plain text, CSV, JSON lines       |
| Avro     | AvroIO     | Schema evolution, compact binary  |
| Parquet  | ParquetIO  | Columnar storage, analytics       |
| TFRecord | TFRecordIO | TensorFlow integration            |

Streaming I/O

Streaming I/O connectors interact with message queues and streaming platforms:
| System           | Connector   | Type      |
|------------------|-------------|-----------|
| Google Pub/Sub   | PubSubIO    | Unbounded |
| Apache Kafka     | KafkaIO     | Unbounded |
| AWS Kinesis      | KinesisIO   | Unbounded |
| Azure Event Hubs | EventHubsIO | Unbounded |

Data Transformation Flow

A typical I/O pattern in Beam:
import apache_beam as beam
from apache_beam.io import ReadFromText, WriteToText

def process_data(element):
    return element.upper()

with beam.Pipeline() as p:
    (p
     | 'Read Source' >> ReadFromText('input.txt')
     | 'Transform' >> beam.Map(process_data)
     | 'Write Sink' >> WriteToText('output.txt'))

Splittable Sources

Beam automatically splits bounded sources for parallel processing:
from apache_beam.io import iobase

class CustomSource(iobase.BoundedSource):
    def estimate_size(self):
        # Estimate total size in bytes
        pass

    def get_range_tracker(self, start_position, stop_position):
        # Return a RangeTracker for the given range
        pass

    def split(self, desired_bundle_size, start_position=None, stop_position=None):
        # Split source into iobase.SourceBundle objects for parallel processing
        pass

    def read(self, range_tracker):
        # Read data within the range claimed via the tracker
        pass

Compression Support

Many file-based I/O connectors support compression:
from apache_beam.io.filesystem import CompressionTypes

# Reading compressed files
p | beam.io.ReadFromText(
    'input.txt.gz',
    compression_type=CompressionTypes.GZIP
)

# Writing compressed files
data | beam.io.WriteToText(
    'output.txt',
    compression_type=CompressionTypes.GZIP
)
Supported compression types:
  • GZIP (.gz)
  • BZIP2 (.bz2)
  • DEFLATE
  • ZSTD
  • LZO
  • AUTO (automatic detection)

Configuration Patterns

Basic Configuration

file_pattern (string, required)
  File path or glob pattern to read/write

compression_type (CompressionTypes)
  Compression format (AUTO, GZIP, BZIP2, etc.)

validate (boolean, default: true)
  Whether to validate the source during pipeline construction

Advanced Options

skip_header_lines (int, default: 0)
  Number of header lines to skip (TextIO)

coder (Coder)
  Custom coder for encoding/decoding elements

shard_name_template (string)
  Template for output shard naming

Performance Considerations

File-Based I/O

  1. Sharding: Distribute reads/writes across multiple workers
  2. Compression: Balance CPU usage vs. I/O bandwidth
  3. Bundle Size: Optimize for parallelism and overhead

Streaming I/O

  1. Watermarks: Track event time progress
  2. Checkpointing: Enable fault tolerance
  3. Throughput: Configure batch sizes and parallelism

Error Handling

import apache_beam as beam
from apache_beam.io import ReadFromText

class SafeRead(beam.DoFn):
    def process(self, element):
        try:
            # ReadFromText already yields str; put any fallible processing here
            yield element.upper()
        except Exception:
            # Route the bad record to a dead-letter output instead of failing
            yield beam.pvalue.TaggedOutput('errors', element)

with beam.Pipeline() as p:
    results = (
        p 
        | ReadFromText('input.txt')
        | beam.ParDo(SafeRead()).with_outputs('errors', main='processed')
    )
    
    results.processed | 'Write Success' >> beam.io.WriteToText('output.txt')
    results.errors | 'Write Errors' >> beam.io.WriteToText('errors.txt')

Next Steps

Built-in I/O Connectors

Explore the comprehensive list of built-in connectors for various data sources

Custom I/O Transforms

Learn how to create custom sources and sinks for your specific needs

Common Use Cases

ETL Pipelines

import apache_beam as beam

def transform_fn(record):
    # Clean/reshape each record into the BigQuery row format
    return record

with beam.Pipeline() as p:
    (p
     | 'Extract' >> beam.io.ReadFromAvro('source.avro')
     | 'Transform' >> beam.Map(transform_fn)
     | 'Load' >> beam.io.WriteToBigQuery('project:dataset.table'))

Stream Processing

import json

import apache_beam as beam
from apache_beam.io.gcp.pubsub import ReadFromPubSub

with beam.Pipeline() as p:
    (p
     | 'Read Stream' >> ReadFromPubSub(
           subscription='projects/my-project/subscriptions/my-sub')
     | 'Parse' >> beam.Map(json.loads)
     | 'Key' >> beam.Map(lambda msg: (msg['user'], msg['value']))  # hypothetical fields
     | 'Window' >> beam.WindowInto(beam.window.FixedWindows(60))
     | 'Aggregate' >> beam.CombinePerKey(sum)
     | 'Write' >> beam.io.WriteToText('results'))

Data Migration

import apache_beam as beam
import pyarrow
from apache_beam.io.jdbc import ReadFromJdbc

# Hypothetical schema matching the source table's columns
parquet_schema = pyarrow.schema([('id', pyarrow.int64()),
                                 ('name', pyarrow.string())])

with beam.Pipeline() as p:
    (p
     | 'Read JDBC' >> ReadFromJdbc(
           table_name='source_table',
           driver_class_name='org.postgresql.Driver',
           jdbc_url='jdbc:postgresql://host:5432/db',
           username='user',
           password='password')
     | 'Write Parquet' >> beam.io.WriteToParquet('output', schema=parquet_schema))