The SparkRunner executes Apache Beam pipelines on Apache Spark clusters, enabling you to leverage existing Spark infrastructure for both batch and streaming workloads.

Overview

Apache Spark is a unified analytics engine that provides:
  • Batch Processing: Fast in-memory batch processing
  • Streaming: Structured Streaming for real-time processing
  • SQL Support: Integrate with Spark SQL and DataFrames
  • Ecosystem: Rich ecosystem of libraries and connectors
  • Scalability: Run on clusters from laptops to data centers

When to Use SparkRunner

Best For

  • Existing Spark infrastructure
  • Spark ecosystem integration
  • Unified batch and streaming
  • On-premise deployments
  • Cost-effective large-scale batch
  • Teams familiar with Spark

Consider Alternatives

  • Pure streaming apps (FlinkRunner)
  • GCP workloads (DataflowRunner)
  • Local development (DirectRunner)
  • Low-latency streaming (FlinkRunner)

Setup and Configuration

Prerequisites

  • Apache Spark 3.0 or later
  • Java 8 or later
  • Spark cluster or local Spark installation

Dependencies

Add to your pom.xml:
<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-runners-spark-3</artifactId>
  <version>{beam-version}</version>
</dependency>
For Gradle:
implementation 'org.apache.beam:beam-runners-spark-3:{beam-version}'
Use beam-runners-spark-3 for Spark 3.x. For Spark 2.x, use beam-runners-spark.

Spark Cluster Setup

Local Spark

# Download Spark
wget https://archive.apache.org/dist/spark/spark-3.5.0/spark-3.5.0-bin-hadoop3.tgz
tar -xzf spark-3.5.0-bin-hadoop3.tgz
cd spark-3.5.0-bin-hadoop3

# Start local master and worker
./sbin/start-master.sh
./sbin/start-worker.sh spark://localhost:7077

# Access UI at http://localhost:8080

Spark on YARN

# Submit to YARN cluster
spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --class com.example.MyPipeline \
  target/my-pipeline.jar

Spark on Kubernetes

# Submit to Kubernetes
spark-submit \
  --master k8s://https://kubernetes-api:6443 \
  --deploy-mode cluster \
  --class com.example.MyPipeline \
  target/my-pipeline.jar

Running a Pipeline

Basic Example

import org.apache.beam.runners.spark.SparkRunner;
import org.apache.beam.runners.spark.SparkPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class MySparkPipeline {
  public static void main(String[] args) {
    SparkPipelineOptions options = 
        PipelineOptionsFactory.fromArgs(args)
            .withValidation()
            .as(SparkPipelineOptions.class);
    
    // Set the runner
    options.setRunner(SparkRunner.class);
    
    // Spark master URL
    options.setSparkMaster("spark://localhost:7077");
    // Or local mode
    // options.setSparkMaster("local[4]");
    
    Pipeline p = Pipeline.create(options);
    
    // Build your pipeline
    p.apply(/* your transforms */);
    
    // Execute on Spark
    p.run().waitUntilFinish();
  }
}

Execution Modes

Local Mode

Run with local Spark:
SparkPipelineOptions options = 
    PipelineOptionsFactory.as(SparkPipelineOptions.class);

options.setRunner(SparkRunner.class);
options.setSparkMaster("local[*]");  // Use all cores
// or
options.setSparkMaster("local[4]");  // Use 4 cores

Cluster Mode

Submit to a Spark cluster:
spark-submit \
  --master spark://master:7077 \
  --class com.example.MyPipeline \
  --deploy-mode cluster \
  target/my-pipeline.jar \
  --runner=SparkRunner

SparkPipelineOptions

Core Options

sparkMaster
string
default:"local[4]"
Spark master URL:
  • local[n] - Local mode with n threads
  • local[*] - Local mode with all cores
  • spark://host:port - Spark standalone cluster
  • yarn - YARN cluster
  • k8s://host:port - Kubernetes cluster
options.setSparkMaster("spark://localhost:7077");
appName
string
default:"Beam App"
Application name displayed in Spark UI.
options.setAppName("My Beam Pipeline");

Streaming Options

streaming
boolean
default:"false"
Enable streaming mode for unbounded sources.
options.setStreaming(true);
batchIntervalMillis
long
default:"500"
Batch interval for Spark Streaming in milliseconds.
options.setBatchIntervalMillis(1000L); // 1 second
checkpointDurationMillis
long
default:"-1"
Checkpoint interval in milliseconds. -1 uses Spark’s default.
options.setCheckpointDurationMillis(10000L); // 10 seconds
checkpointDir
string
Directory for Spark checkpoints.
options.setCheckpointDir("hdfs:///checkpoints");

Performance Options

bundleSize
long
default:"0"
Bundle size for splitting bounded sources. 0 uses Spark’s default parallelism.
options.setBundleSize(1000L);
cacheDisabled
boolean
default:"false"
Disable caching of reused PCollections.
options.setCacheDisabled(true);
maxRecordsPerBatch
long
default:"-1"
Maximum records per micro-batch for streaming sources.
options.setMaxRecordsPerBatch(10000L);

Resource Configuration

sparkSubmitConf
Map<String, String>
Spark configuration properties.
Map<String, String> conf = new HashMap<>();
conf.put("spark.executor.memory", "4g");
conf.put("spark.executor.cores", "2");
options.setSparkSubmitConf(conf);

Advanced Configuration

Using Existing SparkContext

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.beam.runners.spark.SparkContextOptions;

// Create Spark context
SparkConf conf = new SparkConf()
    .setAppName("My Pipeline")
    .setMaster("local[4]");
JavaSparkContext jsc = new JavaSparkContext(conf);

// Use with Beam
SparkContextOptions options = 
    PipelineOptionsFactory.as(SparkContextOptions.class);
options.setUsesProvidedSparkContext(true);
options.setProvidedSparkContext(jsc);

Pipeline p = Pipeline.create(options);
// Build pipeline
p.run().waitUntilFinish();

jsc.stop();

Spark Configuration

SparkPipelineOptions options = 
    PipelineOptionsFactory.as(SparkPipelineOptions.class);

Map<String, String> sparkConf = new HashMap<>();

// Executor configuration
sparkConf.put("spark.executor.memory", "4g");
sparkConf.put("spark.executor.cores", "2");
sparkConf.put("spark.executor.instances", "10");

// Driver configuration
sparkConf.put("spark.driver.memory", "2g");
sparkConf.put("spark.driver.cores", "1");

// Dynamic allocation
sparkConf.put("spark.dynamicAllocation.enabled", "true");
sparkConf.put("spark.dynamicAllocation.minExecutors", "2");
sparkConf.put("spark.dynamicAllocation.maxExecutors", "20");

// Shuffle configuration
sparkConf.put("spark.shuffle.service.enabled", "true");

options.setSparkSubmitConf(sparkConf);

Checkpointing

For streaming pipelines:
SparkPipelineOptions options = 
    PipelineOptionsFactory.as(SparkPipelineOptions.class);

options.setStreaming(true);
options.setCheckpointDir("hdfs:///beam/checkpoints");
options.setCheckpointDurationMillis(10000L); // 10 seconds

Batch vs Streaming

Batch Pipeline

SparkPipelineOptions options = 
    PipelineOptionsFactory.as(SparkPipelineOptions.class);

options.setRunner(SparkRunner.class);
options.setSparkMaster("local[*]");
// streaming defaults to false

Pipeline p = Pipeline.create(options);
p.apply(TextIO.read().from("/path/to/input"))
    .apply(/* transforms */)
    .apply(TextIO.write().to("/path/to/output"));

p.run().waitUntilFinish();

Streaming Pipeline

SparkPipelineOptions options = 
    PipelineOptionsFactory.as(SparkPipelineOptions.class);

options.setRunner(SparkRunner.class);
options.setSparkMaster("local[*]");
options.setStreaming(true);
options.setBatchIntervalMillis(1000L);
options.setCheckpointDir("/tmp/checkpoint");

Pipeline p = Pipeline.create(options);
p.apply(KafkaIO.<String, String>read()
    .withBootstrapServers("localhost:9092")
    .withTopic("input-topic")
    .withKeyDeserializer(StringDeserializer.class)
    .withValueDeserializer(StringDeserializer.class))
    .apply(/* transforms */)
    .apply(KafkaIO.<Void, String>write()
        .withBootstrapServers("localhost:9092")
        .withTopic("output-topic"));

p.run().waitUntilFinish();

Monitoring and Debugging

Spark UI

Access Spark UI for monitoring:
  • Master UI: http://master:8080 (standalone mode)
  • Application UI: http://driver:4040
  • History Server: http://history-server:18080
View:
  • Job stages and tasks
  • Executor metrics
  • Storage and memory usage
  • Environment configuration

Metrics

Beam metrics are available in Spark:
import org.apache.beam.sdk.metrics.*;

public class MyDoFn extends DoFn<String, String> {
  private final Counter counter = 
      Metrics.counter(MyDoFn.class, "processed");
  private final Distribution distribution = 
      Metrics.distribution(MyDoFn.class, "size");
  
  @ProcessElement
  public void processElement(ProcessContext c) {
    counter.inc();
    distribution.update(c.element().length());
    c.output(c.element());
  }
}

// Read metrics after execution
PipelineResult result = p.run();
result.waitUntilFinish();

MetricResults metrics = result.metrics();
MetricQueryResults counters = metrics.queryMetrics(
    MetricsFilter.builder()
        .addNameFilter(MetricNameFilter.named(MyDoFn.class, "processed"))
        .build());
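The query result can then be iterated to read the values. A short sketch; note that `getCommitted()` may throw `UnsupportedOperationException` on runners that do not commit metrics, so `getAttempted()` is the safe read:

```java
// Iterate the counters matched by the filter above and print their
// attempted values.
for (MetricResult<Long> counter : counters.getCounters()) {
  System.out.println(counter.getName() + ": " + counter.getAttempted());
}
```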

Logging

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MyDoFn extends DoFn<String, String> {
  private static final Logger LOG = LoggerFactory.getLogger(MyDoFn.class);
  
  @ProcessElement
  public void processElement(ProcessContext c) {
    LOG.info("Processing: {}", c.element());
    c.output(c.element());
  }
}
Logs are available:
  • In Spark executor logs
  • Through Spark UI
  • In configured log aggregation system

Performance Tuning

Parallelism

// Control bundle size for better parallelism
options.setBundleSize(1000L);

// Use appropriate Spark parallelism
Map<String, String> conf = new HashMap<>();
conf.put("spark.default.parallelism", "200");
conf.put("spark.sql.shuffle.partitions", "200");
options.setSparkSubmitConf(conf);

Memory Management

Map<String, String> conf = new HashMap<>();

// Executor memory
conf.put("spark.executor.memory", "8g");
conf.put("spark.executor.memoryOverhead", "2g");

// Memory fractions
conf.put("spark.memory.fraction", "0.8");
conf.put("spark.memory.storageFraction", "0.3");

options.setSparkSubmitConf(conf);

Caching Strategy

// Enable caching for reused PCollections (default)
options.setCacheDisabled(false);

// Or disable if recomputing is faster
options.setCacheDisabled(true);

Shuffle Optimization

Map<String, String> conf = new HashMap<>();

// Shuffle configuration
conf.put("spark.shuffle.compress", "true");
conf.put("spark.shuffle.spill.compress", "true");
conf.put("spark.shuffle.file.buffer", "128k");

options.setSparkSubmitConf(conf);

Best Practices

Resource Allocation

  1. Right-size executors
    conf.put("spark.executor.cores", "4");
    conf.put("spark.executor.memory", "8g");
    conf.put("spark.executor.instances", "10");
    
  2. Use dynamic allocation
    conf.put("spark.dynamicAllocation.enabled", "true");
    conf.put("spark.dynamicAllocation.minExecutors", "2");
    conf.put("spark.dynamicAllocation.maxExecutors", "50");
    

Streaming Best Practices

  1. Choose appropriate batch interval
    options.setBatchIntervalMillis(1000L); // Balance latency vs throughput
    
  2. Enable checkpointing
    options.setCheckpointDir("hdfs:///checkpoints");
    options.setCheckpointDurationMillis(10000L);
    
  3. Configure backpressure
    conf.put("spark.streaming.backpressure.enabled", "true");
    conf.put("spark.streaming.kafka.maxRatePerPartition", "1000");
    

Batch Best Practices

  1. Partition data appropriately
  2. Use broadcast variables for small datasets
  3. Avoid wide transformations when possible
  4. Coalesce output partitions
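For item 2, Beam side inputs are the runner-agnostic analogue of Spark broadcast variables: the SparkRunner materializes them once and shares them across executors. A sketch, where `events` (the large main input) and `countryNames` (a small `KV<String, String>` lookup) are hypothetical PCollections:

```java
// Turn the small lookup PCollection into a side-input view.
// View.asMap() requires a PCollection<KV<String, String>>.
final PCollectionView<Map<String, String>> lookupView =
    countryNames.apply(View.asMap());

PCollection<String> enriched = events.apply(
    ParDo.of(new DoFn<String, String>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        // The side input is materialized per executor, like a broadcast.
        Map<String, String> lookup = c.sideInput(lookupView);
        c.output(lookup.getOrDefault(c.element(), "unknown"));
      }
    }).withSideInputs(lookupView));
```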

Troubleshooting

Pipeline fails to submit

  • Spark master is running and accessible
  • JAR contains all dependencies
  • Spark version compatibility
  • Network connectivity

Out-of-memory errors

  • Increase executor memory
  • Increase memory overhead
  • Reduce partition size
  • Enable off-heap memory
  • Disable caching if not beneficial

Slow batch performance

  • Check data skew in Spark UI
  • Increase parallelism
  • Optimize shuffle operations
  • Enable dynamic allocation
  • Tune memory fractions

Streaming falls behind

  • Reduce batch interval
  • Increase executors
  • Enable backpressure
  • Optimize transforms
  • Check for bottlenecks in UI

Checkpoint failures

  • Verify checkpoint directory is accessible
  • Check HDFS/storage permissions
  • Ensure sufficient disk space
  • Monitor checkpoint size

Runner Capabilities

Supported Features

  • ✅ Batch processing
  • ✅ Streaming (micro-batch, via Spark Streaming)
  • ✅ Windowing
  • ✅ Triggers
  • ✅ State and timers
  • ✅ Side inputs
  • ✅ Metrics

Limitations

  • Limited exactly-once streaming support
  • Batch interval impacts latency
  • Some Beam features not fully supported
  • Requires Spark cluster management

Integration Examples

Reading from Hive

// Beam does not expose Spark's native Hive support directly;
// read Hive tables through a Beam connector or a custom source.
Pipeline p = Pipeline.create(options);
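One concrete option is Beam's HCatalogIO connector (in the beam-sdks-java-io-hcatalog artifact), which reads Hive tables through the metastore. A sketch; the metastore URI, database, and table names below are placeholders:

```java
import org.apache.beam.sdk.io.hcatalog.HCatalogIO;

// Point HCatalogIO at the Hive metastore (URI is a placeholder).
Map<String, String> configProperties = new HashMap<>();
configProperties.put("hive.metastore.uris", "thrift://metastore-host:9083");

// Reads HCatRecord elements from the given table.
pipeline.apply(HCatalogIO.read()
    .withConfigProperties(configProperties)
    .withDatabase("my_database")   // hypothetical database
    .withTable("my_table"));       // hypothetical table
```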

Writing to Cassandra

import org.apache.beam.sdk.io.cassandra.CassandraIO;

pipeline
    .apply(/* source */)
    .apply(CassandraIO.<MyType>write()
        .withHosts(Arrays.asList("localhost"))
        .withPort(9042)
        .withKeyspace("my_keyspace")
        .withTable("my_table"));

Next Steps

Spark Documentation

Learn more about Apache Spark

FlinkRunner

Alternative for streaming workloads

Performance

Optimize pipeline performance

Monitoring

Monitor Spark applications