Apache Beam Go SDK — I/O Packages
The I/O packages provide transforms for reading from and writing to various data sources and sinks.

textio

Package textio provides transforms for reading and writing text files.

Read

Reads a set of files matching a glob pattern and returns lines as PCollection<string>.
func Read(s beam.Scope, glob string, opts ...ReadOptionFn) beam.PCollection
s
beam.Scope
The scope to insert the transform into
glob
string
File path or glob pattern (e.g., "gs://bucket/*.txt", "/path/to/*.log")
opts
...ReadOptionFn
Optional configuration: ReadAutoCompression(), ReadGzip(), ReadUncompressed()
Returns
beam.PCollection
PCollection<string> of lines (newlines removed)
Example:
import (
    "github.com/apache/beam/sdks/v2/go/pkg/beam"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/io/textio"
)

lines := textio.Read(root, "gs://my-bucket/input/*.txt")

// With compression options
gzLines := textio.Read(root, "data/*.gz", textio.ReadGzip())
autoLines := textio.Read(root, "data/*", textio.ReadAutoCompression())

ReadAll

Expands and reads filenames from an input PCollection<string> of globs.
func ReadAll(s beam.Scope, col beam.PCollection, opts ...ReadOptionFn) beam.PCollection
s
beam.Scope
The scope to insert the transform into
col
beam.PCollection
PCollection<string> of file paths or glob patterns
opts
...ReadOptionFn
Optional compression configuration
Returns
beam.PCollection
PCollection<string> of all lines from all matched files
Example:
patterns := beam.Create(root, "data/part-*.txt", "logs/*.log")
allLines := textio.ReadAll(root, patterns)

ReadWithFilename

Reads files and returns PCollection<KV<string,string>> of filename and line.
func ReadWithFilename(s beam.Scope, glob string, opts ...ReadOptionFn) beam.PCollection
Returns
beam.PCollection
PCollection<KV<string,string>> where key is filename and value is line content
Example:
filesAndLines := textio.ReadWithFilename(root, "logs/*.log")

// Process with filename context
// logWithFile logs one line of input prefixed with the file it came from.
func logWithFile(filename, line string) {
    log.Printf("[%s] %s", filename, line)
}

beam.ParDo0(root, logWithFile, filesAndLines)

Write

Writes a PCollection<string> to a file as separate lines.
func Write(s beam.Scope, filename string, col beam.PCollection)
s
beam.Scope
The scope to insert the transform into
filename
string
Output file path
col
beam.PCollection
PCollection<string> to write (each element becomes a line)
Example:
lines := beam.Create(root, "line 1", "line 2", "line 3")
textio.Write(root, "output.txt", lines)

// GCS path
textio.Write(root, "gs://my-bucket/output.txt", results)

Immediate

Reads a local file at pipeline construction time and embeds data into the pipeline.
func Immediate(s beam.Scope, filename string) (beam.PCollection, error)
s
beam.Scope
The scope to insert the transform into
filename
string
Local file path to read immediately
Returns
(beam.PCollection, error)
PCollection<string> with file contents, or error if read fails
Note: Only use for small files as data is embedded in the pipeline.

fileio

Package fileio provides lower-level transforms for matching and reading files with more control.

MatchFiles

Finds all files matching a glob pattern and returns PCollection<FileMetadata>.
func MatchFiles(s beam.Scope, glob string, opts ...MatchOptionFn) beam.PCollection
s
beam.Scope
The scope to insert the transform into
glob
string
File path or glob pattern
opts
...MatchOptionFn
Options: MatchEmptyAllow(), MatchEmptyDisallow(), MatchEmptyAllowIfWildcard()
Returns
beam.PCollection
PCollection<FileMetadata> with file path, size, and last modified time
Example:
import "github.com/apache/beam/sdks/v2/go/pkg/beam/io/fileio"

matches := fileio.MatchFiles(root, "data/*.json")

// Allow empty results
matches := fileio.MatchFiles(root, "optional/*.txt", fileio.MatchEmptyAllow())

MatchAll

Matches files from an input PCollection<string> of glob patterns.
func MatchAll(s beam.Scope, col beam.PCollection, opts ...MatchOptionFn) beam.PCollection
col
beam.PCollection
PCollection<string> of glob patterns
Returns
beam.PCollection
PCollection<FileMetadata> of all matching files

MatchContinuously

Continuously watches for new files matching a pattern at regular intervals.
func MatchContinuously(
    s beam.Scope,
    glob string,
    interval time.Duration,
    opts ...MatchContOptionFn,
) beam.PCollection
s
beam.Scope
The scope to insert the transform into
glob
string
File pattern to watch
interval
time.Duration
How often to check for new files
opts
...MatchContOptionFn
Options: MatchStart(), MatchEnd(), MatchDuplicateSkip(), MatchDuplicateAllowIfModified(), MatchApplyWindow()
Returns
beam.PCollection
Unbounded PCollection<FileMetadata> of matching files over time
Example:
import "time"

// Check for new files every 30 seconds
newFiles := fileio.MatchContinuously(
    root,
    "gs://bucket/incoming/*.json",
    30*time.Second,
    fileio.MatchDuplicateSkip(),
)

ReadMatches

Converts PCollection<FileMetadata> to PCollection<ReadableFile>.
func ReadMatches(s beam.Scope, col beam.PCollection, opts ...ReadOptionFn) beam.PCollection
s
beam.Scope
The scope to insert the transform into
col
beam.PCollection
PCollection<FileMetadata> from MatchFiles/MatchAll
opts
...ReadOptionFn
Options: ReadAutoCompression(), ReadGzip(), ReadUncompressed(), ReadDirectorySkip()
Returns
beam.PCollection
PCollection<ReadableFile> ready for reading
Example:
matches := fileio.MatchFiles(root, "data/*.gz")
files := fileio.ReadMatches(root, matches, fileio.ReadGzip())

// Custom processing
// processFile reads the entire file into memory and emits its contents as a
// single string. Suitable only for files that comfortably fit in memory; use
// Open plus a streaming reader for large files.
func processFile(ctx context.Context, file fileio.ReadableFile, emit func(string)) error {
    data, err := file.Read(ctx)
    if err != nil {
        return err
    }
    // Process data...
    emit(string(data))
    return nil
}

results := beam.ParDo(root, processFile, files)

FileMetadata

Contains metadata about a matched file.
// FileMetadata describes a single matched file.
type FileMetadata struct {
    Path         string    // full path to the file
    Size         int64     // file size in bytes
    LastModified time.Time // when the file was last modified
}
Path
string
Full path to the file
Size
int64
File size in bytes
LastModified
time.Time
When the file was last modified

ReadableFile

Wrapper around FileMetadata providing methods to read file contents.
// ReadableFile pairs a file's metadata with the compression setting that will
// be applied when the file is opened or read.
type ReadableFile struct {
    Metadata    FileMetadata
    Compression compressionType
}

Open

Opens the file for reading.
func (f ReadableFile) Open(ctx context.Context) (io.ReadCloser, error)
ctx
context.Context
Context for the file operation
Returns
(io.ReadCloser, error)
Reader for file contents (caller must close) or error
Example:
// processFile streams the file line by line via bufio.Scanner instead of
// loading the whole file into memory. The reader is closed on return.
func processFile(ctx context.Context, file fileio.ReadableFile) error {
    reader, err := file.Open(ctx)
    if err != nil {
        return err
    }
    defer reader.Close()

    scanner := bufio.NewScanner(reader)
    for scanner.Scan() {
        line := scanner.Text()
        // Process line...
    }
    // Scanner errors are only surfaced after the loop ends.
    return scanner.Err()
}

Read

Reads the entire file into memory.
func (f ReadableFile) Read(ctx context.Context) ([]byte, error)
ctx
context.Context
Context for the file operation
Returns
([]byte, error)
Complete file contents or error

ReadString

Reads the entire file as a string.
func (f ReadableFile) ReadString(ctx context.Context) (string, error)
Returns
(string, error)
File contents as string or error

Other I/O Packages

The Go SDK includes I/O connectors for various data sources:

avroio

Read and write Avro files.
import "github.com/apache/beam/sdks/v2/go/pkg/beam/io/avroio"

bigqueryio

Read from and write to Google BigQuery.
import "github.com/apache/beam/sdks/v2/go/pkg/beam/io/bigqueryio"

parquetio

Read and write Parquet files.
import "github.com/apache/beam/sdks/v2/go/pkg/beam/io/parquetio"

pubsubio

Read from and write to Google Cloud Pub/Sub.
import "github.com/apache/beam/sdks/v2/go/pkg/beam/io/pubsubio"

databaseio

Generic database I/O for SQL databases.
import "github.com/apache/beam/sdks/v2/go/pkg/beam/io/databaseio"

mongodbio

Read from and write to MongoDB.
import "github.com/apache/beam/sdks/v2/go/pkg/beam/io/mongodbio"

Complete Example

package main

import (
    "context"
    "fmt"
    "log"
    "strings"

    "github.com/apache/beam/sdks/v2/go/pkg/beam"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/io/textio"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/register"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
)

// init registers the pipeline DoFns so runners can invoke them without
// reflection. The NxM suffix encodes (inputs)x(outputs).
func init() {
    register.Function1x2(extractWords)
    register.Function2x1(sumCounts)
    // formatOutput takes (string, int) and returns string — 2 inputs, 1
    // output — so it must be registered with Function2x1, not Function1x1.
    register.Function2x1(formatOutput)
}

// extractWords maps one line of text to a (word, count) key-value pair.
// A real word-count DoFn would take an emit function and output every word;
// this simplified example keys on the first word only. Blank lines yield an
// empty key with a zero count.
//
// The original version referenced an undefined variable `word` outside the
// loop, which does not compile; this fixes that.
func extractWords(line string) (string, int) {
    words := strings.Fields(line)
    if len(words) == 0 {
        return "", 0
    }
    return words[0], 1
}

// sumCounts merges two partial counts for the same key into their total.
func sumCounts(a, b int) int {
    total := a + b
    return total
}

// formatOutput renders one word-count pair as a "word: count" output line.
// Note: requires the "fmt" import, which the original example omitted.
func formatOutput(word string, count int) string {
    return fmt.Sprintf("%s: %d", word, count)
}

// main builds and runs the word-count pipeline: read text files, count
// words per key, format the totals, and write them back out.
func main() {
    beam.Init()

    pipeline := beam.NewPipeline()
    s := pipeline.Root()

    // Read input files.
    lines := textio.Read(s, "input/*.txt")

    // Count words.
    pairs := beam.ParDo(s, extractWords, lines)
    totals := beam.CombinePerKey(s, sumCounts, pairs)

    // Format and write output.
    formatted := beam.ParDo(s, formatOutput, totals)
    textio.Write(s, "output.txt", formatted)

    if err := beamx.Run(context.Background(), pipeline); err != nil {
        log.Fatalf("Failed to execute pipeline: %v", err)
    }
}

Best Practices

  • Use appropriate URI schemes: gs:// for GCS, s3:// for S3, local paths for local files
  • Glob patterns support * and ** wildcards
  • Always validate file paths before pipeline execution
  • Use ReadAutoCompression() for mixed compression types
  • Explicit compression options are faster (no detection overhead)
  • Gzip is automatically detected from .gz extension
  • textio.Read uses splittable DoFn for parallel processing
  • Large files are automatically split into chunks
  • Use fileio for custom file processing logic
  • Avoid Immediate() for large files (embeds in pipeline)
  • Use MatchContinuously for streaming file ingestion
  • Configure deduplication based on your use case
  • Consider windowing strategies for time-based processing