Overview
Apache Beam provides a rich ecosystem of built-in I/O connectors for interacting with various data sources and sinks. This guide covers the most commonly used connectors with practical examples.
File-Based I/O Connectors
TextIO
Read and write plain text files line-by-line.
Reading Text Files
```python
import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.io.filesystem import CompressionTypes

with beam.Pipeline() as p:
    # Basic read
    lines = p | ReadFromText('input.txt')

    # With glob pattern
    lines = p | ReadFromText('data/*.txt')

    # Skip header lines
    lines = p | ReadFromText('data.csv', skip_header_lines=1)

    # With compression
    lines = p | ReadFromText(
        'input.txt.gz',
        compression_type=CompressionTypes.GZIP)
```
```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.values.PCollection;

Pipeline p = Pipeline.create();

// Basic read
PCollection<String> lines = p.apply(
    TextIO.read().from("input.txt"));

// With glob pattern
PCollection<String> globLines = p.apply(
    TextIO.read().from("data/*.txt"));

// With a custom delimiter (Java TextIO has no skip-header option;
// filter out header rows in a downstream transform instead)
PCollection<String> csvLines = p.apply(
    TextIO.read()
        .from("data.csv")
        .withDelimiter(new byte[]{'\n'}));

p.run().waitUntilFinish();
```
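`CompressionTypes.AUTO` picks a codec from the file extension. The underlying idea can be sketched with the standard library alone (`open_auto` and `OPENERS` here are hypothetical helpers for illustration, not Beam APIs):

```python
import bz2
import gzip
import tempfile

# Map file suffixes to opener functions, mimicking extension-based
# AUTO compression detection.
OPENERS = {'.gz': gzip.open, '.bz2': bz2.open}

def open_auto(path, mode='rt'):
    """Open a file, decompressing based on its extension."""
    for suffix, opener in OPENERS.items():
        if path.endswith(suffix):
            return opener(path, mode)
    return open(path, mode)

# Round-trip a gzip file through the helper.
with tempfile.NamedTemporaryFile(suffix='.gz', delete=False) as f:
    path = f.name
with gzip.open(path, 'wt') as f:
    f.write('line1\nline2\n')
with open_auto(path) as f:
    print(f.read().splitlines())  # ['line1', 'line2']
```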
Writing Text Files
```python
import apache_beam as beam
from apache_beam.io import WriteToText
from apache_beam.io.filesystem import CompressionTypes

with beam.Pipeline() as p:
    # Basic write
    (p
     | beam.Create(['line1', 'line2', 'line3'])
     | WriteToText('output.txt'))

    # With custom sharding (labels keep transform names unique)
    (p
     | 'CreateSharded' >> beam.Create(data)
     | WriteToText(
         'output',
         file_name_suffix='.txt',
         num_shards=5))

    # With compression
    (p
     | 'CreateCompressed' >> beam.Create(data)
     | WriteToText(
         'output.txt',
         compression_type=CompressionTypes.GZIP))
```
```java
import org.apache.beam.sdk.io.Compression;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.transforms.Create;

Pipeline p = Pipeline.create();

// Basic write
p.apply(Create.of("line1", "line2", "line3"))
 .apply(TextIO.write().to("output.txt"));

// With custom sharding
data.apply(TextIO.write()
    .to("output")
    .withSuffix(".txt")
    .withNumShards(5));

// With compression
data.apply(TextIO.write()
    .to("output.txt")
    .withCompression(Compression.GZIP));

p.run().waitUntilFinish();
```
TextIO Configuration

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| `file_pattern` | `str` | required | File path or glob pattern (supports `*` and `?` wildcards) |
| `compression_type` | `CompressionTypes` | `AUTO` | Compression format: AUTO, GZIP, BZIP2, DEFLATE, ZSTD, UNCOMPRESSED |
| `skip_header_lines` | `int` | `0` | Number of header lines to skip when reading |
| `validate` | `bool` | `True` | Whether to validate file existence during pipeline construction |
| `num_shards` | `int` | `0` (runner decides) | Number of output shards (files) |
| `shard_name_template` | `str` | `-SSSSS-of-NNNNN` | Template for naming output shards |
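In the shard template, runs of `S` expand to the zero-padded shard index and runs of `N` to the shard count, so shard 0 of 5 becomes `-00000-of-00005`. A small sketch of the expansion semantics (illustration only, not Beam's implementation):

```python
import re

def expand_shard_template(template, shard, num_shards):
    """Expand runs of 'S' to the shard index and runs of 'N' to the
    shard count, zero-padded to the length of each run."""
    def repl(match):
        run = match.group(0)
        value = shard if run[0] == 'S' else num_shards
        return str(value).zfill(len(run))
    return re.sub(r'S+|N+', repl, template)

print(expand_shard_template('-SSSSS-of-NNNNN', 0, 5))  # -00000-of-00005
```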
AvroIO
Read and write Apache Avro files with schema support.
```python
import apache_beam as beam
from apache_beam.io import ReadFromAvro, WriteToAvro

# Define schema
schema = {
    'namespace': 'example.avro',
    'type': 'record',
    'name': 'User',
    'fields': [
        {'name': 'name', 'type': 'string'},
        {'name': 'age', 'type': 'int'},
        {'name': 'email', 'type': ['string', 'null']}
    ]
}

# Reading Avro files
with beam.Pipeline() as p:
    records = p | ReadFromAvro('users.avro')
    # Returns dictionaries: {'name': 'Alice', 'age': 30, 'email': 'alice@example.com'}

    # Reading with fastavro
    records = p | ReadFromAvro(
        'users.avro',
        use_fastavro=True)  # Use fastavro for better performance

# Writing Avro files
with beam.Pipeline() as p:
    users = [
        {'name': 'Alice', 'age': 30, 'email': 'alice@example.com'},
        {'name': 'Bob', 'age': 25, 'email': 'bob@example.com'}
    ]
    (p
     | beam.Create(users)
     | WriteToAvro(
         'output.avro',
         schema=schema,
         use_fastavro=True))
```
```java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.io.AvroIO;

// Define schema
String schemaJson = "{\"namespace\": \"example.avro\","
    + " \"type\": \"record\","
    + " \"name\": \"User\","
    + " \"fields\": ["
    + "{\"name\": \"name\", \"type\": \"string\"},"
    + "{\"name\": \"age\", \"type\": \"int\"}"
    + "]}";
Schema schema = new Schema.Parser().parse(schemaJson);

Pipeline p = Pipeline.create();

// Reading Avro
PCollection<GenericRecord> records = p.apply(
    AvroIO.readGenericRecords(schema)
        .from("users.avro"));

// Writing Avro
records.apply(
    AvroIO.writeGenericRecords(schema)
        .to("output.avro"));

p.run().waitUntilFinish();
```
AvroIO Configuration

| Parameter | Description |
| --- | --- |
| `file_pattern` | File path or glob pattern for Avro files |
| `schema` | Avro schema definition (required for writing) |
| `use_fastavro` | Use the fastavro library for better performance (Python) |
| `codec` | Compression codec: null, deflate, snappy, bzip2, zstandard |
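An Avro reader checks each record against the schema's field types, including union types like `['string', 'null']`. The checking logic can be sketched in plain Python for the primitives used above (`matches` and `validate` are illustrative helpers, not Beam or Avro APIs):

```python
# Python-type equivalents for the Avro primitives used in the User schema.
AVRO_TO_PY = {'string': str, 'int': int, 'null': type(None)}

def matches(value, avro_type):
    if isinstance(avro_type, list):  # union type, e.g. ['string', 'null']
        return any(matches(value, t) for t in avro_type)
    return isinstance(value, AVRO_TO_PY[avro_type])

def validate(record, schema):
    return all(matches(record.get(f['name']), f['type'])
               for f in schema['fields'])

user_schema = {
    'type': 'record', 'name': 'User',
    'fields': [
        {'name': 'name', 'type': 'string'},
        {'name': 'age', 'type': 'int'},
        {'name': 'email', 'type': ['string', 'null']},
    ]
}

print(validate({'name': 'Alice', 'age': 30, 'email': None}, user_schema))  # True
print(validate({'name': 'Bob', 'age': 'old', 'email': None}, user_schema))  # False
```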
ParquetIO
Read and write Apache Parquet files for columnar storage.
```python
import apache_beam as beam
from apache_beam.io import ReadFromParquet, WriteToParquet
import pyarrow as pa

# Define PyArrow schema
schema = pa.schema([
    ('name', pa.string()),
    ('age', pa.int64()),
    ('salary', pa.float64()),
])

# Reading Parquet
with beam.Pipeline() as p:
    records = p | ReadFromParquet('data.parquet')
    # Returns dictionaries

    # Reading with column selection
    records = p | ReadFromParquet(
        'data.parquet',
        columns=['name', 'age'])  # Read only specific columns

# Writing Parquet
with beam.Pipeline() as p:
    data = [
        {'name': 'Alice', 'age': 30, 'salary': 75000.0},
        {'name': 'Bob', 'age': 25, 'salary': 65000.0}
    ]
    (p
     | beam.Create(data)
     | WriteToParquet(
         'output.parquet',
         schema=schema,
         codec='snappy'))
```
```java
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.parquet.ParquetIO;

Pipeline p = Pipeline.create();

// Reading Parquet (takes an Avro Schema)
PCollection<GenericRecord> records = p.apply(
    ParquetIO.read(schema)
        .from("data.parquet"));

// Writing Parquet (ParquetIO has no write(); use FileIO with ParquetIO.sink)
records.apply(
    FileIO.<GenericRecord>write()
        .via(ParquetIO.sink(schema))
        .to("output_dir"));

p.run().waitUntilFinish();
```
ParquetIO Configuration

| Parameter | Description |
| --- | --- |
| `file_pattern` | File path or glob pattern |
| `schema` | Schema definition (required for writing) |
| `columns` | List of columns to read (enables column pruning) |
| `codec` | Compression codec: none, snappy, gzip, lzo, brotli, lz4, zstd |
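Column pruning works because Parquet stores each field contiguously rather than row by row. The difference between the two layouts can be shown with ordinary Python data structures (an illustration of the storage model, not of Parquet's on-disk format):

```python
rows = [
    {'name': 'Alice', 'age': 30, 'salary': 75000.0},
    {'name': 'Bob', 'age': 25, 'salary': 65000.0},
]

# In a columnar layout each field is stored contiguously, so a reader asked
# for columns=['name', 'age'] can skip the salary column entirely.
wanted = ('name', 'age')
columns = {key: [row[key] for row in rows] for key in wanted}
print(columns)  # {'name': ['Alice', 'Bob'], 'age': [30, 25]}
```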
Google Cloud I/O Connectors
BigQueryIO
Read from and write to Google BigQuery.
```python
import apache_beam as beam
from apache_beam.io.gcp.bigquery import ReadFromBigQuery, WriteToBigQuery

# Reading from BigQuery
with beam.Pipeline() as p:
    # Read from table
    rows = p | ReadFromBigQuery(
        table='project:dataset.table')

    # Read with SQL query (backticked names require standard SQL)
    rows = p | ReadFromBigQuery(
        query='SELECT name, age FROM `project.dataset.table` WHERE age > 25',
        use_standard_sql=True)

    # Read with table reference
    rows = p | ReadFromBigQuery(
        project='my-project',
        dataset='my-dataset',
        table='my-table')

# Writing to BigQuery
with beam.Pipeline() as p:
    # Define schema
    schema = {
        'fields': [
            {'name': 'name', 'type': 'STRING', 'mode': 'REQUIRED'},
            {'name': 'age', 'type': 'INTEGER', 'mode': 'NULLABLE'},
            {'name': 'email', 'type': 'STRING', 'mode': 'NULLABLE'}
        ]
    }
    data = [
        {'name': 'Alice', 'age': 30, 'email': 'alice@example.com'},
        {'name': 'Bob', 'age': 25, 'email': 'bob@example.com'}
    ]
    (p
     | beam.Create(data)
     | WriteToBigQuery(
         'project:dataset.table',
         schema=schema,
         create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
         write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))
```
```java
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import java.util.Arrays;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;

Pipeline p = Pipeline.create();

// Reading from BigQuery
PCollection<TableRow> rows = p.apply(
    BigQueryIO.readTableRows()
        .from("project:dataset.table"));

// Reading with a query
PCollection<TableRow> queried = p.apply(
    BigQueryIO.readTableRows()
        .fromQuery("SELECT name, age FROM `project.dataset.table` WHERE age > 25")
        .usingStandardSql());

// Writing to BigQuery
TableSchema schema = new TableSchema().setFields(
    Arrays.asList(
        new TableFieldSchema().setName("name").setType("STRING").setMode("REQUIRED"),
        new TableFieldSchema().setName("age").setType("INTEGER").setMode("NULLABLE")));

rows.apply(
    BigQueryIO.writeTableRows()
        .to("project:dataset.table")
        .withSchema(schema)
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

p.run().waitUntilFinish();
```
BigQueryIO Configuration

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| `table` | `str` | — | Table reference in format `project:dataset.table` or `dataset.table` |
| `query` | `str` | — | SQL query to execute (alternative to `table`) |
| `schema` | `dict` | — | Table schema (required for writing if the table doesn't exist) |
| `create_disposition` | `CreateDisposition` | `CREATE_IF_NEEDED` | `CREATE_IF_NEEDED`: create the table if it doesn't exist; `CREATE_NEVER`: fail if it doesn't exist |
| `write_disposition` | `WriteDisposition` | `WRITE_APPEND` | `WRITE_APPEND`: append to existing data; `WRITE_TRUNCATE`: overwrite existing data; `WRITE_EMPTY`: fail if the table is not empty |
| `method` | `str` | `DEFAULT` | Write method: DEFAULT, FILE_LOADS, STREAMING_INSERTS, STORAGE_WRITE_API |
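The two table-spec formats differ only in whether the project prefix is present. A small sketch of how such a spec can be split into its parts (`parse_table_spec` is an illustrative helper, not Beam's own parser):

```python
def parse_table_spec(spec):
    """Split 'project:dataset.table' or 'dataset.table' into
    (project, dataset, table); project is None when omitted."""
    project, _, rest = spec.rpartition(':')
    dataset, _, table = rest.partition('.')
    return (project or None, dataset, table)

print(parse_table_spec('my-project:dataset.table'))  # ('my-project', 'dataset', 'table')
print(parse_table_spec('dataset.table'))             # (None, 'dataset', 'table')
```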
PubSubIO
Read from and write to Google Cloud Pub/Sub (streaming).
```python
import apache_beam as beam
from apache_beam.io.gcp.pubsub import ReadFromPubSub, WriteToPubSub, PubsubMessage
import json

# Reading from Pub/Sub
with beam.Pipeline() as p:
    # Read messages as bytes
    messages = p | ReadFromPubSub(
        topic='projects/my-project/topics/my-topic')

    # Read from subscription
    messages = p | ReadFromPubSub(
        subscription='projects/my-project/subscriptions/my-sub')

    # Read with attributes
    messages = p | ReadFromPubSub(
        topic='projects/my-project/topics/my-topic',
        with_attributes=True)
    # Returns PubsubMessage(data=b'...', attributes={...})

# Writing to Pub/Sub
with beam.Pipeline() as p:
    (p
     | beam.Create(['message1', 'message2'])
     | beam.Map(lambda x: x.encode('utf-8'))
     | WriteToPubSub(
         topic='projects/my-project/topics/output-topic'))

# Write with attributes
def add_attributes(element):
    return PubsubMessage(
        data=json.dumps(element).encode('utf-8'),
        attributes={'timestamp': str(element['time'])})

with beam.Pipeline() as p:
    (p
     | beam.Create(data)
     | beam.Map(add_attributes)
     | WriteToPubSub(
         topic='projects/my-project/topics/output',
         with_attributes=True))
```
```java
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;

Pipeline p = Pipeline.create();

// Reading from Pub/Sub
PCollection<String> messages = p.apply(
    PubsubIO.readStrings()
        .fromTopic("projects/my-project/topics/my-topic"));

// Read from subscription
PCollection<String> fromSub = p.apply(
    PubsubIO.readStrings()
        .fromSubscription("projects/my-project/subscriptions/my-sub"));

// Writing to Pub/Sub
messages.apply(
    PubsubIO.writeStrings()
        .to("projects/my-project/topics/output-topic"));

p.run();
```
PubSubIO Configuration

| Parameter | Description |
| --- | --- |
| `topic` | Topic name in format `projects/{project}/topics/{topic}` |
| `subscription` | Subscription name in format `projects/{project}/subscriptions/{subscription}` |
| `with_attributes` | Whether to include message attributes |
| `id_label` | Attribute key used for message deduplication |
| `timestamp_attribute` | Attribute key containing the message timestamp |
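Because Pub/Sub delivers at-least-once, `id_label` lets the pipeline drop redelivered copies of a message by keying on an attribute. The idea can be sketched with plain Python objects (`Message` and `deduplicate` are illustrative stand-ins, not Beam APIs):

```python
from dataclasses import dataclass, field

@dataclass
class Message:
    """Simplified stand-in for a Pub/Sub message: payload plus attributes."""
    data: bytes
    attributes: dict = field(default_factory=dict)

def deduplicate(messages, id_label):
    """Keep only the first message seen for each id_label attribute value."""
    seen, unique = set(), []
    for msg in messages:
        msg_id = msg.attributes.get(id_label)
        if msg_id is not None and msg_id in seen:
            continue  # redelivered duplicate, drop it
        if msg_id is not None:
            seen.add(msg_id)
        unique.append(msg)
    return unique

msgs = [
    Message(b'a', {'message_id': '1'}),
    Message(b'a', {'message_id': '1'}),  # redelivered duplicate
    Message(b'b', {'message_id': '2'}),
]
print(len(deduplicate(msgs, 'message_id')))  # 2
```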
Message Queue I/O Connectors
KafkaIO
Read from and write to Apache Kafka.
```python
import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka, WriteToKafka

# Reading from Kafka
with beam.Pipeline() as p:
    messages = p | ReadFromKafka(
        consumer_config={
            'bootstrap.servers': 'localhost:9092',
            'group.id': 'my-group'
        },
        topics=['my-topic'],
        max_num_records=1000,  # For bounded read
        start_read_time=0)     # Epoch timestamp in milliseconds
    # Returns KV pairs: (key, value)

# Writing to Kafka
with beam.Pipeline() as p:
    (p
     | beam.Create([(b'key1', b'value1'), (b'key2', b'value2')])
     | WriteToKafka(
         producer_config={
             'bootstrap.servers': 'localhost:9092'
         },
         topic='output-topic'))
```
```java
import com.google.common.collect.ImmutableMap;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.values.KV;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

Pipeline p = Pipeline.create();

// Reading from Kafka (withoutMetadata() yields plain KV pairs)
PCollection<KV<String, String>> messages = p.apply(
    KafkaIO.<String, String>read()
        .withBootstrapServers("localhost:9092")
        .withTopic("my-topic")
        .withKeyDeserializer(StringDeserializer.class)
        .withValueDeserializer(StringDeserializer.class)
        .withConsumerConfigUpdates(
            ImmutableMap.of("group.id", "my-group"))
        .withoutMetadata());

// Writing to Kafka
messages.apply(
    KafkaIO.<String, String>write()
        .withBootstrapServers("localhost:9092")
        .withTopic("output-topic")
        .withKeySerializer(StringSerializer.class)
        .withValueSerializer(StringSerializer.class));

p.run();
```
KafkaIO Configuration

| Parameter | Description |
| --- | --- |
| `consumer_config` | Kafka consumer settings, including `bootstrap.servers` (comma-separated broker addresses) |
| `topics` | List of topics to read from |
| `max_num_records` | Maximum records to read (makes the source bounded) |
| `start_read_time` | Start reading from this timestamp (epoch milliseconds) |
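Kafka routes each keyed record to a partition by hashing its key, which is why records sharing a key stay ordered relative to each other. A sketch of the property (Kafka's default partitioner uses murmur2; `crc32` stands in here purely to show determinism):

```python
import zlib

def partition_for(key: bytes, num_partitions: int) -> int:
    """Deterministic key -> partition mapping. Equal keys always land on
    the same partition; crc32 is only an illustrative stand-in for
    Kafka's murmur2-based default partitioner."""
    return zlib.crc32(key) % num_partitions

p1 = partition_for(b'user-42', 6)
p2 = partition_for(b'user-42', 6)
print(p1 == p2)  # True
```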
Database I/O Connectors
JdbcIO
Read from and write to JDBC-compatible databases.
```python
import apache_beam as beam
from apache_beam.io.jdbc import ReadFromJdbc, WriteToJdbc

# Reading from JDBC
with beam.Pipeline() as p:
    rows = p | ReadFromJdbc(
        table_name='users',
        driver_class_name='org.postgresql.Driver',
        jdbc_url='jdbc:postgresql://localhost:5432/mydb',
        username='user',
        password='password')

    # Read with custom query
    rows = p | ReadFromJdbc(
        table_name='users',
        query='SELECT * FROM users WHERE age > 25',
        driver_class_name='org.postgresql.Driver',
        jdbc_url='jdbc:postgresql://localhost:5432/mydb',
        username='user',
        password='password')

# Writing to JDBC
with beam.Pipeline() as p:
    (p
     | beam.Create([
         beam.Row(name='Alice', age=30),
         beam.Row(name='Bob', age=25)
     ])
     | WriteToJdbc(
         table_name='users',
         driver_class_name='org.postgresql.Driver',
         jdbc_url='jdbc:postgresql://localhost:5432/mydb',
         username='user',
         password='password'))
```
```java
import java.sql.ResultSet;
import org.apache.beam.sdk.io.jdbc.JdbcIO;
import org.apache.beam.sdk.values.Row;

Pipeline p = Pipeline.create();

// Reading from JDBC
PCollection<Row> rows = p.apply(
    JdbcIO.<Row>read()
        .withDataSourceConfiguration(
            JdbcIO.DataSourceConfiguration.create(
                "org.postgresql.Driver",
                "jdbc:postgresql://localhost:5432/mydb")
            .withUsername("user")
            .withPassword("password"))
        .withQuery("SELECT * FROM users")
        .withRowMapper(new JdbcIO.RowMapper<Row>() {
          public Row mapRow(ResultSet resultSet) throws Exception {
            return Row.withSchema(schema)
                .addValue(resultSet.getString("name"))
                .addValue(resultSet.getInt("age"))
                .build();
          }
        }));

// Writing to JDBC
rows.apply(
    JdbcIO.<Row>write()
        .withDataSourceConfiguration(
            JdbcIO.DataSourceConfiguration.create(
                "org.postgresql.Driver",
                "jdbc:postgresql://localhost:5432/mydb")
            .withUsername("user")
            .withPassword("password"))
        .withStatement("INSERT INTO users (name, age) VALUES (?, ?)")
        .withPreparedStatementSetter((element, statement) -> {
          statement.setString(1, element.getString("name"));
          statement.setInt(2, element.getInt("age"));
        }));

p.run().waitUntilFinish();
```
JdbcIO Configuration

| Parameter | Description |
| --- | --- |
| `driver_class_name` | JDBC driver class (e.g., `org.postgresql.Driver`, `com.mysql.jdbc.Driver`) |
| `jdbc_url` | JDBC connection URL |
| `query` | Custom SQL query (alternative to `table_name`) |
| `statement` | SQL statement for writing (e.g., INSERT, UPDATE) |
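The `withStatement` / `withPreparedStatementSetter` pairing above is plain parameterized SQL. The same pattern can be exercised end-to-end with the stdlib `sqlite3` driver in place of a JDBC driver:

```python
import sqlite3

# An in-memory database stands in for the JDBC data source.
conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE users (name TEXT, age INTEGER)')

# Equivalent of withStatement(...) plus a prepared-statement setter:
# placeholders are bound per element, never string-concatenated.
conn.executemany('INSERT INTO users (name, age) VALUES (?, ?)',
                 [('Alice', 30), ('Bob', 25)])

# Equivalent of withQuery(...) plus a row mapper.
rows = conn.execute('SELECT name, age FROM users WHERE age > ?', (25,)).fetchall()
print(rows)  # [('Alice', 30)]
```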
Complete Example: Multi-Source Pipeline
```python
import apache_beam as beam
from apache_beam.io import ReadFromText, ReadFromAvro, WriteToParquet
from apache_beam.io.gcp.bigquery import WriteToBigQuery
from datetime import datetime, timezone
import json
import pyarrow as pa

def transform_record(record):
    """Transform record with business logic."""
    return {
        'name': record['name'].upper(),
        'value': record['value'] * 2,
        'timestamp': datetime.now(timezone.utc).isoformat()
    }

output_schema = pa.schema([
    ('name', pa.string()),
    ('value', pa.float64()),
    ('timestamp', pa.string()),
])

with beam.Pipeline() as p:
    # Read from multiple sources; parse the text lines into dicts so both
    # branches carry the same record shape before flattening
    text_data = (p
                 | 'ReadText' >> ReadFromText('input.txt')
                 | 'ParseJson' >> beam.Map(json.loads))
    avro_data = p | 'ReadAvro' >> ReadFromAvro('data.avro')

    # Transform and combine
    results = (
        (text_data, avro_data)
        | 'Flatten' >> beam.Flatten()
        | 'Transform' >> beam.Map(transform_record)
    )

    # Write to multiple sinks (the BigQuery table must already exist,
    # or a schema must be supplied)
    results | 'WriteParquet' >> WriteToParquet('output.parquet', schema=output_schema)
    results | 'WriteBQ' >> WriteToBigQuery('project:dataset.table')
```
Best Practices

- **Use appropriate file formats:**
  - Parquet for analytics (columnar)
  - Avro for schema evolution
  - Text for simplicity
- **Enable compression:**
  - Reduces I/O and storage costs
  - Balance CPU usage vs. bandwidth
- **Configure sharding:**
  - More shards = better parallelism
  - Fewer shards = fewer output files
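The compression trade-off is easy to see on typical pipeline data. Repetitive text such as CSV or log lines shrinks dramatically under gzip, and the saved bytes are what you weigh against the CPU spent compressing (stdlib-only illustration):

```python
import gzip

# Repetitive text, typical of CSV/log data, compresses heavily.
text = ('id,name,value\n' + '1,alice,42\n' * 1000).encode('utf-8')
compressed = gzip.compress(text)

# The fewer bytes moved and stored, the lower the I/O cost; the price is
# the CPU time spent in gzip.compress on the write path.
print(len(compressed) < len(text))  # True
```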
Error Handling
```python
import apache_beam as beam
from apache_beam.io import ReadFromText, WriteToText

with beam.Pipeline() as p:
    results = (
        p
        | ReadFromText('input.txt')
        | beam.Map(lambda x: x.split(','))
            .with_exception_handling()
    )
    # with_exception_handling() splits the output: .good holds successful
    # elements, .bad holds each failing element with its exception info
    results.good | 'WriteGood' >> WriteToText('success.txt')
    (results.bad
     | 'FormatErrors' >> beam.Map(str)
     | 'WriteErrors' >> WriteToText('errors.txt'))
```
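Stripped of the Beam machinery, the dead-letter pattern is just try/except routing: apply the function per element and divert failures instead of failing the batch (`map_with_dead_letter` is an illustrative helper, not a Beam API):

```python
def map_with_dead_letter(fn, elements):
    """Apply fn to each element; route failures to a dead-letter list
    instead of aborting the whole batch."""
    good, bad = [], []
    for element in elements:
        try:
            good.append(fn(element))
        except Exception as exc:
            # Keep the offending element alongside the error for later triage.
            bad.append((element, repr(exc)))
    return good, bad

good, bad = map_with_dead_letter(lambda x: x.split(','), ['a,b', None])
print(good)      # [['a', 'b']]
print(len(bad))  # 1
```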
Next Steps
Custom I/O Transforms: learn how to create custom sources and sinks for your specific needs.