Skip to content

Commit

Permalink
Merge pull request #4 from Fangjin98/master
Browse files Browse the repository at this point in the history
add unicastResubscribe function.
  • Loading branch information
luyaoluo authored Dec 22, 2021
2 parents 4fc7762 + bd9c5c7 commit 956ba37
Show file tree
Hide file tree
Showing 3 changed files with 182 additions and 26 deletions.
23 changes: 18 additions & 5 deletions include/aca_message_pulsar_consumer.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ class ACA_Message_Pulsar_Consumer {
Consumer multicast_consumer;
Consumer unicast_consumer;

static string empty_topic;

private:
void setMulticastSubscriptionName(string subscription_name);

Expand All @@ -59,12 +61,19 @@ class ACA_Message_Pulsar_Consumer {

void setUnicastTopicName(string topic);



public:
ACA_Message_Pulsar_Consumer(string topic, string brokers, string subscription_name);
static string recovered_topic;

static ACA_Message_Pulsar_Consumer &get_instance();

ACA_Message_Pulsar_Consumer();

ACA_Message_Pulsar_Consumer(string topic, string brokers, string subscription_name);

~ACA_Message_Pulsar_Consumer();
~ACA_Message_Pulsar_Consumer();

void init(string topic, string brokers, string subscription_name);

string getBrokers() const;

Expand All @@ -76,13 +85,17 @@ class ACA_Message_Pulsar_Consumer {

string getUnicastSubscriptionName() const;

static string getRecoveredTopicName();

bool multicastConsumerDispatched();

bool unicastConsumerDispatched(int stickyHash);

//static void listener(Consumer consumer, const Message& message);
bool unicastUnsubcribe();

bool unicastResubscribe(string topic, int stickyHash);


bool unicastResubscribe(bool isSubscribe, string topic="", string stickHash="");
};

} // namespace aca_message_pulsar
Expand Down
105 changes: 104 additions & 1 deletion src/comm/aca_message_pulsar_consumer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ using pulsar::StickyRange;

namespace aca_message_pulsar
{
string aca_message_pulsar::ACA_Message_Pulsar_Consumer::empty_topic="";

string aca_message_pulsar::ACA_Message_Pulsar_Consumer::recovered_topic=
ACA_Message_Pulsar_Consumer::getRecoveredTopicName();

void listener(Consumer consumer, const Message& message){
alcor::schema::GoalStateV2 deserialized_GoalState;
Expand Down Expand Up @@ -61,9 +65,57 @@ void listener(Consumer consumer, const Message& message){
consumer.acknowledge(message.getMessageId());
}

ACA_Message_Pulsar_Consumer::ACA_Message_Pulsar_Consumer()
{
string default_brokers = "pulsar://localhost:6650";
string default_topic = "Host-ts-1";
string default_subscription_name = "test_subscription";

setUnicastTopicName(default_topic);
recovered_topic=default_topic;
setMulticastTopicName(default_topic);
setBrokers(default_brokers);
setUnicastSubscriptionName(default_subscription_name);
setMulticastSubscriptionName(default_subscription_name);

ACA_LOG_DEBUG("Broker list: %s\n", this->brokers_list.c_str());
ACA_LOG_DEBUG("Unicast consumer topic name: %s\n", this->unicast_topic_name.c_str());
ACA_LOG_DEBUG("Unicast consumer subscription name: %s\n", this->unicast_subscription_name.c_str());
ACA_LOG_DEBUG("Multicast consumer topic name: %s\n", this->multicast_topic_name.c_str());
ACA_LOG_DEBUG("Multicast consumer subscription name: %s\n", this->multicast_subscription_name.c_str());

// Create the clients
this->ptr_multicast_client= new Client(default_brokers);
this->ptr_unicast_client = new Client(default_brokers);
}

ACA_Message_Pulsar_Consumer &ACA_Message_Pulsar_Consumer::get_instance()
{
static ACA_Message_Pulsar_Consumer instance;
return instance;
}
void ACA_Message_Pulsar_Consumer::init(string topic, string brokers, string subscription_name){
setUnicastTopicName(topic);
recovered_topic=topic;
setMulticastTopicName(topic);
setBrokers(brokers);
setUnicastSubscriptionName(subscription_name);
setMulticastSubscriptionName(subscription_name);

ACA_LOG_DEBUG("Broker list: %s\n", this->brokers_list.c_str());
ACA_LOG_DEBUG("Unicast consumer topic name: %s\n", this->unicast_topic_name.c_str());
ACA_LOG_DEBUG("Unicast consumer subscription name: %s\n", this->unicast_subscription_name.c_str());
ACA_LOG_DEBUG("Multicast consumer topic name: %s\n", this->multicast_topic_name.c_str());
ACA_LOG_DEBUG("Multicast consumer subscription name: %s\n", this->multicast_subscription_name.c_str());

// Create the clients
this->ptr_multicast_client= new Client(brokers);
this->ptr_unicast_client = new Client(brokers);
}
ACA_Message_Pulsar_Consumer::ACA_Message_Pulsar_Consumer(string topic, string brokers, string subscription_name)
{
setUnicastTopicName(topic);
recovered_topic=topic;
setMulticastTopicName(topic);
setBrokers(brokers);
setUnicastSubscriptionName(subscription_name);
Expand All @@ -76,7 +128,7 @@ ACA_Message_Pulsar_Consumer::ACA_Message_Pulsar_Consumer(string topic, string br
ACA_LOG_DEBUG("Multicast consumer subscription name: %s\n", this->multicast_subscription_name.c_str());

// Create the clients
//this->ptr_multicast_client= new Client(brokers);
this->ptr_multicast_client= new Client(brokers);
this->ptr_unicast_client = new Client(brokers);
}

Expand Down Expand Up @@ -111,6 +163,10 @@ string ACA_Message_Pulsar_Consumer::getUnicastSubscriptionName() const
return this->unicast_subscription_name;
}

string ACA_Message_Pulsar_Consumer::getRecoveredTopicName()
{
return "recovered topic test";
}

bool ACA_Message_Pulsar_Consumer::unicastConsumerDispatched(int stickyHash){
Result result;
Expand All @@ -125,6 +181,7 @@ bool ACA_Message_Pulsar_Consumer::unicastConsumerDispatched(int stickyHash){

//Use key shared mode
this->unicast_consumer_config.setConsumerType(ConsumerKeyShared).setKeySharedPolicy(keySharedPolicy).setMessageListener(listener);
ACA_LOG_INFO("%s\n",this->unicast_topic_name.c_str());
result = this->ptr_unicast_client->subscribe(this->unicast_topic_name,this->unicast_subscription_name,this->unicast_consumer_config,this->unicast_consumer);
if (result != Result::ResultOk){
ACA_LOG_ERROR("Failed to subscribe unicast topic: %s\n", this->unicast_topic_name.c_str());
Expand All @@ -147,6 +204,51 @@ bool ACA_Message_Pulsar_Consumer::multicastConsumerDispatched(){
return EXIT_SUCCESS;
}

bool ACA_Message_Pulsar_Consumer::unicastUnsubcribe()
{
Result result;
if(this->unicast_topic_name==empty_topic){
ACA_LOG_INFO("The consumer already unsubscribe the unicast topic.");
return EXIT_SUCCESS;
}

result=this->unicast_consumer.unsubscribe();
if (result != Result::ResultOk){
ACA_LOG_ERROR("Failed to unsubscribe unicast topic: %s\n", this->unicast_topic_name.c_str());
return EXIT_FAILURE;
}
this->unicast_topic_name=empty_topic;
return EXIT_SUCCESS;
}

bool ACA_Message_Pulsar_Consumer::unicastResubscribe(string topic, int stickyHash)
{
bool result;

result = unicastUnsubcribe();

if (result==EXIT_SUCCESS){
setUnicastTopicName(topic);
recovered_topic=topic;
result = unicastConsumerDispatched(stickyHash);
if (result==EXIT_SUCCESS) {
return EXIT_SUCCESS;
}
}
ACA_LOG_ERROR("Failed to resubscribe unicast topic: %s\n", topic.c_str());
return EXIT_FAILURE;
}


bool ACA_Message_Pulsar_Consumer::unicastResubscribe(bool isSubscribe, string topic, string stickHash)
{
if(!isSubscribe){
return unicastUnsubcribe();
}
else{
return unicastResubscribe(topic, std::stoi(stickHash));
}
}

void ACA_Message_Pulsar_Consumer::setBrokers(string brokers)
{
Expand All @@ -173,4 +275,5 @@ void ACA_Message_Pulsar_Consumer::setUnicastSubscriptionName(string subscription
this->unicast_subscription_name = subscription_name;
}


} // namespace aca_message_pulsar
80 changes: 60 additions & 20 deletions test/gtest/aca_test_mq.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,9 @@ TEST(pulsar_test_cases, DISABLED_pulsar_consumer_test)
g_demo_mode = true;

aca_test_reset_environment();

ACA_Message_Pulsar_Consumer consumer(mq_test_topic, mq_broker_ip, mq_subscription);
ACA_Message_Pulsar_Consumer consumer=ACA_Message_Pulsar_Consumer::get_instance();
consumer.init(mq_test_topic, mq_broker_ip, mq_subscription);
// ACA_Message_Pulsar_Consumer consumer(mq_test_topic, mq_broker_ip, mq_subscription);
consumer.multicastConsumerDispatched();
pause();

Expand All @@ -118,13 +119,57 @@ TEST(pulsar_test_cases, DISABLED_pulsar_unicast_consumer_test)

aca_test_reset_environment();

ACA_Message_Pulsar_Consumer consumer(mq_test_topic, mq_broker_ip, mq_subscription);
ACA_Message_Pulsar_Consumer consumer = ACA_Message_Pulsar_Consumer::get_instance();
consumer.init(mq_test_topic, mq_broker_ip, mq_subscription);
consumer.unicastConsumerDispatched(mq_hash);
pause();

g_demo_mode = previous_demo_mode;
}

// sudo ./aca_tests --gtest_also_run_disabled_tests --gtest_filter=*DISABLED_pulsar_unicast_consumer_recover_test
TEST(pulsar_test_cases, DISABLED_pulsar_unicast_consumer_recover_test)
{
string cmd_string;
string mq_update_topic="update topic";
bool previous_demo_mode = g_demo_mode;
g_demo_mode = true;

aca_test_reset_environment();

auto* pt=
new ACA_Message_Pulsar_Consumer(mq_test_topic,mq_broker_ip,mq_subscription);
pt->unicastConsumerDispatched(mq_hash);
pt->unicastResubscribe(mq_update_topic,mq_hash);
delete pt;

pt=
new ACA_Message_Pulsar_Consumer(ACA_Message_Pulsar_Consumer::recovered_topic,mq_broker_ip,mq_subscription);
pt->unicastConsumerDispatched(mq_hash);

pause();

g_demo_mode = previous_demo_mode;
}

// sudo ./aca_tests --gtest_also_run_disabled_tests --gtest_filter=*DISABLED_pulsar_unicast_consumer_resubscribe_test
TEST(pulsar_test_cases, DISABLED_pulsar_unicast_consumer_resubscribe_test)
{
string cmd_string;
string mq_update_topic="update topic";
bool previous_demo_mode = g_demo_mode;
g_demo_mode = true;

aca_test_reset_environment();

ACA_Message_Pulsar_Consumer consumer(mq_update_topic, mq_broker_ip, mq_subscription);
consumer.unicastConsumerDispatched(mq_hash);
consumer.unicastResubscribe(false);
consumer.unicastResubscribe(true,mq_test_topic, to_string(mq_hash));
pause();

g_demo_mode = previous_demo_mode;
}

// This case tests the pulsar producer implementation and publishes a GoalState to the subscribed topic.
// First run pulsar_consumer_test then execute
Expand All @@ -139,35 +184,30 @@ TEST(pulsar_test_cases, DISABLED_pulsar_hash_producer_test)
string GoalStateString;
unsigned char serializedGoalState[length];

GoalState GoalState_builder;
PortState *new_port_states = GoalState_builder.add_port_states();
SubnetState *new_subnet_states = GoalState_builder.add_subnet_states();

ACA_OVS_L2_Programmer::get_instance().execute_ovsdb_command(
"del-br br-int", not_care_culminative_time, overall_rc);
GoalStateV2 GoalState_builder;
PortState new_port_states;
SubnetState new_subnet_states;

ACA_OVS_L2_Programmer::get_instance().execute_ovsdb_command(
"del-br br-tun", not_care_culminative_time, overall_rc);
aca_test_reset_environment();

overall_rc = ACA_OVS_L2_Programmer::get_instance().setup_ovs_bridges_if_need();
ASSERT_EQ(overall_rc, EXIT_SUCCESS);
overall_rc = EXIT_SUCCESS;
aca_test_create_default_port_state(&new_port_states);
auto &port_states_map = *GoalState_builder.mutable_port_states();
port_states_map[port_id_1] = new_port_states;

// fill in port state structs
aca_test_create_default_port_state(new_port_states);
aca_test_create_default_subnet_state(&new_subnet_states);
auto &subnet_states_map = *GoalState_builder.mutable_subnet_states();
subnet_states_map[subnet_id_1] = new_subnet_states;

// fill in subnet state structs
aca_test_create_default_subnet_state(new_subnet_states);

if(GoalState_builder.SerializeToString(&GoalStateString)){
ACA_LOG_INFO("%s","Successfully covert GoalState to message\n");
ACA_LOG_INFO("%s","Successfully covert GoalStateV2 to message\n");
}

ACA_Message_Pulsar_Producer producer(mq_broker_ip, mq_test_topic);
retcode = producer.publish(GoalStateString,mq_key);
EXPECT_EQ(retcode, EXIT_SUCCESS);

ACA_LOG_INFO("%s","Waiting for GoalState update.\n");
ACA_LOG_INFO("%s","Waiting for GoalStateV2 update.\n");
sleep(1);

ACA_OVS_L2_Programmer::get_instance().execute_ovsdb_command(
Expand Down

0 comments on commit 956ba37

Please sign in to comment.