-
Notifications
You must be signed in to change notification settings - Fork 100
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Streaming Telemetry 1] Prepares TopicPartitionChannel for telemetry #696
Conversation
@@ -284,12 +291,6 @@ public TopicPartitionChannel( | |||
+ " correct offset instead", | |||
this.getChannelName()); | |||
} | |||
|
|||
if (enableCustomJMXMonitoring) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
moved into snowflakeTelemetryChannelStatus constructor
@@ -1095,10 +1127,9 @@ protected SnowflakeTelemetryService getTelemetryServiceV2() { | |||
return this.telemetryServiceV2; | |||
} | |||
|
|||
protected void setLatestConsumerOffset(long consumerOffset) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
moved up to the other offset setters
* @param metricsJmxReporter wrapper class for registering all metrics related to above connector | ||
* and channel | ||
*/ | ||
public void registerChannelJMXMetrics( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
moved into SnowflakeTelemetryChannelStatus
import com.snowflake.kafka.connector.internal.metrics.MetricsUtil; | ||
import java.util.concurrent.atomic.AtomicLong; | ||
|
||
public class SnowflakeTelemetryChannelStatus { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will extend SnowflakeTelemetryBasicInfo and become a proper telemetry object in the next PR
.../com/snowflake/kafka/connector/internal/streaming/telemetry/SnowflakeTelemetryServiceV2.java
Outdated
Show resolved
Hide resolved
@@ -817,27 +835,52 @@ public void testBigAvroBufferBytesThreshold() throws Exception { | |||
} | |||
|
|||
@Test | |||
public void testRegisterJmxMetrics() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
moved into SnowflakeTelemetryChannelStatusTest
src/main/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2.java
Show resolved
Hide resolved
.../snowflake/kafka/connector/internal/streaming/telemetry/SnowflakeTelemetryChannelStatus.java
Show resolved
Hide resolved
.../snowflake/kafka/connector/internal/streaming/telemetry/SnowflakeTelemetryChannelStatus.java
Outdated
Show resolved
Hide resolved
.../snowflake/kafka/connector/internal/streaming/telemetry/SnowflakeTelemetryChannelStatus.java
Outdated
Show resolved
Hide resolved
src/test/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannelTest.java
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
overall lgtm, couple of questions and one request to add a new test.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
left some comments, PTAL, thanks!
src/main/java/com/snowflake/kafka/connector/internal/streaming/SnowflakeSinkServiceV2.java
Show resolved
Hide resolved
src/main/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannel.java
Outdated
Show resolved
Hide resolved
src/main/java/com/snowflake/kafka/connector/internal/streaming/TopicPartitionChannel.java
Outdated
Show resolved
Hide resolved
// offsets | ||
currentMetricRegistry.register( | ||
constructMetricName( | ||
this.channelName, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not familiar with JMX, but can a channel name be uniquely identify a metric? What if we have two channels with the same name but different client sequencer?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Channel names are meant to be unique identifiers for that particular topic and partition.
From what I understand, client sequencers are bumped up when a channel is reopened, which means the registry should replace the previous channel name's metrics. This should be ok behavior, wdyt?
Another potential issue is two connectors listening to the same topic, however this is not an issue because the MetricRegistry is created off the unique connector name.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what about during rebalance? Any concern about emit metrics from two nodes?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There shouldn't be rebalance issues because JmxReporters are created at the serviceSink level. When registering new metrics for a TopicPartitionChannel during rebalance, we remove all existing metrics and then register new ones.
.../snowflake/kafka/connector/internal/streaming/telemetry/SnowflakeTelemetryChannelStatus.java
Show resolved
Hide resolved
.../snowflake/kafka/connector/internal/streaming/telemetry/SnowflakeTelemetryChannelStatus.java
Outdated
Show resolved
Hide resolved
mockTelemetryService = Mockito.mock(SnowflakeTelemetryService.class); | ||
Mockito.when(mockStreamingClient.isClosed()).thenReturn(false); | ||
Mockito.when(mockStreamingClient.openChannel(ArgumentMatchers.any(OpenChannelRequest.class))) | ||
.thenReturn(mockStreamingChannel); | ||
Mockito.when(mockStreamingChannel.getFullyQualifiedName()).thenReturn(TEST_CHANNEL_NAME); | ||
this.topicPartition = new TopicPartition(TOPIC, PARTITION); | ||
this.sfConnectorConfig = TestUtils.getConfig(); | ||
this.streamingBufferThreshold = new StreamingBufferThreshold(10, 10_000, 1); | ||
this.streamingBufferThreshold = new StreamingBufferThreshold(1, 10_000, 1); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
reduced the time flush to 1 second to replicate our recommended settings
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, ship it!
import com.snowflake.kafka.connector.internal.metrics.MetricsUtil; | ||
import java.util.concurrent.atomic.AtomicLong; | ||
|
||
public class SnowflakeTelemetryChannelStatus { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
add a comment about what this class is doing?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
will be added in the next PR
// offsets | ||
currentMetricRegistry.register( | ||
constructMetricName( | ||
this.channelName, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what about during rebalance? Any concern about emit metrics from two nodes?
Changes coming in part 2 PR