Thread Synchronization with concludeGroup and resetting of an AggregateGroup #938

Closed
Tracked by #829
graytaylor0 opened this issue Jan 24, 2022 · 4 comments · Fixed by #969

@graytaylor0
Member

graytaylor0 commented Jan 24, 2022

The RFC for Stateful Aggregation outlines two functions for an AggregateAction to perform, concludeGroup and handleEvent. When doExecute is run by a thread in the AggregateProcessor, it will get all of the groups that should be concluded from the AggregateGroupManager, and it will try to conclude all of them before moving on to handle the batch of Events.

The Aggregate Processor will need to support multiple worker threads, and a single instance of AggregateProcessor will contain state (groupState) that is shared between worker threads. The threading synchronization has the following requirements:

  • No two threads may modify groupState at the same time.
  • If a thread is waiting to concludeGroup, no other thread should be allowed to start handling events. The pseudocode below achieves this with a Turnstile synchronization pattern: concludeGroupLock is locked and immediately unlocked before an Event is handled.
  • concludeGroup should wait for in-process events to be handled before concluding.
  • Each group must be concluded only once, and other threads should not wait if a group is already being concluded.
  • Multiple Aggregate Processors that share worker threads must be able to run in the same pipeline.

To meet all of these requirements, each AggregateGroup will contain two Locks, called concludeGroupLock and handleEventForGroupLock. The following pseudocode shows the synchronization between handling events and concluding groups.

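concludeGroup() {
  // Only one thread concludes a group; other threads skip instead of waiting.
  if (concludeGroupLock.tryLock()) {
    try {
      // Wait for in-process events on this group to finish.
      handleEventForGroupLock.lock();
      try {
        // critical section where concluding a group is completed
      } finally {
        handleEventForGroupLock.unlock();
      }
    } finally {
      concludeGroupLock.unlock();
    }
  }
}

handleEvent() {
  // Turnstile: block while a thread holds concludeGroupLock, then pass through.
  concludeGroupLock.lock();
  concludeGroupLock.unlock();

  handleEventForGroupLock.lock();
  try {
    // critical section where groupState is modified
  } finally {
    handleEventForGroupLock.unlock();
  }
}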
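
The pseudocode above could be written in Java roughly as follows. This is a minimal sketch, assuming java.util.concurrent.locks.ReentrantLock for both locks. The class name AggregateGroupSketch, the Map-based groupState, and the method signatures are hypothetical and are not taken from the actual implementation.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical stand-in for an AggregateGroup; all names are illustrative.
class AggregateGroupSketch {
    private final Lock concludeGroupLock = new ReentrantLock();
    private final Lock handleEventForGroupLock = new ReentrantLock();
    private final Map<String, Object> groupState = new HashMap<>();

    // Returns a snapshot of the concluded state, or null when another thread
    // is already concluding this group (the caller does not wait).
    Map<String, Object> concludeGroup() {
        if (!concludeGroupLock.tryLock()) {
            return null;
        }
        try {
            // Wait for in-process events on this group to finish.
            handleEventForGroupLock.lock();
            try {
                Map<String, Object> concluded = new HashMap<>(groupState);
                groupState.clear(); // reset the group for the next window
                return concluded;
            } finally {
                handleEventForGroupLock.unlock();
            }
        } finally {
            concludeGroupLock.unlock();
        }
    }

    void handleEvent(String key, Object value) {
        // Turnstile: blocks while a concluding thread holds concludeGroupLock.
        concludeGroupLock.lock();
        concludeGroupLock.unlock();

        handleEventForGroupLock.lock();
        try {
            groupState.put(key, value); // critical section modifying groupState
        } finally {
            handleEventForGroupLock.unlock();
        }
    }
}
```

Acquiring each lock immediately before its try block means the matching unlock in finally runs only when the lock is actually held.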

Additional Information

The AggregateProcessor will not use the @SingleThread annotation (which creates a separate instance per worker thread), and it will avoid static variables so that AggregateGroups are not shared between instances. Because of this, multiple AggregateProcessors can be used in one pipeline, even if the identificationKeys happen to be the same for both aggregate instances.

processor:
     - aggregate:
         ...
     - aggregate:
         ... 
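
To illustrate why instance fields keep two aggregate processors independent, here is a minimal sketch. The class AggregateProcessorSketch and its per-group counter are hypothetical; the only point is that the state is held per instance rather than in a static field.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical processor: group state is an instance field, not static,
// so each configured aggregate processor keeps its own groups.
class AggregateProcessorSketch {
    private final Map<String, Integer> eventCountByGroup = new ConcurrentHashMap<>();

    void handleEvent(String identificationKeyValue) {
        // Atomically increments the count for this group.
        eventCountByGroup.merge(identificationKeyValue, 1, Integer::sum);
    }

    int eventCount(String identificationKeyValue) {
        return eventCountByGroup.getOrDefault(identificationKeyValue, 0);
    }
}
```

Two instances that receive the same identification key value each track their own count, which is the behavior expected when two aggregate entries appear in one pipeline.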
            

Alternative Ideas for Threading

Locking on a single AggregateGroup should not cause performance issues: the Events being processed at any one time will likely be spread across many different AggregateGroups, and a group is concluded only once per window duration. However, if performance does prove to be an issue, some alternatives could be considered to improve it.

A no-lock AggregateProcessor would assign each worker thread to a different section of the shared state. Before handling an event or concluding a group, the current thread would have to look up which thread owns the AggregateGroup in question and forward that AggregateGroup to the owning thread. This would require extensive design work and, while possible, may not even improve performance: forwarding the AggregateGroup adds overhead, and the owning thread may not be ready to handle it for a while. For example, the owning thread could be in the processor that follows the AggregateProcessor, in which case it has to go all the way back to the beginning of the pipeline and return to the AggregateProcessor before it can process the AggregateGroup.
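
One way to picture the no-lock idea is to hash each group key to an owning single-threaded executor and forward the work there. This is only a sketch under stated assumptions: PartitionedAggregatorSketch, the per-group event counter, and the use of dedicated owner threads (rather than the pipeline's own worker threads) are all hypothetical simplifications.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical: each owner thread is the only one touching its partition,
// so the plain HashMaps need no locks.
class PartitionedAggregatorSketch {
    private final List<ExecutorService> owners = new ArrayList<>();
    private final List<Map<String, Integer>> stateByOwner = new ArrayList<>();

    PartitionedAggregatorSketch(int ownerCount) {
        for (int i = 0; i < ownerCount; i++) {
            owners.add(Executors.newSingleThreadExecutor());
            stateByOwner.add(new HashMap<>());
        }
    }

    private int ownerOf(String groupKey) {
        return Math.floorMod(groupKey.hashCode(), owners.size());
    }

    // Forwards the event to the owning thread; completes with the new count.
    CompletableFuture<Integer> handleEvent(String groupKey) {
        int owner = ownerOf(groupKey);
        Map<String, Integer> state = stateByOwner.get(owner);
        return CompletableFuture.supplyAsync(
                () -> state.merge(groupKey, 1, Integer::sum), owners.get(owner));
    }

    // Forwards the conclude request; completes with the final count (0 if none).
    CompletableFuture<Integer> concludeGroup(String groupKey) {
        int owner = ownerOf(groupKey);
        Map<String, Integer> state = stateByOwner.get(owner);
        return CompletableFuture.supplyAsync(() -> {
            Integer count = state.remove(groupKey);
            return count == null ? 0 : count;
        }, owners.get(owner));
    }

    void shutdown() {
        owners.forEach(ExecutorService::shutdown);
    }
}
```

The forwarding cost mentioned above shows up here as the hand-off to the owner's queue: the caller gets its result only after the owning thread reaches the task.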

@graytaylor0 graytaylor0 self-assigned this Jan 24, 2022
@graytaylor0 graytaylor0 changed the title Implement the concludeGroup logic and resetting of groupState/window Thread Synchronization with concludeGroup and resetting of an AggregateGroup Jan 24, 2022
@dapowers87
Contributor

Can you explain why you lock then immediately unlock on the second code snippet please?

@graytaylor0
Member Author

> Can you explain why you lock then immediately unlock on the second code snippet please?

Sure! This is to keep threads from handling new Events after a thread has signaled that it wants to conclude a group. Once a concluding thread's concludeGroupLock.tryLock() succeeds, other threads block on the concludeGroupLock.lock() call in handleEvent until the group has been concluded.

@cmanning09
Contributor

cmanning09 commented Jan 26, 2022

@graytaylor0,

I am missing a few things from this design. Can you highlight the following in your proposal:

  • what is each lock used for?
  • what type of lock will these be?
  • how will we prevent thread locks?
  • do we have to worry about race conditions or any other potential issues around locking?

@graytaylor0
Member Author

graytaylor0 commented Jan 26, 2022

@cmanning09

These locks are meant to be mutex-like, and the Java ReentrantLock will be used with the fair option enabled (which means that the longest waiting thread will get the lock when it becomes available).

For your other questions, I think they are all answered by the pseudo-code supplied. The pseudo-code shows how each lock is going to be used. I only put a comment for what is actually happening in their critical sections, but I feel like this portrays what is happening there. The code shown is also what I believe is going to keep deadlocks and race conditions from occurring.

There are a lot of execution orderings that can occur with synchronized code like this, and if any scenario results in the requirements stated not being met, then that should be called out.

Edit: After further reading on the fairness option for Locks, I think it makes the most sense not to use it at the moment. It has the potential to lower throughput, and starvation is unlikely with the current design, as AggregateGroups will vary widely. If a need arises for Events to be handled more closely in the order in which they arrive and try to acquire the Lock, fairness could become a configurable option in the AggregateProcessor.
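
If fairness did become configurable, the choice could be passed straight to the ReentrantLock constructor. The sketch below is an assumption about how that might look; GroupLockFactorySketch and the idea of a boolean processor setting are hypothetical.

```java
import java.util.concurrent.locks.ReentrantLock;

class GroupLockFactorySketch {
    // Hypothetical: 'fair' would come from an optional AggregateProcessor setting.
    // new ReentrantLock() is equivalent to new ReentrantLock(false).
    static ReentrantLock createGroupLock(boolean fair) {
        return new ReentrantLock(fair);
    }
}
```

Note that the untimed tryLock() acquires an available lock immediately even on a fair ReentrantLock, so the concludeGroup path would not queue behind waiting threads either way.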
