Skip to content
This repository has been archived by the owner on Mar 31, 2023. It is now read-only.

Commit

Permalink
Zeta Data path integration fixes (#191)
Browse files Browse the repository at this point in the history
* fixed some bugs and add fwd's mac_address
  • Loading branch information
zhangml authored Jan 14, 2021
1 parent 404cc8b commit 68680e4
Show file tree
Hide file tree
Showing 4 changed files with 165 additions and 30 deletions.
30 changes: 27 additions & 3 deletions include/aca_zeta_programming.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,30 @@ static atomic_uint current_available_group_id(1);

namespace aca_zeta_programming
{
class FWD_Info {
public:
string ip_addr;
string mac_addr;

FWD_Info(){};
FWD_Info(string ip_addr, string mac_addr)
{
this->ip_addr = ip_addr;
this->mac_addr = mac_addr;
}

// Overload "==" for hash operation
bool operator==(const FWD_Info &other) const
{
return ((ip_addr == other.ip_addr) && (mac_addr == other.mac_addr));
};
};
struct zeta_config {
uint group_id;
uint oam_port;
// set<gateway_node_ip_address>
unordered_set<string> zeta_buckets;

// CTSL::HashMap <key: FWD_Info, value: int* (not used)>
CTSL::HashMap<FWD_Info *, int *> zeta_buckets;
};

class ACA_Zeta_Programming {
Expand All @@ -40,7 +59,8 @@ class ACA_Zeta_Programming {
~ACA_Zeta_Programming();
static ACA_Zeta_Programming &get_instance();

void create_entry(string zeta_gateway_id, uint oam_port);
void create_entry(string zeta_gateway_id, uint oam_port,
alcor::schema::AuxGateway current_AuxGateway);

void clear_all_data();

Expand All @@ -64,10 +84,14 @@ class ACA_Zeta_Programming {
int _delete_group_punt_rule(uint tunnel_id);

int _create_zeta_group_entry(zeta_config *zeta_config_in);
int _update_zeta_group_entry(zeta_config *zeta_config_in);
int _delete_zeta_group_entry(zeta_config *zeta_config_in);

// hashtable <key: zeta_gateway_id, value: zeta_config>
CTSL::HashMap<string, zeta_config *> _zeta_config_table;

//The mutex for modifying group table entry
std::timed_mutex _group_operation_mutex;
};
} // namespace aca_zeta_programming
#endif // #ifndef ACA_ZETA_PROGRAMMING_H
143 changes: 117 additions & 26 deletions src/zeta/aca_zeta_programming.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,19 +41,27 @@ ACA_Zeta_Programming &ACA_Zeta_Programming::get_instance()
return instance;
}

void ACA_Zeta_Programming::create_entry(string zeta_gateway_id, uint oam_port)
void ACA_Zeta_Programming::create_entry(string zeta_gateway_id, uint oam_port,
alcor::schema::AuxGateway current_AuxGateway)
{
ACA_LOG_DEBUG("%s", "ACA_Zeta_Programming::create_entry ---> Entering\n");

zeta_config *new_zeta_cfg = new zeta_config;
new_zeta_cfg->oam_port = oam_port;
// fetch the value first to used for new_zeta_cfg->group_id
// then add 1 after, doing both atomically
// std::memory_order_relaxed option won't help much for x86 but other
// CPU architecture can take advantage of it
new_zeta_cfg->group_id =
current_available_group_id.fetch_add(1, std::memory_order_relaxed);

new_zeta_cfg->oam_port = oam_port;
// fill in the ip_address and mac_address of fwds
for (auto destination : current_AuxGateway.destinations()) {
FWD_Info *new_fwd =
new FWD_Info(destination.ip_address(), destination.mac_address());
new_zeta_cfg->zeta_buckets.insert(new_fwd, nullptr);
}

_zeta_config_table.insert(zeta_gateway_id, new_zeta_cfg);

ACA_LOG_DEBUG("%s", "ACA_Zeta_Programming::create_entry <--- Exiting\n");
Expand Down Expand Up @@ -181,34 +189,54 @@ int ACA_Zeta_Programming::create_zeta_config(const alcor::schema::AuxGateway cur
{
ACA_LOG_DEBUG("%s", "ACA_Zeta_Programming::create_zeta_config ---> Entering\n");
int overall_rc = EXIT_SUCCESS;
zeta_config *new_zeta_cfg;
zeta_config *current_zeta_cfg;
bool bucket_not_found = false;
unordered_set<string> new_zeta_buckets;
CTSL::HashMap<FWD_Info *, int *> new_zeta_buckets;

uint oam_port = current_AuxGateway.zeta_info().port_inband_operation();

if (!_zeta_config_table.find(current_AuxGateway.id(), new_zeta_cfg)) {
create_entry(current_AuxGateway.id(), oam_port);
if (!_zeta_config_table.find(current_AuxGateway.id(), current_zeta_cfg)) {
create_entry(current_AuxGateway.id(), oam_port, current_AuxGateway);
_zeta_config_table.find(current_AuxGateway.id(), current_zeta_cfg);

overall_rc = _create_zeta_group_entry(current_zeta_cfg);

_create_oam_ofp(oam_port);
// add oam port number to cache
aca_zeta_oam_server::ACA_Zeta_Oam_Server::get_instance().add_oam_port_cache(oam_port);
} else {
if (current_zeta_cfg->zeta_buckets.hashSize !=
(uint)current_AuxGateway.destinations().size()) {
bucket_not_found = true;
} else {
for (auto destination : current_AuxGateway.destinations()) {
FWD_Info *target_fwd =
new FWD_Info(destination.ip_address(), destination.mac_address());

int *found = nullptr;

if (current_zeta_cfg->zeta_buckets.find(target_fwd, found)) {
continue;
} else {
bucket_not_found |= true;
break;
}
new_zeta_buckets.insert(target_fwd, nullptr);
}
}

_zeta_config_table.find(current_AuxGateway.id(), new_zeta_cfg);
}
// If the buckets have changed, update the buckets and group table rules.
if (bucket_not_found == true) {
current_zeta_cfg->zeta_buckets.clear();
for (auto destination : current_AuxGateway.destinations()) {
FWD_Info *target_fwd =
new FWD_Info(destination.ip_address(), destination.mac_address());

for (auto destination : current_AuxGateway.destinations()) {
if (new_zeta_cfg->zeta_buckets.find(destination.ip_address()) ==
new_zeta_cfg->zeta_buckets.end()) {
bucket_not_found |= true;
}
new_zeta_buckets.insert(destination.ip_address());
}
current_zeta_cfg->zeta_buckets.insert(target_fwd, nullptr);
}

// If the buckets have changed, update the buckets and group table rules.
if (new_zeta_cfg->zeta_buckets.size() != new_zeta_buckets.size() ||
bucket_not_found == true) {
new_zeta_cfg->zeta_buckets = new_zeta_buckets;
overall_rc = _create_zeta_group_entry(new_zeta_cfg);
overall_rc = _update_zeta_group_entry(current_zeta_cfg);
}
}

// get the current auxgateway_id of vpc
Expand All @@ -217,7 +245,7 @@ int ACA_Zeta_Programming::create_zeta_config(const alcor::schema::AuxGateway cur
ACA_LOG_INFO("%s", "The vpc currently has not auxgateway set!\n");
ACA_Vlan_Manager::get_instance().set_zeta_gateway(tunnel_id,
current_AuxGateway.id());
_create_group_punt_rule(tunnel_id, new_zeta_cfg->group_id);
_create_group_punt_rule(tunnel_id, current_zeta_cfg->group_id);
} else {
ACA_LOG_INFO("%s", "The vpc currently has an auxgateway set!\n");
}
Expand Down Expand Up @@ -272,13 +300,76 @@ int ACA_Zeta_Programming::_create_zeta_group_entry(zeta_config *zeta_cfg)
ACA_LOG_DEBUG("%s", "ACA_Zeta_Programming::_create_zeta_group_entry ---> Entering\n");
unsigned long not_care_culminative_time;
int overall_rc = EXIT_SUCCESS;

// adding group table rule
string cmd = "-O OpenFlow13 add-group br-tun group_id=" + to_string(zeta_cfg->group_id) +
",type=select";
unordered_set<string>::iterator it;
for (it = zeta_cfg->zeta_buckets.begin(); it != zeta_cfg->zeta_buckets.end(); it++) {
cmd += ",bucket=\"set_field:" + *it + "->tun_dst,output:vxlan-generic\"";

for (size_t i = 0; i < zeta_cfg->zeta_buckets.hashSize; i++) {
auto hash_node = zeta_cfg->zeta_buckets.hashTable[i].head;
if (hash_node == nullptr) {
continue;
} else {
//-----Start share lock to enable mutiple concurrent reads-----
std::shared_lock<std::shared_timed_mutex> hash_bucket_lock(
(zeta_cfg->zeta_buckets.hashTable[i]).mutex_);

while (hash_node != nullptr) {
cmd += ",bucket=\"set_field:" + hash_node->getKey()->ip_addr +
"->tun_dst,mod_dl_dst:" + hash_node->getKey()->mac_addr +
",output:vxlan-generic\"";
hash_node = hash_node->next;
}
hash_bucket_lock.unlock();
//-----End share lock to enable mutiple concurrent reads-----
}
}

//-----Start unique lock-----
std::unique_lock<std::timed_mutex> group_entry_lock(_group_operation_mutex);
// add group table rule
aca_ovs_l2_programmer::ACA_OVS_L2_Programmer::get_instance().execute_openflow_command(
cmd, not_care_culminative_time, overall_rc);
group_entry_lock.unlock();
//-----End unique lock-----

if (overall_rc == EXIT_SUCCESS) {
ACA_LOG_INFO("%s", "create_zeta_group_entry succeeded!\n");
} else {
ACA_LOG_ERROR("create_zeta_group_entry failed!!! overrall_rc: %d\n", overall_rc);
}

ACA_LOG_DEBUG("ACA_Zeta_Programming::_create_zeta_group_entry <--- Exiting, overall_rc = %d\n",
overall_rc);
return overall_rc;
}

int ACA_Zeta_Programming::_update_zeta_group_entry(zeta_config *zeta_cfg)
{
ACA_LOG_DEBUG("%s", "ACA_Zeta_Programming::_update_zeta_group_entry ---> Entering\n");
unsigned long not_care_culminative_time;
int overall_rc = EXIT_SUCCESS;
// adding group table rule
string cmd = "-O OpenFlow13 mod-group br-tun group_id=" + to_string(zeta_cfg->group_id) +
",type=select";

for (size_t i = 0; i < zeta_cfg->zeta_buckets.hashSize; i++) {
auto hash_node = zeta_cfg->zeta_buckets.hashTable[i].head;
if (hash_node == nullptr) {
continue;
} else {
//-----Start share lock to enable mutiple concurrent reads-----
std::shared_lock<std::shared_timed_mutex> hash_bucket_lock(
(zeta_cfg->zeta_buckets.hashTable[i]).mutex_);

while (hash_node != nullptr) {
cmd += ",bucket=\"set_field:" + hash_node->getKey()->ip_addr +
"->tun_dst,mod_dl_dst:" + hash_node->getKey()->mac_addr +
",output:vxlan-generic\"";
hash_node = hash_node->next;
}
hash_bucket_lock.unlock();
//-----End share lock to enable mutiple concurrent reads-----
}
}

// add group table rule
Expand All @@ -291,7 +382,7 @@ int ACA_Zeta_Programming::_create_zeta_group_entry(zeta_config *zeta_cfg)
ACA_LOG_ERROR("update_zeta_group_entry failed!!! overrall_rc: %d\n", overall_rc);
}

ACA_LOG_DEBUG("ACA_Zeta_Programming::_create_zeta_group_entry <--- Exiting, overall_rc = %d\n",
ACA_LOG_DEBUG("ACA_Zeta_Programming::_update_zeta_group_entry <--- Exiting, overall_rc = %d\n",
overall_rc);
return overall_rc;
}
Expand Down
10 changes: 9 additions & 1 deletion test/gtest/aca_test_oam.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

#include "gtest/gtest.h"
#include "goalstateprovisioner.grpc.pb.h"
#define private public
#include "aca_zeta_oam_server.h"
#include "aca_util.h"
Expand All @@ -26,6 +27,7 @@ using namespace aca_zeta_oam_server;
using namespace aca_ovs_control;
using namespace aca_zeta_programming;
using namespace aca_ovs_l2_programmer;
using namespace alcor::schema;

extern string vmac_address_1;
extern string vmac_address_2;
Expand Down Expand Up @@ -90,8 +92,14 @@ TEST(oam_message_test_cases, oams_recv_valid)
aca_vlan_manager::ACA_Vlan_Manager::get_instance().create_entry(match.vni);
aca_vlan_manager::ACA_Vlan_Manager::get_instance().set_zeta_gateway(match.vni, auxGateway_id_1);

// fill in auxgateway state structs
AuxGateway new_auxGateway;
new_auxGateway.set_id(auxGateway_id_1);
AuxGateway_zeta *zeta_info = new_auxGateway.mutable_zeta_info();
zeta_info->set_port_inband_operation(oam_port_1); //port_ibo

// fill zeta_config_table
ACA_Zeta_Programming::get_instance().create_entry(auxGateway_id_1, oam_port_1);
ACA_Zeta_Programming::get_instance().create_entry(auxGateway_id_1, oam_port_1, new_auxGateway);

// flow injection
stOamMsg.op_code = htonl(0);
Expand Down
12 changes: 12 additions & 0 deletions test/gtest/aca_test_zeta_programming.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,18 @@ TEST(zeta_programming_test_cases, delete_zeta_config_valid)

TEST(zeta_programming_test_cases, create_auxgateway_test)
{
ulong not_care_culminative_time = 0;
int overall_rc;
// delete br-int and br-tun bridges
ACA_OVS_L2_Programmer::get_instance().execute_ovsdb_command(
"del-br br-int", not_care_culminative_time, overall_rc);

ACA_OVS_L2_Programmer::get_instance().execute_ovsdb_command(
"del-br br-tun", not_care_culminative_time, overall_rc);

// create and setup br-int and br-tun bridges, and their patch ports
overall_rc = ACA_OVS_L2_Programmer::get_instance().setup_ovs_bridges_if_need();
ASSERT_EQ(overall_rc, EXIT_SUCCESS);
// from here.
int retcode;
GoalState GoalState_builder;
Expand Down

0 comments on commit 68680e4

Please sign in to comment.