
KAFKA-14462; [17/N] Add CoordinatorRuntime #13795

Merged: 5 commits merged into apache:trunk from KAFKA-14462-17 on Jun 13, 2023

Conversation

dajac
Member

@dajac dajac commented Jun 1, 2023

This patch introduces the CoordinatorRuntime. The CoordinatorRuntime is a framework which encapsulates all the common features required to build a coordinator, such as the group coordinator. Please refer to the javadoc of that class for the details.
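As an illustration of the intended usage, here is a minimal sketch based on the test snippets quoted later in this conversation; the builder wiring (event processor, partition writer, loader) is omitted and the topic name is only an example.

// Hypothetical usage sketch of the runtime; `runtime` is assumed to be an
// already-built CoordinatorRuntime instance.
TopicPartition tp = new TopicPartition("__consumer_offsets", 0);

// Schedule a write operation against the coordinator owning this partition.
// The returned future completes once the generated records are committed.
CompletableFuture<String> response = runtime.scheduleWriteOperation(
    "write#1",
    tp,
    state -> new CoordinatorResult<>(
        Arrays.asList("record1", "record2"),  // records appended to the partition
        "response1"                           // response released after the commit
    )
);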

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@dajac dajac added the KIP-848 The Next Generation of the Consumer Rebalance Protocol label Jun 1, 2023
@dajac dajac force-pushed the KAFKA-14462-17 branch from fd4b01d to 6131fb5 Compare June 2, 2023 19:07
* @param tp The topic partition of the coordinator.
*/
private CoordinatorContext(
TopicPartition tp
Contributor

does this mean a coordinator will be associated with multiple coordinator contexts since a coordinator owns multiple partitions?

never mind -- looks like we have 1-1 mapping of coordinator to topic partition

Member

I was also wondering this, so thanks for bringing it up.

Member Author

Correct. Every partition is mapped to a CoordinatorContext and a Coordinator. The Coordinator here is basically the replicated state machine.
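For illustration, the mapping described here boils down to something like the following (field and constructor names are taken from snippets quoted elsewhere in this review; the lookup logic is a simplified sketch, not the merged code):

// One CoordinatorContext (and one coordinator state machine) per partition.
private final ConcurrentHashMap<TopicPartition, CoordinatorContext> coordinators =
    new ConcurrentHashMap<>();

private CoordinatorContext getOrCreateContext(TopicPartition tp) {
    CoordinatorContext context = coordinators.get(tp);
    if (context == null) {
        context = new CoordinatorContext(tp);
        coordinators.put(tp, context);
    }
    return context;
}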

* @return A result containing a list of records and the RPC result.
* @throws KafkaException
*/
CoordinatorResult<T, U> generateRecordsAndResult(S state) throws KafkaException;
Contributor

nit: i think it's a bit confusing to use state since we have CoordinatorState. can we use stateMachine?

Member Author

I think that I will use coordinator to stay consistent with the rest of the class.

Comment on lines +390 to +468
// If the records are not empty, first, they are applied to the state machine,
// second, they are written to the partition/log, and finally, the response
// is put into the deferred event queue.
Contributor

we can apply to the state machine first because we will revert to latest committed snapshot if the append fails right?

Member Author

Correct.
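Put differently, the write path applies records optimistically and rolls back when the append fails. Below is a simplified sketch only: `replay`, the exact `append` signature and `deferredEventQueue.add` are assumptions, while `generateRecordsAndResult`, `updateLastWrittenOffset`, `revertLastWrittenOffset` and the deferred event queue appear in snippets quoted in this review.

// Simplified sketch of the write path discussed above.
CoordinatorResult<T, U> result = op.generateRecordsAndResult(context.coordinator);

if (!result.records().isEmpty()) {
    long prevLastWrittenOffset = context.lastWrittenOffset;
    try {
        // 1) Apply the records to the in-memory state machine (snapshotted).
        result.records().forEach(record -> context.coordinator.replay(record));
        // 2) Append the records to the partition/log.
        long offset = partitionWriter.append(tp, result.records());
        context.updateLastWrittenOffset(offset);
        // 3) Park the response until the high watermark reaches the offset.
        context.deferredEventQueue.add(offset, this);
    } catch (Throwable t) {
        // If the append fails, roll back to the last committed snapshot.
        context.revertLastWrittenOffset(prevLastWrittenOffset);
        complete(t);
    }
}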

Comment on lines 447 to 449
* Generates the response to implement this coordinator read operation. A read
* operation receives the last committed offset and can use it to only
* access committed state in the timeline data structures.
Contributor

i'm not sure what the comment is saying here

Member Author

Rephrased it.

*
* @param tp The topic partition of the coordinator. Records from this
* partition will be read and applied to the coordinator.
* @param partitionEpoch The epoch of the partition.
Contributor

can we give a bit more description on what the difference is between this and context.epoch?

Member Author

They are basically the same: context.epoch = partitionEpoch. Is the name confusing?

CoordinatorContext context = getOrCreateContext(tp);
if (context.epoch < partitionEpoch) {
context.epoch = partitionEpoch;
if (context.state == CoordinatorState.LOADING) {
Contributor

since we only run one event at a given time for a partition, my intuition tells me that we should not hit this condition. when would this be true?

Member Author

The loading is asynchronous so it could be that the leader epoch changes while we are already loading the coordinator.

*/
private void revertLastWrittenOffset(
long offset
) {
Member

nit: do we want a check that is the opposite of the updateLastWrittenOffset (ie, offset can not be greater than last written offset?)
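The suggested guard might look roughly like this (a sketch of the suggestion, mirroring the check in updateLastWrittenOffset; not the merged code):

private void revertLastWrittenOffset(
    long offset
) {
    // Sanity check suggested above: a revert can only move the offset backwards.
    if (offset > lastWrittenOffset) {
        throw new IllegalStateException("New offset " + offset +
            " must not be greater than the last written offset " + lastWrittenOffset + ".");
    }
    snapshotRegistry.revertToSnapshot(offset);
    lastWrittenOffset = offset;
}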

LOADING {
@Override
boolean canTransitionFrom(CoordinatorState state) {
return state == INITIAL || state == CLOSED || state == FAILED;
Member

so we can go from closed/failed back to loading?

Member Author

very good question. it actually depends on whether we want to recycle the context or not. after thinking a bit more about it, i think that it is preferable to reuse it (because of the snapshot registry). i have updated the state machine and the code to reflect this.


case CLOSED:
state = CoordinatorState.CLOSED;
partitionWriter.deregisterListener(tp, highWatermarklistener);
Member

are all of these methods safe to call from initial state? seems like that state transition is valid.

Member Author

almost. updated the code.

result = op.generateRecordsAndResult(context.coordinator);

if (result.records().isEmpty()) {
// If the records are empty, it was a read operation after all. In this case,
Member

What scenarios would we have a write event that was a "read operation after all"?

Member Author

imagine a simple heartbeat request. we have to treat it as a write because it could alter the state but the heartbeat may not do anything.

Member

Ok -- so it's cases where a request has the potential to write, but if nothing changes it will not. Makes sense.
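For example (a hypothetical handler; `memberNeedsUpdate` is made up for illustration), a heartbeat could be scheduled as a write but degenerate into a read when nothing changed:

// Hypothetical heartbeat handler: scheduled as a write because it *may*
// mutate state, but it returns no records when there is nothing to persist.
runtime.scheduleWriteOperation("heartbeat", tp, coordinator -> {
    if (coordinator.memberNeedsUpdate()) {
        // State changed: emit a record and defer the response until it commits.
        return new CoordinatorResult<>(
            Collections.singletonList("updated-member-record"),
            "heartbeat-response");
    }
    // Nothing changed: empty records, so the response completes immediately.
    return new CoordinatorResult<>(Collections.emptyList(), "heartbeat-response");
});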

}

/**
* A coordinator internal event.
Member

@jolshan jolshan Jun 5, 2023

I see that this event will not return a response -- but does it also not write records? Is it read only? Or is it only on the in-memory state?

Member Author

None of them. This is just a way to schedule an internal task (e.g. load/unload ops).

Member

So in other words -- not writing or reading but doing some operation.

Member

It may be worth clarifying what "internal" means in the comment

* @param exception The exception to complete the future with.
*/
@Override
public void complete(Throwable exception) {
Member

Complete is a bit strange here since we only call this in the error case. Would it make more sense to just call this in the finally block and if the error is null do nothing?

Member Author

Would it make more sense to just call this in the finally block

I am not sure I understand what you mean by this. Could you elaborate?

Member

The others followed a pattern where they both called complete, but maybe it's not a huge deal. Was suggesting a way to continue this pattern

Member Author

Yeah, I am not sure. In this case, complete is only useful in case of errors so I would not overuse it.

Member

Yeah I guess if we don't use futures, it's not as important.


scheduleInternalOperation("UnloadCoordinator(tp=" + tp + ", epoch=" + partitionEpoch + ")", tp, () -> {
CoordinatorContext context = contextOrThrow(tp);
if (context.epoch < partitionEpoch) {
Member

Does the context.epoch stay the same until the next load?

Member Author

Right. The epoch is basically the leader epoch of the partition. The broker notifies us when a new one comes and we update it here. However, we only load if we have to.

@dajac dajac force-pushed the KAFKA-14462-17 branch 2 times, most recently from 55569a9 to 5cf4392 Compare June 6, 2023 06:49
@dajac dajac force-pushed the KAFKA-14462-17 branch from 5cf4392 to 3c2c9f4 Compare June 6, 2023 14:26
@dajac dajac marked this pull request as ready for review June 6, 2023 14:26
private final CoordinatorBuilderSupplier<S, U> coordinatorBuilderSupplier;

/**
* Constructor.
Member

Is this comment helpful 😅

Member Author

Yeah, I agree... but it does not hurt, does it?

Member

I suppose not. It could also do without it. Up to you.

*/
void run();

/**
* Completes the event with the provided exception.
*
* @param exception An exception to complete the event with.
* @param exception An exception if the processing of the event failed or null.
Member

nit: maybe "An exception if the processing of the event failed or null otherwise"
I read this as exception if the event failed or was null.

@@ -39,7 +40,7 @@ public class MultiThreadedEventProcessor implements CoordinatorEventProcessor {
/**
* The accumulator.
*/
private final EventAccumulator<Integer, CoordinatorEvent> accumulator;
Member

Is this changed from integer to topic partition so that we can use different coordinator state partitions (ie consumer offsets vs transactional state)

Member Author

Right. It was also just easier to use TopicPartition everywhere.

* An in-memory partition writer that accepts a maximum number of writes.
*/
private static class MockPartitionWriter extends InMemoryPartitionWriter<String> {
private int allowedWrites = 1;
Member

nit: we don't need to set 1 right? It will not be used.

Member Author

Right.

CompletableFuture<Void> future = new CompletableFuture<>();
when(loader.load(TP, coordinator)).thenReturn(future);

// Getting the coordinator context fails because the coordinator
Member

does this fail not because we aren't loaded, but because we haven't even started loading? On 201, it says we are still loading, but we were able to get the context.

Member Author

In this case, it fails because the context does not even exist.

Member

Got it -- the comment is a little unclear, since we can get the context while it is still not fully loaded, but the issue is that we didn't start loading (which starts with creating the context).

Maybe we could say "Getting the coordinator context fails because the coordinator hasn't started loading and hasn't created the context"? That might be a bit wordy, but something like that.

Member Author

Updated the comment.

assertEquals(CoordinatorRuntime.CoordinatorState.FAILED, ctx.state);

// Verify that onUnloaded is called.
verify(coordinator, times(1)).onUnloaded();
Member

do we also want to check deregister was called? (I suppose we don't actually register, but the code path does contain this and the other test looks for registerListener)

verify(coordinator, times(1)).onUnloaded();

// Create a new coordinator.
coordinator = mock(MockCoordinator.class);
Member

Do we need to create a new coordinator here? We don't seem to verify that it is different.

Member Author

I do this to ensure that a new coordinator is created in this case. We verify the new value a few lines below: assertEquals(coordinator, ctx.coordinator);.

Member

Ah I see. When I saw assertEquals(coordinator, ctx.coordinator) before I was confused because that would be true with the old coordinator. But we are testing that the ctx is updating.

verify(writer, times(1)).deregisterListener(
eq(TP),
any(PartitionWriter.Listener.class)
);
Member

Should we check that the state is closed?

// The last committed offset is updated.
assertEquals(3L, ctx.lastCommittedOffset);
// The snapshot is cleaned up.
assertEquals(Collections.singletonList(3L), ctx.snapshotRegistry.epochsList());
Member

just for my understanding, we will always keep the latest snapshot in case we need to rollback?

Member Author

Right. We always keep the last snapshot as we may have to roll back to it when a new write is applied but fails.
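In code terms, the commit path quoted earlier keeps exactly the snapshot at the committed offset around for that purpose (a sketch; `deleteSnapshotsUpTo` and `completeUpTo` appear in the quoted code, `revertToSnapshot` is the assumed rollback counterpart):

// When the high watermark advances to `offset`:
deferredEventQueue.completeUpTo(offset);      // release responses waiting on the commit
snapshotRegistry.deleteSnapshotsUpTo(offset); // drop older snapshots, keep the one at `offset`

// If a later write is applied to the state machine but its append fails,
// the runtime can roll back to that retained snapshot:
snapshotRegistry.revertToSnapshot(offset);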


// It is completed immediately because the state is fully committed.
assertTrue(write4.isDone());
assertEquals("response4", write4.get(5, TimeUnit.SECONDS));
Member

Will the snapshot registry have 4L as the epochList now?

Member Author

No, it won't. The last write here does not yield any records so a new snapshot is not created.

Member

I see. The same happens on line 498 👍

assertEquals(Arrays.asList(0L, 2L), ctx.snapshotRegistry.epochsList());
assertEquals(mkSet("record1", "record2"), ctx.coordinator.records());

// Write #2. It should fail.
Member

This fails because we only allow one write? (That took me a moment to realize)

If we returned 3 records in write2, I think this test would be stronger.

Member Author

The number of records does not matter here. The writer is configured to only accept one successful call to PartitionWriter#append. Let me extend the comment to make this clear.

Member

Yeah -- comments would help. I think the other part I was suggesting is that write #2 seems to be the same as write #1. So if it didn't fail due to the write limit, would we have in the records ["record1", "record2", "record1", "record2"]? If that's the case this is fine.

I originally thought that if the records were the same, it was a no op, but I was confusing things.

Member Author

Yeah, I agree that the records are not great in this test. For the context, the runtime always writes what it gets. It does not compare the records themselves.

I have updated the writer to fail if the number of records in a single write is greater than a threshold. I think that it will be less confusing this way. What do you think?
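The reworked mock might look roughly like this (hypothetical sketch; the append signature and the superclass constructor are assumptions, not the actual test code):

// Hypothetical version of the test writer: any single append with more
// records than the threshold fails, which makes the failure mode explicit.
private static class MockPartitionWriter extends InMemoryPartitionWriter<String> {
    private final int maxRecordsPerWrite;

    MockPartitionWriter(int maxRecordsPerWrite) {
        this.maxRecordsPerWrite = maxRecordsPerWrite;
    }

    @Override
    public long append(TopicPartition tp, List<String> records) {
        if (records.size() > maxRecordsPerWrite) {
            throw new KafkaException("Append of " + records.size() +
                " records exceeds the allowed maximum of " + maxRecordsPerWrite + ".");
        }
        return super.append(tp, records);
    }
}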

CompletableFuture<String> write1 = runtime.scheduleWriteOperation("write#1", TP,
state -> new CoordinatorResult<>(Arrays.asList("record1", "record2"), "response1"));

// Verify that the write is not completed yet.
Member

nit: the write is done (as we update lastWrittenOffset), but isDone signifies that the write has been committed. Is this correct?

Member Author

Yeah, this is correct. The future is only completed when the write is committed. Let me use committed in the comment.

Member

@divijvaidya divijvaidya left a comment

I haven't looked at the business logic yet, just left some comments to understand the expectations of this code better.

Comment on lines +300 to +301
deferredEventQueue.completeUpTo(offset);
snapshotRegistry.deleteSnapshotsUpTo(offset);
Member

can this be done async by background thread pool?

Member Author

Hmm... All the events are already processed in a thread pool. We could defer those to another one but I don't really see the need for it, for two reasons: 1) that would require introducing locking because those are not thread safe; and 2) they should be quite fast operations.

* @throws CoordinatorLoadInProgressException
*/
private CoordinatorContext activeContextOrThrow(TopicPartition tp) {
CoordinatorContext context = coordinators.get(tp);
Member

I am not aware of the threading model here but since we are using a concurrent hash map to store coordinators, I am assuming that this function can be accessed by multiple threads.

If multi-threaded access to coordinators is possible while this function is executing, then there is a bug here: there might be a context switch after we have retrieved the value using get, and by the time we execute the if/else below, the value of this context might have changed. We should use atomic primitives of ConcurrentHashMap such as computeIfAbsent.

Member Author

That's correct. coordinators is accessed by multiple threads. However, the runtime guarantees that the context for a given TopicPartition is never accessed concurrently - all the events of a TopicPartition are processed sequentially. This is why I don't have a lock for the context.

Regarding your suggestion, I am not sure how you could use an atomic primitive of ConcurrentHashMaps to implement this logic. Could you elaborate a bit more on this?
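For context, the pattern the suggestion refers to looks like this; it fits a get-or-create path, though activeContextOrThrow only reads and throws, which is exactly the point of the question above (a sketch of the reviewer's idea, not the merged code):

// Reviewer's suggestion, sketched: do lookup and creation in one atomic
// ConcurrentHashMap operation instead of a separate get/put.
private CoordinatorContext getOrCreateContext(TopicPartition tp) {
    return coordinators.computeIfAbsent(tp, CoordinatorContext::new);
}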

log.info("Closing coordinator runtime.");
// This closes the processor, drains all the pending events and
// rejects any new events.
processor.close();
Member

CloseQuietly?

Member Author

I am not sure. I definitely use CloseQuietly when closing the runtime component but it seems better to raise the exception further if closing the processor fails here. That is because the rest of the closing procedure does not make sense if closing the processor has failed.

// rejects any new events.
processor.close();
// Unload all the coordinators.
coordinators.forEach((tp, context) -> {
Member

note that this is not a thread safe operation since concurrentHashMap doesn't take an exclusive lock during forEach. Is that ok?

Member Author

This should be OK. Closing the processor guarantees that coordinators is not accessed anymore. Your comment made me think that I should add an atomic boolean to make the close method idempotent.
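The atomic-boolean idea mentioned here would look roughly like the following (a sketch; the isRunning field name and the transitionTo call are assumptions):

// Sketch of an idempotent close: only the first caller runs the shutdown.
private final AtomicBoolean isRunning = new AtomicBoolean(true);

public void close() throws Exception {
    if (!isRunning.compareAndSet(true, false)) {
        log.warn("Coordinator runtime is already shutting down.");
        return;
    }
    log.info("Closing coordinator runtime.");
    // Closing the processor drains all pending events and rejects new ones,
    // so nothing touches the `coordinators` map concurrently after this point.
    processor.close();
    // Unload all the coordinators.
    coordinators.forEach((tp, context) -> context.transitionTo(CoordinatorState.CLOSED));
    coordinators.clear();
}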

@Override
public void complete(Throwable exception) {
if (exception != null) {
log.error("Execution of {} failed due to {}.", name, exception);
Member

you can get rid of the second {} because the last parameter can be added as an exception without having to parameterize it. see: https://www.slf4j.org/faq.html#paramException

Member Author

I know. I actually prefer to have the exception's message included in the logged message, on the same line, because it is easier when you search the log. However, I could add a third argument to print the stack trace. Let me see if that makes sense here.
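For reference, the three slf4j variants under discussion (standard slf4j behaviour; which one to use is the author's call):

// 1) Reviewer's suggestion: no placeholder for the exception; slf4j treats the
//    trailing Throwable specially and prints the full stack trace.
log.error("Execution of {} failed.", name, exception);

// 2) Current code: the exception fills the second placeholder (via toString()),
//    keeping everything on one line but dropping the stack trace.
log.error("Execution of {} failed due to {}.", name, exception);

// 3) With a third argument the message stays on one line and the stack trace is printed.
log.error("Execution of {} failed due to {}.", name, exception.getMessage(), exception);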

@dajac dajac merged commit 7556ce3 into apache:trunk Jun 13, 2023
@dajac dajac deleted the KAFKA-14462-17 branch June 13, 2023 07:46
dajac added a commit that referenced this pull request Jun 22, 2023
This patch introduces the GroupCoordinatorService. This is the new (incomplete) implementation of the group coordinator based on the coordinator runtime introduced in #13795.

Reviewers: Divij Vaidya <[email protected]>, Justine Olshan <[email protected]>