Skip to content

Commit

Permalink
Add batch ops and StatusCode for PINS / P4Runtime
Browse files Browse the repository at this point in the history
* Add batch set/delete() to ProducerStateTable
* Add StatusCode enum and functions to convert between
  string and enum values.
* Allow exists() to check for whitespace.
  This is only to allow whitespace when we check for
  existence. We can already create entries with whitespace.
* Add SWSS return code SWSS_RC_UNIMPLEMENTED
* Fix json error, refer to nlohmann/json#590

Submission containing materials of a third party:
    Copyright Google LLC; Licensed under Apache 2.0

Co-authored-by: Akarsh Gupta <[email protected]>
Co-authored-by: Jay Hu <[email protected]>
Co-authored-by: Manali Kumar <[email protected]>
Co-authored-by: Robert J. Halstead <[email protected]>
Co-authored-by: Runming Wu <[email protected]>
Co-authored-by: Yilan Ji <[email protected]>

Signed-off-by: Don Newton [email protected]
  • Loading branch information
PINS Working Group authored and bocon13 committed Oct 28, 2021
1 parent 94ae46d commit a570eb6
Show file tree
Hide file tree
Showing 7 changed files with 227 additions and 6 deletions.
5 changes: 0 additions & 5 deletions common/dbconnector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -627,11 +627,6 @@ int64_t DBConnector::del(const string &key)
bool DBConnector::exists(const string &key)
{
RedisCommand rexists;
if (key.find_first_of(" \t") != string::npos)
{
SWSS_LOG_ERROR("EXISTS failed, invalid space or tab in single key: %s", key.c_str());
throw runtime_error("EXISTS failed, invalid space or tab in single key");
}
rexists.format("EXISTS %s", key.c_str());
RedisReply r(this, rexists, REDIS_REPLY_INTEGER);
return r.getContext()->integer > 0;
Expand Down
2 changes: 1 addition & 1 deletion common/json.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -5394,7 +5394,7 @@ class basic_json
{
assert(lhs.m_value.array != nullptr);
assert(rhs.m_value.array != nullptr);
return *lhs.m_value.array < *rhs.m_value.array;
return (*lhs.m_value.array) < *rhs.m_value.array;
}
case value_t::object:
{
Expand Down
123 changes: 123 additions & 0 deletions common/producerstatetable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,35 @@ ProducerStateTable::ProducerStateTable(RedisPipeline *pipeline, const string &ta
"end\n";
m_shaDel = m_pipe->loadRedisScript(luaDel);

string luaBatchedSet =
"local added = 0\n"
"local idx = 2\n"
"for i = 0, #KEYS - 4 do\n"
" added = added + redis.call('SADD', KEYS[2], KEYS[4 + i])\n"
" for j = 0, tonumber(ARGV[idx]) - 1 do\n"
" local attr = ARGV[idx + j * 2 + 1]\n"
" local val = ARGV[idx + j * 2 + 2]\n"
" redis.call('HSET', KEYS[3] .. KEYS[4 + i], attr, val)\n"
" end\n"
" idx = idx + tonumber(ARGV[idx]) * 2 + 1\n"
"end\n"
"if added > 0 then \n"
" redis.call('PUBLISH', KEYS[1], ARGV[1])\n"
"end\n";
m_shaBatchedSet = m_pipe->loadRedisScript(luaBatchedSet);

string luaBatchedDel =
"local added = 0\n"
"for i = 0, #KEYS - 5 do\n"
" added = added + redis.call('SADD', KEYS[2], KEYS[5 + i])\n"
" redis.call('SADD', KEYS[3], KEYS[5 + i])\n"
" redis.call('DEL', KEYS[4] .. KEYS[5 + i])\n"
"end\n"
"if added > 0 then \n"
" redis.call('PUBLISH', KEYS[1], ARGV[1])\n"
"end\n";
m_shaBatchedDel = m_pipe->loadRedisScript(luaBatchedDel);

string luaClear =
"redis.call('DEL', KEYS[1])\n"
"local keys = redis.call('KEYS', KEYS[2] .. '*')\n"
Expand Down Expand Up @@ -156,6 +185,100 @@ void ProducerStateTable::del(const string &key, const string &op /*= DEL_COMMAND
}
}

void ProducerStateTable::set(const std::vector<KeyOpFieldsValuesTuple>& values)
{
if (m_tempViewActive)
{
// Write to temp view instead of DB
for (const auto &value : values)
{
const std::string &key = kfvKey(value);
for (const auto &iv : kfvFieldsValues(value))
{
m_tempViewState[key][fvField(iv)] = fvValue(iv);
}
}
return;
}

// Assembly redis command args into a string vector
vector<string> args;
args.emplace_back("EVALSHA");
args.emplace_back(m_shaBatchedSet);
args.emplace_back(to_string(values.size() + 3));
args.emplace_back(getChannelName());
args.emplace_back(getKeySetName());
args.emplace_back(getStateHashPrefix() + getTableName() + getTableNameSeparator());
for (const auto &value : values)
{
args.emplace_back(kfvKey(value));
}
args.emplace_back("G");
for (const auto &value : values)
{
args.emplace_back(to_string(kfvFieldsValues(value).size()));
for (const auto &iv : kfvFieldsValues(value))
{
args.emplace_back(fvField(iv));
args.emplace_back(fvValue(iv));
}
}

// Transform data structure
vector<const char *> args1;
transform(args.begin(), args.end(), back_inserter(args1), [](const string &s) { return s.c_str(); } );

// Invoke redis command
RedisCommand command;
command.formatArgv((int)args1.size(), &args1[0], NULL);
m_pipe->push(command, REDIS_REPLY_NIL);
if (!m_buffered)
{
m_pipe->flush();
}
}

void ProducerStateTable::del(const std::vector<std::string>& keys)
{
if (m_tempViewActive)
{
// Write to temp view instead of DB
for (const auto &key : keys)
{
m_tempViewState.erase(key);
}
return;
}

// Assembly redis command args into a string vector
vector<string> args;
args.emplace_back("EVALSHA");
args.emplace_back(m_shaBatchedDel);
args.emplace_back(to_string(keys.size() + 4));
args.emplace_back(getChannelName());
args.emplace_back(getKeySetName());
args.emplace_back(getDelKeySetName());
args.emplace_back(getStateHashPrefix() + getTableName() + getTableNameSeparator());
for (const auto &key : keys)
{
args.emplace_back(key);
}
args.emplace_back("G");

// Transform data structure
vector<const char *> args1;
transform(args.begin(), args.end(), back_inserter(args1), [](const string &s) { return s.c_str(); } );

// Invoke redis command
RedisCommand command;
command.formatArgv((int)args1.size(), &args1[0], NULL);
m_pipe->push(command, REDIS_REPLY_NIL);
if (!m_buffered)
{
m_pipe->flush();
}
}

void ProducerStateTable::flush()
{
m_pipe->flush();
Expand Down
13 changes: 13 additions & 0 deletions common/producerstatetable.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#pragma once

#include <memory>
#include <vector>
#include "table.h"
#include "redispipeline.h"

Expand Down Expand Up @@ -35,6 +36,16 @@ class ProducerStateTable : public TableBase, public TableName_KeySet
%}
#endif

// Batched version of set() and del().
// The batched methods don't include (or use) op and prefix. They are
// written for specific use case only. The consumer logic (or batch size)
// might need to change if the producer does batched operations.

// In set(), the op is ignored.
void set(const std::vector<KeyOpFieldsValuesTuple>& values);

void del(const std::vector<std::string>& keys);

void flush();

int64_t count();
Expand All @@ -51,6 +62,8 @@ class ProducerStateTable : public TableBase, public TableName_KeySet
RedisPipeline *m_pipe;
std::string m_shaSet;
std::string m_shaDel;
std::string m_shaBatchedSet;
std::string m_shaBatchedDel;
std::string m_shaClear;
std::string m_shaApplyView;
TableDump m_tempViewState;
Expand Down
70 changes: 70 additions & 0 deletions common/status_code_util.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
#pragma once

#include <map>
#include <string>

namespace swss {

enum class StatusCode {
SWSS_RC_SUCCESS,
SWSS_RC_INVALID_PARAM,
SWSS_RC_DEADLINE_EXCEEDED,
SWSS_RC_UNAVAIL,
SWSS_RC_NOT_FOUND,
SWSS_RC_NO_MEMORY,
SWSS_RC_EXISTS,
SWSS_RC_PERMISSION_DENIED,
SWSS_RC_FULL,
SWSS_RC_IN_USE,
SWSS_RC_INTERNAL,
SWSS_RC_UNIMPLEMENTED,
SWSS_RC_UNKNOWN,
};

static std::map<StatusCode, std::string> statusCodeMapping = {
{StatusCode::SWSS_RC_SUCCESS, "SWSS_RC_SUCCESS"},
{StatusCode::SWSS_RC_INVALID_PARAM, "SWSS_RC_INVALID_PARAM"},
{StatusCode::SWSS_RC_DEADLINE_EXCEEDED, "SWSS_RC_DEADLINE_EXCEEDED"},
{StatusCode::SWSS_RC_UNAVAIL, "SWSS_RC_UNAVAIL"},
{StatusCode::SWSS_RC_NOT_FOUND, "SWSS_RC_NOT_FOUND"},
{StatusCode::SWSS_RC_NO_MEMORY, "SWSS_RC_NO_MEMORY"},
{StatusCode::SWSS_RC_EXISTS, "SWSS_RC_EXISTS"},
{StatusCode::SWSS_RC_PERMISSION_DENIED, "SWSS_RC_PERMISSION_DENIED"},
{StatusCode::SWSS_RC_FULL, "SWSS_RC_FULL"},
{StatusCode::SWSS_RC_IN_USE, "SWSS_RC_IN_USE"},
{StatusCode::SWSS_RC_INTERNAL, "SWSS_RC_INTERNAL"},
{StatusCode::SWSS_RC_UNIMPLEMENTED, "SWSS_RC_UNIMPLEMENTED"},
{StatusCode::SWSS_RC_UNKNOWN, "SWSS_RC_UNKNOWN"},
};

static std::map<std::string, StatusCode> StatusCodeLookup = {
{"SWSS_RC_SUCCESS", StatusCode::SWSS_RC_SUCCESS},
{"SWSS_RC_INVALID_PARAM", StatusCode::SWSS_RC_INVALID_PARAM},
{"SWSS_RC_DEADLINE_EXCEEDED", StatusCode::SWSS_RC_DEADLINE_EXCEEDED},
{"SWSS_RC_UNAVAIL", StatusCode::SWSS_RC_UNAVAIL},
{"SWSS_RC_NOT_FOUND", StatusCode::SWSS_RC_NOT_FOUND},
{"SWSS_RC_NO_MEMORY", StatusCode::SWSS_RC_NO_MEMORY},
{"SWSS_RC_EXISTS", StatusCode::SWSS_RC_EXISTS},
{"SWSS_RC_PERMISSION_DENIED", StatusCode::SWSS_RC_PERMISSION_DENIED},
{"SWSS_RC_FULL", StatusCode::SWSS_RC_FULL},
{"SWSS_RC_IN_USE", StatusCode::SWSS_RC_IN_USE},
{"SWSS_RC_INTERNAL", StatusCode::SWSS_RC_INTERNAL},
{"SWSS_RC_UNIMPLEMENTED", StatusCode::SWSS_RC_UNIMPLEMENTED},
{"SWSS_RC_UNKNOWN", StatusCode::SWSS_RC_UNKNOWN},
};

inline std::string statusCodeToStr(const StatusCode& status) {
if (statusCodeMapping.find(status) == statusCodeMapping.end()) {
return "SWSS_RC_UNKNOWN";
}
return statusCodeMapping.at(status);
}

inline StatusCode strToStatusCode(const std::string& status) {
if (StatusCodeLookup.find(status) == StatusCodeLookup.end()) {
return StatusCode::SWSS_RC_UNKNOWN;
}
return StatusCodeLookup.at(status);
}

} // namespace swss
1 change: 1 addition & 0 deletions tests/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ tests_SOURCES = redis_ut.cpp \
stringutility_ut.cpp \
redisutility_ut.cpp \
boolean_ut.cpp \
status_code_util_test.cpp \
main.cpp

tests_CFLAGS = $(DBGFLAGS) $(AM_CFLAGS) $(CFLAGS_COMMON) $(CFLAGS_GTEST) $(LIBNL_CFLAGS)
Expand Down
19 changes: 19 additions & 0 deletions tests/status_code_util_test.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
#include "common/status_code_util.h"

#include <gtest/gtest.h>

namespace {

using swss::StatusCode;

TEST(StatusCodeUtilTest, StatusCodeUtilTest) {
for (int i = static_cast<int>(StatusCode::SWSS_RC_SUCCESS);
i <= static_cast<int>(StatusCode::SWSS_RC_UNKNOWN); ++i) {
StatusCode original = static_cast<StatusCode>(i);
StatusCode final = swss::strToStatusCode(statusCodeToStr(original));
EXPECT_EQ(original, final);
}
EXPECT_EQ(StatusCode::SWSS_RC_UNKNOWN, swss::strToStatusCode("invalid"));
}

} // namespace

0 comments on commit a570eb6

Please sign in to comment.