Windowing divides a PCollection’s elements into finite groups based on timestamps. This is essential for processing unbounded (streaming) data and useful for bounded (batch) data analysis.
What is Windowing?
From the Apache Beam source code:
Java (Window.java:38-78)
/**
* {@link Window} logically divides up or groups the elements of a {@link PCollection}
* into finite windows according to a {@link WindowFn}. The output of {@code Window}
* contains the same elements as input, but they have been logically assigned to windows.
*
* Windowing a {@link PCollection} divides the elements into windows based on the
* associated event time for each element. This is especially useful for PCollections
* with unbounded size, since it allows operating on a sub-group of the elements placed
* into a related window.
*
* Several predefined {@link WindowFn}s are provided:
* - {@link FixedWindows} partitions the timestamps into fixed-width intervals.
* - {@link SlidingWindows} places data into overlapping fixed-width intervals.
* - {@link Sessions} groups data into sessions where each item in a window is
* separated from the next by no more than a specified gap.
*/
Why Windowing?
Windowing enables you to:
Process infinite streams by dividing them into finite chunks
Perform time-based aggregations (e.g., hourly totals, daily averages)
Handle late-arriving data with configurable lateness policies
Analyze time-based patterns in your data
All PCollections have a windowing strategy. By default, elements are assigned to a single global window that spans the entire dataset.
Built-in Window Functions
Fixed Windows (Tumbling)
Divide time into non-overlapping, fixed-duration intervals:
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;
// 5-minute fixed windows
PCollection<String> windowed = input.apply(
    Window.<String>into(FixedWindows.of(Duration.standardMinutes(5))));
// 1-hour windows
PCollection<String> hourly = input.apply(
    Window.<String>into(FixedWindows.of(Duration.standardHours(1))));
Use case: calculating metrics over regular intervals (hourly sales, daily averages)
Time:   0:00  0:05  0:10  0:15  0:20  0:25
Window: [----][----][----][----][----][----]
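To make the window boundaries concrete, here is a plain-Java sketch of how a timestamp maps to a fixed window. It is a simplified model, not Beam's FixedWindows implementation: timestamps are plain long minutes instead of Joda-Time instants, and the class name FixedWindowSketch and its offset parameter (analogous in spirit to FixedWindows.withOffset) are assumptions for illustration.

```java
import java.util.Arrays;

// Simplified model of fixed (tumbling) window assignment.
// Timestamps, sizes, and offsets are plain longs in minutes (an assumption for readability).
class FixedWindowSketch {
  // Returns {start, end} of the half-open window [start, end) that contains ts.
  static long[] assign(long ts, long size, long offset) {
    // floorMod keeps the arithmetic correct for timestamps before the offset
    long start = ts - Math.floorMod(ts - offset, size);
    return new long[] {start, start + size};
  }

  public static void main(String[] args) {
    // 0:07 with 5-minute windows lands in [0:05, 0:10)
    System.out.println(Arrays.toString(assign(7, 5, 0)));   // [5, 10]
    // A timestamp exactly on a boundary starts the next window: 0:10 -> [0:10, 0:15)
    System.out.println(Arrays.toString(assign(10, 5, 0)));  // [10, 15]
  }
}
```

Because windows are half-open, every timestamp belongs to exactly one fixed window.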
Sliding Windows (Hopping)
Create overlapping windows with a specified period and duration:
import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
// 10-minute windows, starting every 5 minutes
PCollection<String> windowed = input.apply(
    Window.<String>into(
        SlidingWindows.of(Duration.standardMinutes(10))
            .every(Duration.standardMinutes(5))));
Use case: moving averages, overlapping analytics
Time:    0:00      0:05      0:10      0:15      0:20
Window1: [------------------]
Window2:           [------------------]
Window3:                     [------------------]
Window4:                               [------------------]
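Because sliding windows overlap, one element can belong to several windows at once. The plain-Java sketch below enumerates every window that contains a timestamp, as a simplified model (not Beam's SlidingWindows code) using long minutes and a hypothetical class name SlidingWindowSketch. With a 10-minute size and a 5-minute period, each element lands in two windows.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of sliding (hopping) window assignment; timestamps in minutes.
class SlidingWindowSketch {
  // Returns every half-open window [start, start + size) that contains ts.
  static List<long[]> assign(long ts, long size, long period) {
    List<long[]> windows = new ArrayList<>();
    // Start of the latest window that begins at or before ts
    long lastStart = ts - Math.floorMod(ts, period);
    // Walk backwards one period at a time while the window still covers ts
    for (long start = lastStart; start > ts - size; start -= period) {
      windows.add(new long[] {start, start + size});
    }
    return windows;
  }

  public static void main(String[] args) {
    // 10-minute windows every 5 minutes: an element at 0:07 is in [5, 15) and [0, 10)
    for (long[] w : assign(7, 10, 5)) {
      System.out.println("[" + w[0] + ", " + w[1] + ")");
    }
  }
}
```

This duplication is why sliding windows cost more than fixed windows of the same size: each element is aggregated once per window it falls into.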
Session Windows
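Group elements into sessions of activity: elements whose timestamps are closer together than the gap duration end up in the same session, and a gap of at least that duration starts a new one: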
import org.apache.beam.sdk.transforms.windowing.Sessions;
// Session windows with a 10-minute gap
PCollection<String> sessions = input.apply(
    Window.<String>into(
        Sessions.withGapDuration(Duration.standardMinutes(10))));
Use case: user sessions, activity bursts, click streams
Events:   *  *   *                *  *
Sessions: [--------]              [----]
          session 1  gap > 10 min  session 2
          (within each session, consecutive events are < 10 min apart)
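Session windows are merged rather than computed from fixed boundaries. Roughly, Beam's Sessions gives each element a proto-window [t, t + gap) and merges overlapping windows per key. The plain-Java sketch below models that merge for a single key; times are long minutes, and the class name SessionSketch is hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Simplified model of session windows for a single key; timestamps in minutes.
class SessionSketch {
  // Merges each element's proto-window [ts, ts + gap) into sessions.
  static List<long[]> sessions(long[] timestamps, long gap) {
    long[] sorted = timestamps.clone();
    Arrays.sort(sorted);
    List<long[]> result = new ArrayList<>();
    for (long ts : sorted) {
      long[] last = result.isEmpty() ? null : result.get(result.size() - 1);
      if (last != null && ts < last[1]) {
        // Proto-window overlaps the current session: extend the session's end
        last[1] = Math.max(last[1], ts + gap);
      } else {
        // No earlier element within `gap` minutes: start a new session
        result.add(new long[] {ts, ts + gap});
      }
    }
    return result;
  }

  public static void main(String[] args) {
    long[] events = {0, 3, 8, 30, 35};
    // Prints [0, 18) and [30, 45): the 22-minute silence splits the two sessions
    for (long[] s : sessions(events, 10)) {
      System.out.println("[" + s[0] + ", " + s[1] + ")");
    }
  }
}
```

Note that a session ends one gap duration after its last element, which is why the first session above ends at 18 rather than 8.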
Global Windows
The default windowing: all elements are placed in a single window:
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
// Explicitly specify global windows (usually not needed)
PCollection<String> global = input.apply(
    Window.<String>into(new GlobalWindows()));
Global windows are the default. You only need to specify them explicitly when changing from another windowing strategy.
Windowing Example: Hourly Traffic Count
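Here's an example modeled on the Apache Beam windowing examples (see WindowedWordCount.java and windowed_wordcount.py). It counts traffic events per route in hourly windows; ExtractRouteFn and FormatResultsFn are user-defined DoFns that are not shown: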
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;
Pipeline p = Pipeline.create(options);
// Read streaming data; by default each element's timestamp is its Pub/Sub publish time
PCollection<String> input = p.apply(
    PubsubIO.readStrings().fromTopic("projects/myproject/topics/traffic"));
// Apply 1-hour fixed windows
PCollection<String> windowed = input.apply(
    Window.<String>into(FixedWindows.of(Duration.standardHours(1))));
// Extract the route ID (a String) from each event, then count events per route in each window
PCollection<KV<String, Long>> counts = windowed
    .apply("ExtractRoute", ParDo.of(new ExtractRouteFn()))
    .apply("CountPerRoute", Count.perElement());
// Write results
counts.apply("FormatResults", ParDo.of(new FormatResultsFn()))
    .apply(BigQueryIO.write()...);
p.run();
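The key point of the pipeline above is that Count.perElement() counts per route and per window, because grouping in Beam is always scoped to a window. The following plain-Java sketch models that result without Beam; the sample timestamps (in minutes), route names, and the class name HourlyRouteCountSketch are hypothetical, and a "windowStart/route" string key stands in for Beam's key-plus-window grouping.

```java
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model of "1-hour fixed windows + Count.perElement()".
// Each event is a (timestamp in minutes, route) pair; the data is hypothetical.
class HourlyRouteCountSketch {
  static final long WINDOW_MINUTES = 60;

  // Returns counts keyed by "windowStart/route".
  static Map<String, Long> count(long[] timestamps, String[] routes) {
    Map<String, Long> counts = new TreeMap<>();
    for (int i = 0; i < timestamps.length; i++) {
      long windowStart = timestamps[i] - Math.floorMod(timestamps[i], WINDOW_MINUTES);
      // The window is part of the grouping key, just as Beam groups per key and window
      counts.merge(windowStart + "/" + routes[i], 1L, Long::sum);
    }
    return counts;
  }

  public static void main(String[] args) {
    long[] ts = {5, 20, 59, 61, 75};
    String[] routes = {"A", "B", "A", "A", "A"};
    System.out.println(count(ts, routes));  // {0/A=2, 0/B=1, 60/A=2}
  }
}
```

Route A gets two separate counts because its events straddle the 1:00 window boundary.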
You can access the window an element belongs to by adding a window parameter to your @ProcessElement method:
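import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
// IntervalWindow is the window type used by FixedWindows, SlidingWindows, and Sessions.
// Unlike BoundedWindow, which only exposes maxTimestamp(), it provides start() and end().
class AddWindowInfoFn extends DoFn<String, String> {
  @ProcessElement
  public void processElement(
      @Element String element,
      IntervalWindow window,
      OutputReceiver<String> out) {
    String windowInfo = String.format(
        "Element '%s' in window [%s..%s)",
        element,
        window.start(),
        window.end());
    out.output(windowInfo);
  }
}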
Advanced Windowing Configuration
Allowed Lateness
Specify how long after the watermark passes the end of a window late data for that window is still accepted:
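import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.joda.time.Duration;
// 5-minute windows that keep accepting data until the watermark is 10 minutes past the window end
PCollection<String> windowed = input.apply(
    Window.<String>into(FixedWindows.of(Duration.standardMinutes(5)))
        .withAllowedLateness(Duration.standardMinutes(10))
        // A non-zero allowed lateness requires an accumulation mode;
        // discarding mode emits only the new elements in each late pane
        .discardingFiredPanes());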
Data that arrives after the watermark has passed the end of the window plus the allowed lateness is discarded. Set this value based on how late your data typically arrives and on your latency requirements.
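The rule above can be modeled as a three-way classification of an element relative to the watermark. This plain-Java sketch is a simplification, not Beam's implementation: it assumes fixed windows, measures time in whole minutes, and the class name LatenessSketch and its Fate enum are hypothetical. Beam's own check compares the input watermark against the window's maximum timestamp plus the allowed lateness.

```java
// Simplified model of how allowed lateness decides an element's fate; times in minutes.
// Assumes fixed windows starting at 0; not Beam's actual implementation.
class LatenessSketch {
  enum Fate { ON_TIME, LATE_BUT_KEPT, DROPPED }

  static Fate classify(long ts, long windowSize, long allowedLateness, long watermark) {
    long windowEnd = ts - Math.floorMod(ts, windowSize) + windowSize;
    if (watermark < windowEnd) {
      return Fate.ON_TIME;                // window not yet complete
    } else if (watermark < windowEnd + allowedLateness) {
      return Fate.LATE_BUT_KEPT;          // late, but within the allowed lateness
    }
    return Fate.DROPPED;                  // window has expired; element is discarded
  }

  public static void main(String[] args) {
    // 5-minute windows, 10 minutes of allowed lateness, element stamped 0:02 (window [0:00, 0:05))
    System.out.println(classify(2, 5, 10, 4));   // ON_TIME
    System.out.println(classify(2, 5, 10, 7));   // LATE_BUT_KEPT
    System.out.println(classify(2, 5, 10, 16));  // DROPPED
  }
}
```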
Timestamp Combiner
Control the output timestamp that a grouping operation such as GroupByKey assigns to each result in a window:
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
// Each grouped output takes the earliest timestamp among its inputs
PCollection<KV<String, Iterable<Integer>>> grouped = pairs
    .apply(Window.<KV<String, Integer>>into(FixedWindows.of(Duration.standardMinutes(5)))
        .withTimestampCombiner(TimestampCombiner.EARLIEST))
    .apply(GroupByKey.create());
Options:
EARLIEST: Use the earliest input timestamp in the group
LATEST: Use the latest input timestamp in the group
END_OF_WINDOW: Use the end of the window (the default)
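The three options can be compared side by side with a small plain-Java model. The Combiner enum here is a local stand-in, not Beam's TimestampCombiner class, and timestamps are long milliseconds. For END_OF_WINDOW the sketch returns one millisecond before the window end, mirroring the assumption that Beam uses the window's maxTimestamp() rather than its exclusive end.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Simplified model of the three timestamp-combining options; times in milliseconds.
// The enum is a local stand-in, not Beam's TimestampCombiner.
class TimestampCombinerSketch {
  enum Combiner { EARLIEST, LATEST, END_OF_WINDOW }

  // Output timestamp for a grouped result in a window ending (exclusively) at windowEnd.
  static long combine(Combiner c, List<Long> inputTimestamps, long windowEnd) {
    switch (c) {
      case EARLIEST:
        return Collections.min(inputTimestamps);
      case LATEST:
        return Collections.max(inputTimestamps);
      default:
        // Window's max timestamp: one millisecond before its exclusive end
        return windowEnd - 1;
    }
  }

  public static void main(String[] args) {
    List<Long> ts = Arrays.asList(61_000L, 12_000L, 250_000L);
    long windowEnd = 300_000L; // window [0, 5 min)
    for (Combiner c : Combiner.values()) {
      // EARLIEST -> 12000, LATEST -> 250000, END_OF_WINDOW -> 299999
      System.out.println(c + " -> " + combine(c, ts, windowEnd));
    }
  }
}
```

Choosing EARLIEST or LATEST keeps the output close to the actual event times, while END_OF_WINDOW gives every result in a window the same timestamp.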
Windowing with Aggregations
Windowing is most powerful when combined with aggregations, which then produce results per window (and per key, for per-key aggregations):
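import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Mean;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;
// Count events per 5-minute window (input is a PCollection<String>)
PCollection<Long> counts = input
    .apply(Window.into(FixedWindows.of(Duration.standardMinutes(5))))
    // Global combines need withoutDefaults() when the input is not globally windowed
    .apply(Count.<String>globally().withoutDefaults());
// Sum values per key per hour (keyedValues is a PCollection<KV<String, Integer>>)
PCollection<KV<String, Integer>> sums = keyedValues
    .apply(Window.into(FixedWindows.of(Duration.standardHours(1))))
    .apply(Sum.integersPerKey());
// Average per 10-minute window, sliding every 5 minutes (numbers is a PCollection<Double>)
PCollection<Double> averages = numbers
    .apply(Window.into(
        SlidingWindows.of(Duration.standardMinutes(10))
            .every(Duration.standardMinutes(5))))
    .apply(Mean.<Double>globally().withoutDefaults());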
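The sliding-window average above computes one mean per window, and each value contributes to every window it falls into. The plain-Java sketch below models that result without Beam, using hypothetical sample data, long-minute timestamps, and a hypothetical class name SlidingMeanSketch. Windows that receive no data produce no output, which matches the behavior requested by withoutDefaults().

```java
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model of "10-minute sliding windows every 5 minutes + mean per window".
// Timestamps are minutes; sample values are hypothetical.
class SlidingMeanSketch {
  static Map<Long, Double> means(long[] ts, double[] values, long size, long period) {
    Map<Long, double[]> acc = new TreeMap<>(); // windowStart -> {sum, count}
    for (int i = 0; i < ts.length; i++) {
      long lastStart = ts[i] - Math.floorMod(ts[i], period);
      // Add the value to every window [start, start + size) that contains ts[i]
      for (long start = lastStart; start > ts[i] - size; start -= period) {
        double[] sc = acc.computeIfAbsent(start, k -> new double[2]);
        sc[0] += values[i];
        sc[1] += 1;
      }
    }
    Map<Long, Double> result = new TreeMap<>();
    acc.forEach((start, sc) -> result.put(start, sc[0] / sc[1]));
    return result;
  }

  public static void main(String[] args) {
    long[] ts = {1, 6, 12};
    double[] values = {10.0, 20.0, 60.0};
    // Keys are window starts: {-5=10.0, 0=15.0, 5=40.0, 10=60.0}
    System.out.println(means(ts, values, 10, 5));
  }
}
```

The window starting at -5 covers [-0:05, 0:05); it appears because the element at 0:01 also falls into the window that began before it.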
Watermarks
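Watermarks track progress in event time.
A watermark is the system's estimate of how far event time has progressed: Beam assumes that all data with timestamps earlier than the watermark has arrived. When the watermark passes the end of a window, the window is considered complete and, with the default trigger, its results are emitted. Data for that window that arrives afterwards is late data.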
Event time: 0:00   0:05   0:10   0:15   0:20
Watermark:  --------------------->
                                 ^
(elements with timestamps before this point are considered complete)
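To illustrate the idea, here is a toy watermark in plain Java. It is only a heuristic for intuition: it assumes data is never more than maxDelay minutes out of order, whereas real Beam watermarks are computed by the source and the runner. The class name WatermarkSketch and its methods are hypothetical.

```java
// Toy watermark heuristic; times in minutes.
// Assumes data arrives at most maxDelay minutes out of order (not how Beam computes watermarks).
class WatermarkSketch {
  private final long maxDelay;
  private long maxEventTime = Long.MIN_VALUE;

  WatermarkSketch(long maxDelay) {
    this.maxDelay = maxDelay;
  }

  // Record an element's event timestamp; the watermark only ever moves forward.
  void observe(long eventTime) {
    maxEventTime = Math.max(maxEventTime, eventTime);
  }

  // Estimated point before which all data is believed to have arrived.
  long watermark() {
    return maxEventTime == Long.MIN_VALUE ? Long.MIN_VALUE : maxEventTime - maxDelay;
  }

  // A window [start, end) is considered complete once the watermark reaches its end.
  boolean isComplete(long windowEnd) {
    return watermark() >= windowEnd;
  }

  public static void main(String[] args) {
    WatermarkSketch wm = new WatermarkSketch(3);
    for (long t : new long[] {1, 4, 2, 9}) {
      wm.observe(t);
    }
    System.out.println(wm.watermark());    // 6
    System.out.println(wm.isComplete(5));  // true: window [0:00, 0:05) is complete
    System.out.println(wm.isComplete(10)); // false: window [0:05, 0:10) is still open
  }
}
```

An element stamped 0:04 that showed up after this point would be late for the completed [0:00, 0:05) window.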
Best Practices
Choose appropriate window sizes: balance latency (smaller windows produce results sooner) against efficiency (larger windows mean fewer aggregations and less per-window overhead).
Use session windows for user activity: session windows track user sessions, where bursts of activity are separated by periods of inactivity.
Beware of data skew: make sure keys are well distributed so that no single key dominates a window and becomes a hotspot.
Common Patterns
Hourly aggregations: use 1-hour fixed windows for reports
Real-time dashboards: use small fixed windows (1-5 minutes)
Moving averages: use sliding windows
User sessions: use session windows with an appropriate gap
Daily summaries: use 1-day fixed windows
Complete Example: Session Analysis
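import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.Sessions;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;
Pipeline p = Pipeline.create(options);
// Read click events (assumes ClickEvent is an Avro-serializable class)
PCollection<ClickEvent> clicks = p.apply(
    PubsubIO.readAvros(ClickEvent.class).fromTopic("projects/myproject/topics/clicks"));
// Key each click by user ID
PCollection<KV<String, ClickEvent>> userClicks = clicks.apply(
    ParDo.of(new ExtractUserIdFn()));
// Group into per-user sessions (30-minute inactivity gap)
PCollection<KV<String, Iterable<ClickEvent>>> sessions = userClicks
    .apply(Window.into(Sessions.withGapDuration(Duration.standardMinutes(30))))
    .apply(GroupByKey.create());
// Calculate session metrics
PCollection<SessionMetrics> metrics = sessions.apply(
    ParDo.of(new ComputeSessionMetricsFn()));
metrics.apply(BigQueryIO.write()...);
p.run();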
Next Steps
Triggers: control when windowed results are emitted
Transforms: apply transformations to windowed data