diff --git a/common/events_common.h b/common/events_common.h index 0aec96e9d..ae9bb7d58 100644 --- a/common/events_common.h +++ b/common/events_common.h @@ -310,7 +310,15 @@ struct serialization more = 0; zmq_msg_init(&msg); int rc = zmq_msg_recv(&msg, sock, flag); - if (rc != -1) { + if (rc == 1) { + char control_character = *(char*)zmq_msg_data(&msg); + if (control_character == 0x01 || control_character == 0x00) { + SWSS_LOG_INFO("Received subscription/unsubscription message when XSUB connect to XPUB: %c", control_character); + } else { + SWSS_LOG_DEBUG("Received non subscription based control character: %c", control_character); + } + rc = 0; + } else if (rc != -1) { size_t more_size = sizeof (more); zmq_getsockopt (sock, ZMQ_RCVMORE, &more, &more_size); @@ -318,8 +326,7 @@ struct serialization rc = zmsg_to_map(msg, data); RET_ON_ERR(rc == 0, "Failed to deserialize part rc=%d", rc); /* read more flag if message read fails to de-serialize */ - } - else { + } else { /* override with zmq err */ rc = zmq_errno(); if (rc != 11) { @@ -332,7 +339,7 @@ struct serialization return rc; } - + template int zmq_send_part(void *sock, int flag, const DT &data) diff --git a/tests/events_common_ut.cpp b/tests/events_common_ut.cpp index 7df48588b..524265793 100644 --- a/tests/events_common_ut.cpp +++ b/tests/events_common_ut.cpp @@ -97,9 +97,51 @@ TEST(events_common, send_recv) zmq_ctx_term(zmq_ctx); } +TEST(events_common, send_recv_control_character) +{ +#if 0 + { + /* Direct log messages to stdout */ + string dummy, op("STDOUT"); + swss::Logger::swssOutputNotify(dummy, op); + swss::Logger::setMinPrio(swss::Logger::SWSS_DEBUG); + } +#endif + char *path = "tcp://127.0.0.1:5570"; + void *zmq_ctx = zmq_ctx_new(); + void *sock_p0 = zmq_socket (zmq_ctx, ZMQ_PAIR); + EXPECT_EQ(0, zmq_connect (sock_p0, path)); + void *sock_p1 = zmq_socket (zmq_ctx, ZMQ_PAIR); + EXPECT_EQ(0, zmq_bind (sock_p1, path)); + string source; + map m; + + // Subscription based control character test + zmq_msg_t sub_msg; + zmq_msg_init_size(&sub_msg, 1); + *(char*)zmq_msg_data(&sub_msg) = 0x01; + EXPECT_EQ(1, zmq_msg_send(&sub_msg, sock_p0, 0)); + zmq_msg_close(&sub_msg); + // First part will be read only and will return as 0, but will not be deserialized event + EXPECT_EQ(0, zmq_message_read(sock_p1, 0, source, m)); + EXPECT_EQ("", source); + EXPECT_EQ(0, m.size()); + + // Non-subscription based control character test + zmq_msg_t ctrl_msg; + zmq_msg_init_size(&ctrl_msg, 1); + *(char*)zmq_msg_data(&ctrl_msg) = 0x07; + EXPECT_EQ(1, zmq_msg_send(&ctrl_msg, sock_p0, 0)); + zmq_msg_close(&ctrl_msg); + // First part will be read only and will return as 0, but will not be deserialized event + EXPECT_EQ(0, zmq_message_read(sock_p1, 0, source, m)); + EXPECT_EQ("", source); + EXPECT_EQ(0, m.size()); - - + zmq_close(sock_p0); + zmq_close(sock_p1); + zmq_ctx_term(zmq_ctx); +}