From the KSQL command prompt, we’ll first declare the stream of events (orders), using the Kafka topic populated from the CSV files:
ksql> CREATE STREAM orders WITH (KAFKA_TOPIC='orders',VALUE_FORMAT='avro');
It’s easy to check that we’ve got data. Note the use of SET 'auto.offset.reset' = 'earliest'; to tell KSQL to process all data in the topic, rather than the default behaviour of processing only new data:
ksql> SET 'auto.offset.reset' = 'earliest';
Successfully changed local property 'auto.offset.reset' from 'null' to 'earliest'
ksql> SELECT * FROM orders LIMIT 1;
1521209173235 | - | 1 | 288 | 2018-03-05T07:30:55Z | Wine - Sherry Dry Sack, William | 1.53
LIMIT reached for the partition.
Query terminated
Now let’s look at the customers. We are going to declare this as a table in KSQL, because it is a set of values for a given key (as opposed to a stream of events, which is what the orders are). But we need to be careful with the key. Before declaring the object (CREATE TABLE), we can use the PRINT command to inspect the topic:
ksql> PRINT 'asgard.demo.customers' FROM BEGINNING;
Format:AVRO
16/03/18 14:01:27 GMT, +�, {"id": 245, "first_name": "Sergent", "last_name": "Greenmon", "email": "sgreenmon6s@wordpress.com", "gender": "Male", "comments": "Synergized optimizing pricing structure", "messagetopic": "asgard.demo.customers", "messagesource": "Debezium CDC from MySQL on asgard"}
Note the special characters in the record key (the +� between the timestamp and the message payload). This is because the key is actually an Avro key, and KSQL can’t handle Avro keys yet, so it shows the raw bytes rather than a readable value. We can verify that this is the case by looking at the raw message and its key:
$ kafka-avro-console-consumer \
--bootstrap-server localhost:9092 \
--property schema.registry.url=http://localhost:8081 \
--topic asgard.demo.customers --property print.key=true --max-messages=1 --from-beginning
{"id":1} {"id":1,"first_name":{"string":"Kania"},"last_name":{"string":"Eggleson"},"email":{"string":"keggleson0@tripadvisor.com"},"gender":{"string":"Female"},"comments":{"string":"Multi-channelled web-enabled ability"},"messagetopic":{"string":"asgard.demo.customers"},"messagesource":{"string":"Debezium CDC from MySQL on asgard"}}
Processed a total of 1 messages
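Why does PRINT show unreadable characters instead of {"id": 245}? As a minimal sketch, assuming the standard Confluent Schema Registry wire format (a zero "magic byte", a 4-byte big-endian schema ID, then the Avro binary body), the Python below splits such a key and decodes its single int field. The schema ID 43 in the example is hypothetical. The same magic-byte check is what fails when an Avro deserializer is handed a plain String key, which is the Unknown magic byte! error that comes up in the S3 section.

```python
import struct

MAGIC_BYTE = 0  # Schema Registry-framed messages start with a zero byte


def split_confluent_frame(payload: bytes) -> tuple[int, bytes]:
    """Return (schema_id, avro_body) for a Schema Registry-framed message."""
    if len(payload) < 5 or payload[0] != MAGIC_BYTE:
        raise ValueError("Unknown magic byte!")
    (schema_id,) = struct.unpack(">I", payload[1:5])  # 4-byte big-endian ID
    return schema_id, payload[5:]


def decode_avro_int(body: bytes) -> int:
    """Decode one Avro 'int': a zig-zag encoded, variable-length integer."""
    result, shift = 0, 0
    for byte in body:
        result |= (byte & 0x7F) << shift
        if not byte & 0x80:  # high bit clear: this was the last byte
            break
        shift += 7
    return (result >> 1) ^ -(result & 1)  # undo the zig-zag encoding


# Key for {"id": 245} under a hypothetical schema ID of 43.
raw_key = bytes([MAGIC_BYTE]) + struct.pack(">I", 43) + bytes([0xEA, 0x03])
schema_id, body = split_confluent_frame(raw_key)
print(schema_id, decode_avro_int(body))
# Rendered as text, the same bytes are mostly unprintable.
print(raw_key.decode("utf-8", errors="replace"))
```

Since the key's record has a single int field, its Avro body is just that one zig-zag varint, which is why decode_avro_int alone recovers the ID here.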
We could work around the Avro key upstream in the Kafka Connect config (by using "key.converter":"org.apache.kafka.connect.storage.StringConverter" in the configuration), or we could just use KSQL!
ksql> SET 'auto.offset.reset' = 'earliest';
ksql> CREATE STREAM CUSTOMERS_SRC WITH (KAFKA_TOPIC='asgard.demo.customers',VALUE_FORMAT='AVRO');
ksql> CREATE STREAM CUSTOMERS_REKEYED AS SELECT * FROM CUSTOMERS_SRC PARTITION BY ID;
The above statement takes the source topic, which is flowing through from MySQL via Debezium, and explicitly partitions it on the supplied key, the ID column. The resulting topic is keyed as we want, and this time the key is a simple String:
ksql> PRINT 'CUSTOMERS_REKEYED' FROM BEGINNING;
Format:AVRO
02/03/18 23:48:05 GMT, 5, {"ID": 5, "FIRST_NAME": "Marion", "LAST_NAME": "Micklem", "EMAIL": "mmicklem4@mail.ru", "GENDER": "Male", "COMMENTS": "Reactive clear-thinking functionalities", "MESSAGETOPIC": "asgard.demo.customers", "MESSAGESOURCE": "Debezium CDC from MySQL on asgard"}
To inspect the message, we can’t use kafka-avro-console-consumer, because it assumes that the key is also Avro-encoded, which this time it isn’t. Instead, we’ll use the excellent kafkacat tool:
$ kafkacat -C -K: -b localhost:9092 -f 'Key: %k\nValue: %s\n\n' -t CUSTOMERS_REKEYED -c1
Key: 5
Value:
MarionMicklem"mmicklem4@mail.rMaleNReactive clear-thinking functionalities*asgard.demo.customersBDebezium CDC from MySQL on asgard
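Conceptually, PARTITION BY ID reads each record, makes the ID column the message key (as a String), and writes the record to a partition chosen by hashing that key, so every change for a given customer lands on the same partition. The Python below models this under loud assumptions: Kafka's default partitioner hashes the key bytes with murmur2, whereas this sketch uses zlib.crc32 as a stand-in, so its partition numbers will not match a real cluster. The four-partition count and the sample rows are illustrative only.

```python
import zlib
from collections import defaultdict


def rekey(records, key_field="ID"):
    """Yield (key, value) pairs keyed on key_field, with the key as a string."""
    for value in records:
        yield str(value[key_field]), value


def partition_for(key: str, num_partitions: int) -> int:
    """Stand-in hash partitioner (Kafka itself uses murmur2, not CRC32)."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def repartition(records, num_partitions=4):
    """Group re-keyed records by target partition, preserving input order."""
    partitions = defaultdict(list)
    for key, value in rekey(records):
        partitions[partition_for(key, num_partitions)].append((key, value))
    return dict(partitions)


changes = [
    {"ID": 5, "FIRST_NAME": "Marion"},
    {"ID": 1, "FIRST_NAME": "Kania"},
    {"ID": 5, "FIRST_NAME": "Marion", "EMAIL": "mmicklem4@mail.ru"},
]
for partition, rows in sorted(repartition(changes).items()):
    print(partition, [key for key, _ in rows])
```

Whatever the hash function, the property that matters is determinism: the same key always maps to the same partition, so the ordering of changes per customer is preserved.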
We can now use the correctly-keyed topic for our KSQL table:
ksql> CREATE TABLE CUSTOMERS WITH (KAFKA_TOPIC='CUSTOMERS_REKEYED', VALUE_FORMAT='AVRO', KEY='ID');
Check that the table’s declared key (ID) matches that of the Kafka message key:
ksql> SELECT ROWKEY,ID FROM CUSTOMERS LIMIT 5;
5 | 5
6 | 6
10 | 10
12 | 12
15 | 15
LIMIT reached for the partition.
Query terminated
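Declaring CUSTOMERS as a table means the topic is treated as a changelog: for each key, only the latest value matters. A minimal Python model of that idea, assuming (as with Kafka Streams tables) that a record with a null value, a tombstone, deletes the key. The sample changelog below is made up.

```python
def materialize(changelog):
    """Fold a keyed changelog into a table holding the latest value per key."""
    table: dict[str, dict] = {}
    for key, value in changelog:
        if value is None:  # tombstone: remove the row for this key
            table.pop(key, None)
        else:  # insert or update: the newest value wins
            table[key] = value
    return table


changelog = [
    ("5", {"ID": 5, "EMAIL": "old@example.com"}),
    ("6", {"ID": 6, "EMAIL": "six@example.com"}),
    ("5", {"ID": 5, "EMAIL": "mmicklem4@mail.ru"}),
    ("6", None),
]
customers = materialize(changelog)
print(customers)  # {'5': {'ID': 5, 'EMAIL': 'mmicklem4@mail.ru'}}
# Mirrors the ROWKEY/ID check: every key matches its row's ID.
print(all(key == str(row["ID"]) for key, row in customers.items()))  # True
```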
Now for the bit we’ve all been waiting for: enriching the stream of inbound orders data with customer data from MySQL!
ksql> SELECT O.ORDER_TS, O.PRODUCT, O.ORDER_TOTAL_USD, \
C.ID, C.FIRST_NAME, C.LAST_NAME, C.EMAIL \
FROM ORDERS O \
LEFT OUTER JOIN CUSTOMERS C \
ON O.CUSTOMER_ID = C.ID \
LIMIT 5;
2018-03-13T01:50:53Z | Onions - Spanish | 9.44 | 115 | Alexandr | Willcot | awillcot36@facebook.com
2018-03-04T07:58:10Z | Halibut - Whole, Fresh | 5.11 | 929 | Ulick | Dumberell | udumberellps@ucla.edu
2018-02-09T19:11:15Z | Beef Wellington | 7.33 | 632 | Jennie | McMichell | jmcmichellhj@miitbeian.gov.cn
2018-03-11T15:39:49Z | Chocolate Eclairs | 1.45 | 270 | Margareta | Kerfod | mkerfod7h@nhs.uk
2018-03-04T23:27:04Z | Wine - George Duboeuf Rose | 6.68 | 117 | Duky | Raden | draden38@marketwatch.com
LIMIT reached for the partition.
Query terminated
(I’m using the \ line-continuation character to make the KSQL statements easier to read, but you can put each statement on one line if you prefer.)
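The query above is a stream-table join: each order arriving on ORDERS is matched against the current state of CUSTOMERS when it is processed, and because it is a LEFT OUTER JOIN, an order whose customer is unknown still flows through with null customer columns. A simplified Python model of those semantics (it ignores timing and partitioning; the second order and its CUSTOMER_ID of 9999 are made up to show the unmatched case):

```python
CUSTOMER_COLUMNS = ("ID", "FIRST_NAME", "LAST_NAME", "EMAIL")
ORDER_COLUMNS = ("ORDER_TS", "PRODUCT", "ORDER_TOTAL_USD")


def enrich(orders, customers):
    """Left-join each order to the current customer table on CUSTOMER_ID."""
    for order in orders:
        # The table is keyed by the string form of ID (see the ROWKEY check above).
        customer = customers.get(str(order["CUSTOMER_ID"]))
        row = {column: order[column] for column in ORDER_COLUMNS}
        for column in CUSTOMER_COLUMNS:
            # LEFT OUTER JOIN: with no matching customer, the columns are null (None).
            row[column] = customer[column] if customer is not None else None
        yield row


customers = {
    "115": {"ID": 115, "FIRST_NAME": "Alexandr", "LAST_NAME": "Willcot",
            "EMAIL": "awillcot36@facebook.com"},
}
orders = [
    {"ORDER_TS": "2018-03-13T01:50:53Z", "PRODUCT": "Onions - Spanish",
     "ORDER_TOTAL_USD": "9.44", "CUSTOMER_ID": 115},
    {"ORDER_TS": "2018-03-14T09:00:00Z", "PRODUCT": "Beef Wellington",
     "ORDER_TOTAL_USD": "7.33", "CUSTOMER_ID": 9999},
]
for row in enrich(orders, customers):
    print(row)
```

An inner JOIN would instead drop the second order entirely; the left outer join keeps every order, which is usually what you want when enriching an event stream.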
We can persist this streaming query with a CREATE STREAM statement:
ksql> CREATE STREAM ORDERS_ENRICHED AS \
SELECT O.ORDER_TS, O.PRODUCT, O.ORDER_TOTAL_USD, \
C.ID, C.FIRST_NAME, C.LAST_NAME, C.EMAIL \
FROM ORDERS O \
LEFT OUTER JOIN CUSTOMERS C \
ON O.CUSTOMER_ID = C.ID;
Message
Stream created and running
This is a continuous query that executes in the background until explicitly terminated by the user. In effect, these are stream processing applications, and all we need to create them is SQL! Here all we’ve done is an enrichment (joining two sets of data), but we could easily filter the data with predicates (simply include a WHERE clause), or even add aggregations.
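To illustrate what a WHERE clause plus an aggregation would mean for a continuous query, here is a hedged Python sketch: records are processed one at a time, a predicate filters them, and a running total per customer is updated and emitted after every matching record, which is how a continuously updating aggregate behaves. The five-dollar threshold and the sample rows are hypothetical. Note that the DESCRIBE EXTENDED output for ORDERS_ENRICHED reports ORDER_TOTAL_USD as a VARCHAR, so a real KSQL aggregate over it would need a CAST; the sketch converts with float().

```python
from collections import defaultdict


def running_totals(enriched_rows, min_total=5.0):
    """Continuously filter rows and emit an updated per-customer total for each."""
    totals = defaultdict(float)
    for row in enriched_rows:
        amount = float(row["ORDER_TOTAL_USD"])  # the column is a string, so cast it
        if amount < min_total:  # the WHERE clause: skip small orders
            continue
        totals[row["ID"]] += amount
        yield row["ID"], round(totals[row["ID"]], 2)  # emit the new aggregate value


stream = [
    {"ID": 115, "ORDER_TOTAL_USD": "9.44"},
    {"ID": 270, "ORDER_TOTAL_USD": "1.45"},
    {"ID": 115, "ORDER_TOTAL_USD": "6.68"},
]
for update in running_totals(stream):
    print(update)  # (115, 9.44) then (115, 16.12)
```

Each emitted pair supersedes the previous value for that key, which is why KSQL represents aggregation results as a table rather than a stream.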
You can see which queries are running with the SHOW QUERIES; statement. All queries will pause if the KSQL server stops, and restart automagically when the KSQL server starts again.
The DESCRIBE EXTENDED command shows information about a derived stream such as the one created above. As well as the columns involved, it shows details of the underlying topic and run-time statistics such as the number of messages processed and the timestamp of the most recent one.
ksql> DESCRIBE EXTENDED ORDERS_ENRICHED;
Type : STREAM
Key field : O.CUSTOMER_ID
Timestamp field : Not set - using <ROWTIME>
Key format : STRING
Value format : AVRO
Kafka output topic : ORDERS_ENRICHED (partitions: 4, replication: 1)
Field | Type
ROWTIME | BIGINT (system)
ROWKEY | VARCHAR(STRING) (system)
ORDER_TS | VARCHAR(STRING)
PRODUCT | VARCHAR(STRING)
ORDER_TOTAL_USD | VARCHAR(STRING)
ID | INTEGER
FIRST_NAME | VARCHAR(STRING)
LAST_NAME | VARCHAR(STRING)
EMAIL | VARCHAR(STRING)
Queries that write into this STREAM
id:CSAS_ORDERS_ENRICHED - CREATE STREAM ORDERS_ENRICHED AS SELECT O.ORDER_TS,O.PRODUCT,O.ORDER_TOTAL_USD,C.ID, C.FIRST_NAME, C.LAST_NAME, C.EMAIL FROM ORDERS O LEFT OUTER JOIN CUSTOMERS C ON O.CUSTOMER_ID = C.ID LIMIT 5;
For query topology and execution plan please run: EXPLAIN <QueryId>
Local runtime statistics
messages-per-sec: 15.08 total-messages: 1500 last-message: 14/03/18 16:15:07 GMT
failed-messages: 0 failed-messages-per-sec: 0 last-failed: n/a
(Statistics of the local KSQL server interaction with the Kafka topic ORDERS_ENRICHED)
Every persisted KSQL stream or table query (i.e. CSAS or CTAS) writes to an underlying Kafka topic, which is just a Kafka topic like any other:
$ kafka-avro-console-consumer \
--bootstrap-server localhost:9092 \
--property schema.registry.url=http://localhost:8081 \
--topic ORDERS_ENRICHED --max-messages=1 --from-beginning|jq '.'
{
"ORDER_TS": {
"string": "2018-03-13T01:50:53Z"
},
"PRODUCT": {
"string": "Onions - Spanish"
},
"ORDER_TOTAL_USD": {
"string": "9.44"
},
"ID": {
"int": 115
},
"FIRST_NAME": {
"string": "Alexandr"
},
"LAST_NAME": {
"string": "Willcot"
},
"EMAIL": {
"string": "awillcot36@facebook.com"
}
}
Processed a total of 1 messages
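The {"string": ...} and {"int": ...} wrappers appear because KSQL declares these fields as optional, i.e. as Avro unions with null, and Avro's JSON encoding writes a non-null union value as a single-entry object naming the branch type. A small Python helper can turn the consumer output back into plain values. It is a sketch: it unwraps only one level, which suits this flat record, and it would wrongly unwrap a genuine map or record field that happens to have exactly one entry. The sample line is trimmed from the output above, with EMAIL set to null to show that case.

```python
import json


def unwrap_unions(record: dict) -> dict:
    """Replace Avro JSON union wrappers such as {"string": "x"} with "x"."""
    plain = {}
    for field, value in record.items():
        if isinstance(value, dict) and len(value) == 1:
            (plain[field],) = value.values()  # keep the single branch value
        else:
            plain[field] = value  # null, or an already-plain value
    return plain


line = '{"ORDER_TS": {"string": "2018-03-13T01:50:53Z"}, "ID": {"int": 115}, "EMAIL": null}'
print(unwrap_unions(json.loads(line)))
# {'ORDER_TS': '2018-03-13T01:50:53Z', 'ID': 115, 'EMAIL': None}
```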
Streaming the Enriched Data to S3 for Visual Analysis
We’ve seen how easy it is to ingest data from multiple sources, whether flat files or an RDBMS, and join it effortlessly. Now let’s see how we can stream it to a target datastore in order to build analytics on it.
S3 is Amazon’s ubiquitous object store, used very widely both for long-term storage of data for analytics and for operational data files. Confluent Platform ships with a Kafka Connect connector for S3, meaning that any data in Kafka can easily be streamed to S3. The connector supports exactly-once delivery semantics, as well as useful features such as customisable partitioning.
To set up the S3 connector you just need your bucket name, region, and your AWS access keys that have permission to write to the bucket. You can make the credentials available to the connector in several ways, the simplest being to set the required environment variables before launching the Connect worker.
export AWS_ACCESS_KEY_ID=XXXXXXXXXXXXXXXXXXXX
export AWS_SECRET_ACCESS_KEY=YYYYYYYYY/YYYYYYYYY
Restart the Connect worker to pick up the new environment variables:
confluent local stop connect
confluent local start connect
Now create the connector:
curl -i -X POST -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/ \
-d '{
"name": "s3-sink-orders",
"config": {
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"key.converter":"org.apache.kafka.connect.storage.StringConverter",
"tasks.max": "1",
"topics": "ORDERS_ENRICHED",
"s3.region": "us-east-2",
"s3.bucket.name": "rmoff-demo-orders-02",
"s3.part.size": "5242880",
"flush.size": "3",
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"format.class": "io.confluent.connect.s3.format.avro.AvroFormat",
"schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
"partitioner.class": "io.confluent.connect.storage.partitioner.DefaultPartitioner",
"schema.compatibility": "NONE"
}
}'
Note that we’re using "key.converter":"org.apache.kafka.connect.storage.StringConverter", because KSQL writes the message keys as Strings, not Avro. Without this override, Kafka Connect will use the worker’s default key converter (Avro, in my case), and the task will fail with an org.apache.kafka.connect.errors.DataException and the error detail Error deserializing Avro message for id -1 Unknown magic byte!.
Check that the connector’s running:
$ curl -s "http://localhost:8083/connectors"| jq '.[]'| xargs -I{connector_name} curl -s "http://localhost:8083/connectors/"{connector_name}"/status"| jq -c -M '[.name,.connector.state,.tasks[].state]|join(":|:")'| column -s : -t| sed 's/\"//g'| sort
csv-source-orders | RUNNING | RUNNING
mysql-demo-customers | RUNNING | RUNNING
s3-sink-orders | RUNNING | RUNNING
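The jq/xargs one-liner above is compact but hard to read. An equivalent sketch in Python, using only the standard library and the same Kafka Connect REST endpoints (GET /connectors and GET /connectors/<name>/status); the worker address http://localhost:8083 is an assumption taken from the curl commands above.

```python
import json
from urllib.parse import quote
from urllib.request import urlopen

CONNECT_URL = "http://localhost:8083"  # assumed Connect worker REST address


def summarize(status: dict) -> str:
    """Format one status document as 'name | connector state | task states'."""
    states = [status["connector"]["state"]]
    states += [task["state"] for task in status.get("tasks", [])]
    return " | ".join([status["name"], *states])


def connector_statuses(base_url: str = CONNECT_URL) -> list[str]:
    """Fetch every connector's status from the Connect REST API, sorted by name."""
    with urlopen(f"{base_url}/connectors") as resp:
        names = json.load(resp)
    lines = []
    for name in sorted(names):
        with urlopen(f"{base_url}/connectors/{quote(name)}/status") as resp:
            lines.append(summarize(json.load(resp)))
    return lines
```

With a worker running, printing each entry of connector_statuses() gives one line per connector in the same name | connector | task layout as the output above.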
Go to S3 and you’ll see that the files now exist! You can use the web GUI or the AWS CLI:
$ aws s3 ls rmoff-demo-orders-02/topics/ORDERS_ENRICHED/partition=0/
2018-03-16 15:14:31 878 ORDERS_ENRICHED+0+0000000000.avro
2018-03-16 15:14:32 891 ORDERS_ENRICHED+0+0000000003.avro
2018-03-16 15:14:32 882 ORDERS_ENRICHED+0+0000000006.avro
2018-03-16 15:14:33 897 ORDERS_ENRICHED+0+0000000009.avro
2018-03-16 15:14:34 893 ORDERS_ENRICHED+0+0000000012.avro
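The object names follow a predictable pattern. With the DefaultPartitioner, each file lands under topics/<topic>/partition=<n>/ and is named <topic>+<partition>+<start offset>, with the start offset zero-padded to ten digits. Because flush.size is 3, a new object starts every three records, which is why the offsets in the listing step by three. The Python sketch below reproduces that naming; the topics prefix and ten-digit padding are, as we understand it, the connector's defaults (topics.dir and filename.offset.zero.pad.width), so treat them as assumptions if your configuration differs, and it ignores any time-based rotation.

```python
def s3_object_keys(topic: str, partition: int, record_count: int,
                   flush_size: int = 3, topics_dir: str = "topics",
                   pad_width: int = 10) -> list[str]:
    """List object keys for one partition, one file per flush_size records."""
    keys = []
    # Only complete batches are written; a partial batch stays buffered.
    for start in range(0, record_count - record_count % flush_size, flush_size):
        filename = f"{topic}+{partition}+{start:0{pad_width}d}.avro"
        keys.append(f"{topics_dir}/{topic}/partition={partition}/{filename}")
    return keys


for key in s3_object_keys("ORDERS_ENRICHED", 0, 15):
    print(key)
```

For 15 records this yields five keys starting at offsets 0, 3, 6, 9 and 12, matching the listing above.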
People use S3 for a variety of reasons, and being able to stream data into it from Kafka via the Kafka Connect S3 connector is really useful. In this example, we can take the data and use Amazon QuickSight to build analytical visualisations on top of it, having first auto-discovered the data using AWS Glue and exposed it through Athena.
Summary
Apache Kafka and KSQL make for a powerful toolset for integrating and enriching data from one or more sources. All that’s needed is configuration files and SQL—not a single line of code was written in creating this article! Kafka Connect makes it easy to ingest data from numerous sources, as well as stream data from Kafka topics to many different targets.
To read more about building streaming data pipelines with Apache Kafka and KSQL, check out the following articles: