Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

Commit

Permalink
Merge pull request #3 from neverchanje/master
Browse files Browse the repository at this point in the history
 *: introduce some utilities && rewrite bad old code using modern cpp techniques
  • Loading branch information
qinzuoyan authored Apr 9, 2018
2 parents 43bc2a4 + b361a9b commit 0e1c040
Show file tree
Hide file tree
Showing 17 changed files with 1,233 additions and 108 deletions.
78 changes: 64 additions & 14 deletions include/dsn/cpp/rpc_holder.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,13 @@
#include <dsn/c/api_layer1.h>
#include <dsn/service_api_cpp.h>
#include <dsn/cpp/clientlet.h>
#include <dsn/cpp/smart_pointers.h>
#include <dsn/tool-api/task_code.h>
#include <dsn/utility/smart_pointers.h>
#include <dsn/utility/chrono_literals.h>

namespace dsn {

using literals::chrono_literals::operator"" _ms;

//
// rpc_holder is mainly designed for RAII of dsn_message_t.
// Since the request message will be automatically released after the rpc ends,
Expand Down Expand Up @@ -69,16 +71,29 @@ namespace dsn {
template <typename TRequest, typename TResponse>
class rpc_holder
{
public:
using request_type = TRequest;
using response_type = TResponse;

public:
explicit rpc_holder(dsn_message_t req = nullptr)
{
if (req != nullptr) {
_i = std::make_shared<internal>(req);
}
}
rpc_holder(std::unique_ptr<TRequest> req, dsn::task_code code) : _i(new internal(req, code)) {}

rpc_holder(std::unique_ptr<TRequest> req,
dsn::task_code code,
std::chrono::milliseconds timeout = 0_ms,
uint64_t partition_hash = 0)
: _i(new internal(req, code, timeout, partition_hash))
{
}

// copyable and movable
// Copying an rpc_holder doesn't produce a deep copy, the new instance will
// reference the same rpc internal data. So, just feel free to copy :)
rpc_holder(const rpc_holder &) = default;
rpc_holder(rpc_holder &&) noexcept = default;
rpc_holder &operator=(const rpc_holder &) = default;
Expand All @@ -92,6 +107,12 @@ class rpc_holder
return *(_i->thrift_request);
}

TRequest *mutable_request() const
{
dassert(_i, "rpc_holder is uninitialized");
return _i->thrift_request.get();
}

TResponse &response() const
{
dassert(_i, "rpc_holder is uninitialized");
Expand All @@ -105,6 +126,8 @@ class rpc_holder
}

// TCallback = void(dsn::error_code)
// NOTE that the `error_code` is not the error carried by response. Users should
// check the responded error themselves.
template <typename TCallback>
task_ptr
call(::dsn::rpc_address server, clientlet *svc, TCallback &&callback, int reply_thread_hash = 0)
Expand All @@ -116,7 +139,7 @@ class rpc_holder
dsn::error_code>::value,
"the first argument of TCallback must be dsn::error_code");

if (_mail_box) {
if (dsn_unlikely(_mail_box != nullptr)) {
_mail_box->emplace_back(request());
return nullptr;
}
Expand All @@ -125,12 +148,9 @@ class rpc_holder
dsn_request(),
svc,
[ cb_fwd = std::forward<TCallback>(callback),
this ](error_code err, dsn_message_t req, dsn_message_t resp) mutable {
rpc = *this ](error_code err, dsn_message_t req, dsn_message_t resp) mutable {
if (err == ERR_OK) {
::dsn::unmarshall(resp, response());
}
if (response().err) {
err = response().err;
::dsn::unmarshall(resp, rpc.response());
}
cb_fwd(err);
},
Expand All @@ -139,6 +159,16 @@ class rpc_holder
return t;
}

// Returns an rpc_holder that will reply the request after its lifetime ends.
// By default rpc_holder never replies.
// SEE: serverlet<T>::register_rpc_handler_with_rpc_holder
static inline rpc_holder auto_reply(dsn_message_t req)
{
rpc_holder rpc(req);
rpc._i->auto_reply = true;
return rpc;
}

// Only use this function when testing.
// In mock mode, all messages will be dropped into mail_box without going through network,
// and response callbacks will never be called.
Expand Down Expand Up @@ -168,29 +198,49 @@ class rpc_holder
struct internal
{
explicit internal(dsn_message_t req)
: dsn_request(req), thrift_request(make_unique<TRequest>())
: dsn_request(req), thrift_request(make_unique<TRequest>()), auto_reply(false)
{
// we must hold one reference for the request, or rdsn will delete it after
// the rpc call ends.
dsn_msg_add_ref(dsn_request);
dsn::unmarshall(req, *thrift_request);
}

internal(std::unique_ptr<TRequest> &req, dsn::task_code code)
: thrift_request(std::move(req))
internal(std::unique_ptr<TRequest> &req,
dsn::task_code code,
std::chrono::milliseconds timeout,
uint64_t partition_hash)
: thrift_request(std::move(req)), auto_reply(false)
{
dassert(thrift_request != nullptr, "req should not be null");

dsn_request = dsn_msg_create_request(code);
// leave thread_hash to 0
dsn_request =
dsn_msg_create_request(code, static_cast<int>(timeout.count()), 0, partition_hash);
dsn_msg_add_ref(dsn_request);
dsn::marshall(dsn_request, *thrift_request);
}

~internal() { dsn_msg_release_ref(dsn_request); }
void reply()
{
dsn_message_t dsn_response = dsn_msg_create_response(dsn_request);
::dsn::marshall(dsn_response, thrift_response);
dsn_rpc_reply(dsn_response);
}

~internal()
{
if (auto_reply) {
reply();
}
dsn_msg_release_ref(dsn_request);
}

dsn_message_t dsn_request;
std::unique_ptr<TRequest> thrift_request;
TResponse thrift_response;

bool auto_reply;
};

std::shared_ptr<internal> _i;
Expand Down
27 changes: 27 additions & 0 deletions include/dsn/cpp/serverlet.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@

#include <dsn/cpp/clientlet.h>
#include <dsn/cpp/service_app.h>
#include <dsn/cpp/rpc_holder.h>

namespace dsn {
/*!
Expand Down Expand Up @@ -122,6 +123,12 @@ class serverlet : public virtual clientlet
void (T::*handler)(const TRequest &, TResponse &),
dsn::gpid gpid = dsn::gpid());

template <typename TRpcHolder>
bool register_rpc_handler_with_rpc_holder(dsn::task_code rpc_code,
const char *rpc_description,
void (T::*handler)(TRpcHolder),
dsn::gpid gpid = {});

template <typename TRequest, typename TResponse>
bool register_async_rpc_handler(dsn::task_code rpc_code,
const char *rpc_name_,
Expand Down Expand Up @@ -215,6 +222,26 @@ inline bool serverlet<T>::register_rpc_handler(dsn::task_code rpc_code,
return dsn_rpc_register_handler(rpc_code, rpc_name_, cb, hc, gpid);
}

template <typename T>
template <typename TRpcHolder>
inline bool serverlet<T>::register_rpc_handler_with_rpc_holder(dsn::task_code rpc_code,
const char *rpc_description,
void (T::*handler)(TRpcHolder),
dsn::gpid gpid)
{
typedef handler_context<void (T::*)(TRpcHolder)> hc_type;
auto hc = (hc_type *)malloc(sizeof(hc_type));
hc->this_ = (T *)this;
hc->cb = handler;

dsn_rpc_request_handler_t cb = [](dsn_message_t request, void *param) {
auto hc2 = (hc_type *)param;
((hc2->this_)->*(hc2->cb))(TRpcHolder::auto_reply(request));
};

return dsn_rpc_register_handler(rpc_code, rpc_description, cb, hc, gpid);
}

template <typename T>
template <typename TRequest, typename TResponse>
inline bool serverlet<T>::register_async_rpc_handler(dsn::task_code rpc_code,
Expand Down
77 changes: 77 additions & 0 deletions include/dsn/dist/fmt_logging.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* The MIT License (MIT)
*
* Copyright (c) 2015 Microsoft Corporation
*
* -=- Robust Distributed System Nucleus (rDSN) -=-
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/

#pragma once

#include <dsn/cpp/auto_codes.h>
#include <dsn/utility/errors.h>
#include <fmt/format.h>

// The macros below no longer use the default snprintf method for log message formatting,
// instead we use fmt::format.
// TODO(wutao1): prevent construction of std::string for each log.

#define dinfo_f(...) dinfo(fmt::format(__VA_ARGS__).c_str())
#define ddebug_f(...) ddebug(fmt::format(__VA_ARGS__).c_str())
#define dwarn_f(...) dwarn(fmt::format(__VA_ARGS__).c_str())
#define derror_f(...) derror(fmt::format(__VA_ARGS__).c_str())
#define dfatal_f(...) dfatal(fmt::format(__VA_ARGS__).c_str())
#define dassert_f(x, ...) dassert(x, fmt::format(__VA_ARGS__).c_str())

// Macros for writing log message prefixed by gpid.
#define dinfo_replica(...) dinfo_f("[gpid: {}] {}", get_gpid(), fmt::format(__VA_ARGS__));
#define ddebug_replica(...) ddebug_f("[gpid: {}] {}", get_gpid(), fmt::format(__VA_ARGS__));
#define dwarn_replica(...) dwarn_f("[gpid: {}] {}", get_gpid(), fmt::format(__VA_ARGS__));
#define derror_replica(...) derror_f("[gpid: {}] {}", get_gpid(), fmt::format(__VA_ARGS__));
#define dfatal_replica(...) dfatal_f("[gpid: {}] {}", get_gpid(), fmt::format(__VA_ARGS__));

// Customized formatter for rDSN basic types, on which
// users can easily call fmt::format("{}", xxx), without the effort
// of converting them into string.

namespace fmt {

inline void format_arg(fmt::BasicFormatter<char> &f, const char *format_str, dsn::gpid p)
{
f.writer().write("{}.{}", p.get_app_id(), p.get_partition_index());
}

inline void format_arg(fmt::BasicFormatter<char> &f, const char *format_str, const dsn::error_s &p)
{
f.writer().write(p.description());
}

inline void format_arg(fmt::BasicFormatter<char> &f, const char *format_str, dsn::error_code p)
{
f.writer().write(p.to_string());
}

inline void format_arg(fmt::BasicFormatter<char> &f, const char *format_str, dsn::task_code p)
{
f.writer().write(p.to_string());
}

} // namespace fmt
37 changes: 23 additions & 14 deletions include/dsn/tool-api/gpid.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,33 +32,40 @@
#endif

namespace dsn {

// Group-Partition-ID.
class gpid
{
public:
gpid(int app_id, int pidx)
{
_value.u.app_id = app_id;
_value.u.partition_index = pidx;
}
gpid(const gpid &gd) { _value.value = gd._value.value; }
gpid() { _value.value = 0; }
uint64_t value() const { return _value.value; }
constexpr gpid(int app_id, int pidx) : _value({.u = {app_id, pidx}}) {}

constexpr gpid() = default;

constexpr uint64_t value() const { return _value.value; }

bool operator<(const gpid &r) const
{
return _value.u.app_id < r._value.u.app_id ||
(_value.u.app_id == r._value.u.app_id &&
_value.u.partition_index < r._value.u.partition_index);
}
bool operator==(const gpid &r) const { return value() == r.value(); }
bool operator!=(const gpid &r) const { return value() != r.value(); }

int32_t get_app_id() const { return _value.u.app_id; }
int32_t get_partition_index() const { return _value.u.partition_index; }
constexpr bool operator==(const gpid &r) const { return value() == r.value(); }

constexpr bool operator!=(const gpid &r) const { return value() != r.value(); }

constexpr int32_t get_app_id() const { return _value.u.app_id; }

constexpr int32_t get_partition_index() const { return _value.u.partition_index; }

void set_app_id(int32_t v) { _value.u.app_id = v; }

void set_partition_index(int32_t v) { _value.u.partition_index = v; }

void set_value(uint64_t v) { _value.value = v; }

bool parse_from(const char *str);

const char *to_string() const;

#ifdef DSN_USE_THRIFT_SERIALIZATION
Expand All @@ -67,6 +74,7 @@ class gpid
#endif

int thread_hash() const { return _value.u.app_id * 7919 + _value.u.partition_index; }

private:
union
{
Expand All @@ -76,9 +84,10 @@ class gpid
int32_t partition_index; ///< zero-based partition index
} u;
uint64_t value;
} _value;
} _value{.value = 0};
};
}

} // namespace dsn

namespace std {
template <>
Expand Down
Loading

0 comments on commit 0e1c040

Please sign in to comment.