Skip to content

Commit

Permalink
Warm reboot for PortsOrch (sonic-net#551)
Browse files Browse the repository at this point in the history
* Fix addExistingData consumer converstion
* Add more addExistingData()
* Warm reboot for PortsOrch
* Remove calling doPortConfigDoneTask in ctor
* Remove unused function signature
  • Loading branch information
qiluo-msft authored Aug 8, 2018
1 parent 674a5e6 commit bfce363
Show file tree
Hide file tree
Showing 4 changed files with 145 additions and 12 deletions.
74 changes: 70 additions & 4 deletions orchagent/orch.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,13 +66,10 @@ vector<Selectable *> Orch::getSelectables()
return selectables;
}

void Consumer::execute()
void Consumer::addToSync(std::deque<KeyOpFieldsValuesTuple> &entries)
{
SWSS_LOG_ENTER();

std::deque<KeyOpFieldsValuesTuple> entries;
getConsumerTable()->pops(entries);

/* Nothing popped */
if (entries.empty())
{
Expand Down Expand Up @@ -123,6 +120,47 @@ void Consumer::execute()
m_toSync[key] = KeyOpFieldsValuesTuple(key, op, existing_values);
}
}
}

// TODO: Table should be const
void Consumer::refillToSync(Table* table)
{
std::deque<KeyOpFieldsValuesTuple> entries;
vector<string> keys;
table->getKeys(keys);
for (const auto &key: keys)
{
KeyOpFieldsValuesTuple kco;

kfvKey(kco) = key;
kfvOp(kco) = SET_COMMAND;

if (!table->get(key, kfvFieldsValues(kco)))
{
continue;
}
entries.push_back(kco);
}

addToSync(entries);
}

void Consumer::refillToSync()
{
auto db = getConsumerTable()->getDbConnector();
string tableName = getConsumerTable()->getTableName();
auto table = Table(db, tableName);
refillToSync(&table);
}

void Consumer::execute()
{
SWSS_LOG_ENTER();

std::deque<KeyOpFieldsValuesTuple> entries;
getConsumerTable()->pops(entries);

addToSync(entries);

drain();
}
Expand All @@ -133,6 +171,34 @@ void Consumer::drain()
m_orch->doTask(*this);
}

bool Orch::addExistingData(const string& tableName)
{
Consumer* consumer = dynamic_cast<Consumer *>(getExecutor(tableName));
if (consumer == NULL)
{
SWSS_LOG_ERROR("No consumer %s in Orch", tableName.c_str());
return false;
}

consumer->refillToSync();
return true;
}

// TODO: Table should be const
bool Orch::addExistingData(Table *table)
{
string tableName = table->getTableName();
Consumer* consumer = dynamic_cast<Consumer *>(getExecutor(tableName));
if (consumer == NULL)
{
SWSS_LOG_ERROR("No consumer %s in Orch", tableName.c_str());
return false;
}

consumer->refillToSync(table);
return true;
}

/*
- Validates reference has proper format which is [table_name:object_name]
- validates table_name exists
Expand Down
13 changes: 10 additions & 3 deletions orchagent/orch.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,21 +101,24 @@ class Executor : public Selectable

class Consumer : public Executor {
public:
Consumer(TableConsumable *select, Orch *orch)
Consumer(ConsumerTableBase *select, Orch *orch)
: Executor(select, orch)
{
}

TableConsumable *getConsumerTable() const
ConsumerTableBase *getConsumerTable() const
{
return static_cast<TableConsumable *>(getSelectable());
return static_cast<ConsumerTableBase *>(getSelectable());
}

string getTableName() const
{
return getConsumerTable()->getTableName();
}

void addToSync(std::deque<KeyOpFieldsValuesTuple> &entries);
void refillToSync();
void refillToSync(Table* table);
void execute();
void drain();

Expand Down Expand Up @@ -149,6 +152,10 @@ class Orch

vector<Selectable*> getSelectables();

// add the existing table data (left by warm reboot) to the consumer todo task list.
bool addExistingData(Table *table);
bool addExistingData(const string& tableName);

/* Iterate all consumers in m_consumerMap and run doTask(Consumer) */
void doTask();

Expand Down
68 changes: 63 additions & 5 deletions orchagent/portsorch.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,9 @@ PortsOrch::PortsOrch(DBConnector *db, vector<table_name_with_pri_t> &tableNames)
m_portStatusNotificationConsumer = new swss::NotificationConsumer(notificationsDb, "NOTIFICATIONS");
auto portStatusNotificatier = new Notifier(m_portStatusNotificationConsumer, this);
Orch::addExecutor("PORT_STATUS_NOTIFICATIONS", portStatusNotificatier);

// Try warm start
bake();
}

void PortsOrch::removeDefaultVlanMembers()
Expand Down Expand Up @@ -626,7 +629,7 @@ bool PortsOrch::bindAclTable(sai_object_id_t id, sai_object_id_t table_oid, sai_
{
// Bind this ACL group to LAG
sai_attribute_t lag_attr;
lag_attr.id = ingress ? SAI_LAG_ATTR_INGRESS_ACL : SAI_LAG_ATTR_EGRESS_ACL;
lag_attr.id = ingress ? SAI_LAG_ATTR_INGRESS_ACL : SAI_LAG_ATTR_EGRESS_ACL;
lag_attr.value.oid = groupOid;

status = sai_lag_api->set_lag_attribute(port.m_lag_id, &lag_attr);
Expand Down Expand Up @@ -1083,8 +1086,8 @@ bool PortsOrch::removePort(sai_object_id_t port_id)
Port p;
if (getPort(port_id, p))
{
PortUpdate update = {p, false };
notify(SUBJECT_TYPE_PORT_CHANGE, static_cast<void *>(&update));
PortUpdate update = {p, false };
notify(SUBJECT_TYPE_PORT_CHANGE, static_cast<void *>(&update));
}

sai_status_t status = sai_port_api->remove_port(port_id);
Expand Down Expand Up @@ -1156,8 +1159,8 @@ bool PortsOrch::initPort(const string &alias, const set<int> &lane_set)

m_flexCounterTable->set(key, fields);

PortUpdate update = {p, true };
notify(SUBJECT_TYPE_PORT_CHANGE, static_cast<void *>(&update));
PortUpdate update = {p, true };
notify(SUBJECT_TYPE_PORT_CHANGE, static_cast<void *>(&update));

SWSS_LOG_NOTICE("Initialized port %s", alias.c_str());
}
Expand All @@ -1177,6 +1180,55 @@ bool PortsOrch::initPort(const string &alias, const set<int> &lane_set)
return true;
}

bool PortsOrch::bake()
{
SWSS_LOG_ENTER();

// Check the APP_DB port table for warm reboot
vector<FieldValueTuple> tuples;
bool foundPortConfigDone = m_portTable->get("PortConfigDone", tuples);
SWSS_LOG_NOTICE("foundPortConfigDone = %d", foundPortConfigDone);

bool foundPortInitDone = m_portTable->get("PortInitDone", tuples);
SWSS_LOG_NOTICE("foundPortInitDone = %d", foundPortInitDone);

vector<string> keys;
m_portTable->getKeys(keys);
SWSS_LOG_NOTICE("m_portTable->getKeys %zd", keys.size());

if (!foundPortConfigDone || !foundPortInitDone)
{
SWSS_LOG_NOTICE("No port table, fallback to cold start");
cleanPortTable(keys);
return false;
}

if (m_portCount != keys.size() - 2)
{
// Invalid port table
SWSS_LOG_ERROR("Invalid port table: m_portCount");
cleanPortTable(keys);
return false;
}

addExistingData(m_portTable.get());
addExistingData(APP_LAG_TABLE_NAME);
addExistingData(APP_LAG_MEMBER_TABLE_NAME);
addExistingData(APP_VLAN_TABLE_NAME);
addExistingData(APP_VLAN_MEMBER_TABLE_NAME);

return true;
}

// Clean up port table
void PortsOrch::cleanPortTable(const vector<string>& keys)
{
for (auto& key : keys)
{
m_portTable->del(key);
}
}

void PortsOrch::doPortTask(Consumer &consumer)
{
SWSS_LOG_ENTER();
Expand Down Expand Up @@ -1279,6 +1331,12 @@ void PortsOrch::doPortTask(Consumer &consumer)
m_lanesAliasSpeedMap[lane_set] = make_tuple(alias, speed, an, fec_mode);
}

// TODO:
// Fix the issue below
// After PortConfigDone, while waiting for "PortInitDone" and the first gBufferOrch->isPortReady(alias),
// the complete m_lanesAliasSpeedMap may be populated again, so initPort() will be called more than once
// for the same port.

/* Once all ports received, go through the each port and perform appropriate actions:
* 1. Remove ports which don't exist anymore
* 2. Create new ports
Expand Down
2 changes: 2 additions & 0 deletions orchagent/portsorch.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ class PortsOrch : public Orch, public Subject
bool isInitDone();

map<string, Port>& getAllPorts();
bool bake();
void cleanPortTable(const vector<string>& keys);
bool getBridgePort(sai_object_id_t id, Port &port);
bool getPort(string alias, Port &port);
bool getPort(sai_object_id_t id, Port &port);
Expand Down

0 comments on commit bfce363

Please sign in to comment.