-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix(pubsub/pstest): fix panic on undelivered message #7377
Conversation
pubsub/pstest/fake.go
Outdated
@@ -1143,7 +1133,9 @@ func (s *subscription) maintainMessages(now time.Time) { | |||
pubTime := m.proto.Message.PublishTime.AsTime() | |||
// Remove messages that have been undelivered for a long time. | |||
if !m.outstanding() && now.Sub(pubTime) > retentionDuration { | |||
s.publishToDeadLetter(m) | |||
if s.proto.DeadLetterPolicy != nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think instead of putting this here, we should place this check inside publishToDeadLetter
since it's used in other places.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually on second thought, the other places we call publishToDeadLetter
is guarded by a deadLetterCandidate
call. I'm also not entirely sure if placing messages that have expired passed retention duration into the dead letter queue is the behavior the live service uses. Let me double check
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah after some follow up, this behavior is not consistent with the live service and is a bug.
My recommendation is that we should remove this call. This will be a breaking change but a minor one. Since this package is experimental it should be ok as a one off change. If you aren't ok with doing this, you can just keep this PR as parts that refactor the time changes (thanks for doing that btw) and I'll create a separate PR to remove this call.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah that was my rationale for not originally putting the check in deadLetterCandidate
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK I amended my commit to just not deliver to dead letter on expiry.
It is counterintuitive to plumb through a time.Now replacement function only to have it refer to a global variable. Instead, make the Server timeNowFunc an atomic.Value field and add a now method accessor that loads the function value atomically and invokes it. Use that in tests, and also in the exported SetTimeNowFunc variable.
If a running pstest Server has a subscription without a dead letter topic configured, and there's a pending message in the subscription that is not received after the retention duration (10 minutes), the deliver loop for the subscription will trigger a panic. This commit prevents the panic by not trying to deliver expired messages to the dead letter topic, since that's what the live service does either.
🤖 I have created a release *beep* *boop* --- ## [1.29.0](https://togithub.com/googleapis/google-cloud-go/compare/pubsub/v1.28.0...pubsub/v1.29.0) (2023-03-13) ### Features * **pubsub:** Add google.api.method.signature to update methods ([aeb6fec](https://togithub.com/googleapis/google-cloud-go/commit/aeb6fecc7fd3f088ff461a0c068ceb9a7ae7b2a3)) * **pubsub:** Add REST client ([06a54a1](https://togithub.com/googleapis/google-cloud-go/commit/06a54a16a5866cce966547c51e203b9e09a25bc0)) * **pubsub:** Add schema evolution methods and fields ([ee41485](https://togithub.com/googleapis/google-cloud-go/commit/ee41485860bcbbd09ce4e28ee6ddca81a5f17211)) * **pubsub:** Add support for schema revisions ([#7295](https://togithub.com/googleapis/google-cloud-go/issues/7295)) ([369b16f](https://togithub.com/googleapis/google-cloud-go/commit/369b16f9525f9ac9a0811c66ce61eda9f6c566e4)) * **pubsub:** Add temporary_failed_ack_ids to ModifyAckDeadlineConfirmation ([aeb6fec](https://togithub.com/googleapis/google-cloud-go/commit/aeb6fecc7fd3f088ff461a0c068ceb9a7ae7b2a3)) * **pubsub:** Make INTERNAL a retryable error for Pull ([aeb6fec](https://togithub.com/googleapis/google-cloud-go/commit/aeb6fecc7fd3f088ff461a0c068ceb9a7ae7b2a3)) ### Bug Fixes * **pubsub/pstest:** Fix panic on undelivered message ([#7377](https://togithub.com/googleapis/google-cloud-go/issues/7377)) ([98dd29d](https://togithub.com/googleapis/google-cloud-go/commit/98dd29d372073605145f78f08205a9786c698881)) * **pubsub:** Allow updating topic schema fields individually ([#7362](https://togithub.com/googleapis/google-cloud-go/issues/7362)) ([f09e059](https://togithub.com/googleapis/google-cloud-go/commit/f09e059e3203de5294648d7434d5e65626a6dff5)) * **pubsub:** Dont compare revision fields in schema config test ([#7317](https://togithub.com/googleapis/google-cloud-go/issues/7317)) ([e364f7a](https://togithub.com/googleapis/google-cloud-go/commit/e364f7abfe3ec8fc20db78abcdaeaaf27d19269c)) * **pubsub:** Fix bug with AckWithResult with exactly once disabled ([#7319](https://togithub.com/googleapis/google-cloud-go/issues/7319)) ([c88fbdf](https://togithub.com/googleapis/google-cloud-go/commit/c88fbdf299205e8118b347430cf66540ffa68b27)) * **pubsub:** Pipe revision ID in name in DeleteSchemaRevision ([#7519](https://togithub.com/googleapis/google-cloud-go/issues/7519)) ([e211635](https://togithub.com/googleapis/google-cloud-go/commit/e211635216e553a9a6b00f9e8f2c5d2082ff68a8)) ### Documentation * **pubsub:** Add x-ref for ordering messages docs: Clarify subscription expiration policy ([aeb6fec](https://togithub.com/googleapis/google-cloud-go/commit/aeb6fecc7fd3f088ff461a0c068ceb9a7ae7b2a3)) * **pubsub:** Clarify BigQueryConfig PERMISSION_DENIED state ([aeb6fec](https://togithub.com/googleapis/google-cloud-go/commit/aeb6fecc7fd3f088ff461a0c068ceb9a7ae7b2a3)) * **pubsub:** Clarify subscription description ([aeb6fec](https://togithub.com/googleapis/google-cloud-go/commit/aeb6fecc7fd3f088ff461a0c068ceb9a7ae7b2a3)) * **pubsub:** Mark revision_id in CommitSchemaRevisionRequest deprecated ([2fef56f](https://togithub.com/googleapis/google-cloud-go/commit/2fef56f75a63dc4ff6e0eea56c7b26d4831c8e27)) * **pubsub:** Replacing HTML code with Markdown docs: Fix PullResponse description docs: Fix Pull description ([aeb6fec](https://togithub.com/googleapis/google-cloud-go/commit/aeb6fecc7fd3f088ff461a0c068ceb9a7ae7b2a3)) * **pubsub:** Update Pub/Sub topic retention limit from 7 days to 31 days ([aeb6fec](https://togithub.com/googleapis/google-cloud-go/commit/aeb6fecc7fd3f088ff461a0c068ceb9a7ae7b2a3)) --- This PR was generated with [Release Please](https://togithub.com/googleapis/release-please). See [documentation](https://togithub.com/googleapis/release-please#release-please).
If a running pstest Server has a subscription without a dead letter topic configured, and there's a pending message in the subscription that is not received after the retention duration (10 minutes), the deliver loop for the subscription will trigger a panic.
This change prevents the panic by checking not trying to deliver to the dead letter topic unless one is configured.
Also includes a refactor to the time handling to make testing with time less awkward.