Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Preparatory commits from #801 #836

Merged
merged 6 commits into from
Apr 15, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 26 additions & 2 deletions oak/server/grpc_client_node.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@ namespace {
void* tag(int i) { return (void*)static_cast<intptr_t>(i); }
} // namespace

GrpcClientNode::GrpcClientNode(BaseRuntime* runtime, const std::string& name,
GrpcClientNode::GrpcClientNode(BaseRuntime* runtime, const std::string& name, NodeId node_id,
const std::string& grpc_address)
: NodeThread(runtime, name),
: NodeThread(runtime, name, node_id),
channel_(grpc::CreateChannel(grpc_address, grpc::InsecureChannelCredentials())),
stub_(new grpc::GenericStub(channel_)) {
OAK_LOG(INFO) << "Created gRPC client node for " << grpc_address;
Expand Down Expand Up @@ -79,6 +79,8 @@ bool GrpcClientNode::HandleInvocation(Handle invocation_handle) {
cq.Next(&got_tag, &ok);
if (!ok) {
OAK_LOG(ERROR) << "Failed to start gRPC method call";
FailInvocation(invocation->rsp_handle.get(),
grpc::Status(grpc::StatusCode::INTERNAL, "failed to start method call"));
return false;
}

Expand All @@ -89,11 +91,15 @@ bool GrpcClientNode::HandleInvocation(Handle invocation_handle) {
cq.Next(&got_tag, &ok);
if (!ok) {
OAK_LOG(ERROR) << "Failed to send gRPC request";
FailInvocation(invocation->rsp_handle.get(),
grpc::Status(grpc::StatusCode::INTERNAL, "failed to write request"));
return false;
}
call->WritesDone(tag(3));
cq.Next(&got_tag, &ok);
if (!ok) {
FailInvocation(invocation->rsp_handle.get(),
grpc::Status(grpc::StatusCode::INTERNAL, "failed to close request"));
OAK_LOG(ERROR) << "Failed to close gRPC request";
return false;
}
Expand Down Expand Up @@ -138,6 +144,8 @@ bool GrpcClientNode::HandleInvocation(Handle invocation_handle) {
}
if (!status.ok()) {
// Final status includes an error, so pass it back on the response channel.
FailInvocation(invocation->rsp_handle.get(), status);

oak::encap::GrpcResponse grpc_rsp;
grpc_rsp.set_last(true);
grpc_rsp.mutable_status()->set_code(status.error_code());
Expand All @@ -158,6 +166,22 @@ bool GrpcClientNode::HandleInvocation(Handle invocation_handle) {
return true;
}

void GrpcClientNode::FailInvocation(Handle rsp_handle, grpc::Status status) {
oak::encap::GrpcResponse grpc_rsp;
grpc_rsp.set_last(true);
grpc_rsp.mutable_status()->set_code(status.error_code());
grpc_rsp.mutable_status()->set_message(status.error_message());

auto rsp_msg = absl::make_unique<NodeMessage>();
size_t serialized_size = grpc_rsp.ByteSizeLong();
rsp_msg->data.resize(serialized_size);
grpc_rsp.SerializeToArray(rsp_msg->data.data(), rsp_msg->data.size());

OAK_LOG(INFO) << "Write final gRPC status of (" << status.error_code() << ", '"
<< status.error_message() << "') to response channel";
ChannelWrite(rsp_handle, std::move(rsp_msg));
}

void GrpcClientNode::Run(Handle invocation_handle) {
std::vector<std::unique_ptr<ChannelStatus>> channel_status;
channel_status.push_back(absl::make_unique<ChannelStatus>(invocation_handle));
Expand Down
4 changes: 3 additions & 1 deletion oak/server/grpc_client_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,12 @@ namespace oak {

class GrpcClientNode final : public NodeThread {
public:
GrpcClientNode(BaseRuntime* runtime, const std::string& name, const std::string& grpc_address);
GrpcClientNode(BaseRuntime* runtime, const std::string& name, NodeId node_id,
const std::string& grpc_address);

private:
bool HandleInvocation(Handle invocation_handle);
void FailInvocation(Handle rsp_handle, grpc::Status status);
void Run(Handle handle) override;

std::shared_ptr<grpc::ChannelInterface> channel_;
Expand Down
19 changes: 15 additions & 4 deletions oak/server/loader/oak_runner_main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
* limitations under the License.
*/

#include <csignal>
#include <sstream>
#include <string>

Expand All @@ -30,6 +31,12 @@ ABSL_FLAG(std::string, ca_cert, "", "Path to the PEM-encoded CA root certificate
ABSL_FLAG(std::string, private_key, "", "Path to the private key");
ABSL_FLAG(std::string, cert_chain, "", "Path to the PEM-encoded certificate chain");

namespace {

absl::Notification server_done;

void sigint_handler(int) { server_done.Notify(); }

std::shared_ptr<grpc::ServerCredentials> BuildTlsCredentials(std::string pem_root_certs,
std::string private_key,
std::string cert_chain) {
Expand All @@ -40,10 +47,12 @@ std::shared_ptr<grpc::ServerCredentials> BuildTlsCredentials(std::string pem_roo
return grpc::SslServerCredentials(options);
}

} // namespace

int main(int argc, char* argv[]) {
absl::ParseCommandLine(argc, argv);

// Create loader instance.
// Create the loader instance.
std::unique_ptr<oak::OakLoader> loader = absl::make_unique<oak::OakLoader>();

// Load application configuration.
Expand Down Expand Up @@ -71,14 +80,16 @@ int main(int argc, char* argv[]) {
OAK_LOG(ERROR) << "Failed to create application";
return 1;
}

std::stringstream address;
address << "0.0.0.0:" << application_config->grpc_port();
OAK_LOG(INFO) << "Oak Application: " << address.str();

// Wait (same as `sleep(86400)`).
absl::Notification server_timeout;
server_timeout.WaitForNotificationWithTimeout(absl::Hours(24));
// Wait until notification of signal termination.
std::signal(SIGINT, sigint_handler);
server_done.WaitForNotification();

OAK_LOG(ERROR) << "Terminate Oak Application";
loader->TerminateApplication();

return 0;
Expand Down
3 changes: 2 additions & 1 deletion oak/server/logging_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ namespace oak {
// Pseudo-node to perform logging.
class LoggingNode final : public NodeThread {
public:
explicit LoggingNode(BaseRuntime* runtime, const std::string& name) : NodeThread(runtime, name) {}
explicit LoggingNode(BaseRuntime* runtime, const std::string& name, NodeId node_id)
: NodeThread(runtime, name, node_id) {}

private:
void Run(Handle handle) override;
Expand Down
3 changes: 2 additions & 1 deletion oak/server/node_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ namespace oak {
class NodeThread : public OakNode {
public:
// Construct a thread, identified by the given name in diagnostic messages.
NodeThread(BaseRuntime* runtime, const std::string& name) : OakNode(runtime, name) {}
NodeThread(BaseRuntime* runtime, const std::string& name, NodeId node_id)
: OakNode(runtime, name, node_id) {}
virtual ~NodeThread();

// Start kicks off a separate thread that invokes the Run() method.
Expand Down
4 changes: 2 additions & 2 deletions oak/server/oak_grpc_node.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@
namespace oak {

std::unique_ptr<OakGrpcNode> OakGrpcNode::Create(
BaseRuntime* runtime, const std::string& name,
BaseRuntime* runtime, const std::string& name, NodeId node_id,
std::shared_ptr<grpc::ServerCredentials> grpc_credentials, const uint16_t port) {
std::unique_ptr<OakGrpcNode> node = absl::WrapUnique(new OakGrpcNode(runtime, name));
std::unique_ptr<OakGrpcNode> node = absl::WrapUnique(new OakGrpcNode(runtime, name, node_id));

// Build Server
grpc::ServerBuilder builder;
Expand Down
6 changes: 3 additions & 3 deletions oak/server/oak_grpc_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ class OakGrpcNode final : public OakNode {
// Create an Oak node with the `name` and gRPC `port`.
// If `port` equals 0, then gRPC port is assigned automatically.
static std::unique_ptr<OakGrpcNode> Create(
BaseRuntime* runtime, const std::string& name,
BaseRuntime* runtime, const std::string& name, NodeId node_id,
std::shared_ptr<grpc::ServerCredentials> grpc_credentials, const uint16_t port = 0);
virtual ~OakGrpcNode(){};

Expand All @@ -48,8 +48,8 @@ class OakGrpcNode final : public OakNode {
private:
friend class ModuleInvocation;

OakGrpcNode(BaseRuntime* runtime, const std::string& name)
: OakNode(runtime, name), next_stream_id_(1), handle_(kInvalidHandle) {}
OakGrpcNode(BaseRuntime* runtime, const std::string& name, NodeId node_id)
: OakNode(runtime, name, node_id), next_stream_id_(1), handle_(kInvalidHandle) {}
OakGrpcNode(const OakGrpcNode&) = delete;
OakGrpcNode& operator=(const OakGrpcNode&) = delete;

Expand Down
9 changes: 7 additions & 2 deletions oak/server/oak_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
#ifndef OAK_SERVER_NODE_H_
#define OAK_SERVER_NODE_H_

#include <stdint.h>

#include <atomic>
#include <memory>
#include <random>
Expand Down Expand Up @@ -57,10 +59,12 @@ struct NodeReadResult {
std::unique_ptr<NodeMessage> msg;
};

using NodeId = uint64_t;

class OakNode {
public:
OakNode(BaseRuntime* runtime, const std::string& name)
: name_(name), runtime_(runtime), prng_engine_() {}
OakNode(BaseRuntime* runtime, const std::string& name, NodeId node_id)
: name_(name), node_id_(node_id), runtime_(runtime), prng_engine_() {}
virtual ~OakNode() {}

virtual void Start(Handle handle) = 0;
Expand Down Expand Up @@ -98,6 +102,7 @@ class OakNode {
void ClearHandles() LOCKS_EXCLUDED(mu_);

const std::string name_;
const NodeId node_id_;

private:
// Allow the Runtime to use internal methods.
Expand Down
23 changes: 14 additions & 9 deletions oak/server/oak_runtime.cc
Original file line number Diff line number Diff line change
Expand Up @@ -69,18 +69,20 @@ grpc::Status OakRuntime::Initialize(const ApplicationConfiguration& config,
}

// Create a gRPC pseudo-Node.
NodeId grpc_node_id = NextNodeId();
const std::string grpc_name = kGrpcNodeName;
const uint16_t grpc_port = config.grpc_port();
OAK_LOG(INFO) << "Create gRPC pseudo-Node named {" << grpc_name << "}";
std::unique_ptr<OakGrpcNode> grpc_node =
OakGrpcNode::Create(this, grpc_name, grpc_credentials, grpc_port);
OakGrpcNode::Create(this, grpc_name, grpc_node_id, grpc_credentials, grpc_port);
grpc_node_ = grpc_node.get(); // borrowed copy
nodes_[grpc_name] = std::move(grpc_node);

// Create the initial Application Node.
NodeId app_node_id = NextNodeId();
std::string node_name;
app_node_ =
CreateNode(config.initial_node_config_name(), config.initial_entrypoint_name(), &node_name);
app_node_ = CreateNode(config.initial_node_config_name(), config.initial_entrypoint_name(),
app_node_id, &node_name);
if (app_node_ == nullptr) {
return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, "Failed to create initial Oak Node");
}
Expand All @@ -105,28 +107,30 @@ std::string OakRuntime::NextNodeName(const std::string& config_name,
return absl::StrCat(config_name, "-", index, "-", entrypoint_name);
}

NodeId OakRuntime::NextNodeId() { return next_node_id_++; }

// Create (but don't start) a new Node instance. Return a borrowed pointer to
// the new Node (or nullptr on failure).
OakNode* OakRuntime::CreateNode(const std::string& config_name, const std::string& entrypoint_name,
std::string* node_name) {
NodeId node_id, std::string* node_name) {
std::string name = NextNodeName(config_name, entrypoint_name);
std::unique_ptr<OakNode> node;

if (wasm_config_.count(config_name) > 0) {
OAK_LOG(INFO) << "Create Wasm node named {" << name << "}";
const WebAssemblyConfiguration* wasm_cfg = wasm_config_[config_name].get();
node = WasmNode::Create(this, name, wasm_cfg->module_bytes(), entrypoint_name);
node = WasmNode::Create(this, name, node_id, wasm_cfg->module_bytes(), entrypoint_name);
} else if (log_config_.count(config_name) > 0) {
OAK_LOG(INFO) << "Create log node named {" << name << "}";
node = absl::make_unique<LoggingNode>(this, name);
node = absl::make_unique<LoggingNode>(this, name, node_id);
} else if (storage_config_.count(config_name) > 0) {
std::string address = *(storage_config_[config_name].get());
OAK_LOG(INFO) << "Create storage proxy node named {" << name << "} connecting to " << address;
node = absl::make_unique<StorageNode>(this, name, address);
node = absl::make_unique<StorageNode>(this, name, node_id, address);
} else if (grpc_client_config_.count(config_name) > 0) {
std::string address = *(grpc_client_config_[config_name].get());
OAK_LOG(INFO) << "Create gRPC client node named {" << name << "} connecting to " << address;
node = absl::make_unique<GrpcClientNode>(this, name, address);
node = absl::make_unique<GrpcClientNode>(this, name, node_id, address);
} else {
OAK_LOG(ERROR) << "failed to find config with name " << config_name;
return nullptr;
Expand All @@ -151,7 +155,8 @@ bool OakRuntime::CreateAndRunNode(const std::string& config_name,
}

absl::MutexLock lock(&mu_);
OakNode* node = CreateNode(config_name, entrypoint_name, node_name);
NodeId node_id = NextNodeId();
OakNode* node = CreateNode(config_name, entrypoint_name, node_id, node_name);
if (node == nullptr) {
return false;
}
Expand Down
13 changes: 8 additions & 5 deletions oak/server/oak_runtime.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ class OakRuntime : public BaseRuntime {
grpc_handle_(kInvalidHandle),
app_node_(nullptr),
app_handle_(kInvalidHandle),
next_node_id_(0),
termination_pending_(false) {}
virtual ~OakRuntime() = default;

Expand All @@ -67,9 +68,10 @@ class OakRuntime : public BaseRuntime {

std::string NextNodeName(const std::string& config_name, const std::string& entrypoint_name)
EXCLUSIVE_LOCKS_REQUIRED(mu_);
NodeId NextNodeId() EXCLUSIVE_LOCKS_REQUIRED(mu_);

OakNode* CreateNode(const std::string& config_name, const std::string& entrypoint_name,
std::string* node_name) EXCLUSIVE_LOCKS_REQUIRED(mu_);
NodeId node_id, std::string* node_name) EXCLUSIVE_LOCKS_REQUIRED(mu_);

// Information derived from ApplicationConfiguration; const after Initialize() called:

Expand All @@ -93,16 +95,17 @@ class OakRuntime : public BaseRuntime {
// to the initial Application Wasm Node.
Handle app_handle_;

// Next index for node name generation.
mutable absl::Mutex mu_; // protects nodes_, next_index_;
// Next indexes for node name/ID generation.
mutable absl::Mutex mu_; // protects nodes_, next_index_, next_node_id_;
std::map<std::string, int> next_index_ GUARDED_BY(mu_);

std::atomic_bool termination_pending_;
NodeId next_node_id_ GUARDED_BY(mu_);

// Collection of running Nodes indexed by Node name. Note that Node name is
// unique but is not visible to the running Application in any way.
std::map<std::string, std::unique_ptr<OakNode>> nodes_ GUARDED_BY(mu_);

std::atomic_bool termination_pending_;

}; // class OakRuntime

} // namespace oak
Expand Down
2 changes: 1 addition & 1 deletion oak/server/rust/oak_runtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ log = { version = "*" }
oak_abi = "=0.1.0"
prost = "*"
prost-types = "*"
rand = { version = "*", default-features = false, features = ["alloc"] }
rand = "*"
tokio = { version = "*", features = ["rt-core"] }
wasmi = { version = "*", default-features = false, features = ["core"] }

Expand Down
Loading