Skip to content

Commit

Permalink
Create a write thread (envoyproxy#67)
Browse files Browse the repository at this point in the history
* Create a write thread

* use unique_ptr

* fix asan test failure.

* remove one std::move
  • Loading branch information
qiwzhang authored May 18, 2017
1 parent 11f8ac6 commit 8035b35
Showing 1 changed file with 69 additions and 13 deletions.
82 changes: 69 additions & 13 deletions mixerclient/src/grpc_transport.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,10 @@
* limitations under the License.
*/
#include "src/grpc_transport.h"

#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>

namespace istio {
Expand All @@ -37,25 +40,26 @@ class GrpcStream final : public WriteInterface<RequestType> {

static void Start(
std::shared_ptr<GrpcStream<RequestType, ResponseType>> grpc_stream) {
std::thread t([grpc_stream]() { grpc_stream->ReadMainLoop(); });
t.detach();
std::thread read_t([grpc_stream]() { grpc_stream->ReadMainLoop(); });
read_t.detach();
std::thread write_t([grpc_stream]() { grpc_stream->WriteMainLoop(); });
write_t.detach();
}

void Write(const RequestType& request) override {
std::lock_guard<std::mutex> lock(write_mutex_);
if (!stream_->Write(request)) {
GOOGLE_LOG(INFO) << "Stream Write failed: half close";
write_closed_ = true;
}
// make a copy and push to the queue
WriteQueuePush(new RequestType(request));
}

void WritesDone() override {
std::lock_guard<std::mutex> lock(write_mutex_);
stream_->WritesDone();
write_closed_ = true;
// push a nullptr to indicate half close
WriteQueuePush(nullptr);
}

bool is_write_closed() const override { return write_closed_; }
bool is_write_closed() const override {
std::lock_guard<std::mutex> lock(write_closed_mutex_);
return write_closed_;
}

private:
// The worker loop to read response messages.
Expand All @@ -67,23 +71,75 @@ class GrpcStream final : public WriteInterface<RequestType> {
::grpc::Status status = stream_->Finish();
GOOGLE_LOG(INFO) << "Stream Finished with status: "
<< status.error_message();

// Notify Write thread to quit.
set_write_closed();
WriteQueuePush(nullptr);

// Convert grpc status to protobuf status.
::google::protobuf::util::Status pb_status(
::google::protobuf::util::error::Code(status.error_code()),
::google::protobuf::StringPiece(status.error_message()));
reader_->OnClose(pb_status);
}

void WriteQueuePush(RequestType* request) {
std::unique_lock<std::mutex> lk(write_queue_mutex_);
write_queue_.emplace(request);
cv_.notify_one();
}

std::unique_ptr<RequestType> WriteQueuePop() {
std::unique_lock<std::mutex> lk(write_queue_mutex_);
while (write_queue_.empty()) {
cv_.wait(lk);
}
auto ret = std::move(write_queue_.front());
write_queue_.pop();
return ret;
}

void set_write_closed() {
std::lock_guard<std::mutex> lock(write_closed_mutex_);
write_closed_ = true;
}

// The worker loop to write request message.
void WriteMainLoop() {
while (true) {
auto request = WriteQueuePop();
if (!request) {
if (!is_write_closed()) {
stream_->WritesDone();
set_write_closed();
}
break;
}
if (!stream_->Write(*request)) {
set_write_closed();
break;
}
}
}

// The client context.
::grpc::ClientContext context_;
// Mutex to make sure not calling stream_->Write() parallelly.
std::mutex write_mutex_;

// The reader writer stream.
StreamPtr stream_;
// The reader interface from caller.
ReadInterface<ResponseType>* reader_;

// Indicates if write is closed.
mutable std::mutex write_closed_mutex_;
bool write_closed_;

// Mutex to protect write queue.
std::mutex write_queue_mutex_;
// condition to wait for write_queue.
std::condition_variable cv_;
// a queue to store pending queue for write
std::queue<std::unique_ptr<RequestType>> write_queue_;
};

typedef GrpcStream<::istio::mixer::v1::CheckRequest,
Expand Down

0 comments on commit 8035b35

Please sign in to comment.