Skip to content

Commit

Permalink
Replace RedisClient with DBConnector (sonic-net#1439)
Browse files Browse the repository at this point in the history
  • Loading branch information
qiluo-msft authored and Kavin Kamaraj committed Jan 21, 2021
1 parent df38337 commit 3bd45f9
Show file tree
Hide file tree
Showing 9 changed files with 1,000 additions and 24 deletions.
739 changes: 739 additions & 0 deletions mclagsyncd/mclaglink.cpp

Large diffs are not rendered by default.

152 changes: 152 additions & 0 deletions mclagsyncd/mclaglink.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
/* Copyright(c) 2016-2019 Nephos.
*
* This program is free software; you can redistribute it and/or modify it
* under the terms and conditions of the GNU General Public License,
* version 2, as published by the Free Software Foundation.
*
* This program is distributed in the hope it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
* more details.
*
* You should have received a copy of the GNU General Public License along with
* this program; if not, see <http://www.gnu.org/licenses/>.
*
* The full GNU General Public License is included in this distribution in
* the file called "COPYING".
*
* Maintainer: Jim Jiang from nephos
*/

#ifndef __MCLAGLINK__
#define __MCLAGLINK__

#include <arpa/inet.h>
#include <sys/socket.h>
#include <linux/netlink.h>
#include <linux/rtnetlink.h>

#include <errno.h>
#include <assert.h>
#include <unistd.h>
#include <exception>
#include <string>
#include <map>
#include <set>

#include "producerstatetable.h"
#include "selectable.h"
#include "redisclient.h"
#include "mclagsyncd/mclag.h"

namespace swss {

#define ETHER_ADDR_STR_LEN 18
#define MAX_L_PORT_NAME 20

struct mclag_fdb_info
{
char mac[ETHER_ADDR_STR_LEN];
unsigned int vid;
char port_name[MAX_L_PORT_NAME];
short type; /*dynamic or static*/
short op_type; /*add or del*/
};

struct mclag_fdb
{
std::string mac;
unsigned int vid;
std::string port_name;
std::string type;/*dynamic or static*/

mclag_fdb(std::string val_mac, unsigned int val_vid, std::string val_pname,
std::string val_type) : mac(val_mac), vid(val_vid), port_name(val_pname), type(val_type)
{
}
mclag_fdb()
{
}

bool operator <(const mclag_fdb &fdb) const
{
if (mac != fdb.mac)
return mac < fdb.mac;
else if (vid != fdb.vid)
return vid < fdb.vid;
else
return port_name < fdb.port_name;
//else if (port_name != fdb.port_name) return port_name < fdb.port_name;
//else return type <fdb.type;
}

bool operator ==(const mclag_fdb &fdb) const
{
if (mac != fdb.mac)
return 0;
if (vid != fdb.vid)
return 0;
return 1;
}

};

class MclagLink : public Selectable {
public:
const int MSG_BATCH_SIZE;
ProducerStateTable * p_port_tbl;
ProducerStateTable * p_lag_tbl;
ProducerStateTable * p_tnl_tbl;
ProducerStateTable * p_intf_tbl;
ProducerStateTable *p_fdb_tbl;
ProducerStateTable *p_acl_table_tbl;
ProducerStateTable *p_acl_rule_tbl;
DBConnector *p_appl_db;
DBConnector *p_asic_db; /*redis client access to ASIC_DB*/
DBConnector *p_counters_db; /*redis client access to COUNTERS_DB*/
std::set <mclag_fdb> *p_old_fdb;

MclagLink(uint16_t port = MCLAG_DEFAULT_PORT);
virtual ~MclagLink();

/* Wait for connection (blocking) */
void accept();

int getFd() override;
uint64_t readData() override;

/* readMe throws MclagConnectionClosedException when connection is lost */
class MclagConnectionClosedException : public std::exception
{
};

private:
unsigned int m_bufSize;
char *m_messageBuffer;
char *m_messageBuffer_send;
unsigned int m_pos;

bool m_connected;
bool m_server_up;
int m_server_socket;
int m_connection_socket;

void getOidToPortNameMap(std::unordered_map<std::string, std:: string> & port_map);
void getBridgePortIdToAttrPortIdMap(std::map<std::string, std:: string> *oid_map);
void getVidByBvid(std::string &bvid, std::string &vlanid);
void getFdbSet(std::set<mclag_fdb> *fdb_set);
void setPortIsolate(char *msg);
void setPortMacLearnMode(char *msg);
void setFdbFlush();
void setFdbFlushByPort(char *msg);
void setIntfMac(char *msg);
void setFdbEntry(char *msg, int msg_len);
ssize_t getFdbChange(char *msg_buf);
void connectionLostHandlePortIsolate();
void connectionLostHandlePortLearnMode();
void connectionLost();
};

}
#endif

96 changes: 96 additions & 0 deletions mclagsyncd/mclagsyncd.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/* Copyright(c) 2016-2019 Nephos.
*
* This program is free software; you can redistribute it and/or modify it
* under the terms and conditions of the GNU General Public License,
* version 2, as published by the Free Software Foundation.
*
* This program is distributed in the hope it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
* more details.
*
* You should have received a copy of the GNU General Public License along with
* this program; if not, see <http://www.gnu.org/licenses/>.
*
* The full GNU General Public License is included in this distribution in
* the file called "COPYING".
*
* Maintainer: Jim Jiang from nephos
*/
#include <iostream>
#include "logger.h"
#include <map>
#include "select.h"
#include "netdispatcher.h"
#include "mclagsyncd/mclaglink.h"
#include <set>

using namespace std;
using namespace swss;

int main(int argc, char **argv)
{
swss::Logger::linkToDbNative("mclagsyncd");
DBConnector appl_db("APPL_DB", 0);
DBConnector asic_db("ASIC_DB", 0);
DBConnector counters_db("COUNTERS_DB", 0);
ProducerStateTable port_tbl(&appl_db, APP_PORT_TABLE_NAME);
ProducerStateTable lag_tbl(&appl_db, APP_LAG_TABLE_NAME);
ProducerStateTable tnl_tbl(&appl_db, APP_VXLAN_TUNNEL_TABLE_NAME);
ProducerStateTable intf_tbl(&appl_db, APP_INTF_TABLE_NAME);
ProducerStateTable fdb_tbl(&appl_db, APP_FDB_TABLE_NAME);
ProducerStateTable acl_table_tbl(&appl_db, APP_ACL_TABLE_TABLE_NAME);
ProducerStateTable acl_rule_tbl(&appl_db, APP_ACL_RULE_TABLE_NAME);
map <string, string> isolate;
RedisPipeline pipeline(&appl_db);
set <mclag_fdb> old_fdb;

while (1)
{
try
{
MclagLink mclag;
Select s;

mclag.p_port_tbl = &port_tbl;
mclag.p_lag_tbl = &lag_tbl;
mclag.p_tnl_tbl = &tnl_tbl;
mclag.p_intf_tbl = &intf_tbl;
mclag.p_fdb_tbl = &fdb_tbl;
mclag.p_acl_table_tbl = &acl_table_tbl;
mclag.p_acl_rule_tbl = &acl_rule_tbl;
mclag.p_appl_db = &appl_db;
mclag.p_asic_db = &asic_db;
mclag.p_counters_db = &counters_db;
mclag.p_old_fdb = &old_fdb;

cout << "Waiting for connection..." << endl;
mclag.accept();
cout << "Connected!" << endl;

s.addSelectable(&mclag);

while (true)
{
Selectable *temps;

/* Reading MCLAG messages forever (and calling "readData" to read them) */
s.select(&temps);
pipeline.flush();
SWSS_LOG_DEBUG("Pipeline flushed");
}
}
catch (MclagLink::MclagConnectionClosedException &e)
{
cout << "Connection lost, reconnecting..." << endl;
}
catch (const exception& e)
{
cout << "Exception \"" << e.what() << "\" had been thrown in deamon" << endl;
return 0;
}
}

return 1;
}

7 changes: 3 additions & 4 deletions orchagent/bufferorch.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,7 @@ BufferOrch::BufferOrch(DBConnector *db, vector<string> &tableNames) :
m_flexCounterDb(new DBConnector("FLEX_COUNTER_DB", 0)),
m_flexCounterTable(new ProducerTable(m_flexCounterDb.get(), FLEX_COUNTER_TABLE)),
m_flexCounterGroupTable(new ProducerTable(m_flexCounterDb.get(), FLEX_COUNTER_GROUP_TABLE)),
m_countersDb(new DBConnector("COUNTERS_DB", 0)),
m_countersDbRedisClient(m_countersDb.get())
m_countersDb(new DBConnector("COUNTERS_DB", 0))
{
SWSS_LOG_ENTER();
initTableHandlers();
Expand Down Expand Up @@ -361,7 +360,7 @@ task_process_status BufferOrch::processBufferPool(Consumer &consumer)
// Specifically, we push the buffer pool name to oid mapping upon the creation of the oid
// In pg and queue case, this mapping installment is deferred to FlexCounterOrch at a reception of field
// "FLEX_COUNTER_STATUS"
m_countersDbRedisClient.hset(COUNTERS_BUFFER_POOL_NAME_MAP, object_name, sai_serialize_object_id(sai_object));
m_countersDb->hset(COUNTERS_BUFFER_POOL_NAME_MAP, object_name, sai_serialize_object_id(sai_object));
}
}
else if (op == DEL_COMMAND)
Expand All @@ -375,7 +374,7 @@ task_process_status BufferOrch::processBufferPool(Consumer &consumer)
SWSS_LOG_NOTICE("Removed buffer pool %s with type %s", object_name.c_str(), map_type_name.c_str());
auto it_to_delete = (m_buffer_type_maps[map_type_name])->find(object_name);
(m_buffer_type_maps[map_type_name])->erase(it_to_delete);
m_countersDbRedisClient.hdel(COUNTERS_BUFFER_POOL_NAME_MAP, object_name);
m_countersDb->hdel(COUNTERS_BUFFER_POOL_NAME_MAP, object_name);
}
else
{
Expand Down
1 change: 0 additions & 1 deletion orchagent/bufferorch.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ class BufferOrch : public Orch
unique_ptr<ProducerTable> m_flexCounterTable;

unique_ptr<DBConnector> m_countersDb;
RedisClient m_countersDbRedisClient;

bool m_isBufferPoolWatermarkCounterIdListGenerated = false;
};
Expand Down
3 changes: 1 addition & 2 deletions orchagent/countercheckorch.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -193,13 +193,12 @@ QueueMcCounters CounterCheckOrch::getQueueMcCounters(

vector<FieldValueTuple> fieldValues;
QueueMcCounters counters;
RedisClient redisClient(m_countersDb.get());

for (uint8_t prio = 0; prio < port.m_queue_ids.size(); prio++)
{
sai_object_id_t queueId = port.m_queue_ids[prio];
auto queueIdStr = sai_serialize_object_id(queueId);
auto queueType = redisClient.hget(COUNTERS_QUEUE_TYPE_MAP, queueIdStr);
auto queueType = m_countersDb->hget(COUNTERS_QUEUE_TYPE_MAP, queueIdStr);

if (queueType.get() == nullptr || *queueType != "SAI_QUEUE_TYPE_MULTICAST" || !m_countersTable->get(queueIdStr, fieldValues))
{
Expand Down
21 changes: 8 additions & 13 deletions orchagent/pfcwdorch.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -364,9 +364,8 @@ void PfcWdSwOrch<DropHandler, ForwardHandler>::disableBigRedSwitchMode()
}

auto queueId = entry.first;
RedisClient redisClient(this->getCountersDb().get());
string countersKey = this->getCountersTable()->getTableName() + this->getCountersTable()->getTableNameSeparator() + sai_serialize_object_id(queueId);
redisClient.hdel(countersKey, "BIG_RED_SWITCH_MODE");
this->getCountersDb()->hdel(countersKey, "BIG_RED_SWITCH_MODE");
}

m_brsEntryMap.clear();
Expand Down Expand Up @@ -642,9 +641,8 @@ void PfcWdSwOrch<DropHandler, ForwardHandler>::unregisterFromWdDb(const Port& po
m_entryMap.erase(queueId);

// Clean up
RedisClient redisClient(this->getCountersDb().get());
string countersKey = this->getCountersTable()->getTableName() + this->getCountersTable()->getTableNameSeparator() + sai_serialize_object_id(queueId);
redisClient.hdel(countersKey, {"PFC_WD_DETECTION_TIME", "PFC_WD_RESTORATION_TIME", "PFC_WD_ACTION", "PFC_WD_STATUS"});
this->getCountersDb()->hdel(countersKey, {"PFC_WD_DETECTION_TIME", "PFC_WD_RESTORATION_TIME", "PFC_WD_ACTION", "PFC_WD_STATUS"});
}

}
Expand All @@ -666,8 +664,7 @@ PfcWdSwOrch<DropHandler, ForwardHandler>::PfcWdSwOrch(
c_queueAttrIds(queueAttrIds),
m_pollInterval(pollInterval),
m_applDb(make_shared<DBConnector>("APPL_DB", 0)),
m_applTable(make_shared<Table>(m_applDb.get(), APP_PFC_WD_TABLE_NAME "_INSTORM")),
m_applDbRedisClient(m_applDb.get())
m_applTable(make_shared<Table>(m_applDb.get(), APP_PFC_WD_TABLE_NAME "_INSTORM"))
{
SWSS_LOG_ENTER();

Expand Down Expand Up @@ -943,7 +940,7 @@ bool PfcWdSwOrch<DropHandler, ForwardHandler>::startWdActionOnQueue(const string
entry->second.handler->initCounters();
// Log storm event to APPL_DB for warm-reboot purpose
string key = m_applTable->getTableName() + m_applTable->getTableNameSeparator() + entry->second.portAlias;
m_applDbRedisClient.hset(key, to_string(entry->second.index), PFC_WD_IN_STORM);
m_applDb->hset(key, to_string(entry->second.index), PFC_WD_IN_STORM);
}
}
else if (entry->second.action == PfcWdAction::PFC_WD_ACTION_DROP)
Expand All @@ -965,7 +962,7 @@ bool PfcWdSwOrch<DropHandler, ForwardHandler>::startWdActionOnQueue(const string
entry->second.handler->initCounters();
// Log storm event to APPL_DB for warm-reboot purpose
string key = m_applTable->getTableName() + m_applTable->getTableNameSeparator() + entry->second.portAlias;
m_applDbRedisClient.hset(key, to_string(entry->second.index), PFC_WD_IN_STORM);
m_applDb->hset(key, to_string(entry->second.index), PFC_WD_IN_STORM);
}
}
else if (entry->second.action == PfcWdAction::PFC_WD_ACTION_FORWARD)
Expand All @@ -987,7 +984,7 @@ bool PfcWdSwOrch<DropHandler, ForwardHandler>::startWdActionOnQueue(const string
entry->second.handler->initCounters();
// Log storm event to APPL_DB for warm-reboot purpose
string key = m_applTable->getTableName() + m_applTable->getTableNameSeparator() + entry->second.portAlias;
m_applDbRedisClient.hset(key, to_string(entry->second.index), PFC_WD_IN_STORM);
m_applDb->hset(key, to_string(entry->second.index), PFC_WD_IN_STORM);
}
}
else
Expand All @@ -1011,7 +1008,7 @@ bool PfcWdSwOrch<DropHandler, ForwardHandler>::startWdActionOnQueue(const string
entry->second.handler = nullptr;
// Remove storm status in APPL_DB for warm-reboot purpose
string key = m_applTable->getTableName() + m_applTable->getTableNameSeparator() + entry->second.portAlias;
m_applDbRedisClient.hdel(key, to_string(entry->second.index));
m_applDb->hdel(key, to_string(entry->second.index));
}
}
else
Expand All @@ -1028,8 +1025,6 @@ bool PfcWdSwOrch<DropHandler, ForwardHandler>::bake()
{
// clean all *_last and *_LEFT fields in COUNTERS_TABLE
// to allow warm-reboot pfc detect & restore state machine to enter the same init state as cold-reboot
RedisClient redisClient(this->getCountersDb().get());

vector<string> cKeys;
this->getCountersTable()->getKeys(cKeys);
for (const auto &key : cKeys)
Expand All @@ -1046,7 +1041,7 @@ bool PfcWdSwOrch<DropHandler, ForwardHandler>::bake()
}
if (!wLasts.empty())
{
redisClient.hdel(
this->getCountersDb()->hdel(
this->getCountersTable()->getTableName()
+ this->getCountersTable()->getTableNameSeparator()
+ key,
Expand Down
2 changes: 0 additions & 2 deletions orchagent/pfcwdorch.h
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,6 @@ class PfcWdSwOrch: public PfcWdOrch<DropHandler, ForwardHandler>
shared_ptr<DBConnector> m_applDb = nullptr;
// Track queues in storm
shared_ptr<Table> m_applTable = nullptr;
// used for hset and hdel
RedisClient m_applDbRedisClient;
};

#endif
Loading

0 comments on commit 3bd45f9

Please sign in to comment.