The FlinkRunner executes Apache Beam pipelines on Apache Flink clusters, providing high-throughput, low-latency stream and batch processing.

Overview

Apache Flink is a distributed stream processing framework that excels at:
  • Stream Processing: True streaming with low latency and high throughput
  • Exactly-Once Processing: Strong consistency guarantees with checkpointing
  • State Management: Distributed stateful computations
  • Event Time Processing: Native support for event time and watermarks

When to Use FlinkRunner

Best For

  • Real-time streaming applications
  • Stateful stream processing
  • Low-latency requirements
  • Exactly-once processing semantics
  • Complex event processing
  • Existing Flink infrastructure

Consider Alternatives

  • Simple batch jobs (DirectRunner)
  • GCP-based workloads (DataflowRunner)
  • Existing Spark clusters (SparkRunner)
  • Local development (PrismRunner)

Setup and Configuration

Prerequisites

  • Apache Flink cluster (1.15 or later recommended)
  • Java 8 or later
  • Network access to Flink JobManager

Dependencies

Add to your pom.xml:
<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-runners-flink-1.18</artifactId>
  <version>{beam-version}</version>
</dependency>
For Gradle:
implementation 'org.apache.beam:beam-runners-flink-1.18:{beam-version}'
Replace 1.18 with your Flink version. Supported versions include 1.15, 1.16, 1.17, and 1.18.
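Submitting to a cluster (shown later) expects a single self-contained JAR. A minimal Maven Shade sketch for producing one (the plugin version here is illustrative); the `ServicesResourceTransformer` merges `META-INF/services` files, which Beam's FileSystem and runner registration depends on:

```xml
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <version>3.5.1</version>
  <executions>
    <execution>
      <phase>package</phase>
      <goals><goal>shade</goal></goals>
      <configuration>
        <transformers>
          <!-- Merge service registry files so Beam SPI lookups still work -->
          <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
        </transformers>
      </configuration>
    </execution>
  </executions>
</plugin>
```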

Local Cluster Setup

To test against a standalone cluster, download and start Flink locally:
# Download Flink
wget https://archive.apache.org/dist/flink/flink-1.18.0/flink-1.18.0-bin-scala_2.12.tgz
tar -xzf flink-1.18.0-bin-scala_2.12.tgz
cd flink-1.18.0

# Start local cluster
./bin/start-cluster.sh

# Access web UI at http://localhost:8081

Kubernetes Deployment

With the Flink Kubernetes Operator installed, a cluster can be described declaratively:
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: beam-flink-cluster
spec:
  image: flink:1.18
  flinkVersion: v1_18
  jobManager:
    resource:
      memory: "2048m"
      cpu: 1
  taskManager:
    resource:
      memory: "2048m"
      cpu: 1
    replicas: 2

Running a Pipeline

Basic Example

import org.apache.beam.runners.flink.FlinkRunner;
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class MyFlinkPipeline {
  public static void main(String[] args) {
    FlinkPipelineOptions options = 
        PipelineOptionsFactory.fromArgs(args)
            .withValidation()
            .as(FlinkPipelineOptions.class);
    
    // Set the runner
    options.setRunner(FlinkRunner.class);
    
    // Flink master address
    options.setFlinkMaster("localhost:8081");
    
    // Parallelism
    options.setParallelism(4);
    
    Pipeline p = Pipeline.create(options);
    
    // Build your pipeline
    p.apply(/* your transforms */);
    
    // Execute on Flink
    p.run().waitUntilFinish();
  }
}

Execution Modes

Local Mode

Run with an embedded Flink cluster:
FlinkPipelineOptions options = 
    PipelineOptionsFactory.as(FlinkPipelineOptions.class);

options.setRunner(FlinkRunner.class);
options.setFlinkMaster("[local]");  // Embedded local cluster
options.setParallelism(2);

Cluster Mode

Submit to an existing Flink cluster:
# Build a self-contained JAR, then submit with the Flink CLI
mvn package

flink run \
  -c com.example.MyPipeline \
  target/my-pipeline-bundled.jar \
  --runner=FlinkRunner \
  --flinkMaster=localhost:8081 \
  --parallelism=4

Auto Mode

options.setFlinkMaster("[auto]");  // Detect based on environment

FlinkPipelineOptions

Core Options

flinkMaster (string)
Address of the Flink JobManager. Can be:
  • host:port - Connect to remote cluster
  • [local] - Start local embedded cluster
  • [auto] - Auto-detect based on environment
options.setFlinkMaster("localhost:8081");
parallelism (integer, default: -1)
Degree of parallelism for the pipeline. -1 uses Flink's default.
options.setParallelism(8);
maxParallelism (integer, default: -1)
Maximum degree of parallelism. Sets the upper limit for dynamic scaling.
options.setMaxParallelism(128);
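Why maxParallelism matters: Flink hashes each key into a fixed number of key groups (equal to maxParallelism), then divides the groups among the current parallel instances, which is what allows keyed state to be rescaled later. The sketch below mirrors the arithmetic in Flink's KeyGroupRangeAssignment, simplified to use hashCode() directly where Flink applies a murmur hash; class and method names here are illustrative.

```java
// Simplified sketch of Flink's key-group assignment. A key's group is fixed
// by maxParallelism; only the group-to-instance mapping changes on rescale.
public class KeyGroups {
  // Which key group a key belongs to (stable across rescaling).
  static int keyGroupFor(Object key, int maxParallelism) {
    return Math.floorMod(key.hashCode(), maxParallelism);
  }

  // Which parallel operator instance currently owns that key group.
  static int operatorIndexFor(int keyGroup, int maxParallelism, int parallelism) {
    return keyGroup * parallelism / maxParallelism;
  }

  public static void main(String[] args) {
    int maxParallelism = 128;
    int keyGroup = keyGroupFor("user-42", maxParallelism);
    // The key group is constant; the owning instance depends on parallelism.
    System.out.println("key group: " + keyGroup);
    System.out.println("owner at parallelism 4: "
        + operatorIndexFor(keyGroup, maxParallelism, 4));
    System.out.println("owner at parallelism 8: "
        + operatorIndexFor(keyGroup, maxParallelism, 8));
  }
}
```

Because the group count never changes, a job can only scale up to maxParallelism parallel instances, which is why the limit should be set generously up front.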

Checkpointing Options

checkpointingInterval (long, default: -1)
Interval in milliseconds for triggering checkpoints. -1 disables checkpointing.
options.setCheckpointingInterval(60000L); // Every 60 seconds
checkpointingMode (string, default: EXACTLY_ONCE)
Checkpointing mode: EXACTLY_ONCE or AT_LEAST_ONCE.
options.setCheckpointingMode("EXACTLY_ONCE");
checkpointTimeoutMillis (long, default: -1)
Maximum time in milliseconds for a checkpoint to complete.
options.setCheckpointTimeoutMillis(600000L); // 10 minutes
minPauseBetweenCheckpoints (long, default: -1)
Minimum pause in milliseconds between checkpoints.
options.setMinPauseBetweenCheckpoints(5000L); // 5 seconds
numConcurrentCheckpoints (integer, default: 1)
Maximum number of concurrent checkpoints.
options.setNumConcurrentCheckpoints(1);

State Backend Options

stateBackend (string)
State backend for storing state. Options: filesystem, rocksdb, memory.
options.setStateBackend("rocksdb");
checkpointingDirectory (string)
Directory for storing checkpoints.
options.setCheckpointingDirectory("hdfs:///checkpoints");

Streaming Options

streaming (boolean, default: false)
Enable streaming mode for unbounded sources.
options.setStreaming(true);
autoWatermarkInterval (long)
Interval for automatic watermark emission in milliseconds.
options.setAutoWatermarkInterval(200L);

Advanced Configuration

Exactly-Once Processing

Configure for exactly-once semantics:
FlinkPipelineOptions options = 
    PipelineOptionsFactory.as(FlinkPipelineOptions.class);

// Enable checkpointing for exactly-once
options.setCheckpointingInterval(60000L);
options.setCheckpointingMode("EXACTLY_ONCE");
options.setCheckpointingDirectory("hdfs:///beam/checkpoints");

// State backend
options.setStateBackend("rocksdb");

// Fault tolerance
options.setFailOnCheckpointingErrors(true);
options.setMinPauseBetweenCheckpoints(5000L);

State Management

// Use RocksDB for large state
options.setStateBackend("rocksdb");

// RocksDB-specific settings are not FlinkPipelineOptions; they belong in
// flink-conf.yaml. Point the runner at the directory containing that file:
//   state.backend.rocksdb.memory.managed: true
//   state.backend.rocksdb.block.cache-size: 256m
options.setFlinkConfDir("/path/to/flink-conf");

Savepoints

Start from a savepoint:
flink run \
  -s hdfs:///savepoints/savepoint-123 \
  -c com.example.MyPipeline \
  target/my-pipeline.jar
// In code
options.setSavepointPath("hdfs:///savepoints/savepoint-123");

Window Configuration

import org.apache.beam.sdk.transforms.windowing.*;

pipeline
    .apply(/* source */)
    .apply(Window.<String>into(
        FixedWindows.of(Duration.standardMinutes(1)))
        .triggering(AfterWatermark.pastEndOfWindow()
            .withEarlyFirings(AfterProcessingTime
                .pastFirstElementInPane()
                .plusDelayOf(Duration.standardSeconds(30)))
            .withLateFirings(AfterPane.elementCountAtLeast(1)))
        .withAllowedLateness(Duration.standardMinutes(5))
        .discardingFiredPanes());

Batch vs Streaming

Batch Pipeline

FlinkPipelineOptions options = 
    PipelineOptionsFactory.as(FlinkPipelineOptions.class);

options.setRunner(FlinkRunner.class);
// streaming defaults to false for batch

Pipeline p = Pipeline.create(options);
p.apply(TextIO.read().from("/path/to/input"))
    .apply(/* transforms */)
    .apply(TextIO.write().to("/path/to/output"));

p.run().waitUntilFinish();

Streaming Pipeline

FlinkPipelineOptions options = 
    PipelineOptionsFactory.as(FlinkPipelineOptions.class);

options.setRunner(FlinkRunner.class);
options.setStreaming(true);
options.setCheckpointingInterval(30000L);

Pipeline p = Pipeline.create(options);
p.apply(KafkaIO.<String, String>read()
    .withBootstrapServers("localhost:9092")
    .withTopic("input-topic")
    .withKeyDeserializer(StringDeserializer.class)
    .withValueDeserializer(StringDeserializer.class)
    .withoutMetadata())  // yields KV<String, String> instead of KafkaRecord
    .apply(/* transforms */)
    .apply(KafkaIO.<Void, String>write()
        .withBootstrapServers("localhost:9092")
        .withTopic("output-topic")
        .withValueSerializer(StringSerializer.class)
        .values());  // write values only, no keys

p.run().waitUntilFinish();

Monitoring and Debugging

Access the Flink web UI at http://jobmanager-host:8081:
  • View running and completed jobs
  • Monitor task metrics
  • Inspect checkpoints and savepoints
  • View logs and exceptions
  • Track watermarks and event time

Metrics

Beam metrics are exposed as Flink metrics:
import org.apache.beam.sdk.metrics.*;

public class MyDoFn extends DoFn<String, String> {
  private final Counter counter = 
      Metrics.counter(MyDoFn.class, "processed");
  
  @ProcessElement
  public void processElement(ProcessContext c) {
    counter.inc();
    c.output(c.element());
  }
}

Logging

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MyDoFn extends DoFn<String, String> {
  private static final Logger LOG = LoggerFactory.getLogger(MyDoFn.class);
  
  @ProcessElement
  public void processElement(ProcessContext c) {
    LOG.info("Processing: {}", c.element());
    c.output(c.element());
  }
}
Logs are available in:
  • Flink Web UI (per task)
  • TaskManager log files
  • Configured log aggregation system

Performance Tuning

Parallelism

// Set appropriate parallelism
options.setParallelism(numTaskManagers * slotsPerTaskManager);

// Set max parallelism for better key distribution
options.setMaxParallelism(512);

Memory Configuration

// Configure task manager memory
// In flink-conf.yaml:
// taskmanager.memory.process.size: 4096m
// taskmanager.memory.managed.fraction: 0.4

Network Buffers

# In flink-conf.yaml
taskmanager.network.memory.fraction: 0.1
taskmanager.network.memory.min: 64mb
taskmanager.network.memory.max: 1gb

State Backend Tuning

# RocksDB configuration in flink-conf.yaml
state.backend: rocksdb
state.backend.rocksdb.memory.managed: true
state.backend.rocksdb.block.cache-size: 256m
state.backend.incremental: true

Best Practices

Checkpointing Strategy

  1. Enable checkpointing for production
    options.setCheckpointingInterval(60000L); // Every minute
    options.setCheckpointingMode("EXACTLY_ONCE");
    
  2. Use appropriate intervals
    • Shorter intervals: less reprocessing on recovery, higher checkpoint overhead
    • Longer intervals: more reprocessing on recovery, lower checkpoint overhead
  3. Configure timeout appropriately
    options.setCheckpointTimeoutMillis(600000L); // 10 minutes
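The interval trade-off can be made concrete with back-of-envelope arithmetic. On failure Flink replays from the last completed checkpoint, so expected reprocessing is about half the interval, while overhead grows with how often a checkpoint is in flight. The model and numbers below are illustrative only; real costs depend on state size, backend, and I/O.

```java
// Rough model for weighing checkpoint interval against recovery cost.
public class CheckpointTradeoff {
  // Failures land uniformly within an interval, so on average about half
  // the interval's worth of data must be replayed.
  static double expectedReplayMillis(long intervalMillis) {
    return intervalMillis / 2.0;
  }

  // Fraction of wall-clock time with a checkpoint in progress, given how
  // long a single checkpoint takes to complete.
  static double overheadFraction(long intervalMillis, long checkpointDurationMillis) {
    return (double) checkpointDurationMillis
        / (intervalMillis + checkpointDurationMillis);
  }

  public static void main(String[] args) {
    // Example: 60 s interval, checkpoints taking ~2 s to complete.
    System.out.printf("expected replay ~%.0f ms, overhead ~%.1f%%%n",
        expectedReplayMillis(60_000),
        100 * overheadFraction(60_000, 2_000));
  }
}
```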
    

State Management

  1. Choose the right state backend
    • Memory: Small state, fast access
    • Filesystem: Medium state
    • RocksDB: Large state, slower access
  2. Use incremental checkpoints
    state.backend.incremental: true
    
  3. Monitor state size
    • Check in Flink UI
    • Set up alerts for growth

Resource Management

  1. Right-size workers
    taskmanager.numberOfTaskSlots: 4
    taskmanager.memory.process.size: 4096m
    
  2. Use appropriate parallelism
    • Generally: slots_per_tm * num_tm
    • Consider data skew
  3. Configure backpressure handling
    • Monitor in Flink UI
    • Increase parallelism if needed
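The rule of thumb above can be written down directly. A hypothetical helper, assuming homogeneous TaskManagers; treat the result as a starting point and adjust against observed backpressure and skew:

```java
// Starting-point parallelism: one subtask per available task slot.
// Illustrative helper, not a Beam or Flink API.
public class Sizing {
  static int defaultParallelism(int numTaskManagers, int slotsPerTaskManager) {
    return numTaskManagers * slotsPerTaskManager;
  }

  public static void main(String[] args) {
    // e.g. 4 TaskManagers x 4 slots each
    System.out.println(defaultParallelism(4, 4));
  }
}
```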

Troubleshooting

Pipeline fails to submit. Check:
  • Flink cluster is running
  • JobManager address is correct
  • JAR contains all dependencies
  • Sufficient resources available

Checkpoints fail or time out:
  • Increase checkpoint timeout
  • Check state backend configuration
  • Verify checkpoint directory is accessible
  • Monitor state size growth

Out-of-memory errors:
  • Increase TaskManager memory
  • Reduce parallelism
  • Use RocksDB state backend
  • Enable incremental checkpoints

Slow processing:
  • Check for backpressure in UI
  • Increase parallelism
  • Optimize transforms
  • Reduce checkpoint frequency

Pipeline makes no progress:
  • Check for stuck sources
  • Verify watermark generation
  • Look for slow tasks in UI
  • Check for data skew

Runner Capabilities

Supported Features

  • ✅ Batch and streaming
  • ✅ Exactly-once processing
  • ✅ Event time processing
  • ✅ State and timers
  • ✅ Side inputs
  • ✅ All window types
  • ✅ Custom triggers
  • ✅ Savepoints

Limitations

  • Limited support for some Beam transforms
  • Requires Flink cluster management
  • State size limited by backend choice

Next Steps

  • Flink Documentation: Learn more about Apache Flink
  • SparkRunner: Alternative distributed runner
  • State & Timers: Advanced stateful processing
  • Windowing: Learn about windowing strategies