Control when and how results are emitted from windows
Triggers determine when Beam emits results for a window. They control the balance between latency (how quickly you get results) and completeness (how much data is included).
Triggers answer the question: “When should we emit results for this window?”
From the Apache Beam examples:
/**
 * This example illustrates the basic concepts behind triggering. It shows how to use
 * different trigger definitions to produce partial (speculative) results before all
 * the data is processed and to control when updated results are produced for late data.
 *
 * Concepts:
 *   1. The default triggering behavior
 *   2. Late data with the default trigger
 *   3. How to get speculative estimates
 *   4. Combining late data and speculative estimates
 */
Get early results before all data arrives (speculative results)
Handle late data that arrives after the window closes
Balance latency and accuracy based on your requirements
Refine results as more data becomes available
Without a custom trigger, a window emits its result only once, when the watermark passes the end of the window. Triggers give you fine-grained control over this timing.
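import org.apache.beam.sdk.transforms.windowing.AfterPane;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

// Fire after every 100 elements
PCollection<String> results = input.apply(
    Window.<String>into(FixedWindows.of(Duration.standardMinutes(5)))
        .triggering(
            Repeatedly.forever(
                AfterPane.elementCountAtLeast(100)))
        // A custom trigger on non-global windows needs an allowed lateness (here: none)
        .withAllowedLateness(Duration.ZERO)
        .discardingFiredPanes());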
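To make the effect of the element-count trigger above more concrete, here is a minimal plain-Java sketch that models which panes one window would emit. It is not Beam code: `CountTriggerSketch` and `paneSizes` are hypothetical names, and the model assumes the trigger fires exactly when the threshold is reached, whereas a real runner only guarantees "at least" that many elements. The `accumulating` flag is there to contrast `discardingFiredPanes()` with `accumulatingFiredPanes()`.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java model of a repeated element-count trigger; not Beam code. */
final class CountTriggerSketch {

    /**
     * Returns the size of each pane emitted for one window when the trigger
     * fires every time `threshold` new elements have been buffered.
     *
     * @param accumulating true models accumulatingFiredPanes(),
     *                     false models discardingFiredPanes()
     */
    static List<Integer> paneSizes(int elementCount, int threshold, boolean accumulating) {
        List<Integer> panes = new ArrayList<>();
        int sinceLastFiring = 0; // elements buffered since the previous pane
        int seenSoFar = 0;       // all elements seen in this window so far
        for (int i = 0; i < elementCount; i++) {
            sinceLastFiring++;
            seenSoFar++;
            if (sinceLastFiring >= threshold) {
                // Discarding panes carry only the new elements;
                // accumulating panes carry everything seen so far.
                panes.add(accumulating ? seenSoFar : sinceLastFiring);
                sinceLastFiring = 0;
            }
        }
        return panes;
    }

    public static void main(String[] args) {
        System.out.println(paneSizes(350, 100, false)); // [100, 100, 100]
        System.out.println(paneSizes(350, 100, true));  // [100, 200, 300]
    }
}
```

In this model the last 50 elements never reach the threshold, so they stay buffered; what happens to them when the window expires is outside the scope of the sketch.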
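import org.apache.beam.sdk.transforms.windowing.AfterEach;
import org.apache.beam.sdk.transforms.windowing.AfterPane;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

// Fire after 100 elements, then after watermark
PCollection<String> results = input.apply(
    Window.<String>into(FixedWindows.of(Duration.standardMinutes(5)))
        .triggering(
            AfterEach.inOrder(
                AfterPane.elementCountAtLeast(100),
                AfterWatermark.pastEndOfWindow()))
        // A custom trigger on non-global windows needs an allowed lateness (here: none)
        .withAllowedLateness(Duration.ZERO)
        .discardingFiredPanes());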
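import org.apache.beam.sdk.transforms.windowing.AfterAll;
import org.apache.beam.sdk.transforms.windowing.AfterPane;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

// Fire when BOTH conditions are met
PCollection<String> results = input.apply(
    Window.<String>into(FixedWindows.of(Duration.standardMinutes(5)))
        .triggering(
            AfterAll.of(
                AfterPane.elementCountAtLeast(100),
                AfterProcessingTime.pastFirstElementInPane()
                    .plusDelayOf(Duration.standardMinutes(1))))
        // A custom trigger on non-global windows needs an allowed lateness (here: none)
        .withAllowedLateness(Duration.ZERO)
        .discardingFiredPanes());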
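import org.apache.beam.sdk.transforms.windowing.AfterFirst;
import org.apache.beam.sdk.transforms.windowing.AfterPane;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

// Fire when EITHER condition is met
PCollection<String> results = input.apply(
    Window.<String>into(FixedWindows.of(Duration.standardMinutes(5)))
        .triggering(
            AfterFirst.of(
                AfterPane.elementCountAtLeast(1000),
                AfterProcessingTime.pastFirstElementInPane()
                    .plusDelayOf(Duration.standardMinutes(1))))
        // A custom trigger on non-global windows needs an allowed lateness (here: none)
        .withAllowedLateness(Duration.ZERO)
        .discardingFiredPanes());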
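The difference between `AfterAll` and `AfterFirst` comes down to "and" versus "or". The following plain-Java sketch models the two examples above as predicates over a snapshot of a pane. It is an illustration only, not Beam code: `CompositeTriggerSketch`, `PaneState`, and `delayAtLeast` are hypothetical names, it uses `java.time.Duration` rather than Joda-Time, and it leaves out `AfterEach.inOrder`, which is sequential and would need extra state.

```java
import java.time.Duration;
import java.util.function.Predicate;

/** Plain-Java model of AfterAll / AfterFirst; not Beam code. */
final class CompositeTriggerSketch {

    /** Snapshot of a pane: buffered element count and processing time since its first element. */
    static final class PaneState {
        final int elementCount;
        final Duration sinceFirstElement;

        PaneState(int elementCount, Duration sinceFirstElement) {
            this.elementCount = elementCount;
            this.sinceFirstElement = sinceFirstElement;
        }
    }

    // Models AfterPane.elementCountAtLeast(n).
    static Predicate<PaneState> elementCountAtLeast(int n) {
        return pane -> pane.elementCount >= n;
    }

    // Models AfterProcessingTime.pastFirstElementInPane().plusDelayOf(delay).
    static Predicate<PaneState> delayAtLeast(Duration delay) {
        return pane -> pane.sinceFirstElement.compareTo(delay) >= 0;
    }

    // AfterAll: ready only when both conditions hold.
    static Predicate<PaneState> afterAll(Predicate<PaneState> a, Predicate<PaneState> b) {
        return a.and(b);
    }

    // AfterFirst: ready as soon as either condition holds.
    static Predicate<PaneState> afterFirst(Predicate<PaneState> a, Predicate<PaneState> b) {
        return a.or(b);
    }

    public static void main(String[] args) {
        Predicate<PaneState> all =
            afterAll(elementCountAtLeast(100), delayAtLeast(Duration.ofMinutes(1)));
        Predicate<PaneState> first =
            afterFirst(elementCountAtLeast(1000), delayAtLeast(Duration.ofMinutes(1)));

        PaneState quiet = new PaneState(10, Duration.ofSeconds(90)); // few elements, long wait
        PaneState busy = new PaneState(150, Duration.ofSeconds(20)); // many elements, short wait

        System.out.println(all.test(quiet));   // false: only the delay has elapsed
        System.out.println(first.test(quiet)); // true: the delay alone is enough
        System.out.println(all.test(busy));    // false: the delay has not elapsed yet
        System.out.println(first.test(busy));  // false: neither 1000 elements nor one minute
    }
}
```

In practice, `AfterFirst` with a count and a time limit is a way to cap latency on quiet windows, while `AfterAll` holds output back until every condition is satisfied.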
The most common pattern combines early (speculative) results with on-time and late results:
import org.apache.beam.sdk.transforms.windowing.*;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

// Early results every minute, final result at watermark, late data every 5 minutes
PCollection<KV<String, Integer>> results = input.apply(
    Window.<KV<String, Integer>>into(
            FixedWindows.of(Duration.standardMinutes(30)))
        .triggering(
            AfterWatermark.pastEndOfWindow()
                // Speculative early results
                .withEarlyFirings(
                    AfterProcessingTime.pastFirstElementInPane()
                        .plusDelayOf(Duration.standardMinutes(1)))
                // Handle late data
                .withLateFirings(
                    AfterProcessingTime.pastFirstElementInPane()
                        .plusDelayOf(Duration.standardMinutes(5))))
        .withAllowedLateness(Duration.standardDays(1))
        .accumulatingFiredPanes());
This trigger:
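Early: Fires repeatedly, one minute (processing time) after the first element of each pane arrives, with partial results
On-time: Fires when the watermark passes the end of the window
Late: Fires five minutes after the first late element of each pane arrives, for data up to one day late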
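Because this pattern uses `accumulatingFiredPanes()`, each pane carries the running result for the window, not only the new elements. The plain-Java sketch below models the sequence of panes a single key and window might produce. It is a simplified model, not Beam code: `EarlyOnTimeLateSketch` and `panes` are hypothetical names, each array entry stands for the sum of the elements that arrived before one firing, and the model assumes an on-time pane is always emitted.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java model of early / on-time / late panes in accumulating mode; not Beam code. */
final class EarlyOnTimeLateSketch {

    /**
     * @param earlyBatchSums  sum of the elements that arrived before each early firing
     * @param onTimeRemainder sum of the elements that arrived after the last early
     *                        firing but before the watermark passed the window end
     * @param lateBatchSums   sum of the late elements that arrived before each late firing
     * @return one "TIMING:runningTotal" label per emitted pane
     */
    static List<String> panes(int[] earlyBatchSums, int onTimeRemainder, int[] lateBatchSums) {
        List<String> out = new ArrayList<>();
        int total = 0; // accumulating mode: the total is never reset between panes
        for (int sum : earlyBatchSums) {
            total += sum;
            out.add("EARLY:" + total);
        }
        total += onTimeRemainder;
        out.add("ON_TIME:" + total);
        for (int sum : lateBatchSums) {
            total += sum;
            out.add("LATE:" + total);
        }
        return out;
    }

    public static void main(String[] args) {
        // Prints [EARLY:3, EARLY:7, ON_TIME:9, LATE:10]
        System.out.println(panes(new int[] {3, 4}, 2, new int[] {1}));
    }
}
```

Each later pane supersedes the earlier ones, so a downstream consumer should overwrite the previous value for the window instead of adding the panes together.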