Coders are used to encode and decode data for transmission and storage in Apache Beam pipelines.

Overview

Coders handle the serialization and deserialization of elements in a PCollection. They are essential for:
  • Moving data between pipeline stages
  • Storing intermediate results
  • Shuffling data in GroupByKey operations
  • Checkpointing and state management

Coder Base Class

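Base class for all coders. Subclasses implement encode and decode, and report whether the encoding is deterministic.
import apache_beam as beam

class MyCoder(beam.coders.Coder):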
    def encode(self, value):
        # Return bytes representation
        return value.encode('utf-8')
    
    def decode(self, encoded):
        # Return decoded value
        return encoded.decode('utf-8')
    
    def is_deterministic(self):
        # Return True if encoding is deterministic
        return True

Methods

encode
(value: T) -> bytes
Encodes a value to bytes.
decode
(encoded: bytes) -> T
Decodes bytes back to the original value.
is_deterministic
() -> bool
Returns whether the encoding is deterministic, meaning equal values always encode to the same bytes. Key coders used by GroupByKey must be deterministic.
estimate_size
(value: T) -> int
Estimates the encoded size in bytes.

Common Coders

BytesCoder

Encodes bytes without modification.
from apache_beam.coders import BytesCoder

coder = BytesCoder()
encoded = coder.encode(b'hello')
decoded = coder.decode(encoded)
Properties:
  • Deterministic: Yes
  • Use case: Binary data, raw bytes

StrUtf8Coder

Encodes strings using UTF-8 encoding.
from apache_beam.coders import StrUtf8Coder

coder = StrUtf8Coder()
encoded = coder.encode('Hello, World!')
decoded = coder.decode(encoded)  # Returns 'Hello, World!'
Properties:
  • Deterministic: Yes
  • Use case: Text data, string elements

VarIntCoder

Encodes integers using variable-length encoding.
from apache_beam.coders import VarIntCoder

coder = VarIntCoder()
encoded = coder.encode(12345)
decoded = coder.decode(encoded)  # Returns 12345
Properties:
  • Deterministic: Yes
  • Use case: Integer values, efficient for small integers
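
To illustrate why a variable-length encoding favors small integers, here is a minimal sketch of a base-128 varint for non-negative integers. It uses only the standard library. The names `encode_varint` and `decode_varint` are hypothetical, and this is not Beam's implementation, which also handles negative values.

```python
def encode_varint(n: int) -> bytes:
    """Encode a non-negative int, 7 bits per byte, low-order group first."""
    if n < 0:
        raise ValueError("this sketch only handles non-negative integers")
    out = bytearray()
    while True:
        group = n & 0x7F
        n >>= 7
        if n:
            out.append(group | 0x80)  # high bit set: more bytes follow
        else:
            out.append(group)         # high bit clear: last byte
            return bytes(out)


def decode_varint(data: bytes) -> int:
    """Decode bytes produced by encode_varint."""
    result = 0
    shift = 0
    for byte in data:
        result |= (byte & 0x7F) << shift
        if not byte & 0x80:
            return result
        shift += 7
    raise ValueError("truncated varint")


small = encode_varint(5)       # fits in one byte
medium = encode_varint(12345)  # needs two bytes
```

Values below 128 take a single byte, while larger values grow by one byte for every additional 7 bits.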

FloatCoder

Encodes floating-point numbers (double precision).
from apache_beam.coders import FloatCoder

coder = FloatCoder()
encoded = coder.encode(3.14159)
decoded = coder.decode(encoded)  # Returns 3.14159
Properties:
  • Deterministic: Yes
  • Use case: Floating-point numbers
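
A double-precision value occupies a fixed 8 bytes. The sketch below uses the standard library's `struct` module to show such an encoding and its round trip. The big-endian byte order and the helper names `encode_double` and `decode_double` are assumptions for illustration, not a statement about FloatCoder's wire format.

```python
import struct


def encode_double(x: float) -> bytes:
    # '>d' packs a Python float as an 8-byte big-endian IEEE 754 double.
    return struct.pack('>d', x)


def decode_double(data: bytes) -> float:
    return struct.unpack('>d', data)[0]


encoded = encode_double(3.14159)
decoded = decode_double(encoded)
```

Because Python floats are already doubles, the round trip in this sketch returns exactly the value that was encoded.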

BooleanCoder

Encodes boolean values.
from apache_beam.coders import BooleanCoder

coder = BooleanCoder()
encoded = coder.encode(True)
decoded = coder.decode(encoded)  # Returns True
Properties:
  • Deterministic: Yes
  • Use case: Boolean flags

TupleCoder

Encodes tuples with a coder for each component.
from apache_beam.coders import TupleCoder, StrUtf8Coder, VarIntCoder

# Coder for (str, int) tuples
coder = TupleCoder([StrUtf8Coder(), VarIntCoder()])
encoded = coder.encode(('Alice', 30))
decoded = coder.decode(encoded)  # Returns ('Alice', 30)
Properties:
  • Deterministic: If all component coders are deterministic
  • Use case: Key-value pairs, structured data
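
The idea of composing one coder per component can be sketched with the standard library alone. In this illustration the string component is length-prefixed so the decoder knows where it ends. The 4-byte prefix, the fixed 8-byte integer, and the helper names are assumptions for the sketch, not Beam's wire format.

```python
import struct


def encode_pair(name: str, age: int) -> bytes:
    name_bytes = name.encode('utf-8')
    # The length prefix marks where the string ends and the integer begins.
    return struct.pack('>I', len(name_bytes)) + name_bytes + struct.pack('>q', age)


def decode_pair(data: bytes):
    (name_len,) = struct.unpack_from('>I', data, 0)
    name = data[4:4 + name_len].decode('utf-8')
    (age,) = struct.unpack_from('>q', data, 4 + name_len)
    return name, age


encoded = encode_pair('Alice', 30)
decoded = decode_pair(encoded)
```

Each component keeps its own encoding, so the pair is deterministic exactly when both component encodings are.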

ListCoder

Encodes lists with a coder for elements.
from apache_beam.coders import ListCoder, VarIntCoder

coder = ListCoder(VarIntCoder())
encoded = coder.encode([1, 2, 3, 4, 5])
decoded = coder.decode(encoded)  # Returns [1, 2, 3, 4, 5]
Properties:
  • Deterministic: If element coder is deterministic
  • Use case: Lists, arrays

IterableCoder

Similar to ListCoder but for iterables.
from apache_beam.coders import IterableCoder, StrUtf8Coder

coder = IterableCoder(StrUtf8Coder())
encoded = coder.encode(['a', 'b', 'c'])
decoded = coder.decode(encoded)  # Returns iterable

MapCoder

Encodes dictionaries.
from apache_beam.coders import MapCoder, StrUtf8Coder, VarIntCoder

coder = MapCoder(StrUtf8Coder(), VarIntCoder())
data = {'a': 1, 'b': 2, 'c': 3}
encoded = coder.encode(data)
decoded = coder.decode(encoded)  # Returns dictionary
Properties:
  • Deterministic: No (dict order may vary)
  • Use case: Dictionary/map data

PickleCoder

Encodes arbitrary Python objects using pickle.
from apache_beam.coders import PickleCoder

class CustomClass:
    def __init__(self, value):
        self.value = value

coder = PickleCoder()
obj = CustomClass(42)
encoded = coder.encode(obj)
decoded = coder.decode(encoded)  # Returns CustomClass instance
Properties:
  • Deterministic: No
  • Use case: Custom objects, fallback coder
PickleCoder is not deterministic and should not be used for GroupByKey keys. Use deterministic coders when possible.
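
The following standard-library sketch shows one way pickling can be non-deterministic: two dictionaries that compare equal but were built in a different order serialize to different bytes. It calls `pickle` directly rather than PickleCoder, so treat it as an illustration of the underlying behavior only.

```python
import pickle

first = {'a': 1, 'b': 2}
second = {'b': 2, 'a': 1}  # same contents, different insertion order

same_value = first == second                                # equal as Python objects
same_bytes = pickle.dumps(first) == pickle.dumps(second)    # but not as bytes
```

If such values were used as keys and compared by their encoded bytes, equal keys could end up in different groups.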

ProtoCoder

Encodes Protocol Buffer messages.
from apache_beam.coders import ProtoCoder
import my_proto_pb2

coder = ProtoCoder(my_proto_pb2.MyMessage)
message = my_proto_pb2.MyMessage(field='value')
encoded = coder.encode(message)
decoded = coder.decode(encoded)
Properties:
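  • Deterministic: No by default, because map fields have no guaranteed serialization order; for GroupByKey keys, Beam can substitute DeterministicProtoCoder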
  • Use case: Protocol Buffer messages

AvroGenericCoder

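Encodes AvroRecord values using an Avro schema supplied as a JSON string.
import json

from apache_beam.coders import AvroGenericCoder
from apache_beam.coders.avro_record import AvroRecord

schema = {
    'type': 'record',
    'name': 'User',
    'fields': [
        {'name': 'name', 'type': 'string'},
        {'name': 'age', 'type': 'int'}
    ]
}

# The coder expects the schema as a JSON string and values wrapped in AvroRecord
coder = AvroGenericCoder(json.dumps(schema))
data = AvroRecord({'name': 'Alice', 'age': 30})
encoded = coder.encode(data)
decoded = coder.decode(encoded)  # AvroRecord; the dict is in decoded.record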

WindowedValueCoder

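Encodes windowed values (value, timestamp, windows, and pane info).
from apache_beam.coders import WindowedValueCoder, StrUtf8Coder
from apache_beam.transforms.window import GlobalWindows

value_coder = StrUtf8Coder()
# get_window_coder() is defined on the windowing function (GlobalWindows),
# not on an individual GlobalWindow
window_coder = GlobalWindows().get_window_coder()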

coder = WindowedValueCoder(value_coder, window_coder)

Coder Registry

Apache Beam automatically selects coders based on type hints.
from apache_beam.coders import registry

# Get coder for a type
coder = registry.get_coder(int)  # Returns VarIntCoder
coder = registry.get_coder(str)  # Returns StrUtf8Coder

# Register custom coder
registry.register_coder(MyCustomType, MyCustomCoder)
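
Conceptually, a coder registry is a lookup from a type to a coder class, with a fallback for types that have no entry. The sketch below models that idea with the standard library. `SimpleRegistry`, `Utf8Coder`, and `PickleFallbackCoder` are hypothetical names for illustration and are not Beam classes.

```python
import pickle


class Utf8Coder:
    def encode(self, value):
        return value.encode('utf-8')

    def decode(self, encoded):
        return encoded.decode('utf-8')


class PickleFallbackCoder:
    def encode(self, value):
        return pickle.dumps(value)

    def decode(self, encoded):
        return pickle.loads(encoded)


class SimpleRegistry:
    """Hypothetical stand-in: maps types to coder classes."""

    def __init__(self, fallback_class):
        self._coders = {}
        self._fallback_class = fallback_class

    def register_coder(self, typ, coder_class):
        self._coders[typ] = coder_class

    def get_coder(self, typ):
        # Use the fallback when nothing is registered for the type.
        coder_class = self._coders.get(typ, self._fallback_class)
        return coder_class()


simple_registry = SimpleRegistry(PickleFallbackCoder)
simple_registry.register_coder(str, Utf8Coder)

str_coder = simple_registry.get_coder(str)          # registered type
other_coder = simple_registry.get_coder(complex)    # no entry, so fallback
```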

Type Hints and Coders

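Type hints on a transform's output tell Beam which coder to look up in the registry.
import apache_beam as beam

# A custom type with no registered coder
class Person:
    def __init__(self, name: str, age: int):
        self.name = name
        self.age = age

with beam.Pipeline() as p:
    # Type hints help Beam select the right coder (VarIntCoder for int)
    numbers = (
        p
        | 'CreateNumbers' >> beam.Create([1, 2, 3]).with_output_types(int)
    )

    # For custom types, Beam looks up a coder registered for the type
    # and otherwise uses a general-purpose fallback coder.
    # Distinct labels are needed because Create is applied twice.
    people = (
        p
        | 'CreatePeople' >> beam.Create([Person('Alice', 30)]).with_output_types(Person)
    )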

Deterministic Coders

For GroupByKey operations, keys must use deterministic coders.
import apache_beam as beam
from apache_beam.coders import TupleCoder, StrUtf8Coder, VarIntCoder

with beam.Pipeline() as p:
    # Good: deterministic key coder
    pairs = p | beam.Create([
        ('key1', 1),
        ('key2', 2)
    ])
    grouped = pairs | beam.GroupByKey()
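    # With inferred types, elements use TupleCoder([StrUtf8Coder(), VarIntCoder()])

    # Risky: dict keys have no inherently deterministic encoding.
    # Depending on the Beam version, this is rejected or handled by a
    # deterministic fallback, so prefer strings or tuples as keys:
    # bad_pairs = p | beam.Create([({'a': 1}, 'value')])
    # bad_grouped = bad_pairs | beam.GroupByKey()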
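
To see why key encodings need to be deterministic, consider a grouping step that compares keys by their encoded bytes, as distributed runners commonly do. The sketch below is a standard-library model of that behavior. `group_by_encoded_key` and the two encoder functions are hypothetical helpers, not Beam APIs.

```python
import json
from collections import defaultdict


def group_by_encoded_key(pairs, encode_key):
    """Group values by the encoded bytes of their key."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[encode_key(key)].append(value)
    return dict(groups)


def unordered(key):
    # Field order follows dict insertion order, so equal dicts can differ.
    return json.dumps(key).encode('utf-8')


def ordered(key):
    # Sorting the fields makes equal dicts encode identically.
    return json.dumps(key, sort_keys=True).encode('utf-8')


pairs = [
    ({'a': 1, 'b': 2}, 'first'),
    ({'b': 2, 'a': 1}, 'second'),  # equal key, different insertion order
]

split_groups = group_by_encoded_key(pairs, unordered)
merged_groups = group_by_encoded_key(pairs, ordered)
```

With the order-dependent encoding the two equal keys land in separate groups, while the sorted encoding places both values in one group.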

Custom Coders

Creating a Custom Coder

import apache_beam as beam
from apache_beam.coders import Coder
import json

class JsonCoder(Coder):
    """Encodes objects as JSON."""
    
    def encode(self, value):
        return json.dumps(value).encode('utf-8')
    
    def decode(self, encoded):
        return json.loads(encoded.decode('utf-8'))
    
    def is_deterministic(self):
        # JSON dict encoding order may vary
        return False
    
    def estimate_size(self, value):
        return len(self.encode(value))

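# JsonCoder round-trips values outside a pipeline
coder = JsonCoder()
record = {'name': 'Alice', 'age': 30}
assert coder.decode(coder.encode(record)) == record

# Inside a pipeline, type hints select the coder. Beam uses JsonCoder only
# for a type it has been registered for (see Registering Custom Coders).
with beam.Pipeline() as p:
    data = p | beam.Create([{'name': 'Alice', 'age': 30}])
    data = data | beam.Map(lambda x: x).with_output_types(
        beam.typehints.Dict[str, beam.typehints.Any]
    )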

Registering Custom Coders

import apache_beam as beam
from apache_beam.coders import Coder, registry

class Person:
    def __init__(self, name, age):
        self.name = name
        self.age = age

class PersonCoder(Coder):
    def encode(self, person):
        return f"{person.name},{person.age}".encode('utf-8')
    
    def decode(self, encoded):
        # Split on the last comma so names that contain commas still decode
        name, age = encoded.decode('utf-8').rsplit(',', 1)
        return Person(name, int(age))
    
    def is_deterministic(self):
        return True

# Register the coder
registry.register_coder(Person, PersonCoder)

# Now Beam will automatically use PersonCoder for Person objects
with beam.Pipeline() as p:
    people = p | beam.Create([Person('Alice', 30), Person('Bob', 25)])
    # PersonCoder is automatically used

FastPrimitivesCoder

Optimized coder for Python primitives.
from apache_beam.coders import FastPrimitivesCoder

coder = FastPrimitivesCoder()

# Can encode various types
encoded_int = coder.encode(42)
encoded_str = coder.encode("hello")
encoded_list = coder.encode([1, 2, 3])
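
One way a single coder can handle several primitive types is to prefix each payload with a type tag. The following standard-library sketch illustrates that general technique. The tag values and the function names `encode_tagged` and `decode_tagged` are assumptions for the example and do not describe FastPrimitivesCoder's actual format.

```python
import struct

INT_TAG, STR_TAG, BYTES_TAG = b'i', b's', b'b'


def encode_tagged(value) -> bytes:
    """Prefix the payload with a one-byte tag naming its type."""
    if isinstance(value, bool):
        # bool is a subclass of int; this sketch does not support it.
        raise TypeError("bool is not supported in this sketch")
    if isinstance(value, int):
        return INT_TAG + struct.pack('>q', value)
    if isinstance(value, str):
        return STR_TAG + value.encode('utf-8')
    if isinstance(value, bytes):
        return BYTES_TAG + value
    raise TypeError(f"unsupported type: {type(value).__name__}")


def decode_tagged(data: bytes):
    """Read the tag, then decode the payload accordingly."""
    tag, payload = data[:1], data[1:]
    if tag == INT_TAG:
        return struct.unpack('>q', payload)[0]
    if tag == STR_TAG:
        return payload.decode('utf-8')
    if tag == BYTES_TAG:
        return payload
    raise ValueError(f"unknown tag: {tag!r}")


tagged_int = encode_tagged(42)
tagged_str = encode_tagged("hello")
```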

Coder Best Practices

  1. Use deterministic coders for keys: GroupByKey requires deterministic key coders
  2. Prefer built-in coders: They are optimized and well-tested
  3. Specify coders explicitly when needed: Register a coder with registry.register_coder() and attach type hints with .with_output_types() so Beam selects it
  4. Test custom coders: Ensure encode/decode round-trips correctly
  5. Consider size: Efficient coders reduce shuffle costs
import apache_beam as beam

with beam.Pipeline() as p:
    # The output type hint lets Beam select VarIntCoder for the results
    numbers = (
        p
        | 'CreateNumbers' >> beam.Create([1, 2, 3])
        | beam.Map(lambda x: x * 2).with_output_types(int)
    )

    # For PCollections after GroupByKey
    # (distinct labels are needed when a transform is applied twice)
    pairs = p | 'CreatePairs' >> beam.Create([('a', 1), ('b', 2)])
    grouped = pairs | beam.GroupByKey()
    # Values are automatically wrapped in IterableCoder
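
The advice to test custom coders can be sketched as a small round-trip check that works with any object exposing `encode` and `decode`. The helper `check_round_trip` and the deliberately lossy `AsciiCoder` are hypothetical and use only the standard library.

```python
def check_round_trip(coder, values):
    """Return the values that do not survive encode followed by decode."""
    failures = []
    for value in values:
        encoded = coder.encode(value)
        if not isinstance(encoded, bytes) or coder.decode(encoded) != value:
            failures.append(value)
    return failures


class AsciiCoder:
    """Hypothetical lossy coder: non-ASCII characters are replaced."""

    def encode(self, value):
        return value.encode('ascii', errors='replace')

    def decode(self, encoded):
        return encoded.decode('ascii')


failures = check_round_trip(AsciiCoder(), ['hello', 'café'])
```

Running the same check over edge cases such as empty values, delimiters, and non-ASCII text tends to surface encoding bugs before they reach a pipeline.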