The beam package (github.com/apache/beam/sdks/v2/go/pkg/beam) provides the foundational types and functions for creating and managing Apache Beam pipelines in Go.

Pipeline

Pipeline manages a directed acyclic graph of primitive PTransforms and the PCollections that they consume and produce. Each Pipeline is self-contained and isolated from any other Pipeline.

NewPipeline

Creates a new empty pipeline.
func NewPipeline() *Pipeline
Returns
*Pipeline
A new pipeline instance ready for transform construction
Example:
import "github.com/apache/beam/sdks/v2/go/pkg/beam"

p := beam.NewPipeline()
root := p.Root()
// Add transforms to root scope

Root

Returns the root scope of the pipeline.
func (p *Pipeline) Root() Scope
Returns
Scope
The root scope used for inserting transforms

Build

Validates the pipeline and returns a lower-level representation for execution. Called by runners only.
func (p *Pipeline) Build() ([]*graph.MultiEdge, []*graph.Node, error)
Returns
([]*graph.MultiEdge, []*graph.Node, error)
Graph edges, nodes, and any validation error

Scope

Scope is a hierarchical grouping for composite transforms. Scopes can be enclosed in other scopes and form a tree structure.

Type Definition

type Scope struct {
    // Internal fields - use provided methods
}

IsValid

Returns true if the Scope is valid. Any use of an invalid Scope will panic.
func (s Scope) IsValid() bool
Returns
bool
Whether the scope is valid for use

Scope

Returns a sub-scope with the given name.
func (s Scope) Scope(name string) Scope
name
string
Name for the sub-scope (may be augmented for uniqueness)
Returns
Scope
A new sub-scope nested within this scope
Example:
root := p.Root()
processing := root.Scope("ProcessData")
filtering := processing.Scope("Filter")

WithContext

Creates a named subscope with an attached context for the represented composite transform.
func (s Scope) WithContext(ctx context.Context, name string) Scope
ctx
context.Context
Context to attach to the scope
name
string
Name for the sub-scope
Returns
Scope
A new sub-scope with the attached context

PCollection

PCollection is an immutable collection of values. A PCollection can contain either a bounded or unbounded number of elements.

Type Definition

type PCollection struct {
    // Internal fields - use provided methods
}

IsValid

Returns true if the PCollection is valid and part of a Pipeline.
func (p PCollection) IsValid() bool
Returns
bool
Whether the PCollection is valid

Type

Returns the full type of the elements in the PCollection.
func (p PCollection) Type() FullType
Returns
FullType
The concrete type of elements (e.g., int, string, KV<int,string>)

Coder

Returns the coder for the collection.
func (p PCollection) Coder() Coder
Returns
Coder
The coder used for encoding/decoding elements

SetCoder

Sets the coder for the collection.
func (p PCollection) SetCoder(c Coder) error
c
Coder
The coder to use for this PCollection
Returns
error
Error if the coder type doesn’t match the PCollection type

Pipeline Execution

Run

Executes the pipeline using the registered runner with the given name.
func Run(ctx context.Context, runner string, p *Pipeline) (PipelineResult, error)
ctx
context.Context
Context for pipeline execution
runner
string
Name of the registered runner (e.g., “direct”, “dataflow”, “flink”)
p
*Pipeline
The pipeline to execute
Returns
(PipelineResult, error)
Pipeline result with metrics and job ID, or error if execution fails
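Example:
package main

import (
    "context"
    "flag"
    "log"

    "github.com/apache/beam/sdks/v2/go/pkg/beam"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/x/debug"
)

func main() {
    flag.Parse() // beamx.Run reads the --runner flag
    beam.Init()  // call after flag parsing, before building the pipeline

    p := beam.NewPipeline()
    root := p.Root()

    // Build your pipeline
    data := beam.Create(root, "hello", "world")
    debug.Print(root, data) // prints each element; add further transforms here

    // Execute with beamx helper (uses --runner flag)
    if err := beamx.Run(context.Background(), p); err != nil {
        log.Fatalf("Pipeline failed: %v", err)
    }
}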

PipelineResult

Interface returned by pipeline execution containing metrics and job information.
type PipelineResult interface {
    Metrics() metrics.Results
    JobID() string
}

Metrics

Returns the metrics collected during pipeline execution.
Metrics() metrics.Results

JobID

Returns the job identifier from the runner.
JobID() string

Create

Inserts a fixed set of values into the pipeline.

Create

Creates a PCollection from a fixed non-empty set of values.
func Create(s Scope, values ...any) PCollection
s
Scope
The scope to insert the transform into
values
...any
One or more values of the same type
Returns
PCollection
A PCollection containing the specified values
Example:
data := beam.Create(root, "apple", "banana", "cherry")
numbers := beam.Create(root, 1, 2, 3, 4, 5)

CreateList

Creates a PCollection from a slice or array. Supports empty collections.
func CreateList(s Scope, list any) PCollection
s
Scope
The scope to insert the transform into
list
any
A slice or array of values
Returns
PCollection
A PCollection containing the list elements
Example:
items := []string{"a", "b", "c"}
data := beam.CreateList(root, items)

empty := []int{}
emptyCol := beam.CreateList(root, empty)  // Valid, unlike Create

Best Practices

  • Create one pipeline per program execution
  • Use meaningful scope names for debugging and monitoring
  • Build the entire pipeline before calling Run
  • Pipelines can safely be executed concurrently
  • Use Scope() to create logical groupings of transforms
  • Scope names appear in monitoring and visualization tools
  • Nested scopes form namespaces for transform identification
  • PCollections are immutable; transforms produce new PCollections rather than modifying their inputs
  • Check IsValid() when handling PCollections conditionally
  • PCollections cannot be shared between pipelines