Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

Commit

Permalink
task: fix based on comments
Browse files Browse the repository at this point in the history
  • Loading branch information
shengofsun committed May 14, 2018
1 parent 0f6e3c7 commit 0273bd1
Show file tree
Hide file tree
Showing 21 changed files with 226 additions and 238 deletions.
2 changes: 1 addition & 1 deletion include/dsn/c/api_layer1.h
Original file line number Diff line number Diff line change
Expand Up @@ -442,7 +442,7 @@ Server-Side RPC Primitives
/*! register callback to handle RPC request */
extern DSN_API bool dsn_rpc_register_handler(dsn::task_code code,
const char *extra_name,
const rpc_request_handler &cb);
const dsn::rpc_request_handler &cb);

/*! unregister callback to handle RPC request, returns true if unregister ok, false if no handler
was registered */
Expand Down
2 changes: 1 addition & 1 deletion include/dsn/c/api_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,12 @@
@{
*/

namespace dsn {
typedef std::function<void()> task_handler;
typedef std::function<void(dsn_message_t)> rpc_request_handler;
typedef std::function<void(dsn::error_code, dsn_message_t, dsn_message_t)> rpc_response_handler;
typedef std::function<void(dsn::error_code, size_t)> aio_handler;

namespace dsn {
class task;
class raw_task;
class rpc_request_task;
Expand Down
98 changes: 49 additions & 49 deletions include/dsn/cpp/clientlet.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ class clientlet
bool _access_thread_id_inited;
};

inline void empty_rpc_handler(dsn::error_code, dsn_message_t, dsn_message_t) {}
inline void empty_rpc_handler(error_code, dsn_message_t, dsn_message_t) {}

// callback(error_code, TResponse&& response)
template <typename TFunction, class Enable = void>
Expand All @@ -73,7 +73,7 @@ struct is_typed_rpc_callback<TFunction,
// todo: check if response_t is marshallable
using inspect_t = function_traits<TFunction>;
constexpr static bool const value =
std::is_same<typename inspect_t::template arg_t<0>, dsn::error_code>::value &&
std::is_same<typename inspect_t::template arg_t<0>, error_code>::value &&
std::is_default_constructible<
typename std::decay<typename inspect_t::template arg_t<1>>::type>::value;
using response_t = typename std::decay<typename inspect_t::template arg_t<1>>::type;
Expand All @@ -86,28 +86,27 @@ struct is_typed_rpc_callback<TFunction,

namespace tasking {
inline task_ptr
create_task(dsn::task_code code, dsn::task_tracker *tracker, task_handler &&cb, int hash = 0)
create_task(task_code code, task_tracker *tracker, task_handler &&callback, int hash = 0)
{
dsn::task_ptr t(new dsn::raw_task(code, std::move(cb), hash, nullptr));
task_ptr t(new raw_task(code, std::move(callback), hash, nullptr));
t->set_tracker(tracker);
t->spec().on_task_create.execute(::dsn::task::get_current_task(), t);
t->spec().on_task_create.execute(task::get_current_task(), t);
return t;
}

inline task_ptr create_timer_task(dsn::task_code code,
dsn::task_tracker *tracker,
task_handler &&cb,
inline task_ptr create_timer_task(task_code code,
task_tracker *tracker,
task_handler &&callback,
std::chrono::milliseconds interval,
int hash = 0)
{
dsn::task_ptr t(new dsn::timer_task(
code, std::move(cb), static_cast<uint32_t>(interval.count()), hash, nullptr));
task_ptr t(new timer_task(code, std::move(callback), interval.count(), hash, nullptr));
t->set_tracker(tracker);
t->spec().on_task_create.execute(::dsn::task::get_current_task(), t);
t->spec().on_task_create.execute(task::get_current_task(), t);
return t;
}

inline task_ptr enqueue(dsn::task_code code,
inline task_ptr enqueue(task_code code,
task_tracker *tracker,
task_handler &&callback,
int hash = 0,
Expand All @@ -119,25 +118,26 @@ inline task_ptr enqueue(dsn::task_code code,
return tsk;
}

inline task_ptr enqueue_timer(dsn::task_code evt,
inline task_ptr enqueue_timer(task_code evt,
task_tracker *tracker,
task_handler &&cb,
task_handler &&callback,
std::chrono::milliseconds timer_interval,
int hash = 0,
std::chrono::milliseconds delay = std::chrono::milliseconds(0))
{
auto tsk = create_timer_task(evt, tracker, std::move(cb), timer_interval, hash);
auto tsk = create_timer_task(evt, tracker, std::move(callback), timer_interval, hash);
tsk->set_delay(static_cast<int>(delay.count()));
tsk->enqueue();
return tsk;
}

template <typename TCallback>
inline dsn::ref_ptr<dsn::safe_late_task<TCallback>> create_late_task(
dsn::task_code code, const TCallback &cb, int hash = 0, task_tracker *tracker = nullptr)
dsn::task_code code, const TCallback &callback, int hash = 0, task_tracker *tracker = nullptr)
{
using result_task_type = safe_late_task<typename std::remove_cv<TCallback>::type>;
dsn::ref_ptr<result_task_type> ptr(new result_task_type(code, std::move(cb), hash, nullptr));
dsn::ref_ptr<result_task_type> ptr(
new result_task_type(code, std::move(callback), hash, nullptr));
ptr->set_tracker(tracker);
ptr->spec().on_task_create.execute(::dsn::task::get_current_task(), ptr);
return ptr;
Expand Down Expand Up @@ -170,7 +170,7 @@ inline rpc_response_task_ptr create_rpc_response_task(dsn_message_t req,
rpc_response_task_ptr t(
new rpc_response_task((message_ex *)req, std::move(callback), reply_thread_hash, nullptr));
t->set_tracker(tracker);
t->spec().on_task_create.execute(::dsn::task::get_current_task(), t);
t->spec().on_task_create.execute(task::get_current_task(), t);
return t;
}

Expand All @@ -186,16 +186,16 @@ create_rpc_response_task(dsn_message_t req,
tracker,
[cb_fwd = std::move(cb)](error_code err, dsn_message_t req, dsn_message_t resp) mutable {
typename is_typed_rpc_callback<TCallback>::response_t response = {};
if (err == dsn::ERR_OK) {
dsn::unmarshall(resp, response);
if (err == ERR_OK) {
unmarshall(resp, response);
}
cb_fwd(err, std::move(response));
},
reply_thread_hash);
}

template <typename TCallback>
rpc_response_task_ptr call(::dsn::rpc_address server,
rpc_response_task_ptr call(rpc_address server,
dsn_message_t request,
task_tracker *tracker,
TCallback &&callback,
Expand All @@ -211,17 +211,17 @@ rpc_response_task_ptr call(::dsn::rpc_address server,
// for TRequest/TResponse, we assume that the following routines are defined:
// marshall(binary_writer& writer, const T& val);
// unmarshall(binary_reader& reader, /*out*/ T& val);
// either in the namespace of ::dsn::utils or T
// either in the namespace of utils or T
// developers may write these helper functions by their own, or use tools
// such as protocol-buffer, thrift, or bond to generate these functions automatically
// for their TRequest and TResponse
//
template <typename TRequest, typename TCallback>
rpc_response_task_ptr
call(dsn::rpc_address server,
dsn::task_code code,
call(rpc_address server,
task_code code,
TRequest &&req,
dsn::task_tracker *owner,
task_tracker *tracker,
TCallback &&callback,
std::chrono::milliseconds timeout = std::chrono::milliseconds(0),
int thread_hash = 0, ///< if thread_hash == 0 && partition_hash != 0, thread_hash is
Expand All @@ -231,40 +231,40 @@ call(dsn::rpc_address server,
{
dsn_message_t msg = dsn_msg_create_request(
code, static_cast<int>(timeout.count()), thread_hash, partition_hash);
::dsn::marshall(msg, std::forward<TRequest>(req));
return call(server, msg, owner, std::forward<TCallback>(callback), reply_thread_hash);
marshall(msg, std::forward<TRequest>(req));
return call(server, msg, tracker, std::forward<TCallback>(callback), reply_thread_hash);
}

// no callback
template <typename TRequest>
void call_one_way_typed(::dsn::rpc_address server,
dsn::task_code code,
void call_one_way_typed(rpc_address server,
task_code code,
const TRequest &req,
int thread_hash = 0, ///< if thread_hash == 0 && partition_hash != 0,
/// thread_hash is computed from partition_hash
uint64_t partition_hash = 0)
{
dsn_message_t msg = dsn_msg_create_request(code, 0, thread_hash, partition_hash);
::dsn::marshall(msg, req);
marshall(msg, req);
dsn_rpc_call_one_way(server, msg);
}

template <typename TResponse>
std::pair<::dsn::error_code, TResponse> wait_and_unwrap(rpc_response_task_ptr tsk)
std::pair<error_code, TResponse> wait_and_unwrap(const rpc_response_task_ptr &tsk)
{
tsk->wait();
std::pair<::dsn::error_code, TResponse> result;
std::pair<error_code, TResponse> result;
result.first = tsk->error();
if (tsk->error() == ::dsn::ERR_OK) {
::dsn::unmarshall(tsk->get_response(), result.second);
if (tsk->error() == ERR_OK) {
unmarshall(tsk->get_response(), result.second);
}
return result;
}

template <typename TResponse, typename TRequest>
std::pair<::dsn::error_code, TResponse>
call_wait(::dsn::rpc_address server,
dsn::task_code code,
std::pair<error_code, TResponse>
call_wait(rpc_address server,
task_code code,
TRequest &&req,
std::chrono::milliseconds timeout = std::chrono::milliseconds(0),
int thread_hash = 0,
Expand All @@ -289,19 +289,19 @@ call_wait(::dsn::rpc_address server,
namespace file {

inline aio_task_ptr
create_aio_task(dsn::task_code code, task_tracker *tracker, aio_handler &&callback, int hash)
create_aio_task(task_code code, task_tracker *tracker, aio_handler &&callback, int hash = 0)
{
aio_task_ptr t(new aio_task(code, std::move(callback), hash));
t->set_tracker((dsn::task_tracker *)tracker);
t->spec().on_task_create.execute(::dsn::task::get_current_task(), t);
t->set_tracker((task_tracker *)tracker);
t->spec().on_task_create.execute(task::get_current_task(), t);
return t;
}

inline aio_task_ptr read(dsn_handle_t fh,
char *buffer,
int count,
uint64_t offset,
dsn::task_code callback_code,
task_code callback_code,
task_tracker *tracker,
aio_handler &&callback,
int hash = 0)
Expand All @@ -315,7 +315,7 @@ inline aio_task_ptr write(dsn_handle_t fh,
const char *buffer,
int count,
uint64_t offset,
dsn::task_code callback_code,
task_code callback_code,
task_tracker *tracker,
aio_handler &&callback,
int hash = 0)
Expand All @@ -329,7 +329,7 @@ inline aio_task_ptr write_vector(dsn_handle_t fh,
const dsn_file_buffer_t *buffers,
int buffer_count,
uint64_t offset,
dsn::task_code callback_code,
task_code callback_code,
task_tracker *tracker,
aio_handler &&callback,
int hash = 0)
Expand All @@ -339,21 +339,21 @@ inline aio_task_ptr write_vector(dsn_handle_t fh,
return tsk;
}

void copy_remote_files_impl(::dsn::rpc_address remote,
void copy_remote_files_impl(rpc_address remote,
const std::string &source_dir,
const std::vector<std::string> &files, // empty for all
const std::string &dest_dir,
bool overwrite,
bool high_priority,
dsn::aio_task *tsk);
aio_task *tsk);

inline aio_task_ptr copy_remote_files(::dsn::rpc_address remote,
inline aio_task_ptr copy_remote_files(rpc_address remote,
const std::string &source_dir,
const std::vector<std::string> &files, // empty for all
const std::string &dest_dir,
bool overwrite,
bool high_priority,
dsn::task_code callback_code,
task_code callback_code,
task_tracker *tracker,
aio_handler &&callback,
int hash = 0)
Expand All @@ -364,12 +364,12 @@ inline aio_task_ptr copy_remote_files(::dsn::rpc_address remote,
return tsk;
}

inline aio_task_ptr copy_remote_directory(::dsn::rpc_address remote,
inline aio_task_ptr copy_remote_directory(rpc_address remote,
const std::string &source_dir,
const std::string &dest_dir,
bool overwrite,
bool high_priority,
dsn::task_code callback_code,
task_code callback_code,
task_tracker *tracker,
aio_handler &&callback,
int hash = 0)
Expand Down
2 changes: 1 addition & 1 deletion include/dsn/dist/replication/replication_ddl_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ class replication_ddl_client
private:
bool static valid_app_char(int c);

void end_meta_request(rpc_response_task_ptr &&callback,
void end_meta_request(const rpc_response_task_ptr &callback,
int retry_times,
error_code err,
dsn_message_t request,
Expand Down
Loading

0 comments on commit 0273bd1

Please sign in to comment.