
Commit

docs: start updating docs to include EMIT CHANGES for push queries (#3472)

* docs: start updating docs to include `EMIT CHANGES` for push queries
big-andy-coates authored Oct 4, 2019
1 parent e5e4512 commit 691569f
Showing 29 changed files with 261 additions and 130 deletions.
15 changes: 9 additions & 6 deletions docs/capacity-planning.rst
@@ -82,7 +82,7 @@ Some queries require that the input stream be repartitioned so that all messages

.. code:: sql
-CREATE TABLE pageviews_by_page AS SELECT pageid, COUNT(*) FROM pageviews_original GROUP BY pageid;
+CREATE TABLE pageviews_by_page AS SELECT pageid, COUNT(*) FROM pageviews_original GROUP BY pageid EMIT CHANGES;
DESCRIBE EXTENDED pageviews_by_page;
Your output should resemble:
@@ -92,7 +92,7 @@ Some queries require that the input stream be repartitioned so that all messages
...
Queries that write into this TABLE
-----------------------------------
-id:CTAS_PAGEVIEWS_BY_PAGE - CREATE TABLE pageviews_by_page AS SELECT pageid, COUNT(*) FROM pageviews_original GROUP BY pageid;
+id:CTAS_PAGEVIEWS_BY_PAGE - CREATE TABLE pageviews_by_page AS SELECT pageid, COUNT(*) FROM pageviews_original GROUP BY pageid EMIT CHANGES;

For query topology and execution plan please run: EXPLAIN <QueryId>

@@ -107,7 +107,7 @@ Some queries require that the input stream be repartitioned so that all messages
::

Type : QUERY
-SQL : CREATE TABLE pageviews_by_page AS SELECT pageid, COUNT(*) FROM pageviews_original GROUP BY pageid;
+SQL : CREATE TABLE pageviews_by_page AS SELECT pageid, COUNT(*) FROM pageviews_original GROUP BY pageid EMIT CHANGES;

Execution plan
--------------
@@ -313,7 +313,8 @@ out all the views that lasted less than 10 seconds:
WITH (PARTITIONS=64) AS
SELECT *
FROM pageviews_original
-WHERE duration > 10;
+WHERE duration > 10
+EMIT CHANGES;
KSQL
++++
@@ -354,13 +355,15 @@ and then count up views by city:
CREATE STREAM pageviews_meaningful_with_user_info
WITH (PARTITIONS=64) AS
SELECT pv.viewtime, pv.userid, pv.pageid, pv.client_ip, pv.url, pv.duration, pv.from_url, u.city, u.country, u.gender, u.email
-FROM pageviews_meaningful pv LEFT JOIN users u ON pv.userid = u.userid;
+FROM pageviews_meaningful pv LEFT JOIN users u ON pv.userid = u.userid
+EMIT CHANGES;
CREATE TABLE pageview_counts_by_city
WITH (PARTITIONS=64) AS
SELECT country, city, count(*)
FROM pageviews_meaningful_with_user_info
-GROUP BY country, city;
+GROUP BY country, city
+EMIT CHANGES;
KSQL
++++
3 changes: 2 additions & 1 deletion docs/concepts/ksql-and-kafka-streams.rst
@@ -28,7 +28,8 @@ For example, to implement simple fraud-detection logic on a Kafka topic named
CREATE STREAM fraudulent_payments AS
SELECT fraudProbability(data) FROM payments
-WHERE fraudProbability(data) > 0.8;
+WHERE fraudProbability(data) > 0.8
+EMIT CHANGES;
The equivalent Java code on Kafka Streams might resemble:

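A minimal sketch of what such code could look like (illustrative only, not the exact example from the source file; it assumes ``String`` keys and values with default serdes, and a user-supplied ``fraudProbability(String)`` helper):

.. code:: java

   import org.apache.kafka.streams.StreamsBuilder;

   StreamsBuilder builder = new StreamsBuilder();
   // Read payments, keep only records scored as likely fraud, and emit the score.
   builder.<String, String>stream("payments")
          .filter((key, data) -> fraudProbability(data) > 0.8)
          .mapValues(data -> String.valueOf(fraudProbability(data)))
          .to("fraudulent_payments");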
3 changes: 2 additions & 1 deletion docs/concepts/ksql-architecture.rst
@@ -362,7 +362,8 @@ from the ``authorization_attempts`` stream:
WINDOW TUMBLING (SIZE 5 SECONDS)
WHERE region = ‘west’
GROUP BY card_number
-HAVING count(*) > 3;
+HAVING count(*) > 3
+EMIT CHANGES;
The KSQL engine translates the DML statement into a Kafka Streams application.
The application reads the source topic continuously, and whenever the
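A hedged sketch of the kind of topology such a translation produces (illustrative only; ``regionOf`` and ``cardNumberOf`` are hypothetical value extractors, and ``String`` serdes are assumed):

.. code:: java

   import java.time.Duration;
   import org.apache.kafka.streams.StreamsBuilder;
   import org.apache.kafka.streams.kstream.TimeWindows;

   StreamsBuilder builder = new StreamsBuilder();
   builder.<String, String>stream("authorization_attempts")
          .filter((key, value) -> "west".equals(regionOf(value)))  // WHERE region = 'west'
          .groupBy((key, value) -> cardNumberOf(value))            // GROUP BY card_number
          .windowedBy(TimeWindows.of(Duration.ofSeconds(5)))       // WINDOW TUMBLING (SIZE 5 SECONDS)
          .count()
          .filter((window, count) -> count > 3)                    // HAVING count(*) > 3
          .toStream();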
15 changes: 10 additions & 5 deletions docs/concepts/time-and-windows-in-ksql-queries.rst
@@ -240,7 +240,8 @@ run a query like this:
SELECT regionid, COUNT(*) FROM pageviews
WINDOW HOPPING (SIZE 30 SECONDS, ADVANCE BY 10 SECONDS)
WHERE UCASE(gender)='FEMALE' AND LCASE (regionid) LIKE '%_6'
-GROUP BY regionid;
+GROUP BY regionid
+EMIT CHANGES;
The hopping window's start time is inclusive, but the end time is exclusive.
This is important for non-overlapping windows, in which each record must be
@@ -270,7 +271,8 @@ per zip code per hour in an ``orders`` stream, you might run a query like this:
.. code:: sql
SELECT orderzip_code, TOPK(order_total, 5) FROM orders
-WINDOW TUMBLING (SIZE 1 HOUR) GROUP BY order_zipcode;
+WINDOW TUMBLING (SIZE 1 HOUR) GROUP BY order_zipcode
+EMIT CHANGES;
Here's another example: to detect potential credit card fraud in an
``authorization_attempts`` stream, you might run a query for the number of
@@ -281,7 +283,8 @@ a time interval of five seconds.
SELECT card_number, count(*) FROM authorization_attempts
WINDOW TUMBLING (SIZE 5 SECONDS)
-GROUP BY card_number HAVING COUNT(*) > 3;
+GROUP BY card_number HAVING COUNT(*) > 3
+EMIT CHANGES;
The tumbling window's start time is inclusive, but the end time is exclusive.
This is important for non-overlapping windows, in which each record must be
@@ -325,7 +328,8 @@ per region:
SELECT regionid, COUNT(*) FROM pageviews
WINDOW SESSION (60 SECONDS)
-GROUP BY regionid;
+GROUP BY regionid
+EMIT CHANGES;
The start and end times for a session window are both inclusive, in contrast to
time windows.
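As an illustrative sketch of that session behavior (hypothetical timestamps; the Kafka Streams ``SessionWindows`` helper carries the same 60-second inactivity gap):

.. code:: java

   import java.time.Duration;
   import org.apache.kafka.streams.kstream.SessionWindows;

   // WINDOW SESSION (60 SECONDS): records for one key at t = 0 s, 40 s, and 130 s
   // form two sessions. The 40 s record arrives within the 60-second gap and
   // extends the first session to [0 s, 40 s]; the 90 s of silence that follow
   // close it, so the 130 s record starts a new session.
   SessionWindows session = SessionWindows.with(Duration.ofSeconds(60));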
@@ -356,7 +360,8 @@ For example, to find orders that have shipped within the last hour from an
FROM new_orders o
INNER JOIN shipments s
WITHIN 1 HOURS
-ON o.order_id = s.order_id;
+ON o.order_id = s.order_id
+EMIT CHANGES;
For more information on joins, see :ref:`join-streams-and-tables`.

15 changes: 10 additions & 5 deletions docs/developer-guide/aggregate-streaming-data.rst
@@ -31,7 +31,8 @@ because the result of the query is a KSQL table.
SELECT regionid,
COUNT(*)
FROM pageviews
-GROUP BY regionid;
+GROUP BY regionid
+EMIT CHANGES;
Tombstone Records
=================
@@ -63,7 +64,8 @@ This query computes the pageview count per region per minute:
COUNT(*)
FROM pageviews
WINDOW TUMBLING (SIZE 1 MINUTE)
-GROUP BY regionid;
+GROUP BY regionid
+EMIT CHANGES;
To count the pageviews for “Region_6” by female users every
30 seconds, you can change the previous query to the following:
@@ -76,7 +78,8 @@ To count the pageviews for “Region_6” by female users every
FROM pageviews
WINDOW TUMBLING (SIZE 30 SECONDS)
WHERE UCASE(gender)='FEMALE' AND LCASE(regionid)='region_6'
-GROUP BY regionid;
+GROUP BY regionid
+EMIT CHANGES;
Aggregate Records Over a Hopping Window
=======================================
@@ -97,7 +100,8 @@ and substring matching.
FROM pageviews
WINDOW HOPPING (SIZE 30 SECONDS, ADVANCE BY 10 SECONDS)
WHERE UCASE(gender)='FEMALE' AND LCASE (regionid) LIKE '%_6'
-GROUP BY regionid;
+GROUP BY regionid
+EMIT CHANGES;
Aggregate Records Over a Session Window
=======================================
@@ -113,7 +117,8 @@ the input data and performs the counting step per region.
COUNT(*)
FROM pageviews
WINDOW SESSION (60 SECONDS)
-GROUP BY regionid;
+GROUP BY regionid
+EMIT CHANGES;
For more information, see :ref:`time-and-windows-in-ksql-queries`.

18 changes: 9 additions & 9 deletions docs/developer-guide/api.rst
@@ -47,7 +47,7 @@ Here's an example request that retrieves streaming data from ``TEST_STREAM``:
curl -X "POST" "http://localhost:8088/query" \
-H "Content-Type: application/vnd.ksql.v1+json; charset=utf-8" \
-d $'{
"ksql": "SELECT * FROM TEST_STREAM;",
"ksql": "SELECT * FROM TEST_STREAM EMIT CHANGES;",
"streamsProperties": {}
}'
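A hedged sketch of consuming that chunked response with Java 11's ``java.net.http`` client (same endpoint and stream name as the example above):

.. code:: java

   import java.net.URI;
   import java.net.http.HttpClient;
   import java.net.http.HttpRequest;
   import java.net.http.HttpResponse;

   HttpClient client = HttpClient.newHttpClient();
   HttpRequest request = HttpRequest.newBuilder()
           .uri(URI.create("http://localhost:8088/query"))
           .header("Content-Type", "application/vnd.ksql.v1+json; charset=utf-8")
           .POST(HttpRequest.BodyPublishers.ofString(
               "{\"ksql\": \"SELECT * FROM TEST_STREAM EMIT CHANGES;\", \"streamsProperties\": {}}"))
           .build();

   // A push query never completes on its own; print each line as it arrives.
   client.send(request, HttpResponse.BodyHandlers.ofLines())
         .body()
         .forEach(System.out::println);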
@@ -217,7 +217,7 @@ statements use the ``/query`` endpoint.
Content-Type: application/vnd.ksql.v1+json
{
"ksql": "CREATE STREAM pageviews_home AS SELECT * FROM pageviews_original WHERE pageid='home'; CREATE STREAM pageviews_alice AS SELECT * FROM pageviews_original WHERE userid='alice';",
"ksql": "CREATE STREAM pageviews_home AS SELECT * FROM pageviews_original WHERE pageid='home'; CREATE STREAM pageviews_alice AS SELECT * FROM pageviews_original WHERE userid='alice' EMIT CHANGES;",
"streamsProperties": {
"ksql.streams.auto.offset.reset": "earliest"
}
@@ -232,7 +232,7 @@ statements use the ``/query`` endpoint.
[
{
"statementText":"CREATE STREAM pageviews_home AS SELECT * FROM pageviews_original WHERE pageid='home';",
"statementText":"CREATE STREAM pageviews_home AS SELECT * FROM pageviews_original WHERE pageid='home' EMIT CHANGES;",
"commandId":"stream/PAGEVIEWS_HOME/create",
"commandStatus": {
"status":"SUCCESS",
@@ -241,7 +241,7 @@ statements use the ``/query`` endpoint.
"commandSequenceNumber":10
},
{
"statementText":"CREATE STREAM pageviews_alice AS SELECT * FROM pageviews_original WHERE userid='alice';",
"statementText":"CREATE STREAM pageviews_alice AS SELECT * FROM pageviews_original WHERE userid='alice' EMIT CHANGES;",
"commandId":"stream/PAGEVIEWS_ALICE/create",
"commandStatus": {
"status":"SUCCESS",
@@ -266,7 +266,7 @@ similar to the example request above:
Content-Type: application/vnd.ksql.v1+json
{
"ksql": "CREATE STREAM pageviews_home AS SELECT * FROM pageviews_original WHERE pageid='home'; CREATE TABLE pageviews_home_count AS SELECT userid, COUNT(*) FROM pageviews_home GROUP BY userid;"
"ksql": "CREATE STREAM pageviews_home AS SELECT * FROM pageviews_original WHERE pageid='home'; CREATE TABLE pageviews_home_count AS SELECT userid, COUNT(*) FROM pageviews_home GROUP BY userid EMIT CHANGES;"
}
The second method is to submit the statements as separate requests and incorporate the interdependency by using ``commandSequenceNumber``.
@@ -279,7 +279,7 @@ Send the first request:
Content-Type: application/vnd.ksql.v1+json
{
"ksql": "CREATE STREAM pageviews_home AS SELECT * FROM pageviews_original WHERE pageid='home';"
"ksql": "CREATE STREAM pageviews_home AS SELECT * FROM pageviews_original WHERE pageid='home' EMIT CHANGES;"
}
Make note of the ``commandSequenceNumber`` returned in the response:
@@ -291,7 +291,7 @@ Make note of the ``commandSequenceNumber`` returned in the response:
[
{
"statementText":"CREATE STREAM pageviews_home AS SELECT * FROM pageviews_original WHERE pageid='home';",
"statementText":"CREATE STREAM pageviews_home AS SELECT * FROM pageviews_original WHERE pageid='home' EMIT CHANGES;",
"commandId":"stream/PAGEVIEWS_HOME/create",
"commandStatus": {
"status":"SUCCESS",
@@ -311,7 +311,7 @@ execute until after command number 10 has finished executing:
Content-Type: application/vnd.ksql.v1+json
{
"ksql": "CREATE TABLE pageviews_home_count AS SELECT userid, COUNT(*) FROM pageviews_home GROUP BY userid;",
"ksql": "CREATE TABLE pageviews_home_count AS SELECT userid, COUNT(*) FROM pageviews_home GROUP BY userid EMIT CHANGES;",
"commandSequenceNumber":10
}
@@ -346,7 +346,7 @@ The ``/query`` resource lets you stream the output records of a ``SELECT`` statement
Content-Type: application/vnd.ksql.v1+json
{
"ksql": "SELECT * FROM pageviews;",
"ksql": "SELECT * FROM pageviews EMIT CHANGES;",
"streamsProperties": {
"ksql.streams.auto.offset.reset": "earliest"
}
11 changes: 7 additions & 4 deletions docs/developer-guide/create-a-stream.rst
@@ -45,7 +45,8 @@ In the KSQL CLI, paste the following CREATE STREAM statement:
userid VARCHAR,
pageid VARCHAR)
WITH (KAFKA_TOPIC='pageviews',
-VALUE_FORMAT='DELIMITED');
+VALUE_FORMAT='DELIMITED')
+EMIT CHANGES;
Your output should resemble:

@@ -155,7 +156,8 @@ like this:
WITH (KAFKA_TOPIC='pageviews',
VALUE_FORMAT='DELIMITED',
KEY='pageid',
-TIMESTAMP='viewtime');
+TIMESTAMP='viewtime')
+EMIT CHANGES;
Confirm that the TIMESTAMP field is ``viewtime`` by using the DESCRIBE EXTENDED
statement:
@@ -223,7 +225,8 @@ results from a persistent query that matches "introductory" pages that have a
CREATE STREAM pageviews_intro AS
SELECT * FROM pageviews
-WHERE pageid < 'Page_20';
+WHERE pageid < 'Page_20'
+EMIT CHANGES;
Your output should resemble:

@@ -273,7 +276,7 @@ Your output should resemble:

Query ID | Kafka Topic | Query String
--------------------------------------------------------------------------------------------------------------------------------------------
-CSAS_PAGEVIEWS_INTRO_0 | PAGEVIEWS_INTRO | CREATE STREAM pageviews_intro AS SELECT * FROM pageviews WHERE pageid < 'Page_20';
+CSAS_PAGEVIEWS_INTRO_0 | PAGEVIEWS_INTRO | CREATE STREAM pageviews_intro AS SELECT * FROM pageviews WHERE pageid < 'Page_20' EMIT CHANGES;
--------------------------------------------------------------------------------------------------------------------------------------------
For detailed information on a Query run: EXPLAIN <Query ID>;

30 changes: 24 additions & 6 deletions docs/developer-guide/create-a-table.rst
@@ -106,7 +106,7 @@ statement:

.. code:: sql
-SELECT * FROM users;
+SELECT * FROM users EMIT CHANGES;
Your output should resemble:

@@ -144,7 +144,8 @@ results from a persistent query for users that have ``gender`` set to ``FEMALE``
CREATE TABLE users_female AS
SELECT userid, gender, regionid FROM users
-WHERE gender='FEMALE';
+WHERE gender='FEMALE'
+EMIT CHANGES;
Your output should resemble:

@@ -206,7 +207,7 @@ Your output should resemble:

Query ID | Kafka Topic | Query String
-----------------------------------------------------------------------------------------------------------------------------------------
-CTAS_USERS_FEMALE_0 | USERS_FEMALE | CREATE TABLE users_female AS SELECT userid, gender, regionid FROM users WHERE gender='FEMALE';
+CTAS_USERS_FEMALE_0 | USERS_FEMALE | CREATE TABLE users_female AS SELECT userid, gender, regionid FROM users WHERE gender='FEMALE' EMIT CHANGES;
-----------------------------------------------------------------------------------------------------------------------------------------
For detailed information on a Query run: EXPLAIN <Query ID>;

@@ -225,7 +226,8 @@ function like COUNT(*) in the SELECT clause.
CREATE TABLE pageviews_table AS
SELECT viewtime, userid, pageid, COUNT(*) AS TOTAL
FROM pageviews_original WINDOW TUMBLING (SIZE 1 MINUTES)
-GROUP BY viewtime, userid, pageid;
+GROUP BY viewtime, userid, pageid
+EMIT CHANGES;
Your output should resemble:

@@ -237,11 +239,11 @@ Your output should resemble:
---------------------------
ksql>

-Inspect the table by using a SELECT statement.
+Observe the changes happening to the table using a streaming SELECT statement.

.. code:: sql
-SELECT * FROM pageviews_table;
+SELECT * FROM pageviews_table EMIT CHANGES;
Your output should resemble:

@@ -257,6 +259,22 @@
^CQuery terminated
ksql>

+Look up the value for a specific key within the table using a SELECT statement.
+
+.. code:: sql
+SELECT * FROM pageviews_table WHERE ROWKEY='1557183929488|+|User_9|+|Page_39';
+Your output should resemble:
+
+::
+
+ROWKEY STRING KEY | WINDOWSTART BIGINT KEY | VIEWTIME BIGINT | USERID STRING | PAGEID STRING | TOTAL BIGINT
+----------------------------------------------------------------------------------------------------------------------------
+1557183929488|+|User_9|+|Page_39 | 1557183900000 | 1557183929488 | User_9 | Page_39 | 1
+----------------------------------------------------------------------------------------------------------------------------
+ksql>

Delete a KSQL Table
*******************

2 changes: 1 addition & 1 deletion docs/developer-guide/implement-a-udf.rst
@@ -362,7 +362,7 @@ Use the MULTIPLY function in a query. If you follow the steps in

::

-SELECT MULTIPLY(rowtime, viewtime) FROM pageviews_original;
+SELECT MULTIPLY(rowtime, viewtime) FROM pageviews_original EMIT CHANGES;

Your output should resemble:

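For reference, a class implementing such a MULTIPLY function follows the KSQL UDF annotation pattern; this is a minimal sketch, not necessarily the exact code from the guide's earlier steps:

.. code:: java

   import io.confluent.ksql.function.udf.Udf;
   import io.confluent.ksql.function.udf.UdfDescription;

   @UdfDescription(name = "multiply", description = "multiplies two numbers")
   public class Multiply {

     @Udf(description = "multiply two non-nullable BIGINTs")
     public long multiply(final long v1, final long v2) {
       return v1 * v2;
     }
   }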
6 changes: 4 additions & 2 deletions docs/developer-guide/join-streams-and-tables.rst
@@ -30,7 +30,8 @@ combination of a ``pageviews`` stream and a ``users`` table:
CREATE STREAM pageviews_enriched AS
SELECT users.userid AS userid, pageid, regionid, gender FROM pageviews
-LEFT JOIN users ON pageviews.userid = users.userid;
+LEFT JOIN users ON pageviews.userid = users.userid
+EMIT CHANGES;
For the full code example, see :ref:`ksql_quickstart-docker`.
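A rough Kafka Streams analogue of this enriching join, as a sketch (``String`` keys and values are assumed, and ``combine`` is a hypothetical joiner that merges the two payloads):

.. code:: java

   import org.apache.kafka.streams.StreamsBuilder;
   import org.apache.kafka.streams.kstream.KStream;
   import org.apache.kafka.streams.kstream.KTable;

   StreamsBuilder builder = new StreamsBuilder();
   KStream<String, String> pageviews = builder.stream("pageviews");
   KTable<String, String> users = builder.table("users");

   // LEFT JOIN: every pageview passes through; user is null when no match exists.
   pageviews.leftJoin(users, (view, user) -> combine(view, user))
            .to("pageviews_enriched");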
@@ -51,7 +52,8 @@ expected time of two hours.
CREATE STREAM late_orders AS
SELECT o.orderid, o.itemid FROM orders o
FULL OUTER JOIN shipments s WITHIN 2 HOURS
-ON s.orderid = o.orderid WHERE s.orderid IS NULL;
+ON s.orderid = o.orderid WHERE s.orderid IS NULL
+EMIT CHANGES;
Joins and Windows
*****************
