-
Notifications
You must be signed in to change notification settings - Fork 1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: Adds lag reporting and API for use in lag aware routing as described in KLIP 12 #4392
feat: Adds lag reporting and API for use in lag aware routing as described in KLIP 12 #4392
Conversation
@confluentinc It looks like @AlanConfluent just signed our Contributor License Agreement. 👍 Always at your service, clabot |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM - most of my comments are code-level
ksql-engine/src/main/java/io/confluent/ksql/util/QueryMetadata.java
Outdated
Show resolved
Hide resolved
ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/HeartbeatAgent.java
Outdated
Show resolved
Hide resolved
if (host.equals(localHostString)) { | ||
continue; | ||
} | ||
final TreeMap<Long, HeartbeatInfo> heartbeats = receivedHeartbeats.get(host); | ||
//For previously discovered hosts, if they have not received any heartbeats, mark them dead | ||
if (heartbeats == null || heartbeats.isEmpty()) { | ||
hostsStatus.get(host).setHostAlive(false); | ||
hostsStatus.put(host, new HostStatusEntity(status.getHostInfoEntity(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same comment as above (and one more time below), but to make life easier we could just use computeIfPresent
to make it clear that the key isn't changing
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changed to computeIfPresent.
ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/LagReportingAgent.java
Show resolved
Hide resolved
ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/LagReportingAgent.java
Outdated
Show resolved
Hide resolved
ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/LagReportingAgent.java
Outdated
Show resolved
Hide resolved
ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/LagReportingAgent.java
Outdated
Show resolved
Hide resolved
ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/LagReportingAgent.java
Outdated
Show resolved
Hide resolved
ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/LagReportingResource.java
Outdated
Show resolved
Hide resolved
ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/LagReportingAgentTest.java
Outdated
Show resolved
Hide resolved
d3e4f95
to
f35a7bc
Compare
Description
This creates a Service which sends state store changelog topic lags retrieved from KafkaStreams to all of the nodes in the cluster. It also has an API which can be used by pull request logic for making routing decisions.
Testing done
Added unit tests for
LagReportingAgent
which tests APIs. Also added a functional testLagReportingAgentFunctionalTest
which actually sets up a two node cluster, sends lags, and verifies those reported with those read directly from Kafka.Will also do additional manual testing according to this document: #4360
Reviewer checklist