Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/master' into rpc-coredump
Browse files Browse the repository at this point in the history
  • Loading branch information
levy5307 committed Jul 26, 2021
2 parents 615f345 + ea1fdc7 commit 458b55a
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 5 deletions.
17 changes: 17 additions & 0 deletions scripts/pegasus_rolling_update.sh
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,14 @@ if [ $set_ok -ne 1 ]; then
exit 1
fi

echo "Set lb.assign_delay_ms to 30min..."
# Raise the balancer's assign delay to 30 minutes (1800000 ms) so the meta
# server does not reassign replicas while nodes are being rolled.
echo "remote_command -l $pmeta meta.lb.assign_delay_ms 1800000" | ./run.sh shell --cluster $meta_list &>/tmp/$UID.$PID.pegasus.rolling_node.assign_delay_ms
# Exactly one "OK" line in the shell output means the remote command succeeded.
set_ok=$(grep -c OK /tmp/$UID.$PID.pegasus.rolling_node.assign_delay_ms)
if [ $set_ok -ne 1 ]; then
    echo "ERROR: set lb.assign_delay_ms to 30min failed"
    exit 1
fi

echo
while read line
do
Expand Down Expand Up @@ -302,6 +310,15 @@ if [ $set_ok -ne 1 ]; then
exit 1
fi

echo "Set lb.assign_delay_ms to DEFAULT..."
# Restore the balancer's assign delay to its configured default value.
echo "remote_command -l $pmeta meta.lb.assign_delay_ms DEFAULT" | ./run.sh shell --cluster $meta_list &>/tmp/$UID.$PID.pegasus.rolling_node.assign_delay_ms
# Exactly one "OK" line in the shell output means the remote command succeeded.
set_ok=$(grep -c OK /tmp/$UID.$PID.pegasus.rolling_node.assign_delay_ms)
if [ $set_ok -ne 1 ]; then
    echo "ERROR: set lb.assign_delay_ms to DEFAULT failed"
    exit 1
fi
echo

if [ "$type" = "all" ]; then
echo "=================================================================="
echo "=================================================================="
Expand Down
2 changes: 1 addition & 1 deletion src/server/compaction_operation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ compaction_operations create_compaction_operations(const std::string &json, uint
enum_to_string(op.type), dsn::PROVIDER_TYPE_MAIN, op.params, data_version);
if (operation != nullptr) {
operation->set_rules(std::move(rules));
res.emplace_back(std::unique_ptr<compaction_operation>(operation));
res.emplace_back(std::shared_ptr<compaction_operation>(operation));
}
}
return res;
Expand Down
2 changes: 1 addition & 1 deletion src/server/compaction_operation.h
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ class update_ttl : public compaction_operation
FRIEND_TEST(compaction_filter_operation_test, create_operations);
};

typedef std::vector<std::unique_ptr<compaction_operation>> compaction_operations;
// Collection of compaction operations; shared_ptr ownership because the factory
// copies the collection into each compaction filter instance it creates.
using compaction_operations = std::vector<std::shared_ptr<compaction_operation>>;
compaction_operations create_compaction_operations(const std::string &json, uint32_t data_version);
void register_compaction_operations();
} // namespace server
Expand Down
43 changes: 40 additions & 3 deletions src/server/key_ttl_compaction_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,15 @@ class KeyWithTTLCompactionFilter : public rocksdb::CompactionFilter
bool enabled,
int32_t pidx,
int32_t partition_version,
bool validate_hash)
bool validate_hash,
compaction_operations &&compaction_ops)
: _pegasus_data_version(pegasus_data_version),
_default_ttl(default_ttl),
_enabled(enabled),
_partition_index(pidx),
_partition_version(partition_version),
_validate_partition_hash(validate_hash)
_validate_partition_hash(validate_hash),
_user_specified_operations(std::move(compaction_ops))
{
}

Expand All @@ -60,6 +62,17 @@ class KeyWithTTLCompactionFilter : public rocksdb::CompactionFilter
return false;
}

// Ignore empty writes: rocksdb's compaction deletes them on its own, so we
// don't need to deal with them here.
if (key.size() < 2) {
return false;
}

if (!_user_specified_operations.empty() &&
user_specified_operation_filter(key, existing_value, new_value, value_changed)) {
return true;
}

uint32_t expire_ts =
pegasus_extract_expire_ts(_pegasus_data_version, utils::to_string_view(existing_value));
if (_default_ttl != 0 && expire_ts == 0) {
Expand All @@ -73,6 +86,22 @@ class KeyWithTTLCompactionFilter : public rocksdb::CompactionFilter
return check_if_ts_expired(utils::epoch_now(), expire_ts) || check_if_stale_split_data(key);
}

// Run every user-specified compaction operation against this record and report
// whether any of them decides the record should be removed by compaction.
bool user_specified_operation_filter(const rocksdb::Slice &key,
                                     const rocksdb::Slice &existing_value,
                                     std::string *new_value,
                                     bool *value_changed) const
{
    // Recover the pegasus hash key / sort key from the raw rocksdb key first,
    // since the operations are expressed in terms of pegasus keys.
    std::string hash_key;
    std::string sort_key;
    pegasus_restore_key(dsn::blob(key.data(), 0, key.size()), hash_key, sort_key);

    for (const auto &operation : _user_specified_operations) {
        const bool should_delete =
            operation->filter(hash_key, sort_key, existing_value, new_value, value_changed);
        if (should_delete) {
            return true;
        }
    }
    return false;
}

// Identifier reported to rocksdb for this compaction filter.
const char *Name() const override
{
    return "KeyWithTTLCompactionFilter";
}

// Check if the record is stale after partition split, which will split the partition into two
Expand All @@ -94,6 +123,7 @@ class KeyWithTTLCompactionFilter : public rocksdb::CompactionFilter
int32_t _partition_index;
int32_t _partition_version;
bool _validate_partition_hash;
compaction_operations _user_specified_operations;
};

class KeyWithTTLCompactionFilterFactory : public rocksdb::CompactionFilterFactory
Expand All @@ -105,13 +135,20 @@ class KeyWithTTLCompactionFilterFactory : public rocksdb::CompactionFilterFactor
// Build a fresh KeyWithTTLCompactionFilter for one rocksdb compaction run,
// snapshotting the current atomic settings and the user-specified operations.
std::unique_ptr<rocksdb::CompactionFilter>
CreateCompactionFilter(const rocksdb::CompactionFilter::Context & /*context*/) override
{
    // Copy the operations under the read lock so each filter owns its own
    // snapshot and the lock is released before the filter is constructed.
    compaction_operations operations_snapshot;
    {
        dsn::utils::auto_read_lock guard(_lock);
        operations_snapshot = _user_specified_operations;
    }

    auto *filter = new KeyWithTTLCompactionFilter(_pegasus_data_version.load(),
                                                  _default_ttl.load(),
                                                  _enabled.load(),
                                                  _partition_index.load(),
                                                  _partition_version.load(),
                                                  _validate_partition_hash.load(),
                                                  std::move(operations_snapshot));
    return std::unique_ptr<KeyWithTTLCompactionFilter>(filter);
}
// Identifier reported to rocksdb for this compaction filter factory.
const char *Name() const override
{
    return "KeyWithTTLCompactionFilterFactory";
}

Expand Down

0 comments on commit 458b55a

Please sign in to comment.