Skip to content
This repository has been archived by the owner on Mar 31, 2023. It is now read-only.

Add grpc thread for pulsar subscribe info #274

Open
wants to merge 50 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
50 commits
Select commit Hold shift + click to select a range
1c121dc
fix invalid pointer bug
luyaoluo Mar 10, 2021
a271fe3
[Fix #223]remove arp test case on one machine
luyaoluo Mar 16, 2021
5c5add0
Merge branch 'master' of https://github.com/futurewei-cloud/alcor-con…
luyaoluo Aug 9, 2021
8e019a9
some minor change
luyaoluo Sep 4, 2021
b54e3ee
Merge branch 'master' of https://github.com/futurewei-cloud/alcor-con…
luyaoluo Sep 22, 2021
5de9608
Add listeners for multicast and unicast consumer
luyaoluo Oct 20, 2021
8155093
Add CLI parameter for pulsar hashed key
luyaoluo Oct 26, 2021
00fd7c7
add mq test
Nov 29, 2021
5311466
mq publish serialized goalstate.
Nov 29, 2021
869655a
add GoalStateV2 test case
Nov 29, 2021
c6ef0fb
add unicast test cases
Dec 3, 2021
103a751
Pulsar producer can publish key-shared messages.
Fangjin98 Dec 3, 2021
c61e6d0
Merge pull request #1 from Fangjin98/master
luyaoluo Dec 3, 2021
b30c925
Merge branch 'master' of https://github.com/lly00/alcor-control-agent
luyaoluo Dec 3, 2021
44d6bd4
Merge branch 'master' of https://github.com/futurewei-cloud/alcor-con…
luyaoluo Dec 3, 2021
46b2316
add test cases for mq
luyaoluo Dec 3, 2021
dde2e94
fix pulsar path
luyaoluo Dec 3, 2021
1c89be9
add deserialize of GoalStateV2
Fangjin98 Dec 7, 2021
f89bb7e
change consumer covert message into GoalStateV2
Fangjin98 Dec 7, 2021
e9ce7e0
Add publish GoalStateV2 test
Fangjin98 Dec 7, 2021
1dc9034
Merge branch 'master' into master
luyaoluo Dec 9, 2021
3e8cd96
Merge pull request #2 from Fangjin98/master
luyaoluo Dec 9, 2021
3d4f6a0
fix pulsar producer orderingKey bugs
Dec 9, 2021
4fc7762
Merge pull request #3 from Fangjin98/master
luyaoluo Dec 9, 2021
ad9a372
add head files
luyaoluo Dec 17, 2021
9f0398c
merge from master
luyaoluo Dec 17, 2021
8c56c64
pulsar consumer now can resubscribe topic name.
Fangjin98 Dec 18, 2021
944cbad
add static recovered_topic to support fault tolerance
Fangjin98 Dec 18, 2021
02cc3c4
overload unicastResubscribe
Fangjin98 Dec 22, 2021
76a725f
aca_message_pulsar_consumer runs as instance
Fangjin98 Dec 22, 2021
bd9c5c7
aca_message_pulsar_consumer runs as instance
Fangjin98 Dec 22, 2021
956ba37
Merge pull request #4 from Fangjin98/master
luyaoluo Dec 22, 2021
2c9724a
add grpc thread for subscribe info
luyaoluo Dec 22, 2021
858209c
merge for adding resubscribe functions
luyaoluo Dec 22, 2021
ce08f7b
update subscribe info
luyaoluo Dec 22, 2021
aeb7027
delete multicast producer test cases
Fangjin98 Dec 29, 2021
7ab2543
replace construct function with init function
Fangjin98 Dec 30, 2021
886059e
add subscribe topic info test
Fangjin98 Dec 30, 2021
fb76dda
use get_instance function to init pulsar consumer
Fangjin98 Dec 30, 2021
2921c0d
remove multicast test and recovered test
Fangjin98 Dec 30, 2021
35d4e19
Merge pull request #5 from Fangjin98/master
luyaoluo Dec 30, 2021
014c802
merge for subscribe info test
luyaoluo Dec 30, 2021
31019af
fix an error in cmakelists
luyaoluo Dec 30, 2021
4228bee
change topic name type string to vector
luyaoluo Jan 6, 2022
76e014b
add an option to launch topic client
Fangjin98 Jan 7, 2022
652ddef
remove static topic string
Fangjin98 Jan 7, 2022
34062d8
add pulsar consumer multi-subscribe test
Fangjin98 Jan 7, 2022
80eac63
add specific logs to mq
Fangjin98 Mar 7, 2022
244ec44
add logs distinguish grpc and mq.
Fangjin98 Mar 7, 2022
90a7870
Merge pull request #6 from Fangjin98/master
luyaoluo Mar 10, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 58 additions & 0 deletions include/aca_async_grpc_subscribe_server.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// MIT License
// Copyright(c) 2020 Futurewei Cloud
//
// Permission is hereby granted,
// free of charge, to any person obtaining a copy of this software and associated documentation files(the "Software"), to deal in the Software without restriction,
// including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and / or sell copies of the Software, and to permit persons
// to whom the Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
// WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

#include <iostream>

#include <grpcpp/grpcpp.h>
#include <grpc/support/log.h>
#include "subscribeinfoprovisioner.grpc.pb.h"

using grpc::Server;
using grpc::ServerAsyncResponseWriter;
using grpc::ServerBuilder;
using grpc::ServerCompletionQueue;
using grpc::ServerContext;
using grpc::Status;

class Aca_Async_GRPC_Subscribe_Server final {
public:
~Aca_Async_GRPC_Subscribe_Server();
Aca_Async_GRPC_Subscribe_Server();
void Run();
void StopServer();

private:
class CallData {
public:
CallData(alcor::schema::SubscribeInfoProvisioner::AsyncService *service,
ServerCompletionQueue *cq);
void Proceed();

private:
alcor::schema::SubscribeInfoProvisioner::AsyncService *service_;
ServerCompletionQueue *cq_;
ServerContext ctx_;
alcor::schema::GoalState request_;
alcor::schema::GoalStateOperationReply reply_;
ServerAsyncResponseWriter<alcor::schema::GoalStateOperationReply> responder_;

enum CallStatus { CREATE, PROCESS, FINISH };
CallStatus status_;
};

void HandleRpcs();
std::unique_ptr<ServerCompletionQueue> cq_;
alcor::schema::GoalStateProvisioner::AsyncService service_;
std::unique_ptr<Server> server_;
};
4 changes: 4 additions & 0 deletions include/aca_comm_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#define ACA_COMM_MGR_H

#include "goalstateprovisioner.grpc.pb.h"
#include "subscribeinfoprovisioner.grpc.pb.h"

using std::string;

Expand All @@ -37,6 +38,9 @@ class Aca_Comm_Manager {
int update_goal_state(alcor::schema::GoalStateV2 &goal_state_message,
alcor::schema::GoalStateOperationReply &gsOperationReply);

int update_subscribe_info(alcor::schema::NodeSubscribeInfo &subscribe_info_message,
alcor::schema::SubscribeOperationReply &subscribeOperationReply);

// compiler will flag error when below is called
Aca_Comm_Manager(Aca_Comm_Manager const &) = delete;
void operator=(Aca_Comm_Manager const &) = delete;
Expand Down
96 changes: 96 additions & 0 deletions include/aca_grpc_subscribe.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
// MIT License
// Copyright(c) 2020 Futurewei Cloud
//
// Permission is hereby granted,
// free of charge, to any person obtaining a copy of this software and associated documentation files(the "Software"), to deal in the Software without restriction,
// including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and / or sell copies of the Software, and to permit persons
// to whom the Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
// WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

#include <iostream>

#include <grpcpp/grpcpp.h>
#include <grpc/support/log.h>
#include "subscribeinfoprovisioner.grpc.pb.h"
#include "ctpl/ctpl_stl.h"

using namespace alcor::schema;
using grpc::Server;
using grpc::ServerAsyncReader;
using grpc::ServerAsyncReaderWriter;
using grpc::ServerAsyncResponseWriter;
using grpc::ServerBuilder;
using grpc::ServerCompletionQueue;
using grpc::ServerContext;
using grpc::ServerReader;
using grpc::ServerReaderWriter;
using grpc::ServerWriter;
using grpc::Status;

class SubscribeInfoProvisionerAsyncServer {
public:
~SubscribeInfoProvisionerAsyncServer()
{
this->keepReadingFromCq_ = false;
}

/*
Base class that represents a gRPC call.
When you have a new kind of rpc, add the corresponding enum to CallType
*/
struct AsyncSubscribeInfoProvionerCallBase {
/*
Currently there are two types of CallStatus, INIT and SENT
At the INIT state, a streaming/unary rpc call creates a new streaming/unary call instance,
requests the call and then processes the received data;
AT the SENT state, a streaming call doesn't do anything; but a unary call deletes its own instance,
since this call is already done.
*/
enum CallStatus { INIT, SENT, DESTROY };
CallStatus status_;
grpc::ServerContext ctx_;
};

// struct for PushNodeSubscribeInfoAsyncCall, which is a unary gRPC call
// when adding a new unary rpc call, create a new struct just like PushNodeSubscribeInfoAsyncCall
struct PushNodeSubscribeInfoAsyncCall : public AsyncSubscribeInfoProvionerCallBase {
// Received SubscribeInfo
NodeSubscribeInfo subscribeInfo_;
// Reply to be sent
SubscribeOperationReply subscribeOperationReply_;

// Object to send reply to client
grpc::ServerAsyncResponseWriter<alcor::schema::SubscribeOperationReply> responder_;

// Constructor
PushNodeSubscribeInfoAsyncCall() : responder_(&ctx_)
{
}
};

std::unique_ptr<SubscribeInfoProvisioner::Stub> stub_;
std::shared_ptr<grpc_impl::Channel> chan_;

Status ShutDownServer();
void RunServer(int thread_pool_size);
void AsyncWorkder();
/*
Add a corresponding function here to process a new kind of rpc call.
For unary rpcs, please refer to ProcessNodeSubscribeInfoAsyncCall

*/
void ProcessPushNodeSubscribeInfoAsyncCall(AsyncSubscribeInfoProvionerCallBase *baseCall,
bool ok);

private:
bool keepReadingFromCq_ = true;
std::unique_ptr<Server> server_;
std::unique_ptr<ServerCompletionQueue> cq_;
SubscribeInfoProvisioner::AsyncService service_;
ctpl::thread_pool thread_pool_;
};
19 changes: 13 additions & 6 deletions include/aca_message_pulsar_consumer.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

using namespace pulsar;
using std::string;
using std::vector;

namespace aca_message_pulsar
{
Expand All @@ -37,7 +38,7 @@ class ACA_Message_Pulsar_Consumer {
string unicast_subscription_name; // Subscription name of the unicast pulsar consumer

string multicast_topic_name; //A string representation of the topic to be consumed, for example: /hostid/00000000-0000-0000-0000-000000000000/netwconf/
string unicast_topic_name;
vector<string> unicast_topic_name = vector<string>();

ConsumerConfiguration multicast_consumer_config; //Configuration of the mulitcast pulsar consumer
ConsumerConfiguration unicast_consumer_config; //Configuration of the unicast pulsar consumer
Expand All @@ -59,12 +60,16 @@ class ACA_Message_Pulsar_Consumer {

void setUnicastTopicName(string topic);



public:
ACA_Message_Pulsar_Consumer(string topic, string brokers, string subscription_name);

~ACA_Message_Pulsar_Consumer();
static ACA_Message_Pulsar_Consumer &get_instance();

ACA_Message_Pulsar_Consumer();

~ACA_Message_Pulsar_Consumer();

void init(string topic, string brokers, string subscription_name);

string getBrokers() const;

Expand All @@ -76,13 +81,15 @@ class ACA_Message_Pulsar_Consumer {

string getUnicastSubscriptionName() const;

static string getRecoveredTopicName();

bool multicastConsumerDispatched();

bool unicastConsumerDispatched(int stickyHash);

//static void listener(Consumer consumer, const Message& message);
bool unicastUnsubscribeAll();


bool unicastResubscribe(bool unSubscribe, string topic="", string stickHash="");
};

} // namespace aca_message_pulsar
Expand Down
1 change: 1 addition & 0 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ set(SOURCES
./comm/aca_message_pulsar_producer.cpp
./comm/aca_comm_mgr.cpp
./comm/aca_grpc.cpp
./comm/aca_grpc_subscribe.cpp
./comm/aca_grpc_client.cpp
./dp_abstraction/aca_goal_state_handler.cpp
./dp_abstraction/aca_dataplane_ovs.cpp
Expand Down
44 changes: 38 additions & 6 deletions src/aca_main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "aca_log.h"
#include "aca_util.h"
#include "aca_message_pulsar_consumer.h"
#include "aca_grpc_subscribe.h"
#include "aca_grpc.h"
#include "aca_grpc_client.h"

Expand All @@ -29,6 +30,7 @@
#include "aca_ovs_control.h"

#include "goalstateprovisioner.grpc.pb.h"
#include "subscribeinfoprovisioner.grpc.pb.h"
#include <thread>
#include <chrono>
#include <unistd.h> /* for getopt */
Expand All @@ -44,23 +46,27 @@ using std::string;
static char EMPTY_STRING[] = "";
static char BROKER_LIST[] = "pulsar://localhost:6650";
static char PULSAR_TOPIC[] = "Host-ts-1";
static char PULSAR_SUBSCRIPTION_NAME[] = "Test-Subscription";
static char PULSAR_SUBSCRIPTION_NAME[] = "test-subscription";
static char GRPC_SERVER_PORT[] = "50001";
static char GRPC_SUBSCRIBE_SERVER_PORT[] = "50002";
static char OFCTL_COMMAND[] = "monitor";
static char OFCTL_TARGET[] = "br-int";

using namespace std;
using namespace std;

// Global variables
std::thread *g_grpc_server_thread = NULL;
std::thread *g_grpc_subscribe_server_thread = NULL;
std::thread *g_grpc_client_thread = NULL;
GoalStateProvisionerAsyncServer *g_grpc_server = NULL;
SubscribeInfoProvisionerAsyncServer *g_grpc_subscribe_server = NULL;
GoalStateProvisionerClientImpl *g_grpc_client = NULL;
string g_broker_list = EMPTY_STRING;
string g_pulsar_topic = EMPTY_STRING;
string g_pulsar_subsription_name = EMPTY_STRING;
string g_pulsar_hashed_key = "0";
string g_pulsar_hashed_key = "49775";
string g_grpc_server_port = EMPTY_STRING;
string g_grpc_subscribe_server_port = EMPTY_STRING;
string g_ofctl_command = EMPTY_STRING;
string g_ofctl_target = EMPTY_STRING;
string g_ofctl_options = EMPTY_STRING;
Expand Down Expand Up @@ -138,6 +144,23 @@ static void aca_cleanup()
ACA_LOG_ERROR("%s", "Unable to call delete, grpc server thread pointer is null.\n");
}

if (g_grpc_subscribe_server != NULL) {
g_grpc_subscribe_server->ShutDownServer();
delete g_grpc_subscribe_server;
g_grpc_subscribe_server = NULL;
ACA_LOG_INFO("%s", "Cleaned up grpc subscribe server.\n");
} else {
ACA_LOG_ERROR("%s", "Unable to call delete, grpc subscribe server pointer is null.\n");
}

if (g_grpc_subscribe_server_thread != NULL) {
delete g_grpc_subscribe_server_thread;
g_grpc_subscribe_server_thread = NULL;
ACA_LOG_INFO("%s", "Cleaned up grpc subscribe server thread.\n");
} else {
ACA_LOG_ERROR("%s", "Unable to call delete, grpc subscribe server thread pointer is null.\n");
}

// Stop the grpc client
if (g_grpc_client != NULL) {
delete g_grpc_client;
Expand Down Expand Up @@ -253,6 +276,9 @@ int main(int argc, char *argv[])
if (g_grpc_server_port == EMPTY_STRING) {
g_grpc_server_port = GRPC_SERVER_PORT;
}
if (g_grpc_subscribe_server_port == EMPTY_STRING) {
g_grpc_subscribe_server_port = GRPC_SUBSCRIBE_SERVER_PORT;
}
if (g_ofctl_command == EMPTY_STRING) {
g_ofctl_command = OFCTL_COMMAND;
}
Expand All @@ -265,6 +291,13 @@ int main(int argc, char *argv[])
&GoalStateProvisionerAsyncServer::RunServer, g_grpc_server, thread_pools_size));
g_grpc_server_thread->detach();


// Create a separate thread to get subsribe info for pulsar
g_grpc_subscribe_server = new SubscribeInfoProvisionerAsyncServer();
g_grpc_subscribe_server_thread = new std::thread(std::bind(
&SubscribeInfoProvisionerAsyncServer::RunServer, g_grpc_subscribe_server, 1));
g_grpc_subscribe_server_thread->detach();

// Create a separate thread to run the grpc client.
g_grpc_client = new GoalStateProvisionerClientImpl();
g_grpc_client_thread = new std::thread(
Expand Down Expand Up @@ -292,9 +325,8 @@ int main(int argc, char *argv[])
//// monitor br-tun for arp request message
//ACA_OVS_Control::get_instance().monitor("br-tun", "resume");

ACA_Message_Pulsar_Consumer network_config_consumer(g_pulsar_topic, g_broker_list, g_pulsar_subsription_name);
//network_config_consumer.multicastConsumerDispatched();
network_config_consumer.unicastConsumerDispatched(atoi(g_pulsar_hashed_key.c_str()));
ACA_Message_Pulsar_Consumer::get_instance().init(g_pulsar_topic, g_broker_list, g_pulsar_subsription_name);
ACA_Message_Pulsar_Consumer::get_instance().unicastConsumerDispatched(atoi(g_pulsar_hashed_key.c_str()));

pause();
aca_cleanup();
Expand Down
33 changes: 33 additions & 0 deletions src/comm/aca_comm_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,15 @@
#include "aca_comm_mgr.h"
#include "aca_goal_state_handler.h"
#include "aca_dhcp_state_handler.h"
#include "aca_message_pulsar_consumer.h"
#include "goalstateprovisioner.grpc.pb.h"
#include "subscribeinfoprovisioner.grpc.pb.h"

using namespace std;
using namespace alcor::schema;
using namespace aca_goal_state_handler;
using namespace aca_dhcp_state_handler;
using namespace aca_message_pulsar;

extern string g_rpc_server;
extern string g_rpc_protocol;
Expand Down Expand Up @@ -287,6 +290,36 @@ int Aca_Comm_Manager::update_goal_state(GoalStateV2 &goal_state_message,
return rc;
}

int Aca_Comm_Manager::update_subscribe_info(NodeSubscribeInfo &subscribe_info_message,
SubscribeOperationReply &subscribeOperationReply)
{
int exec_command_rc = EXIT_SUCCESS;
int rc = EXIT_SUCCESS;
auto start = chrono::steady_clock::now();


auto subscribe_finished_time = chrono::steady_clock::now();
exec_command_rc = ACA_Message_Pulsar_Consumer::get_instance().unicastResubscribe(subscribe_info_message.subscribe_operation(),
subscribe_info_message.topic(),
subscribe_info_message.key());
auto subscribe_operation_time =
cast_to_microseconds(subscribe_finished_time - start).count();

ACA_LOG_INFO("[METRICS] Elapsed time for subscribe operation took: %ld microseconds or %ld milliseconds\n",
subscribe_operation_time, us_to_ms(subscribe_operation_time));

OperationStatus operation_status;

if (exec_command_rc == EXIT_SUCCESS)
operation_status = OperationStatus::SUCCESS;
else
operation_status = OperationStatus::FAILURE;
subscribeOperationReply.set_operationstatus(operation_status);

return rc;
}


void Aca_Comm_Manager::print_goal_state(GoalState parsed_struct)
{
if (g_debug_mode == false) {
Expand Down
Loading