diff --git a/Cross.toml b/Cross.toml index 3fa78fc5e..dae228924 100644 --- a/Cross.toml +++ b/Cross.toml @@ -12,4 +12,4 @@ image = "ghcr.io/deislabs/akri/rust-crossbuild:x86_64-unknown-linux-gnu-0.1.16-0 image = "ghcr.io/deislabs/akri/rust-crossbuild:armv7-unknown-linux-gnueabihf-0.1.16-0.0.7" [target.aarch64-unknown-linux-gnu] -image = "ghcr.io/deislabs/akri/rust-crossbuild:aarch64-unknown-linux-gnu-0.1.16-0.0.7" \ No newline at end of file +image = "ghcr.io/deislabs/akri/rust-crossbuild:aarch64-unknown-linux-gnu-0.1.16-0.0.7" diff --git a/agent/src/protocols/http/discovery_handler.rs b/agent/src/protocols/http/discovery_handler.rs new file mode 100644 index 000000000..892ee6241 --- /dev/null +++ b/agent/src/protocols/http/discovery_handler.rs @@ -0,0 +1,79 @@ +use super::super::{DiscoveryHandler, DiscoveryResult}; + +use akri_shared::akri::configuration::HTTPDiscoveryHandlerConfig; +use async_trait::async_trait; +use failure::Error; +use reqwest::get; +use std::collections::HashMap; + +const BROKER_NAME: &str = "AKRI_HTTP"; +const DEVICE_ENDPOINT: &str = "AKRI_HTTP_DEVICE_ENDPOINT"; + +pub struct HTTPDiscoveryHandler { + discovery_handler_config: HTTPDiscoveryHandlerConfig, +} +impl HTTPDiscoveryHandler { + pub fn new(discovery_handler_config: &HTTPDiscoveryHandlerConfig) -> Self { + trace!("[http:new] Entered"); + HTTPDiscoveryHandler { + discovery_handler_config: discovery_handler_config.clone(), + } + } +} +#[async_trait] + +impl DiscoveryHandler for HTTPDiscoveryHandler { + async fn discover(&self) -> Result, failure::Error> { + trace!("[http:discover] Entered"); + + let url = self.discovery_handler_config.discovery_endpoint.clone(); + trace!("[http:discover] url: {}", &url); + + match get(&url).await { + Ok(resp) => { + trace!( + "[http:discover] Connected to discovery endpoint: {:?} => {:?}", + &url, + &resp + ); + + // Reponse is a newline separated list of devices (host:port) or empty + let device_list = &resp.text().await?; + + let result = device_list + .lines() + .map(|endpoint| { + trace!("[http:discover:map] Creating DiscoverResult: {}", endpoint); + trace!( + "[http:discover] props.inserting: {}, {}", + BROKER_NAME, + DEVICE_ENDPOINT, + ); + let mut props = HashMap::new(); + props.insert(BROKER_NAME.to_string(), "http".to_string()); + props.insert(DEVICE_ENDPOINT.to_string(), endpoint.to_string()); + DiscoveryResult::new(endpoint, props, true) + }) + .collect::>(); + trace!("[protocol:http] Result: {:?}", &result); + Ok(result) + } + Err(err) => { + trace!( + "[http:discover] Failed to connect to discovery endpoint: {}", + &url + ); + trace!("[http:discover] Error: {}", err); + + Err(failure::format_err!( + "Failed to connect to discovery endpoint results: {:?}", + err + )) + } + } + } + fn are_shared(&self) -> Result { + trace!("[http:are_shared] Entered"); + Ok(true) + } +} diff --git a/agent/src/protocols/http/mod.rs b/agent/src/protocols/http/mod.rs new file mode 100644 index 000000000..090496fd3 --- /dev/null +++ b/agent/src/protocols/http/mod.rs @@ -0,0 +1,2 @@ +mod discovery_handler; +pub use self::discovery_handler::HTTPDiscoveryHandler; diff --git a/docs/extensibility-http-grpc.md b/docs/extensibility-http-grpc.md new file mode 100644 index 000000000..aa192ca0f --- /dev/null +++ b/docs/extensibility-http-grpc.md @@ -0,0 +1,463 @@ +# Deeper dive into HTTP-based Device brokers + +3 different broker implementations have been created for the HTTP protocol in the http-extensibility branch, 2 in Rust and 1 in Go: +* The standalone broker is a self-contained scenario that demonstrates the ability to interact with HTTP-based devices by `curl`ing a device's endpoints. This type of solution would be applicable in batch-like scenarios where the broker performs a predictable set of processing steps for a device. +* The second scenario uses gRPC. gRPC is an increasingly common alternative to REST-like APIs and supports high-throughput and streaming methods. gRPC is not a requirement for broker implements in Akri but is used here as one of many mechanisms that may be used. The gRPC-based broker has a companion client. This is a more realistic scenario in which the broker proxies client requests using gRPC to HTTP-based devices. The advantage of this approach is that device functionality is encapsulated by an API that is exposed by the broker. In this case the API has a single method but in practice, there could be many methods implemented. +* The third implemnentation is a gRPC-based broker and companion client implemented in Golang. This is functionally equivalent to the Rust implementation and shares a protobuf definition. For this reason, you may combine the Rust broker and client with the Golang broker and client arbitrarily. The Golang broker is described in the [`http-apps`](./samples/apps/http-apps/README.md) directory. + +The first option, a standalone broker, is described in docs/extensibility.md. + +The two gRPC brokers are implemented here as well. This document will describe the second option, a Rust gRPC broker. + +Please read docs/extensibility.md before reading this document. This document will not cover [creating and deploying mock HTTP-based Devices](docs/extensibility.md#create-some-http-devices), [how to add the HTTP protocol to Akri](docs/extensibility.md#new-discoveryhandler-implementation), or [how to deploy the updated Akri](docs/extensibility.md#deploy-akri). + +## Creating a Rust gRPC broker (and client) + +First, we need to create a project. We can use `cargo` to create our project by navigating to `samples/brokers` and running `cargo new http`. Once the http project has been created, it can be added to the greater Akri project by adding `"samples/brokers/http"` to the **members** in `./Cargo.toml`. + +The broker implementation can be split into parts: + +1. Accessing the HTTP-based Device data +1. Exposing the data to the cluster + +We also provide a gRPC client implementation that can be used to access the brokered data. + +1. Reading the data in the cluster + +### Accessing the data +To access the HTTP-based Device data, we first need to retrieve any discovery information. Any information stored in the DiscoveryResult properties map will be transferred into the broker container's environment variables. Retrieving them is simply a matter of querying environment variables like this: + +```rust +let device_url = env::var("AKRI_HTTP_DEVICE_ENDPOINT")?; +``` + +For our HTTP-based Device broker, the data can be generated with an http get. In fact, the code we used in `discover` can be adapted for what we need: + +```rust +async fn read_sensor( + &self, + _rqst: Request, +) -> Result, Status> { + match get(&self.device_url).await { + Ok(resp) => { + let body = resp.text().await.unwrap(); + Ok(Response::new(ReadSensorResponse { value: body })) + } + Err(err) => { + Err(Status::new(Code::Unavailable, "device is unavailable")) + } + } +} +``` + +### Exposing the data to the cluster +For a gRPC service, we need to do several things: + +1. Create a proto file describing our gRPC service +1. Create a build file that a gRPC library like Tonic can use +1. Leverage the output of our gRPC library build + +The first step is fairly simple for our Http devices (create this in `samples/brokers/http/proto/http.proto`): + +```proto +syntax = "proto3"; + +option go_package = "github.com/deislabs/akri/http-extensibility/proto"; + +package http; + +service DeviceService { + rpc ReadSensor (ReadSensorRequest) returns (ReadSensorResponse); +} + +message ReadSensorRequest { + string name = 1; +} +message ReadSensorResponse { + string value = 1; +} +``` + +The second step, assuming Tonic (though there are several very good gRPC libraries) is to create `samples/brokers/http/build.rs`: + +```rust +fn main() -> Result<(), Box> { + tonic_build::compile_protos("proto/http.proto")?; + Ok(()) +} +``` + +With the gRPC implementation created, we can now start utilizing it. Tonic has made this very simple, we can leverage a simple macro like this: + +```rust +pub mod http { + tonic::include_proto!("http"); +} +``` + +We can tie these pieces together in our main and retrieve the endpoint from the environment variables in `samples/brokers/http/src/broker.rs` (notice that we specify broker.rs, as main.rs is used for our standalone broker). Here we use the generated gRPC service code to listen for gRPC requests: + +```rust +pub mod http { + tonic::include_proto!("http"); +} + +use clap::{App, Arg}; +use http::{ + device_service_server::{DeviceService, DeviceServiceServer}, + ReadSensorRequest, ReadSensorResponse, +}; +use reqwest::get; +use std::env; +use std::net::SocketAddr; +use tonic::{transport::Server, Code, Request, Response, Status}; + +const DEVICE_ENDPOINT: &str = "AKRI_HTTP_DEVICE_ENDPOINT"; + +#[derive(Default)] +pub struct Device { + device_url: String, +} + +#[tonic::async_trait] +impl DeviceService for Device { + async fn read_sensor( + &self, + _rqst: Request, + ) -> Result, Status> { + match get(&self.device_url).await { + Ok(resp) => { + let body = resp.text().await.unwrap(); + println!("[read_sensor] Response body: {:?}", body); + Ok(Response::new(ReadSensorResponse { value: body })) + } + Err(err) => { + Err(Status::new(Code::Unavailable, "device is unavailable")) + } + } + } +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + println!("[main] Entered"); + let matches = App::new("broker") + .arg( + Arg::with_name("grpc_endpoint") + .long("grpc_endpoint") + .value_name("ENDPOINT") + .help("Endpoint address that the gRPC server will listen on.") + .required(true), + ) + .get_matches(); + let grpc_endpoint = matches.value_of("grpc_endpoint").unwrap(); + let addr: SocketAddr = grpc_endpoint.parse().unwrap(); + let device_url = env::var(DEVICE_ENDPOINT)?; + println!("[main] gRPC service proxying: {}", device_url); + let device_service = Device { device_url }; + let service = DeviceServiceServer::new(device_service); + + Server::builder() + .add_service(service) + .serve(addr) + .await + .expect("unable to start http-prtocol gRPC server"); + + Ok(()) +} +``` + +To ensure that the broker builds, update `samples/brokers/http/Cargo.toml` with the broker `[[bin]]` and dependencies: + +```toml +[[bin]] +name = "broker" +path = "src/grpc/broker.rs" + +[dependencies] +clap = "2.33.3" +futures = "0.3" +futures-util = "0.3" +prost = "0.6" +reqwest = "0.10.8" +tokio = { version = "0.2", features = ["rt-threaded", "time", "stream", "fs", "macros", "uds"] } +tonic = "0.1" + +[build-dependencies] +tonic-build = "0.1.1" +``` + +### Reading the data in the cluster + +The steps to generate a gRPC client are very similar to creating a broker. We will start here, with the assumption that a broker has been created and leverage the directory structure and files that have already been created. + +Having already created out gRPC implementation, we can now start using it with the Tonic macros: + +```rust +pub mod http { + tonic::include_proto!("http"); +} +``` + +This provides an easy way to query our HTTP-based Device gRPC in `samples/brokers/http/src/client.rs` (notice, again, that we use client.rs rather than main.rs or broker.rs). Here we create a simlpe loop that calls into the generated gRPC client code to read our HTTP-based Device data: + +```rust +pub mod http { + tonic::include_proto!("http"); +} + +use clap::{App, Arg}; +use http::{device_service_client::DeviceServiceClient, ReadSensorRequest}; +use tokio::{time, time::Duration}; + +#[tokio::main] +async fn main() -> Result<(), Box> { + let matches = App::new("client") + .arg( + Arg::with_name("grpc_endpoint") + .long("grpc_endpoint") + .value_name("ENDPOINT") + .help("Endpoint address of the gRPC server.") + .required(true), + ) + .get_matches(); + let grpc_endpoint = matches.value_of("grpc_endpoint").unwrap(); + let endpoint = format!("http://{}", grpc_endpoint); + let mut client = DeviceServiceClient::connect(endpoint).await?; + + loop { + let rqst = tonic::Request::new(ReadSensorRequest { + name: "/".to_string(), + }); + println!("[main:loop] Calling read_sensor"); + let resp = client.read_sensor(rqst).await?; + println!("[main:loop] Response: {:?}", resp); + time::delay_for(Duration::from_secs(10)).await; + } + Ok(()) +} +``` + +To ensure that our client builds, we have update `samples/brokers/http/Cargo.toml` with the client `[[bin]]`: + +```toml +[[bin]] +name = "broker" +path = "src/grpc/broker.rs" + +[[bin]] +name = "client" +path = "src/grpc/client.rs" + +[dependencies] +clap = "2.33.3" +futures = "0.3" +futures-util = "0.3" +prost = "0.6" +reqwest = "0.10.8" +tokio = { version = "0.2", features = ["rt-threaded", "time", "stream", "fs", "macros", "uds"] } +tonic = "0.1" + +[build-dependencies] +tonic-build = "0.1.1" +``` + +## Build and Deploy gRPC broker and client + +To build the broker and client, we create simple Dockerfiles + +`samples/brokers/http/Dockerfiles/grpc.broker` +```dockerfile +FROM amd64/rust:1.47 as build +RUN rustup component add rustfmt --toolchain 1.47.0-x86_64-unknown-linux-gnu +RUN USER=root cargo new --bin http +WORKDIR /http +COPY ./samples/brokers/http/Cargo.toml ./Cargo.toml +RUN cargo build \ + --bin=broker \ + --release +RUN rm ./src/*.rs +RUN rm ./target/release/deps/http* +COPY ./samples/brokers/http . +RUN cargo build \ + --bin=broker \ + --release +FROM amd64/debian:buster-slim +RUN apt-get update && \ + apt-get install -y --no-install-recommends \ + ca-certificates \ + libssl-dev \ + openssl && \ + apt-get clean +COPY --from=build /http/target/release/broker /broker +LABEL org.opencontainers.image.source https://github.com/deislabs/akri +ENV SSL_CERT_FILE=/etc/ssl/certs/ca-certificates.crt +ENV SSL_CERT_DIR=/etc/ssl/certs +ENV RUST_LOG broker +ENTRYPOINT ["/broker"] +``` + +`samples/brokers/http/Dockerfiles/grpc.client` +```dockerfile +FROM amd64/rust:1.47 as build +RUN rustup component add rustfmt --toolchain 1.47.0-x86_64-unknown-linux-gnu +RUN USER=root cargo new --bin http +WORKDIR /http +COPY ./samples/brokers/http/Cargo.toml ./Cargo.toml +RUN cargo build \ + --bin=client \ + --release +RUN rm ./src/*.rs +RUN rm ./target/release/deps/http* +COPY ./samples/brokers/http . +RUN cargo build \ + --bin=client \ + --release +FROM amd64/debian:buster-slim +RUN apt-get update && \ + apt-get install -y --no-install-recommends \ + ca-certificates \ + libssl-dev \ + openssl && \ + apt-get clean +COPY --from=build /http/target/release/client /client +LABEL org.opencontainers.image.source https://github.com/deislabs/akri +ENV SSL_CERT_FILE=/etc/ssl/certs/ca-certificates.crt +ENV SSL_CERT_DIR=/etc/ssl/certs +ENV RUST_LOG client +ENTRYPOINT ["/client"] +``` + +We can build the containers using `docker build` and make them available to our cluster with `docker push`: +```bash +HOST="ghcr.io" +USER=[[GITHUB-USER]] +BROKER="http-broker" +TAGS="v1" + +for APP in "broker" "client" +do + docker build \ + --tag=${HOST}/${USER}/${REPO}-grpc-${APP}:${TAGS} \ + --file=./samples/brokers/http/Dockerfiles/grpc.${APP} \ + . && \ + docker push ${HOST}/${USER}/${REPO}-grpc-${APP}:${TAGS} +done +``` + +Now we can deploy the gRPC-enabled broker using an Akri Configuration, `samples/brokers/http/kubernetes/http.grpc.broker.yaml` (being sure to update **image** according to the last steps): + +```yaml +apiVersion: akri.sh/v0 +kind: Configuration +metadata: + name: http-grpc-broker-rust +spec: + protocol: + http: + discoveryEndpoint: http://discovery:8080/discovery + capacity: 1 + brokerPodSpec: + imagePullSecrets: # GitHub Container Registry secret + - name: SECRET + containers: + - name: http-grpc-broker-rust + image: IMAGE + args: + - --grpc_endpoint=0.0.0.0:50051 + resources: + limits: + "{{PLACEHOLDER}}": "1" + instanceServiceSpec: + ports: + - name: grpc + port: 50051 + targetPort: 50051 + configurationServiceSpec: + ports: + - name: grpc + port: 50051 + targetPort: 50051 +``` + +With this Akri Configuration, we can use `kubectl` to update the cluster: + +```bash +kubectl apply --filename=./kubernetes/http.grpc.broker.yaml +``` + +Assuming that you have [created and deployed mock HTTP-based Devices](docs/extensibility.md#create-some-http-devices), you can query the broker's logs and should see the gRPC starting and then pending: + +```bash +kubectl logs pod/akri-http-...-pod +[main] Entered +[main] gRPC service proxying: http://device-7:8080 +``` + +> Optional: you can test the gRPC service using [`grpcurl`](https://github.com/fullstorydev/grpcurl/releases) +> +> ```bash +> BROKER=$( kubectl get service/http-svc --output=jsonpath="{.spec.clusterIP}") +> +> ./grpcurl \ +> --plaintext \ +> -proto ./http.proto \ +> ${BROKER}:50051 \ +> http.DeviceService.ReadSensor +> { +> "value": "0.4871220658001621" +> } +> ``` +> +> This uses the `configurationServiceSepc` service name (`http-svc`) which randomly picks one of the HTTP brokers and it uses the service's ClusterIP because the cluster DNS is inaccessible to `grpcurl`. + +The gRPC client can be deployed as any Kubernetes workload. For our example, we create a Deployment, `samples/brokers/http/kubernetes/http.grpc.client.yaml` (updating **image** according to the previous `docker push` commands): +```yaml +apiVersion: apps/v1 +kind: Deployment +metadata: + name: http-grpc-client-rust +spec: + replicas: 1 + selector: + matchLabels: + id: akri-http-client-rust + template: + metadata: + labels: + id: akri-http-client-rust + name: http-grpc-client-rust + spec: + imagePullSecrets: + - name: SECRET + containers: + - name: http-grpc-client-rust + image: IMAGE + args: + - --grpc_endpoint=http-svc:50051 +``` + +You may then deploy the gRPC client: + +```bash +kubectl apply --filename=./kubernetes/http.grpc.client.yaml +``` + +This uses the `configurationServiceSpec` service name (`http-svc`) which randomly picks one of the HTTP brokers. + +You may check the client's logs: + +```bash +kubectl logs deployment/http-grpc-client-rust +``` + +Yielding something of the form: + +```console +[main:loop] Calling read_sensor +[main:loop] Response: Response { metadata: MetadataMap { headers: {"content-type": "application/grpc", "date": "Wed, 11 Nov 2020 17:46:55 GMT", "grpc-status": "0"} }, message: ReadSensorResponse { value: "0.6088971084079992" } } +[main:loop] Constructing Request +[main:loop] Calling read_sensor +[main:loop] Response: Response { metadata: MetadataMap { headers: {"content-type": "application/grpc", "date": "Wed, 11 Nov 2020 17:47:05 GMT", "grpc-status": "0"} }, message: ReadSensorResponse { value: "0.9686970038897007" } } +``` + diff --git a/samples/apps/http-apps/Dockerfiles/device b/samples/apps/http-apps/Dockerfiles/device new file mode 100644 index 000000000..f9592dfc7 --- /dev/null +++ b/samples/apps/http-apps/Dockerfiles/device @@ -0,0 +1,31 @@ +FROM golang:1.15 as build + +ARG PROJECT="http-extensibility" +ARG MODULE="github.com/deislabs/akri/${PROJECT}" + +WORKDIR /${PROJECT} + +# Copy go.mod first and install dependencies +COPY go.mod . +RUN go mod download + +# Copy all sources +COPY . . + +# Compile Go binary +RUN GOOS=linux \ + go build -a -installsuffix cgo \ + -o /bin/device \ + ${MODULE}/cmd/device + + +FROM gcr.io/distroless/base-debian10 + +COPY --from=build /bin/device / + +USER 999 +EXPOSE 8080 + + +ENTRYPOINT ["/device"] +CMD ["--path=/","--path=/sensor"] diff --git a/samples/apps/http-apps/Dockerfiles/discovery b/samples/apps/http-apps/Dockerfiles/discovery new file mode 100644 index 000000000..89db2120e --- /dev/null +++ b/samples/apps/http-apps/Dockerfiles/discovery @@ -0,0 +1,31 @@ +FROM golang:1.15 as build + +ARG PROJECT="http-extensibility" +ARG MODULE="github.com/deislabs/akri/${PROJECT}" + +WORKDIR /${PROJECT} + +# Copy go.mod first and install dependencies +COPY go.mod . +RUN go mod download + +# Copy all sources +COPY . . + +# Compile Go binary +RUN GOOS=linux \ + go build -a -installsuffix cgo \ + -o /bin/discovery \ + ${MODULE}/cmd/discovery + + +FROM gcr.io/distroless/base-debian10 + +COPY --from=build /bin/discovery / + +USER 999 +EXPOSE 9999 + + +ENTRYPOINT ["/discovery"] +CMD ["--device=device:8000","--device=device:8001"] diff --git a/samples/apps/http-apps/Dockerfiles/grpc.broker b/samples/apps/http-apps/Dockerfiles/grpc.broker new file mode 100644 index 000000000..0719a9b27 --- /dev/null +++ b/samples/apps/http-apps/Dockerfiles/grpc.broker @@ -0,0 +1,48 @@ +FROM golang:1.15 as build + +ARG PROJECT="http-extensibility" +ARG MODULE="github.com/deislabs/akri/${PROJECT}" + +WORKDIR /${PROJECT} + +# Copy go.mod first and install dependencies +COPY go.mod . +RUN go mod download + +# Copy sources +COPY . . + +# Installs protoc and plugins: protoc-gen-go +ARG VERS="3.14.0" +ARG ARCH="linux-x86_64" +ARG NAME="protoc-${VERS}-${ARCH}" +RUN wget https://github.com/protocolbuffers/protobuf/releases/download/v${VERS}/${NAME}.zip --output-document=./${NAME}.zip && \ + apt update && apt install -y unzip && \ + unzip -o ${NAME}.zip -d ${NAME} && \ + mv ${NAME}/bin/* /usr/local/bin && \ + mv ${NAME}/include/* /usr/local/include && \ + go get -u github.com/golang/protobuf/protoc-gen-go + +# Generates the Golang protobuf files +RUN protoc \ + --proto_path=./proto \ + --go_out=plugins=grpc,module=${MODULE}:. \ + ./proto/http.proto + +# Compile Go binary +RUN GOOS=linux \ + go build -a -installsuffix cgo \ + -o /bin/broker \ + ${MODULE}/cmd/grpc/broker + + +FROM gcr.io/distroless/base-debian10 + +COPY --from=build /bin/broker / + +USER 999 + +EXPOSE 50051 + +ENTRYPOINT ["/broker"] +CMD ["--grpc_endpoint=:50051"] diff --git a/samples/apps/http-apps/Dockerfiles/grpc.client b/samples/apps/http-apps/Dockerfiles/grpc.client new file mode 100644 index 000000000..4e4aade71 --- /dev/null +++ b/samples/apps/http-apps/Dockerfiles/grpc.client @@ -0,0 +1,48 @@ +FROM golang:1.15 as build + +ARG PROJECT="http-extensibility" +ARG MODULE="github.com/deislabs/akri/${PROJECT}" + +WORKDIR /${PROJECT} + +# Copy go.mod first and install dependencies +COPY go.mod . +RUN go mod download + +# Copy sources +COPY . . + +# Installs protoc and plugins: protoc-gen-go +ARG VERS="3.14.0" +ARG ARCH="linux-x86_64" +ARG NAME="protoc-${VERS}-${ARCH}" +RUN wget https://github.com/protocolbuffers/protobuf/releases/download/v${VERS}/${NAME}.zip --output-document=./${NAME}.zip && \ + apt update && apt install -y unzip && \ + unzip -o ${NAME}.zip -d ${NAME} && \ + mv ${NAME}/bin/* /usr/local/bin && \ + mv ${NAME}/include/* /usr/local/include && \ + go get -u github.com/golang/protobuf/protoc-gen-go + +# Generates the Golang protobuf files +RUN protoc \ + --proto_path=./proto \ + --go_out=plugins=grpc,module=${MODULE}:. \ + ./proto/http.proto + +# Compile Go binary +RUN GOOS=linux \ + go build -a -installsuffix cgo \ + -o /bin/client \ + ${MODULE}/cmd/grpc/client + + +FROM gcr.io/distroless/base-debian10 + +COPY --from=build /bin/client / + +USER 999 + +EXPOSE 50051 + +ENTRYPOINT ["/client"] +CMD ["--grpc_endpoint=:50051"] diff --git a/samples/apps/http-apps/README.md b/samples/apps/http-apps/README.md new file mode 100644 index 000000000..4bcf156f0 --- /dev/null +++ b/samples/apps/http-apps/README.md @@ -0,0 +1,231 @@ +# HTTP Protocol Sample Device|Discovery apps + +This directory provides implementations of IoT devices and a discovery service that can be used to test the Akri HTTP Protocol Broker. + +This directory includes an alternative gRPC implementation of the Akri HTTP Protocol gRPC Broker and a Client too. + +## Environment + +```bash +export REGISTRY="ghcr.io" +export USER=[[GITHUB-USER]] +export PREFIX="http-apps" +export TAG="v1" +``` + +## Build + +The images are built by GitHub Actions in the repository but, you may also build them yourself using: + +```bash +./build.sh +``` + +This will generate 4 images: + ++ `${PREFIX}-device` ++ `${PREFIX}-discovery` ++ `${PREFIX}-grpc-broker` ++ `${PREFIX}-grpc-client` + +## Device|Discovery Services + +There are two applications: + ++ `device` ++ `discovery` + +### Docker + +You may run the images standalone: + +```bash +# Create devices on ports 8000:8009 +DISCOVERY=() +for PORT in {8000..8009} +do + # Create the device on ${PORT} + # For Docker only: name each device: device-${PORT} + docker run \ + --rm --detach=true \ + --name=device-${PORT} \ + --publish=${PORT}:8080 \ + ${REGISTRY}/${USER}/${PREFIX}-device:${TAG} \ + --path="/" + # Add the device to the discovery document + DISCOVERY+=("--device=http://localhost:${PORT} ") +done + +# Create a discovery server for these devices +docker run \ + --rm --detach=true \ + --name=discovery \ + --publish=9999:9999 \ + ${REGISTRY}/${USER}/${PREFIX}-discovery:${TAG} ${DISCOVERY[@]} +``` + +Test: + +```bash +curl http://localhost:9999/ +http://localhost:8000 +http://localhost:8001 +http://localhost:8002 +http://localhost:8003 +http://localhost:8004 +http://localhost:8005 +http://localhost:8006 +http://localhost:8007 +http://localhost:8008 +http://localhost:8009 + +curl http://localhost:8006/sensor +``` + +To stop: + +```bash +# Delete devices on ports 8000:8009 +for PORT in {8000..8009} +do + docker stop device-${PORT} +done + +# Delete discovery server +docker stop discovery +``` + +### Kubernetes + +And most useful on Kubernetes because one (!) or more devices can be created and then discovery can be created with correct DNS names. + +Ensure the `image` references are updated in `./kubernetes/device.yaml` and `./kubernetes/discovery.yaml` + +```bash +for APP in "device" "discovery" +do + IMAGE="$(docker inspect --format='{{index .RepoDigests 0}}' ${REGISTRY}/${USER}/${PREFIX}-${APP}:${TAG})" + sed \ + --in-place \ + "s|IMAGE|${IMAGE}|g" + ./kubernetes/${APP}.yaml +done +``` + +Then: + +```bash + +# Create one device deployment +kubectl apply --filename=./device.yaml + +# But multiple Services against the single Pod +for NUM in {1..9} +do + # Services are uniquely named + # The service uses the Pods port: 8080 + kubectl expose deployment/device \ + --name=device-${NUM} \ + --port=8080 \ + --target-port=8080 +done +service/device-1 exposed +service/device-2 exposed +service/device-3 exposed +service/device-4 exposed +service/device-5 exposed +service/device-6 exposed +service/device-7 exposed +service/device-8 exposed +service/device-9 exposed + +# Create one discovery deployment +kubectl apply --filename=./discovery.yaml + +# Expose Discovery as a service on its default port: 9999 +# The Discovery service spec is statically configured for devices 1-9 +kubectl expose deployment/discovery \ +--name=discovery \ +--port=9999 \ +--target-port=9999 + +kubectl run curl --image=radial/busyboxplus:curl --stdin --tty --rm +curl http://discovery:9999 +http://device-1:8080 +http://device-2:8080 +http://device-3:8080 +http://device-4:8080 +http://device-5:8080 +http://device-6:8080 +http://device-7:8080 +http://device-8:8080 +http://device-9:8080 +``` + +Delete: + +```bash +kubectl delete deployment/discovery +kubectl delete deployment/device + +kubectl delete service/discovery + +for NUM in {1..9} +do + kubectl delete service/device-${NUM} +done +``` + +## gRPC Broker|Client + +This is a Golang implementation of the Broker gRPC server and client. It is an alternative implementation to the Rust gRPC server and client found in `./samples/brokers/http/src/grpc`. + +### Docker + +These are containerized too: + +```bash +docker run \ +--rm --interactive --tty \ +--net=host \ +--name=grpc-broker-golang \ +--env=AKRI_HTTP_DEVICE_ENDPOINT=localhost:8005 \ +${REGISTRY}/${USER}/${PREFIX}-grpc-broker:${TAG} \ +--grpc_endpoint=:50051 +``` + +And: + +```bash +docker run \ +--rm --interactive --tty \ +--net=host \ +--name=grpc-client-golang \ +${REGISTRY}/${USER}/${PREFIX}-grpc-client:${TAG} \ +--grpc_endpoint=:50051 +``` + +### Kubernetes + +You will need to replace `IMAGE` and `SECRET` in the Kubernetes configs before you deploy them. + +`SECRET` should be replaced with the value (if any) of the Kubernetes Secret that provides the token to your registry. + +```bash +for APP in "broker" "client" +do + IMAGE="$(docker inspect --format='{{index .RepoDigests 0}}' ${REGISTRY}/${USER}/${PREFIX}-grpc-${APP}:${TAG})" + sed \ + --in-place \ + "s|IMAGE|${IMAGE}|g" + ./kubernetes/grpc.${APP}.yaml +done +``` + +Then: + +```bash +kubectl apply --filename=./kubernetes/gprc.broker.yaml +kubectl apply --filename=./kubernetes/grpc.client.yaml +``` + diff --git a/samples/apps/http-apps/build.sh b/samples/apps/http-apps/build.sh new file mode 100755 index 000000000..efa9ac501 --- /dev/null +++ b/samples/apps/http-apps/build.sh @@ -0,0 +1,26 @@ +#!/usr/bin/env bash + +: "${REGISTRY:?Need to export REGISTRY e.g. ghcr.io}" +: "${USER:?Need to export USER e.g. ghcr.io/deislabs/...}" +: "${PREFIX:?Need to export PREFIX e.g. ${REGISTRY}/${USER}/http-apps...}" +: "${TAG:?Need to export TAG e.g. v1}" + +for APP in "device" "discovery" +do + IMAGE="${REGISTRY}/${USER}/${PREFIX}-${APP}:${TAG}" + docker build \ + --tag=${IMAGE} \ + --file=./Dockerfiles/${APP} \ + . + docker push ${IMAGE} +done + +for APP in "broker" "client" +do + IMAGE="${REGISTRY}/${USER}/${PREFIX}-grpc-${APP}-golang:${TAG}" + docker build \ + --tag=${IMAGE} \ + --file=./Dockerfiles/grpc.${APP} \ + . + docker push ${IMAGE} +done \ No newline at end of file diff --git a/samples/apps/http-apps/cmd/device/main.go b/samples/apps/http-apps/cmd/device/main.go new file mode 100644 index 000000000..6de076937 --- /dev/null +++ b/samples/apps/http-apps/cmd/device/main.go @@ -0,0 +1,57 @@ +package main + +import ( + "flag" + "fmt" + "log" + "math/rand" + "net" + "net/http" + "time" + + "github.com/deislabs/akri/http-extensibility/shared" +) + +const ( + addr = ":8080" +) + +var _ flag.Value = (*shared.RepeatableFlag)(nil) +var paths shared.RepeatableFlag + +func main() { + flag.Var(&paths, "path", "Repeat this flag to add paths for the device") + flag.Parse() + + // At a minimum, respond on `/` + if len(paths) == 0 { + paths = []string{"/"} + } + log.Printf("[main] Paths: %d", len(paths)) + + seed := rand.NewSource(time.Now().UnixNano()) + entr := rand.New(seed) + + handler := http.NewServeMux() + + // Create handler for each endpoint + for _, path := range paths { + log.Printf("[main] Creating handler: %s", path) + handler.HandleFunc(path, func(w http.ResponseWriter, r *http.Request) { + log.Printf("[main:handler] Handler entered: %s", path) + fmt.Fprint(w, entr.Float64()) + }) + } + + s := &http.Server{ + Addr: addr, + Handler: handler, + } + listen, err := net.Listen("tcp", addr) + if err != nil { + log.Fatal(err) + } + + log.Printf("[main] Starting Device: [%s]", addr) + log.Fatal(s.Serve(listen)) +} diff --git a/samples/apps/http-apps/cmd/discovery/main.go b/samples/apps/http-apps/cmd/discovery/main.go new file mode 100644 index 000000000..72d7d5446 --- /dev/null +++ b/samples/apps/http-apps/cmd/discovery/main.go @@ -0,0 +1,42 @@ +package main + +import ( + "flag" + "fmt" + "html" + "log" + "net" + "net/http" + + "github.com/deislabs/akri/http-extensibility/shared" +) + +const ( + addr = ":9999" +) + +var _ flag.Value = (*shared.RepeatableFlag)(nil) +var devices shared.RepeatableFlag + +func main() { + flag.Var(&devices, "device", "Repeat this flag to add devices to the discovery service") + flag.Parse() + + handler := http.NewServeMux() + handler.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { + log.Printf("[discovery] Handler entered") + fmt.Fprintf(w, "%s\n", html.EscapeString(devices.String())) + }) + + s := &http.Server{ + Addr: addr, + Handler: handler, + } + listen, err := net.Listen("tcp", addr) + if err != nil { + log.Fatal(err) + } + + log.Printf("[createDiscoveryService] Starting Discovery Service: %s", addr) + log.Fatal(s.Serve(listen)) +} diff --git a/samples/apps/http-apps/cmd/grpc/broker/main.go b/samples/apps/http-apps/cmd/grpc/broker/main.go new file mode 100644 index 000000000..ca7794ca4 --- /dev/null +++ b/samples/apps/http-apps/cmd/grpc/broker/main.go @@ -0,0 +1,46 @@ +package main + +import ( + "flag" + "log" + "net" + "os" + + pb "github.com/deislabs/akri/http-extensibility/prots" + + "google.golang.org/grpc" +) + +const ( + deviceEndpoint = "AKRI_HTTP_DEVICE_ENDPOINT" +) + +var ( + grpcEndpoint = flag.String("grpc_endpoint", "", "The endpoint of this gRPC server.") +) + +func main() { + log.Println("[main] Starting gRPC server") + + flag.Parse() + if *grpcEndpoint == "" { + log.Fatal("[main] Unable to start server. Requires gRPC endpoint.") + } + + deviceURL := os.Getenv(deviceEndpoint) + if deviceURL == "" { + log.Fatalf("Unable to determine Device URL using environment: %s", deviceEndpoint) + } + + serverOpts := []grpc.ServerOption{} + grpcServer := grpc.NewServer(serverOpts...) + + pb.RegisterDeviceServiceServer(grpcServer, NewServer(deviceURL)) + + listen, err := net.Listen("tcp", *grpcEndpoint) + if err != nil { + log.Fatal(err) + } + log.Printf("[main] Starting gRPC Listener [%s]\n", *grpcEndpoint) + log.Fatal(grpcServer.Serve(listen)) +} diff --git a/samples/apps/http-apps/cmd/grpc/broker/server.go b/samples/apps/http-apps/cmd/grpc/broker/server.go new file mode 100644 index 000000000..085b34258 --- /dev/null +++ b/samples/apps/http-apps/cmd/grpc/broker/server.go @@ -0,0 +1,50 @@ +package main + +import ( + "context" + "fmt" + "io/ioutil" + "log" + "net/http" + + pb "github.com/deislabs/akri/http-extensibility/protos" +) + +var _ pb.DeviceServiceServer = (*Server)(nil) + +// Server is a type that implements pb.DeviceServiceServer +type Server struct { + DeviceURL string +} + +// NewServer is a function that returns a new Server +func NewServer(deviceURL string) *Server { + return &Server{ + DeviceURL: deviceURL, + } +} + +// ReadSensor is a method that implements the pb.HTTPServer interface +func (s *Server) ReadSensor(ctx context.Context, rqst *pb.ReadSensorRequest) (*pb.ReadSensorResponse, error) { + log.Println("[read_sensor] Entered") + resp, err := http.Get(s.DeviceURL) + if err != nil { + return &pb.ReadSensorResponse{}, err + } + defer resp.Body.Close() + + if resp.StatusCode < 200 || resp.StatusCode > 299 { + log.Printf("[read_sensor] Response status: %d", resp.StatusCode) + return &pb.ReadSensorResponse{}, fmt.Errorf("response code: %d", resp.StatusCode) + } + + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + return &pb.ReadSensorResponse{}, err + } + + log.Printf("[read_sensor] Response body: %s", body) + return &pb.ReadSensorResponse{ + Value: string(body), + }, nil +} diff --git a/samples/apps/http-apps/cmd/grpc/client/main.go b/samples/apps/http-apps/cmd/grpc/client/main.go new file mode 100644 index 000000000..d856fcc25 --- /dev/null +++ b/samples/apps/http-apps/cmd/grpc/client/main.go @@ -0,0 +1,63 @@ +package main + +import ( + "context" + "flag" + "log" + "time" + + pb "github.com/deislabs/akri/http-extensibility/proto" + + "google.golang.org/grpc" +) + +var ( + grpcEndpoint = flag.String("grpc_endpoint", "", "The endpoint of the gRPC server.") +) + +func main() { + log.Println("[main] Starting gRPC client") + defer func() { + log.Println("[main] Stopping gRPC client") + }() + + flag.Parse() + if *grpcEndpoint == "" { + log.Fatal("[main] Unable to start client. Requires endpoint to a gRPC Server.") + } + + dialOpts := []grpc.DialOption{ + grpc.WithInsecure(), + } + log.Printf("Connecting to gRPC server [%s]", *grpcEndpoint) + conn, err := grpc.Dial(*grpcEndpoint, dialOpts...) + if err != nil { + log.Fatal(err) + } + defer conn.Close() + + client := pb.NewDeviceServiceClient(conn) + ctx := context.Background() + + for { + log.Println("[main:loop]") + + // Call Service + { + rqst := &pb.ReadSensorRequest{ + Name: "/", + } + log.Println("[main:loop] Calling read_sensor") + resp, err := client.ReadSensor(ctx, rqst) + if err != nil { + log.Fatal(err) + } + + log.Printf("[main:loop] Success: %+v", resp) + } + + // Add a pause between iterations + log.Println("[main:loop] Sleep") + time.Sleep(10 * time.Second) + } +} diff --git a/samples/apps/http-apps/go.mod b/samples/apps/http-apps/go.mod new file mode 100644 index 000000000..57ad18608 --- /dev/null +++ b/samples/apps/http-apps/go.mod @@ -0,0 +1,9 @@ +module github.com/deislabs/akri/http-extensibility + +go 1.15 + +require ( + github.com/golang/protobuf v1.4.3 + google.golang.org/grpc v1.33.2 + google.golang.org/protobuf v1.25.0 +) diff --git a/samples/apps/http-apps/kubernetes/device.yaml b/samples/apps/http-apps/kubernetes/device.yaml new file mode 100644 index 000000000..f5cb0910a --- /dev/null +++ b/samples/apps/http-apps/kubernetes/device.yaml @@ -0,0 +1,30 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: device +spec: + replicas: 1 + selector: + matchLabels: + project: akri + protocol: http + function: device + template: + metadata: + labels: + project: akri + protocol: http + function: device + name: device + spec: + imagePullSecrets: + - name: SECRET + containers: + - name: device + image: IMAGE + imagePullPolicy: Always + args: + - --path=/ + ports: + - name: http + containerPort: 8080 diff --git a/samples/apps/http-apps/kubernetes/discovery.yaml b/samples/apps/http-apps/kubernetes/discovery.yaml new file mode 100644 index 000000000..878843c3a --- /dev/null +++ b/samples/apps/http-apps/kubernetes/discovery.yaml @@ -0,0 +1,38 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: discovery +spec: + replicas: 1 + selector: + matchLabels: + project: akri + protocol: http + function: discovery + template: + metadata: + labels: + project: akri + protocol: http + function: discovery + name: discovery + spec: + imagePullSecrets: + - name: SECRET + containers: + - name: discovery + image: IMAGE + imagePullPolicy: Always + args: + - --device=http://device-1:8080 + - --device=http://device-2:8080 + - --device=http://device-3:8080 + - --device=http://device-4:8080 + - --device=http://device-5:8080 + - --device=http://device-6:8080 + - --device=http://device-7:8080 + - --device=http://device-8:8080 + - --device=http://device-9:8080 + ports: + - name: http + containerPort: 9999 diff --git a/samples/apps/http-apps/kubernetes/grpc.broker.yaml b/samples/apps/http-apps/kubernetes/grpc.broker.yaml new file mode 100644 index 000000000..249a4129e --- /dev/null +++ b/samples/apps/http-apps/kubernetes/grpc.broker.yaml @@ -0,0 +1,30 @@ +apiVersion: akri.sh/v0 +kind: Configuration +metadata: + name: http-grpc-broker-golang +spec: + protocol: + http: + discoveryEndpoint: http://discovery:9999 + capacity: 1 + brokerPodSpec: + imagePullSecrets: # GitHub Container Registry secret + - name: SECRET + containers: + - name: http-grpc-broker-golang + image: IMAGE + args: + - --grpc_endpoint=0.0.0.0:50051 + resources: + limits: + "{{PLACEHOLDER}}": "1" + instanceServiceSpec: + ports: + - name: grpc + port: 50051 + targetPort: 50051 + configurationServiceSpec: + ports: + - name: grpc + port: 50051 + targetPort: 50051 diff --git a/samples/apps/http-apps/kubernetes/grpc.client.yaml b/samples/apps/http-apps/kubernetes/grpc.client.yaml new file mode 100644 index 000000000..c7ff8454b --- /dev/null +++ b/samples/apps/http-apps/kubernetes/grpc.client.yaml @@ -0,0 +1,28 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: http-grpc-client-golang +spec: + replicas: 1 + selector: + matchLabels: + project: akri + protocol: http + function: client + language: golang + template: + metadata: + labels: + project: akri + protocol: http + function: client + language: golang + name: http-grpc-client-golang + spec: + imagePullSecrets: + - name: SECRET + containers: + - name: http-grpc-client-golang + image: IMAGE + args: + - --grpc_endpoint=http-svc:50051 diff --git a/samples/apps/http-apps/proto/http.proto b/samples/apps/http-apps/proto/http.proto new file mode 100644 index 000000000..6d74c7777 --- /dev/null +++ b/samples/apps/http-apps/proto/http.proto @@ -0,0 +1,16 @@ +syntax = "proto3"; + +option go_package = "github.com/deislabs/akri/http-extensibility/proto"; + +package http; + +service DeviceService { + rpc ReadSensor (ReadSensorRequest) returns (ReadSensorResponse); +} + +message ReadSensorRequest { + string name = 1; +} +message ReadSensorResponse { + string value = 1; +} diff --git a/samples/apps/http-apps/shared/paths.go b/samples/apps/http-apps/shared/paths.go new file mode 100644 index 000000000..d544a910c --- /dev/null +++ b/samples/apps/http-apps/shared/paths.go @@ -0,0 +1,20 @@ +package shared + +import ( + "strings" +) + +// RepeatableFlag is an alias to use repeated flags with flag +type RepeatableFlag []string + +// String is a method required by flag.Value interface +func (e *RepeatableFlag) String() string { + result := strings.Join(*e, "\n") + return result +} + +// Set is a method required by flag.Value interface +func (e *RepeatableFlag) Set(value string) error { + *e = append(*e, value) + return nil +} diff --git a/samples/brokers/http/Cargo.toml b/samples/brokers/http/Cargo.toml new file mode 100644 index 000000000..ca349bdf6 --- /dev/null +++ b/samples/brokers/http/Cargo.toml @@ -0,0 +1,29 @@ +[package] +name = "http" +version = "0.1.0" +authors = ["DazWilkin "] +edition = "2018" + +[[bin]] +name = "standalone" +path = "src/main.rs" + +[[bin]] +name = "broker" +path = "src/grpc/broker.rs" + +[[bin]] +name = "client" +path = "src/grpc/client.rs" + +[dependencies] +clap = "2.33.3" +futures = "0.3" +futures-util = "0.3" +prost = "0.6" +reqwest = "0.10.8" +tokio = { version = "0.2", features = ["rt-threaded", "time", "stream", "fs", "macros", "uds"] } +tonic = "0.1" + +[build-dependencies] +tonic-build = "0.1.1" diff --git a/samples/brokers/http/Dockerfiles/grpc.broker b/samples/brokers/http/Dockerfiles/grpc.broker new file mode 100644 index 000000000..2053edea3 --- /dev/null +++ b/samples/brokers/http/Dockerfiles/grpc.broker @@ -0,0 +1,51 @@ +ARG PLATFORM=amd64 +ARG CROSS_BUILD_TARGET=x86_64-unknown-linux-gnu +ARG BROKER=http + +FROM ${PLATFORM}/rust:1.47 as build + +ARG BROKER + +RUN rustup component add rustfmt --toolchain 1.47.0-x86_64-unknown-linux-gnu + +RUN USER=root cargo new --bin ${BROKER} + +WORKDIR /${BROKER} + +COPY ./samples/brokers/${BROKER}/Cargo.toml ./Cargo.toml +RUN cargo build \ + --bin=standalone \ + --release +RUN rm ./src/*.rs +RUN rm ./target/release/deps/${BROKER}* + +COPY ./samples/brokers/${BROKER} . + +RUN cargo build \ + --bin=broker \ + --release + +FROM amd64/debian:buster-slim + +ARG BROKER + +RUN apt-get update && \ + apt-get install -y --no-install-recommends \ + ca-certificates \ + libssl-dev \ + openssl && \ + apt-get clean + +COPY --from=build /${BROKER}/target/release/broker /broker + +LABEL org.opencontainers.image.source https://github.com/deislabs/akri + +# Expose port used by broker service +EXPOSE 8084 + +ENV SSL_CERT_FILE=/etc/ssl/certs/ca-certificates.crt +ENV SSL_CERT_DIR=/etc/ssl/certs +ENV RUST_LOG ${BROKER},akri_shared + +ENTRYPOINT ["/broker"] +CMD ["--grpc_endpoint=0.0.0.0:8084"] diff --git a/samples/brokers/http/Dockerfiles/grpc.client b/samples/brokers/http/Dockerfiles/grpc.client new file mode 100644 index 000000000..09cbd1464 --- /dev/null +++ b/samples/brokers/http/Dockerfiles/grpc.client @@ -0,0 +1,48 @@ +ARG PLATFORM=amd64 +ARG CROSS_BUILD_TARGET=x86_64-unknown-linux-gnu +ARG BROKER=http + +FROM ${PLATFORM}/rust:1.47 as build + +ARG BROKER + +RUN rustup component add rustfmt --toolchain 1.47.0-x86_64-unknown-linux-gnu + +RUN USER=root cargo new --bin ${BROKER} + +WORKDIR /${BROKER} + +COPY ./samples/brokers/${BROKER}/Cargo.toml ./Cargo.toml +RUN cargo build \ + --bin=standalone \ + --release +RUN rm ./src/*.rs +RUN rm ./target/release/deps/${BROKER}* + +COPY ./samples/brokers/${BROKER} . + +RUN cargo build \ + --bin=client \ + --release + +FROM amd64/debian:buster-slim + +ARG BROKER + +RUN apt-get update && \ + apt-get install -y --no-install-recommends \ + ca-certificates \ + libssl-dev \ + openssl && \ + apt-get clean + +COPY --from=build /${BROKER}/target/release/client /client + +LABEL org.opencontainers.image.source https://github.com/deislabs/akri + +ENV SSL_CERT_FILE=/etc/ssl/certs/ca-certificates.crt +ENV SSL_CERT_DIR=/etc/ssl/certs +ENV RUST_LOG ${BROKER},akri_shared + +ENTRYPOINT ["/client"] +CMD ["--grpc_endpoint=:8084"] diff --git a/samples/brokers/http/Dockerfiles/standalone b/samples/brokers/http/Dockerfiles/standalone new file mode 100644 index 000000000..d9bf95955 --- /dev/null +++ b/samples/brokers/http/Dockerfiles/standalone @@ -0,0 +1,48 @@ +ARG PLATFORM=amd64 +ARG CROSS_BUILD_TARGET=x86_64-unknown-linux-gnu +ARG BROKER=http + +FROM ${PLATFORM}/rust:1.47 as build + +ARG BROKER + +RUN rustup component add rustfmt --toolchain 1.47.0-x86_64-unknown-linux-gnu + +RUN USER=root cargo new --bin ${BROKER} + +WORKDIR /${BROKER} + +COPY ./samples/brokers/${BROKER}/Cargo.toml ./Cargo.toml +RUN cargo build \ + --bin=standalone \ + --release +RUN rm ./src/*.rs +RUN rm ./target/release/deps/${BROKER}* + +COPY ./samples/brokers/${BROKER} . + +RUN cargo build \ + --bin=standalone \ + --release + +FROM amd64/debian:buster-slim + +ARG BROKER + +RUN apt-get update && \ + apt-get install -y --no-install-recommends \ + ca-certificates \ + libssl-dev \ + openssl && \ + apt-get clean + +# Rename ${BROKER} binary to broker +COPY --from=build /${BROKER}/target/release/standalone /broker + +LABEL org.opencontainers.image.source https://github.com/deislabs/akri + +ENV SSL_CERT_FILE=/etc/ssl/certs/ca-certificates.crt +ENV SSL_CERT_DIR=/etc/ssl/certs +ENV RUST_LOG ${BROKER},akri_shared + +ENTRYPOINT ["/broker"] diff --git a/samples/brokers/http/build.rs b/samples/brokers/http/build.rs new file mode 100644 index 000000000..a173cec1a --- /dev/null +++ b/samples/brokers/http/build.rs @@ -0,0 +1,4 @@ +fn main() -> Result<(), Box> { + tonic_build::compile_protos("proto/http.proto")?; + Ok(()) +} diff --git a/samples/brokers/http/build.sh b/samples/brokers/http/build.sh new file mode 100755 index 000000000..0199ab176 --- /dev/null +++ b/samples/brokers/http/build.sh @@ -0,0 +1,39 @@ +#!/usr/bin/env bash + +: "${REGISTRY:?Need to export REGISTRY e.g. ghcr.io}" +: "${USER:?Need to export USER e.g. ghcr.io/deislabs/...}" +: "${PREFIX:?Need to export PREFIX e.g. ${REGISTRY}/${USER}/http...}" +: "${TAG:?Need to export TAG e.g. v1}" + +# Standalone +( + IMAGE="${REGISTRY}/${USER}/${PREFIX}-broker:${TAG}" + docker build \ + --tag=${IMAGE} \ + --file=./Dockerfiles/standalone \ + ../../.. + + docker push ${IMAGE} +) + +# gRPC Broker|Client +# Broker +( + IMAGE="${REGISTRY}/${USER}/${PREFIX}-grpc-broker:${TAG}" + docker build \ + --tag=${IMAGE} \ + --file=./Dockerfiles/grpc.broker \ + ../../.. + + docker push ${IMAGE} +) +# Client +( + IMAGE="${REGISTRY}/${USER}/${PREFIX}-grpc-client:${TAG}" + docker build \ + --tag=${IMAGE} \ + --file=./Dockerfiles/grpc.client \ + ../../.. + + docker push ${IMAGE} +) diff --git a/samples/brokers/http/kubernetes/http.grpc.broker.yaml b/samples/brokers/http/kubernetes/http.grpc.broker.yaml new file mode 100644 index 000000000..774189169 --- /dev/null +++ b/samples/brokers/http/kubernetes/http.grpc.broker.yaml @@ -0,0 +1,30 @@ +apiVersion: akri.sh/v0 +kind: Configuration +metadata: + name: http-grpc-broker-rust +spec: + protocol: + http: + discoveryEndpoint: http://discovery:9999 + capacity: 1 + brokerPodSpec: + imagePullSecrets: # GitHub Container Registry secret + - name: SECRET + containers: + - name: http-grpc-broker-rust + image: IMAGE + args: + - --grpc_endpoint=0.0.0.0:50051 + resources: + limits: + "{{PLACEHOLDER}}": "1" + instanceServiceSpec: + ports: + - name: grpc + port: 50051 + targetPort: 50051 + configurationServiceSpec: + ports: + - name: grpc + port: 50051 + targetPort: 50051 diff --git a/samples/brokers/http/kubernetes/http.grpc.client.yaml b/samples/brokers/http/kubernetes/http.grpc.client.yaml new file mode 100644 index 000000000..c0be084ff --- /dev/null +++ b/samples/brokers/http/kubernetes/http.grpc.client.yaml @@ -0,0 +1,28 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: http-grpc-client-rust +spec: + replicas: 1 + selector: + matchLabels: + project: akri + protocol: http + function: client + language: rust + template: + metadata: + labels: + project: akri + protocol: http + function: client + language: rust + name: http-grpc-client-rust + spec: + imagePullSecrets: + - name: SECRET + containers: + - name: http-grpc-client-rust + image: IMAGE + args: + - --grpc_endpoint=http-svc:50051 diff --git a/samples/brokers/http/kubernetes/http.yaml b/samples/brokers/http/kubernetes/http.yaml new file mode 100644 index 000000000..e86906c79 --- /dev/null +++ b/samples/brokers/http/kubernetes/http.yaml @@ -0,0 +1,18 @@ +apiVersion: akri.sh/v0 +kind: Configuration +metadata: + name: http +spec: + protocol: + http: + discoveryEndpoint: http://discovery:9999 + capacity: 1 + brokerPodSpec: + imagePullSecrets: # Container Registry secret + - name: SECRET + containers: + - name: http-broker + image: IMAGE + resources: + limits: + "{{PLACEHOLDER}}": "1" diff --git a/samples/brokers/http/proto/http.proto b/samples/brokers/http/proto/http.proto new file mode 100644 index 000000000..6d74c7777 --- /dev/null +++ b/samples/brokers/http/proto/http.proto @@ -0,0 +1,16 @@ +syntax = "proto3"; + +option go_package = "github.com/deislabs/akri/http-extensibility/proto"; + +package http; + +service DeviceService { + rpc ReadSensor (ReadSensorRequest) returns (ReadSensorResponse); +} + +message ReadSensorRequest { + string name = 1; +} +message ReadSensorResponse { + string value = 1; +} diff --git a/samples/brokers/http/src/grpc/broker.rs b/samples/brokers/http/src/grpc/broker.rs new file mode 100644 index 000000000..9a4d9fefa --- /dev/null +++ b/samples/brokers/http/src/grpc/broker.rs @@ -0,0 +1,76 @@ +pub mod http { + tonic::include_proto!("http"); +} + +use clap::{App, Arg}; +use http::{ + device_service_server::{DeviceService, DeviceServiceServer}, + ReadSensorRequest, ReadSensorResponse, +}; +use reqwest::get; +use std::env; +use std::net::SocketAddr; +use tonic::{transport::Server, Code, Request, Response, Status}; + +const DEVICE_ENDPOINT: &str = "AKRI_HTTP_DEVICE_ENDPOINT"; + +#[derive(Default)] +pub struct Device { + device_url: String, +} + +#[tonic::async_trait] +impl DeviceService for Device { + async fn read_sensor( + &self, + _rqst: Request, + ) -> Result, Status> { + println!("[read_sensor] Entered"); + match get(&self.device_url).await { + Ok(resp) => { + println!("[read_sensor] Response status: {:?}", resp.status()); + let body = resp.text().await.unwrap(); + println!("[read_sensor] Response body: {:?}", body); + Ok(Response::new(ReadSensorResponse { value: body })) + } + Err(err) => { + println!("[read_sensor] Error: {:?}", err); + Err(Status::new(Code::Unavailable, "device is unavailable")) + } + } + } +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + println!("[main] Entered"); + + let matches = App::new("broker") + .arg( + Arg::with_name("grpc_endpoint") + .long("grpc_endpoint") + .value_name("ENDPOINT") + .help("Endpoint address that the gRPC server will listen on.") + .required(true), + ) + .get_matches(); + let grpc_endpoint = matches.value_of("grpc_endpoint").unwrap(); + + let addr: SocketAddr = grpc_endpoint.parse().unwrap(); + println!("[main] gRPC service endpoint: {}", addr); + + let device_url = env::var(DEVICE_ENDPOINT)?; + println!("[main] gRPC service proxying: {}", device_url); + + let device_service = Device { device_url }; + let service = DeviceServiceServer::new(device_service); + + println!("[main] gRPC service starting"); + Server::builder() + .add_service(service) + .serve(addr) + .await + .expect("unable to start http-prtocol gRPC server"); + + Ok(()) +} diff --git a/samples/brokers/http/src/grpc/client.rs b/samples/brokers/http/src/grpc/client.rs new file mode 100644 index 000000000..6d12d23bd --- /dev/null +++ b/samples/brokers/http/src/grpc/client.rs @@ -0,0 +1,42 @@ +pub mod http { + tonic::include_proto!("http"); +} + +use clap::{App, Arg}; +use http::{device_service_client::DeviceServiceClient, ReadSensorRequest}; +use tokio::{time, time::Duration}; + +#[tokio::main] +async fn main() -> Result<(), Box> { + println!("[main] Entered"); + + let matches = App::new("client") + .arg( + Arg::with_name("grpc_endpoint") + .long("grpc_endpoint") + .value_name("ENDPOINT") + .help("Endpoint address of the gRPC server.") + .required(true), + ) + .get_matches(); + let grpc_endpoint = matches.value_of("grpc_endpoint").unwrap(); + + let endpoint = format!("http://{}", grpc_endpoint); + println!("[main] gRPC client dialing: {}", endpoint); + let mut client = DeviceServiceClient::connect(endpoint).await?; + + loop { + println!("[main:loop] Constructing Request"); + let rqst = tonic::Request::new(ReadSensorRequest { + name: "/".to_string(), + }); + println!("[main:loop] Calling read_sensor"); + let resp = client.read_sensor(rqst).await?; + println!("[main:loop] Response: {:?}", resp); + + println!("[main:loop] Sleep"); + time::delay_for(Duration::from_secs(10)).await; + } + + Ok(()) +} diff --git a/samples/brokers/http/src/main.rs b/samples/brokers/http/src/main.rs new file mode 100644 index 000000000..65a97c8ee --- /dev/null +++ b/samples/brokers/http/src/main.rs @@ -0,0 +1,36 @@ +use reqwest::get; +use std::env; +use tokio::{time, time::Duration}; + +const DEVICE_ENDPOINT: &str = "AKRI_HTTP_DEVICE_ENDPOINT"; + +async fn read_sensor(device_url: &str) { + println!("[http:read_sensor] Entered"); + match get(device_url).await { + Ok(resp) => { + println!("[main:read_sensor] Response status: {:?}", resp.status()); + let body = resp.text().await; + println!("[main:read_sensor] Response body: {:?}", body); + } + Err(err) => println!("Error: {:?}", err), + }; +} +#[tokio::main] +async fn main() -> Result<(), Box> { + println!("[http:main] Entered"); + + let device_url = env::var(DEVICE_ENDPOINT)?; + println!("[http:main] Device: {}", &device_url); + + let mut tasks = Vec::new(); + tasks.push(tokio::spawn(async move { + loop { + println!("[http:main:loop] Sleep"); + time::delay_for(Duration::from_secs(10)).await; + println!("[http:main:loop] read_sensor({})", &device_url); + read_sensor(&device_url[..]).await; + } + })); + futures::future::join_all(tasks).await; + Ok(()) +}