Skip to main content

Documentation Index

Fetch the complete documentation index at: https://mintlify.com/apache/beam/llms.txt

Use this file to discover all available pages before exploring further.

This page demonstrates advanced streaming patterns in Apache Beam, including windowing strategies, trigger mechanisms, and handling late data.

Streaming Word Count

A basic streaming pipeline that reads from PubSub and processes data in fixed windows.
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
from apache_beam.transforms import window

def run_streaming_wordcount(input_topic, output_topic):
    """Count words read from a Pub/Sub topic in 15-second fixed windows.

    Args:
        input_topic: Pub/Sub topic to read from; passed as the ``topic``
            argument of ``ReadFromPubSub``.
        output_topic: Pub/Sub topic that receives one UTF-8 encoded
            ``"<word>: <count>"`` message per word and window.
    """
    # streaming=True passed here is the same option that
    # view_as(StandardOptions).streaming exposes, so setting it once suffices.
    pipeline_options = PipelineOptions(streaming=True)

    with beam.Pipeline(options=pipeline_options) as p:
        # Read raw message payloads from PubSub
        messages = (
            p
            | 'Read' >> beam.io.ReadFromPubSub(
                topic=input_topic
            ).with_output_types(bytes)
        )

        lines = messages | 'Decode' >> beam.Map(lambda x: x.decode('utf-8'))

        # Apply windowing and count words
        counts = (
            lines
            | 'ExtractWords' >> beam.FlatMap(lambda x: x.split())
            | 'PairWithOne' >> beam.Map(lambda x: (x, 1))
            | 'Window' >> beam.WindowInto(window.FixedWindows(15))
            # CombinePerKey sums per word and window without first
            # materializing every value for a key, unlike GroupByKey + sum.
            | 'Sum' >> beam.CombinePerKey(sum)
        )

        # Format and write results; the write is a sink, so its result is
        # intentionally discarded.
        _ = (
            counts
            | 'Format' >> beam.Map(lambda wc: f'{wc[0]}: {wc[1]}'.encode('utf-8'))
            | 'Write' >> beam.io.WriteToPubSub(output_topic)
        )
Key Concepts:
  • Set streaming=True in pipeline options
  • Use WindowInto with FixedWindows for 15-second windows
  • Data is processed continuously as it arrives

Advanced Windowing with Triggers

Demonstrating different trigger types for controlling when results are emitted.

Default Trigger (Watermark-based)

The default trigger fires when the watermark passes the end of the window.
import apache_beam as beam
from apache_beam.transforms import window, trigger

# Default trigger - fires once when watermark passes window end.
# The Python SDK expresses window sizes and allowed lateness in seconds;
# joda-time's Duration is a Java class and is not importable from Python.
default_windowed = (
    data
    | 'FixedWindows' >> beam.WindowInto(
        window.FixedWindows(30 * 60),  # 30 minutes
        trigger=trigger.AfterWatermark(),
        allowed_lateness=0,  # zero allowed lateness: late data is dropped
        accumulation_mode=trigger.AccumulationMode.DISCARDING
    )
)
Behavior:
  • Fires once when the watermark passes the window end
  • Produces ON_TIME results
  • Late data is dropped (zero allowed lateness)

Handling Late Data

Allow late data processing with allowed lateness.
from apache_beam.transforms import window, trigger

# Allow late data for up to 1 day
with_late_data = (
    data
    | 'WindowWithLateness' >> beam.WindowInto(
        window.FixedWindows(30 * 60),  # 30 minutes
        # Without a `late` trigger, AfterWatermark finishes after the on-time
        # pane and late elements are never emitted. AfterCount(1) fires a new
        # pane for every late element that arrives within allowed lateness.
        trigger=trigger.AfterWatermark(late=trigger.AfterCount(1)),
        allowed_lateness=24 * 60 * 60,  # 1 day
        accumulation_mode=trigger.AccumulationMode.DISCARDING
    )
    | 'CountPerWindow' >> beam.CombinePerKey(sum)
)
Key Points:
  • Windows stay open for 1 day after watermark passes
  • Each late element triggers a new pane (LATE timing)
  • Use DISCARDING mode to get incremental updates

Speculative Results (Early Firings)

Get early approximations before all data arrives.
from apache_beam.transforms import window, trigger

# Emit a speculative pane 60 seconds (processing time) after the first
# element of each pane, repeating for as long as the window accepts data.
speculative = (
    data
    | 'SpeculativeWindow' >> beam.WindowInto(
        window.FixedWindows(30 * 60),  # 30-minute windows
        accumulation_mode=trigger.AccumulationMode.ACCUMULATING,
        allowed_lateness=24 * 60 * 60,  # 1 day
        trigger=trigger.Repeatedly(trigger.AfterProcessingTime(60)),
    )
    | 'Aggregate' >> beam.CombinePerKey(sum)
)
Use Case:
  • Get quick approximations for dashboards
  • Progressive refinement of results
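  • Panes fire on processing time, independent of the watermark; panes emitted before the watermark passes the end of the window are EARLY, later ones are LATE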

Combined Trigger Strategy

Combine early firings, on-time results, and late data handling.
from apache_beam.transforms import window, trigger

# Complete trigger strategy.
# AfterWatermark takes the early and late firings directly and repeats each
# of them, so one trigger yields EARLY, ON_TIME and LATE panes in sequence.
# AfterAll is not suitable here: it only fires once all of its sub-triggers
# are ready, which does not produce this sequence.
combined = (
    data
    | 'CombinedTrigger' >> beam.WindowInto(
        window.FixedWindows(30 * 60),
        trigger=trigger.AfterWatermark(
            early=trigger.AfterProcessingTime(60),  # Early: every 1 min
            late=trigger.AfterProcessingTime(5 * 60)  # Late: every 5 min
        ),
        allowed_lateness=24 * 60 * 60,
        accumulation_mode=trigger.AccumulationMode.ACCUMULATING
    )
)
Timeline:
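  1. EARLY panes: every 1 minute of processing time until the watermark passes the end of the window
  2. ON_TIME pane: when the watermark passes the end of the window
  3. LATE panes: every 5 minutes of processing time while late data arrives, until the allowed lateness (1 day) expires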

Windowing with Timestamps

Access window information in your pipeline for metadata enrichment.
import apache_beam as beam
from apache_beam.transforms import window

class FormatWithWindow(beam.DoFn):
    """Turn ``(word, count)`` pairs into dicts tagged with their window bounds."""

    def process(self, element, window=beam.DoFn.WindowParam):
        """Yield one dict holding the word, its count and the window bounds.

        Args:
            element: A two-item ``(word, count)`` pair.
            window: The element's window (defaults to ``DoFn.WindowParam``);
                its ``start`` and ``end`` are converted to UTC datetimes and
                rendered with ``isoformat()``.
        """
        word, count = element
        record = {'word': word, 'count': count}
        bounds = (('window_start', window.start), ('window_end', window.end))
        for field, boundary in bounds:
            record[field] = boundary.to_utc_datetime().isoformat()
        yield record

# Attach window start/end metadata to every windowed count
results = windowed_counts | 'AddWindowInfo' >> beam.ParDo(FormatWithWindow())

Session Windows

Group events based on activity sessions with gaps of inactivity.
from apache_beam.transforms import window

# Session windows use a gap duration of 10 minutes (given in seconds)
sessions = (
    events
    | 'SessionWindows' >> beam.WindowInto(window.Sessions(10 * 60))
    | 'CountPerSession' >> beam.combiners.Count.PerElement()
)
Use Cases:
  • User session analytics
  • Detecting periods of activity
  • Grouping related events

Sliding Windows

Create overlapping windows for moving averages and continuous analysis.
from apache_beam.transforms import window

# Overlapping windows: size = 1 hour, period = 5 minutes (both in seconds)
sliding = (
    metrics
    | 'SlidingWindows' >> beam.WindowInto(
        window.SlidingWindows(60 * 60, 5 * 60)  # (size, period)
    )
    | 'ComputeAverage' >> beam.CombinePerKey(beam.combiners.MeanCombineFn())
)

Best Practices

Choose Appropriate Windows

  • Fixed windows for regular intervals
  • Session windows for user activity
  • Sliding windows for moving calculations

Configure Allowed Lateness

  • Balance completeness vs. resource usage
  • Consider your data’s lateness characteristics
  • Use watermark estimators for better accuracy

Select Accumulation Mode

  • DISCARDING for independent updates
  • ACCUMULATING for cumulative results
  • Consider storage and computation trade-offs

Monitor Watermarks

  • Track watermark lag in production
  • Adjust allowed lateness based on metrics
  • Use custom watermark estimators if needed

Common Patterns

Traffic Analysis Example

Based on examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java:160-337
// Process traffic sensor data with multiple trigger strategies.
// All four variants window the same keyed input into 30-minute fixed windows
// and differ only in trigger, allowed lateness and accumulation mode.
PCollection<KV<String, Integer>> flowInfo = /* input data */;

// 1. Default trigger - watermark only
// Zero allowed lateness, and fired panes are discarded.
flowInfo
    .apply(Window
        .<KV<String, Integer>>into(FixedWindows.of(Duration.standardMinutes(30)))
        .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
        .withAllowedLateness(Duration.ZERO)
        .discardingFiredPanes());

// 2. With allowed lateness - capture late data
// Same trigger as (1), but with an allowed lateness of one day.
flowInfo
    .apply(Window
        .<KV<String, Integer>>into(FixedWindows.of(Duration.standardMinutes(30)))
        .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
        .discardingFiredPanes()
        .withAllowedLateness(Duration.standardDays(1)));

// 3. Speculative - early approximations
// Repeating processing-time trigger: one minute past the first element in
// the pane. Fired panes accumulate.
flowInfo
    .apply(Window
        .<KV<String, Integer>>into(FixedWindows.of(Duration.standardMinutes(30)))
        .triggering(Repeatedly.forever(
            AfterProcessingTime.pastFirstElementInPane()
                .plusDelayOf(Duration.standardMinutes(1))))
        .accumulatingFiredPanes()
        .withAllowedLateness(Duration.standardDays(1)));

// 4. Sequential - early, on-time, and late
// AfterEach.inOrder chains two repeating triggers: a 1-minute processing-time
// trigger that ends at the watermark (orFinally), then a 5-minute one.
flowInfo
    .apply(Window
        .<KV<String, Integer>>into(FixedWindows.of(Duration.standardMinutes(30)))
        .triggering(AfterEach.inOrder(
            Repeatedly.forever(
                AfterProcessingTime.pastFirstElementInPane()
                    .plusDelayOf(Duration.standardMinutes(1)))
                .orFinally(AfterWatermark.pastEndOfWindow()),
            Repeatedly.forever(
                AfterProcessingTime.pastFirstElementInPane()
                    .plusDelayOf(Duration.standardMinutes(5)))))
        .accumulatingFiredPanes()
        .withAllowedLateness(Duration.standardDays(1)));

Windowing Guide

Learn windowing fundamentals

Triggers Guide

Deep dive into trigger mechanisms

Watermarks

Understanding watermarks and event time

Streaming I/O

Streaming sources and sinks