The PrismRunner is a local portable runner authored in Go that executes Beam pipelines on your machine. It’s designed for fast startup, comprehensive testing, and excellent debugging support.

Overview

PrismRunner provides:
  • Fast Startup: Quick pipeline execution for rapid development
  • Portable: Uses the Beam FnAPI to work with all SDKs
  • Local Execution: Runs on your machine for easy debugging
  • Testing Focused: Enforces correctness to catch bugs early
  • Modern Architecture: Built with Go’s concurrency model

When to Use PrismRunner

Best For

  • Local development
  • Fast iteration cycles
  • Testing pipelines
  • Debugging workflows
  • Cross-language pipelines
  • CI/CD testing

Not Suitable For

  • Production workloads
  • Large datasets
  • Distributed processing
  • Long-running jobs
  • High-throughput streaming
PrismRunner is designed to eventually replace the DirectRunner for Go SDK users and provide better local testing for all SDKs.

Setup and Configuration

Installation

For Java, add the Prism runner dependency. The Beam SDK then downloads the Prism binary automatically when it is needed:
<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-runners-prism-java</artifactId>
  <version>{beam-version}</version>
</dependency>
For Gradle:
implementation 'org.apache.beam:beam-runners-prism-java:{beam-version}'

Manual Installation

You can also install Prism manually:
# Download from GitHub releases
wget https://github.com/apache/beam/releases/download/v{version}/prism-{os}-{arch}.zip
unzip prism-{os}-{arch}.zip

# Or build from source
git clone https://github.com/apache/beam.git
cd beam/sdks/go/cmd/prism
go build

Running a Pipeline

Basic Usage

import org.apache.beam.runners.prism.PrismRunner;
import org.apache.beam.runners.prism.PrismPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;

public class MyPrismPipeline {
  public static void main(String[] args) {
    PrismPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args)
            .withValidation()
            .as(PrismPipelineOptions.class);

    // Set the runner
    options.setRunner(PrismRunner.class);

    Pipeline p = Pipeline.create(options);

    // Build your pipeline (replace Create with your own sources and transforms)
    p.apply(Create.of("Hello", "Prism"));

    // Execute with Prism and block until the job finishes
    p.run().waitUntilFinish();
  }
}

Command Line

Run with PrismRunner from the command line:
# Java
mvn compile exec:java -Dexec.mainClass=com.example.MyPipeline \
  -Dexec.args="--runner=PrismRunner"

# Python
python my_pipeline.py --runner=PrismRunner

# Go
go run my_pipeline.go --runner=prism

PrismPipelineOptions

Configure PrismRunner with these options:

Core Options

prismLocation (string)
Path or URL to the Prism binary. If not set, the SDK downloads it automatically.
options.setPrismLocation("/path/to/prism");
Can be:
  • Local file path: /usr/local/bin/prism
  • URL to a binary or zipped binary for the current platform: https://example.com/prism
  • GitHub Release page URL: the SDK constructs the download URL from it, using prismVersionOverride

prismVersionOverride (string)
Override the SDK version used to construct the Prism download URL from GitHub releases.
options.setPrismVersionOverride("2.50.0");

enableWebUI (boolean, default: true)
Enable the Prism Web UI for monitoring.
options.setEnableWebUI(true);
Access it at http://localhost:8080 when enabled.

idleShutdownTimeout (string, default: "5m")
How long Prism waits for a new job before shutting itself down. Negative durations disable auto-shutdown.
options.setIdleShutdownTimeout("10m");
Valid time units: ns, us, ms, s, m, h

prismLogLevel (string, default: "warn")
Log level for Prism: debug, info, warn, or error.
options.setPrismLogLevel("info");
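The idleShutdownTimeout value is a duration string. Assuming Prism interprets it like Go's time.ParseDuration (an optional sign followed by number-unit pairs such as 90s or 1h30m), the stdlib-only sketch below converts such strings to a java.time.Duration, which could be used to validate a value before calling setIdleShutdownTimeout. The GoDurations class is hypothetical; it accepts only whole numbers and the units listed above, not fractional values or the µs spelling.

```java
import java.time.Duration;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper that parses Go-style duration strings such as "5m", "1h30m" or "-1s". */
final class GoDurations {
  // Nanoseconds per unit, matching the units listed for idleShutdownTimeout
  private static final Map<String, Long> NANOS_PER_UNIT = Map.of(
      "ns", 1L,
      "us", 1_000L,
      "ms", 1_000_000L,
      "s", 1_000_000_000L,
      "m", 60_000_000_000L,
      "h", 3_600_000_000_000L);

  // One "<digits><unit>" pair; two-letter units come first so "5ms" is not read as "5m" + "s"
  private static final String PAIR = "(\\d+)(ns|us|ms|h|m|s)";
  private static final Pattern PART = Pattern.compile(PAIR);
  private static final Pattern WHOLE = Pattern.compile("(?:" + PAIR + ")+");

  private GoDurations() {}

  static Duration parse(String text) {
    String s = text.trim();
    boolean negative = s.startsWith("-");
    if (negative || s.startsWith("+")) {
      s = s.substring(1);
    }
    if (s.equals("0")) {
      return Duration.ZERO; // Go also accepts a bare "0" without a unit
    }
    if (!WHOLE.matcher(s).matches()) {
      throw new IllegalArgumentException("invalid duration: " + text);
    }
    long nanos = 0;
    Matcher part = PART.matcher(s);
    while (part.find()) {
      long amount = Long.parseLong(part.group(1));
      // Exact arithmetic so absurdly large values throw instead of overflowing silently
      nanos = Math.addExact(nanos, Math.multiplyExact(amount, NANOS_PER_UNIT.get(part.group(2))));
    }
    Duration duration = Duration.ofNanos(nanos);
    return negative ? duration.negated() : duration;
  }

  /** Per the option's description, a negative timeout disables idle auto-shutdown. */
  static boolean disablesAutoShutdown(String text) {
    return parse(text).isNegative();
  }

  public static void main(String[] args) {
    System.out.println(parse("5m"));                 // PT5M
    System.out.println(parse("1h30m"));              // PT1H30M
    System.out.println(disablesAutoShutdown("-1s")); // true
  }
}
```

Rejecting a malformed value such as "10 minutes" up front gives a clearer error than a failed Prism launch.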

Features and Capabilities

Supported Transforms

  • ParDo: DoFn features, with limited state and timers support
  • GroupByKey: Including session windows
  • CoGroupByKey: Multi-input grouping
  • Combine: Lifted and unlifted
  • Flatten: Multiple PCollections
  • Side Inputs: All side input patterns
  • Windowing: Fixed, sliding, session, global
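The Combine entry above distinguishes lifted from unlifted execution. In a lifted Combine, partial results are computed per bundle before the shuffle and then merged; unlifted execution groups all raw values first and combines them afterwards. The stdlib-only sketch below uses a hypothetical MeanCombiner whose method names mirror Beam's CombineFn (createAccumulator, addInput, mergeAccumulators, extractOutput) to show why both strategies must produce the same result. It is an illustration, not Beam or Prism code.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of a Beam-style CombineFn that computes a mean. */
final class MeanCombiner {
  /** Accumulator: running sum and count. */
  static final class Acc {
    double sum;
    long count;
  }

  static Acc createAccumulator() {
    return new Acc();
  }

  static Acc addInput(Acc acc, double value) {
    acc.sum += value;
    acc.count++;
    return acc;
  }

  static Acc mergeAccumulators(Iterable<Acc> accs) {
    Acc merged = createAccumulator();
    for (Acc a : accs) {
      merged.sum += a.sum;
      merged.count += a.count;
    }
    return merged;
  }

  static double extractOutput(Acc acc) {
    return acc.count == 0 ? Double.NaN : acc.sum / acc.count;
  }

  /** Unlifted: gather every raw value, then combine once. */
  static double unlifted(List<List<Double>> bundles) {
    Acc acc = createAccumulator();
    for (List<Double> bundle : bundles) {
      for (double v : bundle) {
        addInput(acc, v);
      }
    }
    return extractOutput(acc);
  }

  /** Lifted: partially combine each bundle, then merge the small accumulators. */
  static double lifted(List<List<Double>> bundles) {
    List<Acc> partials = new ArrayList<>();
    for (List<Double> bundle : bundles) {
      Acc acc = createAccumulator();
      for (double v : bundle) {
        addInput(acc, v);
      }
      partials.add(acc);
    }
    return extractOutput(mergeAccumulators(partials));
  }

  public static void main(String[] args) {
    List<List<Double>> bundles = List.of(List.of(1.0, 2.0), List.of(3.0, 4.0, 5.0));
    System.out.println(lifted(bundles) + " " + unlifted(bundles)); // 3.0 3.0
  }
}
```

Lifting reduces how much data crosses the shuffle, which is why a runner may prefer it whenever the CombineFn's merge step is associative.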

Windowing Support

import org.apache.beam.sdk.transforms.windowing.*;
import org.joda.time.Duration;

pipeline
    .apply(/* source */)
    .apply(Window.<String>into(
        FixedWindows.of(Duration.standardMinutes(1))))
    .apply(/* transforms */);
Supported window types:
  • Global windows
  • Fixed windows
  • Sliding windows
  • Session windows
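The window types differ mainly in how an element's timestamp is assigned to windows. The stdlib-only sketch below (hypothetical WindowAssignment class, timestamps and sizes in epoch milliseconds) shows fixed and sliding assignment. Its arithmetic is modeled on the behavior of Beam's FixedWindows and SlidingWindows, but it is an illustration rather than SDK code. Session windows are left out because they are assigned per element and then merged by gap duration.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of window assignment; timestamps and sizes are in milliseconds. */
final class WindowAssignment {
  private WindowAssignment() {}

  /** Fixed windows: start of the single window [start, start + size) containing the timestamp. */
  static long fixedWindowStart(long timestamp, long size, long offset) {
    // floorMod keeps the result correct for timestamps before the epoch
    return timestamp - Math.floorMod(timestamp - offset, size);
  }

  /** Sliding windows: starts of every window [start, start + size) containing the timestamp. */
  static List<Long> slidingWindowStarts(long timestamp, long size, long period, long offset) {
    List<Long> starts = new ArrayList<>();
    // Latest window start at or before the timestamp, then step back one period at a time
    long lastStart = timestamp - Math.floorMod(timestamp - offset, period);
    for (long start = lastStart; start > timestamp - size; start -= period) {
      starts.add(start);
    }
    return starts;
  }

  public static void main(String[] args) {
    long ts = 125_000; // 2m05s after the epoch
    System.out.println(fixedWindowStart(ts, 60_000, 0));            // 120000
    System.out.println(slidingWindowStarts(ts, 60_000, 30_000, 0)); // [120000, 90000]
  }
}
```

With a 1-minute size and a 30-second period, every element lands in two sliding windows, while fixed windows (period equal to size) always yield exactly one.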

State and Timers

State and timers support is limited in the current version of PrismRunner.

Splittable DoFn

PrismRunner expands Splittable DoFns:
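import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;

// Emits each character of the input; the offset range lets the runner split the work
public class MySplittableDoFn extends DoFn<String, String> {
  @ProcessElement
  public void processElement(
      @Element String element,
      RestrictionTracker<OffsetRange, Long> tracker,
      OutputReceiver<String> out) {
    // Claim one offset at a time; tryClaim returns false once the rest has been split off
    for (long i = tracker.currentRestriction().getFrom(); tracker.tryClaim(i); ++i) {
      out.output(String.valueOf(element.charAt((int) i)));
    }
  }

  @GetInitialRestriction
  public OffsetRange getInitialRestriction(@Element String element) {
    return new OffsetRange(0, element.length());
  }
}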
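When a runner expands a Splittable DoFn, it can split the restriction while processElement is still running. The stdlib-only sketch below models that with a simplified offset tracker. Its tryClaim and trySplit methods are loosely modeled on the contract of Beam's RestrictionTracker, but the SimpleOffsetTracker class, its rounding rule, and the long[] residual encoding are hypothetical; this is not Beam's OffsetRangeTracker or Prism's internal implementation.

```java
/** Hypothetical, simplified model of an offset-range restriction tracker. */
final class SimpleOffsetTracker {
  private final long from;
  private long to;          // exclusive end; shrinks when the runner splits off a residual
  private long lastClaimed; // last successfully claimed offset

  SimpleOffsetTracker(long from, long to) {
    this.from = from;
    this.to = to;
    this.lastClaimed = from - 1;
  }

  /** Claims an offset; false means it is past the (possibly shrunk) end of the primary range. */
  boolean tryClaim(long offset) {
    if (offset <= lastClaimed) {
      throw new IllegalArgumentException("offsets must be claimed in increasing order");
    }
    if (offset >= to) {
      return false;
    }
    lastClaimed = offset;
    return true;
  }

  /**
   * Keeps the given fraction (rounded up) of the unclaimed remainder and returns the rest
   * as a residual range {start, end}, or null if there is nothing left to split off.
   */
  long[] trySplit(double fractionOfRemainder) {
    long firstUnclaimed = lastClaimed + 1;
    long splitPoint = firstUnclaimed + (long) Math.ceil((to - firstUnclaimed) * fractionOfRemainder);
    if (splitPoint >= to) {
      return null;
    }
    long[] residual = {splitPoint, to};
    to = splitPoint; // the primary now covers [from, splitPoint)
    return residual;
  }

  long[] primary() {
    return new long[] {from, to};
  }

  public static void main(String[] args) {
    String element = "hello";
    SimpleOffsetTracker tracker = new SimpleOffsetTracker(0, element.length());
    StringBuilder processed = new StringBuilder();
    for (long i = 0; tracker.tryClaim(i); i++) {
      processed.append(element.charAt((int) i));
      if (i == 1) {
        // Simulate the runner splitting the work after two characters
        long[] residual = tracker.trySplit(0.5);
        System.out.println("residual: [" + residual[0] + ", " + residual[1] + ")"); // residual: [4, 5)
      }
    }
    System.out.println("primary processed: " + processed); // primary processed: hell
  }
}
```

The residual range would then be scheduled as separate work, which is what lets a runner rebalance long-running elements.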

Web UI

PrismRunner includes a built-in web UI for monitoring:

Accessing the UI

When enableWebUI is true:
  1. Start your pipeline with PrismRunner
  2. Open http://localhost:8080 in your browser
  3. View job progress, metrics, and logs

Features

  • Job status and progress
  • Transform graph visualization
  • Metrics and counters
  • Error messages and logs
  • Watermark tracking
The Web UI is particularly useful for debugging complex pipelines and understanding execution flow.

Testing with PrismRunner

PrismRunner is excellent for testing:

Unit Tests

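import org.apache.beam.runners.prism.PrismPipelineOptions;
import org.apache.beam.runners.prism.PrismRunner;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.junit.Rule;
import org.junit.Test;

public class MyPipelineTest {
  @Rule
  public final transient TestPipeline pipeline =
      TestPipeline.fromOptions(
          PipelineOptionsFactory.create()
              .as(PrismPipelineOptions.class));

  @Test
  public void testWithPrism() {
    // Select Prism before the pipeline runs
    PrismPipelineOptions options =
        pipeline.getOptions().as(PrismPipelineOptions.class);
    options.setRunner(PrismRunner.class);

    PCollection<String> output = pipeline
        .apply(Create.of("Hello", "World"))
        .apply(MapElements.into(TypeDescriptors.strings())
            .via(String::toUpperCase));

    PAssert.that(output).containsInAnyOrder("HELLO", "WORLD");

    // Run on Prism and wait so PAssert failures surface in this test
    pipeline.run().waitUntilFinish();
  }
}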

Integration Tests

PrismRunner is ideal for CI/CD integration tests:
# GitHub Actions example
name: Test Pipeline

on: [push, pull_request]

jobs:
  test:
    runs-on: ubuntu-latest
    steps:
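      - uses: actions/checkout@v4
      
      - name: Set up Java
        uses: actions/setup-java@v4
        with:
          distribution: 'temurin'
          java-version: '11'
      
      - name: Run tests with Prism
        # TestPipeline.create() reads options from the beamTestPipelineOptions system property
        run: |
          mvn test -Dtest=MyPipelineTest \
            -DbeamTestPipelineOptions='["--runner=PrismRunner"]'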

Debugging

PrismRunner provides excellent debugging support:

Local Debugging

Since Prism runs locally, you can:
  • Set breakpoints in your DoFns
  • Use your IDE’s debugger
  • Inspect variables in real-time
  • Step through pipeline execution

Logging

import org.apache.beam.sdk.transforms.DoFn;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MyDoFn extends DoFn<String, String> {
  private static final Logger LOG = LoggerFactory.getLogger(MyDoFn.class);
  
  @ProcessElement
  public void processElement(ProcessContext c) {
    LOG.info("Processing: {}", c.element());
    c.output(c.element());
  }
}

Prism Logs

Control Prism’s own logging:
PrismPipelineOptions options = 
    PipelineOptionsFactory.as(PrismPipelineOptions.class);

// Set to debug for detailed Prism internals
options.setPrismLogLevel("debug");

Portable Architecture

PrismRunner uses Beam’s portable architecture:

How It Works

  1. Pipeline Translation: The SDK translates the pipeline into its portable (Runner API) proto representation
  2. Job Submission: Prism receives the job through the Beam Job API
  3. SDK Harness: Prism launches SDK-specific workers (SDK harnesses)
  4. Execution: Prism coordinates execution across the workers over the FnAPI
  5. Results: Job state and metrics are reported back to the client

Cross-Language Support

PrismRunner supports cross-language pipelines:
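import org.apache.beam.sdk.extensions.python.PythonExternalTransform;

// Java pipeline using a Python transform; "my_transform" stands in for the
// fully qualified name of your Python PTransform
pipeline
    .apply(Create.of("data"))
    .apply("PythonTransform",
        PythonExternalTransform.<PCollection<String>, PCollection<String>>from(
            "my_transform"))
    .apply(/* more Java transforms */);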

Performance Considerations

PrismRunner is optimized for development, not production:
  • Memory: All data processed in memory
  • Single Machine: No distributed execution
  • Fast Startup: Quick iteration for development
  • Testing Focus: Prioritizes correctness over speed
For production workloads or large datasets, use DataflowRunner, FlinkRunner, or SparkRunner.

Comparison with DirectRunner

Feature          PrismRunner      DirectRunner
Architecture     Portable/FnAPI   SDK-specific
Languages        All SDKs         Per-SDK
Startup Time     Fast             Fast
Cross-language   Yes              No
Web UI           Yes              No
State            Limited          Full
Maturity         Newer            Mature

Current Limitations

PrismRunner is under active development. Some limitations:
  • Testing Use Only: Not for production workloads
  • In-Memory Only: Not suitable for large jobs
  • Limited State: State and timers support is incomplete
  • Single Machine: No distributed execution
  • Feature Coverage: Some Beam features not yet implemented
See the Prism README for current status.

Future Roadmap

Upcoming features:
  • Complete state and timers support
  • Triggers and complex windowing
  • Test Stream support
  • Container execution for cross-language
  • Better streaming support
  • Performance optimizations

Best Practices

Development Workflow

  1. Start with PrismRunner for development
    python my_pipeline.py --runner=PrismRunner
    
  2. Test with DirectRunner for comprehensive validation
    python my_pipeline.py --runner=DirectRunner
    
  3. Deploy with production runner
    python my_pipeline.py --runner=DataflowRunner \
      --project=my-project \
      --region=us-central1
    

Testing Strategy

  • Use PrismRunner for fast unit tests
  • Use DirectRunner for thorough validation tests
  • Use production runner for integration tests

CI/CD Integration

# Use PrismRunner for fast CI tests
- name: Fast Tests
  run: mvn test -DbeamTestPipelineOptions='["--runner=PrismRunner"]'

# Use DirectRunner for thorough validation
- name: Validation Tests
  run: mvn verify -DbeamTestPipelineOptions='["--runner=DirectRunner"]'

Troubleshooting

Prism download fails

The SDK will auto-download Prism. If this fails:
  • Check internet connectivity
  • Manually download from GitHub releases
  • Set --prismLocation to a local binary

Web UI not accessible

  • Ensure --enableWebUI=true
  • Check that port 8080 is not in use
  • Verify firewall settings

Unsupported feature errors

  • Check Prism's current feature support
  • Consider using DirectRunner instead
  • Report the issue on GitHub

Slow performance or out-of-memory errors

  • Reduce data size for local testing
  • Check for inefficient transforms
  • Consider using a distributed runner for large data
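The port check in the troubleshooting steps can be scripted. The stdlib-only Java sketch below (hypothetical PortCheck class) tries to bind the port on the loopback interface; a bind failure usually means another process already holds it. It is a quick heuristic, assuming the Web UI uses the port you pass in, not a definitive diagnosis.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.ServerSocket;

/** Hypothetical helper: checks whether a local TCP port (for example 8080) is free. */
final class PortCheck {
  private PortCheck() {}

  static boolean isPortFree(int port) {
    // Binding succeeds only if no other socket is listening on this loopback port
    try (ServerSocket socket = new ServerSocket(port, 1, InetAddress.getLoopbackAddress())) {
      return true;
    } catch (IOException e) {
      return false;
    }
  }

  public static void main(String[] args) {
    int port = args.length > 0 ? Integer.parseInt(args[0]) : 8080;
    System.out.println("Port " + port + (isPortFree(port) ? " is free" : " is in use"));
  }
}
```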

Next Steps

  • Prism GitHub: View source and contribute
  • DirectRunner: Alternative local runner
  • Testing Guide: Learn more about testing
  • Cross-Language: Build multi-language pipelines