Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: catch exception if unmarshall the rpc request encounters error #790

Merged
merged 14 commits into from
Jul 28, 2021
Prev Previous commit
Next Next commit
Merge remote-tracking branch 'origin/master' into rpc-coredump
  • Loading branch information
levy5307 committed Jul 26, 2021

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
commit 458b55a390f81e50aba040200bb8d81d00286912
17 changes: 17 additions & 0 deletions scripts/pegasus_rolling_update.sh
Original file line number Diff line number Diff line change
@@ -123,6 +123,14 @@ if [ $set_ok -ne 1 ]; then
exit 1
fi

echo "Set lb.assign_delay_ms to 30min..."
echo "remote_command -l $pmeta meta.lb.assign_delay_ms 1800000" | ./run.sh shell --cluster $meta_list &>/tmp/$UID.$PID.pegasus.rolling_node.assign_delay_ms
set_ok=`grep OK /tmp/$UID.$PID.pegasus.rolling_node.assign_delay_ms | wc -l`
if [ $set_ok -ne 1 ]; then
echo "ERROR: set lb.assign_delay_ms to 30min failed"
exit 1
fi

echo
while read line
do
@@ -302,6 +310,15 @@ if [ $set_ok -ne 1 ]; then
exit 1
fi

echo "Set lb.assign_delay_ms to DEFAULT..."
echo "remote_command -l $pmeta meta.lb.assign_delay_ms DEFAULT" | ./run.sh shell --cluster $meta_list &>/tmp/$UID.$PID.pegasus.rolling_node.assign_delay_ms
set_ok=`grep OK /tmp/$UID.$PID.pegasus.rolling_node.assign_delay_ms | wc -l`
if [ $set_ok -ne 1 ]; then
echo "ERROR: set lb.assign_delay_ms to DEFAULT failed"
exit 1
fi
echo

if [ "$type" = "all" ]; then
echo "=================================================================="
echo "=================================================================="
2 changes: 1 addition & 1 deletion src/server/compaction_operation.cpp
Original file line number Diff line number Diff line change
@@ -174,7 +174,7 @@ compaction_operations create_compaction_operations(const std::string &json, uint
enum_to_string(op.type), dsn::PROVIDER_TYPE_MAIN, op.params, data_version);
if (operation != nullptr) {
operation->set_rules(std::move(rules));
res.emplace_back(std::unique_ptr<compaction_operation>(operation));
res.emplace_back(std::shared_ptr<compaction_operation>(operation));
}
}
return res;
2 changes: 1 addition & 1 deletion src/server/compaction_operation.h
Original file line number Diff line number Diff line change
@@ -159,7 +159,7 @@ class update_ttl : public compaction_operation
FRIEND_TEST(compaction_filter_operation_test, create_operations);
};

typedef std::vector<std::unique_ptr<compaction_operation>> compaction_operations;
typedef std::vector<std::shared_ptr<compaction_operation>> compaction_operations;
compaction_operations create_compaction_operations(const std::string &json, uint32_t data_version);
void register_compaction_operations();
} // namespace server
43 changes: 40 additions & 3 deletions src/server/key_ttl_compaction_filter.h
Original file line number Diff line number Diff line change
@@ -40,13 +40,15 @@ class KeyWithTTLCompactionFilter : public rocksdb::CompactionFilter
bool enabled,
int32_t pidx,
int32_t partition_version,
bool validate_hash)
bool validate_hash,
compaction_operations &&compaction_ops)
: _pegasus_data_version(pegasus_data_version),
_default_ttl(default_ttl),
_enabled(enabled),
_partition_index(pidx),
_partition_version(partition_version),
_validate_partition_hash(validate_hash)
_validate_partition_hash(validate_hash),
_user_specified_operations(std::move(compaction_ops))
{
}

@@ -60,6 +62,17 @@ class KeyWithTTLCompactionFilter : public rocksdb::CompactionFilter
return false;
}

// ignore empty write. Empty writes will deleted by the compaction of rocksdb. We don't need
// deal with it here.
if (key.size() < 2) {
return false;
}

if (!_user_specified_operations.empty() &&
user_specified_operation_filter(key, existing_value, new_value, value_changed)) {
return true;
}

uint32_t expire_ts =
pegasus_extract_expire_ts(_pegasus_data_version, utils::to_string_view(existing_value));
if (_default_ttl != 0 && expire_ts == 0) {
@@ -73,6 +86,22 @@ class KeyWithTTLCompactionFilter : public rocksdb::CompactionFilter
return check_if_ts_expired(utils::epoch_now(), expire_ts) || check_if_stale_split_data(key);
}

bool user_specified_operation_filter(const rocksdb::Slice &key,
const rocksdb::Slice &existing_value,
std::string *new_value,
bool *value_changed) const
{
std::string hash_key, sort_key;
pegasus_restore_key(dsn::blob(key.data(), 0, key.size()), hash_key, sort_key);
for (const auto &op : _user_specified_operations) {
if (op->filter(hash_key, sort_key, existing_value, new_value, value_changed)) {
// return true if this data need to be deleted
return true;
}
}
return false;
}

const char *Name() const override { return "KeyWithTTLCompactionFilter"; }

// Check if the record is stale after partition split, which will split the partition into two
@@ -94,6 +123,7 @@ class KeyWithTTLCompactionFilter : public rocksdb::CompactionFilter
int32_t _partition_index;
int32_t _partition_version;
bool _validate_partition_hash;
compaction_operations _user_specified_operations;
};

class KeyWithTTLCompactionFilterFactory : public rocksdb::CompactionFilterFactory
@@ -105,13 +135,20 @@ class KeyWithTTLCompactionFilterFactory : public rocksdb::CompactionFilterFactor
std::unique_ptr<rocksdb::CompactionFilter>
CreateCompactionFilter(const rocksdb::CompactionFilter::Context & /*context*/) override
{
compaction_operations tmp_filter_operations;
{
dsn::utils::auto_read_lock l(_lock);
tmp_filter_operations = _user_specified_operations;
}

return std::unique_ptr<KeyWithTTLCompactionFilter>(
new KeyWithTTLCompactionFilter(_pegasus_data_version.load(),
_default_ttl.load(),
_enabled.load(),
_partition_index.load(),
_partition_version.load(),
_validate_partition_hash.load()));
_validate_partition_hash.load(),
std::move(tmp_filter_operations)));
}
const char *Name() const override { return "KeyWithTTLCompactionFilterFactory"; }

You are viewing a condensed version of this merge commit. You can view the full changes here.