diff --git a/include/aca_message_pulsar_consumer.h b/include/aca_message_pulsar_consumer.h index 4572c454..393bb6e2 100644 --- a/include/aca_message_pulsar_consumer.h +++ b/include/aca_message_pulsar_consumer.h @@ -48,6 +48,8 @@ class ACA_Message_Pulsar_Consumer { Consumer multicast_consumer; Consumer unicast_consumer; + static string empty_topic; + private: void setMulticastSubscriptionName(string subscription_name); @@ -59,12 +61,19 @@ class ACA_Message_Pulsar_Consumer { void setUnicastTopicName(string topic); - public: - ACA_Message_Pulsar_Consumer(string topic, string brokers, string subscription_name); + static string recovered_topic; + + static ACA_Message_Pulsar_Consumer &get_instance(); + + ACA_Message_Pulsar_Consumer(); + + ACA_Message_Pulsar_Consumer(string topic, string brokers, string subscription_name); - ~ACA_Message_Pulsar_Consumer(); + ~ACA_Message_Pulsar_Consumer(); + + void init(string topic, string brokers, string subscription_name); string getBrokers() const; @@ -76,13 +85,17 @@ class ACA_Message_Pulsar_Consumer { string getUnicastSubscriptionName() const; + static string getRecoveredTopicName(); + bool multicastConsumerDispatched(); bool unicastConsumerDispatched(int stickyHash); - //static void listener(Consumer consumer, const Message& message); + bool unicastUnsubcribe(); + + bool unicastResubscribe(string topic, int stickyHash); - + bool unicastResubscribe(bool isSubscribe, string topic="", string stickHash=""); }; } // namespace aca_message_pulsar diff --git a/src/comm/aca_message_pulsar_consumer.cpp b/src/comm/aca_message_pulsar_consumer.cpp index 5bbc9125..dc9626a6 100644 --- a/src/comm/aca_message_pulsar_consumer.cpp +++ b/src/comm/aca_message_pulsar_consumer.cpp @@ -30,6 +30,10 @@ using pulsar::StickyRange; namespace aca_message_pulsar { +string aca_message_pulsar::ACA_Message_Pulsar_Consumer::empty_topic=""; + +string aca_message_pulsar::ACA_Message_Pulsar_Consumer::recovered_topic= + ACA_Message_Pulsar_Consumer::getRecoveredTopicName(); void listener(Consumer consumer, const Message& message){ alcor::schema::GoalStateV2 deserialized_GoalState; @@ -61,9 +65,57 @@ void listener(Consumer consumer, const Message& message){ consumer.acknowledge(message.getMessageId()); } +ACA_Message_Pulsar_Consumer::ACA_Message_Pulsar_Consumer() +{ + string default_brokers = "pulsar://localhost:6650"; + string default_topic = "Host-ts-1"; + string default_subscription_name = "test_subscription"; + + setUnicastTopicName(default_topic); + recovered_topic=default_topic; + setMulticastTopicName(default_topic); + setBrokers(default_brokers); + setUnicastSubscriptionName(default_subscription_name); + setMulticastSubscriptionName(default_subscription_name); + + ACA_LOG_DEBUG("Broker list: %s\n", this->brokers_list.c_str()); + ACA_LOG_DEBUG("Unicast consumer topic name: %s\n", this->unicast_topic_name.c_str()); + ACA_LOG_DEBUG("Unicast consumer subscription name: %s\n", this->unicast_subscription_name.c_str()); + ACA_LOG_DEBUG("Multicast consumer topic name: %s\n", this->multicast_topic_name.c_str()); + ACA_LOG_DEBUG("Multicast consumer subscription name: %s\n", this->multicast_subscription_name.c_str()); + + // Create the clients + this->ptr_multicast_client= new Client(default_brokers); + this->ptr_unicast_client = new Client(default_brokers); +} + +ACA_Message_Pulsar_Consumer &ACA_Message_Pulsar_Consumer::get_instance() +{ + static ACA_Message_Pulsar_Consumer instance; + return instance; +} +void ACA_Message_Pulsar_Consumer::init(string topic, string brokers, string subscription_name){ + setUnicastTopicName(topic); + recovered_topic=topic; + setMulticastTopicName(topic); + setBrokers(brokers); + setUnicastSubscriptionName(subscription_name); + setMulticastSubscriptionName(subscription_name); + + ACA_LOG_DEBUG("Broker list: %s\n", this->brokers_list.c_str()); + ACA_LOG_DEBUG("Unicast consumer topic name: %s\n", this->unicast_topic_name.c_str()); + ACA_LOG_DEBUG("Unicast consumer subscription name: %s\n", this->unicast_subscription_name.c_str()); + ACA_LOG_DEBUG("Multicast consumer topic name: %s\n", this->multicast_topic_name.c_str()); + ACA_LOG_DEBUG("Multicast consumer subscription name: %s\n", this->multicast_subscription_name.c_str()); + + // Create the clients + this->ptr_multicast_client= new Client(brokers); + this->ptr_unicast_client = new Client(brokers); +} ACA_Message_Pulsar_Consumer::ACA_Message_Pulsar_Consumer(string topic, string brokers, string subscription_name) { setUnicastTopicName(topic); + recovered_topic=topic; setMulticastTopicName(topic); setBrokers(brokers); setUnicastSubscriptionName(subscription_name); @@ -76,7 +128,7 @@ ACA_Message_Pulsar_Consumer::ACA_Message_Pulsar_Consumer(string topic, string br ACA_LOG_DEBUG("Multicast consumer subscription name: %s\n", this->multicast_subscription_name.c_str()); // Create the clients - //this->ptr_multicast_client= new Client(brokers); + this->ptr_multicast_client= new Client(brokers); this->ptr_unicast_client = new Client(brokers); } @@ -111,6 +163,10 @@ string ACA_Message_Pulsar_Consumer::getUnicastSubscriptionName() const return this->unicast_subscription_name; } +string ACA_Message_Pulsar_Consumer::getRecoveredTopicName() +{ + return "recovered topic test"; +} bool ACA_Message_Pulsar_Consumer::unicastConsumerDispatched(int stickyHash){ Result result; @@ -125,6 +181,7 @@ bool ACA_Message_Pulsar_Consumer::unicastConsumerDispatched(int stickyHash){ //Use key shared mode this->unicast_consumer_config.setConsumerType(ConsumerKeyShared).setKeySharedPolicy(keySharedPolicy).setMessageListener(listener); + ACA_LOG_INFO("%s\n",this->unicast_topic_name.c_str()); result = this->ptr_unicast_client->subscribe(this->unicast_topic_name,this->unicast_subscription_name,this->unicast_consumer_config,this->unicast_consumer); if (result != Result::ResultOk){ ACA_LOG_ERROR("Failed to subscribe unicast topic: %s\n", this->unicast_topic_name.c_str()); @@ -147,6 +204,51 @@ bool ACA_Message_Pulsar_Consumer::multicastConsumerDispatched(){ return EXIT_SUCCESS; } +bool ACA_Message_Pulsar_Consumer::unicastUnsubcribe() +{ + Result result; + if(this->unicast_topic_name==empty_topic){ + ACA_LOG_INFO("The consumer already unsubscribe the unicast topic."); + return EXIT_SUCCESS; + } + + result=this->unicast_consumer.unsubscribe(); + if (result != Result::ResultOk){ + ACA_LOG_ERROR("Failed to unsubscribe unicast topic: %s\n", this->unicast_topic_name.c_str()); + return EXIT_FAILURE; + } + this->unicast_topic_name=empty_topic; + return EXIT_SUCCESS; +} + +bool ACA_Message_Pulsar_Consumer::unicastResubscribe(string topic, int stickyHash) +{ + bool result; + + result = unicastUnsubcribe(); + + if (result==EXIT_SUCCESS){ + setUnicastTopicName(topic); + recovered_topic=topic; + result = unicastConsumerDispatched(stickyHash); + if (result==EXIT_SUCCESS) { + return EXIT_SUCCESS; + } + } + ACA_LOG_ERROR("Failed to resubscribe unicast topic: %s\n", topic.c_str()); + return EXIT_FAILURE; +} + + +bool ACA_Message_Pulsar_Consumer::unicastResubscribe(bool isSubscribe, string topic, string stickHash) +{ + if(!isSubscribe){ + return unicastUnsubcribe(); + } + else{ + return unicastResubscribe(topic, std::stoi(stickHash)); + } +} void ACA_Message_Pulsar_Consumer::setBrokers(string brokers) { @@ -173,4 +275,5 @@ void ACA_Message_Pulsar_Consumer::setUnicastSubscriptionName(string subscription this->unicast_subscription_name = subscription_name; } + } // namespace aca_message_pulsar diff --git a/test/gtest/aca_test_mq.cpp b/test/gtest/aca_test_mq.cpp index f5946d94..b7b52197 100644 --- a/test/gtest/aca_test_mq.cpp +++ b/test/gtest/aca_test_mq.cpp @@ -99,8 +99,9 @@ TEST(pulsar_test_cases, DISABLED_pulsar_consumer_test) g_demo_mode = true; aca_test_reset_environment(); - - ACA_Message_Pulsar_Consumer consumer(mq_test_topic, mq_broker_ip, mq_subscription); + ACA_Message_Pulsar_Consumer consumer=ACA_Message_Pulsar_Consumer::get_instance(); + consumer.init(mq_test_topic, mq_broker_ip, mq_subscription); +// ACA_Message_Pulsar_Consumer consumer(mq_test_topic, mq_broker_ip, mq_subscription); consumer.multicastConsumerDispatched(); pause(); @@ -118,13 +119,57 @@ TEST(pulsar_test_cases, DISABLED_pulsar_unicast_consumer_test) aca_test_reset_environment(); - ACA_Message_Pulsar_Consumer consumer(mq_test_topic, mq_broker_ip, mq_subscription); + ACA_Message_Pulsar_Consumer consumer = ACA_Message_Pulsar_Consumer::get_instance(); + consumer.init(mq_test_topic, mq_broker_ip, mq_subscription); consumer.unicastConsumerDispatched(mq_hash); pause(); g_demo_mode = previous_demo_mode; } +// sudo ./aca_tests --gtest_also_run_disabled_tests --gtest_filter=*DISABLED_pulsar_unicast_consumer_recover_test +TEST(pulsar_test_cases, DISABLED_pulsar_unicast_consumer_recover_test) +{ + string cmd_string; + string mq_update_topic="update topic"; + bool previous_demo_mode = g_demo_mode; + g_demo_mode = true; + + aca_test_reset_environment(); + + auto* pt= + new ACA_Message_Pulsar_Consumer(mq_test_topic,mq_broker_ip,mq_subscription); + pt->unicastConsumerDispatched(mq_hash); + pt->unicastResubscribe(mq_update_topic,mq_hash); + delete pt; + + pt= + new ACA_Message_Pulsar_Consumer(ACA_Message_Pulsar_Consumer::recovered_topic,mq_broker_ip,mq_subscription); + pt->unicastConsumerDispatched(mq_hash); + + pause(); + + g_demo_mode = previous_demo_mode; +} + +// sudo ./aca_tests --gtest_also_run_disabled_tests --gtest_filter=*DISABLED_pulsar_unicast_consumer_resubscribe_test +TEST(pulsar_test_cases, DISABLED_pulsar_unicast_consumer_resubscribe_test) +{ + string cmd_string; + string mq_update_topic="update topic"; + bool previous_demo_mode = g_demo_mode; + g_demo_mode = true; + + aca_test_reset_environment(); + + ACA_Message_Pulsar_Consumer consumer(mq_update_topic, mq_broker_ip, mq_subscription); + consumer.unicastConsumerDispatched(mq_hash); + consumer.unicastResubscribe(false); + consumer.unicastResubscribe(true,mq_test_topic, to_string(mq_hash)); + pause(); + + g_demo_mode = previous_demo_mode; +} // This case tests the pulsar producer implementation and publishes a GoalState to the subscribed topic. // First run pulsar_consumer_test then execute @@ -139,35 +184,30 @@ TEST(pulsar_test_cases, DISABLED_pulsar_hash_producer_test) string GoalStateString; unsigned char serializedGoalState[length]; - GoalState GoalState_builder; - PortState *new_port_states = GoalState_builder.add_port_states(); - SubnetState *new_subnet_states = GoalState_builder.add_subnet_states(); - - ACA_OVS_L2_Programmer::get_instance().execute_ovsdb_command( - "del-br br-int", not_care_culminative_time, overall_rc); + GoalStateV2 GoalState_builder; + PortState new_port_states; + SubnetState new_subnet_states; - ACA_OVS_L2_Programmer::get_instance().execute_ovsdb_command( - "del-br br-tun", not_care_culminative_time, overall_rc); + aca_test_reset_environment(); - overall_rc = ACA_OVS_L2_Programmer::get_instance().setup_ovs_bridges_if_need(); - ASSERT_EQ(overall_rc, EXIT_SUCCESS); - overall_rc = EXIT_SUCCESS; + aca_test_create_default_port_state(&new_port_states); + auto &port_states_map = *GoalState_builder.mutable_port_states(); + port_states_map[port_id_1] = new_port_states; - // fill in port state structs - aca_test_create_default_port_state(new_port_states); + aca_test_create_default_subnet_state(&new_subnet_states); + auto &subnet_states_map = *GoalState_builder.mutable_subnet_states(); + subnet_states_map[subnet_id_1] = new_subnet_states; - // fill in subnet state structs - aca_test_create_default_subnet_state(new_subnet_states); if(GoalState_builder.SerializeToString(&GoalStateString)){ - ACA_LOG_INFO("%s","Successfully covert GoalState to message\n"); + ACA_LOG_INFO("%s","Successfully covert GoalStateV2 to message\n"); } ACA_Message_Pulsar_Producer producer(mq_broker_ip, mq_test_topic); retcode = producer.publish(GoalStateString,mq_key); EXPECT_EQ(retcode, EXIT_SUCCESS); - ACA_LOG_INFO("%s","Waiting for GoalState update.\n"); + ACA_LOG_INFO("%s","Waiting for GoalStateV2 update.\n"); sleep(1); ACA_OVS_L2_Programmer::get_instance().execute_ovsdb_command(