-
Notifications
You must be signed in to change notification settings - Fork 1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: add health check endpoint #3501
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @vcrfxia , LGTM overall with some questions.
ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/HealthcheckResource.java
Outdated
Show resolved
Hide resolved
ksql-rest-model/src/main/java/io/confluent/ksql/rest/entity/HealthcheckResponseDetail.java
Outdated
Show resolved
Hide resolved
ksql-rest-app/src/main/java/io/confluent/ksql/rest/healthcheck/HealthcheckAgent.java
Outdated
Show resolved
Hide resolved
ksql-rest-app/src/main/java/io/confluent/ksql/rest/healthcheck/HealthcheckAgent.java
Outdated
Show resolved
Hide resolved
ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java
Outdated
Show resolved
Hide resolved
ksql-rest-app/src/test/java/io/confluent/ksql/rest/healthcheck/HealthcheckAgentTest.java
Outdated
Show resolved
Hide resolved
public class HealthcheckAgent { | ||
|
||
private static final List<Check> DEFAULT_CHECKS = ImmutableList.of( | ||
new Check("metastore", "list streams; list tables; list queries;"), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it would be better to make all these public static String variables from the HealthCheckAgent class. That way the constants can also be used in the test for this class and there won't be hardcoded strings split between this class and test.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It feels odd to me to have these statements in non-private variables, since no other class (besides the test class) needs to access them. If the issue is the hard-coded strings, I can pull them out into private (static) variables, but they'd be duplicated in the test anyway.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was thinking if we were going to make a change to which statements to execute for the health check, having it not be hardcoded in the two files would make things easier to change. It's only a minor optimization though so if you don't think it's worth it then I'm fine with how it is now.
ksql-rest-app/src/main/java/io/confluent/ksql/rest/healthcheck/HealthcheckAgent.java
Outdated
Show resolved
Hide resolved
ksql-rest-model/src/main/java/io/confluent/ksql/rest/entity/HealthcheckResponse.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice! Left a couple bits of feedback inline.
|
||
public class HealthcheckAgent { | ||
|
||
private static final List<Check> DEFAULT_CHECKS = ImmutableList.of( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This can be a bit more abstract. You just need an interface like:
interface Check {
String name();
HealthcheckResponseDetail check();
}
The current Check class can just be an implementation of this (e.g. ExecuteStatementCheck
). This way we can support more diverse extensions down the road.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done, though I updated the signature of the check()
method to take a KSQL client and server endpoint, to avoid needing to pass those into each of the individual checks.
} | ||
|
||
private HealthcheckResponse getResponse() { | ||
if (responseCache.isEmpty() || timeSinceLastResponse().compareTo(healthcheckInterval) > 0) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it would be cleaner to move the expiry into the cache, e.g.:
final Optional<HealthCheckResponse> response = responseCache.get();
if (response.isPresent()) {
return response.get();
}
final HealthcheckResponse fresh = healtcheckAgent.checkHealth();
responseCache.cache(fresh);
return fresh;
and in the Cache, get() can return empty if nothing is cached or if its expired.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done, though I wonder if it's clear from just reading
final Optional<HealthCheckResponse> response = responseCache.get();
if (response.isPresent()) {
return response.get();
}
that the cache handles expiration (rather than returning any previously cached response). Maybe I should update the cache name to be ExpiringResponseCache
or something similar?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe if we change the get() method to getIfPresent() it'll be clearer?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the fact that the return type is an Optional
already indicates that "if present" part. It was the fact that the cache expires that I didn't think was captured. I don't want to block the PR on this though, can always open a follow-up to change the name.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
getIfUnexpired() then to indicate expiration behavior? I do agree this is a minor point.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@rodesai pointed out that using a real KsqlClient
as I have in my first implementation means healthcheck requests will fail unless relevant authentication credentials are provided for the client. As such, I’ve updated the HealthcheckAgent to use an internal KsqlClient (to send requests to KsqlResource directly).
private KsqlClientUtil() { | ||
} | ||
|
||
public static <T> RestResponse<T> toRestResponse( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The contents of this new util file were lifted from KsqlTarget
, so the methods may be accessible to the new ServerInternalKsqlClient
as well.
return response.isSuccessful(); | ||
} | ||
|
||
private static URI getServerAddress(final KsqlRestConfig restConfig) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This isn't actually needed anymore since the ServerInternalKsqlClient
used by the HealthcheckAgent
ignores the serverEndpoint
parameter in makeKsqlRequest()
but I've left this code here in case we'd like to switch back to using a real KSQL client in the future. If preferable I can simply delete it for now instead.
...-rest-app/src/main/java/io/confluent/ksql/rest/server/services/ServerInternalKsqlClient.java
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the reviews, @stevenpyzhang and @rodesai ! Applied your feedback, and also added a short section into the docs.
|
||
public class HealthcheckAgent { | ||
|
||
private static final List<Check> DEFAULT_CHECKS = ImmutableList.of( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done, though I updated the signature of the check()
method to take a KSQL client and server endpoint, to avoid needing to pass those into each of the individual checks.
public class HealthcheckAgent { | ||
|
||
private static final List<Check> DEFAULT_CHECKS = ImmutableList.of( | ||
new Check("metastore", "list streams; list tables; list queries;"), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It feels odd to me to have these statements in non-private variables, since no other class (besides the test class) needs to access them. If the issue is the hard-coded strings, I can pull them out into private (static) variables, but they'd be duplicated in the test anyway.
ksql-rest-app/src/main/java/io/confluent/ksql/rest/healthcheck/HealthcheckAgent.java
Outdated
Show resolved
Hide resolved
} | ||
|
||
private HealthcheckResponse getResponse() { | ||
if (responseCache.isEmpty() || timeSinceLastResponse().compareTo(healthcheckInterval) > 0) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done, though I wonder if it's clear from just reading
final Optional<HealthCheckResponse> response = responseCache.get();
if (response.isPresent()) {
return response.get();
}
that the cache handles expiration (rather than returning any previously cached response). Maybe I should update the cache name to be ExpiringResponseCache
or something similar?
ksql-rest-app/src/main/java/io/confluent/ksql/rest/healthcheck/HealthcheckAgent.java
Outdated
Show resolved
Hide resolved
docs/developer-guide/api.rst
Outdated
|
||
Your output should resemble: | ||
|
||
.. codewithvars:: bash |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
.. codewithvars:: bash | |
.. codewithvars:: json |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, with one tiny suggestion!
} | ||
|
||
/* Caches a HealthCheckResponse for the specified duration */ | ||
private static class ResponseCache { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can make a great and simple cache (in a few lines) by using the Guava cache library.
See https://www.baeldung.com/guava-cache (search for evict records based on their total live time
)
You could use a boolean key on the cache where to store the healthcheck response.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done, though the downside is I can no longer use a custom time supplier in the unit tests. One of the updated tests currently sleeps for 11 ms, but hopefully that's OK?
(Also, thanks to @agavra for pointing out that switching to Guava's implementation means we get concurrency handling for free!)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great!. Yes, guava will help you with concurrency as well. Regarding the sleep time, can the test use a 0ms interval health check, then sleep for 1ms in the test so you check the past health check was evicted?
Btw, I think you can even reduce more code by using the CacheLoader
and not the ResponseCache
. Here's a code snippet I just imagined how this would look like:
private static final boolean HEALTHCHECK = true;
private static final CacheLoader<Boolean, HealthCheckResponse> HEALTHCHECK_LOADER
= new CacheLoader<Boolean, HealthCheckResponse>() {
@Override
public HealthCheckResponse load(Boolean a) throws Exception {
return healthCheckAgent.checkHealth();
}
};
private final LoadingCache<Boolean, HealthCheckResponse> cache;
HealthCheckResource(final Duration healthCheckInterval) {
cache = CacheBuilder.newBuilder()
.expireAfterWrite(healthCheckInterval.toMillis(), TimeUnit.MILLISECONDS)
.build(HEALTHCHECK_LOADER);
}
@GET
public Response checkHealth() {
return Response.ok(getResponse()).build();
}
private HealthCheckResponse getResponse() {
// This will call the healthCheckAgent.checkHealth() when the interval time has passed
return cache.getUnchecked(HEALTHCHECK);
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Regarding the sleep time, can the test use a 0ms interval health check, then sleep for 1ms in the test so you check the past health check was evicted?
See discussion here: #3501 (comment)
Btw, I think you can even reduce more code by using the
CacheLoader
and not theResponseCache
.
Sure, I can update to use a LoadingCache
instead. My reason for wrapping the Guava cache in an internal ResponseCache
class was to ensure that only a single key is ever used with the cache, but I can move this enforcement into the CacheLoader
so I'll do that instead.
final boolean allHealthy = results.values().stream() | ||
.map(HealthCheckResponseDetail::getIsHealthy) | ||
.reduce(Boolean::logicalAnd) | ||
.orElse(true); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can reduce this to:
results.values.stream()
.allMatch(HealthCheckResponseDetail::getIsHealthy);
|
||
@Override | ||
public RestResponse<KsqlEntityList> makeKsqlRequest( | ||
final URI serverEndpoint, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This serverEndpoint is unused, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yup: #3501 (comment)
final URI serverEndpoint | ||
) { | ||
final RestResponse<KsqlEntityList> response = | ||
ksqlClient.makeKsqlRequest(serverEndpoint, ksqlStatement); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I feel that the 3 lines from SimpleKsqlClient
can be moved here instead, doesn't it?
The serverEndPoint
is unused there, and the sql is already in the ksqlStatement
. How could I use the SimpleKsqlClient
in other code besides the health check?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Other SimpleKsqlClient
implementations exist, including https://github.com/confluentinc/ksql/blob/c0bfa41a109580e76c76cf4fa23b603646ac40f2/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/services/DefaultKsqlClient.java and https://github.com/confluentinc/ksql/blob/c0bfa41a109580e76c76cf4fa23b603646ac40f2/ksql-execution/src/main/java/io/confluent/ksql/services/DisabledKsqlClient.java, and is used by the StaticQueryExecutor
:
ksql/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/StaticQueryExecutor.java
Line 666 in c0bfa41
final RestResponse<KsqlEntityList> response = serviceContext |
ksql-rest-model/src/main/java/io/confluent/ksql/rest/entity/HealthCheckResponse.java
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the review, @spena ! Applied your suggestions.
|
||
@Override | ||
public RestResponse<KsqlEntityList> makeKsqlRequest( | ||
final URI serverEndpoint, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yup: #3501 (comment)
} | ||
|
||
/* Caches a HealthCheckResponse for the specified duration */ | ||
private static class ResponseCache { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done, though the downside is I can no longer use a custom time supplier in the unit tests. One of the updated tests currently sleeps for 11 ms, but hopefully that's OK?
(Also, thanks to @agavra for pointing out that switching to Guava's implementation means we get concurrency handling for free!)
final URI serverEndpoint | ||
) { | ||
final RestResponse<KsqlEntityList> response = | ||
ksqlClient.makeKsqlRequest(serverEndpoint, ksqlStatement); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Other SimpleKsqlClient
implementations exist, including https://github.com/confluentinc/ksql/blob/c0bfa41a109580e76c76cf4fa23b603646ac40f2/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/services/DefaultKsqlClient.java and https://github.com/confluentinc/ksql/blob/c0bfa41a109580e76c76cf4fa23b603646ac40f2/ksql-execution/src/main/java/io/confluent/ksql/services/DisabledKsqlClient.java, and is used by the StaticQueryExecutor
:
ksql/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/StaticQueryExecutor.java
Line 666 in c0bfa41
final RestResponse<KsqlEntityList> response = serviceContext |
ksql-rest-model/src/main/java/io/confluent/ksql/rest/entity/HealthCheckResponse.java
Show resolved
Hide resolved
healthCheckResource.checkHealth(); | ||
Thread.sleep(11); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
1 ms extra wait is too tight? could potentially be flaky? is there a better way to check this by waiting until a condition is met? e.g https://github.com/awaitility/awaitility
In this case, we would keep fetching the old value out of the cache in a loop and exit out when the underlying method to recheck health has been called once?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the tip! Updated the test.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good!
Description
Adds a new HTTP endpoint (
/healthcheck
) to return high-level information about server health. The endpoint currently performs two checks, a metastore check (list streams; list tables; list queries;
) and a Kafka connectivity check (list topics extended;
). The results are structured as follows:Because the endpoint is unauthenticated, requests from the healthcheck endpoint to the KSQL server are rate-limited according to the new config
ksql.healthcheck.interval.ms
: healthcheck results are cached and only re-issued if more thanksql.healthcheck.interval.ms
have passed since the healthcheck last issued requests to the KSQL server.This PR looks big but is actually quite simple:
HealthcheckResource
specifies the new endpoint.HealthcheckAgent
specifies the checks that are run.HealthcheckResponse
andHealthcheckResponseDetail
specify the structure of the response.ServerInternalKsqlClient
is a KSQL client that sends requests directly toKsqlResource
, rather than going through the usual network and authentication paths. An earlier iteration of this PR used the regular KSQL client in theServiceContext
instead, but this approach fails without a way to pass required authentication credentials to the client.KsqlTarget
to a new util fileKsqlClientUtil
so they are accessible toServerInternalKsqlClient
.Testing done
Unit tests, integration test, and manual.
Reviewer checklist