Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

feat: forbid large-size-value writes to Pegasus #414

Merged
merged 20 commits into from
Mar 12, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions src/dist/replication/lib/replica_2pc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include "mutation_log.h"
#include "replica_stub.h"
#include <dsn/dist/replication/replication_app_base.h>
#include <dsn/dist/fmt_logging.h>

namespace dsn {
namespace replication {
Expand All @@ -44,6 +45,18 @@ void replica::on_client_write(dsn::message_ex *request, bool ignore_throttling)
return;
}

if (dsn_unlikely(_stub->_max_allowed_write_size &&
request->body_size() > _stub->_max_allowed_write_size)) {
dwarn_replica("client from {} write request body size exceed threshold, request_body_size "
"= {}, max_allowed_write_size = {}, it will be rejected!",
request->header->from_address.to_string(),
request->body_size(),
_stub->_max_allowed_write_size);
_stub->_counter_recent_write_size_exceed_threshold_count->increment();
response_client_write(request, ERR_INVALID_DATA);
foreverneverer marked this conversation as resolved.
Show resolved Hide resolved
foreverneverer marked this conversation as resolved.
Show resolved Hide resolved
return;
}

task_spec *spec = task_spec::get(request->rpc_code());
if (!_options->allow_non_idempotent_write && !spec->rpc_request_is_write_idempotent) {
response_client_write(request, ERR_OPERATION_DISABLED);
Expand Down
13 changes: 13 additions & 0 deletions src/dist/replication/lib/replica_stub.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,13 @@ replica_stub::replica_stub(replica_state_subscriber subscriber /*= nullptr*/,
_log = nullptr;
_primary_address_str[0] = '\0';
install_perf_counters();

_max_allowed_write_size = dsn_config_get_value_uint64("replication",
"max_allowed_write_size",
1 << 20,
"write operation exceed this "
"threshold will be logged and reject, "
"default is 1MB, 0 means no check");
}

replica_stub::~replica_stub(void) { close(); }
Expand Down Expand Up @@ -322,6 +329,12 @@ void replica_stub::install_perf_counters()
"recent.write.busy.count",
COUNTER_TYPE_VOLATILE_NUMBER,
"write busy count in the recent period");

_counter_recent_write_size_exceed_threshold_count.init_app_counter(
"eon.replica_stub",
"recent_write_size_exceed_threshold_count",
COUNTER_TYPE_VOLATILE_NUMBER,
"write size exceed threshold count in the recent period");
}

void replica_stub::initialize(bool clear /* = false*/)
Expand Down
6 changes: 6 additions & 0 deletions src/dist/replication/lib/replica_stub.h
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,7 @@ class replica_stub : public serverlet<replica_stub>, public ref_counter
friend class duplication_sync_timer;
friend class duplication_sync_timer_test;
friend class replica_duplicator_manager_test;
friend class replica_test;

typedef std::unordered_map<gpid, ::dsn::task_ptr> opening_replicas;
typedef std::unordered_map<gpid, std::tuple<task_ptr, replica_ptr, app_info, replica_info>>
Expand Down Expand Up @@ -343,6 +344,9 @@ class replica_stub : public serverlet<replica_stub>, public ref_counter
// cli service
std::unique_ptr<dsn::cli_service> _cli_service;

// write body size exceed this threshold will be logged and reject, 0 means no check
uint64_t _max_allowed_write_size;

// performance counters
perf_counter_wrapper _counter_replicas_count;
perf_counter_wrapper _counter_replicas_opening_count;
Expand Down Expand Up @@ -402,6 +406,8 @@ class replica_stub : public serverlet<replica_stub>, public ref_counter
perf_counter_wrapper _counter_recent_read_busy_count;
perf_counter_wrapper _counter_recent_write_busy_count;

perf_counter_wrapper _counter_recent_write_size_exceed_threshold_count;

dsn::task_tracker _tracker;
};
} // namespace replication
Expand Down
63 changes: 63 additions & 0 deletions src/dist/replication/test/replica_test/unit_test/replica_test.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// Copyright (c) 2017-present, Xiaomi, Inc. All rights reserved.
// This source code is licensed under the Apache License Version 2.0, which
// can be found in the LICENSE file in the root directory of this source tree.

#include <gtest/gtest.h>

#include <dsn/utility/fail_point.h>
#include "replica_test_base.h"
#include <dsn/utility/defer.h>

namespace dsn {
namespace replication {

class replica_test : public replica_test_base
hycdong marked this conversation as resolved.
Show resolved Hide resolved
{
public:
dsn::app_info _app_info;
dsn::gpid pid = gpid(2, 1);

public:
void SetUp() override
{
stub->install_perf_counters();
mock_app_info();
stub->generate_replica(_app_info, pid, partition_status::PS_PRIMARY, 1);
}

int get_write_size_exceed_threshold_count()
{
return stub->_counter_recent_write_size_exceed_threshold_count->get_value();
}

void mock_app_info()
{
_app_info.app_id = 2;
_app_info.app_name = "replica_test";
_app_info.app_type = "replica";
_app_info.is_stateful = true;
_app_info.max_replica_count = 3;
_app_info.partition_count = 8;
}
};

TEST_F(replica_test, write_size_limited)
{
int count = 100;
foreverneverer marked this conversation as resolved.
Show resolved Hide resolved
task_code default_code;
struct dsn::message_header header;
header.body_length = 10000000;

auto write_request = dsn::message_ex::create_request(default_code);
foreverneverer marked this conversation as resolved.
Show resolved Hide resolved
auto cleanup = dsn::defer([=]() { delete write_request; });
write_request->header = &header;

for (int i = 0; i < count; i++) {
stub->on_client_write(pid, write_request);
}

ASSERT_EQ(get_write_size_exceed_threshold_count(), count);
}

} // namespace replication
} // namespace dsn