feat: Transactional Produces to Command Topic #3660
Conversation
ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/RequestHandler.java
@@ -470,6 +470,10 @@ private Properties buildBrokerConfig(final String logDir) {
  config.put(KafkaConfig.LogRetentionTimeMillisProp(), -1);
  // Stop logs marked for deletion from being deleted
  config.put(KafkaConfig.LogDeleteDelayMsProp(), Long.MAX_VALUE);
  // Set to 1 because only 1 broker
Without setting these, this error appears for some integration tests:
```
ERROR [KafkaApi-0] Number of alive brokers '1' does not meet the required replication factor '3' for the transactions state topic (configured via 'transaction.state.log.replication.factor'). This error can be ignored if the cluster is starting up and not all brokers are up yet. (kafka.server.KafkaApis:74)
```
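The added lines are collapsed in the hunk above, but given this error they presumably dial the transaction-state-topic settings down for a single broker. A hypothetical sketch of what they might look like (the exact lines and values are an assumption, not the PR's code):

```java
// Hypothetical continuation of the hunk above: with a single embedded broker, the
// transaction state topic cannot meet the default replication factor of 3 / min ISR of 2.
config.put(KafkaConfig.TransactionsTopicReplicationFactorProp(), (short) 1);
config.put(KafkaConfig.TransactionsTopicMinISRProp(), 1);
```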
Hey Steven, thanks for this!
I'm going to start a review, but I wanted to make sure I understand the motivations for this PR before I start:
We want to ensure that every statement written to the commandTopic is validated only after every statement before it has been processed, in order to have the most up to date snapshot of the KsqlEngine for validation purposes.
Could you elaborate a bit on the above statement? I'm trying to understand why this is important. Perhaps you could illustrate with an example?
@purplefox, currently, before we produce commands to the command topic, we do validation on the statement to ensure it can be executed properly. And when we're actually executing the statement on a KSQL server after consuming it from the command topic, we're also doing these validations. There are two main forms of validation being done: internal state and external state.

We're trying to move away from doing validation when executing the statement from the command topic. A simple example of something that this code would affect: currently, if two KSQL servers concurrently try to produce the same CREATE STREAM FOO to the command topic, both would be enqueued, and one of the statements would fail when it's actually executed. Although in this example the final state of the KSQL server is fine (a stream named FOO is created), it would be better if we can ensure every statement written to the command topic at offset n has been validated against a KSQL internal state that has processed all statements up to n-1 (i.e. the internal KSQL state is most up to date). There's some discussion in this issue that would also be helpful in getting context; it also includes some more examples.
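To make the protocol concrete, here is a rough, hypothetical sketch of the validate-inside-a-transaction flow using the plain KafkaProducer transactional API. The class, the waitForCommandRunnerToCatchUp()/validate() placeholders, and the transactional.id value are illustrative assumptions, not the code in this PR:

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.serialization.StringSerializer;

public final class TransactionalEnqueueSketch {

  public void enqueue(final String commandTopic, final String commandId, final String commandJson) {
    final Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    // A shared transactional.id lets a newer producer fence an older one.
    props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "ksql-command-topic-producer");
    props.put(ProducerConfig.ACKS_CONFIG, "all");

    final KafkaProducer<String, String> producer =
        new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
    producer.initTransactions();
    try {
      producer.beginTransaction();
      // 1. Wait until every command already in the topic has been processed locally,
      //    so validation runs against an up-to-date KsqlEngine snapshot.
      waitForCommandRunnerToCatchUp();
      // 2. Validate the statement against that snapshot.
      validate(commandJson);
      // 3. Produce the command and commit atomically.
      producer.send(new ProducerRecord<>(commandTopic, commandId, commandJson));
      producer.commitTransaction();
    } catch (final ProducerFencedException e) {
      // A newer producer took over the transactional.id; this one cannot abort.
      throw new RuntimeException(e);
    } catch (final Exception e) {
      producer.abortTransaction();
      throw new RuntimeException(e);
    } finally {
      producer.close();
    }
  }

  private void waitForCommandRunnerToCatchUp() { /* placeholder */ }

  private void validate(final String statement) { /* placeholder */ }
}
```

Assuming every server shares the same transactional.id, a newer producer fences older ones, which is what makes the fencing mentioned later in this thread possible.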
ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/ProducerTransactionManager.java
Thanks Steven, I think I have a clearer picture of the motivations here now.
This seems to imply that every time we add anything to the command topic we must first wait for all outstanding commands to be processed locally. This means we're going to incur at least a round trip time to the Kafka broker for each command, i.e. our throughput will be limited by the latency of ksql<->kafka. If that's measured in milliseconds, our theoretical maximum throughput will be in the 100s of transactions per second, which seems very low. I'm wondering if we really want to impose such a bottleneck on the system?
Looks good! Just a few comments.
@@ -646,7 +668,8 @@ private static void maybeCreateProcessingLogStream(
       return;
     }

-    commandQueue.enqueueCommand(configured.get());
+    commandQueue.enqueueCommand(configured.get(), producerTransactionManager);
+    producerTransactionManager.commit();
Should we also be catching exceptions and aborting the transaction on failure?
Transactions have been isolated to DistributingExecutor and will be aborted if an exception is thrown.
ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/ProducerTransactionManager.java
try {
  int retries = 0;
  while (commandRunner.getNumCommandProcessed() < endOffset) {
Instead of polling getNumCommandProcessed() we could add a method to commandRunner, waitUntilOffset(long offset), which didn't return until the required offset was reached. Internally it could use Object.wait() and Object.notify() to wait for the required condition.
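A minimal sketch of that suggestion, assuming a hypothetical CommandRunner-like class that counts processed commands (names and the timeout handling are illustrative, not the PR's code):

```java
// Hypothetical sketch of waitUntilOffset() using intrinsic wait/notify.
// Note: as discussed below, processed-command counts and topic offsets can
// diverge once transactions add control markers to the topic.
public class CommandRunnerSketch {

  private final Object lock = new Object();
  private long numCommandsProcessed = 0;

  // Called by the CommandRunner thread after executing each command.
  public void markCommandProcessed() {
    synchronized (lock) {
      numCommandsProcessed++;
      lock.notifyAll();
    }
  }

  // Called by request-handling threads; blocks until the given offset is reached.
  public void waitUntilOffset(final long offset, final long timeoutMs)
      throws InterruptedException {
    final long deadline = System.currentTimeMillis() + timeoutMs;
    synchronized (lock) {
      while (numCommandsProcessed < offset) {
        final long remaining = deadline - System.currentTimeMillis();
        if (remaining <= 0) {
          throw new IllegalStateException("Timed out waiting for offset " + offset);
        }
        lock.wait(remaining);
      }
    }
  }
}
```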
Is this assuming that every offset in the command topic corresponds to a command? I thought that with transactions these need not necessarily be in sync since transactions write additional messages into the topic (which take up offsets but aren't commands), though I'm having a surprisingly tough time finding docs about this. Would be good to have someone else confirm or deny.
When I was testing this locally, I was able to make several transactions in a row without this method getting stuck. If additional messages were taking up offsets, I'd hit the retry limit and have an exception thrown, which didn't happen. So that behavior makes me think it doesn't affect commandConsumer.endOffsets(), but someone more familiar with this should chime in. I can't seem to find any documentation about these additional messages either.
Every transaction will write at least one control message to each partition in the transaction. This control message (commit or abort message) will have an offset. I'm actually surprised that your local testing worked.
Another risk is aborted transactions, and there is a chance of these if an active producer gets fenced by a newer instance coming online. In this case, the number of valid (non-aborted) commands will always be divergent from the end offsets.
The docs on how this works can be found here: https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging
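For instance, if a single committed transaction writes two commands, the partition could contain command A at offset 0, command B at offset 1, and the commit marker at offset 2, so endOffsets() would report 3 even though only two commands are ever returned to a consumer.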
I ended up reusing the functionality we already have in place for commandStore.ensureConsumedPast(), which uses offsets. position() will get the next offset to fetch, so this will ignore the control messages.
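Roughly, the reused mechanism looks like this; the OffsetWaiter interface below stands in for CommandStore#ensureConsumedPast, whose exact signature here is an assumption, and the helper class itself is hypothetical:

```java
import java.time.Duration;
import java.util.Collections;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;

public final class WaitForCommandTopicEnd {

  @FunctionalInterface
  public interface OffsetWaiter {
    void ensureConsumedPast(long offset, Duration timeout) throws Exception;
  }

  public static void waitUntilCaughtUp(
      final Consumer<?, ?> commandConsumer,
      final TopicPartition commandTopicPartition,
      final OffsetWaiter commandStore
  ) throws Exception {
    // endOffsets() returns the offset of the next record that would be appended.
    final long endOffset = commandConsumer
        .endOffsets(Collections.singletonList(commandTopicPartition))
        .get(commandTopicPartition);

    // The consumer's position() moves past transaction control (commit/abort) markers,
    // so waiting until we've consumed past endOffset - 1 works with transactional writes.
    commandStore.ensureConsumedPast(endOffset - 1, Duration.ofSeconds(30));
  }
}
```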
  }
  throw new RuntimeException(e.getCause());
} catch (final InterruptedException e) {
  throw new RuntimeException(e);
Should we not abort the tx on all exceptions?
The exception should bubble up and get handled in DistributingExecutor, where the transaction would be aborted.
ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandRunner.java
ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/RequestHandler.java
ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/KsqlResource.java
The reason I'm not concerned about this is that we only need to follow the protocol implemented here for statements that need to be logged to the command topic (CREATE STREAM, CREATE TABLE, CREATE STREAM AS, CREATE TABLE AS, DROP, TERMINATE). We don't expect a significant rate of these on any KSQL cluster.
Further, topics which are included in transactions should be configured for durability. In particular, the replication.factor should be at least 3, and the min.insync.replicas for these topics should be set to 2. Finally, in order for transactional guarantees to be realized end-to-end, the consumers must be configured to read only committed messages as well.
As a part of this effort, could we also introduce configs so the ksql internal topics (i.e. the command topic) can be configured for durability? When I was looking at something else, I noticed that we only support setting "ksql.internal.topic.replicas", which may not be sufficient.
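For illustration only, the kind of settings being asked for might look like this; the values and the split across topic/producer/consumer configs are assumptions, not what the PR ultimately sets:

```java
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;

public final class CommandTopicDurabilityConfigSketch {

  // Topic-level settings for the command topic (values are illustrative).
  public static Properties commandTopicConfig() {
    final Properties topic = new Properties();
    topic.put("replication.factor", "3");
    topic.put("min.insync.replicas", "2");
    topic.put("unclean.leader.election.enable", "false");
    return topic;
  }

  // Producer side: transactional and idempotent, acks=all.
  public static Properties producerConfig() {
    final Properties props = new Properties();
    props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "ksql-command-producer");
    props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
    props.put(ProducerConfig.ACKS_CONFIG, "all");
    return props;
  }

  // Consumer side: only see committed (non-aborted) commands.
  public static Properties consumerConfig() {
    final Properties props = new Properties();
    props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
    return props;
  }
}
```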
@@ -646,7 +668,8 @@ private static void maybeCreateProcessingLogStream(
       return;
     }

-    commandQueue.enqueueCommand(configured.get());
+    commandQueue.enqueueCommand(configured.get(), transactionalProducer);
+    transactionalProducer.commit();
Is the exception handling and the abortTransaction() handled inside enqueueCommand / TransactionalProducer::send? Could we avoid calling commit() for an aborted transaction?
Ah I see the current implementation does this wait for all commands. We should fix that.
Should the TransactionalProducer be moved to DistributingExecutor then? It would help get rid of that injection pattern I currently have, and it makes sense since DistributingExecutor would be the only one that uses it. I think the main problem though is that the transaction needs to be started before the RequestValidator validation happens if there's a CREATE STREAM, CREATE TABLE, etc.
I think the current structure is reasonable (other than the way the producer is injected - but I'll leave that for the code comments) - the distributing executor doesn't need to know about validation. It would definitely be worthwhile to optimize away creating the producer/consumer, and checking offsets, for statements that don't need to be distributed to the command topic though.
One option I thought of for getting rid of injecting the TransactionalProducer into DistributingExecutor could be to have it implement a DistributedStatementExecutor interface which extends StatementExecutor. The DistributedStatementExecutor interface would have an execute function with the required arg types for passing in the producer.
@rodesai I thought during a discussion we had about the implementation details of this, you mentioned that we needed to create a new Producer for each transaction since there could be multiple threads on a server trying to produce to the command topic. I believe the consumers for getting offsets could be optimized to just one consumer that's shared among all the TransactionalProducers (passed to each new TransactionalProducer from the factory class).
I just mean optimizing it away when handling requests that don't need a transaction (e.g.
import java.util.Objects;
import org.apache.kafka.clients.producer.ProducerConfig;

public class TransactionalProducerFactory {
Rather than having a separate class for this, I'd just add an interface to CommandQueue like createTransactionalProducer, and drop enqueueCommand from the CommandQueue interface. Also, to avoid breaking the current abstraction (KsqlResource/RequestHandler just talk to a CommandQueue) we should have an interface for TransactionalProducer:
interface TransactionalProducer {
  void begin();
  void waitForConsumer();
  QueuedCommandStatus send(final CommandId commandId, final Command command); // here send would just return the offset
  void abort();
  void commit();
  void close();
}
The current TransactionalProducer can move to an implementation of this interface and be created by CommandStore when createTransactionalProducer is called.
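For context, a caller of the proposed interface might look roughly like this (a hypothetical sketch against the interface above, not code from the PR):

```java
// Hypothetical caller of the proposed TransactionalProducer interface.
public final class DistributeSketch {

  public static QueuedCommandStatus distribute(
      final CommandQueue commandQueue,
      final CommandId commandId,
      final Command command
  ) {
    final TransactionalProducer producer = commandQueue.createTransactionalProducer();
    try {
      producer.begin();
      // Block until the local engine has processed everything already in the command topic.
      producer.waitForConsumer();
      // Validation against the now up-to-date engine state would happen here.
      final QueuedCommandStatus status = producer.send(commandId, command);
      producer.commit();
      return status;
    } catch (final RuntimeException e) {
      producer.abort();
      throw e;
    } finally {
      producer.close();
    }
  }
}
```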
commandStore would need to keep track of the commandRunner in order to pass it to each new TransactionalProducer in createTransactionalProducer. I'm not sure how exactly that would work out because the commandStore is created first in RestApp and then the commandRunner.
Why can't we reuse the existing mechanism in CommandStore to do this? (see the implementation of completeSatisfiedSequenceNumberFutures). Then the producer factory has no dependency on CommandRunner.
The CommandStore version introduces a race condition. The futures for the sequenceNumbers are completed before the commands are actually executed in commandRunner. There's a bit of lag between consuming and actually executing the command and updating the metastore. Also, the CommandRunner could get stuck executing consumed commands, but ensureConsumedPast() in CommandStore wouldn't block the TransactionalProducer.waitForConsumer() call and then the transaction would proceed on an out of sync metastore.
After thinking about it and playing around with the existing functionality of CommandStore.ensureConsumedPast(), it fits with what I was trying to achieve by duplicating the code. I removed the code from CommandRunner and the factory class. I'm not sold on dropping enqueueCommand() from CommandQueue though, since CommandStore.commandStatusMap is being updated in enqueueCommand().
I'm not sold on dropping enqueueCommand() from commandQueue though since the CommandStore.commandStatusMap is being updated in enqueueCommand()
Is the concern that you won't be able to get to the map from the queue? In this way of doing things the implementation of the producer interface could be inside CommandStore, so that shouldn't be a problem. So something like:
interface TransactionalProducer {
  send()
}

interface CommandQueue {
  getProducer()
}

class CommandStore {
  Producer getProducer() {
    return new TopicProducer();
  }

  class TopicProducer implements TransactionalProducer {
    send() { ... }
  }
}
This way, the underlying queue is still encapsulated away.
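A slightly more concrete, still hypothetical Java rendering of that nesting (CommandId, Command, and QueuedCommandStatus are the existing ksql types referenced earlier; everything else is illustrative):

```java
// Hypothetical rendering: the transactional producer is an inner class of CommandStore,
// so callers only ever see the CommandQueue/TransactionalProducer interfaces and the
// underlying topic and commandStatusMap stay encapsulated.
public interface TransactionalProducer {
  void send(CommandId commandId, Command command); // real version would return QueuedCommandStatus
}

public interface CommandQueue {
  TransactionalProducer createTransactionalProducer();
}

public class CommandStore implements CommandQueue {

  @Override
  public TransactionalProducer createTransactionalProducer() {
    return new CommandTopicProducer();
  }

  private final class CommandTopicProducer implements TransactionalProducer {
    @Override
    public void send(final CommandId commandId, final Command command) {
      // Produce to the command topic and update commandStatusMap here.
    }
  }
}
```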
Mind if I leave this for a future PR? I think this PR has gone through enough refactors already.
final QueuedCommandStatus queuedCommandStatus =
    commandQueue.enqueueCommand(injected, transactionalProducer);

final CommandStatus commandStatus = queuedCommandStatus
We can't actually wait for the status here any more, since the consumer thread will not have read the command yet. Instead, we should wait for the status at the end once all the commands have been enqueued. But that raises another issue - we might have statements that read the meta-store (e.g. list streams or describe) after a logged command. These statements should see the changes applied by the logged command. So we either need to:
- maintain a shadow engine clone and apply the commands to it as we go along, and then execute statements that don't need to be logged against the clone.
- run the validate-execute-commit loop for each statement one-by-one (rather than validating them all up front).
My vote is for the latter option, but we should see what other folks think (cc @big-andy-coates @agavra)
We could also re-run validation for logged statements as part of execute. So DistributingExecutor would just run the whole protocol (init txn, validate, log cmd, commit). It's a bit wasteful to re-run validation, but I think this is the simplest way to get this PR unblocked.
- maintain a shadow engine clone and apply the commands to it as we go along, and then execute statements that don't need to be logged against the clone.
This option could potentially return misleading results in the event of an abortTransaction();
//starting from no streams
CREATE STREAM foo1(age BIGINT) WITH (KAFKA_TOPIC='foo', VALUE_FORMAT='JSON');
CREATE STREAM foo2(age BIGINT) WITH (KAFKA_TOPIC='foo', VALUE_FORMAT='JSON');
show streams;
CREATE STREAM foo3(age BIGINT) WITH (KAFKA_TOPIC='foo', VALUE_FORMAT='JSON');
If the first two messages are sent and applied to the shadow engine, list streams executed against the shadow engine would show that streams foo1 and foo2 have been created. If sending foo3 fails, the transaction wouldn't be committed and the commandRunner never processes any of the 3 CREATE STREAM statements. Running list streams again would show that the server never actually created the streams.
- run the validate-execute-commit loop for each statement one-by-one (rather than validating them all up front).
I think there are also issues with this. With how the code is currently designed, a KsqlEntityList is only returned if all the statements are successfully executed.
ksql> CREATE TABLE foo400(age BIGINT) WITH (KAFKA_TOPIC='foo', VALUE_FORMAT='JSON');
>CREATE TABLE foo200(age BIGINT) WITH (KAFKA_TOPIC='foo', VALUE_FORMAT='JSON');
>Show Tables;
CREATE TABLE foo400(age BIGINT) WITH (KAFKA_TOPIC='foo', VALUE_FORMAT='JSON');
Message
---------------
Table created
---------------
CREATE TABLE foo200(age BIGINT) WITH (KAFKA_TOPIC='foo', VALUE_FORMAT='JSON');
Message
---------------
Table created
---------------
Show Tables;
Table Name | Kafka Topic | Format | Windowed
----------------------------------------------
FOO200 | foo | JSON | false
FOO400 | foo | JSON | false
----------------------------------------------
ksql>
ksql> CREATE TABLE foo100(age BIGINT) WITH (KAFKA_TOPIC='foo', VALUE_FORMAT='JSON');
>CREATE TABLE foo200(age BIGINT) WITH (KAFKA_TOPIC='foo', VALUE_FORMAT='JSON');
>Show Tables;
Cannot add table 'FOO200': A table with the same name already exists
ksql> show tables;
Table Name | Kafka Topic | Format | Windowed
----------------------------------------------
FOO100 | foo | JSON | false
FOO200 | foo | JSON | false
FOO400 | foo | JSON | false
----------------------------------------------
ksql>
The code would need to be refactored to be able to handle processing individual commands (which should probably be done before implementing the TransactionalProducer).
This option could potentially return misleading results in the event of an abortTransaction();
In this case we'd just fail the whole request - so no response to list streams;
The code would need to be refactored to be able to handle processing individual commands (which should probably be done before implementing the TransactionalProduces)
I'm not sure I follow. It's possible today for any command to fail when executing, in which case we just fail the whole command. We could do the same thing if validation inside the transaction fails. I agree it's better to return the partial response with the results for what we could successfully execute. My point is that this is orthogonal to what we're trying to do here.
I agree it's better to return the partial response with the results for what we could successfully execute. My point is that this is orthogonal to what we're trying to do here.
I also think it would be good to have a partial response, but it's not a deal breaker for me. If the team is OK with the behavior I outlined here #3660 (comment) then I'm on board. It could also be fixed in a follow up PR to return a partial response.
I ended up going with running the protocol on each statement one-by-one (re-running validation on distributed statements).
We could also re-run validation for logged statements as part of execute. So DistributingExecutor would just run the whole protocol (init txn, validate, log cmd, commit). It's a bit wasteful to re-run validation, but I think this is the simplest way to get this PR unblocked.
I think we're breaking an abstraction by passing the validator into the executor and it'll cause coupling problems down the line. It makes sense to run the validate-execute-commit one by one, but not this way. Instead, the transaction logic should be pushed up to the top level that does the validate-execute loop instead of pushing it down into the execute.
Yeah doing the validation in DistributedExecutor was just meant to be an immediate-term thing so we can get this change merged without blocking on deciding how we really want the validation to be done. I'm personally OK with validating-executing each statement one by one. This is the way it was done at first. At some point @big-andy-coates changed it to validate everything up-front. It would be good to get his perspective on why this was done.
ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/TransactionalProducer.java
Updates to PR: New Configs
The new protocol is:
One UX point is that some early statements may be committed, but a later one fails to be committed (since statements aren't batch committed to the command topic). We'd return an error message for that failed statement, but not any partial success responses. This would need to be addressed in a follow-up PR to return partial success responses.
LGTM!
Description
Protocol
We want to ensure that every command written to the commandTopic is being validated only after all previous commands have been processed in order to have the most up to date snapshot of the KsqlEngine for validation purposes.
Some notes
- waitForCommandConsumer: in order to get the end offset of the command topic, this can't be done with the existing Consumer in CommandTopic.java because it's running in the CommandRunner thread, which is separate from the thread handling requests in KsqlResource (consumers are not thread-safe).
- New configs added to improve durability of the command topic, set directly in code unless otherwise specified.
These changes also require additional ACLs to be set, including
Follow up to this PR:
#3795
#3768
Transactional Producer API Docs
https://kafka.apache.org/10/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html
KIP introducing transactions
https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging#KIP-98-ExactlyOnceDeliveryandTransactionalMessaging-TestPlan
More on Transactional Messaging in Kafka
https://cwiki.apache.org/confluence/display/KAFKA/Transactional+Messaging+in+Kafka
Testing done
Existing tests fixed, may need a new integration test to ensure functionality.