The DataflowRunner executes Apache Beam pipelines on Google Cloud Dataflow, a fully managed service for stream and batch data processing.
Overview
Google Cloud Dataflow provides:
Fully managed: no cluster management required
Autoscaling: worker resources scale automatically with the workload
Optimization: the pipeline graph is optimized automatically before execution
Monitoring: built-in job monitoring, with logs in Cloud Logging and metrics in Cloud Monitoring
Security: integration with Google Cloud IAM and VPC networks
When to Use DataflowRunner
Best For
Production workloads on GCP
Large-scale data processing
Auto-scaling requirements
Managed infrastructure
Integration with GCP services
Consider Alternatives
Small datasets and local testing (use DirectRunner)
Environments outside Google Cloud
Existing Spark or Flink clusters (use SparkRunner or FlinkRunner)
Cost-sensitive batch jobs
Setup and Configuration
Prerequisites
Google Cloud project: Create a project in the Google Cloud console.
Enable APIs: Enable the Cloud Dataflow, Compute Engine, and Cloud Storage APIs.
Authentication: Set up authentication credentials.
Cloud Storage: Create a Cloud Storage bucket for staging and temporary files.
Dependencies
Java (Maven): add the Dataflow runner dependency:
<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
  <version>{beam-version}</version>
</dependency>
Java (Gradle):
implementation 'org.apache.beam:beam-runners-google-cloud-dataflow-java:{beam-version}'
Python: install Beam with the GCP extras (quoted so shells such as zsh do not expand the brackets):
pip install 'apache-beam[gcp]'
Go: import the Dataflow runner package:
import "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/dataflow"
Authentication
Set up Google Cloud credentials:
# Install gcloud CLI
curl https://sdk.cloud.google.com | bash
# Authenticate
gcloud auth application-default login
# Set your project
gcloud config set project YOUR_PROJECT_ID
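Beam's Google Cloud clients pick up Application Default Credentials (ADC). Before launching a job, you can run a quick pre-flight check that looks for a credentials file the way ADC does for file-based credentials. It checks the `GOOGLE_APPLICATION_CREDENTIALS` environment variable first, then the file that `gcloud auth application-default login` writes. This is a minimal sketch with a hypothetical helper name. It does not cover other ADC sources, such as the Compute Engine metadata server.

```python
import os
from pathlib import Path
from typing import Optional


def find_adc_file() -> Optional[Path]:
    """Return the credentials file ADC would load from disk, or None.

    Hypothetical pre-flight check. It mirrors only the file-based part of the
    Application Default Credentials lookup; credentials served by the
    metadata server (for example, on Compute Engine) are not detected.
    """
    # 1. An explicit key file named by the environment variable wins.
    explicit = os.environ.get("GOOGLE_APPLICATION_CREDENTIALS")
    if explicit:
        path = Path(explicit)
        return path if path.is_file() else None

    # 2. Otherwise, the file written by `gcloud auth application-default login`,
    #    stored in the gcloud configuration directory.
    config_dir = os.environ.get("CLOUDSDK_CONFIG")
    if config_dir:
        base = Path(config_dir)
    elif os.name == "nt":
        base = Path(os.environ.get("APPDATA", "")) / "gcloud"
    else:
        base = Path.home() / ".config" / "gcloud"
    candidate = base / "application_default_credentials.json"
    return candidate if candidate.is_file() else None
```

If `find_adc_file()` returns `None` on your workstation, run `gcloud auth application-default login` or point `GOOGLE_APPLICATION_CREDENTIALS` at a key file before submitting the pipeline.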
Running a Pipeline
Basic Example
Java:
import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;

public class MyDataflowPipeline {
  public static void main(String[] args) {
    DataflowPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args)
            .withValidation()
            .as(DataflowPipelineOptions.class);

    // Note: values set below override any passed on the command line.
    // Required: set the runner
    options.setRunner(DataflowRunner.class);
    // Required: GCP project
    options.setProject("your-project-id");
    // Required: GCP region
    options.setRegion("us-central1");
    // Staging location for the pipeline's JARs and files
    // (defaults to a path under the temp location if unset)
    options.setStagingLocation("gs://your-bucket/staging");
    // Required: temp location for intermediate files
    options.setTempLocation("gs://your-bucket/temp");

    Pipeline p = Pipeline.create(options);
    // Build your pipeline (replace with your own transforms)
    p.apply(Create.of("Hello", "World"));
    // Submit the job to Dataflow
    p.run();
  }
}
Python:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


def run():
    options = PipelineOptions([
        '--runner=DataflowRunner',
        '--project=your-project-id',
        '--region=us-central1',
        '--staging_location=gs://your-bucket/staging',
        '--temp_location=gs://your-bucket/temp',
        '--job_name=my-dataflow-job',
    ])

    with beam.Pipeline(options=options) as p:
        # Build your pipeline. On Dataflow, print() output goes to the
        # worker logs, not to your local console.
        (p
         | beam.Create(['Hello', 'World'])
         | beam.Map(print))


if __name__ == '__main__':
    run()
Go:
package main

import (
	"context"
	"flag"

	"github.com/apache/beam/sdks/v2/go/pkg/beam"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/dataflow"
)

// The --project, --region, --staging_location and --temp_location flags are
// already registered by the Beam Go SDK's GCP options and Dataflow runner
// packages. Pass them on the command line instead of redeclaring them here,
// because redeclaring a flag panics with "flag redefined".

func main() {
	flag.Parse()
	beam.Init()

	p := beam.NewPipeline()
	s := p.Root()
	// Build your pipeline from s (replace with your own transforms)
	beam.Create(s, "Hello", "World")

	// Submit the job to Dataflow
	if _, err := dataflow.Execute(context.Background(), p); err != nil {
		panic(err)
	}
}
Command Line Execution
# Java
mvn compile exec:java -Dexec.mainClass=com.example.MyPipeline \
  -Dexec.args="--runner=DataflowRunner \
    --project=your-project-id \
    --region=us-central1 \
    --stagingLocation=gs://your-bucket/staging \
    --tempLocation=gs://your-bucket/temp"
# Python
python my_pipeline.py \
  --runner=DataflowRunner \
  --project=your-project-id \
  --region=us-central1 \
  --staging_location=gs://your-bucket/staging \
  --temp_location=gs://your-bucket/temp
DataflowPipelineOptions
Key configuration options for the DataflowRunner:
Required Options
project (string): Google Cloud project ID.
options.setProject("my-gcp-project");
region (string): Google Cloud region for job execution (for example, us-central1 or europe-west1).
options.setRegion("us-central1");
stagingLocation (string): Cloud Storage path for staging files; must start with gs://.
options.setStagingLocation("gs://my-bucket/staging");
tempLocation (string): Cloud Storage path for temporary files.
options.setTempLocation("gs://my-bucket/temp");
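The Python example above passes the same required settings as flags: `--project`, `--region`, `--staging_location`, and `--temp_location`. A small helper can assemble them and reject paths that are not Cloud Storage URIs before the job is submitted. This is a minimal sketch: the helper name is hypothetical and not part of Beam, and treating the staging location as optional is a choice made only in this sketch.

```python
from typing import List, Optional


def required_dataflow_args(
    project: str,
    region: str,
    temp_location: str,
    staging_location: Optional[str] = None,
) -> List[str]:
    """Hypothetical helper: build the required Dataflow flags for a Python pipeline."""
    if not project:
        raise ValueError("project must be set")
    if not region:
        raise ValueError("region must be set")
    args = [
        "--runner=DataflowRunner",
        f"--project={project}",
        f"--region={region}",
    ]
    for flag, path in (("temp_location", temp_location),
                       ("staging_location", staging_location)):
        if path is None:
            continue  # staging_location may be omitted in this sketch
        if not path.startswith("gs://"):
            raise ValueError(f"--{flag} must be a gs:// path, got {path!r}")
        args.append(f"--{flag}={path}")
    return args
```

The resulting list can be handed to `PipelineOptions(...)` in place of the hand-written list in the Python example.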
Worker Configuration
numWorkers (integer; if unset, the Dataflow service chooses): Initial number of workers. With autoscaling enabled, Dataflow scales from this value.
options.setNumWorkers(10);
maxNumWorkers (integer): Maximum number of workers for autoscaling.
options.setMaxNumWorkers(100);
workerMachineType (string; default: n1-standard-1 for batch jobs, streaming jobs use larger defaults): Compute Engine machine type for workers.
options.setWorkerMachineType("n1-standard-4");
diskSizeGb (integer): Disk size in GB for each worker.
options.setDiskSizeGb(500);
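In the Python SDK, these worker settings are passed as flags rather than setters. The sketch below assumes the Python flag names `--num_workers`, `--max_num_workers`, `--worker_machine_type`, and `--disk_size_gb`. It also adds one sanity check of its own: the initial worker count should not exceed the autoscaling maximum. The helper name is hypothetical.

```python
from typing import List, Optional


def worker_args(
    num_workers: Optional[int] = None,
    max_num_workers: Optional[int] = None,
    machine_type: Optional[str] = None,
    disk_size_gb: Optional[int] = None,
) -> List[str]:
    """Hypothetical helper: emit Python worker flags, only for values that are set."""
    if (num_workers is not None and max_num_workers is not None
            and num_workers > max_num_workers):
        raise ValueError("num_workers must not exceed max_num_workers")
    args: List[str] = []
    if num_workers is not None:
        args.append(f"--num_workers={num_workers}")
    if max_num_workers is not None:
        args.append(f"--max_num_workers={max_num_workers}")
    if machine_type is not None:
        args.append(f"--worker_machine_type={machine_type}")
    if disk_size_gb is not None:
        args.append(f"--disk_size_gb={disk_size_gb}")
    return args
```

Leaving a value as `None` omits the flag, so the Dataflow service default applies.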
Streaming Options
streaming (boolean, default: false): Enables streaming mode for unbounded sources.
options.setStreaming(true);
enableStreamingEngine (boolean): Uses Dataflow Streaming Engine for streaming pipelines.
options.setEnableStreamingEngine(true);
Network Configuration
network (string): Compute Engine network for launching workers.
options.setNetwork("projects/my-project/global/networks/my-network");
subnetwork (string): Compute Engine subnetwork for launching workers.
options.setSubnetwork("regions/us-central1/subnetworks/my-subnet");
usePublicIps (boolean): Whether workers get public IP addresses. Workers without public IPs need Private Google Access enabled on their subnetwork to reach Google APIs.
options.setUsePublicIps(false); // Private IPs only
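A subnetwork is a regional resource, so it should be in the region where the workers run (by default, the job's region). A pre-submission check for the `regions/REGION/subnetworks/NAME` form used above might look like the following. This is a sketch with a hypothetical helper name. Full `https://www.googleapis.com/compute/v1/...` subnetwork URLs are matched only by their trailing `regions/.../subnetworks/...` path. `--no_use_public_ips` is assumed to be the Python counterpart of `setUsePublicIps(false)`.

```python
import re
from typing import List

# Matches ".../regions/<region>/subnetworks/<name>" at the end of the string.
_SUBNET_RE = re.compile(r"(?:^|/)regions/([a-z0-9-]+)/subnetworks/([a-z0-9-]+)$")


def private_worker_args(region: str, subnetwork: str) -> List[str]:
    """Hypothetical helper: check the subnetwork's region and disable public IPs."""
    match = _SUBNET_RE.search(subnetwork)
    if match is None:
        raise ValueError(f"unrecognized subnetwork path: {subnetwork!r}")
    subnet_region = match.group(1)
    if subnet_region != region:
        raise ValueError(
            f"subnetwork is in {subnet_region}, but the job runs in {region}")
    # Workers without public IPs need Private Google Access on this subnetwork.
    return [f"--region={region}", f"--subnetwork={subnetwork}", "--no_use_public_ips"]
```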
Advanced Configuration
Autoscaling
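Dataflow adjusts the number of workers to match the workload. You can select the autoscaling algorithm and bound the worker count:
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

DataflowPipelineOptions options =
    PipelineOptionsFactory.as(DataflowPipelineOptions.class);
// Set autoscaling parameters
options.setAutoscalingAlgorithm(
    DataflowPipelineWorkerPoolOptions.AutoscalingAlgorithmType.THROUGHPUT_BASED);
options.setNumWorkers(2);      // Initial workers
options.setMaxNumWorkers(20);  // Maximum workers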
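Classic Templates
Create a reusable classic template by setting a template location. Running the pipeline then stages the template at that location instead of starting a job. Flex Templates, which package the pipeline as a container image, are a separate mechanism built with gcloud dataflow flex-template build.
Java:
DataflowPipelineOptions options =
    PipelineOptionsFactory.as(DataflowPipelineOptions.class);
options.setRunner(DataflowRunner.class);
// Specify the template location
options.setTemplateLocation("gs://my-bucket/templates/my-template");
Pipeline p = Pipeline.create(options);
// Build the pipeline
p.run(); // Stages the template instead of running the job
Python:
options = PipelineOptions([
    '--runner=DataflowRunner',
    '--template_location=gs://my-bucket/templates/my-template',
    # other options...
])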
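A template staged through the template location option is a classic template. You can launch it later without rebuilding the pipeline, for example with `gcloud dataflow jobs run`. The sketch below only assembles that command line for `subprocess`; it does not run it. The helper name, job name, and `input` parameter are placeholders. For classic templates, a parameter only takes effect if the pipeline declares it as a runtime (ValueProvider) option.

```python
import shlex
from typing import Dict, List, Optional


def run_template_command(
    job_name: str,
    template_path: str,
    region: str,
    parameters: Optional[Dict[str, str]] = None,
) -> List[str]:
    """Hypothetical helper: build a `gcloud dataflow jobs run` command for a classic template."""
    cmd = [
        "gcloud", "dataflow", "jobs", "run", job_name,
        f"--gcs-location={template_path}",
        f"--region={region}",
    ]
    if parameters:
        # gcloud takes template parameters as comma-separated KEY=VALUE pairs.
        cmd.append("--parameters=" + ",".join(f"{k}={v}" for k, v in parameters.items()))
    return cmd


cmd = run_template_command(
    "my-job",
    "gs://my-bucket/templates/my-template",
    "us-central1",
    {"input": "gs://my-bucket/in.txt"},
)
print(shlex.join(cmd))  # copy-pasteable shell form; pass `cmd` to subprocess.run to execute
```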
Update Existing Jobs
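Replace a running streaming job with a new version of the pipeline by enabling update and reusing the existing job name:
Java:
options.setUpdate(true);
options.setJobName("existing-job-name");
Python:
python my_pipeline.py \
  --runner=DataflowRunner \
  --update \
  --job_name=existing-job-name  # plus the other options used for the original job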
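If the new version of the pipeline renames transforms, Dataflow needs a mapping from the old names to the new ones to carry state across. In Python, this mapping is passed as a JSON object through `--transform_name_mapping`. The sketch below builds the update flags with `json.dumps`. The helper, job, and transform names are placeholders.

```python
import json
from typing import Dict, List, Optional


def update_args(job_name: str, renamed: Optional[Dict[str, str]] = None) -> List[str]:
    """Hypothetical helper: Python flags for updating a running Dataflow job."""
    args = ["--runner=DataflowRunner", "--update", f"--job_name={job_name}"]
    if renamed:
        # Maps old transform names to their new names in the replacement pipeline.
        args.append("--transform_name_mapping=" + json.dumps(renamed))
    return args
```

For example, `update_args("existing-job-name", {"OldParse": "ParseEvents"})` tells Dataflow that the step previously named `OldParse` is now `ParseEvents`.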
Monitoring and Debugging
Cloud Console
Monitor jobs in the Dataflow monitoring interface of the Google Cloud console:
View job graph and metrics
Monitor worker resource usage
Inspect logs and errors
Track data throughput
Logging
Log messages written by your pipeline code on the workers are sent to Cloud Logging automatically:
Java:
import org.apache.beam.sdk.transforms.DoFn;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MyDoFn extends DoFn<String, String> {
  private static final Logger LOG = LoggerFactory.getLogger(MyDoFn.class);

  @ProcessElement
  public void processElement(ProcessContext c) {
    LOG.info("Processing: {}", c.element());
    c.output(c.element());
  }
}
Python:
import logging

import apache_beam as beam


class MyDoFn(beam.DoFn):
    def process(self, element):
        logging.info(f'Processing: {element}')
        yield element
Metrics
Dataflow reports built-in job metrics and also surfaces custom metrics that you define with the Beam Metrics API:
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.transforms.DoFn;

public class MyDoFn extends DoFn<String, String> {
  // Custom counter, visible in the job's metrics in the Dataflow UI
  private final Counter elementsProcessed =
      Metrics.counter(MyDoFn.class, "elementsProcessed");

  @ProcessElement
  public void processElement(ProcessContext c) {
    elementsProcessed.inc();
    c.output(c.element());
  }
}
Best Practices
Cost Optimization
Use Appropriate Machine Types
// For CPU-intensive workloads
options.setWorkerMachineType("n1-highcpu-4");
// For memory-intensive workloads
options.setWorkerMachineType("n1-highmem-4");
Enable Autoscaling
options.setAutoscalingAlgorithm(AutoscalingAlgorithmType.THROUGHPUT_BASED);
options.setMaxNumWorkers(50);
Use Streaming Engine (for streaming jobs)
options.setStreaming(true);
options.setEnableStreamingEngine(true);
Optimize Windowing
Use appropriate window sizes
Consider allowed lateness for late data
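To make the trade-off between window size and allowed lateness concrete, here is a plain-Python model of fixed windows with zero offset. It is not Beam code. An element lands in the window that starts at its timestamp minus (timestamp mod size). In this simplified model, late data for a window is dropped once the watermark passes the window's end plus the allowed lateness. Timestamps are whole seconds for readability; Beam itself uses finer-grained timestamps.

```python
from typing import Tuple

Window = Tuple[int, int]  # [start, end) in seconds


def assign_fixed_window(timestamp: int, size: int) -> Window:
    """Return the fixed window [start, end) containing timestamp (offset 0)."""
    start = timestamp - (timestamp % size)
    return start, start + size


def is_droppable(timestamp: int, size: int, watermark: int, allowed_lateness: int) -> bool:
    """Simplified model: data is dropped once the watermark passes window end + lateness."""
    _, end = assign_fixed_window(timestamp, size)
    return watermark > end + allowed_lateness
```

With 60-second windows, an element stamped at 125 s falls in [120, 180). With 30 s of allowed lateness, it is still accepted while the watermark is at or below 210 s.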
Batch Elements
Use GroupIntoBatches to batch elements before calling downstream APIs
Reduce per-element overhead
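GroupIntoBatches collects the values for each key into batches of up to a given size. A downstream step can then make one API call per batch instead of one per element. The plain-Python model below mimics that per-key behavior for a finite list of (key, value) pairs. It is an illustration, not the Beam transform: it ignores windowing, buffering timeouts, and the distribution of work across workers. In a Beam Python pipeline, you would apply `beam.GroupIntoBatches(batch_size)` to a keyed PCollection instead.

```python
from collections import defaultdict
from typing import Dict, Hashable, Iterable, List, Tuple, TypeVar

K = TypeVar("K", bound=Hashable)
V = TypeVar("V")


def group_into_batches(pairs: Iterable[Tuple[K, V]], batch_size: int) -> List[Tuple[K, List[V]]]:
    """Model of per-key batching: emit a batch whenever a key reaches batch_size."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    buffers: Dict[K, List[V]] = defaultdict(list)
    out: List[Tuple[K, List[V]]] = []
    for key, value in pairs:
        buffers[key].append(value)
        if len(buffers[key]) == batch_size:
            out.append((key, buffers.pop(key)))
    # Flush partial batches at the end of input
    # (the real transform flushes when the window expires).
    out.extend((key, values) for key, values in buffers.items() if values)
    return out
```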
Use Side Inputs Wisely
Keep side inputs small
Consider using external lookups for large datasets
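When reference data is too large for a side input, a common alternative is to query an external store from the DoFn and cache the results on each worker. The sketch below shows only the caching idea, in plain Python. `fetch_from_store` is a hypothetical stand-in for a real database or API client, and the cache size is an arbitrary example value.

```python
from functools import lru_cache

CALLS = {"count": 0}  # counts hits on the (hypothetical) external store


def fetch_from_store(key: str) -> str:
    """Hypothetical stand-in for a call to an external database or API."""
    CALLS["count"] += 1
    return key.upper()


@lru_cache(maxsize=10_000)
def cached_lookup(key: str) -> str:
    """Per-process cache: repeated keys on the same worker skip the external call."""
    return fetch_from_store(key)


def enrich(elements):
    """Attach looked-up values to elements, as a DoFn's process() might."""
    for element in elements:
        yield element, cached_lookup(element)
```

Because the cache lives in the worker process, each worker pays for the first lookup of a key only once. If the external data changes, you need an eviction or refresh policy.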
Security
Use VPC Networks
options.setNetwork("projects/my-project/global/networks/my-vpc");
options.setUsePublicIps(false);
Service Accounts
options.setServiceAccount("my-service-account@my-project.iam.gserviceaccount.com");
Encryption
Data is encrypted at rest and in transit by default
Use customer-managed encryption keys (CMEK) for additional control over encryption
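To use CMEK, you pass a Cloud KMS key resource name to the job. In Python the flag is `--dataflow_kms_key`; the Java counterpart is `setDataflowKmsKey`. The helpers below only format and sanity-check the resource name. They are hypothetical, and the project, location, key ring, and key names are placeholders.

```python
import re

_KEY_RE = re.compile(r"^projects/[^/]+/locations/[^/]+/keyRings/[^/]+/cryptoKeys/[^/]+$")


def kms_key_name(project: str, location: str, key_ring: str, key: str) -> str:
    """Format a Cloud KMS key resource name."""
    return f"projects/{project}/locations/{location}/keyRings/{key_ring}/cryptoKeys/{key}"


def kms_key_arg(key_name: str) -> str:
    """Return the Python Dataflow flag for a CMEK key, after a format check."""
    if not _KEY_RE.match(key_name):
        raise ValueError(f"not a Cloud KMS key resource name: {key_name!r}")
    return f"--dataflow_kms_key={key_name}"
```

The Dataflow service account also needs permission to use the key; this sketch does not check that.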
Streaming vs Batch
Batch Pipeline
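import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptors;

DataflowPipelineOptions options =
    PipelineOptionsFactory.as(DataflowPipelineOptions.class);
options.setRunner(DataflowRunner.class);
// streaming defaults to false, so this runs as a batch job
Pipeline p = Pipeline.create(options);
p.apply(TextIO.read().from("gs://bucket/input/*"))
    .apply(MapElements.into(TypeDescriptors.strings()).via((String line) -> line.toUpperCase())) // example transform
    .apply(TextIO.write().to("gs://bucket/output"));
p.run();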
Streaming Pipeline
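import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptors;

DataflowPipelineOptions options =
    PipelineOptionsFactory.as(DataflowPipelineOptions.class);
options.setRunner(DataflowRunner.class);
options.setStreaming(true);
options.setEnableStreamingEngine(true);
Pipeline p = Pipeline.create(options);
p.apply(PubsubIO.readStrings().fromTopic("projects/my-project/topics/my-topic"))
    .apply(MapElements.into(TypeDescriptors.strings()).via((String msg) -> msg.toUpperCase())) // example transform
    .apply(PubsubIO.writeStrings().to("projects/my-project/topics/output-topic"));
p.run();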
Troubleshooting
Common Issues
Job fails with a "Quota exceeded" error
Request higher quotas in the Google Cloud console:
Go to IAM & Admin > Quotas
Filter by service (Compute Engine)
Request a quota increase
Job or workers fail to start
Check:
Service account permissions
Network and firewall configuration
Region availability
Machine type availability in the region
Job costs are higher than expected
Optimize:
Use smaller worker machine types
Set an appropriate maximum number of workers
Use Flex Templates for repeated jobs
Enable Streaming Engine for streaming jobs
Set appropriate worker disk sizes
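One common source of "Quota exceeded" errors is the regional CPU quota. If autoscaling reaches the maximum, workers use roughly max workers × vCPUs per machine type. The sketch below estimates that figure for predefined machine types whose last name segment is the vCPU count, such as n1-standard-4. It is a rough planning aid with hypothetical helper names. It rejects custom and shared-core types, and it does not model other quotas that Dataflow consumes, such as disk and IP addresses.

```python
import re

_PREDEFINED = re.compile(r"^[a-z0-9]+-(?:standard|highcpu|highmem)-(\d+)$")


def vcpus_per_worker(machine_type: str) -> int:
    """vCPU count for predefined types like n1-standard-4; other types are rejected."""
    match = _PREDEFINED.match(machine_type)
    if match is None:
        raise ValueError(f"cannot infer vCPUs for {machine_type!r}")
    return int(match.group(1))


def peak_vcpus(max_num_workers: int, machine_type: str) -> int:
    """Rough peak vCPU usage if autoscaling reaches max_num_workers."""
    return max_num_workers * vcpus_per_worker(machine_type)


def fits_quota(max_num_workers: int, machine_type: str, cpu_quota: int, in_use: int = 0) -> bool:
    """True if the peak estimate fits within the remaining regional CPU quota."""
    return in_use + peak_vcpus(max_num_workers, machine_type) <= cpu_quota
```

For example, 50 workers of n1-standard-4 need about 200 vCPUs at peak. If that exceeds the quota, lower the maximum number of workers or request more quota.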
Next Steps
Dataflow Console: monitor and manage your Dataflow jobs
FlinkRunner: an alternative for self-managed clusters
Monitoring Guide: learn about metrics and monitoring
Pricing: understand Dataflow pricing