Skip to main content
A PCollection (Parallel Collection) represents a distributed dataset that your Beam pipeline operates on. It is the fundamental data abstraction in Apache Beam.

What is a PCollection?

From the Apache Beam source code:
/**
 * A {@link PCollection PCollection<T>} is an immutable collection of values of type {@code T}. 
 * A {@link PCollection} can contain either a bounded or unbounded number of elements. 
 * Bounded and unbounded {@link PCollection PCollections} are produced as the output of 
 * {@link PTransform PTransforms}.
 *
 * Each element in a {@link PCollection} has an associated timestamp. Readers assign 
 * timestamps to elements when they create {@link PCollection PCollections}, and other 
 * {@link PTransform PTransforms} propagate these timestamps from their input to their output.
 *
 * Additionally, a {@link PCollection} has an associated {@link WindowFn} and each element 
 * is assigned to a set of windows. By default, the windowing function is {@link GlobalWindows} 
 * and all elements are assigned into a single default window.
 */

Key Characteristics

1. Immutability

PCollections are immutable. Once created, you cannot modify a PCollection’s contents. Transforms create new PCollections instead:
PCollection<String> lines = p.apply(TextIO.read().from("input.txt"));
// This creates a NEW PCollection, doesn't modify 'lines'
PCollection<String> words = lines.apply(new ExtractWords());
Immutability enables parallel processing and fault tolerance. Beam can retry failed operations safely because data doesn’t change.

2. Distributed

PCollections are distributed across multiple workers. You don’t control where elements are processed:
# Elements are automatically distributed
lines = p | beam.io.ReadFromText('large_file.txt')  # Millions of lines
# Beam distributes these across available workers

3. Element Types

PCollections are strongly typed:
PCollection<String> strings = ...;        // Collection of strings
PCollection<Integer> numbers = ...;       // Collection of integers
PCollection<KV<String, Integer>> pairs = ...;  // Key-value pairs
PCollection<MyCustomClass> objects = ...; // Custom objects

Bounded vs Unbounded PCollections

Bounded PCollections (Batch)

Bounded PCollections have a fixed size - they represent finite datasets:
// Reading a file creates a bounded PCollection
PCollection<String> lines = p.apply(
    TextIO.read().from("gs://bucket/file.txt"));

// Creating from in-memory data
PCollection<String> words = p.apply(
    Create.of("hello", "world", "beam"));

Unbounded PCollections (Streaming)

Unbounded PCollections have infinite size - they represent continuous data streams:
// Reading from Pub/Sub creates unbounded PCollection
PCollection<String> messages = p.apply(
    PubsubIO.readStrings()
        .fromTopic("projects/myproject/topics/mytopic"));

// Reading from Kafka
PCollection<KV<Long, String>> records = p.apply(
    KafkaIO.<Long, String>read()
        .withBootstrapServers("localhost:9092")
        .withTopic("my-topic")
        .withKeyDeserializer(LongDeserializer.class)
        .withValueDeserializer(StringDeserializer.class));
The same transforms work on both bounded and unbounded PCollections. Windowing and triggering become more important for unbounded data.

Element Properties

Timestamps

Every element in a PCollection has an associated timestamp:
// Elements get timestamps from the source
PCollection<String> lines = p.apply(TextIO.read()...);
// Default timestamp for file elements is MIN_TIMESTAMP

// You can assign custom timestamps
PCollection<String> timestamped = lines.apply(
    ParDo.of(new DoFn<String, String>() {
        @ProcessElement
        public void processElement(
            @Element String element,
            OutputReceiver<String> out) {
            Instant timestamp = new Instant();
            out.outputWithTimestamp(element, timestamp);
        }
    }));

Windows

Elements are organized into windows for processing:
// Default: all elements in a single global window
PCollection<String> data = ...;

// Apply fixed-time windows
PCollection<String> windowed = data.apply(
    Window.into(FixedWindows.of(Duration.standardMinutes(5))));
Windowing is covered in detail in the Windowing section.

Creating PCollections

From External Sources

Most commonly, PCollections are created by reading from external sources:
PCollection<String> lines = p.apply(
    TextIO.read().from("gs://bucket/*.txt"));

From In-Memory Data

For testing or small datasets, create PCollections from in-memory data:
import org.apache.beam.sdk.transforms.Create;

// Create from varargs
PCollection<String> words = p.apply(
    Create.of("hello", "world", "beam"));

// Create from collection
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
PCollection<Integer> pNumbers = p.apply(Create.of(numbers));

// Create with timestamps
PCollection<String> timestamped = p.apply(
    Create.timestamped(
        TimestampedValue.of("hello", new Instant(0)),
        TimestampedValue.of("world", new Instant(1000))));

Working with PCollections

Applying Transforms

Transforms are applied using the apply() method (or | operator in Python):
PCollection<String> input = ...;

// Apply a single transform
PCollection<Integer> lengths = input.apply(
    MapElements.into(TypeDescriptors.integers())
        .via(String::length));

// Chain multiple transforms
PCollection<KV<String, Long>> result = input
    .apply("Split", ParDo.of(new SplitWords()))
    .apply("Count", Count.perElement());

Element-wise Transforms

Process each element independently:
PCollection<String> upper = strings.apply(
    MapElements.into(TypeDescriptors.strings())
        .via(String::toUpperCase));

PCollection<String> filtered = strings.apply(
    Filter.by(s -> s.length() > 5));

Aggregating Transforms

Combine elements together:
// Count elements
PCollection<Long> count = strings.apply(Count.globally());

// Group by key
PCollection<KV<String, Integer>> pairs = ...;
PCollection<KV<String, Iterable<Integer>>> grouped = 
    pairs.apply(GroupByKey.create());

// Combine values
PCollection<KV<String, Integer>> sums = 
    pairs.apply(Sum.integersPerKey());

Coders

Coders specify how to serialize PCollection elements. Beam usually infers coders automatically:
// Beam infers coder for common types
PCollection<String> strings = ...;  // Uses StringUtf8Coder
PCollection<Integer> ints = ...;    // Uses VarIntCoder

// Explicitly set a coder if needed
PCollection<MyClass> objects = input
    .apply(...)
    .setCoder(MyClassCoder.of());
Custom classes need custom coders. Ensure your classes are serializable or provide a custom coder.

Best Practices

Size considerations: PCollections can be huge. Never try to collect all elements into memory using a local collection.
Type safety: Use type descriptors (Java) or type hints (Python) to catch type errors early.

Do’s and Don’ts

Do:
  • ✅ Apply transforms to process PCollections
  • ✅ Use descriptive names for intermediate PCollections
  • ✅ Leverage Beam’s built-in transforms when possible
  • ✅ Test with small datasets using Create
Don’t:
  • ❌ Try to iterate over a PCollection locally
  • ❌ Modify elements in place (use transforms)
  • ❌ Access PCollection size directly (use Count)
  • ❌ Make assumptions about element order

Debugging PCollections

To inspect PCollection contents during development:
import org.apache.beam.sdk.transforms.DoFn;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class LoggingDoFn<T> extends DoFn<T, T> {
    private static final Logger LOG = LoggerFactory.getLogger(LoggingDoFn.class);
    
    @ProcessElement
    public void processElement(@Element T element, OutputReceiver<T> out) {
        LOG.info("Element: {}", element);
        out.output(element);
    }
}

// Use it
PCollection<String> logged = input.apply(ParDo.of(new LoggingDoFn<>()));

Next Steps

Transforms

Learn how to transform PCollections

Windowing

Divide unbounded PCollections into windows