Skip to content

Commit

Permalink
Add modifyRedis flag to consumer table object (#344)
Browse files Browse the repository at this point in the history
* Add modifyRedis flag to consumer table object

* Remove sal annotations

* Add unittests

* Address comments
  • Loading branch information
kcudnik authored May 30, 2020
1 parent 5a32636 commit 2c7354b
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 3 deletions.
4 changes: 3 additions & 1 deletion common/consumer_table_pops.lua
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ for i = n, 1, -3 do
end
table.insert(rets, ret)

if op == 'bulkset' or op == 'bulkcreate' or op == 'bulkremove' then
if ARGV[2] == "0" then
-- do nothing, we don't want to modify redis during pop
elseif op == 'bulkset' or op == 'bulkcreate' or op == 'bulkremove' then

-- key is "OBJECT_TYPE:num", extract object type from key
key = key:sub(1, string.find(key, ':') - 1)
Expand Down
13 changes: 11 additions & 2 deletions common/consumertable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ namespace swss {
ConsumerTable::ConsumerTable(DBConnector *db, const string &tableName, int popBatchSize, int pri)
: ConsumerTableBase(db, tableName, popBatchSize, pri)
, TableName_KeyValueOpQueues(tableName)
, m_modifyRedis(true)
{
std::string luaScript = loadLuaScript("consumer_table_pops.lua");
m_shaPop = loadRedisScript(db, luaScript);
Expand All @@ -39,15 +40,23 @@ ConsumerTable::ConsumerTable(DBConnector *db, const string &tableName, int popBa
setQueueLength(len/3);
}

void ConsumerTable::setModifyRedis(bool modify)
{
SWSS_LOG_ENTER();

m_modifyRedis = modify;
}

void ConsumerTable::pops(deque<KeyOpFieldsValuesTuple> &vkco, const string &prefix)
{
RedisCommand command;
command.format(
"EVALSHA %s 2 %s %s %d ''",
"EVALSHA %s 2 %s %s %d %d",
m_shaPop.c_str(),
getKeyValueOpQueueTableName().c_str(),
(prefix+getTableName()).c_str(),
POP_BATCH_SIZE);
POP_BATCH_SIZE,
m_modifyRedis ? 1 : 0);

RedisReply r(m_db, command, REDIS_REPLY_ARRAY);

Expand Down
11 changes: 11 additions & 0 deletions common/consumertable.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,19 @@ class ConsumerTable : public ConsumerTableBase, public TableName_KeyValueOpQueue
/* Get multiple pop elements */
void pops(std::deque<KeyOpFieldsValuesTuple> &vkco, const std::string &prefix = EMPTY_PREFIX);

void setModifyRedis(bool modify);
private:
std::string m_shaPop;

/**
* @brief Modify Redis database.
*
* If set to false, will not make changes to database during POPs operation.
* This will be utilized during synchronous mode.
*
* Default is true.
*/
bool m_modifyRedis;
};

}
Expand Down
39 changes: 39 additions & 0 deletions tests/redis_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -786,6 +786,45 @@ TEST(ProducerConsumer, PopEmpty)
EXPECT_EQ(fvs.size(), 0U);
}

TEST(ProducerConsumer, PopNoModify)
{
clearDB();

std::string tableName = "tableName";

DBConnector db("TEST_DB", 0, true);
ProducerTable p(&db, tableName);

std::vector<FieldValueTuple> values;

FieldValueTuple fv("f", "v");
values.push_back(fv);

p.set("key", values, "set");

ConsumerTable c(&db, tableName);

c.setModifyRedis(false);

std::string key;
std::string op;
std::vector<FieldValueTuple> fvs;

c.pop(key, op, fvs); //, "prefixNoMod_");

EXPECT_EQ(key, "key");
EXPECT_EQ(op, "set");
EXPECT_EQ(fvField(fvs[0]), "f");
EXPECT_EQ(fvValue(fvs[0]), "v");

Table t(&db, tableName);

string value_got;
bool r = t.hget("key", "f", value_got);

ASSERT_FALSE(r);
}

TEST(ProducerConsumer, ConsumerSelectWithInitData)
{
clearDB();
Expand Down

0 comments on commit 2c7354b

Please sign in to comment.