
docs: klip-15 New client and API #4069

Merged: 12 commits, Jan 11, 2020
325 changes: 325 additions & 0 deletions design-proposals/klip-15-new-api-and-client.md
# KLIP 15 - ksqlDB Client and New Server API

**Author**: Tim Fox (purplefox) |
**Release Target**: ? |
**Status**: _In Discussion_ |
**Discussion**:

Please read the initial discussion on the ksqlDB developer group first:

https://groups.google.com/forum/#!topic/ksql-dev/yxcRlsOsNmo

And there is a partially working prototype for this work described here:

https://groups.google.com/forum/#!topic/ksql-dev/5mLKvtZFs4Y

For ksqlDB to be a successful project we need to provide an awesome out-of-the-box developer experience,
and it should be super easy and fun to write powerful event streaming applications.
Currently this is somewhat difficult to do: it requires multiple clients and multiple moving parts in order to build
a simple event sourcing / CQRS style application.
We should provide a new ksqlDB client that provides the currently missing single interaction point with the event streaming
platform so that we can improve the application development process.
To support the client we should provide an updated HTTP2 based API that allows streaming use cases to be handled better.

## Motivation and background

In order to increase adoption of ksqlDB we need the developer experience to be as slick as possible. Currently, in order to write
a simple event streaming application with ksqlDB, up to 4 clients may be needed:

1. An HTTP client to access the current HTTP/REST API
2. A WebSocket client to access the streaming query WebSockets endpoint
3. A JDBC client to access data in JDBC tables that are synced with KSQL via Kafka Connect, as the current support for
pull queries does not otherwise make the data easily accessible.
4. A Kafka client for producing and consuming messages from topics.

This is a lot for an application to handle and creates a steep learning curve for application developers.

Moreover, our current HTTP API is lacking, especially in the area of streaming query results, which makes it difficult to write an application
that uses this API.

This KLIP proposes that we:

* Create a ksqlDB client (initially in Java and JavaScript) as the primary interaction point to the event streaming platform
Contributor:

To further the KLIP discussion from the JavaScript perspective, I opened a PR against @purplefox's prototype branch: purplefox#12.

From the PR description:

Provides a JS (TypeScript) implementation of the wire protocol and client semantics proposed in KLIP-15. The client works in Node.js and browser runtimes.

The goals of this exercise are to:

  1. Demonstrate a non-Java impl of KLIP-15's novel wire protocol.
  2. Consider the ideal client API for consuming streaming queries in JavaScript.
  3. Take the perspective of a front-end developer building a web app powered by ksqlDB.

The included example ports the KLIP's ksql-real-app to a single-page web app (shopping-cart).

See the README for information on how to run the example.

I will follow up in this thread with more thoughts on the outlined goals above.

Contributor:

  1. Demonstrate a non-Java impl of KLIP-15's novel wire protocol.

I found it pretty straightforward to port the frame types and their binary encoding based on the Java implementation. I wrote some tests to confirm my understanding. And for sure there are many opportunities for optimization in my sample port.

It was a bit more challenging to build a mental model for the frame sequencing based only on the Java reference impl, particularly (and naturally so) for handling streamQuery semantics. Still, it was totally tractable, since the protocol is so simple.

Assuming we document the protocol specification as part of the KLIP implementation, I think folks would have a relatively easy time porting the suggested protocol to other languages.

Contributor (@colinhicks, Dec 13, 2019):

  2. Consider the ideal client API for consuming streaming queries in JavaScript.

The client API for flowing push query results through a JS application is a principal opportunity to tell the reactive streaming story. On the flip side, if the developer experience for consuming pushes is less than ideal, we're not doing ksqlDB justice. Same for Java.

Should streamQuery results return the JS implementation of concurrent.Flow?

While there is an argument for emulating the Java client usage patterns, this seems overly prescriptive. Flowable has little traction in the JS world. In the worst case this approach creates a perception that the JS client is second-class.

What about RxJS Observables?

RxJS is de facto-ish for reactive patterns. Folks already using this library would be delighted. But if you are not acquainted with the Rx way, the breadth of concepts it prescribes knowing could be intimidating.

What is the case for AsyncIterable?

It's part of the ES 2018 standard. Its cross section is way smaller than the RxJS option.

Do we need to offer poll() in addition?

Probably. Familiarity is important for folks and AsyncIterable is new. Returning a simple event emitter might be the best bet.

What about client API methods other than streamQuery (with pushes)?

For other request patterns, there's not as much to consider. In general, it's natural to model the async work as a Promise in the same cases for which we return CompletableFuture in Java.

Contributor (@colinhicks, Dec 13, 2019):

  3. Take the perspective of a front-end developer building a web app powered by ksqlDB.

For me this was the fun part. FWIW, I did it after the bulk of the client implementation, and it felt really good to make stuff tangible. The React port of the ksql-real-app example is very little code.

Here's a way to deliver push queries with React hooks:
````
import React, { useState, useEffect } from 'react';

// where `db` is the ksqlDB client connection
const usePushQuery = (db, query) => {
  const [results, setResults] = useState({ rows: [], error: null });

  useEffect(() => {
    (async () => {
      const rows = [];
      try {
        const pushResults = await db.streamQuery(query);
        for await (const row of pushResults) {
          rows.push(row);
          setResults({ rows });
        }
      } catch (error) {
        setResults({ error });
      }
    })();
  }, [query]);

  return results;
};

// usage in component fn
const { rows, error } = usePushQuery(db, `SELECT * FROM ORDER_REPORT EMIT CHANGES`);
````

ksqlDB + JS can be as delightful as it is powerful.

Contributor Author:

This is awesome :)

for an application
* Create a new HTTP2 based server API using Vert.x that supports the client and enables the functionality of the client to be delivered in a straightforward
and efficient way
* Migrate the functionality of the current HTTP/REST API over to the new server implementation and retire the parts that are no longer needed.

## What is in scope

### The client
Contributor:

@purplefox can you include some mock/example client code that illustrates how using a client might look and feel?

Contributor Author:

Hey @derekjn - I put together a prototype which shows a real(ish) client, examples, and an example app. Links are on the post here https://groups.google.com/forum/#!topic/ksql-dev/5mLKvtZFs4Y


* We will create a new client, initially in Java (and potentially in JavaScript). Clients for other languages such as Python and Go will follow later.
* The client will initially support execution and streaming of queries (both push and pull), inserting of rows into streams, DDL operations and admin operations
such as list and describe.
* The client will support a reactive / async interaction style using Java CompletableFuture and Reactive Streams / JDK9 Flow API (or a copy thereof if we cannot upgrade from Java 8)
* The client will also support a direct / synchronous interaction style (a usage sketch follows this list).
Contributor:

+1 on JSON being more friendly for development. But should we additionally consider a binary encoding format like protobuf as the payload over HTTP/2, for better performance? If you think this can be sequenced in a later phase, it might be good to call that out here.

Contributor Author:

If the performance is inadequate we can consider this, but I suspect it will be sufficient for our target use cases.

Contributor:

From personal experience, I'd avoid PB for a public API. PB can work well as a serialization format within an organisation. Though I've still seen large orgs get stuck on one specific PB version.

* The server API will be very simple, based on HTTP2 with a plain text/JSON encoding, so it will be easy to use directly from vanilla
HTTP2 clients if a ksqlDB client is not available
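
To illustrate the two interaction styles, here is a minimal sketch in Java. The `KsqlClient` and `Row` types and the method names are hypothetical placeholders, not the proposed API; only `CompletableFuture` is the real JDK type the KLIP intends to build on.

````
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class ClientStyles {

  interface Row {}                                          // hypothetical row type

  interface KsqlClient {                                    // hypothetical client surface
    CompletableFuture<List<Row>> executeQuery(String sql);  // pull query, async style
    List<Row> executeQuerySync(String sql);                 // pull query, blocking style
  }

  static void demo(KsqlClient client) {
    // Reactive / async style: compose on the future, never block the calling thread.
    client.executeQuery("SELECT * FROM ORDERS WHERE ORDER_ID = '123';")
          .thenAccept(rows -> rows.forEach(System.out::println));

    // Direct / synchronous style: convenient for scripts and tests.
    List<Row> rows = client.executeQuerySync("SELECT * FROM ORDERS WHERE ORDER_ID = '123';");
    System.out.println(rows.size() + " rows");
  }
}
````

Push queries, which return an unbounded stream of rows, would instead be exposed via Reactive Streams; a subscriber sketch appears in the Design section below.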

### The Server API

We will create a new, simple HTTP2 API for streaming query results from server to client and
for streaming inserts from client to server. We will use a simple text/JSON encoding for data.
Please note, this is not a REST API, it's a streaming HTTP API. The API does not follow REST principles. REST is inherently designed for request/response (RPC)
style interactions, not streaming.

The API will have the following characteristics:

* Multiplexed (because of HTTP2 multiplexing)
* Back-pressure (because of HTTP2 flow control)
* Text based so easy to use and view results using command line tools such as curl
* Can be used from any vanilla HTTP2 client for any language
* Simple newline based delimitation so easy to parse results on client side
* JSON encoding of data so very easy to parse as most languages have good JSON support
* CORS support
* HTTP basic auth support
* TLS
* Easy to create new clients using the protocol

#### Query streaming

The request method will be a POST.

Requests will be made to a specific URL, e.g. "/query-stream" (this can be configurable)
Contributor:

by "configurable", do you mean we can decide on specific name later? or actually configurable per server?

Contributor Author:

Either way, I don't have a strong opinion.

Member:

Can we find a better default name? Why not just query?


The body of the request is a JSON object UTF-8 encoded as text, containing the arguments for the
operation (newlines have been added here for the sake of clarity but the real JSON must not contain newlines)

````
{
"query": "select * from foo", <----- the SQL of the query to execute
"push": true, <----- if true then push query, else pull query
Member:

Why is it required to encode the query type? The query parameter contains this information implicitly, doesn't it?

Contributor Author:

Can we find a better default name? Why not just query?

I don't have a strong opinion either way. "query-stream" is really just a placeholder. We can easily change it before release.

Contributor Author:

The KLIP is a bit out of date. "push" is indeed not needed (and isn't present in the code).
As a general point, I have deliberately tried to keep the KLIP as light as possible on details. I am not a big fan of detailed up-front design. In reality things will change a lot during development, so it's much more efficient to just get on with the development, see what works and what doesn't, than spend a lot of time trying to specify things at a detailed level at the beginning. The "design doc" can then be updated retrospectively as a good design is discovered during development. I will update the klip to reflect the current state.

"limit": 10 <---- If specified return at most this many rows,
Contributor:

Curious: wouldn't this be in the SQL itself as a LIMIT clause?

Contributor Author:

This was meant as an illustration, not an exact definition of the arguments to the real endpoint. (I think there is a comment in there already to that effect ;) )

Contributor:

That makes sense @purplefox. Otherwise I would have asked why there needs to be a "push" field, as the behavior of push vs. pull should be determined by the query itself (cf. "query"), not by an out-of-band parameter. :-)

Member:

This was meant as an illustration, not an exact definition of the arguments to the real endpoint.

Should the KLIP not reflect the exact definition? IMHO, the KLIP should allow anybody to write a ksqlDB client in their language of choice.

I think there is a comment in there already to that effect

This sentence should go before the "illustrative example" to set expectations right upfront.

"properties": { <----- Optional properties for the query
"prop1": "val1",
"prop2": "val2"
}
}

````

Please note the parameters are not necessarily exhaustive. The description here is an outline not a detailed
low level design. The low level design will evolve during development.

In the case of a successful query, the response will start with a JSON object containing metadata about the result:

````
{
"query-id", "xyz123", <---- unique ID of the query, used when terminating the query
"columns":["col", "col2", "col3"], <---- the names of the columns
"column_types":["BIGINT", "STRING", "BOOLEAN"] <---- The types of the columns
"row_count": 101 <---- The number of rows - only set in case of pull query
}
````

Followed by zero or more JSON arrays:

````
[123, "blah", true]
Contributor:

I understand that we write the arrays outside of the JSON (IIUC) above, because when we stream results back, we don't know how many rows there are to return.

But in cases where we do, e.g. pull queries, could we support an encoding where this array is within the JSON above and the overall response is valid JSON (as it is today)? The advantage of doing that would be

  • Opens up integration with lots of systems/tools that can do something useful by calling an endpoint and getting a json. cc @AlanConfluent @vpapavas who have built a ksqlDB/Grafana integration, that uses pull queries results returns as json.. I think the new encoding would break such an use-case?
  • Even for push queries, IMO returning a full json is great because, then a client can fully consume a small table with a LIMIT clause added.. Again we had a real use-case for this in the grafana integration, where we wanted to write a push query to consume all the new metric names (not actual metric events) ...

Member:

I agree that it's probably preferable if we have a single response that's parsable as valid json. Otherwise, you have to do some parsing yourself (even if separated by just \n), which isn't preferable. Also, some frameworks, including grafana make assumptions about a single parsable json response. There might be workarounds, but this would probably go against a lot of clients out there. Best to avoid this.

Also, I think there's certainly something nice about getting just one format that has all of the column information and results in a single response. Consistency between pull and push would also be nice, so that handling logic could be shared, even if it means that results are a bit more verbose.

Contributor Author:

I understand that we write the arrays outside of the json (IIUC) above, because when we stream results back, we don't know how many rows there are to return..

This is more about not wanting to force users to parse huge JSON objects into memory in order to do something useful with them. Yes, streaming JSON parsers do exist, but we shouldn't assume they exist for all target client languages or mandate that users use them. Async (non-blocking) streaming JSON parsers are even more rare.

But in cases we do, e.g pull queries, could we support an encoding where this array is within the json above and the overall response is a valid json (as it is today)? The advantage of doing that would be

We shouldn't assume that pull query results will be small, they could be huge. However, if you can provide a limit clause and thus provide a hard limit on the results, then returning the results as a single JSON object can make sense.

We can easily enable this by providing the content type in the Accept header of the request. I.e. if you want the results as a single JSON object, provide content-type="text/json"; we can define a new content type for our newline-delimited sequence of JSON elements (which should be the default). The server can then fail requests with content-type="text/json" if a limit clause hasn't been provided.

  • Opens up integration with lots of systems/tools that can do something useful by calling an endpoint and getting a json. cc @AlanConfluent @vpapavas who have built a ksqlDB/Grafana integration, that uses pull queries results returns as json.. I think the new encoding would break such an use-case?

All of the proposals for the new API (custom protocol, gRPC) would break this. Custom protocol + gRPC are definitely not JSON ;)

So I'm working on the assumption that you would have to fix Grafana anyway. Presumably the tool allows you to provide some kind of plugin that you can code to work with some kind of custom API? If not, it seems rather limited.

I don't think it's generally a good approach to constrain our API based on the requirements of one particular tool which we happen to use internally.

Having said that, if Grafana does require a single JSON, then the new API can support this using an accepts header as mentioned above :)

  • Even for push queries, IMO returning a full json is great because, then a client can fully consume a small table with a LIMIT clause added.. Again we had a real use-case for this in the grafana integration, where we wanted to write a push query to consume all the new metric names (not actual metric events) ...

As long as limit is specified I see no issue with that.

Contributor Author:

I agree that it's probably preferable if we have a single response that's parsable as valid json. Otherwise, you have to do some parsing yourself (even if separated by just \n), which isn't preferable. Also, some frameworks, including grafana make assumptions about a single parsable json response. There might be workarounds, but this would probably go against a lot of clients out there. Best to avoid this.

Please see reply to Vinoth who made a similar observation.

Contributor Author:

Another advantage of not providing results as a single JSON element: quite often in the world of streaming you're just piping streams from one place to another. A concrete use case for this is piping the output of a query to a websocket or SSE so it can be sent to a browser (this will probably be a common use case for ksql based apps).
In this case you don't need to look at the individual JSON results, you just want to pick up each individual row as bytes and write them as bytes. Using a streaming JSON parser you'd end up deserialising the JSON from bytes only to serialise it again back to bytes before writing it to the websocket. This is very inefficient. Using some kind of framing outside the JSON (in this case delimited, but it could also be length prefixed) allows you to pick out the elements much more easily and efficiently.

Contributor Author:

I agree that it's probably preferable if we have a single response that's parsable as valid json. Otherwise, you have to do some parsing yourself (even if separated by just \n),

It's generally going to be a lot simpler to parse on newlines than to assume the presence of a streaming JSON parser and figure out how to use it to extract the individual elements.

I do accept there are some use cases where you know the results are not going to be large (and you use limit) and you want to parse them all as a single JSON object, but this is a streaming API so we can't make that assumption in general.

Contributor:

if you want the results a single JSON object provide content-type="text/json", we can define a new content-type for our newline delimited sequence of JSON elements (which should be the default)

nit: the single-object content type should be application/json to match RFC 4627.

we can define a new content-type for our newline delimited sequence of JSON elements (which should be the default)

For reference, the custom content type supported by the existing REST API is application/vnd.ksql.v1+json.

Contributor (@colinhicks, Jan 10, 2020):

A concrete use case for this is piping the output of a query to a websocket or SSE

Speaking of this, what are the downsides of using SSE as the transport directly (over http/2), for streaming query responses? A response of Content-Type text/event-stream corresponding to the example under the KLIP's Query streaming heading could look like:

event: query-success
data: { "query-id": "xyz123", "columns":["col", "col2", "col3"],  "column_types":["BIGINT", "STRING", "BOOLEAN"], "row_count": 101 }

event: query-data
data: [123, "blah", true]

data: [432, "foo", true]

data: [765, "whatever", false]

Contributor Author:

I think SSE is a valid alternate API we can provide that some users might prefer. One thing about the new API implementation is the large majority of the work is in setting up the infrastructure, getting the new Vert.x based server plugged-in and integrating it with the current backend. Once that is done exposing the query streams in different formats is relatively straightforward - it's pretty much just writing the output to the response in a different way. I'd recommend we initially ship with the formats as described in the KLIP and we can consider adding SSE support at a later date (it wouldn't be hard).

[432, "foo", true]
[765, "whatever", false]
````

Each JSON array or row will be delimited by a newline.

For a pull query the response will be ended by the server once all rows have been written; for
a push query the response will remain open until the connection is closed or the query is explicitly
terminated.
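
To make the shape of the interaction concrete, here is a minimal sketch of consuming the endpoint from a vanilla HTTP2 client using the JDK's built-in java.net.http client (Java 11+). The host/port, the "/query-stream" path and the request fields follow the illustrative example above and are placeholders rather than a finalized contract.

````
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.stream.Stream;

public class QueryStreamExample {
  public static void main(String[] args) throws Exception {
    // Request HTTP/2 where the server supports it.
    HttpClient client = HttpClient.newBuilder()
        .version(HttpClient.Version.HTTP_2)
        .build();

    String body = "{\"query\":\"select * from foo emit changes\",\"properties\":{}}";

    HttpRequest request = HttpRequest.newBuilder(URI.create("http://localhost:8088/query-stream"))
        .POST(HttpRequest.BodyPublishers.ofString(body))
        .build();

    // The response is newline delimited: the first line is the metadata JSON object,
    // each following line is a JSON array representing one row.
    HttpResponse<Stream<String>> response =
        client.send(request, HttpResponse.BodyHandlers.ofLines());

    response.body().forEach(System.out::println);
  }
}
````

Because the response is newline delimited, `BodyHandlers.ofLines()` is enough to pick out the metadata object and each row as they arrive; no streaming JSON parser is required.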

#### Terminating queries

Push queries can be explicitly terminated by the client by making a request to this endpoint.

The request method will be a POST.

Requests will be made to a specific URL, e.g. "/query-terminate" (this can be configurable)

The body of the request is a JSON object UTF-8 encoded as text, containing the arguments for the
operation (newlines have been added here for the sake of clarity but the real JSON must not contain newlines)

````
{
"query-id": "xyz123", <----- the ID of the query to terminate
}

````
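
A terminate request built with the same JDK HTTP client could look like the sketch below; again, the "/query-terminate" path and the query-id value are the placeholders used above, not a finalized contract.

````
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class QueryTerminateExample {
  public static void main(String[] args) throws Exception {
    HttpClient client = HttpClient.newBuilder()
        .version(HttpClient.Version.HTTP_2)
        .build();

    // Terminate the push query identified by the query-id returned when it was started.
    HttpRequest request = HttpRequest.newBuilder(URI.create("http://localhost:8088/query-terminate"))
        .POST(HttpRequest.BodyPublishers.ofString("{\"query-id\":\"xyz123\"}"))
        .build();

    HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
    System.out.println("status: " + response.statusCode());
  }
}
````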


#### Inserts

The request method will be a POST.

Requests will be made to a specific URL, e.g. "/insert-stream" (this can be configurable)

The body of the request is a JSON object UTF-8 encoded as text, containing the arguments for the
operation (newlines have been added for clarity, the real JSON must not contain newlines):

````
{
"stream": "my-stream" <----- The name of the KSQL stream to insert into
"acks": true <----- If true then a stream of acks will be returned in the response
}

````

Followed by zero or more JSON objects representing the values to insert:

````
{
"col1" : "val1",
"col2": 2.3,
"col3", true
}
````
Each JSON object will be separated by a new line.

To terminate the insert stream the client should end the request.

If acks are requested then an ack will be written to the response when each row has been
successfully committed to the underlying topic. Rows are committed in the order they are provided.
Each ack in the response is just an empty JSON object, separated by newlines:
Member (@mjsax, Jan 25, 2020):

What is the reason for this design? When data is written into Kafka, the producer puts multiple records into a batch and effectively acks on a per batch basis. Would it be more efficient to apply a similar pattern?

What happens if a record cannot be inserted for whatever reason (for example, a single record could exceed the maximum allowed message size, or the JSON is invalid and cannot be parsed)?

Contributor:

I too have concerns around the error path.

If we can stream results to the server, and get an empty ack message streamed back on success, then I'm wondering what we're expecting to happen on failure?

Are we going to end up with all client impls needing to track in-flight requests?

To allow clients to correlate error response to request we could always include the request itself as part of the error response.

Also, when is the ACK sent back? Once the server has processed the request and enqueued the message to be produced to Kafka, or when the producer callback confirms the message was produced to Kafka? Hopefully the second.

Contributor Author:

Should the KLIP not reflect the exact definition? IMHO, the KLIP should allow anybody to write a ksqlDB client in their language of choice.

There will be API documentation on the web-site which will explain the exact endpoints, arguments and how to use them.

Contributor Author:

What happens if a record cannot be inserted for whatever reason (for example, a single record could exceed the maximum allowed message size, or the JSON is invalid and cannot be parsed)?

In case of error, an error response is sent back on the same stream and the stream is closed.

Contributor Author:

Hopefully the second.

Ack!

Contributor Author:

To allow clients to correlate error response to request we could always include the request itself as part of the error response.

This is unnecessary as acks are always returned in the same order the inserts were sent. From the client's point of view this makes correlation of responses a bit simpler as they don't have to maintain a map of insert_id to request. They just need to store a queue of requests and, when responses come back, pop the head of the queue.

Contributor Author:

Ok...on second thoughts, I am going to send back the send_id in the ack. The reason being that on the server, acks for sends to Kafka can come back in a different order if they are sent to different partitions. I don't want to have to sort the acks on the server, so I'll send them back as they come in and the client can correlate them.


````
{}
{}
{}
{}
````
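
A sketch of exercising this endpoint with a plain HTTP2 client (JDK java.net.http) follows. The "/insert-stream" path and the header/row shapes come from the illustrative example above and are placeholders; a real client would stream the request body incrementally (e.g. with a custom BodyPublisher) rather than sending a small fixed batch as done here for brevity.

````
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.stream.Stream;

public class InsertStreamExample {
  public static void main(String[] args) throws Exception {
    HttpClient client = HttpClient.newBuilder()
        .version(HttpClient.Version.HTTP_2)
        .build();

    // Header object first, then one JSON object per row, all newline delimited.
    String payload = String.join("\n",
        "{\"stream\":\"my-stream\",\"acks\":true}",
        "{\"col1\":\"val1\",\"col2\":2.3,\"col3\":true}",
        "{\"col1\":\"val2\",\"col2\":4.5,\"col3\":false}");

    HttpRequest request = HttpRequest.newBuilder(URI.create("http://localhost:8088/insert-stream"))
        .POST(HttpRequest.BodyPublishers.ofString(payload))
        .build();

    // With acks requested, each line of the response acknowledges one committed row.
    HttpResponse<Stream<String>> response =
        client.send(request, HttpResponse.BodyHandlers.ofLines());
    response.body().forEach(ack -> System.out.println("ack: " + ack));
  }
}
````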

#### Errors

Appropriate status codes will be returned from failed requests. The response will also contain JSON
with further information on the error:

````
{
"error_code": <Error code>,
"message": <Error Message>
}
````

#### Non streaming results

The API is designed for efficiently streaming rows from client to server or from server to client.

The amount of data that is streamed in any specific query or insert stream can be huge so we want to
avoid any solution that buffers it all in memory at any one time or requires specialised parsers to
parse.

For this reason we do not provide query results (or accept streams of inserts) by default as a single
JSON object. If we did so we would force users to find and use a streaming JSON parser in order to
parse the results as they arrive. If no parser is available on their platform they would be forced
to parse the entire result set as a single JSON object in memory - this might not be feasible or
desirable due to memory and latency constraints. Moreover, in the world of streaming it's quite common
to want to pipe the stream from one place to another without looking into it. In that case it's very
inefficient to deserialize the bytes as JSON simply to serialize them back to bytes to write them
to the output stream. For these reasons the results, by default, are sent as a set of JSON arrays
Member:

In that case it's very inefficient to deserialize the bytes as JSON simply to serialize them back to bytes to write them to the output stream.

Not sure if I can follow. Why would somebody not be able to just copy bytes for this case?

Contributor Author:

It's quite a common pattern in messaging (e.g. when implementing a router) to want to only inspect the headers of a message and not deserialise the body of the message for efficiency reasons. If the whole envelope was JSON you'd have to deserialise the whole thing. In the future we might want to introduce headers outside of the JSON body.

Contributor Author:

I will however remove this sentence as we don't support headers (yet)

Member:

That makes sense -- it's not clear from the original write-up though. It sounds as if different messages are sent as JSON arrays, which only helps to get efficient access to a single message. (And does not help at all for the use case you describe.)

If we want to support headers, each message itself would need to be represented as a JSON array though, with an explicit record/header "slot" for example. Is that what you tried to say?

Atm it seems that a single record is a single JSON with a simple tuple layout (not even the Kafka internal attribute-to-key/value mapping can be inferred -- which is desired I guess -- only the schema metadata would allow inferring the mapping -- but we want to hide the internal Kafka key/value model anyway I guess).

Just following up to make sure I understand correctly.

delimited by newline. Newlines are very easy to parse by virtually every target platform without
having to rely on specialist libraries.

There are, however, some use cases where we can guarantee the results of the query are small, e.g.
when using a limit clause. In this case, the more general streaming use case degenerates into an RPC
use case. In this situation it can be convenient to accept the results as a single JSON object as we
may always want to parse them as a single object in memory. To support this use case we can allow the request
to contain an accept header specifying the content type of the response. To receive a response as
a single JSON object the content type should be specified as 'text/json'; for our delimited format we
will specify our own custom content type. The delimited format will be the default, as the API
Member:

for our delimited format we will specify our own custom content type

The KLIP should specify this content type.

Contributor Author:

It will be in the detailed API docs. At the moment it's just a placeholder.

is primarily a streaming API.
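
A sketch of how that negotiation might look from a client, reusing the JDK HTTP client and the placeholder "/query-stream" path from earlier; the exact media types ('text/json' vs. 'application/json' vs. a custom vnd type) are still to be settled.

````
import java.net.URI;
import java.net.http.HttpRequest;

public class SingleJsonRequest {
  // Builds a query request that asks for the whole result as one JSON object.
  // The server would only honour this for bounded queries (e.g. with a LIMIT clause).
  static HttpRequest build() {
    return HttpRequest.newBuilder(URI.create("http://localhost:8088/query-stream"))
        .header("Accept", "application/json")
        .POST(HttpRequest.BodyPublishers.ofString(
            "{\"query\":\"select * from foo limit 10\",\"properties\":{}}"))
        .build();
  }
}
````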

### Migration of existing "REST" API

We will migrate the existing Jetty based "REST" API to the new Vert.x based implementation as-is or with
minor modifications.

We will migrate the existing Jetty specific plug-ins to Vert.x

### Server implementation

The new server API will be implemented using Vert.x

The implementation will be designed in a reactive / non-blocking way in order to provide the best performance / scalability characteristics with
Member:

typo: non-blocking

low resource usage. This will also influence the overall threading model of the server and position us with a better, more scalable internal
server architecture that will help future-proof the ksqlDB server.

## What is not in scope

* We are not creating a new RESTful API.
* We are not currently looking to write clients for languages other than Java and JavaScript (although those may follow later)
* We will not initially allow for consuming directly from Kafka topics using the client (although that may follow later)
Contributor:

Just want to clarify that, if we choose that route, the encoding would still remain the same? I.e. Kafka might just replace the HTTP/2 transport and all of this will be transparently hidden under the client?

Contributor Author:

No idea. Let's cross that bridge if we ever come to it. It's certainly not in scope for this KLIP :)

Contributor:

Definitely not in scope of this KLIP, but it would be good to look at whether the new client interfaces are built to easily wrap the Kafka clients in a variety of languages. The KSQL Server shouldn't become a Kafka REST proxy. And ideally a KSQL client shouldn't have to implement the Kafka protocol.

Contributor Author:

Whether we decide to support direct access to Kafka topics from the ksqlDB client in the future or not, and how we decide to implement that, e.g. wrapping the Kafka client or proxying through ksqlDB (I would prefer proxying as it would be a pain to have to configure the client to know about two sets of servers), has no impact on the form of the client API that we're proposing here, afaict.


## Value/Return

We hope that providing a delightful, easy-to-use client and a new HTTP2 based server API will enable application developers to easily write powerful
applications that take advantage of their data plane / event streaming platform more effectively.

We hope that this could be transformative for the project in terms of adoption as it would position ksqlDB as a great choice for
writing typical event sourcing / CQRS / DDD style applications, which are currently hard to write using ksqlDB alone.

There are also further incidental advantages gained by this work. By using a modern server side implementation such as Vert.x
there are benefits in relation to performance, scalability, simplicity of implementation,
and threading model. Not to mention reduced dependencies and better resource usage. This will set us up better for the kinds of high
throughput operations that we will need to implement efficiently now that the project has pivoted to a more active application
facing server rather than a more passive DDL engine.

## Public APIs

The following changes / additions to APIs will occur:

* We will provide a new HTTP2 based streaming API. This will not be accessible using HTTP1.1
* The current chunked streaming and websockets streaming endpoints will be retired.
* The old API will be migrated to Vert.x, possibly with some modifications, and be accessible over HTTP1.1 and HTTP2
* The Java client will provide a new public API

## Design

### The client

* The Java client will provide an API based on Reactive Streams and CompletableFuture. We can also consider providing a JDK 9 shim using the Flow API for
users on Java 9+ (a subscriber sketch follows this list)
* The networking will be handled by Vert.x (which uses Netty).
* The client will have minimal transitive jar dependencies - this is important as the client will be embedded in end user applications.
* Client connections are designed to be re-used.
* The client will be thread-safe.
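
As an illustration of the Reactive Streams style (not a committed API), a consumer of push-query rows might look like the sketch below. Only the org.reactivestreams interfaces are real; the client call shown in the final comment is hypothetical.

````
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

// Prints each row and requests the next one, so demand (back-pressure) is
// signalled one row at a time.
public class PrintingRowSubscriber implements Subscriber<String> {

  private Subscription subscription;

  @Override
  public void onSubscribe(Subscription subscription) {
    this.subscription = subscription;
    subscription.request(1);
  }

  @Override
  public void onNext(String jsonRow) {
    System.out.println("row: " + jsonRow);
    subscription.request(1);
  }

  @Override
  public void onError(Throwable t) {
    t.printStackTrace();
  }

  @Override
  public void onComplete() {
    System.out.println("query finished (pull query, or push query LIMIT reached)");
  }
}

// Hypothetical usage once a client exists:
// client.streamQuery("SELECT * FROM ORDERS EMIT CHANGES;").subscribe(new PrintingRowSubscriber());
````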

### The server

* The toolkit used on the server side will be Vert.x
* The current Jetty / JAX-RS usage will be retired.
Contributor:

Despite being genuinely excited to learn Vert.x and rewrite our bulky server, I'm wondering if we can de-risk this KLIP by splitting out the server rewrite from the client/protocol work. Is there any way our existing server implementation can be used for phase 1? That way we can move forward with the wire protocol and the client implementation without working on a server 2.0.

The reason is that we need to make sure that all of the existing functionality that depends on the confluent's rest-utils (some port of dropwizard) and the jakarta/hk2 injection mechanisms still work as these are the standard method for confluent components (and there are plugins that already integrate with these injection mechanisms). Perhaps it is possible with Vert.x (and it's not unlikely that I'm totally missing the

@rodesai / @spena might be able to shine some light on this

Contributor Author:

Despite being genuinely excited to learn Vert.x and rewrite our bulky server, I'm wondering if we can de-risk this KLIP by splitting out the server rewrite from the client/protocol work. Is there any way our existing server implementation can be used for phase 1? That way we can move forward with the wire protocol and the client implementation without working on a server 2.0.

Yes, absolutely - I think the work on hosting the new protocol, and porting the existing API to Vert.x can be done in different phases. It just means we would have both Vert.x and Jetty servers running in parallel until the old API is ported.

Contributor Author:

The reason is that we need to make sure that all of the existing functionality that depends on the confluent's rest-utils (some port of dropwizard) and the jakarta/hk2 injection mechanisms still work as these are the standard method for confluent components (and there are plugins that already integrate with these injection mechanisms). Perhaps it is possible with Vert.x (and it's not unlikely that I'm totally missing the

@rodesai / @spena might be able to shine some light on this

I chatted with @spena a couple of days ago about this and Sergio kindly showed me the code for the current Jetty plugins. It seemed to me they could be fairly easily ported to Vert.x, although there may be some angles I haven't completely understood.

Comment:

I agree about decoupling the API design from the threading model in this KLIP.

Here's a Java client lib that I wrote against the current API: https://github.com/daniellavoie/ksqldb-java-client

It's fully reactive (RxJava-like) with Project Reactor. It supports Observable through Mono and Flux. I currently support all the push and pull queries in an observable way. The same library could easily be written for a Node.js server, but also for frontend JavaScript and TypeScript.

From a developer perspective, having that client API is a good start. How the server behaves from a performance perspective is not my problem. Reactive or blocking, I shouldn't be aware of that. I'm all for reactive. I would recommend WebFlux from Spring over Vert.x, but that is a different discussion. Simple HTTP 1.1 with REST and SSE events (which are super well supported in the browser) are good enough from a client perspective. gRPC, WebSocket and anything else is just friction and more complication for the developer. If the backend server is reactive, SSE will not exhaust the thread pool on the server.

Contributor Author:

Hey @daniellavoie
I think it's important that our client API isn't tightly coupled to Spring, which is why I use vanilla jdk 9 flow interfaces (ok, they are backported for now!). (Not everyone wants to use Spring).
For those that want a very Spring Reactor style API, it would be a simple matter to provide a shim over our client to expose Mono/Flux etc :)

@daniellavoie, Jan 6, 2020:

@purplefox Reactor has no dependencies on Spring. It's a foundation for Spring. So clients pull no dependencies from it. But I agree with you that offering a shim would be a better approach. I'm concerned about the JDK 9 Flow interface because it would force clients to use JDK 11+. A lot of enterprises will be running Java 8 for a long time. Reactive Streams would probably be the best interface for maximum compatibility (Flow is based on it, actually).

Contributor Author:

Apologies - I use "Spring" in a loose sense - meaning the Spring ecosystem, and reactor is a part of that. I should have said "I think it's important that our client API isn't tightly coupled to Project Reactor". Flow is in JDK 9 (not 11), but I agree, better standardise on Reactive Streams and optionally provide a JDK 9 flow shim for those who are using Java 9+

Comment:

I mentioned JDK 11 because 9 and 10 are not LTS. Only 11 is.

* The current non-streaming HTTP/REST endpoints will be migrated to Vert.x - this should modernise, radically simplify, and
clarify the server side implementation, resulting in a cleaner implementation and fewer lines of code.
* The current query streaming endpoints (chunked response and Websockets) will be retired.
* Any current Jetty specific plugins (e.g. security plugins) will be migrated to Vert.x
* Vert.x has great support for working with various different network protocols and has [unrivalled performance/scalability](https://www.techempower.com/benchmarks/)
characteristics for a JVM toolkit. It will also set us up well for a fully async / reactive internal threading model in the server that we
should aim towards for the future.

## Test plan
Member:

We should have a stress or performance test plan too so we can catch bugs when running several clients. Sometimes servers can hang due to the limited number of threads they can handle.


We will require unit/module level tests and integration tests for all the new or changed components as per standard best practice.

## Documentation Updates

* We will produce new guide(s) for the Java and JavaScript clients.
* We will provide a new guide for the new HTTP API and retire the existing "REST" API documentation.
* We will produce example applications showing how to use the client in a real app. E.g. using Spring Boot / Vert.x (Java) and Node.js (JavaScript)
* There may be some server side configuration changes due to the new server implementation.

## Compatibility Implications
Member:

I'm not sure if the security handlers used by rest-utils will be compatible (see the note in Security Implications). If they are not compatible, should that be noted here?


The current chunked response query streaming endpoint will be removed so any users currently using that will have to upgrade.

The current websockets query streaming endpoint will be removed so any users currently using that will have to upgrade.
This endpoint is currently undocumented so it's not expected a lot of users are using it. It is used by C3 (?)
so that will need to be migrated to the new API.

There may be some minor incompatible changes on the migrated old server API.

## Performance Implications

* Streaming query results with the new client and server side implementation should provide significantly better performance than
the current websockets or HTTP streaming APIs
* Using Vert.x in general for hosting the API will provide excellent performance and scalability with very low resource usage.

## Security Implications

The new protocol should be available over TLS and we should support the same auth approach as we do with the current API
so there should be no extra security implications.
Comment on lines +322 to +325

Member:

There is a security handler in Confluent that creates a user token during authentication and passes that token to KSQL, then to Kafka. This Confluent security handler is not part of rest-utils nor KSQL.

Contributor Author:

Can you elaborate on this some more? I'm not sure I understand