-
Notifications
You must be signed in to change notification settings - Fork 1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: add command topic offset to commands and perform validation based on offset when executing commands #3330
Conversation
final int commandTopicOffset = queuedCommand.getCommand().getCommandTopicOffset(); | ||
if (commandTopicOffset == -1 | ||
|| commandTopicOffset == commandStore.getSnapshotWithOffset().getSnapshotOffset()) { | ||
commandStore.setOffsetValue(queuedCommand.getOffset() + 1); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I ended up splitting updating the offset and the snapshot into two separate calls because there appears to be a race condition that can occur when a KsqlRequest contains multiple statements.
The integration test KsqlResourceFunctionalTest.shouldHandleInterDependantCsasTerminateAndDrop was failing
final List<KsqlEntity> results = makeKsqlRequest(
"CREATE STREAM SS AS SELECT * FROM " + PAGE_VIEW_STREAM + ";"
+ "TERMINATE CSAS_SS_" + NEXT_QUERY_ID.get() + ";"
+ "DROP STREAM SS;"
These commands are being pushed to the command topic in this order
Command{statement='CREATE STREAM SS WITH (KAFKA_TOPIC='SS', PARTITIONS=1, REPLICAS=1) AS SELECT *
FROM PAGEVIEWS_ORIGINAL PAGEVIEWS_ORIGINAL
EMIT CHANGES;',commandTopicOffset=1, overwriteProperties={}}
Command{statement='TERMINATE CSAS_SS_0;',commandTopicOffset=1, overwriteProperties={}}
Command{statement='DROP STREAM SS;',commandTopicOffset=2, overwriteProperties={}}
The terminate command isn't being executed since the offset signature doesn't match the current one, which then causes the drop statement to fail.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The problem with doing this is that we now have a race. The runner thread does:
1. Set the new offset/signature
2. Run the command
3. Set the new snapshot
A conflicting request can come in and be validated between 1 and 2, but not actually be a valid request.
Instead, we can add a recompute()
method to SnapshotWithOffset
that returns a new SnapshotWithOffset
with an updated offset/signature (implemented by incrementing the offset). We'd also need to know when to call recompute
. We can do that by passing in a predicate from KsqlResource
that returns true if the command doesn't have a custom executor.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Another option, which I think I like better would be to add the notion of a "batch" to CommandStore
. So you could have something like:
class CommandStore {
BatchContext newBatch(final long offset) {
return new BatchContext(offset);
}
// then add BatchContext as an argument
enqueueCommand(final BatchContext ctx, final String statement, ...) {
final Command command = new Command(ctx.offset(),...);
ctx.incrementOffset();
}
}
36b6033
to
ace0cfb
Compare
…ed on offset when executing commands
ace0cfb
to
ff00fea
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, @stevenpyzhang! I think we are on the right track, but need to tighten up some raciness. I've left that feedback in-line. I also think that we should use the name "signature" rather than "offset" outside of the CommandStore - the fact that we're using the offset is an implementation detail.
@@ -91,4 +91,10 @@ void ensureConsumedPast(long seqNum, Duration timeout) | |||
*/ | |||
@Override | |||
void close(); | |||
|
|||
void setOffsetValue(int offsetValue); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Combine these into a common call setSnapshotWithOffset
- we need to update them together atomically.
); | ||
commandStore.setSnapshot(); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We need to complete the command with an error so that the request thread is not blocked.
@@ -102,8 +127,11 @@ public QueuedCommandStatus enqueueCommand(final ConfiguredStatement<?> statement | |||
final CommandId commandId = commandIdAssigner.getCommandId(statement.getStatement()); | |||
final Command command = new Command( | |||
statement.getStatementText(), | |||
snapshotWithOffset.getSnapshotOffset(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The offset we put in the command needs to be the exact same offset used when validating - by this point the offset may have changed. We should instead pass the offset in (which we validated against), and write that offset into the command.
@JsonProperty("streamsProperties") final Map<String, Object> overwriteProperties, | ||
@JsonProperty("originalProperties") final Map<String, String> originalProperties) { | ||
this.statement = statement; | ||
this.commandTopicOffset = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would refer to this as a validationSignature
. The fact that we're using the offset is an implementation detail.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I personally like offset
because it also provides some guarantees its characteristics (i.e. monotonically increasing) - so it gives more, perhaps useful, information beyond "signature"
final int commandTopicOffset = queuedCommand.getCommand().getCommandTopicOffset(); | ||
if (commandTopicOffset == -1 | ||
|| commandTopicOffset == commandStore.getSnapshotWithOffset().getSnapshotOffset()) { | ||
commandStore.setOffsetValue(queuedCommand.getOffset() + 1); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The problem with doing this is that we now have a race. The runner thread does:
1. Set the new offset/signature
2. Run the command
3. Set the new snapshot
A conflicting request can come in and be validated between 1 and 2, but not actually be a valid request.
Instead, we can add a recompute()
method to SnapshotWithOffset
that returns a new SnapshotWithOffset
with an updated offset/signature (implemented by incrementing the offset). We'd also need to know when to call recompute
. We can do that by passing in a predicate from KsqlResource
that returns true if the command doesn't have a custom executor.
@@ -85,6 +94,22 @@ public String getCommandTopicName() { | |||
return commandTopic.getCommandTopicName(); | |||
} | |||
|
|||
@Override | |||
public void setOffsetValue(final int offsetValue) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Combine setOffsetValue
and setSnapshot
into 1 method. In that method, we should create a new SnapshotWithOffset with the new snapshot/offset. Then, having made snapshotWithOffset
an atomic reference as suggested above, we should do snapshotWithOffset.set(/* the new object we created */)
@@ -45,6 +46,8 @@ | |||
private final CommandIdAssigner commandIdAssigner; | |||
private final Map<CommandId, CommandStatusFuture> commandStatusMap; | |||
private final SequenceNumberFutureStore sequenceNumberFutureStore; | |||
private SnapshotWithOffset snapshotWithOffset; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Change this to AtomicReference<SnapshotWithOffset>
. Then we can safely read it from the request threads even as its written by the command runner thread.
|
||
@Override | ||
public SnapshotWithOffset getSnapshotWithOffset() { | ||
return snapshotWithOffset; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Having changed snapshotWithOffset
to an atomic reference, this would be snapshotWithOffset.get();
|
||
import io.confluent.ksql.KsqlExecutionContext; | ||
|
||
public class SnapshotWithOffset { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
rename to SnapshotWithSignature
Just leaving a comment saying that I'd like to review this before committing! I'll get to this tomorrow :) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we're trying to batch too many nuanced changes together with this one PR, and some that (in my opinion) need a more thorough design, perhaps even a KLIP so that the problem can be clearly understood. Writing distributed validation is a really hard problem (you can see my failed attempt in #2582), and if I understand correctly the more urgent problem we're trying to solve isn't #2435 but rather #3269, which doesn't need that.
For this change, I'd be more comfortable sticking to just adding the offset to QueuedCommand
and using that in queryID generation (and just using -1 for queryID generation on the REST side validation) - we can handle race conditions in a future change.
cc @rodesai - thoughts?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @stevenpyzhang
I've not got time to review this this evening, but I'd like to review before its merged... hence requesting changes..
@@ -127,7 +127,8 @@ public RecordMetadata send(final CommandId commandId, final Command command) { | |||
new QueuedCommand( | |||
record.key(), | |||
record.value(), | |||
Optional.empty())); | |||
Optional.empty(), | |||
(int) record.offset())); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Offset is a long
- we shouldn't be casting it to an int
.
The problem is that this is likely to corrupt data if there are racing commands, and I'd rather not change the query id generation to something we know is broken for even the simple case without restarts. Ideally we'd build the command validation mechanism first, and then build query id generation on top of that. |
After some offline discussion, this PR will be put on pause for now. There's a smaller PR open now #3343 that just focuses on adding fields to QueuedCommand and Command objects in order to update query id generation code. |
The new approach is to use Kafka's transactional producer APIs so closing this pr as it's not relevant now. |
Description
Based off of discussion in #3278 and #2435.
This PR adds an offset value to both Command and QueuedCommand objects. The field in Command represents the point when a Ksql statement was validated and put on the CommandTopic. The field in QueuedCommand represents the offset that the command was read from, it's assigned from the corresponding Kafka ConsumerRecord.offset()
Request Validator was also rewritten to take a KsqlExecutionContext instead of a ServiceContext
This change also makes it so that when a statement that creates a query is executed, query_id generation can utilize the current statement offset instead of relying on incrementing a value (Future PR).
Testing done
Local tests
Update existing tests
Reviewer checklist