Skip to content

Commit

Permalink
Merge Get and Push Protocol (#4377)
Browse files Browse the repository at this point in the history
* WIP:Push telemetry is being scheduled now.

* Push

* Compilation Fix

* Working

* Use UUID in PUSH

* Remove fprintf

* Changes

* Fix CONFIGURATION.md

* Fix size

* Update s2i bounds

---------

Co-authored-by: Milind L <[email protected]>
  • Loading branch information
anchitj and milindl authored Aug 1, 2023
1 parent 56fa55a commit 9546d71
Show file tree
Hide file tree
Showing 16 changed files with 501 additions and 75 deletions.
2 changes: 1 addition & 1 deletion CONFIGURATION.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ topic.metadata.refresh.fast.cnt | * | 0 .. 1000 | 10
topic.metadata.refresh.sparse | * | true, false | true | low | Sparse metadata requests (consumes less network bandwidth) <br>*Type: boolean*
topic.metadata.propagation.max.ms | * | 0 .. 3600000 | 30000 | low | Apache Kafka topic creation is asynchronous and it takes some time for a new topic to propagate throughout the cluster to all brokers. If a client requests topic metadata after manual topic creation but before the topic has been fully propagated to the broker the client is requesting metadata from, the topic will seem to be non-existent and the client will mark the topic as such, failing queued produced messages with `ERR__UNKNOWN_TOPIC`. This setting delays marking a topic as non-existent until the configured propagation max time has passed. The maximum propagation time is calculated from the time the topic is first referenced in the client, e.g., on produce(). <br>*Type: integer*
topic.blacklist | * | | | low | Topic blacklist, a comma-separated list of regular expressions for matching topic names that should be ignored in broker metadata information as if the topics did not exist. <br>*Type: pattern list*
debug | * | generic, broker, topic, metadata, feature, queue, msg, protocol, cgrp, security, fetch, interceptor, plugin, consumer, admin, eos, mock, assignor, conf, all | | medium | A comma-separated list of debug contexts to enable. Detailed Producer debugging: broker,topic,msg. Consumer: consumer,cgrp,topic,fetch <br>*Type: CSV flags*
debug | * | generic, broker, topic, metadata, feature, queue, msg, protocol, cgrp, security, fetch, interceptor, plugin, consumer, admin, eos, mock, assignor, conf, telemetry, all | | medium | A comma-separated list of debug contexts to enable. Detailed Producer debugging: broker,topic,msg. Consumer: consumer,cgrp,topic,fetch <br>*Type: CSV flags*
socket.timeout.ms | * | 10 .. 300000 | 60000 | low | Default timeout for network requests. Producer: ProduceRequests will use the lesser value of `socket.timeout.ms` and remaining `message.timeout.ms` for the first message in the batch. Consumer: FetchRequests will use `fetch.wait.max.ms` + `socket.timeout.ms`. Admin: Admin requests will use `socket.timeout.ms` or explicitly set `rd_kafka_AdminOptions_set_operation_timeout()` value. <br>*Type: integer*
socket.blocking.max.ms | * | 1 .. 60000 | 1000 | low | **DEPRECATED** No longer used. <br>*Type: integer*
socket.send.buffer.bytes | * | 0 .. 100000000 | 0 | low | Broker socket send buffer size. System default is used if 0. <br>*Type: integer*
Expand Down
15 changes: 8 additions & 7 deletions src/rdkafka_broker.c
Original file line number Diff line number Diff line change
Expand Up @@ -673,7 +673,8 @@ void rd_kafka_broker_fail(rd_kafka_broker_t *rkb,
/* TODO(milind): check if this is right. */
mtx_lock(&rkb->rkb_rk->rk_telemetry.lock);
if (rkb->rkb_rk->rk_telemetry.preferred_broker == rkb) {
rd_kafka_broker_destroy(rkb->rkb_rk->rk_telemetry.preferred_broker);
rd_kafka_broker_destroy(
rkb->rkb_rk->rk_telemetry.preferred_broker);
rkb->rkb_rk->rk_telemetry.preferred_broker = NULL;
}
mtx_unlock(&rkb->rkb_rk->rk_telemetry.lock);
Expand Down Expand Up @@ -2276,18 +2277,18 @@ void rd_kafka_broker_connect_up(rd_kafka_broker_t *rkb) {
rkb, RD_KAFKAP_GetTelemetrySubscriptions, 0, 0, &features) !=
-1) {
rd_kafka_t *rk = rkb->rkb_rk;

mtx_lock(&rkb->rkb_rk->rk_telemetry.lock);
if (!rkb->rkb_rk->rk_telemetry.preferred_broker &&
rk->rk_telemetry.state ==
RD_KAFKA_ADMIN_STATE_WAIT_BROKER) {
rk->rk_telemetry.preferred_broker =
rd_kafka_broker_keep(rkb);
rk->rk_telemetry.state == RD_KAFKA_TELEMETRY_AWAIT_BROKER) {
rd_kafka_broker_keep(rkb);
rk->rk_telemetry.preferred_broker = rkb;
rk->rk_telemetry.state =
RD_KAFKA_TELEMETRY_GET_SUBSCRIPTIONS_SCHEDULED;
rd_kafka_timer_start_oneshot(
&rk->rk_timers, &rk->rk_telemetry.request_timer,
rd_false, 0 /* immediate */, rd_kafka_telemetry_fsm,
rk);
rd_false, 1 /* immediate */,
rd_kafka_telemetry_fsm_tmr_cb, (void *)rk);
}
mtx_unlock(&rkb->rkb_rk->rk_telemetry.lock);
}
Expand Down
3 changes: 2 additions & 1 deletion src/rdkafka_conf.c
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ struct rd_kafka_property {
const char *str;
const char *unsupported; /**< Reason for value not being
* supported in this build. */
} s2i[20]; /* _RK_C_S2I and _RK_C_S2F */
} s2i[21]; /* _RK_C_S2I and _RK_C_S2F */

const char *unsupported; /**< Reason for property not being supported
* in this build.
Expand Down Expand Up @@ -508,6 +508,7 @@ static const struct rd_kafka_property rd_kafka_properties[] = {
{RD_KAFKA_DBG_MOCK, "mock"},
{RD_KAFKA_DBG_ASSIGNOR, "assignor"},
{RD_KAFKA_DBG_CONF, "conf"},
{RD_KAFKA_DBG_TELEMETRY, "telemetry"},
{RD_KAFKA_DBG_ALL, "all"}}},
{_RK_GLOBAL, "socket.timeout.ms", _RK_C_INT, _RK(socket_timeout_ms),
"Default timeout for network requests. "
Expand Down
25 changes: 19 additions & 6 deletions src/rdkafka_int.h
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,17 @@ typedef enum {
} rd_kafka_telemetry_state_t;


/**
 * @brief Returns a human-readable name for a telemetry FSM state.
 *
 * @param state Telemetry state to map to a string.
 *
 * @returns A static string; "?" if \p state is out of range.
 */
static RD_UNUSED const char *
rd_kafka_telemetry_state2str(rd_kafka_telemetry_state_t state) {
        static const char *names[] = {"AwaitBroker",
                                      "GetSubscriptionsScheduled",
                                      "GetSubscriptionsSent",
                                      "PushScheduled",
                                      "PushSent",
                                      "Terminating"};
        /* Guard the lookup: an enum value outside the table (e.g. a state
         * added later without updating this list) would otherwise read
         * past the end of \c names . */
        if ((size_t)state >= sizeof(names) / sizeof(names[0]))
                return "?";
        return names[state];
}

/**
* Kafka handle, internal representation of the application's rd_kafka_t.
*/
Expand Down Expand Up @@ -638,15 +649,16 @@ struct rd_kafka_s {
/**< Lock for preferred telemetry broker and state. */
mtx_t lock;

/* Fields obtained from broker as a result of GetSubscriptions. */
/* TODO: use rd_kafka_uuid_t as in https://github.com/confluentinc/librdkafka/pull/4300/files
* when it is merged. */
char *client_instance_id;
/* Fields obtained from broker as a result of GetSubscriptions.
*/
rd_kafka_uuid_t client_instance_id;
int32_t subscription_id;
char** accepted_compression_types;
rd_kafka_compression_t *accepted_compression_types;
size_t accepted_compression_types_cnt;
int32_t push_interval_ms;
rd_bool_t delta_temporality;
char** requested_metrics;
char **requested_metrics;
size_t requested_metrics_cnt;
} rk_telemetry;

/* Test mocks */
Expand Down Expand Up @@ -890,6 +902,7 @@ const char *rd_kafka_purge_flags2str(int flags);
#define RD_KAFKA_DBG_MOCK 0x10000
#define RD_KAFKA_DBG_ASSIGNOR 0x20000
#define RD_KAFKA_DBG_CONF 0x40000
#define RD_KAFKA_DBG_TELEMETRY 0x80000
#define RD_KAFKA_DBG_ALL 0xfffff
#define RD_KAFKA_DBG_NONE 0x0

Expand Down
35 changes: 35 additions & 0 deletions src/rdkafka_mock.c
Original file line number Diff line number Diff line change
Expand Up @@ -2145,6 +2145,28 @@ rd_kafka_mock_set_apiversion(rd_kafka_mock_cluster_t *mcluster,
rd_kafka_op_req(mcluster->ops, rko, RD_POLL_INFINITE));
}

/**
 * @brief Set the metrics the mock broker will request from the client
 *        for telemetry collection.
 *
 * Deep-copies \p metrics onto a mock op and enqueues it on the
 * cluster's op queue, where it is applied by the mock cluster thread.
 */
rd_kafka_resp_err_t
rd_kafka_mock_telemetry_set_requested_metrics(rd_kafka_mock_cluster_t *mcluster,
                                              char **metrics,
                                              size_t metrics_cnt) {
        rd_kafka_op_t *rko = rd_kafka_op_new(RD_KAFKA_OP_MOCK);
        char **copies      = NULL;

        if (metrics_cnt > 0) {
                size_t idx;

                copies = rd_calloc(metrics_cnt, sizeof(char *));
                for (idx = 0; idx < metrics_cnt; idx++)
                        copies[idx] = rd_strdup(metrics[idx]);
        }

        rko->rko_u.mock.hi      = metrics_cnt;
        rko->rko_u.mock.metrics = copies;
        rko->rko_u.mock.cmd     = RD_KAFKA_MOCK_CMD_REQUESTED_METRICS_SET;

        return rd_kafka_op_err_destroy(
            rd_kafka_op_req(mcluster->ops, rko, RD_POLL_INFINITE));
}



/**
* @brief Apply command to specific broker.
Expand Down Expand Up @@ -2254,6 +2276,7 @@ rd_kafka_mock_cluster_cmd(rd_kafka_mock_cluster_t *mcluster,
rd_kafka_mock_topic_t *mtopic;
rd_kafka_mock_partition_t *mpart;
rd_kafka_mock_broker_t *mrkb;
size_t i;

switch (rko->rko_u.mock.cmd) {
case RD_KAFKA_MOCK_CMD_TOPIC_CREATE:
Expand Down Expand Up @@ -2367,6 +2390,18 @@ rd_kafka_mock_cluster_cmd(rd_kafka_mock_cluster_t *mcluster,
.MaxVersion = (int16_t)rko->rko_u.mock.hi;
break;

case RD_KAFKA_MOCK_CMD_REQUESTED_METRICS_SET:
mcluster->metrics_cnt = rko->rko_u.mock.hi;
if (!mcluster->metrics_cnt)
break;

mcluster->metrics =
rd_calloc(mcluster->metrics_cnt, sizeof(char *));
for (i = 0; i < mcluster->metrics_cnt; i++)
mcluster->metrics[i] =
rd_strdup(rko->rko_u.mock.metrics[i]);
break;

default:
rd_assert(!*"unknown mock cmd");
break;
Expand Down
18 changes: 18 additions & 0 deletions src/rdkafka_mock.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ extern "C" {
* - Low-level consumer
* - High-level balanced consumer groups with offset commits
* - Topic Metadata and auto creation
* - Telemetry (KIP-714)
*
* @remark This is an experimental public API that is NOT covered by the
* librdkafka API or ABI stability guarantees.
Expand Down Expand Up @@ -365,6 +366,23 @@ rd_kafka_mock_set_apiversion(rd_kafka_mock_cluster_t *mcluster,
int16_t MaxVersion);


/**
* @brief Set the metrics that are expected by the broker for telemetry
* collection.
*
 * @param metrics List of prefixes of metric names or NULL.
 * @param metrics_cnt Number of elements in \p metrics.
 *
 * @note If \p metrics is NULL, no metrics will be expected by the broker. If
 *       the first element of \p metrics is an empty string, that indicates
 *       the broker expects all metrics.
*/
RD_EXPORT rd_kafka_resp_err_t
rd_kafka_mock_telemetry_set_requested_metrics(rd_kafka_mock_cluster_t *mcluster,
char **metrics,
size_t metrics_cnt);


/**@}*/

#ifdef __cplusplus
Expand Down
95 changes: 95 additions & 0 deletions src/rdkafka_mock_handlers.c
Original file line number Diff line number Diff line change
Expand Up @@ -2105,6 +2105,98 @@ rd_kafka_mock_handle_OffsetForLeaderEpoch(rd_kafka_mock_connection_t *mconn,
return -1;
}

/**
* @brief Handle GetTelemetrySubscriptions
*/
/**
 * @brief Handle GetTelemetrySubscriptions
 *
 * Replies with a fixed subscription: compression NONE, a 5000 ms push
 * interval, delta temporality, and the metric prefixes configured through
 * rd_kafka_mock_telemetry_set_requested_metrics().
 *
 * @returns 0 on success, -1 on parse failure.
 */
static int rd_kafka_mock_handle_GetTelemetrySubscriptions(
    rd_kafka_mock_connection_t *mconn,
    rd_kafka_buf_t *rkbuf) {
        const rd_bool_t log_decode_errors = rd_true;
        rd_kafka_mock_cluster_t *mcluster = mconn->broker->cluster;
        rd_kafka_buf_t *resp = rd_kafka_mock_buf_new_response(rkbuf);
        rd_kafka_resp_err_t err;
        /* size_t to match mcluster->metrics_cnt and avoid a
         * signed/unsigned comparison in the loop below. */
        size_t i;
        rd_kafka_uuid_t ClientInstanceId;
        rd_kafka_uuid_t zero_uuid = RD_KAFKA_ZERO_UUID;

        /* Request: ClientInstanceId */
        rd_kafka_buf_read_uuid(rkbuf, &ClientInstanceId);
        if (ClientInstanceId.least_significant_bits ==
                zero_uuid.least_significant_bits &&
            ClientInstanceId.most_significant_bits ==
                zero_uuid.most_significant_bits) {
                /* Zero uuid means the client is requesting a new instance
                 * id: assign some random numbers. */
                ClientInstanceId.least_significant_bits = 129;
                ClientInstanceId.most_significant_bits  = 298;
        }

        /* Response: ThrottleTimeMs */
        rd_kafka_buf_write_i32(resp, 0);

        /* Inject error */
        err = rd_kafka_mock_next_request_error(mconn, resp);

        /* Response: ErrorCode */
        rd_kafka_buf_write_i16(resp, err);

        /* Response: ClientInstanceId */
        rd_kafka_buf_write_uuid(resp, &ClientInstanceId);

        /* Response: SubscriptionId */
        // TODO: Calculate subscription ID.
        rd_kafka_buf_write_i32(resp, 0);

        /* Response: #AcceptedCompressionTypes */
        rd_kafka_buf_write_arraycnt(resp, 1);

        /* We send NONE here, despite the broker never actually sending NONE. */
        /* Response: AcceptedCompressionTypes */
        rd_kafka_buf_write_i8(resp, RD_KAFKA_COMPRESSION_NONE);

        /* Response: PushIntervalMs */
        rd_kafka_buf_write_i32(resp, 5000);

        /* Response: DeltaTemporality */
        rd_kafka_buf_write_bool(resp, rd_true);

        /* Response: #RequestedMetrics */
        rd_kafka_buf_write_arraycnt(resp, mcluster->metrics_cnt);

        for (i = 0; i < mcluster->metrics_cnt; i++)
                rd_kafka_buf_write_str(resp, mcluster->metrics[i], -1);

        rd_kafka_mock_connection_send_response(mconn, resp);

        return 0;

err_parse:
        rd_kafka_buf_destroy(resp);
        return -1;
}

/**
* @brief Handle PushTelemetry
*/
/**
 * @brief Handle PushTelemetry
 *
 * The pushed metrics payload is not parsed or verified; the mock broker
 * only replies with the throttle time and an (optionally injected)
 * error code.
 *
 * @returns 0 on success.
 */
static int rd_kafka_mock_handle_PushTelemetry(rd_kafka_mock_connection_t *mconn,
                                              rd_kafka_buf_t *rkbuf) {
        rd_kafka_buf_t *resp = rd_kafka_mock_buf_new_response(rkbuf);
        rd_kafka_resp_err_t err;

        /* Response: ThrottleTimeMs */
        rd_kafka_buf_write_i32(resp, 0);

        /* Inject error */
        err = rd_kafka_mock_next_request_error(mconn, resp);

        /* Response: ErrorCode
         * (was previously computed but never written to the response,
         *  so injected errors were silently dropped). */
        rd_kafka_buf_write_i16(resp, err);

        rd_kafka_mock_connection_send_response(mconn, resp);

        return 0;
}

/**
* @brief Default request handlers
Expand Down Expand Up @@ -2136,6 +2228,9 @@ const struct rd_kafka_mock_api_handler
[RD_KAFKAP_EndTxn] = {0, 1, -1, rd_kafka_mock_handle_EndTxn},
[RD_KAFKAP_OffsetForLeaderEpoch] =
{2, 2, -1, rd_kafka_mock_handle_OffsetForLeaderEpoch},
[RD_KAFKAP_GetTelemetrySubscriptions] =
{0, 0, 0, rd_kafka_mock_handle_GetTelemetrySubscriptions},
[RD_KAFKAP_PushTelemetry] = {0, 0, 0, rd_kafka_mock_handle_PushTelemetry},
};


Expand Down
6 changes: 6 additions & 0 deletions src/rdkafka_mock_int.h
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,12 @@ struct rd_kafka_mock_cluster_s {
/**< Request handlers */
struct rd_kafka_mock_api_handler api_handlers[RD_KAFKAP__NUM];

/**< Requested metrics. */
char **metrics;

/**< Requested metric count. */
size_t metrics_cnt;

/**< Mutex for:
* .errstacks
* .apiversions
Expand Down
6 changes: 6 additions & 0 deletions src/rdkafka_op.c
Original file line number Diff line number Diff line change
Expand Up @@ -440,6 +440,12 @@ void rd_kafka_op_destroy(rd_kafka_op_t *rko) {
case RD_KAFKA_OP_MOCK:
RD_IF_FREE(rko->rko_u.mock.name, rd_free);
RD_IF_FREE(rko->rko_u.mock.str, rd_free);
if (rko->rko_u.mock.metrics) {
size_t i;
for (i =0; i < rko->rko_u.mock.hi; i++)
rd_free(rko->rko_u.mock.metrics[i]);
rd_free(rko->rko_u.mock.metrics);
}
break;

case RD_KAFKA_OP_BROKER_MONITOR:
Expand Down
4 changes: 4 additions & 0 deletions src/rdkafka_op.h
Original file line number Diff line number Diff line change
Expand Up @@ -558,6 +558,7 @@ struct rd_kafka_op_s {
RD_KAFKA_MOCK_CMD_BROKER_SET_RACK,
RD_KAFKA_MOCK_CMD_COORD_SET,
RD_KAFKA_MOCK_CMD_APIVERSION_SET,
RD_KAFKA_MOCK_CMD_REQUESTED_METRICS_SET,
} cmd;

rd_kafka_resp_err_t err; /**< Error for:
Expand Down Expand Up @@ -594,7 +595,10 @@ struct rd_kafka_op_s {
* TOPIC_CREATE (repl fact)
* PART_SET_FOLLOWER_WMARKS
* APIVERSION_SET (maxver)
* REQUESTED_METRICS_SET (metrics_cnt)
*/
char **metrics; /**< Metrics requested, for:
* REQUESTED_METRICS_SET */
} mock;

struct {
Expand Down
2 changes: 2 additions & 0 deletions src/rdkafka_proto.h
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,8 @@ static RD_UNUSED const char *rd_kafka_ApiKey2str(int16_t ApiKey) {
[RD_KAFKAP_DescribeTransactions] = "DescribeTransactions",
[RD_KAFKAP_ListTransactions] = "ListTransactions",
[RD_KAFKAP_AllocateProducerIds] = "AllocateProducerIds",
[RD_KAFKAP_GetTelemetrySubscriptions] = "GetTelemetrySubscriptions",
[RD_KAFKAP_PushTelemetry] = "PushTelemetry",
};
static RD_TLS char ret[64];

Expand Down
Loading

0 comments on commit 9546d71

Please sign in to comment.