Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

Commit

Permalink
feat: forbid large-size-value writes to Pegasus (#414)
Browse files Browse the repository at this point in the history
  • Loading branch information
foreverneverer authored and neverchanje committed Mar 29, 2020
1 parent 2beeeab commit b6721ce
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 0 deletions.
13 changes: 13 additions & 0 deletions src/dist/replication/lib/replica_2pc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include "mutation_log.h"
#include "replica_stub.h"
#include <dsn/dist/replication/replication_app_base.h>
#include <dsn/dist/fmt_logging.h>

namespace dsn {
namespace replication {
Expand All @@ -44,6 +45,18 @@ void replica::on_client_write(dsn::message_ex *request, bool ignore_throttling)
return;
}

if (dsn_unlikely(_stub->_max_allowed_write_size &&
request->body_size() > _stub->_max_allowed_write_size)) {
dwarn_replica("client from {} write request body size exceed threshold, request_body_size "
"= {}, max_allowed_write_size = {}, it will be rejected!",
request->header->from_address.to_string(),
request->body_size(),
_stub->_max_allowed_write_size);
_stub->_counter_recent_write_size_exceed_threshold_count->increment();
response_client_write(request, ERR_INVALID_DATA);
return;
}

task_spec *spec = task_spec::get(request->rpc_code());
if (!_options->allow_non_idempotent_write && !spec->rpc_request_is_write_idempotent) {
response_client_write(request, ERR_OPERATION_DISABLED);
Expand Down
13 changes: 13 additions & 0 deletions src/dist/replication/lib/replica_stub.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,13 @@ replica_stub::replica_stub(replica_state_subscriber subscriber /*= nullptr*/,
_log = nullptr;
_primary_address_str[0] = '\0';
install_perf_counters();

_max_allowed_write_size = dsn_config_get_value_uint64("replication",
"max_allowed_write_size",
1 << 20,
"write operation exceed this "
"threshold will be logged and reject, "
"default is 1MB, 0 means no check");
}

replica_stub::~replica_stub(void) { close(); }
Expand Down Expand Up @@ -322,6 +329,12 @@ void replica_stub::install_perf_counters()
"recent.write.busy.count",
COUNTER_TYPE_VOLATILE_NUMBER,
"write busy count in the recent period");

_counter_recent_write_size_exceed_threshold_count.init_app_counter(
"eon.replica_stub",
"recent_write_size_exceed_threshold_count",
COUNTER_TYPE_VOLATILE_NUMBER,
"write size exceed threshold count in the recent period");
}

void replica_stub::initialize(bool clear /* = false*/)
Expand Down
6 changes: 6 additions & 0 deletions src/dist/replication/lib/replica_stub.h
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,7 @@ class replica_stub : public serverlet<replica_stub>, public ref_counter
friend class duplication_sync_timer;
friend class duplication_sync_timer_test;
friend class replica_duplicator_manager_test;
friend class replica_test;

typedef std::unordered_map<gpid, ::dsn::task_ptr> opening_replicas;
typedef std::unordered_map<gpid, std::tuple<task_ptr, replica_ptr, app_info, replica_info>>
Expand Down Expand Up @@ -343,6 +344,9 @@ class replica_stub : public serverlet<replica_stub>, public ref_counter
// cli service
std::unique_ptr<dsn::cli_service> _cli_service;

// write body size exceed this threshold will be logged and reject, 0 means no check
uint64_t _max_allowed_write_size;

// performance counters
perf_counter_wrapper _counter_replicas_count;
perf_counter_wrapper _counter_replicas_opening_count;
Expand Down Expand Up @@ -402,6 +406,8 @@ class replica_stub : public serverlet<replica_stub>, public ref_counter
perf_counter_wrapper _counter_recent_read_busy_count;
perf_counter_wrapper _counter_recent_write_busy_count;

perf_counter_wrapper _counter_recent_write_size_exceed_threshold_count;

dsn::task_tracker _tracker;
};
} // namespace replication
Expand Down
63 changes: 63 additions & 0 deletions src/dist/replication/test/replica_test/unit_test/replica_test.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// Copyright (c) 2017-present, Xiaomi, Inc. All rights reserved.
// This source code is licensed under the Apache License Version 2.0, which
// can be found in the LICENSE file in the root directory of this source tree.

#include <gtest/gtest.h>

#include <dsn/utility/fail_point.h>
#include "replica_test_base.h"
#include <dsn/utility/defer.h>

namespace dsn {
namespace replication {

class replica_test : public replica_test_base
{
public:
dsn::app_info _app_info;
dsn::gpid pid = gpid(2, 1);

public:
void SetUp() override
{
stub->install_perf_counters();
mock_app_info();
stub->generate_replica(_app_info, pid, partition_status::PS_PRIMARY, 1);
}

int get_write_size_exceed_threshold_count()
{
return stub->_counter_recent_write_size_exceed_threshold_count->get_value();
}

void mock_app_info()
{
_app_info.app_id = 2;
_app_info.app_name = "replica_test";
_app_info.app_type = "replica";
_app_info.is_stateful = true;
_app_info.max_replica_count = 3;
_app_info.partition_count = 8;
}
};

TEST_F(replica_test, write_size_limited)
{
int count = 100;
task_code default_code;
struct dsn::message_header header;
header.body_length = 10000000;

auto write_request = dsn::message_ex::create_request(default_code);
auto cleanup = dsn::defer([=]() { delete write_request; });
write_request->header = &header;

for (int i = 0; i < count; i++) {
stub->on_client_write(pid, write_request);
}

ASSERT_EQ(get_write_size_exceed_threshold_count(), count);
}

} // namespace replication
} // namespace dsn

0 comments on commit b6721ce

Please sign in to comment.