Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add different log info #6

Merged
merged 6 commits into from
Mar 10, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 4 additions & 7 deletions include/aca_message_pulsar_consumer.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

using namespace pulsar;
using std::string;
using std::vector;

namespace aca_message_pulsar
{
Expand All @@ -37,7 +38,7 @@ class ACA_Message_Pulsar_Consumer {
string unicast_subscription_name; // Subscription name of the unicast pulsar consumer

string multicast_topic_name; //A string representation of the topic to be consumed, for example: /hostid/00000000-0000-0000-0000-000000000000/netwconf/
string unicast_topic_name;
vector<string> unicast_topic_name = vector<string>();

ConsumerConfiguration multicast_consumer_config; //Configuration of the mulitcast pulsar consumer
ConsumerConfiguration unicast_consumer_config; //Configuration of the unicast pulsar consumer
Expand All @@ -48,8 +49,6 @@ class ACA_Message_Pulsar_Consumer {
Consumer multicast_consumer;
Consumer unicast_consumer;

static string empty_topic;

private:
void setMulticastSubscriptionName(string subscription_name);

Expand Down Expand Up @@ -88,11 +87,9 @@ class ACA_Message_Pulsar_Consumer {

bool unicastConsumerDispatched(int stickyHash);

bool unicastUnsubcribe();

bool unicastResubscribe(string topic, int stickyHash);
bool unicastUnsubscribeAll();

bool unicastResubscribe(bool isUnSubscribe, string topic="", string stickHash="");
bool unicastResubscribe(bool unSubscribe, string topic="", string stickHash="");
};

} // namespace aca_message_pulsar
Expand Down
10 changes: 6 additions & 4 deletions src/comm/aca_grpc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,8 @@ void GoalStateProvisionerAsyncServer::ProcessPushGoalStatesStreamAsyncCall(
// process goalstateV2 in the thread pool
// It has read from the stream, now to GoalStateV2 should not be empty
// and we need to process it.
ACA_LOG_DEBUG("%s\n", "This call has already read from the stream, now we process the gsv2...");
ACA_LOG_DEBUG("%s\n", "ACA_GRPC: This call has already read from the stream, now we process the gsv2...");


if (streamingCall->goalStateV2_.neighbor_states_size() == 1) {
// if there's only one neighbor state, it means that it is pushed
Expand All @@ -189,16 +190,17 @@ void GoalStateProvisionerAsyncServer::ProcessPushGoalStatesStreamAsyncCall(
}
std::chrono::_V2::steady_clock::time_point start =
std::chrono::steady_clock::now();
ACA_LOG_INFO("%s\n", "ACA_GRPC: Start updating GoalStateV2...");
int rc = Aca_Comm_Manager::get_instance().update_goal_state(
streamingCall->goalStateV2_, streamingCall->gsOperationReply_);
if (rc == EXIT_SUCCESS) {
ACA_LOG_INFO("Control Fast Path streaming - Successfully updated host with latest goal state %d.\n",
ACA_LOG_INFO("ACA_GRPC: Control Fast Path streaming - Successfully updated host with latest goal state %d.\n",
rc);
} else if (rc == EINPROGRESS) {
ACA_LOG_INFO("Control Fast Path streaming - Update host with latest goal state returned pending, rc=%d.\n",
ACA_LOG_INFO("ACA_GRPC: Control Fast Path streaming - Update host with latest goal state returned pending, rc=%d.\n",
rc);
} else {
ACA_LOG_ERROR("Control Fast Path streaming - Failed to update host with latest goal state, rc=%d.\n",
ACA_LOG_ERROR("ACA_GRPC: Control Fast Path streaming - Failed to update host with latest goal state, rc=%d.\n",
rc);
}
std::chrono::_V2::steady_clock::time_point end =
Expand Down
99 changes: 40 additions & 59 deletions src/comm/aca_message_pulsar_consumer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,32 +30,32 @@ using pulsar::StickyRange;

namespace aca_message_pulsar
{
string aca_message_pulsar::ACA_Message_Pulsar_Consumer::empty_topic="";

void listener(Consumer consumer, const Message& message){
alcor::schema::GoalStateV2 deserialized_GoalState;
alcor::schema::GoalStateOperationReply gsOperationalReply;
int rc;
Result result;

ACA_LOG_DEBUG("\n<=====incoming message: %s\n",
message.getDataAsString().c_str());

ACA_LOG_INFO("%s","ACA_PULSAR_MQ: Successfully received the incoming message.\n");
ACA_LOG_INFO("%s","ACA_PULSAR_MQ: Start deserializing the message to GoalState...\n");
ACA_LOG_INFO("%s","<====================================================>\n");
rc = Aca_Comm_Manager::get_instance().deserialize(
(unsigned char *)message.getData(), message.getLength(), deserialized_GoalState);
ACA_LOG_INFO("%s","<====================================================>\n");
if (rc == EXIT_SUCCESS) {
ACA_LOG_INFO("%s","ACA_PULSAR_MQ: Start updating GoalState...\n");
ACA_LOG_INFO("%s","<====================================================>\n");
rc = Aca_Comm_Manager::get_instance().update_goal_state(
deserialized_GoalState, gsOperationalReply);


ACA_LOG_INFO("%s","<====================================================>\n");
if (rc != EXIT_SUCCESS) {
ACA_LOG_ERROR("Failed to update host with latest goal state, rc=%d.\n", rc);
ACA_LOG_ERROR("ACA_PULSAR_MQ: ERROR, failed to update host with latest goal state, rc=%d.\n", rc);
} else {
ACA_LOG_INFO("Successfully updated host with latest goal state %d.\n", rc);
ACA_LOG_INFO("ACA_PULSAR_MQ: Successfully updated host with latest goal state, rc=%d.\n", rc);
}

} else {
ACA_LOG_ERROR("Deserialization failed with error code %d.\n", rc);
ACA_LOG_ERROR("ACA_PULSAR_MQ: ERROR, failed to deserialize the message with error code %d.\n", rc);
}

// Now acknowledge message
Expand All @@ -78,11 +78,11 @@ void ACA_Message_Pulsar_Consumer::init(string topic, string brokers, string subs
setUnicastSubscriptionName(subscription_name);
setMulticastSubscriptionName(subscription_name);

ACA_LOG_DEBUG("Broker list: %s\n", this->brokers_list.c_str());
ACA_LOG_DEBUG("Unicast consumer topic name: %s\n", this->unicast_topic_name.c_str());
ACA_LOG_DEBUG("Unicast consumer subscription name: %s\n", this->unicast_subscription_name.c_str());
ACA_LOG_DEBUG("Multicast consumer topic name: %s\n", this->multicast_topic_name.c_str());
ACA_LOG_DEBUG("Multicast consumer subscription name: %s\n", this->multicast_subscription_name.c_str());
ACA_LOG_DEBUG("ACA_PULSAR_MQ: Broker list -> %s\n", this->brokers_list.c_str());
ACA_LOG_DEBUG("ACA_PULSAR_MQ: Unicast consumer topic name -> %s\n", this->unicast_topic_name[0].c_str());
ACA_LOG_DEBUG("ACA_PULSAR_MQ: Unicast consumer subscription name -> %s\n", this->unicast_subscription_name.c_str());
ACA_LOG_DEBUG("ACA_PULSAR_MQ: Multicast consumer topic name -> %s\n", this->multicast_topic_name.c_str());
ACA_LOG_DEBUG("ACA_PULSAR_MQ: Multicast consumer subscription name -> %s\n", this->multicast_subscription_name.c_str());

// Create the clients
this->ptr_multicast_client= new Client(brokers);
Expand Down Expand Up @@ -112,23 +112,18 @@ string ACA_Message_Pulsar_Consumer::getMulticastSubscriptionName() const

string ACA_Message_Pulsar_Consumer::getUnicastTopicName() const
{
return this->unicast_topic_name;
string topic = "";
for (auto t: this->unicast_topic_name)
topic += (t+" ");
return topic;
}

string ACA_Message_Pulsar_Consumer::getUnicastSubscriptionName() const
{
return this->unicast_subscription_name;
}

//TODO: get recovered topic from database?
string ACA_Message_Pulsar_Consumer::getRecoveredTopicName()
{
return "recovered topic test";
}

bool ACA_Message_Pulsar_Consumer::unicastConsumerDispatched(int stickyHash){
Result result;
Consumer consumer;
KeySharedPolicy keySharedPolicy;

keySharedPolicy.setKeySharedMode(STICKY);
Expand All @@ -139,10 +134,9 @@ bool ACA_Message_Pulsar_Consumer::unicastConsumerDispatched(int stickyHash){

//Use key shared mode
this->unicast_consumer_config.setConsumerType(ConsumerKeyShared).setKeySharedPolicy(keySharedPolicy).setMessageListener(listener);
ACA_LOG_INFO("%s\n",this->unicast_topic_name.c_str());
result = this->ptr_unicast_client->subscribe(this->unicast_topic_name,this->unicast_subscription_name,this->unicast_consumer_config,this->unicast_consumer);
Result result = this->ptr_unicast_client->subscribe(this->unicast_topic_name,this->unicast_subscription_name,this->unicast_consumer_config,this->unicast_consumer);
if (result != Result::ResultOk){
ACA_LOG_ERROR("Failed to subscribe unicast topic: %s\n", this->unicast_topic_name.c_str());
ACA_LOG_ERROR("ACA_PULSAR_MQ: ERROR, failed to subscribe unicast topic -> %s\n", this->getUnicastTopicName().c_str());
return EXIT_FAILURE;
}

Expand All @@ -156,54 +150,41 @@ bool ACA_Message_Pulsar_Consumer::multicastConsumerDispatched(){
this->multicast_consumer_config.setMessageListener(listener);
result = this->ptr_multicast_client->subscribe(this->multicast_topic_name,this->multicast_subscription_name,this->multicast_consumer_config,this->multicast_consumer);
if (result != Result::ResultOk){
ACA_LOG_ERROR("Failed to subscribe multicast topic: %s\n", this->multicast_topic_name.c_str());
ACA_LOG_ERROR("ACA_PULSAR_MQ: ERROR, failed to subscribe multicast topic -> %s\n", this->multicast_topic_name.c_str());
return EXIT_FAILURE;
}
return EXIT_SUCCESS;
}

bool ACA_Message_Pulsar_Consumer::unicastUnsubcribe()
bool ACA_Message_Pulsar_Consumer::unicastUnsubscribeAll()
{
Result result;
if(this->unicast_topic_name==empty_topic){
ACA_LOG_INFO("The consumer already unsubscribe the unicast topic.\n");
if(this->unicast_topic_name.empty()){
ACA_LOG_INFO("ACA_PULSAR_MQ: Successfully to unsubscribe all the unicast topics.\n");
return EXIT_SUCCESS;
}

result=this->unicast_consumer.unsubscribe();
if (result != Result::ResultOk){
ACA_LOG_ERROR("Failed to unsubscribe unicast topic: %s\n", this->unicast_topic_name.c_str());
if (this->unicast_consumer.unsubscribe() == Result::ResultOk){
ACA_LOG_INFO("ACA_PULSAR_MQ: Successfully to unsubscribe all the unicast topics.\n");
this->unicast_topic_name.clear();
return EXIT_SUCCESS;
}
else{
ACA_LOG_ERROR("ACA_PULSAR_MQ: ERROR, failed to unsubscribe unicast topics -> %s.\n", this->getUnicastTopicName().c_str());
return EXIT_FAILURE;
}
this->unicast_topic_name=empty_topic;
return EXIT_SUCCESS;
}

bool ACA_Message_Pulsar_Consumer::unicastResubscribe(string topic, int stickyHash)
bool ACA_Message_Pulsar_Consumer::unicastResubscribe(bool unSubscribe, string topic, string stickHash)
{
bool result;

result = unicastUnsubcribe();

if (result==EXIT_SUCCESS){
if(!unSubscribe){
if(this->unicast_consumer.unsubscribe() == Result::ResultOk){ // this unsubscribes topics in pulsar, but doesn't clean the topic list of Consumer.
setUnicastTopicName(topic);
result = unicastConsumerDispatched(stickyHash);
if (result==EXIT_SUCCESS) {
return EXIT_SUCCESS;
}
}
ACA_LOG_ERROR("Failed to resubscribe unicast topic: %s\n", topic.c_str());
return EXIT_FAILURE;
}


bool ACA_Message_Pulsar_Consumer::unicastResubscribe(bool isUnSubscribe, string topic, string stickHash)
{
if(isUnSubscribe){
return unicastUnsubcribe();
return unicastConsumerDispatched(stoi(stickHash));
}
return EXIT_FAILURE;
}
else{
return unicastResubscribe(topic, std::stoi(stickHash));
return unicastUnsubscribeAll();
}
}

Expand All @@ -224,7 +205,7 @@ void ACA_Message_Pulsar_Consumer::setMulticastSubscriptionName(string subscripti

void ACA_Message_Pulsar_Consumer::setUnicastTopicName(string topic)
{
this->unicast_topic_name = topic;
this->unicast_topic_name.push_back(topic);
}

void ACA_Message_Pulsar_Consumer::setUnicastSubscriptionName(string subscription_name)
Expand Down
56 changes: 26 additions & 30 deletions src/comm/aca_message_pulsar_producer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,60 +55,56 @@ void ACA_Message_Pulsar_Producer::setTopicName(string topic)

bool ACA_Message_Pulsar_Producer::publish(string message)
{
Result result;

// Create a producer
Producer producer;
result = this->ptr_client->createProducer(this->topic_name,producer);
if(result != ResultOk){
ACA_LOG_ERROR("Failed to create producer, result=%d.\n", result);
Result createProducerResult = this->ptr_client->createProducer(this->topic_name,producer);
if(createProducerResult != ResultOk){
ACA_LOG_ERROR("ACA_PULSAR_MQ: Failed to create producer, result=%d.\n", createProducerResult);
return EXIT_FAILURE;
}

// Create a message
Message msg = MessageBuilder().setContent(message).build();
result = producer.send(msg);
if(result != ResultOk){
ACA_LOG_ERROR("Failed to send message %s.\n", message.c_str());

if(producer.send(msg) != ResultOk){
ACA_LOG_ERROR("ACA_PULSAR_MQ: Failed to send message -> %s.\n", message.c_str());
return EXIT_FAILURE;
}
else{
ACA_LOG_INFO("ACA_PULSAR_MQ: Successfully send message %s.\n", message.c_str());

ACA_LOG_INFO("Successfully send message %s\n", message.c_str());

// Flush all produced messages
producer.flush();
producer.close();
return EXIT_SUCCESS;

// Flush all produced messages
producer.flush();
producer.close();
return EXIT_SUCCESS;
}
}

bool ACA_Message_Pulsar_Producer::publish(string message, string orderingKey)
{
Result result;

// Create a producer
Producer producer;
result = this->ptr_client->createProducer(this->topic_name,producer);
if(result != ResultOk){
ACA_LOG_ERROR("Failed to create producer, result=%d.\n", result);
Result createProducerResult = this->ptr_client->createProducer(this->topic_name,producer);
if(createProducerResult != ResultOk){
ACA_LOG_ERROR("ACA_PULSAR_MQ: Failed to create producer, result=%d.\n", createProducerResult);
return EXIT_FAILURE;
}

// Create a message
Message msg = MessageBuilder().setContent(message).setOrderingKey(orderingKey).build();
result = producer.send(msg);
if(result != ResultOk){
ACA_LOG_ERROR("Failed to send message %s.\n", message.c_str());

if(producer.send(msg) != ResultOk){
ACA_LOG_ERROR("ACA_PULSAR_MQ: Failed to send message %s.\n", message.c_str());
return EXIT_FAILURE;
}
else{
ACA_LOG_INFO("ACA_PULSAR_MQ: Successfully send message %s.\n", message.c_str());

ACA_LOG_INFO("Successfully send message %s\n", message.c_str());

// Flush all produced messages
producer.flush();
producer.close();
return EXIT_SUCCESS;

// Flush all produced messages
producer.flush();
producer.close();
return EXIT_SUCCESS;
}
}
void ACA_Message_Pulsar_Producer::setBrokers(string brokers)
{
Expand Down
19 changes: 14 additions & 5 deletions test/func_tests/gs_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ string g_ncm_port = EMPTY_STRING;
string g_grpc_server_port = EMPTY_STRING;
// by default, this should run as GRCP client, unless specified by the corresponding flag.
bool g_run_as_server = false;
bool g_run_as_topic_client = false;

GoalStateProvisionerAsyncServer *g_grpc_server = NULL;
GoalStateProvisionerClientImpl *g_grpc_client = NULL;
// GoalStateProvisionerServer *g_test_grcp_server = NULL;
Expand Down Expand Up @@ -812,7 +814,7 @@ int main(int argc, char *argv[])
signal(SIGINT, aca_signal_handler);
signal(SIGTERM, aca_signal_handler);

while ((option = getopt(argc, argv, "s:p:dm")) != -1) {
while ((option = getopt(argc, argv, "s:p:dmt")) != -1) {
switch (option) {
case 's':
g_grpc_server_ip = optarg;
Expand All @@ -826,14 +828,18 @@ int main(int argc, char *argv[])
case 'm':
g_run_as_server = true;
break;
case 't':
g_run_as_topic_client = true;
break;
default: /* the '?' case when the option is not recognized */
/* specifying port not avaiable for now */
fprintf(stderr,
"Usage: %s\n"
"\t\t[-s grpc server]\n"
"\t\t[-p grpc port]\n"
"\t\t[-d enable debug mode]\n"
"\t\t[-m If this flag is passed in, gs test runs as grpc server, which listens on localhost:54321; otherwise it runs as a grpc client]\n",
"\t\t[-m If this flag is passed in, gs test runs as grpc server, which listens on localhost:54321;]\n",
"\t\t[-t If this flag is passed in, gs test runs as grpc topic client, which publishes topic subscribe requests to localhost:50002; otherwise it runs as a grpc client]\n",
argv[0]);
exit(EXIT_FAILURE);
}
Expand All @@ -853,9 +859,12 @@ int main(int argc, char *argv[])

if (g_run_as_server) {
rc = RunServer();
} else {
// rc = run_as_client();
rc = run_as_topic_client();
}
else if (g_run_as_topic_client){
rc = run_as_topic_client();
}
else {
rc = run_as_client();
}
// Verify that the version of the library that we linked against is
// compatible with the version of the headers we compiled against.
Expand Down
Loading