Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FIx SQS: Error during Dapr Shutdown and Message Polling Behavior Issue (AWS) #3174

Merged
merged 24 commits into from
Nov 1, 2023

Conversation

amimimor
Copy link
Contributor

Description

In this PR, several significant changes have been made (please bear with me):

  • Overhauled the Subscribe functionality, focusing on refining control constructs for categorizing messages by topic and ending short polling on SQS.
  • Relocated the intricate poller cancelation logic to a new structure, SubscriptionManager.
  • The SubscriptionManager oversees a control goroutine loop, monitoring subscribe/unsubscribe operations and context terminations.
  • Those events are fanned-in to a channel the control loop consumes from. When a subscribe/unsubscribe event emerges, a lock is employed to determine if SQS operations should cease or if only handler removal is required. Upcoming PRs will allow these subscribe actions to govern the initiation of operations at boot, contingent on the availability of infrastructure components.
  • Previously, the cancel function was derived from the initial subscription context and handed to the SQS poller. However, upon context cancel (unsubscribe) during shutdown, there was no guarantee this cancel function would be called last, thereby causing premature termination of consumption if called too early. This could have been prohibiting dynamic unsubscribes because if by chance a single subscription was cancelled and unfortunately the consumption cancel function was 'pinned' to this topic, then the entire component consumption would be terminated until a new event of subscribe would be created
  • Swapped the restrictive mutexes safeguarding the synchronous map with xsync.MapOf for lock-free thread-safe reads. Notably, this supports the Size method that is used to determine what/if actions like halting of SQS consumption should be taken.
  • Introduced a straightforward Singleton Poller mechanism.
  • Now, each Subscribe creates a listener unsubscribe goroutine improving readability and the overall sense the system makes. As simple as "a subscribe creates an unsubscribe listener watching for subscribe context cancellation and acting upon it"
  • All termination procedures are now housed in the SubscriptionManager, while the primary component logic only manages its own status (closed or open) and invokes the manager's Close function.
  • Post-refactoring, the remaining maps were transitioned to the thread-safe sync.Map.
  • Standardized error messages to lowercase and restructured the frequently used "... topic (sanitized): abc" to "... (sanitized) topic: abc".
  • Revamped getOrCreateTopic to align with linter standards, avoiding named return variables.
  • Modernized consumeSubscription error handling using Errors.Is and Errors.As, and removed redundant waits on channel status or the termination of pollerRunning within the consumption process, simplifying understanding.

Issue reference

We strive to have all PR being opened based on an issue, where the problem or feature have been discussed prior to implementation.

Please reference the issue this PR will close: #3156

Checklist

Please make sure you've completed the relevant tasks for this PR, out of the following list:

@ItalyPaleAle
Copy link
Contributor

@amimimor looks like this PR is in a "broken" state where unrelated commits have been added to the branch. I'm not sure what happened.

Could you please try fixing it before we can review it?

amimimor and others added 13 commits October 12, 2023 20:15
Signed-off-by: Amit Mor <[email protected]>
Signed-off-by: Amit Mor <[email protected]>
Signed-off-by: Amit Mor <[email protected]>
Signed-off-by: Amit Mor <[email protected]>
Signed-off-by: Amit Mor <[email protected]>
Signed-off-by: Amit Mor <[email protected]>
Signed-off-by: Amit Mor <[email protected]>
Signed-off-by: Amit Mor <[email protected]>
first integration of subs mgmt
Signed-off-by: Amit Mor <[email protected]>
Signed-off-by: Amit Mor <[email protected]>
Signed-off-by: Amit Mor <[email protected]>
Signed-off-by: Amit Mor <[email protected]>
Signed-off-by: Amit Mor <[email protected]>
Signed-off-by: Amit Mor <[email protected]>
Signed-off-by: Amit Mor <[email protected]>
Signed-off-by: Amit Mor <[email protected]>

Signed-off-by: Amit Mor <[email protected]>
Signed-off-by: Amit Mor <[email protected]>
Signed-off-by: Amit Mor <[email protected]>
Signed-off-by: Amit Mor <[email protected]>
@amimimor amimimor force-pushed the 3156-shutdown-and-handlers-map branch from dc699b0 to ba0e5b5 Compare October 12, 2023 17:20
@amimimor
Copy link
Contributor Author

@amimimor looks like this PR is in a "broken" state where unrelated commits have been added to the branch. I'm not sure what happened.

Could you please try fixing it before we can review it?

should be now aligned. I had colab with a teammate and forgot to squash (as he is not DCOed) so I had to rebase like 17 commits back and probably made a mess. git keeps biting me 17yrs in the business

@ItalyPaleAle
Copy link
Contributor

git keeps biting me 17yrs in the business

I think most of us can relate 🤣

go.mod Outdated Show resolved Hide resolved
pubsub/aws/snssqs/topics_locker.go Show resolved Hide resolved
pubsub/aws/snssqs/snssqs.go Outdated Show resolved Hide resolved
pubsub/aws/snssqs/snssqs.go Outdated Show resolved Hide resolved
pubsub/aws/snssqs/snssqs.go Outdated Show resolved Hide resolved
pubsub/aws/snssqs/snssqs.go Outdated Show resolved Hide resolved
@@ -601,12 +606,13 @@ func (s *snsSqs) consumeSubscription(ctx context.Context, queueInfo, deadLetters
// iteration. Therefore, a global backoff (to the internal backoff) is used (sqsPullExponentialBackoff).
messageResponse, err := s.sqsClient.ReceiveMessageWithContext(ctx, receiveMessageInput)
if err != nil {
if err == context.Canceled || err == context.DeadlineExceeded {
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) || ctx.Err() != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) || ctx.Err() != nil {
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why? the context might be cancelled while we pull messages but still haven't started a new for loop iteration (that verifies the ctx isn't cancelled, but just at the top of the loop)

pubsub/aws/snssqs/snssqs.go Outdated Show resolved Hide resolved
pubsub/aws/snssqs/subscription_mgmt.go Outdated Show resolved Hide resolved
pubsub/aws/snssqs/subscription_mgmt.go Outdated Show resolved Hide resolved
Signed-off-by: Amit Mor <[email protected]>

Signed-off-by: Amit Mor <[email protected]>
Signed-off-by: Amit Mor <[email protected]>

Signed-off-by: Amit Mor <[email protected]>
Signed-off-by: Amit Mor <[email protected]>

Signed-off-by: Amit Mor <[email protected]>
Signed-off-by: Amit Mor <[email protected]>

Signed-off-by: Amit Mor <[email protected]>
Signed-off-by: Amit Mor <[email protected]>

Signed-off-by: Amit Mor <[email protected]>
Signed-off-by: Amit Mor <[email protected]>

Signed-off-by: Amit Mor <[email protected]>
Copy link
Contributor

@ItalyPaleAle ItalyPaleAle left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, thanks!

@ItalyPaleAle ItalyPaleAle changed the title 3156 shutdown and handlers map FIx SQS: Error during Dapr Shutdown and Message Polling Behavior Issue (AWS) Nov 1, 2023
@ItalyPaleAle ItalyPaleAle added this to the v1.13 milestone Nov 1, 2023
@ItalyPaleAle ItalyPaleAle merged commit 7fd5524 into dapr:master Nov 1, 2023
85 of 86 checks passed
@amimimor
Copy link
Contributor Author

amimimor commented Nov 1, 2023

thanks @ItalyPaleAle ! it wasn't an easy one and your comments had been very helpful. I'm now going to rush into adding documentation about the edge cases I came across with sns/sqs so that users could know what to expect. See you (maybe) in the coming PRs 🥳

@ItalyPaleAle
Copy link
Contributor

ItalyPaleAle commented Nov 1, 2023

thanks @ItalyPaleAle ! it wasn't an easy one and your comments had been very helpful. I'm now going to rush into adding documentation about the edge cases I came across with sns/sqs so that users could know what to expect. See you (maybe) in the coming PRs 🥳

Of course, thanks for the contribution and for your patience while we got this reviwwed :) Looking forward to see more good work from you!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Error during Dapr Shutdown and Message Polling Behavior Issue (AWS)
2 participants