Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[201811][everflow] Add retry mechanism for mirror sessions and policers #1966

Merged
merged 3 commits into from
Oct 28, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 18 additions & 6 deletions orchagent/aclorch.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -993,11 +993,6 @@ bool AclRuleMirror::validateAddAction(string attr_name, string attr_value)
return false;
}

if (!m_pMirrorOrch->sessionExists(attr_value))
{
return false;
}

m_sessionName = attr_value;

return true;
Expand Down Expand Up @@ -1084,6 +1079,14 @@ bool AclRuleMirror::create()
bool state = false;
sai_object_id_t oid = SAI_NULL_OBJECT_ID;

SWSS_LOG_NOTICE("Creating mirror rule %s", m_sessionName.c_str());

if (!m_pMirrorOrch->sessionExists(m_sessionName))
{
SWSS_LOG_ERROR("Mirror rule references mirror session \"%s\" that does not exist yet", m_sessionName.c_str());
return false;
}

if (!m_pMirrorOrch->getSessionStatus(m_sessionName, state))
{
throw runtime_error("Failed to get mirror session state");
Expand Down Expand Up @@ -2536,7 +2539,16 @@ void AclOrch::doAclRuleTask(Consumer &consumer)
}


newRule = AclRule::makeShared(type, this, m_mirrorOrch, m_dTelOrch, rule_id, table_id, t);
try
{
newRule = AclRule::makeShared(type, this, m_mirrorOrch, m_dTelOrch, rule_id, table_id, t);
}
catch (exception &e)
{
SWSS_LOG_ERROR("Error while creating ACL rule %s: %s", rule_id.c_str(), e.what());
it = consumer.m_toSync.erase(it);
return;
}

for (const auto& itr : kfvFieldsValues(t))
{
Expand Down
2 changes: 2 additions & 0 deletions orchagent/aclorch.h
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,8 @@ class AclOrch : public Orch, public Observer
bool m_isCombinedMirrorV6Table = true;
map<acl_table_type_t, bool> m_mirrorTableCapabilities;

using Orch::doTask; // Allow access to the basic doTask

private:
void doTask(Consumer &consumer);
void doAclTableTask(Consumer &consumer);
Expand Down
67 changes: 25 additions & 42 deletions orchagent/mirrororch.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,6 @@ bool MirrorOrch::bake()
{
SWSS_LOG_ENTER();

// Freeze the route update during orchagent restoration
m_freeze = true;

deque<KeyOpFieldsValuesTuple> entries;
vector<string> keys;
m_mirrorTable.getKeys(keys);
Expand Down Expand Up @@ -129,23 +126,6 @@ bool MirrorOrch::bake()
return Orch::bake();
}

bool MirrorOrch::postBake()
{
SWSS_LOG_ENTER();

SWSS_LOG_NOTICE("Start MirrorOrch post-baking");

// Unfreeze the route update
m_freeze = false;

Orch::doTask();

// Clean up the recovery cache
m_recoverySessionMap.clear();

return Orch::postBake();
}

void MirrorOrch::update(SubjectType type, void *cntx)
{
SWSS_LOG_ENTER();
Expand Down Expand Up @@ -260,7 +240,7 @@ bool MirrorOrch::decreaseRefCount(const string& name)
return true;
}

void MirrorOrch::createEntry(const string& key, const vector<FieldValueTuple>& data)
task_process_status MirrorOrch::createEntry(const string& key, const vector<FieldValueTuple>& data)
{
SWSS_LOG_ENTER();

Expand All @@ -269,7 +249,7 @@ void MirrorOrch::createEntry(const string& key, const vector<FieldValueTuple>& d
{
SWSS_LOG_NOTICE("Failed to create session, session %s already exists",
key.c_str());
return;
return task_process_status::task_duplicated;
}

string platform = getenv("platform") ? getenv("platform") : "";
Expand All @@ -284,7 +264,7 @@ void MirrorOrch::createEntry(const string& key, const vector<FieldValueTuple>& d
if (!entry.srcIp.isV4())
{
SWSS_LOG_ERROR("Unsupported version of sessions %s source IP address\n", key.c_str());
return;
return task_process_status::task_invalid_entry;
}
}
else if (fvField(i) == MIRROR_SESSION_DST_IP)
Expand All @@ -293,7 +273,7 @@ void MirrorOrch::createEntry(const string& key, const vector<FieldValueTuple>& d
if (!entry.dstIp.isV4())
{
SWSS_LOG_ERROR("Unsupported version of sessions %s destination IP address\n", key.c_str());
return;
return task_process_status::task_invalid_entry;
}
}
else if (fvField(i) == MIRROR_SESSION_GRE_TYPE)
Expand All @@ -318,7 +298,7 @@ void MirrorOrch::createEntry(const string& key, const vector<FieldValueTuple>& d
{
SWSS_LOG_ERROR("Failed to get policer %s",
fvValue(i).c_str());
return;
return task_process_status::task_need_retry;
}

m_policerOrch->increaseRefCount(fvValue(i));
Expand All @@ -327,18 +307,18 @@ void MirrorOrch::createEntry(const string& key, const vector<FieldValueTuple>& d
else
{
SWSS_LOG_ERROR("Failed to parse session %s configuration. Unknown attribute %s.\n", key.c_str(), fvField(i).c_str());
return;
return task_process_status::task_invalid_entry;
}
}
catch (const exception& e)
{
SWSS_LOG_ERROR("Failed to parse session %s attribute %s error: %s.", key.c_str(), fvField(i).c_str(), e.what());
return;
return task_process_status::task_invalid_entry;
}
catch (...)
{
SWSS_LOG_ERROR("Failed to parse session %s attribute %s. Unknown error has been occurred", key.c_str(), fvField(i).c_str());
return;
return task_process_status::task_failed;
}
}

Expand All @@ -350,6 +330,8 @@ void MirrorOrch::createEntry(const string& key, const vector<FieldValueTuple>& d

// Attach the destination IP to the routeOrch
m_routeOrch->attach(this, entry.dstIp);

return task_process_status::task_success;
}

task_process_status MirrorOrch::deleteEntry(const string& name)
Expand Down Expand Up @@ -1135,11 +1117,6 @@ void MirrorOrch::doTask(Consumer& consumer)
{
SWSS_LOG_ENTER();

if (m_freeze)
{
return;
}

if (!gPortsOrch->isPortReady())
{
return;
Expand All @@ -1152,26 +1129,32 @@ void MirrorOrch::doTask(Consumer& consumer)

string key = kfvKey(t);
string op = kfvOp(t);
task_process_status task_status = task_process_status::task_failed;

if (op == SET_COMMAND)
{
createEntry(key, kfvFieldsValues(t));
task_status = createEntry(key, kfvFieldsValues(t));
}
else if (op == DEL_COMMAND)
{
auto task_status = deleteEntry(key);
// Specifically retry the task when asked
if (task_status == task_process_status::task_need_retry)
{
it++;
continue;
}
task_status = deleteEntry(key);
}
else
{
SWSS_LOG_ERROR("Unknown operation type %s", op.c_str());
}

consumer.m_toSync.erase(it++);
// Specifically retry the task when asked
if (task_status == task_process_status::task_need_retry)
{
it++;
}
else
{
consumer.m_toSync.erase(it++);
}
}

// Clear any recovery state that might be leftover from warm reboot
m_recoverySessionMap.clear();
}
7 changes: 3 additions & 4 deletions orchagent/mirrororch.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,14 +70,15 @@ class MirrorOrch : public Orch, public Observer, public Subject
PortsOrch *portOrch, RouteOrch *routeOrch, NeighOrch *neighOrch, FdbOrch *fdbOrch, PolicerOrch *policerOrch);

bool bake() override;
bool postBake() override;
void update(SubjectType, void *);
bool sessionExists(const string&);
bool getSessionStatus(const string&, bool&);
bool getSessionOid(const string&, sai_object_id_t&);
bool increaseRefCount(const string&);
bool decreaseRefCount(const string&);

using Orch::doTask; // Allow access to the basic doTask

private:
PortsOrch *m_portsOrch;
RouteOrch *m_routeOrch;
Expand All @@ -91,9 +92,7 @@ class MirrorOrch : public Orch, public Observer, public Subject
// session_name -> VLAN | monitor_port_alias | next_hop_ip
map<string, string> m_recoverySessionMap;

bool m_freeze = false;

void createEntry(const string&, const vector<FieldValueTuple>&);
task_process_status createEntry(const string&, const vector<FieldValueTuple>&);
task_process_status deleteEntry(const string&);

bool activateSession(const string&, MirrorEntry&);
Expand Down
7 changes: 0 additions & 7 deletions orchagent/orch.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -256,13 +256,6 @@ bool Orch::bake()
return true;
}

bool Orch::postBake()
{
SWSS_LOG_ENTER();

return true;
}

/*
- Validates reference has proper format which is [table_name:object_name]
- validates table_name exists
Expand Down
5 changes: 2 additions & 3 deletions orchagent/orch.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ typedef enum
task_invalid_entry,
task_failed,
task_need_retry,
task_ignore
task_ignore,
task_duplicated
} task_process_status;

typedef map<string, sai_object_id_t> object_map;
Expand Down Expand Up @@ -182,8 +183,6 @@ class Orch
// Prepare for warm start if Redis contains valid input data
// otherwise fallback to cold start
virtual bool bake();
// Clean up the state set in bake()
virtual bool postBake();

/* Iterate all consumers in m_consumerMap and run doTask(Consumer) */
virtual void doTask();
Expand Down
23 changes: 15 additions & 8 deletions orchagent/orchdaemon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ IntfsOrch *gIntfsOrch;
NeighOrch *gNeighOrch;
RouteOrch *gRouteOrch;
AclOrch *gAclOrch;
MirrorOrch *gMirrorOrch;
CrmOrch *gCrmOrch;
BufferOrch *gBufferOrch;
SwitchOrch *gSwitchOrch;
Expand Down Expand Up @@ -124,7 +125,7 @@ bool OrchDaemon::init()

TableConnector stateDbMirrorSession(m_stateDb, STATE_MIRROR_SESSION_TABLE_NAME);
TableConnector confDbMirrorSession(m_configDb, CFG_MIRROR_SESSION_TABLE_NAME);
MirrorOrch *mirror_orch = new MirrorOrch(stateDbMirrorSession, confDbMirrorSession, gPortsOrch, gRouteOrch, gNeighOrch, gFdbOrch, policer_orch);
gMirrorOrch = new MirrorOrch(stateDbMirrorSession, confDbMirrorSession, gPortsOrch, gRouteOrch, gNeighOrch, gFdbOrch, policer_orch);

TableConnector confDbAclTable(m_configDb, CFG_ACL_TABLE_NAME);
TableConnector confDbAclRuleTable(m_configDb, CFG_ACL_RULE_TABLE_NAME);
Expand Down Expand Up @@ -191,10 +192,10 @@ bool OrchDaemon::init()
m_orchList.push_back(dtel_orch);
}
TableConnector stateDbSwitchTable(m_stateDb, "SWITCH_CAPABILITY");
gAclOrch = new AclOrch(acl_table_connectors, stateDbSwitchTable, gPortsOrch, mirror_orch, gNeighOrch, gRouteOrch, dtel_orch);
gAclOrch = new AclOrch(acl_table_connectors, stateDbSwitchTable, gPortsOrch, gMirrorOrch, gNeighOrch, gRouteOrch, dtel_orch);

m_orchList.push_back(gFdbOrch);
m_orchList.push_back(mirror_orch);
m_orchList.push_back(gMirrorOrch);
m_orchList.push_back(gAclOrch);
m_orchList.push_back(vnet_orch);
m_orchList.push_back(vnet_rt_orch);
Expand Down Expand Up @@ -486,18 +487,24 @@ bool OrchDaemon::warmRestoreAndSyncUp()

for (auto it = 0; it < 4; it++)
{
SWSS_LOG_DEBUG("The current iteration is %d", it);
SWSS_LOG_DEBUG("The current doTask iteration is %d", it);

for (Orch *o : m_orchList)
{
if (o == gMirrorOrch) {
yxieca marked this conversation as resolved.
Show resolved Hide resolved
SWSS_LOG_DEBUG("Skipping mirror processing until the end");
continue;
}

o->doTask();
}
}

for (Orch *o : m_orchList)
{
o->postBake();
}
// MirrorOrch depends on everything else being settled before it can run,
// and mirror ACL rules depend on MirrorOrch, so run these two at the end
// after the rest of the data has been processed.
gMirrorOrch->doTask();
gAclOrch->doTask();

/*
* At this point, all the pre-existing data should have been processed properly, and
Expand Down
Loading