Select Star: Flink SQL for Pulsar Folks

A presentation at the Uber Apache Flink x Apache Pulsar Meetup in March 2021 by Marta Paes

Slide 1

Select Star: Flink SQL for Pulsar Folks Marta Paes (@morsapaes) Developer Advocate © 2020 Ververica

Slide 2

Why Pulsar + Flink? 2 @morsapaes

Slide 3

Why Pulsar + Flink? 3 @morsapaes

Slide 4

Why Pulsar + Flink? “Batch as a special case of streaming” “Stream as a unified view on data” 4 @morsapaes

Slide 5

Pulsar: Unified Storage “Stream as a unified view on data” ● Pub/Sub messaging layer (Streaming) ● Durable storage layer (Batch) Unified Storage (Segments / Pub/Sub) Credit: StreamNative 5 @morsapaes Learn more: Apache Pulsar as One Storage System for Both Real-time and Historical Data Analysis

Slide 6

Flink: Unified Processing Engine “Batch as a special case of streaming”

● Reuse code and logic across batch and stream processing
● Ensure consistent semantics between processing modes
● Simplify operations
● Power applications mixing historic and real-time data

[Diagram: Unified Processing Engine (Batch / Streaming) — a bounded query covers a slice of the past; an unbounded query runs from a point in the stream into the future]

6 @morsapaes Learn more: Flink Ahead: What Comes After Unified Batch and Streaming?

Slide 7

A Unified Data Stack Unified Storage (Segments / Pub/Sub) Unified Processing Engine (Batch / Streaming) 7 @morsapaes

Slide 8

Flink is broad Streaming Analytics & ML Stateful Stream Processing Event-Driven Applications Streams, State, Time SQL, PyFlink, Tables Stateful Functions Flink Runtime Stateful Computations over Data Streams 8 @morsapaes

Slide 10

Use Cases More high-level or domain-specific use cases can be modeled with SQL/Python and dynamic tables. ● Focus on business logic, not implementation ● Mixed workloads (batch and streaming) ● Maximize developer speed and autonomy Examples Unified Online/Offline Model Training 10 @morsapaes E2E Streaming Analytics Pipelines ML Feature Generation

Slide 11

Flink SQL “Everyone knows SQL, right?” SELECT user_id, COUNT(url) AS cnt FROM clicks GROUP BY user_id; This is standard SQL (ANSI SQL) 11 @morsapaes

Slide 12

Flink SQL “Everyone knows SQL, right?” SELECT user_id, COUNT(url) AS cnt FROM clicks GROUP BY user_id; This is standard SQL (ANSI SQL) also Flink SQL 12 @morsapaes

Slide 13

A Regular SQL Engine

Take a snapshot when the query starts:

user  cTime     url
Mary  12:00:00  https://…
Bob   12:00:00  https://…
Mary  12:00:02  https://…
Liz   12:00:03  https://…

SELECT user_id, COUNT(url) AS cnt
FROM clicks
GROUP BY user_id;

A row that was added after the query was started is not considered. A final result is produced and the query terminates:

user  cnt
Mary  2
Bob   1

13 @morsapaes

Slide 14

A Streaming SQL Engine

Ingest all changes as they happen and continuously update the result:

user  cTime     url
Mary  12:00:00  https://…
Bob   12:00:00  https://…
Mary  12:00:02  https://…
Liz   12:00:03  https://…

SELECT user_id, COUNT(url) AS cnt
FROM clicks
GROUP BY user_id;

user  cnt
Mary  2   (updated from 1)
Bob   1
Liz   1

The result is identical to the one-time query (at this point).

14 @morsapaes
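Conceptually, the continuously updating result is a changelog stream: Flink keeps one counter per user and, when a new click arrives, retracts the old count and emits the new one. A sketch of the emitted changes for the data above (the +I/-U/+U row kinds are Flink's insert/retract/update markers):

```sql
SELECT user_id, COUNT(url) AS cnt
FROM clicks
GROUP BY user_id;

-- Emitted changelog (conceptual):
--   +I (Mary, 1)    -- Mary's first click
--   +I (Bob,  1)
--   -U (Mary, 1)    -- retract the old count …
--   +U (Mary, 2)    -- … and emit the updated one
--   +I (Liz,  1)
```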

Slide 15

Flink SQL in a Nutshell

● Standard SQL syntax and semantics (i.e. not a “SQL flavor”)
● Unified APIs for batch and streaming
● Support for advanced time handling and operations (e.g. CDC, pattern matching)

Execution: Streaming, Batch (TPC-DS coverage)
Native Connectors: FileSystems, Apache Kafka, Kinesis, HBase, Elasticsearch, JDBC
Formats: Debezium
Data Catalogs: Metastore, Postgres (JDBC)
UDF Support: Java, Python, Scala

15 @morsapaes

  • For an overview of supported operations, check the Flink documentation: Table API & SQL / SQL / Queries
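The “pattern matching” mentioned above is exposed in Flink SQL through the standard MATCH_RECOGNIZE clause. A minimal sketch against the clicks table from the earlier slides (the funnel URLs and pattern names are made up for illustration):

```sql
-- Detect a cart -> checkout -> confirm sequence per user.
-- (The URL patterns are illustrative, not from the talk.)
SELECT *
FROM clicks
  MATCH_RECOGNIZE (
    PARTITION BY user_id
    ORDER BY cTime                       -- must be a time attribute
    MEASURES
      CART.cTime    AS cart_time,
      CONFIRM.cTime AS confirm_time
    ONE ROW PER MATCH
    AFTER MATCH SKIP PAST LAST ROW
    PATTERN (CART CHECKOUT CONFIRM)
    DEFINE
      CART     AS CART.url     LIKE '%/cart',
      CHECKOUT AS CHECKOUT.url LIKE '%/checkout',
      CONFIRM  AS CONFIRM.url  LIKE '%/confirm'
  );
```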

Slide 16

Pulsar Integration Over Time

Flink 1.6+ (2018): Streaming Source/Sink Connectors; Table Sink Connector

16 @morsapaes Read more on Pulsar support in Flink 1.6+: When Flink & Pulsar Come Together

Slide 17

Pulsar Integration Over Time

Flink 1.6+ (2018): Streaming Source/Sink Connectors; Table Sink Connector
Flink 1.9+: Pulsar Schema + Flink Catalog Integration; Table API/SQL as first-class citizens; Exactly-once Source, At-least-once Sink

17 @morsapaes Read more on Pulsar support in Flink 1.9+: How to Query Pulsar Streams using Apache Flink

Slide 18

Pulsar Integration Over Time

Flink 1.6+ (2018): Streaming Source/Sink Connectors; Table Sink Connector
Flink 1.9+: Pulsar Schema + Flink Catalog Integration; Table API/SQL as first-class citizens; Exactly-once Source, At-least-once Sink
Flink 1.12: Upserts (upsert-pulsar); DDL Computed Columns, Watermarks and Metadata; End-to-end Exactly-once; Key-shared Subscription Model

18 @morsapaes Read more on Pulsar support in Flink 1.12+: What’s New in the Pulsar Flink Connector 2.7.0?

Slide 19

Pulsar Integration Over Time

Flink 1.6+ (2018): Streaming Source/Sink Connectors; Table Sink Connector
Flink 1.9+: Pulsar Schema + Flink Catalog Integration; Table API/SQL as first-class citizens; Exactly-once Source, At-least-once Sink
Flink 1.12: Upserts (upsert-pulsar); DDL Computed Columns, Watermarks and Metadata; End-to-end Exactly-once; Key-shared Subscription Model
Flink 1.13* (Apr/May ’21): Contribution to the Apache Flink repository [FLINK-20726]; Port connector to the new, unified Source API (FLIP-27)

19 @morsapaes

  • For upcoming improvements to Flink SQL as a whole, check this Wiki page

Slide 20

What does this look like in practice? 20 @morsapaes

Slide 21

DEMO

  1. Use the Twitter Firehose built-in connector to consume tweets about gardening 🌿 into a Pulsar topic (tweets). 21 @morsapaes Follow along with this example: https://github.com/morsapaes/flink-sql-pulsar

Slide 22

DEMO

2. Start the Flink SQL Client and use a Pulsar catalog to access the topic directly as a table in Flink.

SQL Client Catalog DDL:

CREATE CATALOG pulsar WITH (
  'type' = 'pulsar',
  'service-url' = 'pulsar://pulsar:6650',
  'admin-url' = 'http://pulsar:8080',
  'format' = 'json'
);

22 @morsapaes
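Once the catalog is registered, the topic from step 1 is immediately visible as a table. A short sketch of how this might look from the SQL Client (assuming the topic lives in the default public/default namespace):

```sql
USE CATALOG pulsar;   -- make the Pulsar catalog the current one
SHOW TABLES;          -- existing topics are listed as tables (e.g. tweets)
SELECT * FROM tweets; -- query the topic directly, no CREATE TABLE needed
```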

Slide 23

DEMO 2.1. You can query the tweets topic right off the bat using a simple SELECT statement — it’s treated as a Flink table! SQL Client 23 @morsapaes

Slide 24

DEMO 2.2. But then you find out that most Firehose events have a null createdTime. What now? SQL Client Not cool. 👹 24 @morsapaes

Slide 25

DEMO

3. One way to get a relevant timestamp is to use Pulsar metadata to get the publishTime (i.e. ingestion time).

Source Table DDL:

CREATE TABLE pulsar_tweets (
  publishTime TIMESTAMP(3) METADATA,                              -- read and use Pulsar message metadata
  WATERMARK FOR publishTime AS publishTime - INTERVAL '5' SECOND
) WITH (
  'connector' = 'pulsar',                                         -- define the source connector (Pulsar)
  'topic' = 'persistent://public/default/tweets',
  'value.format' = 'json',
  'service-url' = 'pulsar://pulsar:6650',
  'admin-url' = 'http://pulsar:8080',
  'scan.startup.mode' = 'earliest-offset'
) LIKE tweets;                                                    -- derive schema from the original topic

25 @morsapaes Pulsar Flink connector repo: https://github.com/streamnative/pulsar-flink

Slide 26

DEMO

4. Perform a simple windowed aggregation (count), and insert the results into a new Pulsar topic (tweets_agg).

Sink Table DDL:

CREATE TABLE pulsar_tweets_agg (
  tmstmp TIMESTAMP(3),
  tweet_cnt BIGINT
) WITH (
  'connector' = 'pulsar',
  'topic' = 'persistent://public/default/tweets_agg',
  'value.format' = 'json',
  'service-url' = 'pulsar://pulsar:6650',
  'admin-url' = 'http://pulsar:8080'
);

Continuous SQL Query:

INSERT INTO pulsar_tweets_agg
SELECT TUMBLE_START(publishTime, INTERVAL '10' SECOND) AS wStart,
       COUNT(id) AS tweet_cnt
FROM pulsar_tweets
GROUP BY TUMBLE(publishTime, INTERVAL '10' SECOND);

26 @morsapaes
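Before wiring up a downstream consumer, the sink table can also be sanity-checked directly from the SQL Client; new rows appear as each 10-second window closes (a sketch, not from the original deck):

```sql
SELECT tmstmp, tweet_cnt
FROM pulsar_tweets_agg;
```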

Slide 27

DEMO 5. We’ll get a count of the # of tweets in windows of 10 seconds (based on event time!). 27 @morsapaes

Slide 28

There’s a lot more to it! Check out the Flink SQL Cookbook, where we share hands-on examples, patterns, and use cases for Flink SQL. 28 @morsapaes

Slide 29

Thank you! Follow me on Twitter: @morsapaes Learn more about Flink: https://flink.apache.org/ © 2020 Ververica