Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[everflow] Add retry mechanism for mirror sessions and policers #1486

Merged
merged 11 commits into from
Dec 10, 2020
23 changes: 16 additions & 7 deletions orchagent/aclorch.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1083,12 +1083,6 @@ bool AclRuleMirror::validateAddAction(string attr_name, string attr_value)

m_sessionName = attr_value;

if (!m_pMirrorOrch->sessionExists(m_sessionName))
{
SWSS_LOG_ERROR("Mirror rule reference mirror session that does not exists %s", m_sessionName.c_str());
return false;
}

// insert placeholder value, we'll set the session oid in AclRuleMirror::create()
m_actions[action] = sai_attribute_value_t{};

Expand Down Expand Up @@ -1178,6 +1172,12 @@ bool AclRuleMirror::create()
sai_object_id_t oid = SAI_NULL_OBJECT_ID;
bool state = false;

if (!m_pMirrorOrch->sessionExists(m_sessionName))
daall marked this conversation as resolved.
Show resolved Hide resolved
{
SWSS_LOG_ERROR("Mirror rule references mirror session \"%s\" that does not exist yet", m_sessionName.c_str());
return false;
}

if (!m_pMirrorOrch->getSessionStatus(m_sessionName, state))
{
SWSS_LOG_THROW("Failed to get mirror session state for session %s", m_sessionName.c_str());
Expand Down Expand Up @@ -3122,7 +3122,16 @@ void AclOrch::doAclRuleTask(Consumer &consumer)
}


newRule = AclRule::makeShared(type, this, m_mirrorOrch, m_dTelOrch, rule_id, table_id, t);
try
{
newRule = AclRule::makeShared(type, this, m_mirrorOrch, m_dTelOrch, rule_id, table_id, t);
}
catch (exception &e)
{
SWSS_LOG_ERROR("Error while creating ACL rule %s: %s", rule_id.c_str(), e.what());
it = consumer.m_toSync.erase(it);
return;
}

for (const auto& itr : kfvFieldsValues(t))
{
Expand Down
70 changes: 26 additions & 44 deletions orchagent/mirrororch.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,6 @@ bool MirrorOrch::bake()
{
SWSS_LOG_ENTER();

// Freeze the route update during orchagent restoration
m_freeze = true;

deque<KeyOpFieldsValuesTuple> entries;
vector<string> keys;
m_mirrorTable.getKeys(keys);
Expand Down Expand Up @@ -134,23 +131,6 @@ bool MirrorOrch::bake()
return Orch::bake();
}

// Post-bake step for MirrorOrch. In order, it:
//   1. clears the m_freeze flag,
//   2. calls Orch::doTask(),
//   3. empties m_recoverySessionMap,
//   4. delegates to Orch::postBake() and returns its result.
bool MirrorOrch::postBake()
{
SWSS_LOG_ENTER();

SWSS_LOG_NOTICE("Start MirrorOrch post-baking");

// Unfreeze the route update
m_freeze = false;

// Invoke the base-class doTask() only after m_freeze has been cleared.
Orch::doTask();

// Clean up the recovery cache
m_recoverySessionMap.clear();

return Orch::postBake();
}

void MirrorOrch::update(SubjectType type, void *cntx)
{
SWSS_LOG_ENTER();
Expand Down Expand Up @@ -340,7 +320,7 @@ bool MirrorOrch::validateSrcPortList(const string& srcPortList)
return true;
}

void MirrorOrch::createEntry(const string& key, const vector<FieldValueTuple>& data)
task_process_status MirrorOrch::createEntry(const string& key, const vector<FieldValueTuple>& data)
{
SWSS_LOG_ENTER();

Expand All @@ -349,7 +329,7 @@ void MirrorOrch::createEntry(const string& key, const vector<FieldValueTuple>& d
{
SWSS_LOG_NOTICE("Failed to create session, session %s already exists",
key.c_str());
return;
return task_process_status::task_invalid_entry;
}

string platform = getenv("platform") ? getenv("platform") : "";
Expand All @@ -364,7 +344,7 @@ void MirrorOrch::createEntry(const string& key, const vector<FieldValueTuple>& d
if (!entry.srcIp.isV4())
{
SWSS_LOG_ERROR("Unsupported version of sessions %s source IP address", key.c_str());
return;
return task_process_status::task_failed;
daall marked this conversation as resolved.
Show resolved Hide resolved
}
}
else if (fvField(i) == MIRROR_SESSION_DST_IP)
Expand All @@ -373,7 +353,7 @@ void MirrorOrch::createEntry(const string& key, const vector<FieldValueTuple>& d
if (!entry.dstIp.isV4())
{
SWSS_LOG_ERROR("Unsupported version of sessions %s destination IP address", key.c_str());
return;
return task_process_status::task_failed;
}
}
else if (fvField(i) == MIRROR_SESSION_GRE_TYPE)
Expand All @@ -398,7 +378,7 @@ void MirrorOrch::createEntry(const string& key, const vector<FieldValueTuple>& d
{
SWSS_LOG_ERROR("Failed to get policer %s",
fvValue(i).c_str());
return;
return task_process_status::task_need_retry;
}

m_policerOrch->increaseRefCount(fvValue(i));
Expand All @@ -409,7 +389,7 @@ void MirrorOrch::createEntry(const string& key, const vector<FieldValueTuple>& d
if (!validateSrcPortList(fvValue(i)))
{
SWSS_LOG_ERROR("Failed to get valid source port list %s", fvValue(i).c_str());
return;
return task_process_status::task_failed;
}
entry.src_port = fvValue(i);
}
Expand All @@ -418,7 +398,7 @@ void MirrorOrch::createEntry(const string& key, const vector<FieldValueTuple>& d
if (!validateDstPort(fvValue(i)))
{
SWSS_LOG_ERROR("Failed to get valid destination port %s", fvValue(i).c_str());
return;
return task_process_status::task_failed;
}
entry.dst_port = fvValue(i);
}
Expand All @@ -428,7 +408,7 @@ void MirrorOrch::createEntry(const string& key, const vector<FieldValueTuple>& d
|| fvValue(i) == MIRROR_BOTH_DIRECTION))
{
SWSS_LOG_ERROR("Failed to get valid direction %s", fvValue(i).c_str());
return;
return task_process_status::task_failed;
}
entry.direction = fvValue(i);
}
Expand All @@ -439,18 +419,18 @@ void MirrorOrch::createEntry(const string& key, const vector<FieldValueTuple>& d
else
{
SWSS_LOG_ERROR("Failed to parse session %s configuration. Unknown attribute %s", key.c_str(), fvField(i).c_str());
return;
return task_process_status::task_failed;
}
}
catch (const exception& e)
{
SWSS_LOG_ERROR("Failed to parse session %s attribute %s error: %s.", key.c_str(), fvField(i).c_str(), e.what());
return;
return task_process_status::task_failed;
}
catch (...)
{
SWSS_LOG_ERROR("Failed to parse session %s attribute %s. Unknown error has been occurred", key.c_str(), fvField(i).c_str());
return;
return task_process_status::task_failed;
}
}

Expand All @@ -470,6 +450,8 @@ void MirrorOrch::createEntry(const string& key, const vector<FieldValueTuple>& d
// Attach the destination IP to the routeOrch
m_routeOrch->attach(this, entry.dstIp);
}

return task_process_status::task_success;
}

task_process_status MirrorOrch::deleteEntry(const string& name)
Expand Down Expand Up @@ -1412,11 +1394,6 @@ void MirrorOrch::doTask(Consumer& consumer)
{
SWSS_LOG_ENTER();

if (m_freeze)
{
return;
}

if (!gPortsOrch->allPortsReady())
{
return;
Expand All @@ -1429,26 +1406,31 @@ void MirrorOrch::doTask(Consumer& consumer)

string key = kfvKey(t);
string op = kfvOp(t);
task_process_status task_status = task_process_status::task_failed;

if (op == SET_COMMAND)
{
createEntry(key, kfvFieldsValues(t));
task_status = createEntry(key, kfvFieldsValues(t));
}
else if (op == DEL_COMMAND)
{
auto task_status = deleteEntry(key);
// Specifically retry the task when asked
if (task_status == task_process_status::task_need_retry)
{
it++;
continue;
}
task_status = deleteEntry(key);
}
else
{
SWSS_LOG_ERROR("Unknown operation type %s", op.c_str());
}

// Specifically retry the task when asked
if (task_status == task_process_status::task_need_retry)
{
it++;
continue;
daall marked this conversation as resolved.
Show resolved Hide resolved
}

consumer.m_toSync.erase(it++);
}

// Clear any recovery state that might be leftover from warm reboot
m_recoverySessionMap.clear();
}
5 changes: 1 addition & 4 deletions orchagent/mirrororch.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ class MirrorOrch : public Orch, public Observer, public Subject
PortsOrch *portOrch, RouteOrch *routeOrch, NeighOrch *neighOrch, FdbOrch *fdbOrch, PolicerOrch *policerOrch);

bool bake() override;
bool postBake() override;
void update(SubjectType, void *);
bool sessionExists(const string&);
bool getSessionStatus(const string&, bool&);
Expand All @@ -101,9 +100,7 @@ class MirrorOrch : public Orch, public Observer, public Subject
// session_name -> VLAN | monitor_port_alias | next_hop_ip
map<string, string> m_recoverySessionMap;

bool m_freeze = false;

void createEntry(const string&, const vector<FieldValueTuple>&);
task_process_status createEntry(const string&, const vector<FieldValueTuple>&);
task_process_status deleteEntry(const string&);

bool activateSession(const string&, MirrorEntry&);
Expand Down
7 changes: 0 additions & 7 deletions orchagent/orch.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -299,13 +299,6 @@ bool Orch::bake()
return true;
}

// Base-class post-bake hook. Apart from the SWSS_LOG_ENTER() macro it does
// no work and unconditionally returns true.
bool Orch::postBake()
{
SWSS_LOG_ENTER();

return true;
}

/*
- Validates reference has proper format which is [table_name:object_name]
- validates table_name exists
Expand Down
2 changes: 0 additions & 2 deletions orchagent/orch.h
Original file line number Diff line number Diff line change
Expand Up @@ -204,8 +204,6 @@ class Orch
// Prepare for warm start if Redis contains valid input data
// otherwise fallback to cold start
virtual bool bake();
// Clean up the state set in bake()
virtual bool postBake();

/* Iterate all consumers in m_consumerMap and run doTask(Consumer) */
virtual void doTask();
Expand Down
23 changes: 15 additions & 8 deletions orchagent/orchdaemon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ NeighOrch *gNeighOrch;
RouteOrch *gRouteOrch;
FgNhgOrch *gFgNhgOrch;
AclOrch *gAclOrch;
MirrorOrch *gMirrorOrch;
CrmOrch *gCrmOrch;
BufferOrch *gBufferOrch;
SwitchOrch *gSwitchOrch;
Expand Down Expand Up @@ -177,7 +178,7 @@ bool OrchDaemon::init()

TableConnector stateDbMirrorSession(m_stateDb, STATE_MIRROR_SESSION_TABLE_NAME);
TableConnector confDbMirrorSession(m_configDb, CFG_MIRROR_SESSION_TABLE_NAME);
MirrorOrch *mirror_orch = new MirrorOrch(stateDbMirrorSession, confDbMirrorSession, gPortsOrch, gRouteOrch, gNeighOrch, gFdbOrch, policer_orch);
gMirrorOrch = new MirrorOrch(stateDbMirrorSession, confDbMirrorSession, gPortsOrch, gRouteOrch, gNeighOrch, gFdbOrch, policer_orch);

TableConnector confDbAclTable(m_configDb, CFG_ACL_TABLE_TABLE_NAME);
TableConnector confDbAclRuleTable(m_configDb, CFG_ACL_RULE_TABLE_NAME);
Expand Down Expand Up @@ -273,10 +274,10 @@ bool OrchDaemon::init()
dtel_orch = new DTelOrch(m_configDb, dtel_tables, gPortsOrch);
m_orchList.push_back(dtel_orch);
}
gAclOrch = new AclOrch(acl_table_connectors, gSwitchOrch, gPortsOrch, mirror_orch, gNeighOrch, gRouteOrch, dtel_orch);
gAclOrch = new AclOrch(acl_table_connectors, gSwitchOrch, gPortsOrch, gMirrorOrch, gNeighOrch, gRouteOrch, dtel_orch);

m_orchList.push_back(gFdbOrch);
m_orchList.push_back(mirror_orch);
m_orchList.push_back(gMirrorOrch);
m_orchList.push_back(gAclOrch);
m_orchList.push_back(chassis_frontend_orch);
m_orchList.push_back(vrf_orch);
Expand Down Expand Up @@ -548,18 +549,24 @@ bool OrchDaemon::warmRestoreAndSyncUp()

for (auto it = 0; it < 3; it++)
{
SWSS_LOG_DEBUG("The current iteration is %d", it);
SWSS_LOG_DEBUG("The current doTask iteration is %d", it);

for (Orch *o : m_orchList)
{
if (o == gMirrorOrch) {
SWSS_LOG_DEBUG("Skipping mirror processing until the end");
continue;
}

o->doTask();
}
}

for (Orch *o : m_orchList)
{
o->postBake();
}
// MirrorOrch depends on everything else being settled before it can run,
// and mirror ACL rules depend on MirrorOrch, so run these two at the end
// after the rest of the data has been processed.
gMirrorOrch->Orch::doTask();
daall marked this conversation as resolved.
Show resolved Hide resolved
gAclOrch->Orch::doTask();
daall marked this conversation as resolved.
Show resolved Hide resolved

/*
* At this point, all the pre-existing data should have been processed properly, and
Expand Down
13 changes: 11 additions & 2 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -1019,16 +1019,25 @@ def remove_neighbor(self, interface, ip):
tbl._del(interface + ":" + ip)
time.sleep(1)

# deps: mirror_port_erspan
# deps: mirror_port_erspan, warm_reboot
# Add a route: runs "ip route add <prefix> via <nexthop>" through self.runcmd,
# then sleeps for one second before returning.
def add_route(self, prefix, nexthop):
self.runcmd("ip route add " + prefix + " via " + nexthop)
# Fixed 1 s pause after issuing the command (presumably to let the change propagate; confirm).
time.sleep(1)

# deps: mirror_port_erspan
# deps: mirror_port_erspan, warm_reboot
# Change an existing route: runs "ip route change <prefix> via <nexthop>"
# through self.runcmd, then sleeps for one second before returning.
def change_route(self, prefix, nexthop):
self.runcmd("ip route change " + prefix + " via " + nexthop)
# Fixed 1 s pause after issuing the command (presumably to let the change propagate; confirm).
time.sleep(1)

# deps: warm_reboot
# Change a route to use several next hops: builds
# "ip route change <prefix> nexthop via <nh1> nexthop via <nh2> ..." from the
# iterable `nexthops`, runs it through self.runcmd, then sleeps for one second.
def change_route_ecmp(self, prefix, nexthops):
# Accumulate one " nexthop via <addr>" clause per entry; each clause starts
# with a space, so the result can be appended directly after the prefix.
cmd = ""
for nexthop in nexthops:
cmd += " nexthop via " + nexthop

# With an empty `nexthops`, the command is just "ip route change <prefix>".
self.runcmd("ip route change " + prefix + cmd)
# Fixed 1 s pause after issuing the command (presumably to let the change propagate; confirm).
time.sleep(1)

# deps: acl, mirror_port_erspan
def remove_route(self, prefix):
self.runcmd("ip route del " + prefix)
Expand Down
5 changes: 5 additions & 0 deletions tests/dvslib/dvs_acl.py
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,11 @@ def remove_acl_rule(self, table_name: str, rule_name: str) -> None:
"""
self.config_db.delete_entry("ACL_RULE", "{}|{}".format(table_name, rule_name))

def verify_acl_rule_count(self, expected: int) -> None:
"""Verify that there are N rules in the ASIC DB."""
# The target key count is the number of default ACL entries plus `expected`,
# so `expected` counts only rules beyond the defaults.
num_keys = len(self.asic_db.default_acl_entries)
# Delegate the check to asic_db.wait_for_n_keys on the ACL entry table.
self.asic_db.wait_for_n_keys("ASIC_STATE:SAI_OBJECT_TYPE_ACL_ENTRY", num_keys + expected)

def verify_no_acl_rules(self) -> None:
"""Verify that there are no ACL rules in the ASIC DB."""
num_keys = len(self.asic_db.default_acl_entries)
Expand Down
Loading