Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pub/Sub Subscriber health check #613

Merged
merged 71 commits into from
Nov 8, 2021

Conversation

gkatzioura
Copy link
Contributor

@gkatzioura gkatzioura commented Sep 11, 2021

The purspose of this PR is to add health check to PubSub subscribers.

This is not in conflict with the health check of the pubsub publisher, and can be used in parallel.

The criteria for the health check of a subscriber is the message backlog and the last processed message.

If the message has been recently processed in a reasonable time threshold, then the subscriber is healthy.
If the backlog of messages is big but the subscriber consumes messages then this is a scalling issue.

If there hasn't been any processing of recent messages but the backlog increases, then the subscriber is unhealthy

Since there can be multiple subscriptions, the subscriptions shall be enabled per subscriber creation.

The method that processes the messages shall be wrapped in order for a method tracker to receive the recently processed message.

Eventually there would be multiple health trackers per subscription

Since PubSubInboundChannelAdapter precedes DefaultSubscriberFactory it will register the tracker first and DefaultSubscriberFactory will avoid adding any health checks.

If DefaultSubscriberFactory is used directly is will add a health check if enabled.

Added a separate PubSubExecutorConfiguration since the health check for subscribers should be autoconfigured before the actual PubSub configuration.

@sonarcloud
Copy link

sonarcloud bot commented Sep 15, 2021

Kudos, SonarCloud Quality Gate passed!    Quality Gate passed

Bug A 0 Bugs
Vulnerability A 0 Vulnerabilities
Security Hotspot A 0 Security Hotspots
Code Smell A 21 Code Smells

80.9% 80.9% Coverage
0.0% 0.0% Duplication

@elefeint
Copy link
Contributor

@gkatzioura Apologies for the delay. You've put a lot of thought and work into the pull request, and we'd like to treat it as carefully.

We currently have some work in-flight restructuring the Pub/Sub properties to enable per-subscription setting customization (example: #562) that will both conflict, and possibly affect direction of your PR. It would be better to rebase and review it in a couple of weeks, once that work is completed.

One question I had about the addition of monitoring metrics -- how will subscription/num_undelivered_messages differ from the built-in Pub/Sub subscription/num_outstanding_messages metric?

@elefeint
Copy link
Contributor

Another directional question -- GKE forces pod restart when it detects unhealthy indicators. But restarting pods when there is already a scaling issue is counterproductive, as the load won't get any better if the pods are thrashing (we've had issues with the existing global indicator under load -- spring-attic/spring-cloud-gcp#2628). So I am wary of indicating down when messagesOverThreshold is detected.

@gkatzioura
Copy link
Contributor Author

gkatzioura commented Sep 23, 2021

Hi @elefeint
I understand the transitional phase so will rebase from time to time as you progress.

Will answer your questions.

One question I had about the addition of monitoring metrics -- how will subscription/num_undelivered_messages differ from the built-in Pub/Sub subscription/num_outstanding_messages metric?

num_outstanding_messages messages in flight that have been sent to the subscribers
num_undelivered_messages messages in the backlog not sent yet or probably were not able to get processed

Will explain marking unhealthy based on the backlog. In a scenario of multiple pods/workers it is expected to have lot's of outstanding messages, also if subscribers are closed or stopped pulling messages the num_outstanding_messages will be a small number because they cannot pull, thus they would seem healthy while they are not.
By using num_undelivered_messages the criteria is the backlog in combination of not processing any messages.

GKE forces pod restart when it detects unhealthy indicators. But restarting pods when there is already a scaling issue is counterproductive, as the load won't get any better if the pods are thrashing (we've had issues with the existing global indicator under load -- spring-cloud/spring-cloud-gcp#2628). So I am wary of indicating down when messagesOverThreshold is detected.

If a subscriber consumes messages, regardless of how big the backlog gets (num_undelivered_messages) the health indicator will indicate healthy. A user will probably autoscale based on the backlog, or will optimise the code to increase the throughput. But as long as messages are processed and acknowledged the subscriber serves its purpose thus is healthy.

If the subscriber is not healthy messages cannot be processed or acknowledged. An error or exception probably occurs while processing. On this scenario because the indicator of recently completed processing is not there, we need to evaluate if there are no messages on the backlog or if messages do exist and wait to be processed (thus our subscriber is unhealthy).

Based on this logic the following scenarios happen

  • If the subscriber has processed messages recently, then by default is healthy, regardless of the backlog size
  • If the subscriber has not processed any messages recently and the backlogs is small/empty then the subscriber is healthy since there are no messages to pull
  • If the subscriber has not processed any messages and the backlog is big then the subscriber is unhealthy, because there are indeed messages to pull but something probably fails

Copy link
Contributor

@elefeint elefeint left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for your patience! We can start working on this PR -- it's very useful to have this per-subscription healthcheck functionality.

First, could you revert the unrelated whitespace changes? This project still uses Spring's checkstyle for now -- https://github.com/spring-cloud/spring-cloud-gcp/blob/main/CONTRIBUTING.adoc#code-formatting-guidelines

@mpeddada1
Copy link
Contributor

Hey @gkatzioura, thank you for waiting and your contribution! PR #638 is merged so you can rebase this PR now.

@google-cla
Copy link

google-cla bot commented Oct 21, 2021

All (the pull request submitter and all commit authors) CLAs are signed, but one or more commits were authored or co-authored by someone other than the pull request submitter.

We need to confirm that all authors are ok with their commits being contributed to this project. Please have them confirm that by leaving a comment that contains only @googlebot I consent. in this pull request.

Note to project maintainer: There may be cases where the author cannot leave a comment, or the comment is not properly detected as consent. In those cases, you can manually confirm consent of the commit author(s), and set the cla label to yes (if enabled on your project).

ℹ️ Googlers: Go here for more info.

@gkatzioura gkatzioura force-pushed the subscriber-health-check branch 2 times, most recently from 520743a to 4616ffa Compare October 21, 2021 06:32
@google-cla
Copy link

google-cla bot commented Oct 21, 2021

All (the pull request submitter and all commit authors) CLAs are signed, but one or more commits were authored or co-authored by someone other than the pull request submitter.

We need to confirm that all authors are ok with their commits being contributed to this project. Please have them confirm that by leaving a comment that contains only @googlebot I consent. in this pull request.

Note to project maintainer: There may be cases where the author cannot leave a comment, or the comment is not properly detected as consent. In those cases, you can manually confirm consent of the commit author(s), and set the cla label to yes (if enabled on your project).

ℹ️ Googlers: Go here for more info.

@gkatzioura
Copy link
Contributor Author

Hi @elefeint & @mpeddada1 did rebase so seems ok for a review!

Copy link
Contributor

@elefeint elefeint left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another batch of comments. I apologize for the delays -- I will also be out most of next week, which will not help matters.

Comment on lines 58 to 71
@Bean
@ConditionalOnMissingBean(name = "pubsubSubscriberThreadPool")
@ConditionalOnProperty("spring.cloud.gcp.pubsub.subscriber.executorThreads")
public ThreadPoolTaskScheduler pubsubSubscriberThreadPool() {
return threadPool("gcp-pubsub-subscriber", this.gcpPubSubProperties.getSubscriber().getExecutorThreads());
}

@Bean
@ConditionalOnMissingBean(name = "subscriberExecutorProvider")
@ConditionalOnProperty("spring.cloud.gcp.pubsub.subscriber.executorThreads")
public ExecutorProvider subscriberExecutorProvider(
@Qualifier("pubsubSubscriberThreadPool") ThreadPoolTaskScheduler scheduler) {
return FixedExecutorProvider.create(scheduler.getScheduledExecutor());
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a nice structure. However, after the rebase, we have two "default" subscribers thread pools/executors -- one defined here that is exclusively used for the health checks, and the globalScheduler that's dynamically registered. We've been trying to deprecate the global scheduler bean that is not dynamic that's being extracted here.

Since our subscriber thread pool management is so complicated now (there is a deprecated global subscriber threadpool bean, and then there is a default subscriber threadpool constructed from properties, plus separate configurations possible for each subscription), I think it would be best to have a dedicated thread pool for Pub/Sub healthchecks. Then it will never interfere with production workloads.

Could you:

  • rename the subscriber executor provider bean to make it clear that it's for health checks. It can be shared between publisher and subscriber health checks in the future; no need to differentiate here.
  • Fold the new dedicated threadpool/executor beans into PubSubSubscriptionHealthIndicatorAutoConfiguration.java; this way it will be possible to turn it off with the rest of configuration.
  • Put the publisher thread pool configuration back. Let's keep the scope of this PR to subscription logic only.
  • This class, pleasant to look at as it is, will then go away.

Comment on lines 297 to 302
private Integer lagThreshold;

private Integer backlogThreshold;

private Integer lookUpInterval = 1;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's get those in a subclass, for logical grouping.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Has moved to PubSubConfiguration.Health

Comment on lines 89 to 91
public void setProjectId(String projectId) {
this.projectId = projectId;
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about keeping the current project ID inside of HealthTrackerRegistry? Then it does not need to be set separately. registerTracker() could b eoverloaded to operate on either an object or a string (which can be either short or fully qualified subscription names).

Copy link
Contributor Author

@gkatzioura gkatzioura Oct 31, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. I will have a default-project-id on the HealthTrackerRegistry, the one derived by default by the other modules GcpProjectIdProvider.
If the user wants to use subscriptions from multiple projects (my everyday case), then the user should provide the fully qualified subscription name.
For example projects/<project_name>/subscriptions/<subscription_name> instead <subscription_name>
If short subscription name is provided the the project-id on the registry shall be used.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This makes sense.

@gkatzioura
Copy link
Contributor Author

For the direct usage of SubscriberFactory, should GcpPubSubAutoConfiguration.defaultSubscriberFactory() accept an optional HealthTrackerRegistry?

@elefeint
This is correct, probably was missed on rebase. Just added it

@Qualifier("subscriberExecutorProvider") Optional<ExecutorProvider> executorProvider,
@Qualifier("subscriberSystemExecutorProvider") ObjectProvider<ExecutorProvider> systemExecutorProvider,
@Qualifier("subscriberFlowControlSettings") ObjectProvider<FlowControlSettings> flowControlSettings,
@Qualifier("subscriberApiClock") ObjectProvider<ApiClock> apiClock,
@Qualifier("subscriberRetrySettings") ObjectProvider<RetrySettings> retrySettings,
@Qualifier("healthTrackerRegistry") ObjectProvider<HealthTrackerRegistry> healthTrackerRegistry,
@Qualifier("subscriberTransportChannelProvider") TransportChannelProvider subscriberTransportChannelProvider) {
DefaultSubscriberFactory factory = new DefaultSubscriberFactory(this.finalProjectIdProvider,
this.gcpPubSubProperties);
factory.setThreadPoolTaskSchedulerMap(this.threadPoolTaskSchedulerMap);
factory.setGlobalScheduler(this.globalScheduler);
if (executorProvider.isPresent()) {
logger.warn(
"The subscriberExecutorProvider bean is being deprecated. Please use application.properties to configure properties");
factory.setExecutorProvider(executorProvider.get());
}
factory.setCredentialsProvider(this.finalCredentialsProvider);
factory.setHeaderProvider(this.headerProvider);
factory.setChannelProvider(subscriberTransportChannelProvider);
systemExecutorProvider.ifAvailable(factory::setSystemExecutorProvider);
if (flowControlSettings.getIfAvailable() != null) {
logger.warn(
"The subscriberFlowControlSettings bean is being deprecated. Please use application.properties to configure properties");
factory.setFlowControlSettings(flowControlSettings.getIfAvailable());
}
apiClock.ifAvailable(factory::setApiClock);
if (retrySettings.getIfAvailable() != null) {
logger.warn(
"The subscriberRetrySettings bean is being deprecated. Please use application.properties to configure properties");
factory.setSubscriberStubRetrySettings(retrySettings.getIfAvailable());
}
if (this.gcpPubSubProperties.getSubscriber().getRetryableCodes() != null) {
factory.setRetryableCodes(gcpPubSubProperties.getSubscriber().getRetryableCodes());
}
healthTrackerRegistry.ifAvailable(factory::setHealthTrackerRegistry);
return factory;

Copy link
Contributor

@elefeint elefeint left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we are in the final stages here -- I've tested the health check with one of our samples, and it works great. Really nice to look at code, too.

No rush on this round of comments -- I am unavailable until Thursday for review. I apologize for the multiple layers of delays here.

  • Add documentation
    • add warning that the health indicator will not behave entirely as expected if Dead Letter Queueing is enabled on the subscription being checked (num_undelivered_messages will drop down by itself after DLQ threshold is reached).
  • document the new required (lagThresholad, backlogThreshold) and optional (lookupInterval) properties. Describe units of measurement -- seconds, number of messages.
  • Javadoc all public interface methods. HealthTrackerRegistry especially needs javadoc, make sure to describe seconds being the units of measurement for lagThreshold -- we've had users confused about granularity before.
  • Replace @since 2.0.5 with @since 2.0.6
  • Could you take a look at Sonar's code smell list?
    No problem on the thread pool creation duplication; the logic is straightforward enough. In principle, `GcpPubSubAutoConfiguration.createThreadPoolTaskScheduler(Integer executorThreads, String threadName) could be turned into a reusable utility but it's probably not worth it.

@sonarcloud
Copy link

sonarcloud bot commented Nov 7, 2021

Kudos, SonarCloud Quality Gate passed!    Quality Gate passed

Bug A 0 Bugs
Vulnerability A 0 Vulnerabilities
Security Hotspot A 0 Security Hotspots
Code Smell A 0 Code Smells

84.7% 84.7% Coverage
0.0% 0.0% Duplication

Copy link
Contributor

@elefeint elefeint left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for the very useful healthcheck, and putting up with the lengthy review process. This is now good to go.

@elefeint elefeint merged commit 287e231 into GoogleCloudPlatform:main Nov 8, 2021
@gkatzioura
Copy link
Contributor Author

Hi @elefeint ! I am grateful for your time and assistance on this!

kateryna216 added a commit to kateryna216/spring-cloud-gcp that referenced this pull request Oct 20, 2022
Add per-subscription healthcheck validating that there was a recent successfully processed message on the subscription or that the backlog represented by `num_undelivered_messages` Cloud Monitoring metric is under acceptable threshold.
prash-mi pushed a commit that referenced this pull request Jun 20, 2023
Bumps [reactor-bom](https://github.com/reactor/reactor) from 2020.0.26 to 2022.0.1.
- [Release notes](https://github.com/reactor/reactor/releases)
- [Commits](reactor/reactor@2020.0.26...2022.0.1)

---
updated-dependencies:
- dependency-name: io.projectreactor:reactor-bom
  dependency-type: direct:production
  update-type: version-update:semver-major
...

Signed-off-by: dependabot[bot] <[email protected]>

Signed-off-by: dependabot[bot] <[email protected]>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants