Skip to content

Commit

Permalink
Run multiple threads for PBFT (microsoft#838)
Browse files Browse the repository at this point in the history
  • Loading branch information
ashamis authored Feb 13, 2020
1 parent 4f2ebfe commit dc6e8e5
Show file tree
Hide file tree
Showing 29 changed files with 187 additions and 60 deletions.
2 changes: 1 addition & 1 deletion cmake/common.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,7 @@ else()
set(CONSENSUS_ARG "raft")
endif()

if((NOT CMAKE_BUILD_TYPE STREQUAL "Debug") AND NOT PBFT)
if((NOT CMAKE_BUILD_TYPE STREQUAL "Debug") AND NOT SAN)
set(WORKER_THREADS 2)
else()
set(WORKER_THREADS 0)
Expand Down
2 changes: 2 additions & 0 deletions src/consensus/pbft/libbyz/Checkpoint.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ class Checkpoint : public Message
// Checkpoint messages
//
public:
Checkpoint(uint32_t msg_size = 0) : Message(msg_size) {}

Checkpoint(Seqno s, Digest& d, bool stable = false);
// Effects: Creates a new signed Checkpoint message with sequence
// number "s" and digest "d". "stable" should be true iff the checkpoint
Expand Down
2 changes: 1 addition & 1 deletion src/consensus/pbft/libbyz/Client_proxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ bool ClientProxy<T, C>::send_request(
}

Request_id rid = request_id_generator.next_rid();
auto req = std::make_unique<Request>(rid);
auto req = std::make_unique<Request>(rid, -1);
if (req == nullptr)
{
current_outstanding.fetch_sub(1);
Expand Down
2 changes: 2 additions & 0 deletions src/consensus/pbft/libbyz/Commit.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ class Commit : public Message
// Commit messages
//
public:
Commit(uint32_t msg_size = 0) : Message(msg_size) {}

Commit(View v, Seqno s);
// Effects: Creates a new Commit message with view number "v"
// and sequence number "s".
Expand Down
2 changes: 2 additions & 0 deletions src/consensus/pbft/libbyz/Data.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ class Data : public Message
// Data messages
//
public:
Data(uint32_t msg_size = 0) : Message(msg_size) {}

Data(size_t i, Seqno lm, char* data);
// Effects: Creates a new Data message. i is the index of he data block, lm
// is the last sequence number when it was modified, and data is a pointer to
Expand Down
2 changes: 2 additions & 0 deletions src/consensus/pbft/libbyz/Fetch.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ class Fetch : public Message
// Fetch messages
//
public:
Fetch(uint32_t msg_size = 0) : Message(msg_size) {}

Fetch(
Request_id rid,
Seqno lu,
Expand Down
36 changes: 15 additions & 21 deletions src/consensus/pbft/libbyz/Message.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,14 @@ class Message
//
// Generic messages
//
public:
protected:
Message(unsigned sz = 0);
// Effects: Creates an untagged Message object that can hold up
// to "sz" bytes and holds zero bytes. Useful to create message
// buffers to receive messages from the network.

~Message();
public:
virtual ~Message();
// Effects: Deallocates all storage associated with this message.

void trim();
Expand All @@ -91,6 +92,18 @@ class Message
int tag() const;
// Effects: Fetches the message tag.

static int get_tag(const uint8_t* data)
{
Message_rep* m = (Message_rep*)data;
return m->tag;
}

static int get_size(const uint8_t* data)
{
Message_rep* m = (Message_rep*)data;
return m->size;
}

bool has_tag(int t, int sz) const;
// Effects: If message has tag "t", its size is greater than "sz",
// its size less than or equal to "max_size", and its size is a
Expand All @@ -103,10 +116,6 @@ class Message
// Effects: Messages may be full or empty. Empty messages are just
// digests of full messages.

// Message-specific heap management operators.
void* operator new(size_t s);
void operator delete(void* x, size_t s);

const char* stag();
// Effects: Returns a string with tag name

Expand Down Expand Up @@ -186,21 +195,6 @@ inline bool Message::full() const
return true;
}

inline void* Message::operator new(size_t s)
{
void* ret = malloc(s);
PBFT_ASSERT(ret != 0, "Ran out of memory\n");
return ret;
}

inline void Message::operator delete(void* x, size_t s)
{
if (x != 0)
{
free(x);
}
}

inline int Message::msize() const
{
return (max_size >= 0) ? max_size : msg->size;
Expand Down
2 changes: 2 additions & 0 deletions src/consensus/pbft/libbyz/Meta_data.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ class Meta_data : public Message
// Meta_data messages
//
public:
Meta_data(uint32_t msg_size = 0) : Message(msg_size) {}

Meta_data(Request_id r, int l, size_t i, Seqno lu, Seqno lm, Digest& d);
// Effects: Creates a new un-authenticated Meta_data message with no
// subpartition information.
Expand Down
2 changes: 2 additions & 0 deletions src/consensus/pbft/libbyz/Meta_data_d.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ class Meta_data_d : public Message
// Meta_data_d messages
//
public:
Meta_data_d(uint32_t msg_size = 0) : Message(msg_size) {}

Meta_data_d(Request_id r, int l, size_t i, Seqno ls);
// Effects: Creates a new un-authenticated Meta_data_d message with no
// partition digests.
Expand Down
2 changes: 2 additions & 0 deletions src/consensus/pbft/libbyz/Network_open.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ struct Network_open_rep : public Message_rep
class Network_open : public Message
{
public:
Network_open(uint32_t msg_size = 0) : Message(msg_size) {}

Network_open(int id);

int id() const;
Expand Down
2 changes: 2 additions & 0 deletions src/consensus/pbft/libbyz/New_principal.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ struct New_principal_rep : public Message_rep
class New_principal : public Message
{
public:
New_principal(uint32_t msg_size = 0) : Message(msg_size) {}

New_principal(
NodeId id,
short port,
Expand Down
2 changes: 2 additions & 0 deletions src/consensus/pbft/libbyz/New_view.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ class New_view : public Message
// New_view messages
//
public:
New_view(uint32_t msg_size = 0) : Message(msg_size) {}

New_view(View v);
// Effects: Creates a new (unsigned) New_view message with an empty
// set of view change messages.
Expand Down
2 changes: 2 additions & 0 deletions src/consensus/pbft/libbyz/Pre_prepare.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ class Pre_prepare : public Message
// Pre_prepare messages
//
public:
Pre_prepare(uint32_t msg_size = 0) : Message(msg_size) {}

Pre_prepare(View v, Seqno s, Req_queue& reqs, size_t& requests_in_batch);
// Effects: Creates a new signed Pre_prepare message with view
// number "v", sequence number "s", the requests in "reqs" (up to a
Expand Down
2 changes: 2 additions & 0 deletions src/consensus/pbft/libbyz/Prepare.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ class Prepare : public Message
// Prepare messages
//
public:
Prepare(uint32_t msg_size = 0) : Message(msg_size) {}

Prepare(
View v, Seqno s, Digest& d, Principal* dst = 0, bool is_signed = false);
// Effects: Creates a new signed Prepare message with view number
Expand Down
2 changes: 2 additions & 0 deletions src/consensus/pbft/libbyz/Query_stable.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ class Query_stable : public Message
// Query_stable messages
//
public:
Query_stable(uint32_t msg_size = 0) : Message(msg_size) {}

Query_stable();
// Effects: Creates a new authenticated Query_stable message.

Expand Down
103 changes: 95 additions & 8 deletions src/consensus/pbft/libbyz/Replica.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -247,19 +247,107 @@ static void pre_verify_cb(std::unique_ptr<enclave::Tmsg<PreVerifyCbMsg>> req)

static uint64_t verification_thread = 0;

Message* Replica::create_message(const uint8_t* data, uint32_t size)
{
uint64_t alloc_size = std::max(size, (uint32_t)Max_message_size);

Message* m;

switch (Message::get_tag(data))
{
case Request_tag:
m = new Request(alloc_size);
break;

case Reply_tag:
m = new Reply(alloc_size);
break;

case Pre_prepare_tag:
m = new Pre_prepare(alloc_size);
break;

case Prepare_tag:
m = new Prepare(alloc_size);
break;

case Commit_tag:
m = new Commit(alloc_size);
break;

case Checkpoint_tag:
m = new Checkpoint(alloc_size);
break;

#ifndef USE_PKEY_VIEW_CHANGES
case View_change_ack_tag:
m = new View_change_ack(alloc_size);
break;
#endif

case Status_tag:
m = new Status(alloc_size);
break;

case Fetch_tag:
m = new Fetch(alloc_size);
break;

case Query_stable_tag:
m = new Query_stable(alloc_size);
break;

case Reply_stable_tag:
m = new Reply_stable(alloc_size);
break;

case Meta_data_tag:
m = new Meta_data(alloc_size);
break;

case Meta_data_d_tag:
m = new Meta_data_d(alloc_size);
break;

case Data_tag:
m = new Data(alloc_size);
break;

case View_change_tag:
m = new View_change(alloc_size);
break;

case New_view_tag:
m = new New_view((uint32_t)alloc_size);
break;

case New_principal_tag:
m = new New_principal(alloc_size);
break;

case Network_open_tag:
m = new Network_open((uint32_t)alloc_size);
break;

default:
// Unknown message type.
delete m;
}

memcpy(m->contents(), data, size);

return m;
}

void Replica::receive_message(const uint8_t* data, uint32_t size)
{
if (size > Max_message_size)
{
LOG_FAIL << "Received message size exceeds message: " << size << std::endl;
}
uint64_t alloc_size = std::max(size, (uint32_t)Max_message_size);
Message* m = new Message(alloc_size);

Message* m = create_message(data, size);
uint32_t target_thread = 0;

memcpy(m->contents(), data, size);

if (enclave::ThreadMessaging::thread_count > 1 && m->tag() == Request_tag)
{
uint32_t num_worker_thread = enclave::ThreadMessaging::thread_count - 1;
Expand Down Expand Up @@ -1168,8 +1256,7 @@ template <class T>
std::unique_ptr<T> Replica::create_message(
const uint8_t* message_data, size_t data_size)
{
Message* m = new Message(data_size);
std::copy(message_data, message_data + data_size, m->contents());
Message* m = create_message(message_data, data_size);
T* msg_type;
T::convert(m, msg_type);
return std::unique_ptr<T>(msg_type);
Expand Down Expand Up @@ -2776,7 +2863,7 @@ void Replica::handle(Reply_stable* m)
LOG_INFO << "sending recovery request" << std::endl;
// Send recovery request.
START_CC(rr_time);
rr = new Request(new_rid());
rr = new Request(new_rid(), -1);

int len;
char* buf = rr->store_command(len);
Expand Down
3 changes: 3 additions & 0 deletions src/consensus/pbft/libbyz/Replica.h
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,9 @@ class Replica : public Node, public IMessageReceiveBase
// Effects: Use when messages are passed to Replica rather than replica
// polling

static Message* create_message(const uint8_t* data, uint32_t size);
// Effects: Creates a new message from a buffer

bool compare_execution_results(const ByzInfo& info, Pre_prepare* pre_prepare);
// Compare the merkle root and batch ctx between the pre-prepare and the
// the corresponding fields in info after execution
Expand Down
2 changes: 1 addition & 1 deletion src/consensus/pbft/libbyz/Reply.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ Reply::Reply(

Reply* Reply::copy(int id) const
{
Reply* ret = (Reply*)new Message(msg->size);
Reply* ret = (Reply*)new Reply(msg->size);
memcpy(ret->msg, msg, msg->size);
ret->rep().replica = id;
return ret;
Expand Down
2 changes: 2 additions & 0 deletions src/consensus/pbft/libbyz/Reply.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ class Reply : public Message
public:
Reply() : Message() {}

Reply(uint32_t msg_size) : Message(msg_size) {}

Reply(Reply_rep* r);

Reply(View view, Request_id req, Seqno n, int replica, uint32_t reply_size);
Expand Down
2 changes: 2 additions & 0 deletions src/consensus/pbft/libbyz/Reply_stable.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ class Reply_stable : public Message
// Reply_stable messages
//
public:
Reply_stable(uint32_t msg_size = 0) : Message(msg_size) {}

Reply_stable(Seqno lc, Seqno lp, int n, Principal* p);
// Effects: Creates a new authenticated Reply_stable message with
// last checkpoint "lc", last prepared request "lp", for a
Expand Down
2 changes: 1 addition & 1 deletion src/consensus/pbft/libbyz/Request.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ Request::Request(Request_id r, short rr) :

Request* Request::clone() const
{
Request* ret = (Request*)new Message(max_size);
Request* ret = (Request*)new Request(max_size);
memcpy(ret->msg, msg, msg->size);
return ret;
}
Expand Down
5 changes: 3 additions & 2 deletions src/consensus/pbft/libbyz/Request.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,10 @@ class Request : public Message
// memory the user expects to be able to use.)
//
public:
Request() : Message() {}
// Request() : Message() {}
Request(uint32_t msg_size = 0) : Message(msg_size) {}

Request(Request_id r, short rr = -1);
Request(Request_id r, short rr);
// Effects: Creates a new signed Request message with an empty
// command and no authentication. The methods store_command and
// authenticate should be used to finish message construction.
Expand Down
2 changes: 2 additions & 0 deletions src/consensus/pbft/libbyz/Status.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ class Status : public Message
// Status messages
//
public:
Status(uint32_t msg_size = 0) : Message(msg_size) {}

Status(View v, Seqno ls, Seqno le, bool hnvi, bool hnvm);
// Effects: Creates a new unauthenticated Status message. "v"
// should be the sending replica's current view, "ls" should be the
Expand Down
Loading

0 comments on commit dc6e8e5

Please sign in to comment.