PrismRunner is a portable local runner, written in Go, that executes Beam pipelines on your machine. It is designed for fast startup, thorough testing, and easy debugging.
Overview
PrismRunner provides:
Fast Startup: Quick pipeline execution for rapid development
Portable: Uses the Beam FnAPI to work with all SDKs
Local Execution: Runs on your machine for easy debugging
Testing Focused: Enforces correctness to catch bugs early
Modern Architecture: Built with Go’s concurrency model
When to Use PrismRunner
Best For
Local development
Fast iteration cycles
Testing pipelines
Debugging workflows
Cross-language pipelines
CI/CD testing
Not Suitable For
Production workloads
Large datasets
Distributed processing
Long-running jobs
High-throughput streaming
PrismRunner is designed to eventually replace the DirectRunner for Go SDK users and provide better local testing for all SDKs.
Setup and Configuration
Installation
The Beam SDK downloads the Prism binary automatically when it is needed. SDK-specific setup:
Java: add the Prism runner dependency (Maven):
<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-runners-prism-java</artifactId>
  <version>{beam-version}</version>
</dependency>
For Gradle: implementation 'org.apache.beam:beam-runners-prism-java:{beam-version}'
Python: install the Beam SDK (pip install apache-beam); the SDK manages the Prism binary automatically.
Go: import the Prism runner:
import "github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism"
Or install the Prism binary directly: go install github.com/apache/beam/sdks/v2/go/cmd/prism@latest
Manual Installation
You can also install Prism manually:
# Download from GitHub releases
wget https://github.com/apache/beam/releases/download/v{version}/prism-{os}-{arch}.zip
unzip prism-{os}-{arch}.zip
# Or build from source
git clone https://github.com/apache/beam.git
cd beam/sdks/go/cmd/prism
go build
Running a Pipeline
Basic Usage
Java:
import org.apache.beam.runners.prism.PrismPipelineOptions;
import org.apache.beam.runners.prism.PrismRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptors;
public class MyPrismPipeline {
  public static void main(String[] args) {
    PrismPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args)
            .withValidation()
            .as(PrismPipelineOptions.class);
    // Set the runner
    options.setRunner(PrismRunner.class);
    Pipeline p = Pipeline.create(options);
    // Build your pipeline (replace with your own transforms)
    p.apply(Create.of("Hello", "Prism"))
        .apply(MapElements.into(TypeDescriptors.strings())
            .via((String word) -> word.toUpperCase()));
    // Execute with Prism and block until the job finishes
    p.run().waitUntilFinish();
  }
}
Python:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
def run():
    options = PipelineOptions([
        '--runner=PrismRunner',
    ])
    with beam.Pipeline(options=options) as p:
        # Build your pipeline
        (p
         | beam.Create(['Hello', 'Prism'])
         | beam.Map(lambda x: x.upper())
         | beam.Map(print))
if __name__ == '__main__':
    run()
Go:
package main
import (
	"context"
	"flag"
	"github.com/apache/beam/sdks/v2/go/pkg/beam"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/x/debug"
)
func main() {
	flag.Parse()
	beam.Init()
	p := beam.NewPipeline()
	s := p.Root()
	// Build your pipeline
	data := beam.Create(s, "Hello", "Prism")
	// ... more transforms
	debug.Print(s, data) // log each element (also keeps data from being unused)
	// prism.Execute returns a PipelineResult and an error
	if _, err := prism.Execute(context.Background(), p); err != nil {
		panic(err)
	}
}
Command Line
Run with PrismRunner from the command line:
# Java
mvn compile exec:java -Dexec.mainClass=com.example.MyPipeline \
  -Dexec.args="--runner=PrismRunner"
# Python
python my_pipeline.py --runner=PrismRunner
# Go (--runner is read by beamx.Run; a program that calls prism.Execute directly always uses Prism)
go run my_pipeline.go --runner=prism
PrismPipelineOptions
Configure PrismRunner with these options:
Core Options
prismLocation: Path or URL to the Prism binary. If not set, the SDK downloads it automatically. options.setPrismLocation("/path/to/prism");
Can be:
Local file path: /usr/local/bin/prism
URL to binary: https://example.com/prism
GitHub release URL: the SDK constructs the platform-specific download URL from it
prismVersionOverride: Override the SDK version used when downloading Prism from GitHub releases. options.setPrismVersionOverride("2.50.0");
enableWebUI: Enable the Prism Web UI for monitoring. options.setEnableWebUI(true);
When enabled, the UI is served at http://localhost:8074 (Prism's default Web UI port).
idleShutdownTimeout: How long Prism waits for a new job before shutting down. Negative durations disable auto-shutdown. options.setIdleShutdownTimeout("10m");
Valid time units: ns, us, ms, s, m, h
prismLogLevel: Log level for Prism: debug, info, warn, or error. options.setPrismLogLevel("info");
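The units listed for idleShutdownTimeout match the syntax of Go's time.ParseDuration: number-and-unit pairs such as 10m or 1h30m, with an optional sign. Assuming Prism accepts that syntax, the hypothetical Python helper below approximates it so a value can be sanity-checked before it is handed to the runner. It is a simplified sketch, not Prism's parser, and it skips some Go edge cases such as the µs spelling.

```python
import re

# Nanoseconds per unit, matching the units listed above (ns, us, ms, s, m, h).
_UNIT_NS = {
    "ns": 1,
    "us": 1_000,
    "ms": 1_000_000,
    "s": 1_000_000_000,
    "m": 60 * 1_000_000_000,
    "h": 3600 * 1_000_000_000,
}

# One (optionally fractional) number followed by a unit. "ms" is tried before
# "m" so that "250ms" is not read as "250m" followed by a stray "s".
_PART = re.compile(r"(\d+(?:\.\d*)?|\.\d+)(ns|us|ms|h|m|s)")


def parse_go_duration(text: str) -> int:
    """Hypothetical helper: convert a Go-style duration such as '1h30m' to nanoseconds."""
    s = text.strip()
    sign = 1
    if s and s[0] in "+-":
        sign = -1 if s[0] == "-" else 1
        s = s[1:]
    if s == "0":  # Go allows a bare zero without a unit
        return 0
    if not s:
        raise ValueError(f"invalid duration: {text!r}")
    total = 0.0
    pos = 0
    while pos < len(s):
        match = _PART.match(s, pos)
        if match is None:
            raise ValueError(f"invalid duration: {text!r}")
        total += float(match.group(1)) * _UNIT_NS[match.group(2)]
        pos = match.end()
    return sign * int(total)


if __name__ == "__main__":
    print(parse_go_duration("10m"))  # 600000000000
```

A negative result (for example from "-1s") corresponds to the case where auto-shutdown is disabled.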
Features and Capabilities
✅ ParDo: All DoFn features
✅ GroupByKey: Including session windows
✅ CoGroupByKey: Multi-input grouping
✅ Combine: Lifted and unlifted
✅ Flatten: Multiple PCollections
✅ Side Inputs: All side input patterns
✅ Windowing: Fixed, sliding, session, global
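"Lifted" refers to combiner lifting: each bundle is pre-combined into a small accumulator before the shuffle, and only the accumulators are merged afterwards, whereas an unlifted combine groups every raw value first. The sketch below models both strategies for a mean in plain Python. MeanFn only mirrors the method names of Beam's CombineFn (create_accumulator, add_input, merge_accumulators, extract_output); it is a standalone illustration, and the bundle split is arbitrary.

```python
from typing import Iterable, List, Tuple

Accumulator = Tuple[float, int]  # (running sum, count)


class MeanFn:
    """Standalone mean combiner shaped like Beam's CombineFn (illustration only)."""

    def create_accumulator(self) -> Accumulator:
        return (0.0, 0)

    def add_input(self, acc: Accumulator, value: float) -> Accumulator:
        return (acc[0] + value, acc[1] + 1)

    def merge_accumulators(self, accs: Iterable[Accumulator]) -> Accumulator:
        total, count = 0.0, 0
        for s, c in accs:
            total += s
            count += c
        return (total, count)

    def extract_output(self, acc: Accumulator) -> float:
        return acc[0] / acc[1] if acc[1] else float("nan")


def unlifted(fn: MeanFn, values: List[float]) -> float:
    # Every raw value is grouped in one place, then combined.
    acc = fn.create_accumulator()
    for v in values:
        acc = fn.add_input(acc, v)
    return fn.extract_output(acc)


def lifted(fn: MeanFn, bundles: List[List[float]]) -> float:
    # Pre-combine each bundle (before the shuffle), then merge only accumulators.
    partials = []
    for bundle in bundles:
        acc = fn.create_accumulator()
        for v in bundle:
            acc = fn.add_input(acc, v)
        partials.append(acc)
    return fn.extract_output(fn.merge_accumulators(partials))


if __name__ == "__main__":
    fn = MeanFn()
    print(unlifted(fn, [1.0, 2.0, 3.0, 4.0, 5.0, 6.0]))  # 3.5
    print(lifted(fn, [[1.0, 2.0], [3.0, 4.0, 5.0], [6.0]]))  # 3.5
```

Both strategies agree because merging accumulators is associative and commutative, which is what Beam requires of a CombineFn; lifting simply moves less data through the shuffle.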
Windowing Support
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.joda.time.Duration;
pipeline
    .apply(/* source */)
    .apply(Window.<String>into(
        FixedWindows.of(Duration.standardMinutes(1))))
    .apply(/* transforms */);
Supported window types:
Global windows
Fixed windows
Sliding windows
Session windows
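To illustrate the supported window types, the sketch below assigns integer timestamps to windows. A global window would simply cover every timestamp; fixed windows align a timestamp down to a multiple of the size; sliding windows place it in every period-aligned window that covers it; session windows merge intervals of length gap that overlap. Window offsets are assumed to be zero and the function names are hypothetical. This models Beam's windowing semantics, not Prism's code.

```python
from typing import List, Tuple

Window = Tuple[int, int]  # half-open interval [start, end)


def fixed_window(ts: int, size: int) -> Window:
    # Align down to the nearest multiple of size (offset assumed to be 0).
    start = ts - (ts % size)
    return (start, start + size)


def sliding_windows(ts: int, size: int, period: int) -> List[Window]:
    # Every window [start, start + size) with start a multiple of period that contains ts.
    windows = []
    start = ts - (ts % period)
    while start > ts - size:
        windows.append((start, start + size))
        start -= period
    return windows


def session_windows(timestamps: List[int], gap: int) -> List[Window]:
    # Each element opens [ts, ts + gap); overlapping intervals merge into one session.
    merged: List[Window] = []
    for ts in sorted(timestamps):
        if merged and ts < merged[-1][1]:
            merged[-1] = (merged[-1][0], max(merged[-1][1], ts + gap))
        else:
            merged.append((ts, ts + gap))
    return merged


if __name__ == "__main__":
    print(fixed_window(65, 60))  # (60, 120)
    print(sliding_windows(7, 10, 5))  # [(5, 15), (0, 10)]
    print(session_windows([20, 1, 3], 5))  # [(1, 8), (20, 25)]
```

For sessions, the input is assumed to be the timestamps of a single key; Beam merges session windows per key.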
State and Timers
State and timers support is limited in the current version of PrismRunner.
Splittable DoFn
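PrismRunner expands and executes Splittable DoFns (SDFs), such as:
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
public class MySplittableDoFn extends DoFn<String, String> {
  @ProcessElement
  public void processElement(
      @Element String element,
      RestrictionTracker<OffsetRange, Long> tracker,
      OutputReceiver<String> out) {
    // Claim one character offset at a time; stop as soon as a claim fails
    // (end of the range, or the runner split off the remainder)
    for (long i = tracker.currentRestriction().getFrom(); tracker.tryClaim(i); ++i) {
      out.output(element.substring((int) i, (int) i + 1));
    }
  }
  @GetInitialRestriction
  public OffsetRange getInitialRestriction(@Element String element) {
    return new OffsetRange(0, element.length());
  }
}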
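An OffsetRange restriction is consumed by claiming offsets through the RestrictionTracker: each successful tryClaim grants the DoFn one offset, and when a runner splits the restriction, claims beyond the new end fail, which tells the DoFn to stop. The Python class below is a simplified, standalone model of that claim/split protocol; it is not Beam's RestrictionTracker, and the class and method names are illustrative.

```python
from typing import Optional, Tuple

Range = Tuple[int, int]  # half-open offset range [start, stop)


class OffsetTracker:
    """Simplified model of an offset-range restriction tracker (illustration only)."""

    def __init__(self, start: int, stop: int):
        self.start, self.stop = start, stop
        self.last_claimed: Optional[int] = None

    def try_claim(self, offset: int) -> bool:
        # A claim succeeds only inside the current (possibly shrunk) range.
        if offset >= self.stop:
            return False
        self.last_claimed = offset
        return True

    def try_split(self, fraction_of_remainder: float) -> Optional[Tuple[Range, Range]]:
        # Keep part of the unclaimed work as the primary; return the rest as a residual.
        first_unclaimed = self.start if self.last_claimed is None else self.last_claimed + 1
        split = first_unclaimed + int((self.stop - first_unclaimed) * fraction_of_remainder)
        if split >= self.stop:
            return None  # nothing left to hand off
        primary, residual = (self.start, split), (split, self.stop)
        self.stop = split
        return primary, residual


if __name__ == "__main__":
    element = "hello"
    tracker = OffsetTracker(0, len(element))
    out = []
    i = tracker.start
    while tracker.try_claim(i):
        out.append(element[i])
        if i == 1:
            print(tracker.try_split(0.5))  # ((0, 3), (3, 5))
        i += 1
    print(out)  # ['h', 'e', 'l']
```

The residual range (3, 5) would be processed separately, possibly by another worker, so no offset is lost or processed twice.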
Web UI
PrismRunner includes a built-in web UI for monitoring:
Accessing the UI
When enableWebUI is true:
Start your pipeline with PrismRunner
Open http://localhost:8074 (Prism's default Web UI port) in your browser
View job progress, metrics, and logs
Features
Job status and progress
Transform graph visualization
Metrics and counters
Error messages and logs
Watermark tracking
The Web UI is particularly useful for debugging complex pipelines and understanding execution flow.
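The watermark shown for a stage is its estimate of event-time progress. In Beam's model, a stage's output watermark cannot move past its slowest input watermark, nor past the timestamp of any element or watermark hold it still has pending. The hypothetical helper below computes that minimum for a toy stage with integer timestamps; it illustrates the rule, not Prism's internal bookkeeping.

```python
from typing import Iterable


def output_watermark(
    input_watermarks: Iterable[int],
    pending_timestamps: Iterable[int],
    holds: Iterable[int] = (),
) -> int:
    # A stage can only promise "no more data before T" for a T no later than
    # its slowest input and its oldest unfinished element or hold.
    candidates = list(input_watermarks) + list(pending_timestamps) + list(holds)
    if not candidates:
        raise ValueError("a stage needs at least one input watermark")
    return min(candidates)


if __name__ == "__main__":
    print(output_watermark([100, 80], [90]))  # 80
```

A watermark that stops advancing in the UI usually points at the stage holding the minimum: a slow upstream input, or an element that is still being processed.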
Testing with PrismRunner
PrismRunner is excellent for testing:
Unit Tests
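Java:
import org.apache.beam.runners.prism.PrismPipelineOptions;
import org.apache.beam.runners.prism.PrismRunner;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.junit.Rule;
import org.junit.Test;
public class MyPipelineTest {
  // Configure Prism before the TestPipeline is created
  private static PrismPipelineOptions prismOptions() {
    PrismPipelineOptions options =
        PipelineOptionsFactory.create().as(PrismPipelineOptions.class);
    options.setRunner(PrismRunner.class);
    return options;
  }
  @Rule
  public final transient TestPipeline pipeline = TestPipeline.fromOptions(prismOptions());
  @Test
  public void testWithPrism() {
    PCollection<String> output = pipeline
        .apply(Create.of("Hello", "World"))
        .apply(MapElements.into(TypeDescriptors.strings())
            .via((String word) -> word.toUpperCase()));
    PAssert.that(output).containsInAnyOrder("HELLO", "WORLD");
    pipeline.run().waitUntilFinish();
  }
}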
Python:
import unittest
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to
class MyPipelineTest(unittest.TestCase):
    def test_with_prism(self):
        options = PipelineOptions([
            '--runner=PrismRunner',
        ])
        with TestPipeline(options=options) as p:
            output = (
                p
                | beam.Create(['Hello', 'World'])
                | beam.Map(str.upper)
            )
            assert_that(output, equal_to(['HELLO', 'WORLD']))
Go:
package main
import (
	"context"
	"strings"
	"testing"
	"github.com/apache/beam/sdks/v2/go/pkg/beam"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
)
func toUpper(s string) string { return strings.ToUpper(s) }
func init() {
	// Register the DoFn so workers can look it up by name
	register.Function1x1(toUpper)
}
func TestPipeline(t *testing.T) {
	p, s := beam.NewPipelineWithRoot()
	input := beam.Create(s, "Hello", "World")
	output := beam.ParDo(s, toUpper, input)
	passert.Equals(s, output, "HELLO", "WORLD")
	if _, err := prism.Execute(context.Background(), p); err != nil {
		t.Fatalf("Pipeline failed: %v", err)
	}
}
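PAssert.containsInAnyOrder, Python's equal_to, and Go's passert.Equals all compare a PCollection's contents as a multiset: element order is ignored, but duplicate counts matter. The hypothetical helper below models that comparison in plain Python, which can help when working out why an assertion failed.

```python
from collections import Counter
from typing import Hashable, Iterable


def contains_in_any_order(actual: Iterable[Hashable], expected: Iterable[Hashable]) -> str:
    """Return '' when the multisets match, otherwise describe the difference.

    Elements must be hashable, and sortable so the report is deterministic.
    """
    a, e = Counter(actual), Counter(expected)
    missing = e - a  # expected more times than seen
    unexpected = a - e  # seen more times than expected
    if not missing and not unexpected:
        return ""
    return f"missing={sorted(missing.elements())} unexpected={sorted(unexpected.elements())}"


if __name__ == "__main__":
    print(contains_in_any_order(["HELLO", "HELLO"], ["HELLO", "WORLD"]))
    # missing=['WORLD'] unexpected=['HELLO']
```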
Integration Tests
PrismRunner is ideal for CI/CD integration tests:
# GitHub Actions example
name: Test Pipeline
on: [push, pull_request]
jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v2
      - name: Set up Java
        uses: actions/setup-java@v2
        with:
          distribution: 'temurin'
          java-version: '11'
      - name: Run tests with Prism
        run: |
          mvn test -Dtest=MyPipelineTest \
            -Drunner=PrismRunner
Debugging
PrismRunner provides excellent debugging support:
Local Debugging
Since Prism runs locally, you can:
Set breakpoints in your DoFns
Use your IDE’s debugger
Inspect variables in real-time
Step through pipeline execution
Logging
Java:
import org.apache.beam.sdk.transforms.DoFn;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MyDoFn extends DoFn<String, String> {
  private static final Logger LOG = LoggerFactory.getLogger(MyDoFn.class);
  @ProcessElement
  public void processElement(ProcessContext c) {
    LOG.info("Processing: {}", c.element());
    c.output(c.element());
  }
}
Python:
import logging
import apache_beam as beam
class MyDoFn(beam.DoFn):
    def process(self, element):
        logging.info(f'Processing: {element}')
        yield element.upper()
Go:
import (
	"context"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
)
func processElement(ctx context.Context, element string) string {
	log.Infof(ctx, "Processing: %s", element)
	return element
}
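One common surprise with the Python example: the root logger defaults to the WARNING level, so logging.info calls are dropped unless the level is lowered. Assuming the DoFn runs in the same Python process that configures logging (for example, a loopback worker), a minimal setup looks like this. The helper name is hypothetical, and the in-memory buffer in the demo only makes the effect visible; in a real pipeline you would call it with no argument to log to stderr.

```python
import io
import logging


def enable_info_logging(stream=None) -> logging.Handler:
    """Attach a handler to the root logger and lower its level to INFO.

    stream=None makes the handler write to sys.stderr.
    """
    handler = logging.StreamHandler(stream)
    handler.setFormatter(logging.Formatter("%(levelname)s %(message)s"))
    root = logging.getLogger()
    root.addHandler(handler)
    root.setLevel(logging.INFO)
    return handler


if __name__ == "__main__":
    buffer = io.StringIO()
    enable_info_logging(buffer)
    logging.debug("not shown")  # still below the INFO threshold
    logging.info("Processing: %s", "hello")
    print(buffer.getvalue(), end="")  # INFO Processing: hello
```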
Prism Logs
Control Prism’s own logging:
PrismPipelineOptions options =
    PipelineOptionsFactory.as(PrismPipelineOptions.class);
// Set to debug for detailed Prism internals
options.setPrismLogLevel("debug");
Portable Architecture
PrismRunner uses Beam’s portable architecture:
How It Works
Pipeline Translation: The SDK translates the pipeline into a portable pipeline proto
Job Submission: Prism receives the job through the Beam Job API
SDK Harness: Prism launches SDK-specific workers, which communicate with it over the FnAPI
Execution: Prism coordinates execution across workers
Results: Prism reports job status and metrics back to the client
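The five steps can be modeled with a small toy: a dict of named transforms and their inputs stands in for the portable pipeline proto, a topological sort stands in for the job service planning the work, and a plain function call stands in for an SDK worker. Everything here is a hypothetical simplification for intuition; real Prism exchanges protocol buffers with SDK workers over gRPC.

```python
from graphlib import TopologicalSorter  # Python 3.9+
from typing import Callable, Dict, List, Tuple

# Toy "pipeline proto": transform name -> (names of input transforms, function).
TransformSpec = Tuple[List[str], Callable[[List[list]], list]]


def run_job(pipeline: Dict[str, TransformSpec]) -> Dict[str, list]:
    # "Job submission": order transforms so producers run before consumers.
    graph = {name: spec[0] for name, spec in pipeline.items()}
    results: Dict[str, list] = {}
    for name in TopologicalSorter(graph).static_order():
        inputs, fn = pipeline[name]
        # "Execution": the worker applies the transform to its upstream outputs.
        results[name] = fn([results[i] for i in inputs])
    # "Results": report every output back to the client.
    return results


if __name__ == "__main__":
    toy_pipeline = {
        "Create": ([], lambda _: ["Hello", "Prism"]),
        "Upper": (["Create"], lambda ins: [x.upper() for x in ins[0]]),
    }
    print(run_job(toy_pipeline)["Upper"])  # ['HELLO', 'PRISM']
```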
Cross-Language Support
PrismRunner supports cross-language pipelines:
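// Java pipeline using a Python transform
// ("my_transform" is a placeholder for the fully qualified name of your Python transform)
pipeline
    .apply(Create.of("data"))
    .apply("PythonTransform",
        PythonExternalTransform.<PCollection<String>, PCollection<String>>from("my_transform"))
    .apply(/* more Java transforms */);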
Performance Characteristics
PrismRunner is optimized for development, not production:
Memory: All data is processed in memory
Single Machine: No distributed execution
Fast Startup: Quick iteration for development
Testing Focus: Prioritizes correctness over speed
Comparison with DirectRunner
Feature          PrismRunner      DirectRunner
Architecture     Portable/FnAPI   SDK-specific
Languages        All SDKs         Per-SDK
Startup Time     Fast             Fast
Cross-language   Yes              No
Web UI           Yes              No
State            Limited          Full
Maturity         Newer            Mature
Current Limitations
PrismRunner is under active development. Some limitations:
Testing Use Only: Not for production workloads
In-Memory Only: Not suitable for large jobs
Limited State: State and timers support is incomplete
Single Machine: No distributed execution
Feature Coverage: Some Beam features are not yet implemented
See the Prism README for current status.
Future Roadmap
Upcoming features:
Complete state and timers support
Triggers and complex windowing
TestStream support
Container execution for cross-language
Better streaming support
Performance optimizations
Best Practices
Development Workflow
Start with PrismRunner for development
python my_pipeline.py --runner=PrismRunner
Test with DirectRunner for comprehensive validation
python my_pipeline.py --runner=DirectRunner
Deploy with production runner
python my_pipeline.py --runner=DataflowRunner \
--project=my-project \
--region=us-central1
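Because the three stages differ only in the flags passed to the same program, one option is to build the argument list in a single place. The helper below is a hypothetical sketch: the flags are the ones used in the commands above, and the project and region defaults are placeholders. The returned list can be passed to PipelineOptions.

```python
from typing import List


def runner_args(stage: str, project: str = "my-project", region: str = "us-central1") -> List[str]:
    # Map each workflow stage to the runner flags from the commands above.
    if stage == "develop":
        return ["--runner=PrismRunner"]
    if stage == "validate":
        return ["--runner=DirectRunner"]
    if stage == "deploy":
        return ["--runner=DataflowRunner", f"--project={project}", f"--region={region}"]
    raise ValueError(f"unknown stage: {stage!r}")


if __name__ == "__main__":
    print(runner_args("deploy"))
    # ['--runner=DataflowRunner', '--project=my-project', '--region=us-central1']
```

Keeping the mapping in one function makes it harder for the development, validation, and deployment invocations to drift apart.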
Testing Strategy
Use PrismRunner for fast unit tests
Use DirectRunner for thorough validation tests
Use production runner for integration tests
CI/CD Integration
# Use PrismRunner for fast CI tests
- name: Fast Tests
  run: mvn test -Drunner=PrismRunner
# Use DirectRunner for thorough validation
- name: Validation Tests
  run: mvn verify -Drunner=DirectRunner
Troubleshooting
Prism download fails
The SDK downloads Prism automatically. If the download fails:
Check internet connectivity
Manually download Prism from GitHub releases
Set --prismLocation to the local binary
Web UI not accessible
Ensure --enableWebUI=true
Check that port 8074 (Prism's default Web UI port) is not in use
Verify firewall settings
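To see whether another process already holds the Web UI port, try binding it yourself: if the bind fails, the port is taken. A minimal sketch using Python's standard socket module (the helper name is hypothetical; pass the port your Prism instance is configured to use):

```python
import socket


def port_in_use(port: int, host: str = "127.0.0.1") -> bool:
    # Binding raises OSError (e.g. EADDRINUSE) if another socket already owns the port.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        try:
            sock.bind((host, port))
        except OSError:
            return True
        return False


if __name__ == "__main__":
    print(port_in_use(8074))
```

A True result only says that something holds the port; tools such as lsof or netstat can then identify which process it is.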
Pipeline fails with 'feature not implemented'
Check Prism’s current feature support
Consider using DirectRunner instead
Report the issue on GitHub
Pipeline runs out of memory or is slow
Reduce data size for local testing
Check for inefficient transforms
Consider using a distributed runner for large data
Next Steps
Prism GitHub: view source and contribute
DirectRunner: alternative local runner
Testing Guide: learn more about testing
Cross-Language: build multi-language pipelines