Skip to content

Commit

Permalink
gRPC and HTTP calls return on RootContext. Add setEffectiveContext()… (
Browse files Browse the repository at this point in the history
…#105)

* gRPC and HTTP calls return on RootContext.  Add setEffectiveContext() call.

Signed-off-by: John Plevyak <[email protected]>
  • Loading branch information
jplevyak authored Jul 31, 2019
1 parent 73234ef commit 89200de
Show file tree
Hide file tree
Showing 33 changed files with 101,281 additions and 103,515 deletions.
3 changes: 3 additions & 0 deletions api/wasm/cpp/proxy_wasm_externs.h
Original file line number Diff line number Diff line change
Expand Up @@ -143,3 +143,6 @@ extern "C" void proxy_incrementMetric(uint32_t metric_id, int64_t offset);
extern "C" void proxy_recordMetric(uint32_t metric_id, uint64_t value);
extern "C" uint64_t proxy_getMetric(uint32_t metric_id);

// System
// Returns 0 on success.
extern "C" uint32_t proxy_setEffectiveContext(uint32_t effective_context_id);
150 changes: 84 additions & 66 deletions api/wasm/cpp/proxy_wasm_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -147,26 +147,28 @@ using HeaderStringPairs = std::vector<std::pair<std::string, std::string>>;

class GrpcCallHandlerBase {
public:
GrpcCallHandlerBase(Context* context) : context_(context) {}
GrpcCallHandlerBase() {}
virtual ~GrpcCallHandlerBase() {}

RootContext* context() { return context_; }

void cancel();

virtual void onCreateInitialMetadata() {}
virtual void onSuccess(std::unique_ptr<WasmData> message) = 0;
virtual void onFailure(GrpcStatus status, std::unique_ptr<WasmData> error_message) = 0;

private:
friend class ContextBase;
friend class RootContext;

Context* const context_;
RootContext* context_{nullptr};
uint32_t token_;
};

template<typename Message>
class GrpcCallHandler : public GrpcCallHandlerBase {
public:
GrpcCallHandler(Context* context) : GrpcCallHandlerBase(context) {}
GrpcCallHandler() : GrpcCallHandlerBase() {}
virtual ~GrpcCallHandler() {}

virtual void onSuccess(Message&& response) = 0;
Expand All @@ -179,9 +181,11 @@ class GrpcCallHandler : public GrpcCallHandlerBase {

class GrpcStreamHandlerBase {
public:
GrpcStreamHandlerBase(Context* context) : context_(context) {}
GrpcStreamHandlerBase() {}
virtual ~GrpcStreamHandlerBase() {}

RootContext* context() { return context_; }

// NB: with end_of_stream == true, callbacks can still occur: reset() to prevent further callbacks.
void send(StringView message, bool end_of_stream);
void close(); // NB: callbacks can still occur: reset() to prevent further callbacks.
Expand All @@ -194,20 +198,20 @@ class GrpcStreamHandlerBase {
virtual void onRemoteClose(GrpcStatus status, std::unique_ptr<WasmData> error_message) = 0;

protected:
friend class ContextBase;
friend class RootContext;

void doRemoteClose(GrpcStatus status, std::unique_ptr<WasmData> error_message);

bool local_close_ = false;
bool remote_close_ = false;
Context* const context_;
bool local_close_{false};
bool remote_close_{false};
RootContext* context_{nullptr};
uint32_t token_;
};

template<typename Request, typename Response>
class GrpcStreamHandler : public GrpcStreamHandlerBase {
public:
GrpcStreamHandler(Context* context) : GrpcStreamHandlerBase(context) {}
GrpcStreamHandler() : GrpcStreamHandlerBase() {}
virtual ~GrpcStreamHandler() {}

void send(const Request& message, bool end_of_stream) {
Expand Down Expand Up @@ -235,15 +239,8 @@ class ContextBase {

uint32_t id() { return id_; }

// Low level HTTP/gRPC interface.
virtual void onHttpCallResponse(uint32_t token, std::unique_ptr<WasmData> header_pairs,
std::unique_ptr<WasmData> body,
std::unique_ptr<WasmData> trailer_pairs);
virtual void onGrpcCreateInitialMetadata(uint32_t token);
virtual void onGrpcReceiveInitialMetadata(uint32_t token);
virtual void onGrpcReceiveTrailingMetadata(uint32_t token);
virtual void onGrpcReceive(uint32_t token, std::unique_ptr<WasmData> message);
virtual void onGrpcClose(uint32_t token, GrpcStatus status, std::unique_ptr<WasmData> message);
// Make this context the effective context for calls out of the VM.
bool setEffectiveContext();

virtual RootContext* asRoot() { return nullptr; }
virtual Context* asContext() { return nullptr; }
Expand All @@ -255,38 +252,9 @@ class ContextBase {
MetadataResult nodeMetadataStruct(google::protobuf::Struct* struct_ptr);
MetadataResult namedMetadataValue(MetadataType type, StringView name, StringView key, google::protobuf::Value* value_ptr);

// Default high level HTTP/gRPC interface. NB: overriding the low level interface will disable this interface.
using HttpCallCallback = std::function<void(std::unique_ptr<WasmData> header_pairs,
std::unique_ptr<WasmData> body, std::unique_ptr<WasmData> trailer_pairs)>;
using GrpcSimpleCallCallback = std::function<void(GrpcStatus status, std::unique_ptr<WasmData> message)>;
// Returns false on setup error.
bool httpCall(StringView uri, const HeaderStringPairs& request_headers,
StringView request_body, const HeaderStringPairs& request_trailers,
uint32_t timeout_milliseconds, HttpCallCallback callback);
// NB: the message is the response if status == OK and an error message otherwise.
// Returns false on setup error.
bool grpcSimpleCall(StringView service, StringView service_name, StringView method_name,
const google::protobuf::MessageLite &request, uint32_t timeout_milliseconds, GrpcSimpleCallCallback callback);
template<typename Response> void grpcSimpleCall(StringView service, StringView service_name,
StringView method_name, const google::protobuf::MessageLite &request, uint32_t timeout_milliseconds,
std::function<void(Response&& response)> success_callback,
std::function<void(GrpcStatus status, StringView error_message)> failure_callback) {
auto callback = [success_callback, failure_callback](GrpcStatus status, std::unique_ptr<WasmData> message) {
if (status == GrpcStatus::Ok) {
success_callback(message->proto<Response>());
} else {
failure_callback(status, message->view());
}
};
grpcSimpleCall(service, service_name, method_name, request, timeout_milliseconds, callback);
}
// Returns false on setup error.
bool grpcCallHandler(StringView service, StringView service_name,
StringView method_name, const google::protobuf::MessageLite &request, uint32_t timeout_milliseconds,
std::unique_ptr<GrpcCallHandlerBase> handler);
// Returns false on setup error.
bool grpcStreamHandler(StringView service, StringView service_name,
StringView method_name, std::unique_ptr<GrpcStreamHandlerBase> handler);

protected:
MetadataResult metadataValue(MetadataType type, StringView key, google::protobuf::Value* value_ptr);
Expand All @@ -300,14 +268,7 @@ class ContextBase {
std::unordered_map<std::pair<EnumType, std::string>, google::protobuf::Struct, PairHash> struct_cache_;

private:
friend class GrpcCallHandlerBase;
friend class GrpcStreamHandlerBase;

uint32_t id_;
std::unordered_map<uint32_t, HttpCallCallback> http_calls_;
std::unordered_map<uint32_t, GrpcSimpleCallCallback> simple_grpc_calls_;
std::unordered_map<uint32_t, std::unique_ptr<GrpcCallHandlerBase>> grpc_calls_;
std::unordered_map<uint32_t, std::unique_ptr<GrpcStreamHandlerBase>> grpc_streams_;
};

// A context unique for each root_id for a use-case (e.g. filter) compiled into module.
Expand All @@ -330,8 +291,55 @@ class RootContext : public ContextBase {
// Called when data arrives on a SharedQueue.
virtual void onQueueReady(uint32_t /* token */) {}

// Low level HTTP/gRPC interface.
virtual void onHttpCallResponse(uint32_t token, std::unique_ptr<WasmData> header_pairs,
std::unique_ptr<WasmData> body,
std::unique_ptr<WasmData> trailer_pairs);
virtual void onGrpcCreateInitialMetadata(uint32_t token);
virtual void onGrpcReceiveInitialMetadata(uint32_t token);
virtual void onGrpcReceiveTrailingMetadata(uint32_t token);
virtual void onGrpcReceive(uint32_t token, std::unique_ptr<WasmData> message);
virtual void onGrpcClose(uint32_t token, GrpcStatus status, std::unique_ptr<WasmData> message);

// Default high level HTTP/gRPC interface. NB: overriding the low level interface will disable this interface.
// Returns false on setup error.
bool httpCall(StringView uri, const HeaderStringPairs& request_headers,
StringView request_body, const HeaderStringPairs& request_trailers,
uint32_t timeout_milliseconds, HttpCallCallback callback);
// NB: the message is the response if status == OK and an error message otherwise.
// Returns false on setup error.
bool grpcSimpleCall(StringView service, StringView service_name, StringView method_name,
const google::protobuf::MessageLite &request, uint32_t timeout_milliseconds, GrpcSimpleCallCallback callback);
template<typename Response> void grpcSimpleCall(StringView service, StringView service_name,
StringView method_name, const google::protobuf::MessageLite &request, uint32_t timeout_milliseconds,
std::function<void(Response&& response)> success_callback,
std::function<void(GrpcStatus status, StringView error_message)> failure_callback) {
auto callback = [success_callback, failure_callback](GrpcStatus status, std::unique_ptr<WasmData> message) {
if (status == GrpcStatus::Ok) {
success_callback(message->proto<Response>());
} else {
failure_callback(status, message->view());
}
};
grpcSimpleCall(service, service_name, method_name, request, timeout_milliseconds, callback);
}
// Returns false on setup error.
bool grpcCallHandler(StringView service, StringView service_name,
StringView method_name, const google::protobuf::MessageLite &request, uint32_t timeout_milliseconds,
std::unique_ptr<GrpcCallHandlerBase> handler);
// Returns false on setup error.
bool grpcStreamHandler(StringView service, StringView service_name,
StringView method_name, std::unique_ptr<GrpcStreamHandlerBase> handler);

private:
friend class GrpcCallHandlerBase;
friend class GrpcStreamHandlerBase;

const std::string root_id_;
std::unordered_map<uint32_t, HttpCallCallback> http_calls_;
std::unordered_map<uint32_t, GrpcSimpleCallCallback> simple_grpc_calls_;
std::unordered_map<uint32_t, std::unique_ptr<GrpcCallHandlerBase>> grpc_calls_;
std::unordered_map<uint32_t, std::unique_ptr<GrpcStreamHandlerBase>> grpc_streams_;
};

RootContext* getRoot(StringView root_id);
Expand Down Expand Up @@ -393,6 +401,9 @@ class Context : public ContextBase {
RootContext* root_{};
};

// Returns nullptr if the Context no longer exists (i.e. the stream has been destroyed).
Context* getContext(uint32_t context_id);

using RootFactory = std::function<std::unique_ptr<RootContext>(uint32_t id, StringView root_id)>;
using ContextFactory = std::function<std::unique_ptr<Context>(uint32_t id, RootContext* root)>;

Expand Down Expand Up @@ -1321,7 +1332,7 @@ inline void grpcSend(uint32_t token, StringView message, bool end_stream) {
return proxy_grpcSend(token, message.data(), message.size(), end_stream ? 1 : 0);
}

inline bool ContextBase::httpCall(StringView uri, const HeaderStringPairs& request_headers,
inline bool RootContext::httpCall(StringView uri, const HeaderStringPairs& request_headers,
StringView request_body, const HeaderStringPairs& request_trailers,
uint32_t timeout_milliseconds, HttpCallCallback callback) {
auto token = makeHttpCall(uri, request_headers, request_body, request_trailers, timeout_milliseconds);
Expand All @@ -1332,7 +1343,7 @@ inline bool ContextBase::httpCall(StringView uri, const HeaderStringPairs& reque
return false;
}

inline void ContextBase::onHttpCallResponse(uint32_t token, std::unique_ptr<WasmData> header_pairs,
inline void RootContext::onHttpCallResponse(uint32_t token, std::unique_ptr<WasmData> header_pairs,
std::unique_ptr<WasmData> body, std::unique_ptr<WasmData> trailer_pairs) {
auto it = http_calls_.find(token);
if (it != http_calls_.end()) {
Expand All @@ -1341,11 +1352,11 @@ inline void ContextBase::onHttpCallResponse(uint32_t token, std::unique_ptr<Wasm
}
}

inline bool ContextBase::grpcSimpleCall(StringView service, StringView service_name, StringView method_name,
inline bool RootContext::grpcSimpleCall(StringView service, StringView service_name, StringView method_name,
const google::protobuf::MessageLite &request, uint32_t timeout_milliseconds, Context::GrpcSimpleCallCallback callback) {
auto token = grpcCall(service, service_name, method_name, request, timeout_milliseconds);
if (token) {
simple_grpc_calls_[token] = std::move(callback);
asRoot()->simple_grpc_calls_[token] = std::move(callback);
return true;
}
return false;
Expand Down Expand Up @@ -1381,7 +1392,7 @@ inline void GrpcStreamHandlerBase::send(StringView message, bool end_of_stream)
}
}

inline void ContextBase::onGrpcCreateInitialMetadata(uint32_t token) {
inline void RootContext::onGrpcCreateInitialMetadata(uint32_t token) {
{
auto it = grpc_calls_.find(token);
if (it != grpc_calls_.end()) {
Expand All @@ -1398,7 +1409,7 @@ inline void ContextBase::onGrpcCreateInitialMetadata(uint32_t token) {
}
}

inline void ContextBase::onGrpcReceiveInitialMetadata(uint32_t token) {
inline void RootContext::onGrpcReceiveInitialMetadata(uint32_t token) {
{
auto it = grpc_streams_.find(token);
if (it != grpc_streams_.end()) {
Expand All @@ -1408,7 +1419,7 @@ inline void ContextBase::onGrpcReceiveInitialMetadata(uint32_t token) {
}
}

inline void ContextBase::onGrpcReceiveTrailingMetadata(uint32_t token) {
inline void RootContext::onGrpcReceiveTrailingMetadata(uint32_t token) {
{
auto it = grpc_streams_.find(token);
if (it != grpc_streams_.end()) {
Expand All @@ -1418,7 +1429,7 @@ inline void ContextBase::onGrpcReceiveTrailingMetadata(uint32_t token) {
}
}

inline void ContextBase::onGrpcReceive(uint32_t token, std::unique_ptr<WasmData> message) {
inline void RootContext::onGrpcReceive(uint32_t token, std::unique_ptr<WasmData> message) {
{
auto it = simple_grpc_calls_.find(token);
if (it != simple_grpc_calls_.end()) {
Expand Down Expand Up @@ -1459,7 +1470,7 @@ inline void GrpcStreamHandlerBase::doRemoteClose(GrpcStatus status, std::unique_
}
}

inline void ContextBase::onGrpcClose(uint32_t token, GrpcStatus status, std::unique_ptr<WasmData> message) {
inline void RootContext::onGrpcClose(uint32_t token, GrpcStatus status, std::unique_ptr<WasmData> message) {
{
auto it = simple_grpc_calls_.find(token);
if (it != simple_grpc_calls_.end()) {
Expand All @@ -1485,25 +1496,32 @@ inline void ContextBase::onGrpcClose(uint32_t token, GrpcStatus status, std::uni
}
}

inline bool ContextBase::grpcCallHandler(StringView service, StringView service_name,
inline bool RootContext::grpcCallHandler(StringView service, StringView service_name,
StringView method_name, const google::protobuf::MessageLite &request, uint32_t timeout_milliseconds,
std::unique_ptr<GrpcCallHandlerBase> handler) {
auto token = grpcCall(service, service_name, method_name, request, timeout_milliseconds);
if (token) {
handler->token_ = token;
handler->context_ = this;
grpc_calls_[token] = std::move(handler);
return true;
}
return false;
}

inline bool ContextBase::grpcStreamHandler(StringView service, StringView service_name,
inline bool RootContext::grpcStreamHandler(StringView service, StringView service_name,
StringView method_name, std::unique_ptr<GrpcStreamHandlerBase> handler) {
auto token = grpcStream(service, service_name, method_name);
if (token) {
handler->token_ = token;
handler->context_ = this;
grpc_streams_[token] = std::move(handler);
return true;
}
return false;
}

inline bool ContextBase::setEffectiveContext() {
return proxy_setEffectiveContext(id_) == 0;
}

14 changes: 7 additions & 7 deletions api/wasm/cpp/proxy_wasm_intrinsics.cc
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ static ContextBase* getContextBase(uint32_t context_id) {
return it->second.get();
}

static Context* getContext(uint32_t context_id) {
Context* getContext(uint32_t context_id) {
auto it = context_map.find(context_id);
if (it == context_map.end() || !it->second->asContext()) {
return nullptr;
Expand Down Expand Up @@ -164,33 +164,33 @@ extern "C" EMSCRIPTEN_KEEPALIVE void
proxy_onHttpCallResponse(uint32_t context_id, uint32_t token, uint32_t header_pairs_ptr,
uint32_t header_pairs_size, uint32_t body_ptr, uint32_t body_size,
uint32_t trailer_pairs_ptr, uint32_t trailer_pairs_size) {
getContextBase(context_id)->onHttpCallResponse(
getRootContext(context_id)->onHttpCallResponse(
token,
std::make_unique<WasmData>(reinterpret_cast<char*>(header_pairs_ptr), header_pairs_size),
std::make_unique<WasmData>(reinterpret_cast<char*>(body_ptr), body_size),
std::make_unique<WasmData>(reinterpret_cast<char*>(trailer_pairs_ptr), trailer_pairs_size));
}

extern "C" EMSCRIPTEN_KEEPALIVE void proxy_onGrpcCreateInitialMetadata(uint32_t context_id, uint32_t token) {
getContextBase(context_id)->onGrpcCreateInitialMetadata(token);
getRootContext(context_id)->onGrpcCreateInitialMetadata(token);
}

extern "C" EMSCRIPTEN_KEEPALIVE void proxy_onGrpcReceiveInitialMetadata(uint32_t context_id, uint32_t token) {
getContextBase(context_id)->onGrpcReceiveInitialMetadata(token);
getRootContext(context_id)->onGrpcReceiveInitialMetadata(token);
}

extern "C" EMSCRIPTEN_KEEPALIVE void proxy_onGrpcReceiveTrailingMetadata(uint32_t context_id, uint32_t token) {
getContextBase(context_id)->onGrpcReceiveTrailingMetadata(token);
getRootContext(context_id)->onGrpcReceiveTrailingMetadata(token);
}

extern "C" EMSCRIPTEN_KEEPALIVE void proxy_onGrpcReceive(uint32_t context_id, uint32_t token,
uint32_t response_ptr, uint32_t response_size) {
getContextBase(context_id)->onGrpcReceive(token, std::make_unique<WasmData>(reinterpret_cast<char*>(response_ptr), response_size));
getRootContext(context_id)->onGrpcReceive(token, std::make_unique<WasmData>(reinterpret_cast<char*>(response_ptr), response_size));
}

extern "C" EMSCRIPTEN_KEEPALIVE void proxy_onGrpcClose(uint32_t context_id, uint32_t token,
uint32_t status_code, uint32_t status_message_ptr, uint32_t status_message_size) {
getContextBase(context_id)->onGrpcClose(token, static_cast<GrpcStatus>(status_code), std::make_unique<WasmData>(reinterpret_cast<char*>(status_message_ptr), status_message_size));
getRootContext(context_id)->onGrpcClose(token, static_cast<GrpcStatus>(status_code), std::make_unique<WasmData>(reinterpret_cast<char*>(status_message_ptr), status_message_size));
}

extern "C" EMSCRIPTEN_KEEPALIVE void proxy_onQueueReady(uint32_t context_id, uint32_t token) {
Expand Down
1 change: 1 addition & 0 deletions api/wasm/cpp/proxy_wasm_intrinsics.js
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,5 @@ mergeInto(LibraryManager.library, {
proxy_grpcClose : function () {},
proxy_grpcCancel : function () {},
proxy_sendLocalResponse : function () {},
proxy_setEffectiveContext : function () {},
});
Binary file modified examples/wasm/envoy_filter_http_wasm_example.wasm
Binary file not shown.
Loading

0 comments on commit 89200de

Please sign in to comment.