docs: klip-15 New client and API #4069
Conversation
Thanks for the detailed writeup @purplefox! I think this provides a great overview of one possible solution to designing a server API and corresponding client. On the other hand, for such a critical component of our system, I think we're missing a higher level discussion about the decision calculus (design goals) and different approaches we can take. I think we need to understand the priority of various features before we commit to a solution, and also understand what the product requirements are (cc @derekjn @MichaelDrogalis @colinhicks):
I'm probably missing a ton of other open questions that we might want to enumerate, and since you've clearly given this more thought than I have - I'd love to hear the tradeoffs from your perspective. It's also nice to know what alternatives we have (i.e. how does this compare to using Kafka clients for streaming? how does it compare to just improving our existing WebSocket/HTTP APIs? are there any other suggestions I'm missing?)
## What is in scope

### The client
@purplefox can you include some mock/example client code that illustrates how using a client might look and feel?
Hey @derekjn - I put together a prototype which shows a real(ish) client, examples, and an example app. Links are on the post here https://groups.google.com/forum/#!topic/ksql-dev/5mLKvtZFs4Y
Hey @agavra , replies inline
Do you mean of the client, or of KSQL in general? There are lots of dimensions to performance/scalability - throughput, latency, scalability with number of connections / request, memory footprint etc. They're all important in their own way. There's nothing in this proposal that should reduce performance in any of those dimensions, and it's quite likely it will improve performance in all of them.
The client integrates with JDK 9 flow / reactive streams.
Initially just Java, also maybe JavaScript (TBD but I think it's a nice idea)
Is it ok to write a Java client? I'd say yes, that's the main purpose of this proposal. One thing to note, is that the clients are pretty easy to knock together. It took me a couple of days to write the Java one in the prototype, and I'd say it's probably around 50% complete.
HTTP requires a client too ;) But yes, it can be a standard client. For those kinds of interactions (specifically request/response) that are well suited to HTTP, I think we should consider allowing them over HTTP as an alternative access method. This proposal doesn't change that. Some things though (e.g. streaming) are just not well suited to HTTP.
In later versions I think we can consider adding extra features such as transactions and EOS to the client. But I am trying to keep things simple for now and provide the simplest API that allows the user to create a compelling application. Also, note that we're not trying to compete with the Kafka client and many users might want to continue using that especially for more sophisticated interactions.
Probably not much different to the cost of maintaining any other feature of a similar size in KSQL ;) I've worked with many different servers over the years, many (most) of which had custom clients, and some had several. I didn't see it as any particular burden.
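To make the proposed look and feel concrete, here is a sketch of how a JDK 9 Flow based subscriber might consume query rows. The client API itself is not defined in this thread, so a `SubmissionPublisher` stands in for the publisher a hypothetical query method would return; the demand-driven `request(1)` calls are what give the client back pressure:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

public class FlowClientSketch {
    // A Flow.Subscriber that applies back pressure by requesting rows one at a time.
    static class RowSubscriber implements Flow.Subscriber<String> {
        final List<String> received = new ArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;

        @Override public void onSubscribe(Flow.Subscription s) {
            this.subscription = s;
            s.request(1); // demand-driven: ask for the first row only
        }
        @Override public void onNext(String row) {
            received.add(row);
            subscription.request(1); // request the next row once this one is handled
        }
        @Override public void onError(Throwable t) { done.countDown(); }
        @Override public void onComplete() { done.countDown(); }
    }

    public static void main(String[] args) throws Exception {
        // Stand-in for the publisher a query method on the client might return.
        try (SubmissionPublisher<String> rows = new SubmissionPublisher<>()) {
            RowSubscriber sub = new RowSubscriber();
            rows.subscribe(sub);
            rows.submit("{\"name\":\"alice\"}");
            rows.submit("{\"name\":\"bob\"}");
            rows.close();
            sub.done.await();
            System.out.println(sub.received.size()); // 2
        }
    }
}
```

The subscriber never receives more rows than it has requested, which is the property the proposal wants from the streaming protocol end to end.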
### The client

* The Java client will provide an API based on JDK 9 flow / reactive streams and completable future
Aren't we going to have conflicts with other JDK versions used by CP? I think JDK 9 is not an officially supported version yet for CP, right?
Yes, that is a problem. I really hope we can upgrade from Java 8 (Java 8 is EOL already!), but if not we can implement our own copy of the Java 9 Flow API, and then, when we do eventually upgrade to Java >= 9, we can implement the correct interfaces and deprecate the old ones.
We can also consider shipping a simple wrapper that wraps the client with actual Java 9 flow interfaces and is only distributed as part of the OSS project (so no problems with CP compatibility).
But I think a better solution would be to get CP moving on from Java 8; it can't stay there forever!
Or we just backport JDK 9 flow :)
FYI we've already backported the flow API: https://github.com/confluentinc/ksql/blob/master/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/streaming/Flow.java
Cool! I did see that, but I think we would need to copy it to the client libs - we don't want user apps pulling in all the server side dependencies because our flow API lives in a server module.
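For reference, such a backported copy is tiny - a Java 8 compatible sketch might look like the following (names deliberately mirror `java.util.concurrent.Flow`, so the client can later implement the real JDK 9 interfaces and deprecate these; the exact client-module layout is an assumption):

```java
// Minimal Java 8 compatible copy of the JDK 9 Flow interfaces, suitable for
// shipping in a client module with no server-side dependencies.
final class Flow {
    private Flow() {}

    public interface Publisher<T> {
        void subscribe(Subscriber<? super T> subscriber);
    }

    public interface Subscriber<T> {
        void onSubscribe(Subscription subscription);
        void onNext(T item);
        void onError(Throwable throwable);
        void onComplete();
    }

    public interface Subscription {
        void request(long n);
        void cancel();
    }

    public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {}
}
```

Because the shapes are identical to the JDK 9 versions, migrating later is a mechanical change of `implements` clauses.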
Hey, thanks for putting this together Tim! I have a couple of higher-level thoughts that are irrespective of the implementation of the server and client.
Thanks for answering all of my questions @purplefox - I think your perspective makes sense. My questioning was trying to reason about what we might need going forward but I might be just paranoid. I'm also summarizing here a conversation I had with @mhowlett (correct me if I'm misrepresenting anything you said) trying to understand the kafka client and problems they faced (to see if we would hit similar problems):
I think I'm pretty convinced now that your idea of implementing our own custom, simple and lightweight protocol might be the best way forward. I know this was your intuition initially @purplefox, but I wanted to explore that more deeply before we dove into reinventing the wheel. FYI @rodesai since I think you were interested in this as well.
Agreed on all those points!
Yes, at least initially, dumb failover is fine.
We will proxy everything through the KSQL server. Other than a few ms of extra latency there's no particular disadvantage of hiding the complexity on the server, and very few users of ksqlDB will be super latency sensitive, and there are lots of advantages to keeping the client simple.
Agreed, no need to do this. Again, keeping the client simple. I think some of the concerns related to maintaining a client stem from people's experiences with the Kafka client. But this is quite a different beast (as an exercise for the reader please compare the amount of code in the Kafka client with the client in the prototype!). I'm not an expert on the Kafka protocol but I have looked into it as part of this exercise. I guess, for historical reasons that I don't understand, Kafka uses a request/response (RPC) interaction style. RPC is a strange choice for a messaging protocol and it seems to me quite a lot of the complexity in the client comes from that choice. I can see that the protocol has introduced the notion of batching, which appears to be an optimisation to work around the inefficiencies of using RPC to implement streaming. But all of that would be unnecessary if streaming was designed into the protocol from the start. I think a second part of the complexity of the Kafka client comes from maintaining multiple connections, handling the partitioning etc, which again, is not something we want to do.
I think I've almost reached alignment on this KLIP. I'm convinced on most points, but there are two things outstanding:
- Do we need to couple a server rewrite with the new clients? (See inline comment)
- It took me a long time to conclude that the ksqlDB client is actually a dramatically different use case than consuming from a topic (c.f. Nick's comment on the google thread). It would help to have a somewhat prescriptive comparison that would explain when you would want to use a Kafka client and materialize a topic vs when you would want to use the new ksqlDB client and just stream the messages transitively. We should make it clear we're not just creating a "baby Kafka client"
### The server

* The toolkit used on the server side will be Vert.x
* The current Jetty / JAX-RS usage will be retired.
Despite being genuinely excited to learn Vert.x and rewrite our bulky server, I'm wondering if we can de-risk this KLIP by splitting out the server rewrite from the client/protocol work. Is there any way our existing server implementation can be used for phase 1? That way we can move forward with the wire protocol and the client implementation without working on a server 2.0.
The reason is that we need to make sure that all of the existing functionality that depends on the confluent's rest-utils (some port of dropwizard) and the jakarta/hk2 injection mechanisms still work as these are the standard method for confluent components (and there are plugins that already integrate with these injection mechanisms). Perhaps it is possible with Vert.x (and it's not unlikely that I'm totally missing the
> Despite being genuinely excited to learn Vert.x and rewrite our bulky server, I'm wondering if we can de-risk this KLIP by splitting out the server rewrite from the client/protocol work. Is there any way our existing server implementation can be used for phase 1? That way we can move forward with the wire protocol and the client implementation without working on a server 2.0.
Yes, absolutely - I think the work on hosting the new protocol, and porting the existing API to Vert.x can be done in different phases. It just means we would have both Vert.x and Jetty servers running in parallel until the old API is ported.
> The reason is that we need to make sure that all of the existing functionality that depends on the confluent's rest-utils (some port of dropwizard) and the jakarta/hk2 injection mechanisms still work as these are the standard method for confluent components (and there are plugins that already integrate with these injection mechanisms). Perhaps it is possible with Vert.x (and it's not unlikely that I'm totally missing the
I chatted with @spena a couple of days ago about this and Sergio kindly showed me the code for the current Jetty plugins. It seemed to me they could be fairly easily ported to Vert.x, although there may be some angles I haven't completely understood.
I agree about decoupling the API design versus the threading model from this KLIP.
Here's a Java client lib that I wrote against the current API: https://github.com/daniellavoie/ksqldb-java-client
It's fully reactive (RxJava like) with Project Reactor. It supports Observable through Mono and Flux. I currently support all the push and pull queries in an observable way. The same library could easily be written for a Node.js server but also frontend JavaScript and TypeScript.
From a developer perspective, having that client API is a good start. How the server behaves from a performance perspective is not my problem. Reactive or blocking, I shouldn't be aware of that. I'm all for reactive. I would recommend WebFlux from Spring over Vert.x but that is a different discussion. Simple HTTP 1.1 with REST and SSE (which is super well supported in the browser) is good enough from a client perspective. gRPC, WebSocket and anything else is just friction and more complication for the developer. If the backend server is reactive, SSE will not exhaust the server's thread pool.
Hey @daniellavoie
I think it's important that our client API isn't tightly coupled to Spring, which is why I use vanilla JDK 9 flow interfaces (ok, they are backported for now!). (Not everyone wants to use Spring.)
For those that want a very Spring Reactor style API, it would be a simple matter to provide a shim over our client to expose Mono/Flux etc :)
@purplefox Reactor has no dependencies on Spring. It's a foundation for Spring, so clients pull no dependencies from it. But I agree with you that offering a shim would be a better approach. I'm concerned about the JDK 9 Flow interface because it would force clients to use JDK 11+. A lot of enterprises will be running Java 8 for a long time. Reactive Streams would probably be the best interface for maximum compatibility (Flow is actually based on it).
Apologies - I use "Spring" in a loose sense - meaning the Spring ecosystem, and Reactor is a part of that. I should have said "I think it's important that our client API isn't tightly coupled to Project Reactor". Flow is in JDK 9 (not 11), but I agree, better to standardise on Reactive Streams and optionally provide a JDK 9 flow shim for those who are using Java 9+.
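Such a shim is a thin adapter. A sketch, assuming the client exposes Reactive Streams style interfaces (the `Rs*` interfaces below are local stand-ins for `org.reactivestreams` so the example is self-contained; the real shim would implement the actual dependency):

```java
import java.util.concurrent.Flow;

public class FlowShimSketch {
    // Stand-ins for the Reactive Streams interfaces the client would expose.
    interface RsSubscription { void request(long n); void cancel(); }
    interface RsSubscriber<T> {
        void onSubscribe(RsSubscription s);
        void onNext(T item);
        void onError(Throwable t);
        void onComplete();
    }
    interface RsPublisher<T> { void subscribe(RsSubscriber<? super T> s); }

    // The shim: wraps a Reactive Streams style publisher as a JDK 9 Flow.Publisher.
    static <T> Flow.Publisher<T> toFlow(RsPublisher<T> rs) {
        return subscriber -> rs.subscribe(new RsSubscriber<T>() {
            public void onSubscribe(RsSubscription s) {
                subscriber.onSubscribe(new Flow.Subscription() {
                    public void request(long n) { s.request(n); }
                    public void cancel() { s.cancel(); }
                });
            }
            public void onNext(T item) { subscriber.onNext(item); }
            public void onError(Throwable t) { subscriber.onError(t); }
            public void onComplete() { subscriber.onComplete(); }
        });
    }

    public static void main(String[] args) {
        // A trivial publisher that synchronously emits one row then completes.
        RsPublisher<String> rows = sub -> sub.onSubscribe(new RsSubscription() {
            public void request(long n) { sub.onNext("row1"); sub.onComplete(); }
            public void cancel() {}
        });
        StringBuilder out = new StringBuilder();
        toFlow(rows).subscribe(new Flow.Subscriber<String>() {
            public void onSubscribe(Flow.Subscription s) { s.request(1); }
            public void onNext(String item) { out.append(item); }
            public void onError(Throwable t) {}
            public void onComplete() { out.append(":done"); }
        });
        System.out.println(out); // row1:done
    }
}
```

Because the method signatures line up one to one, the adapter carries the back pressure semantics (the `request(n)` calls) through unchanged.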
I mentioned JDK 11 because 9 and 10 are not LTS. Only 11 is.
There is a lot of work to do here and it should be split up into separate work packages rather than trying to complete it as a single
monolithic piece of work. Some obvious partitions of the work would be:

* Protocol implementation
I think it would be nice to have a detailed section in this KLIP for the protocol (can be very lightweight) mostly for educational and documentation purposes, I'd love to learn about how you designed https://github.com/purplefox/ksql/blob/api_client_proto/ksql-api/src/main/java/io/confluent/ksql/api/protocol/WireProtocol.java - alternatively super detailed javadocs in the implementation could suffice
Ack. I will document this in the javadoc
Thinking more about this... In general I am not a fan of doing detailed up front design. Invariably, during the development process you will find that design needs to be adapted and what you end up implementing can diverge a lot from the original design. In other words the design and development are tightly coupled and it's an iterative/evolutionary process.
I see the purpose of a KLIP (and please feel free to correct me if this isn't the originally intended purpose) as more of an overview and outline of the design, rather than trying to prescribe a very specific low level approach. I think the KLIP already provides an overview of the approach for the protocol, but we should maintain a different document during development to document the actual protocol (or just have good JavaDoc as you suggest :) )
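The actual framing is not specified in this thread (it lives in the prototype's `WireProtocol.java`), but "super simple streaming protocols" of this kind typically use length-prefixed frames carrying a type byte and a channel id for multiplexing many queries over one connection. A purely illustrative sketch, with a hypothetical layout:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class FrameSketch {
    // Hypothetical frame layout: [4-byte length][1-byte frame type]
    // [8-byte channel id][payload]. Channel ids allow multiplexing many
    // queries over a single connection.
    static ByteBuffer encode(byte type, long channelId, byte[] payload) {
        ByteBuffer buf = ByteBuffer.allocate(4 + 1 + 8 + payload.length);
        buf.putInt(1 + 8 + payload.length); // length of everything after this field
        buf.put(type);
        buf.putLong(channelId);
        buf.put(payload);
        buf.flip();
        return buf;
    }

    static String decodePayload(ByteBuffer buf) {
        int len = buf.getInt();
        byte type = buf.get();          // e.g. DATA, FLOW, CLOSE (hypothetical)
        long channelId = buf.getLong(); // which query this row belongs to
        byte[] payload = new byte[len - 1 - 8];
        buf.get(payload);
        return new String(payload, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] row = "{\"col1\":123}".getBytes(StandardCharsets.UTF_8);
        ByteBuffer frame = encode((byte) 3, 42L, row); // 3 = hypothetical DATA type
        System.out.println(decodePayload(frame)); // {"col1":123}
    }
}
```

The length prefix is what makes the protocol trivial to parse incrementally off a TCP stream, which is the kind of simplicity the KLIP is arguing for over RPC-style framing.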
Hey, this is a really good point @agavra. Here's a shot at it, I'm sure I'm missing some things: The ksqlDB client should basically give you a subset of functionality that the Kafka client has. If you need something more powerful or more flexible than what we offer, dropping down to the low-level Kafka consumer is a completely fine solution. Use the ksqlDB client when:
Use the Kafka consumer when:
I think, perhaps, a simpler way of thinking about it is the ksqlDB client is for accessing stuff at the abstraction level of ksqlDB - i.e. rows, streams, tables and queries (these things don't exist in raw Kafka). The Kafka client is more low level and for accessing things at the level of Kafka - messages and topics. I think it's perfectly fine (and expected) for the client of any particular server to expose the features of that server in this way. In fact, it would be kind of weird if it didn't! It would be like saying "We've accepted the need for a product (ksqlDB) which creates a new abstraction layer on top of Kafka based on the concepts of rows, queries, streams and tables and this is strategic for us, but, we're not quite comfortable with you using those concepts in your apps yet" ;)
Has anybody looked at http://rsocket.io/ for prior art/standards in this area? It seems to support the interactions we're looking to do (push, pull) in addition to supporting things like back pressure, multiple transport protocols (tcp, udp, websocket), and multiple libraries/runtimes.
Hey Sarwar, I'm familiar with rsocket, and looked into it quite closely as part of this work.
I've also added a comment to the KLIP about GraphQL - another advantage of Vert.x is that it has support for GraphQL, which would make it easier if we wanted to implement GraphQL in ksqlDB
Thanks for the write up @purplefox. This is really quite exciting! KSQL basically hasn't had a client before. It's had a CLI, but that is definitely not the same thing. As you found, using KSQL from an application without a client is currently... painful. Mainly because in the initial POC the server API was purely for use by the CLI.
It's high time we improved our API and user experience: so this is a very welcome KLIP, and considering your background you're perfectly placed to work on this.
I have a few high level questions and concerns.
Main concerns:
- One of the selling points of KSQL was that it's not Java-centric like Kafka Streams: you can use it if you're not a Java house. At the moment, any language that supports an HTTP/WS client (i.e. most) can interact with KSQL, albeit using a nasty API. I'm assuming that we'll lose this with the new API? If this is the case we should clearly call this out in the KLIP. Are there alternatives or variations that would allow us to maintain some basic level of functionality via standard HTTP/WS clients? e.g. you can issue DDL and queries, but you won't get the multiplexing & flow control?
- Given point 1 above, have we considered the cost and practicalities of writing and maintaining clients for multiple languages? (This is more a question to the large team @apurvam @derekjn @MichaelDrogalis). What is the set of languages we intend/wish to support? Java, JS, Python? C#? C++? Go? etc. Who will write each of these clients? Who will maintain and enhance them?
Requests/Questions:
- I think the KLIP would benefit from some more details about the proposed client API. I know you link to a prototype, but this KLIP will become the record of what was agreed, so it's good practice to include the API / Protocol
- Have you talked with the people doing the REST Proxy redesign? There seems to be potential for overlap here and it might be good to have a chat to see if things can be aligned and duplicate / divergent work avoided.
- The auth model for OSS KSQL is basic. Does Vert.x help provide richer functionality here?
- The KLIP talks about flow control / back pressure: do you know how this will work with the KS topology? I might be out of date in my understanding of KS, (cc @mjsax), but I believe the KS topology is still using Consumer Groups, which will eject consumers that haven't pinged the broker in a while. Hence, there is a risk that back pressure can cause consumers to be ejected. The current impl gets around this by sticking an unbounded queue in between the two, (which obviously has issues). How do you see the same being done with the new API or something else?
Scalability concerns around encouraging / exposing transient queries:
One final thing to keep in mind: I think in your POC your app is kicking off transient push queries: is that right? If so, it's worth keeping in mind this is probably a bad pattern to be encouraging (allowing?) through a client at the moment. Each query creates a new KS topology, potentially with internal topics. This simply does not scale: imagine a web farm starting up and all instances doing this!
I see a clear opportunity for people to misuse the client to create many transient queries, have a bad experience and then either feed this back or just disappear. Though maybe this is little different to the learning curve people go through on how to scale any new technology.
Long term this may be solved server side, but I think we need to be realistic on the timeframes of solving such a problem and design the client with current server design + short term improvements in mind.
So the question is: should we support transient push queries through the client or not? I'd be interested to hear people's thoughts.
Once again, thanks Tim!
Let me know once you've updated the KLIP with all the answers and clarifications you've given so I can review again.
Hi @big-andy-coates thanks for your comments. Responses inline.
In this proposal I suggest we retain the current HTTP API as-is with the exception of the current websocket and HTTP query endpoints which I think we should remove. I think we should definitely lose the chunked response endpoint but we could consider retaining a non multiplexed websocket query endpoint similar to the current one. This would have to be one query per connection because of lack of flow control - I'm not madly keen on the idea but it wouldn't be a whole lot more work to do this.
I covered this in earlier responses, but IMO we should start with Java and JS. The clients should be relatively lightweight (at least, compared to the Kafka client) so I think it's easy to overestimate the effort required to write and maintain them. But yes, of course there is some overhead, but I don't see the overhead as being any different to the overhead of maintaining any other piece of code in KSQL of similar size. I think we should maintain them. Clients are an intrinsic part of the project.
Ack
We have chatted and I plan to chat in more depth after the holidays. But, summary so far... we're not writing a new REST API so I don't think there is much overlap here.
Vert.x has a pluggable auth architecture and contains pre-made implementations for jwt, oauth etc, and you can write your own, so hopefully that will help (?) https://vertx.io/docs/#authentication_and_authorisation
This is an interesting question. The last time I looked I believe we are currently using a LinkedBlockingQueue to buffer the messages from KS before they are polled to send to the transient query consumer. AIUI, this means that if the client is slow the queue will become full (default size is 100) and any further attempt for KS to deposit a new message on the queue will cause the KS thread to block until the client polls some from the queue. Presumably this is the same thread that has consumed from the Kafka consumer so it could result in the consumer being ejected. The new implementation is likely to suffer from the same issue if we continue to use a blocking queue. In an ideal world Kafka Streams would support back pressure (ideally using reactive streams Publishers and Subscribers) so we can push all the way back to the Kafka consumer and pause consumption of messages if the client is slow. But I think this will require major changes in KS. I think having KS supporting back pressure and a reactive model is going to be more and more important for us going ahead. I think there's scope to start a new discussion with the greater streams team about how we could move towards that goal. To mitigate this, other strategies we can employ to prevent blocking of the KS thread could be:
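One such strategy (a sketch of a possibility, not necessarily what the KLIP will adopt) is to replace the blocking `put` with a non-blocking `offer` on the bounded buffer, so a slow client can never stall the KS thread:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class NonBlockingHandoffSketch {
    // Instead of a blocking put() that can stall the Kafka Streams thread
    // (risking consumer-group ejection), offer() to the bounded buffer and
    // handle the full-buffer case without blocking.
    static boolean deliver(BlockingQueue<String> buffer, String row) {
        boolean accepted = buffer.offer(row); // returns immediately; never blocks
        if (!accepted) {
            // In a real server we might close the query with an error, or drop
            // the row, rather than block the KS thread here.
        }
        return accepted;
    }

    public static void main(String[] args) {
        // Tiny buffer for the demo; the current server default is 100.
        BlockingQueue<String> buffer = new ArrayBlockingQueue<>(2);
        System.out.println(deliver(buffer, "row1")); // true
        System.out.println(deliver(buffer, "row2")); // true
        System.out.println(deliver(buffer, "row3")); // false: full, but no blocking
    }
}
```

The trade-off is that full-buffer handling (drop vs. terminate the query) becomes an explicit policy decision rather than implicit blocking.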
The API for pull or push queries and most of the plumbing is very similar so I'd suggest we allow the client to execute push queries but we can mitigate this by:
But I think the best solution here would be to fix our current implementation which creates a new topology each time. I think we probably agree on this but perhaps we should raise its priority?
I don't think that would be ideal (and yes, it would be a major change in the runtime...). Kafka's strength is about avoiding back pressure by decoupling. Hence, to me, the ideal solution for KSQL would be to write the result of a query into a topic (instead of using an in-memory queue) and read it back to serve it to a client. This decouples query execution from result serving. Just my 2ct.
Hi @mjsax I am curious why you would want to avoid back pressure? If KS pushes back to the Kafka consumer then the consumer can be paused and threads can do other stuff. This avoids threads blocking and makes for a much more efficient use of resources in the system.
In the case of a slow query consumer this could result in a lot of unnecessary work done in producing messages to a topic that the slow client never consumes. Much better to match the rate of processing of messages to the rate the client consumes them. Again this provides a much better use of server resources (threads, RAM and CPU) as you're only doing the work you need to :)
Well, I don't think that back pressure really solves the issue. If you have back pressure and an upstream application keeps writing more data into the input topic than your downstream app can consume, the query would lag more and more and eventually hit the issue that input topic segments are deleted because of topic retention. To avoid this, the only way is to scale out. Hence, the downstream app would need to scale out first and allow opening multiple connections (to potentially different ksqlDB servers) to receive the result in parallel. If you don't apply back pressure, you might obviously have the same issue in the query serving layer that reads from the result topic; however, it decouples the query and allows for an effective and robust query execution. I see your point about "this is wasteful", but this argument only holds from a ksqlDB perspective, not from a KS perspective. The kind of query result serving ksqlDB wants to do is a side effect from a KS perspective, and there is a conceptual gap between both. Also, even if we did apply back pressure, it would only apply to the last sub-topology, but not (at least not easily) to upstream sub-topologies that write data into repartition topics. Hence, you would only gain that the last sub-topology is throttled by back pressure. Last but not least, besides the question of whether back pressure makes sense or not, it would be a major re-architecture of KS to add it and it's not gonna happen quickly (if at all, as applying back pressure does not seem to be necessary for KS). Hence, given that there is no back pressure, I would advise building a solution within ksqlDB that handles the missing back pressure.
Maybe, if you want to go fancy, ksqlDB could inject a "throttle operator" into the query (directly after the very upstream source operators) that would do "sleeps" to "slow down" query processing if you really want to avoid producing a result output data rate the client can't consume (but as mentioned above, IMHO, back pressure just shifts the problem somewhere else, it does not solve it...).
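A minimal sketch of what such a throttle might look like (the operator name, placement and rate are hypothetical; the idea is just to sleep whenever processing runs ahead of a configured rows-per-second budget):

```java
public class ThrottleSketch {
    // Sketch of the "throttle operator" idea: cap the processing rate by
    // sleeping whenever we run ahead of the rows-per-second budget.
    static final class Throttle {
        private final long nanosPerRow;
        private long nextAllowed = System.nanoTime();

        Throttle(int rowsPerSecond) {
            this.nanosPerRow = 1_000_000_000L / rowsPerSecond;
        }

        // Called once per row before forwarding it downstream.
        void acquire() throws InterruptedException {
            long now = System.nanoTime();
            if (now < nextAllowed) {
                Thread.sleep((nextAllowed - now) / 1_000_000 + 1);
            }
            nextAllowed = Math.max(now, nextAllowed) + nanosPerRow;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Throttle throttle = new Throttle(100); // at most ~100 rows/sec
        long start = System.nanoTime();
        for (int i = 0; i < 10; i++) {
            throttle.acquire();
            // forward the row downstream here
        }
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;
        // 10 rows at 100 rows/sec should take roughly 90 ms or more.
        System.out.println(elapsedMs >= 80);
    }
}
```

As the comment in the thread notes, this only shifts the problem (rows pile up upstream instead), but it does bound the output rate without any change to KS itself.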
Wondering if we can leverage http/2 and other frameworks like gRPC more.
### The protocol

We will create a new super simple streaming protocol designed to efficiently stream data from server to client or client to server.
HTTP/REST is a poor fit for streaming data as it is designed for a request/response interaction style. Attempting to implement "streaming"
What do you think about HTTP/2? E.g. gRPC bidirectional streaming is built on top of HTTP/2.
To add more, we could also look into gRPC as a substrate for RPC. It has language bindings in so many languages, and we should be able to wrap gRPC and get our clients done in many languages quickly?
Hey @vinothchandar
> To add more, we can also look into gRPC as a substrate for RPC as well? It has language bindings in so many languages and we should be able to wrap gRPC and get our clients done in many languages quickly?
I looked into gRPC (and Thrift); similar reasoning applies as to why I don't think they would be a good solution.
- The generated clients have pretty ugly APIs and we'd need to wrap them in custom code to give them a nice simple idiomatic reactive streams interface. This negates much of the advantage of generation in the first place.
- We'd also have to adapt the flow control behaviour to something reactive streams friendly - again, fiddly work that we'd have to do on both client and server side.
- We'd have no control of the threading model (esp. on the server side) - this means it couldn't use Vert.x and integrate nicely with the rest of the API using Vert.x
- Sadly HTTP2 streaming wasn't designed to present a streaming API to the client. The streaming is really a back channel for keeping HTTP caches in sync. I.e. HTTP2 can pre-emptively stream resources from a server to a browser cache so that when the browser requests a resource it's already cached locally. But that's an internal implementation detail of the browser and there's no API exposed to user JavaScript code to actually get streaming results - from the user's point of view it's just the same old request/response HTTP. This means you can't use HTTP2 as a good means of streaming to JavaScript browser clients, and this is the reason WebSockets haven't been deprecated :)
So to summarise: If we did use gRPC generation there'd be a lot of work adapting the generated clients to a useful form, plus we'd lose control of the threading model. And it wouldn't work in browser.
The generated clients have pretty ugly APIs and we'd need to wrap them in custom code to give them a nice simple idiomatic reactive streams interface.
https://grpc.io/docs/tutorials/basic/java/ I have actually seen many companies move to gRPC for bi-directional streaming recently and IMO the APIs are not that ugly :). I realize this is a subjective matter.
We'd also have to adapt the flow control behaviour to something reactive streams friendly - again, fiddly work that we'd have to do on both client and server side.
Using http/2 we would get some flow control and prioritization for free. At least, gRPC's underlying chromium stack is at the cutting edge of this. In my previous life, we used it at massive scale to stream data back and forth between data centers and mobile phones..
We'd have no control of the threading model (esp. on the server side) - this means it couldn't use Vert.x and integrate nicely with the rest of the API using Vert.x
You may be right here.. I don't have great insights into gRPC server side threading model.
Sadly HTTP2 streaming wasn't designed to present a streaming API to the client. The streaming is really as a back channel for keeping HTTP caches in sync.
I respectfully disagree here.. I think you may be talking about HTTP 1.1 SSEs, which is limiting in nature. HTTP/2 supports bi directional streams and solves many of the issues you mentioned already - multiplexing, flow control, prioritization.. Just to show how similar they are, here is a proposal underway from Mozilla https://tools.ietf.org/html/rfc8441, that sends websocket traffic over the same http/2 connection to use the tcp connection underneath
This means you can't use HTTP2 as a good means of streaming to JavaScript browser clients, and this is the reason WebSockets haven't been deprecated
Could be true. But is streaming results directly to a web browser the core use case for push queries? I can't imagine why that would happen without a middle layer in between ksqlDB and browser.. I mean, we at least need a load balancing or broker layer to terminate connections right.. IMO, we should not be swayed too much by the serving to browser directly use-case.. the middle layer <=> browser, can very well be using websockets.
In summary, yes, we need a wrapper layer on top of gRPC to frame our own messages.. but there is so much already built it and a nice (sure, subjective) bi-directional API is handed back to us for building this wrapper.. And there are so many languages supported already (go, java, python, c++ all have the bidi api)
Nonetheless, can you clarify if we will be using thrift/protobuf, even if we build our own binary protocol? (may be I missed that detail when reading)
The generated clients have pretty ugly APIs and we'd need to wrap them in custom code to give them a nice simple idiomatic reactive streams interface.
https://grpc.io/docs/tutorials/basic/java/ I have actually seen many companies move to gRPC for bi-directional streaming recently and IMO the APIs are not that ugly :) . I realize this is a subjective matter.
We need our client API to expose KSQL abstractions - the generated gRPC ones are not going to do that. Moreover we need reactive streams abstractions too, again, gRPC won't do that either, so we'd need to wrap and that is significant work.
We'd also have to adapt the flow control behaviour to something reactive streams friendly - again, fiddly work that we'd have to do on both client and server side.
Using http/2 we would get some flow control, prioritization for free. Atleast, gRPC's underlying chromium stack is at the cutting edge of this. In my previous life, we have used it at massive scale to stream data back and forth between data centers and mobile phones..
I'm not saying that gRPC doesn't have flow control - it does (well it's just HTTP2 flow control), what I'm saying is we'd have to adapt the flow control to the way reactive streams does flow control on both the client and server. This would be a significant amount of extra work to do on both client and server side.
We'd have no control of the threading model (esp. on the server side) - this means it couldn't use Vert.x and integrate nicely with the rest of the API using Vert.x
You may be right here.. I don't have great insights into gRPC server side threading model.
Sadly HTTP2 streaming wasn't designed to present a streaming API to the client. The streaming is really as a back channel for keeping HTTP caches in sync.
I respectfully disagree here.. I think you may be talking about HTTP 1.1 SSEs,
No, this is HTTP2. Afaik, there is no new HTTP2 JavaScript API that can be used in browser. The streaming in HTTP2 was never intended as something that would be exposed to the JavaScript developer in the browser. Server push was designed as a mechanism to pre-emptively load resources in to the browsers local cache before they were requested, but from the JavaScript devs point of view nothing changes - it's an optimisation.
Now of course you can write your own client that uses HTTP2 and does expose the streaming to the end user, and that's exactly what gRPC does, but you can't use that from a browser.
This is by design, aiui, the official streaming mechanism currently is websockets and there's no plan to deprecate them and expose HTTP2 streaming in browser instead.
which is limiting in nature. HTTP/2 supports bi directional streams and solves many of the issues you mentioned already - multiplexing, flow control, prioritization.. Just to show how simialr they are, here is a proposal underway from Mozilla https://tools.ietf.org/html/rfc8441, that sends websocket traffic over the same http/2 connection to use the tcp connection underneath
Well sure, if you write the browser you can do stuff like this, but we're talking about what APIs are available to mortal standard JavaScript devs :)
This means you can't use HTTP2 as a good means of streaming to JavaScript browser clients, and this is the reason WebSockets haven't been deprecated
Could be true. But is streaming results directly to a web browser the core use case for push queries?
Probably not the most important case but certainly a useful case.
I can't imagine why that would happen without a middle layer in between ksqlDB and browser.. I mean, we atleast need a load balancing or broker layer to terminate connections right..
Most loadbalancers these days are websocket friendly. And proxying a websocket through a front end wouldn't be too much work.
We did a very similar thing with Vert.x. Vert.x has a concept of an event bus - a simple messaging system that allows servers to communicate. We created something called the event bus bridge which basically allows browsers to communicate on the same event bus by proxying traffic through a websocket from browser to the server where it was bridged to the server side event bus. It proved to be very popular. https://vertx.io/docs/vertx-web/java/#_sockjs_event_bus_bridge
IMO, we should not be swayed too much by the serving to browser directly use-case.. the middle layer <=> browser, can very well be using websockets.
Yes, but don't forget that the ksqlDB connection is just a websocket too.
IMO, we should not be swayed too much by the serving to browser directly use-case.. the middle layer <=> browser, can very well be using websockets.
@vinothchandar Yeah, I agree. This is something that I haven't been explicit about when I advocated for front-end apps. Similar to how you wouldn't want a browser talking right to Postgres, you probably wouldn't want one talking straight to ksqlDB. I'm not sure how much that changes in proposed implementation, but I do think in practice there ought to be some middle tier, whether that's node.js, a web server, graphQL, or whatever.
In most cases I think most apps will indeed have a front end and not expose ksqldb directly, however I don't think we should mandate that and leave that as a choice for the app developer. The ksqlDB client is completely agnostic to that and will work just as well in either situation. Arguably it's pretty cool to talk straight from browser to a front end of ksqldb nodes, but YMMV
Some more thoughts on gRPC:
I see the main value of tools like gRPC (and to a lesser extent Thrift) as great tools for quickly creating programmatic interoperability for internal service architectures. E.g, you're a big enterprise and you have lots of internal [micro]services and you want a way for your clients (internal apps and other services) to quickly use those services.
But I don't see those tools as a great fit for creating clients for public products or open source projects that we create. Why? Our client is going to be the primary way that people interact with the product - it's like the facade of your house. The form of it is going to be of fundamental importance to the user experience, therefore it's important that we control it, not someone else. If we use a tool like gRPC then we're effectively outsourcing the primary interaction point of our product to someone else. I don't think that's where we want to be.
over HTTP/REST usually results in a polling approach which is awkward and inefficient compared to a protocol which is designed with streaming in mind from
the start.

* Binary multiplexed protocol. The protocol will support multiple independent channels over a single connection. We want to discourage clients from opening new
Again this feels like re-inventing something thats in http/2 already? Could we reuse that ? It may be much easier for others in community to write new language clients that way?
Please see previous comment about HTTP 2
Again this feels like re-inventing something thats in http/2 already?
Binary multiplexed streaming protocols existed long before HTTP 2, so if anything it's HTTP 2 doing the "re-inventing" ;) Not that that is a bad thing, it's the best way to design a streaming protocol and that's why just about every messaging system which dominated the market before Kafka came along does something similar. (Kafka itself is notably the odd one out in not using a streaming protocol but taking a batching and RPC approach).
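For readers unfamiliar with the idea, a binary multiplexed protocol interleaves frames from independent logical channels over a single connection. A minimal sketch of the concept (the frame layout, field widths, and function names here are invented purely for illustration - they are not the wire format proposed in the KLIP):

```python
import struct

def encode_frame(channel_id: int, payload: bytes) -> bytes:
    """Prefix a payload with a 4-byte big-endian length and a 2-byte channel id."""
    return struct.pack(">IH", len(payload), channel_id) + payload

def decode_frames(buffer: bytes):
    """Yield (channel_id, payload) tuples from a buffer of complete frames."""
    offset = 0
    while offset + 6 <= len(buffer):
        length, channel_id = struct.unpack_from(">IH", buffer, offset)
        offset += 6
        payload = buffer[offset:offset + length]
        offset += length
        yield channel_id, payload

# Two logical channels (a query and a result row) interleaved on one "connection"
wire = encode_frame(1, b'{"query":"select * from foo"}') + encode_frame(2, b'[123,"blah",true]')
frames = list(decode_frames(wire))
```

Because each frame carries its channel id, a slow consumer on one channel need not block traffic on the others - which is exactly the property HTTP/2 streams provide natively.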
## Performance Implications

* Streaming query results with the new client and protocol should provide significantly better performance than |
in terms of end-to-end latency of delivery? Can you please clarify what metric might get better with this change?
I think there are a few reasons why we could expect performance to improve:
- The current PollingSubscription is inefficient as it polls the internal blocking queue and, when drained, schedules a timer to try again in 100ms. This means throughput is effectively capped at 100 msgs every 100ms, in other words 1000 msgs per sec (not very fast!). With a non polling approach we should have no such limit, and I know from experience with similar systems that we should be able to push millions of small messages per second over the wire with a simple protocol. Now clearly we are also limited here by how fast KS can deliver messages to us, and that's unlikely to be millions of messages per second. But if it can deliver more than 1000 per second we should expect an increase in throughput.
- Latency should also improve as we won't have the 100ms timer.
- Because we won't be blocking threads to execute transient queries, we should have a much better resource usage on the server allowing us to scale to larger number of queries with lower RAM.
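The 1000 msgs/sec ceiling mentioned above follows directly from the polling parameters quoted in the first bullet:

```python
batch_size = 100        # rows drained from the internal blocking queue per poll
poll_interval_ms = 100  # the timer fires again 100 ms after the queue is drained

# Upper bound on delivered rows per second, no matter how fast the
# underlying Kafka Streams topology can actually produce them.
max_rows_per_sec = batch_size * 1000 // poll_interval_ms
print(max_rows_per_sec)  # 1000
```

Removing the timer removes this artificial cap; the remaining limit is whatever rate the backend can sustain.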
Also... the encoding overhead of the protocol is pretty low as it's so simple. It should be faster to parse than HTTP. But again we may not notice the difference unless we push a lot of traffic.
Because we won't be blocking threads to execute transient queries, we should have a much better resource usage on the server allowing us to scale to larger number of queries with lower RAM.
I would note that transient queries today deploy their own streams topology, with 4 streams threads, 4 consumer heartbeat threads, 4 background sender threads on the producer, etc. I think optimizing / reusing threads in these topologies is a much larger task and arguably will have a higher ROI when it comes to scaling up the number of transient queries.
Thanks for clarifying.. Seems like the throughput would improve.. If you can distill some of these into the KLIP, that would be great.
+1 on @apurvam 's comment.. Bottleneck could be somewhere else as well.. (we were surprised for e.g when pull queries bottlenecked on ksql code and not streams) and it may take other projects to actually realize this benefit..
I'm not suggesting that back pressure in KS should prevent messages from building up in Kafka - clearly it won't. For that, Kafka itself would need to support back pressure from consumers to producers, and I don't know if that's desirable, and even if it was it's unlikely to happen. (It should be noted though this is a fairly common feature in other messaging systems - yes I know that Kafka is not a messaging system! ;)) Messages are going to build up in Kafka if you have slower consumers than producers whether or not KS supports back-pressure. Those things are orthogonal. The advantages of back pressure are about an optimal use of resources - threads don't need to block waiting for consumers, which means they can do other things. Which means fewer threads, better resource usage. These are all really quite general observations on the advantages of non blocking / reactive systems over ones using a blocking model (the same reasoning can be applied to pretty much any kind of server).
I think it would apply to any sink which does some work that might involve blocking - this could involve blocking on adding to an internal in-memory queue as ksqlDB does, but it might also involve writing back to a topic via a blocking API or doing some custom user logic - writing to a file etc. Anything IO bound is especially pernicious. The threads that are blocked can't do any work during that time, starving the system of throughput even when it has free CPU. The traditional solution here is to increase thread pool sizes. But large numbers of threads mean large amounts of memory and increased context switching overhead which eats into CPU usage, and it swiftly hits a law of diminishing returns when adding new threads. As mentioned, these are all general reasons why blocking threading models are a bad idea when you have IO to do.
We're not trying to push back across Kafka topics as mentioned previously.
+1 Absolutely, I'm not expecting this is something that is going to happen any time soon. I fully appreciate it would be a lot of work :)
I suggested some mitigations in previous replies - e.g. closing the query or dropping messages.
I'm not sure I understand this suggestion - how would this operator work?
Well, that is the point. KS is designed to write back to Kafka topics. Everything else is an "anti pattern" (given the current design). There is no concept of "other sinks" in Kafka Streams.
If I were a user, I would not like those solutions.
The idea is that you put "sleeps" into the pipeline to throttle the forwarding, ie, the operator just does "sleep and forward". The "sleep time" would be adjusted dynamically, eg, if the lag grows (ie, the client lag on the output topic that is used for query serving), the sleep time is increased and vice versa. (It's just a high level idea - not sure if it's a good one.) This does not free up the thread itself, but at least allows the CPU to schedule other threads, and also does not fill up the output topic too much and hence avoids some "wasteful work".
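The dynamically adjusted "sleep and forward" operator could be sketched roughly as below. Everything here - the class name, step size, and the lag-to-sleep policy - is invented for illustration; it is not a concrete Kafka Streams proposal:

```python
import time

class SleepAndForward:
    """Throttling operator: sleep time grows with consumer lag, shrinks as lag drops."""

    def __init__(self, min_sleep=0.0, max_sleep=1.0, step=0.01):
        self.sleep_s = min_sleep
        self.min_sleep, self.max_sleep, self.step = min_sleep, max_sleep, step

    def adjust(self, lag: int, target_lag: int) -> None:
        # Hypothetical policy: nudge the sleep up while lag exceeds target, down otherwise.
        if lag > target_lag:
            self.sleep_s = min(self.max_sleep, self.sleep_s + self.step)
        else:
            self.sleep_s = max(self.min_sleep, self.sleep_s - self.step)

    def forward(self, record, downstream) -> None:
        # Does not free the thread, but yields the CPU to other threads while sleeping.
        time.sleep(self.sleep_s)
        downstream(record)
```

As the comment above notes, this trades thread occupancy for reduced "wasteful work" on the output topic; it is not true back pressure.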
* We will write up a document on the new wire protocol to encourage 3rd parties to implement new clients.
* There may be some server side configuration changes due to the new server implementation.

# Compatibility Implications |
I'm not sure if the security handlers used by rest-utils will be compatible (see the note in Security Implications). If it is not compatible, should that be noted here?
in the same codebase during development, and only after switchover will the old implementation be removed.
That way we can ensure a seamless switchover as there is a working API at all times.

## Test plan |
We should have a stress or performance test plan too so we can catch bugs when running several clients. Sometimes servers can hang due to exhausting the number of threads they can handle.
Folks, I have updated the KLIP to reflect the recent discussions in our meetings and on slack with respect to the protocol. I have removed the plan to create a custom protocol and updated the doc to include a new HTTP2 based streaming API.
Thanks @purplefox for updating the KLIP! Few comments on the encoding itself.. Looped in vicky/alan, who may also want to chime in. Should be smaller detail, that we can work through and then I am good to go.
* The client will initially support execution and streaming of queries (both push and pull), inserting of rows into streams, DDL operations and admin operations such as list and describe.
* The client will support a reactive / async interaction style using Java CompletableFuture and Reactive Streams / JDK9 Flow API (or a copy thereof if we cannot upgrade from Java 8)
* The client will also support a direct / synchronous interaction style.
+1 on json being more friendly for development.. But should we additionally consider a binary encoding format like protobuf as the payload over http/2, for better performance? If you think this can be sequenced into a later phase, it might be good to call that out here?
If the performance is inadequate we can consider this, but I suspect it will be sufficient for our target use cases.
From personal experience, I'd avoid PB for a public API. PB can work well as a serialization format within an organisation. Though I've still seen large orgs get stuck on one specific PB version.
The request method will be a POST.

Requests will be made to a specific URL, e.g. "/query-stream" (this can be configurable)
by "configurable", do you mean we can decide on a specific name later? or actually configurable per server?
Either way, I don't have a strong opinion.
Can we find a better default name? Why not just `query`?
{
   "query": "select * from foo",   <----- the SQL of the query to execute
   "push": true,                   <----- if true then push query, else pull query
   "limit": 10                     <----- if specified, return at most this many rows,
curious: wouldn't this be in the SQL itself as a `LIMIT` clause?
This was meant as an illustration, not an exact definition of the arguments to the real endpoint. (I think there is a comment in there already to that effect ;) )
That makes sense @purplefox. Otherwise I would have asked why there needs to be a "push" field, as the behavior of push vs. pull should be determined by the query itself (cf. "query"), not by an out-of-band parameter. :-)
This was meant as an illustration, not an exact definition of the arguments to the real endpoint.
Should the KLIP not reflect the exact definition? IMHO, the KLIP should allow anybody to write a ksqlDB client in their language of choice.
I think there is a comment in there already to that effect
This sentence should go before the "illustrative example" to set expectations right upfront.
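To make the illustrative shape above concrete (the field names come from the example and, as discussed, are explicitly not the final endpoint definition), a client might assemble the request body like this:

```python
import json

def build_query_request(sql, push, limit=None):
    """Build the illustrative query-stream request body as UTF-8 JSON bytes."""
    body = {"query": sql, "push": push}
    if limit is not None:
        body["limit"] = limit
    return json.dumps(body).encode("utf-8")

payload = build_query_request("select * from foo", push=True, limit=10)
# POST payload to e.g. "/query-stream" over HTTP/2 with an HTTP client of your choice
```

Whether `push` and `limit` survive as out-of-band fields or move into the SQL itself is exactly what the thread above is debating.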
Followed by zero or more JSON arrays:

````
[123, "blah", true] |
I understand that we write the arrays outside of the json (IIUC) above, because when we stream results back, we don't know how many rows there are to return..
But in cases we do, e.g. pull queries, could we support an encoding where this array is within the json above and the overall response is a valid json (as it is today)? The advantage of doing that would be
- Opens up integration with lots of systems/tools that can do something useful by calling an endpoint and getting a json. cc @AlanConfluent @vpapavas who have built a ksqlDB/Grafana integration that uses pull query results returned as json.. I think the new encoding would break such a use-case?
- Even for push queries, IMO returning a full json is great because then a client can fully consume a small table with a `LIMIT` clause added.. Again we had a real use-case for this in the grafana integration, where we wanted to write a push query to consume all the new metric names (not actual metric events) ...
I agree that it's probably preferable if we have a single response that's parsable as valid json. Otherwise, you have to do some parsing yourself (even if separated by just \n), which isn't preferable. Also, some frameworks, including grafana make assumptions about a single parsable json response. There might be workarounds, but this would probably go against a lot of clients out there. Best to avoid this.
Also, I think there's certainly something nice about getting just one format that has all of the column information and results in a single response. Consistency between pull and push would also be nice, so that handling logic could be shared, even if it means that results are a bit more verbose.
I understand that we write the arrays outside of the json (IIUC) above, because when we stream results back, we don't know how many rows there are to return..
This is more about not wanting to force users to parse huge JSON objects into memory in order to do something useful with them. Yes, streaming JSON parsers do exist, but we shouldn't assume they exist for all target client languages or mandate that users use them. Async (non blocking) streaming JSON parsers are even rarer.
But in cases we do, e.g pull queries, could we support an encoding where this array is within the json above and the overall response is a valid json (as it is today)? The advantage of doing that would be
We shouldn't assume that pull query results will be small, they could be huge. However, if you can provide a limit clause and thus provide a hard limit on the results, then returning the results as a single JSON object can make sense.
We can easily enable this by providing the content-type in the accepts header of the request. I.e. if you want the results a single JSON object provide content-type="text/json", we can define a new content-type for our newline delimited sequence of JSON elements (which should be the default). The server can then fail requests with content-type="text/json" if a limit clause hasn't been provided.
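The negotiation rule described here - default to the delimited format, and reject single-JSON-object requests that lack a limit - can be sketched as server-side logic. The delimited content type name below is a placeholder, not a decided value:

```python
# Hypothetical name for the newline-delimited content type; not finalized in the KLIP.
DELIMITED_TYPE = "application/vnd.ksqlapi.delimited.v1"

def choose_response_mode(accept_header, has_limit):
    """Pick a response encoding from the Accept header, or reject the request."""
    if accept_header == "application/json":
        if not has_limit:
            raise ValueError("single JSON object responses require a LIMIT clause")
        return "single-json-object"
    return "newline-delimited"  # the default streaming format
```

This keeps the streaming-friendly framing as the default while still letting tools that expect one parseable JSON document (e.g. the Grafana case above) opt in when the result set is bounded.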
- Opens up integration with lots of systems/tools that can do something useful by calling an endpoint and getting a json. cc @AlanConfluent @vpapavas who have built a ksqlDB/Grafana integration, that uses pull queries results returns as json.. I think the new encoding would break such an use-case?
All of the proposals for the new API (custom protocol, gRPC) would break this. Custom protocol + gRPC are definitely not JSON ;)
So I'm working on the assumption that you would have to fix Grafana anyway. Presumably the tool allows you to provide some kind of plugin that you can code to work with some kind of custom API? If not, it seems rather limited.
I don't think it's generally a good approach to constrain our API based on the requirements of one particular tool which we happen to use internally.
Having said that, if Grafana does require a single JSON, then the new API can support this using an accepts header as mentioned above :)
- Even for push queries, IMO returning a full json is great because then a client can fully consume a small table with a `LIMIT` clause added.. Again we had a real use-case for this in the grafana integration, where we wanted to write a push query to consume all the new metric names (not actual metric events) ...
As long as limit is specified I see no issue with that.
I agree that it's probably preferable if we have a single response that's parsable as valid json. Otherwise, you have to do some parsing yourself (even if separated by just \n), which isn't preferable. Also, some frameworks, including grafana make assumptions about a single parsable json response. There might be workarounds, but this would probably go against a lot of clients out there. Best to avoid this.
Please see reply to Vinoth who made a similar observation.
Another advantage of not providing results as a single json element. Quite often in the world of streaming you're just piping streams from one place to another. A concrete use case for this is piping the output of a query to a websocket or SSE so it can be sent to a browser (this will probably be a common use case for ksql based apps).
In this case you don't need to look at the individual JSON results, you just want to pick up each individual row as bytes and write it as bytes. Using a streaming JSON parser you'd end up deserialising the JSON from bytes only to serialise it again back to bytes before writing it to the websocket. This is very inefficient. Using some kind of framing outside the JSON (in this case delimited, but it could also be length prefixed) allows you to pick out the elements much more easily and efficiently.
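The byte-level piping point can be shown concretely: with newline delimiters, a proxy can slice complete rows out of the stream and forward them untouched, no JSON parser involved. A small sketch (the function name is made up, and it assumes a raw `\n` never appears inside a serialized row, per the delimited framing described above):

```python
def split_frames(buffer):
    """Split a chunk of the response stream into complete rows plus a leftover tail."""
    *rows, tail = buffer.split(b"\n")
    return rows, tail

# A chunk that ends mid-row, as network reads often do
chunk = b'[123,"blah",true]\n[432,"foo",true]\n[765,"wh'
rows, tail = split_frames(chunk)
# rows can be written straight to a websocket as bytes; tail waits for the next chunk
```

The same trick is impossible with a single JSON array response, where element boundaries are only discoverable by parsing the JSON itself.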
I agree that it's probably preferable if we have a single response that's parsable as valid json. Otherwise, you have to do some parsing yourself (even if separated by just \n),
It's generally going to be a lot simpler to parse on newlines than to assume the presence of a streaming JSON parser and figure out how to use it to extract the individual elements.
I do accept there are some use cases where you know the results are not going to be large (and you use limit) and you want to parse them all as a single JSON object, but this is a streaming API so we can't make that assumption in general.
if you want the results a single JSON object provide content-type="text/json", we can define a new content-type for our newline delimited sequence of JSON elements (which should be the default)
nit: the single-object content type should be `application/json` to match RFC 4627.
we can define a new content-type for our newline delimited sequence of JSON elements (which should be the default)
For reference, the custom content type supported by the existing REST API is `application/vnd.ksql.v1+json`.
A concrete use case for this is piping the output of a query to a websocket or SSE
Speaking of this, what are the downsides of using SSE as the transport directly (over http/2) for streaming query responses? A response of Content-Type `text/event-stream` corresponding to the example under the KLIP's "Query streaming" heading could look like:
event: query-success
data: { "query-id", "xyz123", "columns":["col", "col2", "col3"], "column_types":["BIGINT", "STRING", "BOOLEAN"], "row_count": 101 }
event: query-data
data: [123, "blah", true]
data: [432, "foo", true]
data: [765, "whatever", false]
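As a rough sketch of what consuming such a stream involves — a hypothetical helper handling only the minimal `event:`/`data:` framing shown above, not a full SSE implementation:

```python
def parse_sse_events(body: str):
    """Split a text/event-stream body into (event_name, data_lines) pairs.

    Events are separated by blank lines; only the 'event:' and 'data:'
    fields used in the example above are handled here.
    """
    events, name, data = [], None, []
    for line in body.splitlines():
        if not line.strip():
            if name is not None or data:
                events.append((name, data))
            name, data = None, []
        elif line.startswith("event:"):
            name = line[len("event:"):].strip()
        elif line.startswith("data:"):
            data.append(line[len("data:"):].strip())
    if name is not None or data:  # flush the final event
        events.append((name, data))
    return events
```

Each `data` line would then be JSON-decoded individually, much like the newline-delimited format.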
I think SSE is a valid alternate API we can provide that some users might prefer. One thing about the new API implementation is that the large majority of the work is in setting up the infrastructure: getting the new Vert.x-based server plugged in and integrating it with the current backend. Once that is done, exposing the query streams in different formats is relatively straightforward - it's pretty much just writing the output to the response in a different way. I'd recommend we initially ship with the formats as described in the KLIP; we can consider adding SSE support at a later date (it wouldn't be hard).
* We are not creating a new RESTful API.
* We are not currently looking to write clients for languages other than Java and JavaScript (although those may follow later)
* We will not initially allow for consuming directly from Kafka topics using the client (although that may follow later)
Just want to clarify that, if we choose that route, the encoding would still remain the same? i.e. Kafka might just replace the HTTP/2 transport, and all of this would be transparently hidden under the client?
No idea. Let's cross that bridge if we ever come to it. It's certainly not in scope for this KLIP :)
Definitely not in scope of this KLIP, but it would be good to look at whether the new client interfaces are built to easily wrap the Kafka clients in a variety of languages. The KSQL Server shouldn't become a Kafka REST proxy. And ideally a KSQL client shouldn't have to implement the Kafka protocol.
Whether we decide to support direct access to Kafka topics from the ksqlDB client in the future or not, and how we decide to implement that, e.g. wrapping the Kafka client or proxying through ksqlDB (I would prefer proxying as it would be a pain to have to configure the client to know about two sets of servers) has no impact on the form of the client API that we're proposing here, afaict.
FYI I have updated the docs with a new section on receiving results as a single JSON object.
Thanks for the updates @purplefox! It looks good to me. I am glad that we are settling on HTTP/2 / JSON. One thing that would be worth calling out explicitly as a design goal is to be able to support clients in as many languages as possible, and to make sure that our APIs / payloads / protocols are compatible with that goal.
I think by standardizing on HTTP/2 / JSON we have achieved that target for now, but if we want to move to a binary protocol for performance, read from Kafka, etc., this goal will help guide the future choices.
* We are not creating a new RESTful API.
* We are not currently looking to write clients for languages other than Java and JavaScript (although those may follow later)
* We will not initially allow for consuming directly from Kafka topics using the client (although that may follow later)
Definitely not in scope of this KLIP, but it would be good to look at whether the new client interfaces are built to easily wrap the Kafka clients in a variety of languages. The KSQL Server shouldn't become a Kafka REST proxy. And ideally a KSQL client shouldn't have to implement the Kafka protocol.
Sounds good. We can iterate on these pieces individually. Thanks @purplefox for driving this across the finish line!
Sorry for the late comments -- I did not get time earlier to participate in the discussion. Overall a great KLIP! A couple of minor comments and questions.
What I am also wondering: the KLIP basically describes the communication protocol between the client and the server, but it does not describe the client API a developer would use to write their application, which seems to be an important part of this work (the KLIP just mentions the client API in a high-level manner, and also only for the Java client -- not sure if this high-level description applies to the JavaScript client, too). Do you plan to do follow-up KLIPs for the Java and JavaScript clients?
The request method will be a POST.

Requests will be made to a specific URL, e.g. "/query-stream" (this can be configurable)
Can we find a better default name? Why not just query?
{
  "query": "select * from foo", <----- the SQL of the query to execute
  "push": true, <----- if true then push query, else pull query
Why is it required to encode the query type? The query parameter contains this information implicitly, doesn't it?
Can we find a better default name? Why not just query?
I don't have a strong opinion either way. "query-stream" is really just a placeholder. We can easily change it before release.
The KLIP is a bit out of date. "push" is indeed not needed (and isn't present in the code).
As a general point, I have deliberately tried to keep the KLIP as light as possible on details. I am not a big fan of detailed up-front design. In reality things will change a lot during development, so it's much more efficient to just get on with the development and see what works and what doesn't than to spend a lot of time trying to specify things at a detailed level at the beginning. The "design doc" can then be updated retrospectively as a good design is discovered during development. I will update the KLIP to reflect the current state.
{
  "query": "select * from foo", <----- the SQL of the query to execute
  "push": true, <----- if true then push query, else pull query
  "limit": 10 <---- If specified return at most this many rows,
This was meant as an illustration, not an exact definition of the arguments to the real endpoint.
Should the KLIP not reflect the exact definition? IMHO, the KLIP should allow anybody to write a ksqlDB client in their language of choice.
I think there is a comment in there already to that effect
This sentence should go before the "illustrative example" to set expectations right upfront.
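For illustration only — the field names ('query', 'push', 'limit') follow the KLIP's placeholder example and, as the discussion above notes, may change before release — a client might assemble the request body like this:

```python
import json

def build_query_request(sql: str, push: bool = True, limit=None) -> str:
    """Assemble the illustrative /query-stream request body.

    The field names mirror the KLIP's example payload; they are
    placeholders, not a frozen wire format.
    """
    body = {"query": sql, "push": push}
    if limit is not None:
        body["limit"] = limit  # return at most this many rows
    return json.dumps(body)
```

The resulting string would be POSTed to the (configurable) query endpoint over HTTP/2.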
If acks are requested then an ack will be written to the response when each row has been successfully committed to the underlying topic. Rows are committed in the order they are provided. Each ack in the response is just an empty JSON object, separated by newlines:
What is the reason for this design? When data is written into Kafka, the producer puts multiple records into a batch and effectively acks on a per batch basis. Would it be more efficient to apply a similar pattern?
What happens if a record cannot be inserted for whatever reason (for example a single record could exceed the maximum allowed message size, or the JSON is invalid and cannot be parsed)?
I too have concerns around the error path.
If we can stream results to the server, and get an empty ack message streamed back on success, then I'm wondering what we're expecting to happen on failure?
Are we going to end up with all client impls needing to track in-flight requests?
To allow clients to correlate error response to request we could always include the request itself as part of the error response.
Also, when is the ack sent back? Once the server has processed the request and enqueued the message to be produced to Kafka, or when the producer callback confirms the message was produced to Kafka? Hopefully the second.
Should the KLIP not reflect the exact definition? IMHO, the KLIP should allow anybody to write a ksqlDB client in their language of choice.
There will be API documentation on the web-site which will explain the exact endpoints, arguments and how to use them.
What happens if a record cannot be inserted for whatever reason (for example a single record could exceed the maximum allowed message size, or the JSON is invalid and cannot be parsed)?
In case of error, an error response is sent back on the same stream and the stream is closed.
Hopefully the second.
Ack!
To allow clients to correlate error response to request we could always include the request itself as part of the error response.
This is unnecessary, as acks are always returned in the same order the inserts were sent. From the client's point of view this makes correlation of responses a bit simpler, as they don't have to maintain a map of insert_id to request. They just need to store a queue of requests and, when responses come back, pop the head of the queue.
Ok...on second thoughts, I am going to send back the send_id in the ack. The reason being that on the server, acks for sends to Kafka can come back in a different order if they are sent to different partitions. I don't want to have to sort the acks on the server, so I'll send them back as they come in and the client can correlate them.
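A sketch of the correlation logic this implies on the client side — `send_id` is the illustrative field name from the discussion above, not a finalized part of the protocol:

```python
class AckTracker:
    """Track in-flight inserts and match acks that may arrive out of
    order (e.g. when rows are produced to different partitions).
    """

    def __init__(self):
        self._inflight = {}

    def sent(self, send_id, row):
        # Record a row as awaiting its ack.
        self._inflight[send_id] = row

    def acked(self, send_id):
        # Return the original row, or None for an unknown/duplicate ack.
        return self._inflight.pop(send_id, None)

    def pending(self):
        # Number of rows still awaiting acknowledgement.
        return len(self._inflight)
```

On an error response the client would drain `_inflight` to report which inserts were never acknowledged.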
desirable due to memory and latency constraints. Moreover, in the world of streaming it's quite common | ||
to want to pipe the stream from one place to another without looking into it. In that case it's very | ||
inefficient to deserialize the bytes as JSON simply to serialize them back to bytes to write them | ||
to the output stream. For these reasons the results, by default, are sent as a set of JSON arrays |
In that case it's very inefficient to deserialize the bytes as JSON simply to serialize them back to bytes to write them to the output stream.
Not sure if I can follow. Why would somebody not be able to just copy bytes for this case?
It's quite a common pattern in messaging (e.g. when implementing a router) to want to only inspect the headers of a message and not deserialise the body of the message for efficiency reasons. If the whole envelope was JSON you'd have to deserialise the whole thing. In the future we might want to introduce headers outside of the JSON body.
I will however remove this sentence as we don't support headers (yet)
That makes sense -- it's not clear from the original write-up though. It sounds as if each message is sent as a JSON array, which only helps to get efficient access to a single message. (And does not help at all for the use case you describe.)
If we want to support headers, each message itself would need to be represented as a JSON array though, with an explicit record/header "slot" for example. Is that what you tried to say?
Atm it seems that a single record is a single JSON array with a simple tuple layout (not even the internal Kafka attribute-to-key/value mapping can be inferred -- which is desired I guess -- only the schema metadata would allow inferring the mapping -- but we want to hide the internal Kafka key/value model anyway I guess).
Just following up to make sure I understand correctly.
may always want to parse them as a single object in memory. To support this use case we can allow the request | ||
to contain an accept-header specifying the content-type of the response. To receive a response as | ||
a single JSON object content-type should be specified as 'text/json', for our delimited format we | ||
will specify our own custom content type. The delimited format will be the default, as the API |
for our delimited format we will specify our own custom content type
The KLIP should specify this content type.
It will be in the detailed API docs. At the moment it's just a placeholder.
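The content negotiation described above amounts to a simple dispatch in the client — a sketch under the assumption that the single-object type ends up as application/json (per the earlier nit) and that anything else falls back to the default newline-delimited format, whose final name is still a placeholder:

```python
import json

def parse_response(body: bytes, content_type: str):
    """Decode a query response according to its content type.

    'application/json' means the whole body is one JSON value;
    any other type is treated as the default newline-delimited
    sequence of JSON elements.
    """
    if content_type == "application/json":
        return json.loads(body)
    return [json.loads(line) for line in body.split(b"\n") if line.strip()]
```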
The new server API will be implemented using Vert.x

The implementation will be designed in a reactive / non blocking way in order to provide the best performance / scalability characteristics with
typo: non-blocking
Description
This PR introduces klip-15 - proposal for a new client and API updates.
Testing done
N/A
Reviewer checklist