Use this file to discover all available pages before exploring further.
Beam SQL allows you to use SQL queries to process data in your Apache Beam pipelines. This page demonstrates how to query PCollections using standard SQL syntax.
import apache_beam as beam
from apache_beam.transforms.sql import SqlTransform

# Query a schema-aware PCollection with Beam SQL.
with beam.Pipeline() as p:
    # beam.Row elements carry named fields, which makes the resulting
    # PCollection schema-aware so it can be queried with SQL.
    data = (
        p
        | 'Create' >> beam.Create([
            beam.Row(name='Alice', age=30, city='NYC'),
            beam.Row(name='Bob', age=25, city='SF'),
            beam.Row(name='Charlie', age=35, city='NYC'),
        ])
    )

    # The single input PCollection is referenced as PCOLLECTION.
    # Beam SQL only supports ORDER BY together with LIMIT, so the sort
    # is bounded explicitly.
    results = (
        data
        | 'SQL Query' >> SqlTransform("""
            SELECT name, age
            FROM PCOLLECTION
            WHERE age > 25
            ORDER BY age DESC
            LIMIT 10
        """)
    )

    results | 'Print' >> beam.Map(print)
Key Points:
Use beam.Row to create schema-aware elements
Reference the PCollection as PCOLLECTION in SQL
Write queries in standard SQL syntax (SELECT, WHERE, ORDER BY, etc.); note that Beam SQL only supports ORDER BY when it is combined with LIMIT.
import org.apache.beam.sdk.Pipeline;import org.apache.beam.sdk.extensions.sql.SqlTransform;import org.apache.beam.sdk.schemas.Schema;import org.apache.beam.sdk.values.PCollection;import org.apache.beam.sdk.values.Row;// Define schemaSchema schema = Schema.builder() .addStringField("name") .addInt32Field("age") .addStringField("city") .build();// Create data with schemaPCollection<Row> data = pipeline.apply(Create.of( Row.withSchema(schema) .addValues("Alice", 30, "NYC") .build(), Row.withSchema(schema) .addValues("Bob", 25, "SF") .build())).setRowSchema(schema);// Apply SQL queryPCollection<Row> results = data.apply( SqlTransform.query( "SELECT name, age FROM PCOLLECTION WHERE age > 25 ORDER BY age DESC" ));
Query external tables using Google Cloud Data Catalog. This example is based on `sdks/java/extensions/sql/datacatalog/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlDataCatalogExample.java` (lines 61-83).
import org.apache.beam.sdk.extensions.sql.SqlTransform;import org.apache.beam.sdk.extensions.sql.meta.provider.datacatalog.DataCatalogTableProvider;import org.apache.beam.sdk.extensions.sql.meta.provider.datacatalog.DataCatalogPipelineOptions;public class DataCatalogExample { public static void main(String[] args) { DataCatalogPipelineOptions options = PipelineOptionsFactory.fromArgs(args).as(DataCatalogPipelineOptions.class); Pipeline pipeline = Pipeline.create(options); // Use Data Catalog as table provider try (DataCatalogTableProvider tableProvider = DataCatalogTableProvider.create(options)) { pipeline .apply("SQL Query", SqlTransform.query(options.getQueryString()) .withDefaultTableProvider("datacatalog", tableProvider)) .apply("Convert to Strings", rowsToStrings()) .apply("Write output", TextIO.write().to(options.getOutputFilePrefix())); pipeline.run().waitUntilFinish(); } }}
Example Query:
-- Top 10 products by total quantity sold since 2024-01-01.
-- NOTE(review): `date` is back-quoted because DATE is a reserved word in
-- Calcite SQL; confirm the column name against the actual table.
SELECT
  product_name,
  SUM(quantity) AS total_quantity,
  AVG(price) AS avg_price
FROM `project.dataset.sales_table`
WHERE `date` >= '2024-01-01'
GROUP BY product_name
ORDER BY total_quantity DESC
LIMIT 10
Use SQL for complex filtering and data transformations.
import apache_beam as beam
from apache_beam.transforms.sql import SqlTransform

# User events data.
# NOTE(review): `p` is not defined in this snippet; it is assumed to be an
# existing beam.Pipeline created elsewhere.
# NOTE(review): `value` is None in most rows and a float in one; verify that
# schema inference yields a nullable float field for this input.
events = p | beam.Create([
    beam.Row(user_id=1, event='login', timestamp=1640000000, value=None),
    beam.Row(user_id=1, event='purchase', timestamp=1640000100, value=99.99),
    beam.Row(user_id=2, event='login', timestamp=1640000050, value=None),
    beam.Row(user_id=2, event='view', timestamp=1640000150, value=None),
    beam.Row(user_id=1, event='logout', timestamp=1640000200, value=None),
])

# Complex filtering and transformations: keep login/purchase/view events,
# default missing values to 0.0 and bucket events into categories.
# NOTE(review): `timestamp` and `value` are back-quoted because TIMESTAMP and
# VALUE are reserved words in Calcite SQL; also confirm that casting the
# integer epoch field to TIMESTAMP is supported by the dialect in use.
results = events | SqlTransform("""
    SELECT
        user_id,
        event,
        CAST(`timestamp` AS TIMESTAMP) AS event_time,
        COALESCE(`value`, 0.0) AS event_value,
        CASE
            WHEN event = 'purchase' THEN 'conversion'
            WHEN event = 'view' THEN 'engagement'
            ELSE 'other'
        END AS event_category
    FROM PCOLLECTION
    WHERE event IN ('login', 'purchase', 'view')
      AND (`value` IS NULL OR `value` > 0)
""")
from apache_beam.transforms.sql import SqlTransform

# Calculate running totals and rankings per category.
# NOTE(review): `sales` is not defined in this snippet; it is assumed to be a
# schema-aware PCollection with product, category and amount fields.
results = sales | SqlTransform("""
    SELECT
        product,
        category,
        amount,
        SUM(amount) OVER (
            PARTITION BY category
            ORDER BY amount DESC
        ) AS running_total,
        ROW_NUMBER() OVER (
            PARTITION BY category
            ORDER BY amount DESC
        ) AS rank_in_category,
        AVG(amount) OVER (
            PARTITION BY category
        ) AS category_avg
    FROM PCOLLECTION
""")