Skip to content

Commit

Permalink
Merge pull request #6 from Fangjin98/master
Browse files Browse the repository at this point in the history
add different log info
  • Loading branch information
luyaoluo authored Mar 10, 2022
2 parents 31019af + 244ec44 commit 90a7870
Show file tree
Hide file tree
Showing 6 changed files with 196 additions and 137 deletions.
11 changes: 4 additions & 7 deletions include/aca_message_pulsar_consumer.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

using namespace pulsar;
using std::string;
using std::vector;

namespace aca_message_pulsar
{
Expand All @@ -37,7 +38,7 @@ class ACA_Message_Pulsar_Consumer {
string unicast_subscription_name; // Subscription name of the unicast pulsar consumer

string multicast_topic_name; //A string representation of the topic to be consumed, for example: /hostid/00000000-0000-0000-0000-000000000000/netwconf/
string unicast_topic_name;
vector<string> unicast_topic_name = vector<string>();

ConsumerConfiguration multicast_consumer_config; //Configuration of the mulitcast pulsar consumer
ConsumerConfiguration unicast_consumer_config; //Configuration of the unicast pulsar consumer
Expand All @@ -48,8 +49,6 @@ class ACA_Message_Pulsar_Consumer {
Consumer multicast_consumer;
Consumer unicast_consumer;

static string empty_topic;

private:
void setMulticastSubscriptionName(string subscription_name);

Expand Down Expand Up @@ -88,11 +87,9 @@ class ACA_Message_Pulsar_Consumer {

bool unicastConsumerDispatched(int stickyHash);

bool unicastUnsubcribe();

bool unicastResubscribe(string topic, int stickyHash);
bool unicastUnsubscribeAll();

bool unicastResubscribe(bool isUnSubscribe, string topic="", string stickHash="");
bool unicastResubscribe(bool unSubscribe, string topic="", string stickHash="");
};

} // namespace aca_message_pulsar
Expand Down
10 changes: 6 additions & 4 deletions src/comm/aca_grpc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,8 @@ void GoalStateProvisionerAsyncServer::ProcessPushGoalStatesStreamAsyncCall(
// process goalstateV2 in the thread pool
// It has read from the stream, now to GoalStateV2 should not be empty
// and we need to process it.
ACA_LOG_DEBUG("%s\n", "This call has already read from the stream, now we process the gsv2...");
ACA_LOG_DEBUG("%s\n", "ACA_GRPC: This call has already read from the stream, now we process the gsv2...");


if (streamingCall->goalStateV2_.neighbor_states_size() == 1) {
// if there's only one neighbor state, it means that it is pushed
Expand All @@ -189,16 +190,17 @@ void GoalStateProvisionerAsyncServer::ProcessPushGoalStatesStreamAsyncCall(
}
std::chrono::_V2::steady_clock::time_point start =
std::chrono::steady_clock::now();
ACA_LOG_INFO("%s\n", "ACA_GRPC: Start updating GoalStateV2...");
int rc = Aca_Comm_Manager::get_instance().update_goal_state(
streamingCall->goalStateV2_, streamingCall->gsOperationReply_);
if (rc == EXIT_SUCCESS) {
ACA_LOG_INFO("Control Fast Path streaming - Successfully updated host with latest goal state %d.\n",
ACA_LOG_INFO("ACA_GRPC: Control Fast Path streaming - Successfully updated host with latest goal state %d.\n",
rc);
} else if (rc == EINPROGRESS) {
ACA_LOG_INFO("Control Fast Path streaming - Update host with latest goal state returned pending, rc=%d.\n",
ACA_LOG_INFO("ACA_GRPC: Control Fast Path streaming - Update host with latest goal state returned pending, rc=%d.\n",
rc);
} else {
ACA_LOG_ERROR("Control Fast Path streaming - Failed to update host with latest goal state, rc=%d.\n",
ACA_LOG_ERROR("ACA_GRPC: Control Fast Path streaming - Failed to update host with latest goal state, rc=%d.\n",
rc);
}
std::chrono::_V2::steady_clock::time_point end =
Expand Down
99 changes: 40 additions & 59 deletions src/comm/aca_message_pulsar_consumer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,32 +30,32 @@ using pulsar::StickyRange;

namespace aca_message_pulsar
{
string aca_message_pulsar::ACA_Message_Pulsar_Consumer::empty_topic="";

void listener(Consumer consumer, const Message& message){
alcor::schema::GoalStateV2 deserialized_GoalState;
alcor::schema::GoalStateOperationReply gsOperationalReply;
int rc;
Result result;

ACA_LOG_DEBUG("\n<=====incoming message: %s\n",
message.getDataAsString().c_str());

ACA_LOG_INFO("%s","ACA_PULSAR_MQ: Successfully received the incoming message.\n");
ACA_LOG_INFO("%s","ACA_PULSAR_MQ: Start deserializing the message to GoalState...\n");
ACA_LOG_INFO("%s","<====================================================>\n");
rc = Aca_Comm_Manager::get_instance().deserialize(
(unsigned char *)message.getData(), message.getLength(), deserialized_GoalState);
ACA_LOG_INFO("%s","<====================================================>\n");
if (rc == EXIT_SUCCESS) {
ACA_LOG_INFO("%s","ACA_PULSAR_MQ: Start updating GoalState...\n");
ACA_LOG_INFO("%s","<====================================================>\n");
rc = Aca_Comm_Manager::get_instance().update_goal_state(
deserialized_GoalState, gsOperationalReply);


ACA_LOG_INFO("%s","<====================================================>\n");
if (rc != EXIT_SUCCESS) {
ACA_LOG_ERROR("Failed to update host with latest goal state, rc=%d.\n", rc);
ACA_LOG_ERROR("ACA_PULSAR_MQ: ERROR, failed to update host with latest goal state, rc=%d.\n", rc);
} else {
ACA_LOG_INFO("Successfully updated host with latest goal state %d.\n", rc);
ACA_LOG_INFO("ACA_PULSAR_MQ: Successfully updated host with latest goal state, rc=%d.\n", rc);
}

} else {
ACA_LOG_ERROR("Deserialization failed with error code %d.\n", rc);
ACA_LOG_ERROR("ACA_PULSAR_MQ: ERROR, failed to deserialize the message with error code %d.\n", rc);
}

// Now acknowledge message
Expand All @@ -78,11 +78,11 @@ void ACA_Message_Pulsar_Consumer::init(string topic, string brokers, string subs
setUnicastSubscriptionName(subscription_name);
setMulticastSubscriptionName(subscription_name);

ACA_LOG_DEBUG("Broker list: %s\n", this->brokers_list.c_str());
ACA_LOG_DEBUG("Unicast consumer topic name: %s\n", this->unicast_topic_name.c_str());
ACA_LOG_DEBUG("Unicast consumer subscription name: %s\n", this->unicast_subscription_name.c_str());
ACA_LOG_DEBUG("Multicast consumer topic name: %s\n", this->multicast_topic_name.c_str());
ACA_LOG_DEBUG("Multicast consumer subscription name: %s\n", this->multicast_subscription_name.c_str());
ACA_LOG_DEBUG("ACA_PULSAR_MQ: Broker list -> %s\n", this->brokers_list.c_str());
ACA_LOG_DEBUG("ACA_PULSAR_MQ: Unicast consumer topic name -> %s\n", this->unicast_topic_name[0].c_str());
ACA_LOG_DEBUG("ACA_PULSAR_MQ: Unicast consumer subscription name -> %s\n", this->unicast_subscription_name.c_str());
ACA_LOG_DEBUG("ACA_PULSAR_MQ: Multicast consumer topic name -> %s\n", this->multicast_topic_name.c_str());
ACA_LOG_DEBUG("ACA_PULSAR_MQ: Multicast consumer subscription name -> %s\n", this->multicast_subscription_name.c_str());

// Create the clients
this->ptr_multicast_client= new Client(brokers);
Expand Down Expand Up @@ -112,23 +112,18 @@ string ACA_Message_Pulsar_Consumer::getMulticastSubscriptionName() const

string ACA_Message_Pulsar_Consumer::getUnicastTopicName() const
{
return this->unicast_topic_name;
string topic = "";
for (auto t: this->unicast_topic_name)
topic += (t+" ");
return topic;
}

string ACA_Message_Pulsar_Consumer::getUnicastSubscriptionName() const
{
return this->unicast_subscription_name;
}

//TODO: get recovered topic from database?
string ACA_Message_Pulsar_Consumer::getRecoveredTopicName()
{
return "recovered topic test";
}

bool ACA_Message_Pulsar_Consumer::unicastConsumerDispatched(int stickyHash){
Result result;
Consumer consumer;
KeySharedPolicy keySharedPolicy;

keySharedPolicy.setKeySharedMode(STICKY);
Expand All @@ -139,10 +134,9 @@ bool ACA_Message_Pulsar_Consumer::unicastConsumerDispatched(int stickyHash){

//Use key shared mode
this->unicast_consumer_config.setConsumerType(ConsumerKeyShared).setKeySharedPolicy(keySharedPolicy).setMessageListener(listener);
ACA_LOG_INFO("%s\n",this->unicast_topic_name.c_str());
result = this->ptr_unicast_client->subscribe(this->unicast_topic_name,this->unicast_subscription_name,this->unicast_consumer_config,this->unicast_consumer);
Result result = this->ptr_unicast_client->subscribe(this->unicast_topic_name,this->unicast_subscription_name,this->unicast_consumer_config,this->unicast_consumer);
if (result != Result::ResultOk){
ACA_LOG_ERROR("Failed to subscribe unicast topic: %s\n", this->unicast_topic_name.c_str());
ACA_LOG_ERROR("ACA_PULSAR_MQ: ERROR, failed to subscribe unicast topic -> %s\n", this->getUnicastTopicName().c_str());
return EXIT_FAILURE;
}

Expand All @@ -156,54 +150,41 @@ bool ACA_Message_Pulsar_Consumer::multicastConsumerDispatched(){
this->multicast_consumer_config.setMessageListener(listener);
result = this->ptr_multicast_client->subscribe(this->multicast_topic_name,this->multicast_subscription_name,this->multicast_consumer_config,this->multicast_consumer);
if (result != Result::ResultOk){
ACA_LOG_ERROR("Failed to subscribe multicast topic: %s\n", this->multicast_topic_name.c_str());
ACA_LOG_ERROR("ACA_PULSAR_MQ: ERROR, failed to subscribe multicast topic -> %s\n", this->multicast_topic_name.c_str());
return EXIT_FAILURE;
}
return EXIT_SUCCESS;
}

bool ACA_Message_Pulsar_Consumer::unicastUnsubcribe()
bool ACA_Message_Pulsar_Consumer::unicastUnsubscribeAll()
{
Result result;
if(this->unicast_topic_name==empty_topic){
ACA_LOG_INFO("The consumer already unsubscribe the unicast topic.\n");
if(this->unicast_topic_name.empty()){
ACA_LOG_INFO("ACA_PULSAR_MQ: Successfully to unsubscribe all the unicast topics.\n");
return EXIT_SUCCESS;
}

result=this->unicast_consumer.unsubscribe();
if (result != Result::ResultOk){
ACA_LOG_ERROR("Failed to unsubscribe unicast topic: %s\n", this->unicast_topic_name.c_str());
if (this->unicast_consumer.unsubscribe() == Result::ResultOk){
ACA_LOG_INFO("ACA_PULSAR_MQ: Successfully to unsubscribe all the unicast topics.\n");
this->unicast_topic_name.clear();
return EXIT_SUCCESS;
}
else{
ACA_LOG_ERROR("ACA_PULSAR_MQ: ERROR, failed to unsubscribe unicast topics -> %s.\n", this->getUnicastTopicName().c_str());
return EXIT_FAILURE;
}
this->unicast_topic_name=empty_topic;
return EXIT_SUCCESS;
}

bool ACA_Message_Pulsar_Consumer::unicastResubscribe(string topic, int stickyHash)
bool ACA_Message_Pulsar_Consumer::unicastResubscribe(bool unSubscribe, string topic, string stickHash)
{
bool result;

result = unicastUnsubcribe();

if (result==EXIT_SUCCESS){
if(!unSubscribe){
if(this->unicast_consumer.unsubscribe() == Result::ResultOk){ // this unsubscribes topics in pulsar, but doesn't clean the topic list of Consumer.
setUnicastTopicName(topic);
result = unicastConsumerDispatched(stickyHash);
if (result==EXIT_SUCCESS) {
return EXIT_SUCCESS;
}
}
ACA_LOG_ERROR("Failed to resubscribe unicast topic: %s\n", topic.c_str());
return EXIT_FAILURE;
}


bool ACA_Message_Pulsar_Consumer::unicastResubscribe(bool isUnSubscribe, string topic, string stickHash)
{
if(isUnSubscribe){
return unicastUnsubcribe();
return unicastConsumerDispatched(stoi(stickHash));
}
return EXIT_FAILURE;
}
else{
return unicastResubscribe(topic, std::stoi(stickHash));
return unicastUnsubscribeAll();
}
}

Expand All @@ -224,7 +205,7 @@ void ACA_Message_Pulsar_Consumer::setMulticastSubscriptionName(string subscripti

void ACA_Message_Pulsar_Consumer::setUnicastTopicName(string topic)
{
this->unicast_topic_name = topic;
this->unicast_topic_name.push_back(topic);
}

void ACA_Message_Pulsar_Consumer::setUnicastSubscriptionName(string subscription_name)
Expand Down
56 changes: 26 additions & 30 deletions src/comm/aca_message_pulsar_producer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,60 +55,56 @@ void ACA_Message_Pulsar_Producer::setTopicName(string topic)

bool ACA_Message_Pulsar_Producer::publish(string message)
{
Result result;

// Create a producer
Producer producer;
result = this->ptr_client->createProducer(this->topic_name,producer);
if(result != ResultOk){
ACA_LOG_ERROR("Failed to create producer, result=%d.\n", result);
Result createProducerResult = this->ptr_client->createProducer(this->topic_name,producer);
if(createProducerResult != ResultOk){
ACA_LOG_ERROR("ACA_PULSAR_MQ: Failed to create producer, result=%d.\n", createProducerResult);
return EXIT_FAILURE;
}

// Create a message
Message msg = MessageBuilder().setContent(message).build();
result = producer.send(msg);
if(result != ResultOk){
ACA_LOG_ERROR("Failed to send message %s.\n", message.c_str());

if(producer.send(msg) != ResultOk){
ACA_LOG_ERROR("ACA_PULSAR_MQ: Failed to send message -> %s.\n", message.c_str());
return EXIT_FAILURE;
}
else{
ACA_LOG_INFO("ACA_PULSAR_MQ: Successfully send message %s.\n", message.c_str());

ACA_LOG_INFO("Successfully send message %s\n", message.c_str());

// Flush all produced messages
producer.flush();
producer.close();
return EXIT_SUCCESS;

// Flush all produced messages
producer.flush();
producer.close();
return EXIT_SUCCESS;
}
}

bool ACA_Message_Pulsar_Producer::publish(string message, string orderingKey)
{
Result result;

// Create a producer
Producer producer;
result = this->ptr_client->createProducer(this->topic_name,producer);
if(result != ResultOk){
ACA_LOG_ERROR("Failed to create producer, result=%d.\n", result);
Result createProducerResult = this->ptr_client->createProducer(this->topic_name,producer);
if(createProducerResult != ResultOk){
ACA_LOG_ERROR("ACA_PULSAR_MQ: Failed to create producer, result=%d.\n", createProducerResult);
return EXIT_FAILURE;
}

// Create a message
Message msg = MessageBuilder().setContent(message).setOrderingKey(orderingKey).build();
result = producer.send(msg);
if(result != ResultOk){
ACA_LOG_ERROR("Failed to send message %s.\n", message.c_str());

if(producer.send(msg) != ResultOk){
ACA_LOG_ERROR("ACA_PULSAR_MQ: Failed to send message %s.\n", message.c_str());
return EXIT_FAILURE;
}
else{
ACA_LOG_INFO("ACA_PULSAR_MQ: Successfully send message %s.\n", message.c_str());

ACA_LOG_INFO("Successfully send message %s\n", message.c_str());

// Flush all produced messages
producer.flush();
producer.close();
return EXIT_SUCCESS;

// Flush all produced messages
producer.flush();
producer.close();
return EXIT_SUCCESS;
}
}
void ACA_Message_Pulsar_Producer::setBrokers(string brokers)
{
Expand Down
19 changes: 14 additions & 5 deletions test/func_tests/gs_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ string g_ncm_port = EMPTY_STRING;
string g_grpc_server_port = EMPTY_STRING;
// by default, this should run as GRCP client, unless specified by the corresponding flag.
bool g_run_as_server = false;
bool g_run_as_topic_client = false;

GoalStateProvisionerAsyncServer *g_grpc_server = NULL;
GoalStateProvisionerClientImpl *g_grpc_client = NULL;
// GoalStateProvisionerServer *g_test_grcp_server = NULL;
Expand Down Expand Up @@ -812,7 +814,7 @@ int main(int argc, char *argv[])
signal(SIGINT, aca_signal_handler);
signal(SIGTERM, aca_signal_handler);

while ((option = getopt(argc, argv, "s:p:dm")) != -1) {
while ((option = getopt(argc, argv, "s:p:dmt")) != -1) {
switch (option) {
case 's':
g_grpc_server_ip = optarg;
Expand All @@ -826,14 +828,18 @@ int main(int argc, char *argv[])
case 'm':
g_run_as_server = true;
break;
case 't':
g_run_as_topic_client = true;
break;
default: /* the '?' case when the option is not recognized */
/* specifying port not avaiable for now */
fprintf(stderr,
"Usage: %s\n"
"\t\t[-s grpc server]\n"
"\t\t[-p grpc port]\n"
"\t\t[-d enable debug mode]\n"
"\t\t[-m If this flag is passed in, gs test runs as grpc server, which listens on localhost:54321; otherwise it runs as a grpc client]\n",
"\t\t[-m If this flag is passed in, gs test runs as grpc server, which listens on localhost:54321;]\n",
"\t\t[-t If this flag is passed in, gs test runs as grpc topic client, which publishes topic subscribe requests to localhost:50002; otherwise it runs as a grpc client]\n",
argv[0]);
exit(EXIT_FAILURE);
}
Expand All @@ -853,9 +859,12 @@ int main(int argc, char *argv[])

if (g_run_as_server) {
rc = RunServer();
} else {
// rc = run_as_client();
rc = run_as_topic_client();
}
else if (g_run_as_topic_client){
rc = run_as_topic_client();
}
else {
rc = run_as_client();
}
// Verify that the version of the library that we linked against is
// compatible with the version of the headers we compiled against.
Expand Down
Loading

0 comments on commit 90a7870

Please sign in to comment.