The Apache Beam Go SDK lets Go developers build Beam data processing pipelines with native Go idioms: plain functions and structs serve as transforms, and Go's static types describe the data.
Installation
Install Go
Ensure you have Go 1.21 or later installed. Check your version with:
go version
Create a new Go module
mkdir my-beam-pipeline
cd my-beam-pipeline
go mod init github.com/yourusername/my-beam-pipeline
Install Apache Beam Go SDK
go get github.com/apache/beam/sdks/v2/go/pkg/beam
Install additional packages (optional)
# For I/O connectors
go get github.com/apache/beam/sdks/v2/go/pkg/beam/io/textio
go get github.com/apache/beam/sdks/v2/go/pkg/beam/io/avroio
# For running pipelines
go get github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx
Quick Start
Here’s a complete word count example:
package main
import (
	"context"
	"flag"
	"fmt"
	"log"
	"regexp"
	"github.com/apache/beam/sdks/v2/go/pkg/beam"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/io/textio"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/stats"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
)
var (
	input  = flag.String("input", "gs://apache-beam-samples/shakespeare/kinglear.txt", "File to read.")
	output = flag.String("output", "output.txt", "Output file.")
)
var wordRE = regexp.MustCompile(`[a-zA-Z]+('[a-z])?`)
// extractWords splits a line into words.
func extractWords(line string, emit func(string)) {
	for _, word := range wordRE.FindAllString(line, -1) {
		emit(word)
	}
}
// formatCounts formats a word count as a string.
func formatCounts(word string, count int) string {
	return fmt.Sprintf("%s: %d", word, count)
}
func init() {
	// Register functions and the emitter type for serialization
	register.Function2x0(extractWords)
	register.Function2x1(formatCounts)
	register.Emitter1[string]()
}
func main() {
	flag.Parse()
	beam.Init()
	p := beam.NewPipeline()
	s := p.Root()
	// Read lines from input file
	lines := textio.Read(s, *input)
	// Split lines into words
	words := beam.ParDo(s, extractWords, lines)
	// Count occurrences of each word
	counted := stats.Count(s, words)
	// Format the counts
	formatted := beam.ParDo(s, formatCounts, counted)
	// Write results to output file
	textio.Write(s, *output, formatted)
	// Run the pipeline
	if err := beamx.Run(context.Background(), p); err != nil {
		log.Fatalf("Failed to execute pipeline: %v", err)
	}
}
Run the pipeline:
go run wordcount.go --output=output.txt
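Because extractWords and formatCounts are ordinary Go functions, their logic can be checked without any runner. The sketch below copies them and adds a hypothetical countLocally helper that tallies words in a map; it reproduces the result stats.Count would compute for a small input, not how Beam distributes the work across workers.

```go
package main

import (
	"fmt"
	"regexp"
	"sort"
)

var wordRE = regexp.MustCompile(`[a-zA-Z]+('[a-z])?`)

// extractWords is copied from the pipeline above.
func extractWords(line string, emit func(string)) {
	for _, word := range wordRE.FindAllString(line, -1) {
		emit(word)
	}
}

// formatCounts is copied from the pipeline above.
func formatCounts(word string, count int) string {
	return fmt.Sprintf("%s: %d", word, count)
}

// countLocally is a hypothetical helper: it counts words in memory, giving the
// same per-word totals stats.Count produces. Sorting makes the output deterministic.
func countLocally(lines []string) []string {
	counts := map[string]int{}
	for _, line := range lines {
		extractWords(line, func(w string) { counts[w]++ })
	}
	words := make([]string, 0, len(counts))
	for w := range counts {
		words = append(words, w)
	}
	sort.Strings(words)
	out := make([]string, 0, len(words))
	for _, w := range words {
		out = append(out, formatCounts(w, counts[w]))
	}
	return out
}

func main() {
	for _, line := range countLocally([]string{"the king's fool", "the king"}) {
		fmt.Println(line)
	}
}
```

Running it prints `fool: 1`, `king: 1`, `king's: 1`, and `the: 2`, one per line; the regex keeps the possessive `king's` as a single word.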
Core Concepts
Pipeline
Create and execute pipelines:
import (
	"context"
	"log"
	"github.com/apache/beam/sdks/v2/go/pkg/beam"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
)
func main() {
	beam.Init()
	p := beam.NewPipeline()
	s := p.Root()
	// Build your pipeline using scope 's'
	_ = s // remove once transforms use s
	if err := beamx.Run(context.Background(), p); err != nil {
		log.Fatal(err)
	}
}
PCollection
PCollections represent distributed datasets:
// Create from in-memory data
data := beam.CreateList(s, []string{"Hello", "World", "Beam"})
// Read from files
lines := textio.Read(s, "input.txt")
// PCollections are immutable and strongly typed: both collections above have
// element type string, inferred from the transform that produced them
Scopes
Scopes organize and name pipeline components:
s := p.Root()
// Create named subscopes
readScope := s.Scope("Read")
processScope := s.Scope("Process")
lines := textio.Read(readScope, "input.txt")
processed := beam.ParDo(processScope, transformFn, lines)
Transforms
Apply transforms to process data:
// ParDo: Element-wise transformation
result := beam.ParDo(s, myDoFn, input)
// Combine: Aggregate values
total := stats.Sum(s, numbers)
// Flatten: Merge multiple PCollections of the same element type
merged := beam.Flatten(s, pcol1, pcol2, pcol3)
// GroupByKey: Group values by key (input must be a PCollection of KV pairs)
grouped := beam.GroupByKey(s, kvPairs)
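To make the GroupByKey semantics concrete, the sketch below models them in memory. groupByKey gathers every value that shares a key, and sumValues has the shape of a function you would apply with beam.ParDo to GroupByKey output: it receives the key plus an iterator that fills a pointer and returns false when the values run out. KV, groupByKey, and iterOver are hypothetical local stand-ins, not Beam APIs; Beam performs the grouping across workers and gives no ordering guarantee for the grouped values.

```go
package main

import (
	"fmt"
	"sort"
)

// KV is a hypothetical stand-in for a Beam key/value element.
type KV struct {
	Key   string
	Value int
}

// groupByKey collects all values that share a key (local model only).
func groupByKey(pairs []KV) map[string][]int {
	grouped := make(map[string][]int)
	for _, kv := range pairs {
		grouped[kv.Key] = append(grouped[kv.Key], kv.Value)
	}
	return grouped
}

// sumValues has the signature shape used for GroupByKey output:
// the key, then an iterator over that key's values.
func sumValues(key string, values func(*int) bool) (string, int) {
	total := 0
	var v int
	for values(&v) {
		total += v
	}
	return key, total
}

// iterOver adapts a slice to the iterator shape above, for local testing only.
func iterOver(vals []int) func(*int) bool {
	i := 0
	return func(v *int) bool {
		if i >= len(vals) {
			return false
		}
		*v = vals[i]
		i++
		return true
	}
}

func main() {
	grouped := groupByKey([]KV{{"a", 1}, {"b", 2}, {"a", 3}})
	keys := make([]string, 0, len(grouped))
	for k := range grouped {
		keys = append(keys, k)
	}
	sort.Strings(keys) // map iteration order is random; sort for stable output
	for _, k := range keys {
		key, total := sumValues(k, iterOver(grouped[k]))
		fmt.Println(key, grouped[k], total)
	}
}
```

It prints `a [1 3] 4` and `b [2] 2`. For a plain per-key sum, the stats package's SumPerKey already does this without a GroupByKey step.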
DoFns and Functions
Simple Functions
Use regular Go functions for transformations:
// Simple 1-to-1 transform
func double(x int) int {
	return x * 2
}
// 1-to-many transform (using emit)
func splitWords(line string, emit func(string)) {
	for _, word := range strings.Fields(line) {
		emit(word)
	}
}
// Register functions for serialization
func init() {
	register.Function1x1(double)
	register.Function2x0(splitWords)
}
// Use in pipeline
doubled := beam.ParDo(s, double, numbers)
words := beam.ParDo(s, splitWords, lines)
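Since registered functions are plain Go, they can also be unit tested by calling them directly. In this sketch, collect is a hypothetical helper that plays the role of the emit function Beam supplies during ParDo, gathering everything splitWords emits for one input line.

```go
package main

import (
	"fmt"
	"strings"
)

func double(x int) int { return x * 2 }

func splitWords(line string, emit func(string)) {
	for _, word := range strings.Fields(line) {
		emit(word)
	}
}

// collect is a hypothetical test helper: it calls an emitter-style function
// on one input and returns every value the function emitted.
func collect(line string, fn func(string, func(string))) []string {
	var out []string
	fn(line, func(s string) { out = append(out, s) })
	return out
}

func main() {
	fmt.Println(double(21))                            // 42
	fmt.Println(collect("hello  beam go", splitWords)) // [hello beam go]
}
```

beam.ParDo calls the function once per element; direct calls like these exercise only the per-element logic, which keeps such tests fast.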
Structural DoFns
Use struct-based DoFns for complex logic:
import (
	"github.com/apache/beam/sdks/v2/go/pkg/beam"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
)
// FilterByThresholdFn keeps elements greater than Threshold.
// Fields must be exported so they are serialized with the DoFn.
type FilterByThresholdFn struct {
	Threshold int
}
func (f *FilterByThresholdFn) ProcessElement(x int, emit func(int)) {
	if x > f.Threshold {
		emit(x)
	}
}
func init() {
	// Type parameters list the ProcessElement inputs in order: int, func(int)
	register.DoFn2x0[int, func(int)](&FilterByThresholdFn{})
	register.Emitter1[int]()
}
// Use in pipeline
filtered := beam.ParDo(s, &FilterByThresholdFn{Threshold: 10}, numbers)
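A structural DoFn is an ordinary struct, so its ProcessElement method can be exercised directly, as in the sketch below. It also illustrates why configuration such as Threshold belongs in exported fields: the Go SDK serializes a DoFn's exported fields when sending it to workers, and unexported ones (like the hypothetical calls counter here) are not carried over. encoding/json is used only to show which fields survive encoding; treat the exact wire format as an SDK implementation detail.

```go
package main

import (
	"encoding/json"
	"fmt"
)

type FilterByThresholdFn struct {
	Threshold int
	calls     int // hypothetical unexported field: not shipped to workers
}

func (f *FilterByThresholdFn) ProcessElement(x int, emit func(int)) {
	f.calls++
	if x > f.Threshold {
		emit(x)
	}
}

func main() {
	fn := &FilterByThresholdFn{Threshold: 10}
	var kept []int
	for _, x := range []int{5, 10, 11, 42} {
		fn.ProcessElement(x, func(v int) { kept = append(kept, v) })
	}
	fmt.Println(kept) // [11 42]

	// Only the exported field appears in the encoded form.
	b, err := json.Marshal(fn)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(b)) // {"Threshold":10}
}
```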
Side Inputs
Access additional data during processing with side inputs:
func enrichWithSideInput(word string, sideInput func(*string) bool, emit func(string)) {
	var prefix string
	// The iterator fills prefix and returns false once no values remain
	if sideInput(&prefix) {
		emit(prefix + ": " + word)
	}
}
func init() {
	register.Function3x0(enrichWithSideInput)
	register.Iter1[string]()
	register.Emitter1[string]()
}
sideData := beam.CreateList(s, []string{"PREFIX"})
sideIter := beam.SideInput{Input: sideData}
enriched := beam.ParDo(s, enrichWithSideInput, words, sideIter)
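The iterator parameter can be fed from a slice to check side-input handling locally. sliceIter is a hypothetical stand-in for the iterator Beam passes for a beam.SideInput; each ProcessElement call receives an iterator that starts from the first side-input value, which the sketch imitates by building a fresh iterator per call.

```go
package main

import "fmt"

// enrichWithSideInput is copied from the example above.
func enrichWithSideInput(word string, sideInput func(*string) bool, emit func(string)) {
	var prefix string
	if sideInput(&prefix) {
		emit(prefix + ": " + word)
	}
}

// sliceIter is a hypothetical helper: each call fills *out with the next
// value and reports whether one was available.
func sliceIter(vals []string) func(*string) bool {
	i := 0
	return func(out *string) bool {
		if i >= len(vals) {
			return false
		}
		*out = vals[i]
		i++
		return true
	}
}

func main() {
	emit := func(s string) { fmt.Println(s) }
	enrichWithSideInput("hello", sliceIter([]string{"PREFIX"}), emit) // PREFIX: hello
	enrichWithSideInput("hello", sliceIter(nil), emit)                // emits nothing
}
```

With an empty side input, enrichWithSideInput emits nothing, so the word is silently dropped. When the side input always holds exactly one value, the Go SDK also accepts it as a plain value parameter (for example, `prefix string`) instead of an iterator.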
Go-Specific Features
Type Safety with Generics
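The Go SDK builds on Go's static types: a function's parameter types must match the element type of the PCollection it is applied to, and a mismatch is reported when the pipeline is constructed, before anything runs.
// Strongly typed transforms
func processInt(x int) int { return x * 2 }
func processString(str string) string { return strings.ToUpper(str) }
intResults := beam.ParDo(s, processInt, intCollection)    // intCollection: PCollection<int>
strResults := beam.ParDo(s, processString, strCollection) // strCollection: PCollection<string>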
Registration System
Register functions for proper serialization:
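import (
	"reflect"
	"github.com/apache/beam/sdks/v2/go/pkg/beam"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
)
func myTransform(x int) int { return x + 1 }
func myEmitter(x int, emit func(int, int)) { emit(x, x*2) }
func init() {
	// Register simple functions
	register.Function1x1(myTransform)
	register.Function2x0(myEmitter)
	// Register the emitter type used by myEmitter (emits KV<int, int>)
	register.Emitter2[int, int]()
	// Register DoFns: type parameters are the ProcessElement inputs in order
	// (MyDoFn is assumed to have ProcessElement(int, func(int)))
	register.DoFn2x0[int, func(int)](&MyDoFn{})
	// Register types used as PCollection elements
	beam.RegisterType(reflect.TypeOf((*MyStruct)(nil)).Elem())
}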
Combiners
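Aggregate values efficiently with the built-in combiners in the stats package:
import "github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/stats"
// Global combiners: each returns a PCollection holding a single value
sum := stats.Sum(s, numbers)
count := stats.CountElms(s, elements) // total number of elements
maxVal := stats.Max(s, values)        // named to avoid shadowing Go's built-in max
// Count occurrences of each distinct element
counted := stats.Count(s, words) // Returns PCollection<KV<string, int>>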
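When no built-in combiner fits, a structural CombineFn expresses the aggregation as four methods: CreateAccumulator, AddInput, MergeAccumulators, and ExtractOutput. The sketch below computes a mean; stats.Mean already provides one, and it is used here only because the result is easy to check. meanFn, meanAccum, and the combineInTwoBundles driver are hypothetical; the driver imitates two workers building partial accumulators that are then merged, which is what lets a runner combine values before shuffling them. In a pipeline you would register the type with `register.Combiner3[meanAccum, int, float64](&meanFn{})` and apply it with `beam.Combine(s, &meanFn{}, numbers)`.

```go
package main

import (
	"fmt"
	"math"
)

// meanAccum holds a partial result; fields are exported so it can be encoded.
type meanAccum struct {
	Count int
	Sum   int
}

type meanFn struct{}

func (fn *meanFn) CreateAccumulator() meanAccum { return meanAccum{} }

func (fn *meanFn) AddInput(a meanAccum, v int) meanAccum {
	return meanAccum{Count: a.Count + 1, Sum: a.Sum + v}
}

func (fn *meanFn) MergeAccumulators(a, b meanAccum) meanAccum {
	return meanAccum{Count: a.Count + b.Count, Sum: a.Sum + b.Sum}
}

func (fn *meanFn) ExtractOutput(a meanAccum) float64 {
	if a.Count == 0 {
		return math.NaN() // mean of no elements is undefined
	}
	return float64(a.Sum) / float64(a.Count)
}

// combineInTwoBundles is a hypothetical driver imitating two workers that
// each accumulate part of the input before a final merge.
func combineInTwoBundles(fn *meanFn, left, right []int) float64 {
	accA, accB := fn.CreateAccumulator(), fn.CreateAccumulator()
	for _, v := range left {
		accA = fn.AddInput(accA, v)
	}
	for _, v := range right {
		accB = fn.AddInput(accB, v)
	}
	return fn.ExtractOutput(fn.MergeAccumulators(accA, accB))
}

func main() {
	fmt.Println(combineInTwoBundles(&meanFn{}, []int{1, 2, 3}, []int{4, 10})) // 4
}
```

Because MergeAccumulators is associative, the same result comes out however the runner splits the input into bundles.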
I/O Connectors
Text Files
import "github.com/apache/beam/sdks/v2/go/pkg/beam/io/textio"
// Read text files (one element per line)
lines := textio.Read(s, "input.txt")
// Read with glob pattern
allFiles := textio.Read(s, "data/*.txt")
// Write text files (results must be a PCollection<string>)
textio.Write(s, "output.txt", results)
// Immediate reads a small local file while the pipeline is being constructed
// and embeds its lines in the pipeline; it returns a PCollection and an error
small, err := textio.Immediate(s, "small.txt")
Avro Files
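import (
	"encoding/json"
	"reflect"
	"github.com/apache/beam/sdks/v2/go/pkg/beam"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/io/avroio"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
)
// Define your record type; avroio converts records via JSON, so the
// json tags must match the field names in the Avro schema
type MyRecord struct {
	Name  string `json:"name"`
	Age   int    `json:"age"`
	Email string `json:"email"`
}
const myRecordSchema = `{"type": "record", "name": "MyRecord", "fields": [{"name": "name", "type": "string"}, {"name": "age", "type": "int"}, {"name": "email", "type": "string"}]}`
// toJSON encodes a record as the JSON string avroio.Write expects
func toJSON(r MyRecord) (string, error) {
	b, err := json.Marshal(r)
	return string(b), err
}
func init() {
	beam.RegisterType(reflect.TypeOf((*MyRecord)(nil)).Elem())
	register.Function1x2(toJSON)
}
records := avroio.Read(s, "data.avro", reflect.TypeOf(MyRecord{})) // PCollection<MyRecord>
// Write Avro files: Write takes the schema and a PCollection<string> of JSON records
avroio.Write(s, "output.avro", myRecordSchema, beam.ParDo(s, toJSON, records))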
Cross-Language I/O
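Use I/O transforms from other SDKs through an expansion service:
import (
	"reflect"
	"github.com/apache/beam/sdks/v2/go/pkg/beam"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
)
// ReadFromBigQueryConfig and Row are placeholders: their fields must match
// the external transform's configuration and output schema
type ReadFromBigQueryConfig struct {
	Query string
}
type Row struct{}
// Use an external ReadFromBigQuery transform via cross-language
expansionAddr := "localhost:8097" // Expansion service address
outs := beam.CrossLanguage(
	s,
	"beam:transform:org.apache.beam:bigquery_read:v1",
	beam.CrossLanguagePayload(ReadFromBigQueryConfig{Query: "SELECT * FROM dataset.table"}),
	expansionAddr,
	nil, // a read consumes no input PCollections
	beam.UnnamedOutput(typex.New(reflect.TypeOf((*Row)(nil)).Elem())),
)
rows := outs[beam.UnnamedOutputTag()]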
Running Pipelines
Direct Runner (Local)
go run main.go \
--runner=direct \
--output=output.txt
Dataflow Runner
go run main.go \
--runner=dataflow \
--project=YOUR_PROJECT_ID \
--region=us-central1 \
--staging_location=gs://YOUR_BUCKET/staging \
--worker_harness_container_image=gcr.io/YOUR_PROJECT/beam_go_sdk:latest
Flink Runner
Start a Flink job server, then point the pipeline at its job endpoint:
go run main.go \
--runner=flink \
--endpoint=localhost:8099 \
--environment_type=DOCKER
Prism (Portable Local Runner)
# Install Prism
go install github.com/apache/beam/sdks/v2/go/cmd/prism@latest
# Run your pipeline
go run main.go --runner=prism
Best Practices
Always Register Functions and Types
Use the init() function to register all custom types and functions:
func init() {
	register.Function1x1(myFunc)
	// Type parameters list ProcessElement's inputs (MyDoFn is assumed to take string, func(string))
	register.DoFn2x0[string, func(string)](&MyDoFn{})
	register.Emitter1[string]()
	beam.RegisterType(reflect.TypeOf((*MyStruct)(nil)).Elem())
}
Use Named Scopes for Clarity
Organize your pipeline with descriptive scope names:
s := p.Root()
readScope := s.Scope("ReadInput")
lines := textio.Read(readScope, *input)
processScope := s.Scope("ProcessData")
results := beam.ParDo(processScope, processFn, lines)
writeScope := s.Scope("WriteOutput")
textio.Write(writeScope, *output, results)
Leverage Built-in Combiners
Use the stats package for common aggregations:
import "github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/stats"
// Efficient built-in combiners
wordCounts := stats.Count(s, words)
totalSum := stats.Sum(s, numbers)
maxValue := stats.Max(s, values)
minValue := stats.Min(s, values)
Handle Pipeline Errors
Check errors from pipeline execution:
import (
	"context"
	"log"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
)
if err := beamx.Run(context.Background(), p); err != nil {
	log.Fatalf("Pipeline failed: %v", err)
}
Use Composite Transforms
Create reusable pipeline components:
// CountWords is a composite transform
func CountWords(s beam.Scope, lines beam.PCollection) beam.PCollection {
	s = s.Scope("CountWords")
	words := beam.ParDo(s.Scope("ExtractWords"), extractWords, lines)
	counted := stats.Count(s.Scope("Count"), words)
	formatted := beam.ParDo(s.Scope("Format"), formatCounts, counted)
	return formatted
}
// Use the composite transform
results := CountWords(s, inputLines)
Testing
Test your pipeline components:
import (
	"testing"
	"github.com/apache/beam/sdks/v2/go/pkg/beam"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest"
)
// TestMain lets ptest configure the runner for the tests in this package
func TestMain(m *testing.M) {
	ptest.Main(m)
}
// double is the registered function from the Simple Functions example
func TestMyTransform(t *testing.T) {
	p, s := beam.NewPipelineWithRoot()
	input := beam.CreateList(s, []int{1, 2, 3, 4, 5})
	output := beam.ParDo(s, double, input)
	// passert.Equals takes ...any, so list the expected values directly
	// (a []int cannot be spread into ...any)
	passert.Equals(s, output, 2, 4, 6, 8, 10)
	if err := ptest.Run(p); err != nil {
		t.Fatalf("Pipeline failed: %v", err)
	}
}
Building Container Images
For portable runners, build SDK harness containers:
# Using Gradle from Beam source
./gradlew :sdks:go:container:docker -Pdocker-repository-root=YOUR_REPO
# Push to registry
docker push YOUR_REPO/beam_go_sdk:latest
# Use in pipeline
go run main.go \
--runner=dataflow \
--worker_harness_container_image=YOUR_REPO/beam_go_sdk:latest
Resources
Go SDK Reference: complete Go package documentation
Code Examples: sample pipelines and patterns
Prism Runner: local portable runner for testing
Build Guide: building and testing the Go SDK
Next Steps