Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

feat: forbid large-size-value writes to Pegasus #414

Merged
merged 20 commits into from
Mar 12, 2020
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/dist/replication/lib/replica.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
#include "prepare_list.h"
#include "replica_context.h"
#include "throttling_controller.h"
#include <dsn/dist/fmt_logging.h>
foreverneverer marked this conversation as resolved.
Show resolved Hide resolved

namespace dsn {
namespace replication {
Expand Down
13 changes: 13 additions & 0 deletions src/dist/replication/lib/replica_2pc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,19 @@ void replica::on_client_write(dsn::message_ex *request, bool ignore_throttling)
return;
}

if (dsn_unlikely(_stub->_max_allowed_write_size &&
request->body_size() > _stub->_max_allowed_write_size)) {
dwarn_replica("client from {} write request body size exceed threshold = {}, gpid = "
"({}.{}), it will be reject!",
request->header->from_address.to_string(),
_stub->_max_allowed_write_size,
get_gpid().get_app_id(),
levy5307 marked this conversation as resolved.
Show resolved Hide resolved
get_gpid().get_partition_index());
_stub->_counter_recent_write_size_exceed_threshold_count->increment();
response_client_write(request, ERR_INVALID_DATA);
foreverneverer marked this conversation as resolved.
Show resolved Hide resolved
foreverneverer marked this conversation as resolved.
Show resolved Hide resolved
return;
}

task_spec *spec = task_spec::get(request->rpc_code());
if (!_options->allow_non_idempotent_write && !spec->rpc_request_is_write_idempotent) {
response_client_write(request, ERR_OPERATION_DISABLED);
Expand Down
12 changes: 12 additions & 0 deletions src/dist/replication/lib/replica_stub.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,12 @@ replica_stub::replica_stub(replica_state_subscriber subscriber /*= nullptr*/,
_log = nullptr;
_primary_address_str[0] = '\0';
install_perf_counters();

_max_allowed_write_size = dsn_config_get_value_uint64(
"pegasus.server",
"max_allowed_write_size",
1000000,
foreverneverer marked this conversation as resolved.
Show resolved Hide resolved
"write operation exceed this threshold will be logged and reject, 0 means no check");
}

replica_stub::~replica_stub(void) { close(); }
Expand Down Expand Up @@ -322,6 +328,12 @@ void replica_stub::install_perf_counters()
"recent.write.busy.count",
COUNTER_TYPE_VOLATILE_NUMBER,
"write busy count in the recent period");

_counter_recent_write_size_exceed_threshold_count.init_app_counter(
"eon.replica_stub",
"recent.write.size.exceed.threshold.count",
foreverneverer marked this conversation as resolved.
Show resolved Hide resolved
COUNTER_TYPE_VOLATILE_NUMBER,
"write size exceed threshold count in the recent period");
}

void replica_stub::initialize(bool clear /* = false*/)
Expand Down
6 changes: 6 additions & 0 deletions src/dist/replication/lib/replica_stub.h
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,7 @@ class replica_stub : public serverlet<replica_stub>, public ref_counter
friend class duplication_sync_timer;
friend class duplication_sync_timer_test;
friend class replica_duplicator_manager_test;
friend class replica_test;

typedef std::unordered_map<gpid, ::dsn::task_ptr> opening_replicas;
typedef std::unordered_map<gpid, std::tuple<task_ptr, replica_ptr, app_info, replica_info>>
Expand Down Expand Up @@ -336,6 +337,9 @@ class replica_stub : public serverlet<replica_stub>, public ref_counter
// cli service
std::unique_ptr<dsn::cli_service> _cli_service;

// write body size exceed this threshold will be logged and reject, 0 means no check
uint64_t _max_allowed_write_size;

// performance counters
perf_counter_wrapper _counter_replicas_count;
perf_counter_wrapper _counter_replicas_opening_count;
Expand Down Expand Up @@ -395,6 +399,8 @@ class replica_stub : public serverlet<replica_stub>, public ref_counter
perf_counter_wrapper _counter_recent_read_busy_count;
perf_counter_wrapper _counter_recent_write_busy_count;

perf_counter_wrapper _counter_recent_write_size_exceed_threshold_count;

dsn::task_tracker _tracker;
};
} // namespace replication
Expand Down
62 changes: 62 additions & 0 deletions src/dist/replication/test/replica_test/unit_test/replica_test.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Copyright (c) 2017-present, Xiaomi, Inc. All rights reserved.
// This source code is licensed under the Apache License Version 2.0, which
// can be found in the LICENSE file in the root directory of this source tree.

#include <gtest/gtest.h>

#include <dsn/utility/fail_point.h>
#include "replica_test_base.h"

namespace dsn {
namespace replication {

class replica_test : public replica_test_base
hycdong marked this conversation as resolved.
Show resolved Hide resolved
{
public:
int total_count = 0;
dsn::app_info _app_info;
dsn::gpid pid = gpid(2, 1);

public:
void SetUp() override
{
stub->install_perf_counters();
mock_app_info();
stub->generate_replica(_app_info, pid, partition_status::PS_PRIMARY, 1);
}

void calc_write_size_exceed_threshold_count()
{
total_count += stub->_counter_recent_write_size_exceed_threshold_count->get_value();
}

void mock_app_info()
{
_app_info.app_id = 2;
_app_info.app_name = "replica_test";
_app_info.app_type = "replica";
_app_info.is_stateful = true;
_app_info.max_replica_count = 3;
_app_info.partition_count = 8;
}
};

TEST_F(replica_test, write_size_limited)
{
int count = 100;
foreverneverer marked this conversation as resolved.
Show resolved Hide resolved
task_code default_code;
struct dsn::message_header header;
header.body_length = 10000000;

auto write_request = dsn::message_ex::create_request(default_code);
foreverneverer marked this conversation as resolved.
Show resolved Hide resolved
write_request->header = &header;
while (count-- > 0) {
stub->on_client_write(pid, write_request);
calc_write_size_exceed_threshold_count();
foreverneverer marked this conversation as resolved.
Show resolved Hide resolved
}

ASSERT_EQ(total_count, 100);
}

} // namespace replication
} // namespace dsn