Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Run multiple threads for PBFT #838

Merged
merged 5 commits into from
Feb 13, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmake/common.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,7 @@ else()
set(CONSENSUS_ARG "raft")
endif()

if((NOT CMAKE_BUILD_TYPE STREQUAL "Debug") AND NOT PBFT)
if((NOT CMAKE_BUILD_TYPE STREQUAL "Debug") AND NOT SAN)
set(WORKER_THREADS 2)
else()
set(WORKER_THREADS 0)
Expand Down
2 changes: 2 additions & 0 deletions src/consensus/pbft/libbyz/Checkpoint.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ class Checkpoint : public Message
// Checkpoint messages
//
public:
Checkpoint(uint32_t msg_size = 0) : Message(msg_size) {}

Checkpoint(Seqno s, Digest& d, bool stable = false);
// Effects: Creates a new signed Checkpoint message with sequence
// number "s" and digest "d". "stable" should be true iff the checkpoint
Expand Down
2 changes: 1 addition & 1 deletion src/consensus/pbft/libbyz/Client_proxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ bool ClientProxy<T, C>::send_request(
}

Request_id rid = request_id_generator.next_rid();
auto req = std::make_unique<Request>(rid);
auto req = std::make_unique<Request>(rid, -1);
if (req == nullptr)
{
current_outstanding.fetch_sub(1);
Expand Down
2 changes: 2 additions & 0 deletions src/consensus/pbft/libbyz/Commit.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ class Commit : public Message
// Commit messages
//
public:
Commit(uint32_t msg_size = 0) : Message(msg_size) {}

Commit(View v, Seqno s);
// Effects: Creates a new Commit message with view number "v"
// and sequence number "s".
Expand Down
2 changes: 2 additions & 0 deletions src/consensus/pbft/libbyz/Data.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ class Data : public Message
// Data messages
//
public:
Data(uint32_t msg_size = 0) : Message(msg_size) {}

Data(size_t i, Seqno lm, char* data);
// Effects: Creates a new Data message. i is the index of he data block, lm
// is the last sequence number when it was modified, and data is a pointer to
Expand Down
2 changes: 2 additions & 0 deletions src/consensus/pbft/libbyz/Fetch.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ class Fetch : public Message
// Fetch messages
//
public:
Fetch(uint32_t msg_size = 0) : Message(msg_size) {}

Fetch(
Request_id rid,
Seqno lu,
Expand Down
36 changes: 15 additions & 21 deletions src/consensus/pbft/libbyz/Message.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,14 @@ class Message
//
// Generic messages
//
public:
protected:
Message(unsigned sz = 0);
// Effects: Creates an untagged Message object that can hold up
// to "sz" bytes and holds zero bytes. Useful to create message
// buffers to receive messages from the network.

~Message();
public:
virtual ~Message();
// Effects: Deallocates all storage associated with this message.

void trim();
Expand All @@ -91,6 +92,18 @@ class Message
int tag() const;
// Effects: Fetches the message tag.

static int get_tag(const uint8_t* data)
{
Message_rep* m = (Message_rep*)data;
return m->tag;
}

static int get_size(const uint8_t* data)
{
Message_rep* m = (Message_rep*)data;
return m->size;
}

bool has_tag(int t, int sz) const;
// Effects: If message has tag "t", its size is greater than "sz",
// its size less than or equal to "max_size", and its size is a
Expand All @@ -103,10 +116,6 @@ class Message
// Effects: Messages may be full or empty. Empty messages are just
// digests of full messages.

// Message-specific heap management operators.
void* operator new(size_t s);
void operator delete(void* x, size_t s);

const char* stag();
// Effects: Returns a string with tag name

Expand Down Expand Up @@ -186,21 +195,6 @@ inline bool Message::full() const
return true;
}

inline void* Message::operator new(size_t s)
{
void* ret = malloc(s);
PBFT_ASSERT(ret != 0, "Ran out of memory\n");
return ret;
}

inline void Message::operator delete(void* x, size_t s)
{
if (x != 0)
{
free(x);
}
}

inline int Message::msize() const
{
return (max_size >= 0) ? max_size : msg->size;
Expand Down
2 changes: 2 additions & 0 deletions src/consensus/pbft/libbyz/Meta_data.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ class Meta_data : public Message
// Meta_data messages
//
public:
Meta_data(uint32_t msg_size = 0) : Message(msg_size) {}

Meta_data(Request_id r, int l, size_t i, Seqno lu, Seqno lm, Digest& d);
// Effects: Creates a new un-authenticated Meta_data message with no
// subpartition information.
Expand Down
2 changes: 2 additions & 0 deletions src/consensus/pbft/libbyz/Meta_data_d.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ class Meta_data_d : public Message
// Meta_data_d messages
//
public:
Meta_data_d(uint32_t msg_size = 0) : Message(msg_size) {}

Meta_data_d(Request_id r, int l, size_t i, Seqno ls);
// Effects: Creates a new un-authenticated Meta_data_d message with no
// partition digests.
Expand Down
2 changes: 2 additions & 0 deletions src/consensus/pbft/libbyz/Network_open.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ struct Network_open_rep : public Message_rep
class Network_open : public Message
{
public:
Network_open(uint32_t msg_size = 0) : Message(msg_size) {}

Network_open(int id);

int id() const;
Expand Down
2 changes: 2 additions & 0 deletions src/consensus/pbft/libbyz/New_principal.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ struct New_principal_rep : public Message_rep
class New_principal : public Message
{
public:
New_principal(uint32_t msg_size = 0) : Message(msg_size) {}

New_principal(
NodeId id,
short port,
Expand Down
2 changes: 2 additions & 0 deletions src/consensus/pbft/libbyz/New_view.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ class New_view : public Message
// New_view messages
//
public:
New_view(uint32_t msg_size = 0) : Message(msg_size) {}

New_view(View v);
// Effects: Creates a new (unsigned) New_view message with an empty
// set of view change messages.
Expand Down
2 changes: 2 additions & 0 deletions src/consensus/pbft/libbyz/Pre_prepare.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ class Pre_prepare : public Message
// Pre_prepare messages
//
public:
Pre_prepare(uint32_t msg_size = 0) : Message(msg_size) {}

Pre_prepare(View v, Seqno s, Req_queue& reqs, size_t& requests_in_batch);
// Effects: Creates a new signed Pre_prepare message with view
// number "v", sequence number "s", the requests in "reqs" (up to a
Expand Down
2 changes: 2 additions & 0 deletions src/consensus/pbft/libbyz/Prepare.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ class Prepare : public Message
// Prepare messages
//
public:
Prepare(uint32_t msg_size = 0) : Message(msg_size) {}

Prepare(
View v, Seqno s, Digest& d, Principal* dst = 0, bool is_signed = false);
// Effects: Creates a new signed Prepare message with view number
Expand Down
2 changes: 2 additions & 0 deletions src/consensus/pbft/libbyz/Query_stable.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ class Query_stable : public Message
// Query_stable messages
//
public:
Query_stable(uint32_t msg_size = 0) : Message(msg_size) {}

Query_stable();
// Effects: Creates a new authenticated Query_stable message.

Expand Down
103 changes: 95 additions & 8 deletions src/consensus/pbft/libbyz/Replica.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -247,19 +247,107 @@ static void pre_verify_cb(std::unique_ptr<enclave::Tmsg<PreVerifyCbMsg>> req)

static uint64_t verification_thread = 0;

Message* Replica::create_message(const uint8_t* data, uint32_t size)
{
uint64_t alloc_size = std::max(size, (uint32_t)Max_message_size);

Message* m;

switch (Message::get_tag(data))
{
case Request_tag:
m = new Request(alloc_size);
break;

case Reply_tag:
m = new Reply(alloc_size);
break;

case Pre_prepare_tag:
m = new Pre_prepare(alloc_size);
break;

case Prepare_tag:
m = new Prepare(alloc_size);
break;

case Commit_tag:
m = new Commit(alloc_size);
break;

case Checkpoint_tag:
m = new Checkpoint(alloc_size);
break;

#ifndef USE_PKEY_VIEW_CHANGES
case View_change_ack_tag:
m = new View_change_ack(alloc_size);
break;
#endif

case Status_tag:
m = new Status(alloc_size);
break;

case Fetch_tag:
m = new Fetch(alloc_size);
break;

case Query_stable_tag:
m = new Query_stable(alloc_size);
break;

case Reply_stable_tag:
m = new Reply_stable(alloc_size);
break;

case Meta_data_tag:
m = new Meta_data(alloc_size);
break;

case Meta_data_d_tag:
m = new Meta_data_d(alloc_size);
break;

case Data_tag:
m = new Data(alloc_size);
break;

case View_change_tag:
m = new View_change(alloc_size);
break;

case New_view_tag:
m = new New_view((uint32_t)alloc_size);
break;

case New_principal_tag:
m = new New_principal(alloc_size);
break;

case Network_open_tag:
m = new Network_open((uint32_t)alloc_size);
break;

default:
// Unknown message type.
delete m;
}

memcpy(m->contents(), data, size);

return m;
}

void Replica::receive_message(const uint8_t* data, uint32_t size)
{
if (size > Max_message_size)
{
LOG_FAIL << "Received message size exceeds message: " << size << std::endl;
}
uint64_t alloc_size = std::max(size, (uint32_t)Max_message_size);
Message* m = new Message(alloc_size);

Message* m = create_message(data, size);
uint32_t target_thread = 0;

memcpy(m->contents(), data, size);

if (enclave::ThreadMessaging::thread_count > 1 && m->tag() == Request_tag)
{
uint32_t num_worker_thread = enclave::ThreadMessaging::thread_count - 1;
Expand Down Expand Up @@ -1168,8 +1256,7 @@ template <class T>
std::unique_ptr<T> Replica::create_message(
const uint8_t* message_data, size_t data_size)
{
Message* m = new Message(data_size);
std::copy(message_data, message_data + data_size, m->contents());
Message* m = create_message(message_data, data_size);
T* msg_type;
T::convert(m, msg_type);
return std::unique_ptr<T>(msg_type);
Expand Down Expand Up @@ -2776,7 +2863,7 @@ void Replica::handle(Reply_stable* m)
LOG_INFO << "sending recovery request" << std::endl;
// Send recovery request.
START_CC(rr_time);
rr = new Request(new_rid());
rr = new Request(new_rid(), -1);

int len;
char* buf = rr->store_command(len);
Expand Down
3 changes: 3 additions & 0 deletions src/consensus/pbft/libbyz/Replica.h
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,9 @@ class Replica : public Node, public IMessageReceiveBase
// Effects: Use when messages are passed to Replica rather than replica
// polling

static Message* create_message(const uint8_t* data, uint32_t size);
// Effects: Creates a new message from a buffer

bool compare_execution_results(const ByzInfo& info, Pre_prepare* pre_prepare);
// Compare the merkle root and batch ctx between the pre-prepare and the
// the corresponding fields in info after execution
Expand Down
2 changes: 1 addition & 1 deletion src/consensus/pbft/libbyz/Reply.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ Reply::Reply(

Reply* Reply::copy(int id) const
{
Reply* ret = (Reply*)new Message(msg->size);
Reply* ret = (Reply*)new Reply(msg->size);
memcpy(ret->msg, msg, msg->size);
ret->rep().replica = id;
return ret;
Expand Down
2 changes: 2 additions & 0 deletions src/consensus/pbft/libbyz/Reply.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ class Reply : public Message
public:
Reply() : Message() {}

Reply(uint32_t msg_size) : Message(msg_size) {}

Reply(Reply_rep* r);

Reply(View view, Request_id req, Seqno n, int replica, uint32_t reply_size);
Expand Down
2 changes: 2 additions & 0 deletions src/consensus/pbft/libbyz/Reply_stable.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ class Reply_stable : public Message
// Reply_stable messages
//
public:
Reply_stable(uint32_t msg_size = 0) : Message(msg_size) {}

Reply_stable(Seqno lc, Seqno lp, int n, Principal* p);
// Effects: Creates a new authenticated Reply_stable message with
// last checkpoint "lc", last prepared request "lp", for a
Expand Down
2 changes: 1 addition & 1 deletion src/consensus/pbft/libbyz/Request.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ Request::Request(Request_id r, short rr) :

Request* Request::clone() const
{
Request* ret = (Request*)new Message(max_size);
Request* ret = (Request*)new Request(max_size);
memcpy(ret->msg, msg, msg->size);
return ret;
}
Expand Down
5 changes: 3 additions & 2 deletions src/consensus/pbft/libbyz/Request.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,10 @@ class Request : public Message
// memory the user expects to be able to use.)
//
public:
Request() : Message() {}
// Request() : Message() {}
Request(uint32_t msg_size = 0) : Message(msg_size) {}

Request(Request_id r, short rr = -1);
Request(Request_id r, short rr);
// Effects: Creates a new signed Request message with an empty
// command and no authentication. The methods store_command and
// authenticate should be used to finish message construction.
Expand Down
2 changes: 2 additions & 0 deletions src/consensus/pbft/libbyz/Status.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ class Status : public Message
// Status messages
//
public:
Status(uint32_t msg_size = 0) : Message(msg_size) {}

Status(View v, Seqno ls, Seqno le, bool hnvi, bool hnvm);
// Effects: Creates a new unauthenticated Status message. "v"
// should be the sending replica's current view, "ls" should be the
Expand Down
Loading