The Apache Beam TypeScript SDK brings modern JavaScript/TypeScript development to data processing pipelines, with a focus on schema-first data processing and cross-language capabilities.

Installation

1. Install Node.js

Ensure you have Node.js 16 or later installed:
node --version

2. Create a new project

mkdir my-beam-pipeline
cd my-beam-pipeline
npm init -y

3. Install Apache Beam

npm install apache-beam

4. Install TypeScript (recommended)

npm install --save-dev typescript @types/node
npx tsc --init

5. Install Python and Java (for cross-language transforms)

The TypeScript SDK leverages cross-language transforms extensively. Install:
  • Python 3.8+ for Python transforms
  • Java 8+ for Java transforms

Quick Start

Here’s a simple word count example:
import * as beam from "apache-beam";
import { createRunner } from "apache-beam/runners";
import { countPerElement } from "apache-beam/transforms/group_and_combine";

function wordCount(lines: beam.PCollection<string>): beam.PCollection<{ element: string; count: number }> {
  return lines
    .map((s: string) => s.toLowerCase())
    .flatMap(function* (line: string) {
      yield* line.split(/[^a-z]+/);
    })
    .apply(countPerElement());
}

async function main() {
  await createRunner().run((root) => {
    const lines = root.apply(
      beam.create([
        "To be or not to be that is the question",
        "Whether tis nobler in the mind to suffer",
        "The slings and arrows of outrageous fortune",
      ])
    );

    const counts = lines.apply(wordCount);
    counts.map(console.log);
  });
}

main()
  .catch((e) => console.error(e))
  .finally(() => process.exit());
Build and run (this assumes `tsconfig.json` sets `"outDir": "dist"` and the pipeline is saved as `wordcount.ts`):
npx tsc
node dist/wordcount.js
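The tokenization step in the word count can be sketched in plain TypeScript. Note that `split(/[^a-z]+/)` yields an empty string when a line starts or ends with non-letter characters, so a real pipeline may want a filter step after the split:

```typescript
// Mirrors the lowercase + split logic from the wordCount transform above.
function tokenize(line: string): string[] {
  return line.toLowerCase().split(/[^a-z]+/);
}

console.log(tokenize("To be or not to be"));
// Trailing punctuation produces an empty token:
console.log(tokenize("Hello, world!")); // ["hello", "world", ""]
```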

Core Concepts

Root and Pipeline

Pipelines are built using a Root PValue:
import * as beam from "apache-beam";
import { createRunner } from "apache-beam/runners";

async function main() {
  const runner = createRunner();
  
  await runner.run((root) => {
    // Build your pipeline starting from root
    const data = root.apply(beam.create([1, 2, 3, 4, 5]));
    
    // Apply transforms
    data.map((x) => x * 2).map(console.log);
  });
}

PCollection

PCollections represent distributed datasets:
// Create from in-memory data
const numbers = root.apply(beam.create([1, 2, 3, 4, 5]));

// PCollections support method chaining
const result = numbers
  .map((x) => x * 2)
  .filter((x) => x > 5);

Transforms

Apply transforms using .apply() or convenience methods:
import * as beam from "apache-beam";

// Using convenience methods
const doubled = numbers.map((x) => x * 2);
const words = lines.flatMap((line) => line.split(" "));
const filtered = data.filter((x) => x > 10);

// Using apply with PTransform
const counted = words.apply(countPerElement());

// Using apply with a function
const custom = data.apply((pcoll) => {
  return pcoll.map((x) => x * 2).filter((x) => x > 5);
});

TypeScript-Specific Features

Schema-First Approach

The TypeScript SDK emphasizes working with structured data:
import * as beam from "apache-beam";

interface User {
  name: string;
  age: number;
  email: string;
}

const users = root.apply(
  beam.create<User>([
    { name: "Alice", age: 30, email: "alice@example.com" },
    { name: "Bob", age: 25, email: "bob@example.com" },
  ])
);

// Work with structured data naturally
const emails = users.map((user) => user.email);
const adults = users.filter((user) => user.age >= 18);

Map and FlatMap

Use familiar array-like operations:
// Map: 1-to-1 transformation
const upper = words.map((word) => word.toUpperCase());

// FlatMap: 1-to-many using generators
const chars = words.flatMap(function* (word) {
  for (const char of word) {
    yield char;
  }
});

// FlatMap: Return arrays
const split = lines.flatMap((line) => line.split(" "));
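The two flatMap styles above are equivalent. A plain-TypeScript sketch of the same semantics using `Array.prototype.flatMap` (Beam applies this logic element-by-element across a distributed PCollection):

```typescript
// Generator version: yield one element at a time.
function* charsOf(word: string): Generator<string> {
  for (const char of word) {
    yield char;
  }
}

const words = ["hi", "ok"];

// Spreading the generator and returning an array produce the same result.
const viaGenerator = words.flatMap((w) => [...charsOf(w)]);
const viaArray = words.flatMap((w) => w.split(""));
console.log(viaGenerator, viaArray); // both are ["h", "i", "o", "k"]
```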

Async/Await Support

Work with asynchronous operations:
import * as beam from "apache-beam";

// Async map operations
const enriched = await data.asyncMap(async (element) => {
  const result = await fetchFromAPI(element);
  return result;
});

// Async pipeline execution
await runner.run(async (root) => {
  const data = root.apply(beam.create([1, 2, 3]));
  data.map(console.log);
});

Context Parameters

Access element metadata and side inputs:
import { withTimestamp, withWindow } from "apache-beam";

// Access timestamp
const withTime = data.map(
  (element, context) => {
    return {
      element,
      timestamp: context.timestamp,
    };
  },
  { timestamp: withTimestamp() }
);

// Access window
const withWin = data.map(
  (element, context) => {
    return {
      element,
      window: context.window,
    };
  },
  { window: withWindow() }
);

Working with Multiple Outputs

Split PCollections based on element properties:
import { Split } from "apache-beam/transforms";

interface Result {
  valid?: string;
  invalid?: string;
}

// Process and tag elements
const tagged = data.map((element): Result => {
  if (isValid(element)) {
    return { valid: element };
  } else {
    return { invalid: element };
  }
});

// Split into separate PCollections
const { valid, invalid } = tagged.apply(new Split());
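A minimal local sketch of what the tag-then-split pattern does, with a hypothetical `isValid` check (Beam's Split performs the same partition over a distributed PCollection rather than in-memory arrays):

```typescript
interface Result {
  valid?: string;
  invalid?: string;
}

// Hypothetical validity check, for illustration only.
const isValid = (s: string) => s.length > 3;

// Tag each element with exactly one property.
const tagged: Result[] = ["beam", "ok", "pipeline"].map((element) =>
  isValid(element) ? { valid: element } : { invalid: element }
);

// Split gathers each tag into its own collection.
const valid = tagged.flatMap((r) => (r.valid !== undefined ? [r.valid] : []));
const invalid = tagged.flatMap((r) => (r.invalid !== undefined ? [r.invalid] : []));
console.log(valid, invalid); // ["beam", "pipeline"] and ["ok"]
```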

Grouping and Combining

Group By Key

import { groupBy } from "apache-beam/transforms";

interface Event {
  userId: string;
  action: string;
  timestamp: number;
}

const events = root.apply(beam.create<Event>([...]));

// Group by a field
const byUser = events.apply(
  groupBy((event) => event.userId)
);

Aggregations

import {
  countPerElement,
  sum,
  mean,
  combine,
} from "apache-beam/transforms/group_and_combine";

// Count occurrences
const wordCounts = words.apply(countPerElement());

// Sum values by key
interface Sale {
  product: string;
  amount: number;
}

const totals = sales.apply(
  groupBy((sale) => sale.product)
).apply(
  combine((group) => group.reduce((total, s) => total + s.amount, 0))
);
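The group-then-combine pattern above boils down to a per-key reduction. A local sketch of the same aggregation in plain TypeScript:

```typescript
interface Sale {
  product: string;
  amount: number;
}

// Group sales by product, then sum each group's amounts.
function totalsByProduct(sales: Sale[]): Map<string, number> {
  const totals = new Map<string, number>();
  for (const sale of sales) {
    totals.set(sale.product, (totals.get(sale.product) ?? 0) + sale.amount);
  }
  return totals;
}

const totals = totalsByProduct([
  { product: "a", amount: 10 },
  { product: "b", amount: 5 },
  { product: "a", amount: 7 },
]);
console.log(totals.get("a"), totals.get("b")); // 17 5
```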

Cross-Language Transforms

Use transforms from Python and Java SDKs:
import * as external from "apache-beam/transforms/external";

// Use Python's ReadFromBigQuery
const bigQueryData = root.apply(
  external.pythonTransform(
    "apache_beam.io.ReadFromBigQuery",
    {
      query: "SELECT * FROM dataset.table LIMIT 100",
      use_standard_sql: true,
    }
  )
);

// Use Java transforms
const kafkaData = root.apply(
  external.javaTransform(
    "org.apache.beam.sdk.io.kafka.KafkaIO.Read",
    {
      bootstrapServers: "localhost:9092",
      topics: ["my-topic"],
    }
  )
);

I/O Operations

Reading Files

import { readFromText } from "apache-beam/io";

// Read text files
const lines = root.apply(
  readFromText("gs://bucket/path/*.txt")
);

// Read with custom parsing
const parsed = lines.map((line) => JSON.parse(line));

Writing Files

import { writeToText } from "apache-beam/io";

// Write to text files
data.apply(
  writeToText("output/results.txt")
);

// Format before writing
results
  .map((item) => JSON.stringify(item))
  .apply(writeToText("output/data.jsonl"));

BigQuery (via Python)

import * as external from "apache-beam/transforms/external";

// Read from BigQuery
const rows = root.apply(
  external.pythonTransform(
    "apache_beam.io.ReadFromBigQuery",
    {
      table: "project:dataset.table",
    }
  )
);

// Write to BigQuery
data.apply(
  external.pythonTransform(
    "apache_beam.io.WriteToBigQuery",
    {
      table: "project:dataset.output_table",
      schema: {
        fields: [
          { name: "field1", type: "STRING" },
          { name: "field2", type: "INTEGER" },
        ],
      },
    }
  )
);

Running Pipelines

Direct Runner (Local)

node dist/main.js --runner=direct

Flink Runner

Flink infrastructure is automatically downloaded when needed:
node dist/main.js --runner=flink

Dataflow Runner

node dist/main.js \
  --runner=dataflow \
  --project=YOUR_PROJECT_ID \
  --region=us-central1 \
  --tempLocation=gs://YOUR_BUCKET/temp

Configuring Runners

import { createRunner } from "apache-beam/runners";

const runner = createRunner({
  runner: "flink",
  flinkMaster: "localhost:8081",
});

await runner.run((root) => {
  // Build pipeline
});

Windowing

Process streaming data with time windows:
import { FixedWindows, SlidingWindows } from "apache-beam/transforms/window";
import { assignWindows } from "apache-beam";

// Fixed windows
const windowed = data.apply(
  assignWindows(FixedWindows({ durationSecs: 60 }))
);

// Sliding windows
const sliding = data.apply(
  assignWindows(
    SlidingWindows({
      periodSecs: 30,
      durationSecs: 60,
    })
  )
);
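Window assignment is simple interval arithmetic. A sketch of how fixed and sliding windows map an event timestamp (in seconds) to window start times, using the parameters from the examples above:

```typescript
// Fixed windows: each timestamp falls in exactly one window.
function fixedWindowStart(ts: number, durationSecs: number): number {
  return Math.floor(ts / durationSecs) * durationSecs;
}

// Sliding windows: each timestamp falls in durationSecs / periodSecs windows.
function slidingWindowStarts(
  ts: number,
  durationSecs: number,
  periodSecs: number
): number[] {
  const starts: number[] = [];
  let start = Math.floor(ts / periodSecs) * periodSecs;
  // Walk backwards through every window whose interval still contains ts.
  while (start > ts - durationSecs) {
    starts.push(start);
    start -= periodSecs;
  }
  return starts.reverse();
}

console.log(fixedWindowStart(125, 60));        // 120
console.log(slidingWindowStarts(125, 60, 30)); // [90, 120]
```

An element at second 125 lands in one 60-second fixed window starting at 120, but in two sliding windows ([90, 150) and [120, 180)) because the 60-second duration is twice the 30-second period.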

Best Practices

Define interfaces for your data:
interface LogEntry {
  timestamp: number;
  level: string;
  message: string;
  userId?: string;
}

const logs = root.apply(beam.create<LogEntry>([...]));

// TypeScript catches errors at compile time
const errors = logs.filter((log) => log.level === "ERROR");
Use mature I/O connectors from Python and Java:
import * as external from "apache-beam/transforms/external";

// Use Python's extensive I/O library
const kafkaData = root.apply(
  external.pythonTransform(
    "apache_beam.io.ReadFromKafka",
    {
      consumer_config: {
        "bootstrap.servers": "localhost:9092",
      },
      topics: ["events"],
    }
  )
);
Generators provide clean syntax for emitting multiple elements:
const exploded = data.flatMap(function* (element) {
  for (const item of element.items) {
    yield item;
  }
});
Use async/await for I/O operations:
const enriched = await data.asyncMap(async (element) => {
  try {
    const details = await fetchDetails(element.id);
    return { ...element, ...details };
  } catch (error) {
    console.error(`Failed to enrich ${element.id}:`, error);
    return element;
  }
});

Composite Transforms

Create reusable transform functions:
import * as beam from "apache-beam";
import { countPerElement } from "apache-beam/transforms/group_and_combine";

function extractAndCountWords(
  lines: beam.PCollection<string>
): beam.PCollection<{ word: string; count: number }> {
  return lines
    .flatMap((line) => line.toLowerCase().split(/\W+/))
    .filter((word) => word.length > 0)
    .apply(countPerElement())
    .map(({ element, count }) => ({
      word: element,
      count: count,
    }));
}

// Use the composite transform
const wordCounts = lines.apply(extractAndCountWords);

Testing

Test your transforms:
import * as beam from "apache-beam";
import { createRunner } from "apache-beam/runners";
import { expect } from "chai";

describe("Word Count Transform", () => {
  it("counts words correctly", async () => {
    const runner = createRunner({ runner: "direct" });
    const results: any[] = [];

    await runner.run((root) => {
      const input = root.apply(
        beam.create(["hello world", "hello beam"])
      );

      const counts = input.apply(wordCount);
      counts.map((x) => results.push(x));
    });

    expect(results).to.deep.include({ element: "hello", count: 2 });
    expect(results).to.deep.include({ element: "world", count: 1 });
    expect(results).to.deep.include({ element: "beam", count: 1 });
  });
});

Starter Project

Clone the official starter project:
git clone https://github.com/apache/beam-starter-typescript.git
cd beam-starter-typescript
npm install
npm run build
node dist/index.js

Resources

  • TypeScript SDK Docs: official TypeScript SDK documentation
  • Starter Project: template project to get started quickly
  • Code Examples: sample pipelines and patterns
  • API Reference: detailed API documentation
