From 0d71de6e84ad94bd45579f1204944b05afd3381a Mon Sep 17 00:00:00 2001 From: Matthias Simonis Date: Fri, 1 Mar 2024 09:29:13 +0100 Subject: [PATCH] Rust/for stream (#375) * broken implementation * stream API works --------- Co-authored-by: Stefan Hausotte --- rust/.devcontainer/Dockerfile | 24 ---- rust/.devcontainer/devcontainer.json | 60 +++++---- rust/.github/dependabot.yml | 12 ++ rust/Cargo.toml | 10 +- rust/src/auth/authenticator.rs | 2 +- rust/src/auth/authenticators/mod.rs | 2 +- rust/src/connection.rs | 114 +++++++++++++++++- rust/src/lib.rs | 1 - rust/src/message/kind.rs | 1 + rust/src/message/mod.rs | 2 + .../src/message/verdict_request_for_stream.rs | 27 +++++ rust/tests/real_api_integration_tests.rs | 28 +++++ 12 files changed, 218 insertions(+), 65 deletions(-) delete mode 100644 rust/.devcontainer/Dockerfile create mode 100644 rust/.github/dependabot.yml create mode 100644 rust/src/message/verdict_request_for_stream.rs diff --git a/rust/.devcontainer/Dockerfile b/rust/.devcontainer/Dockerfile deleted file mode 100644 index 679b7f9e..00000000 --- a/rust/.devcontainer/Dockerfile +++ /dev/null @@ -1,24 +0,0 @@ -FROM mcr.microsoft.com/vscode/devcontainers/base:1-focal - -# Install common stuff -RUN apt-get update && export DEBIAN_FRONTEND=noninteractive \ - && apt-get -y install --no-install-recommends \ - # common stuff - bash-completion gcc g++ make build-essential libssl-dev pkg-config \ - software-properties-common - -# install sepcifics for the code -RUN apt-get update && export DEBIAN_FRONTEND=noninteractive \ - && apt-get -y install --no-install-recommends \ - # dependencies of examples/kde_dolphin - libglib2.0-dev libcairo2-dev libpango1.0-dev libatk1.0-dev \ - libgdk-pixbuf2.0-dev libsoup2.4-dev libgtk-3-dev libwebkit2gtk-4.0-dev \ - && apt-get -y clean && apt-get -y autoclean && apt-get -y autoremove - -USER vscode -# setup search path -ENV PATH="/home/vscode/.cargo/bin:/home/vscode/.local/bin/:${PATH}" -RUN mkdir -p ~/.local/bin - -# install rust tooling for the vscode user -RUN curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y diff --git a/rust/.devcontainer/devcontainer.json b/rust/.devcontainer/devcontainer.json index 294ef22d..f8fff109 100644 --- a/rust/.devcontainer/devcontainer.json +++ b/rust/.devcontainer/devcontainer.json @@ -1,37 +1,31 @@ -// For format details, see https://aka.ms/devcontainer.json. For config options, see the README at: -// https://github.com/microsoft/vscode-dev-containers/tree/v0.187.0/containers/ubuntu +// For format details, see https://aka.ms/devcontainer.json. For config options, see the +// README at: https://github.com/devcontainers/templates/tree/main/src/rust { - "name": "Ubuntu", - "build": { - "dockerfile": "Dockerfile", - // Update 'VARIANT' to pick an Ubuntu version: focal, bionic - "args": { - "VARIANT": "focal" - } - }, - "mounts": [ - "source=${localEnv:HOME}/.docker/config.json,target=/root/.docker/config.json,type=bind,consistency=cached", - "source=/var/run/docker.sock,target=/var/run/docker.sock,type=bind,consistency=consistent" - ], - "runArgs": [ - "--network=host" - ], - // Set *default* container specific settings.json values on container create. - "settings": {}, - // Add the IDs of extensions you want installed when the container is created. - "extensions": [ - "nico-castell.linux-desktop-file", - "bungcip.better-toml", - "jeff-hykin.better-dockerfile-syntax", - "SixtyFPS.sixtyfps-vscode", - "Gruntfuggly.todo-tree", - "rust-lang.rust-analyzer", - "Swellaby.vscode-rust-test-adapter" - ], + "name": "Rust", + // Or use a Dockerfile or Docker Compose file. More info: https://containers.dev/guide/dockerfile + "image": "mcr.microsoft.com/devcontainers/rust:1-1-bullseye" + + // Use 'mounts' to make the cargo cache persistent in a Docker Volume. + // "mounts": [ + // { + // "source": "devcontainer-cargo-cache-${devcontainerId}", + // "target": "/usr/local/cargo", + // "type": "volume" + // } + // ] + + // Features to add to the dev container. More info: https://containers.dev/features. + // "features": {}, + // Use 'forwardPorts' to make a list of ports inside the container available locally. // "forwardPorts": [], + // Use 'postCreateCommand' to run commands after the container is created. - // "postCreateCommand": "uname -a", - // Comment out connect as root instead. More info: https://aka.ms/vscode-remote/containers/non-root. - "remoteUser": "vscode" -} \ No newline at end of file + // "postCreateCommand": "rustc --version", + + // Configure tool-specific properties. + // "customizations": {}, + + // Uncomment to connect as root instead. More info: https://aka.ms/dev-containers-non-root. + // "remoteUser": "root" +} diff --git a/rust/.github/dependabot.yml b/rust/.github/dependabot.yml new file mode 100644 index 00000000..f33a02cd --- /dev/null +++ b/rust/.github/dependabot.yml @@ -0,0 +1,12 @@ +# To get started with Dependabot version updates, you'll need to specify which +# package ecosystems to update and where the package manifests are located. +# Please see the documentation for more information: +# https://docs.github.com/github/administering-a-repository/configuration-options-for-dependency-updates +# https://containers.dev/guide/dependabot + +version: 2 +updates: + - package-ecosystem: "devcontainers" + directory: "/" + schedule: + interval: weekly diff --git a/rust/Cargo.toml b/rust/Cargo.toml index 7b5c5a84..52496664 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -12,18 +12,22 @@ homepage = "https://github.com/GDATASoftwareAG/vaas" [dependencies] websockets = "0.3.0" -serde = { version = "1.0", features = ["derive"]} +serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" thiserror = "1.0" uuid = { version = "1.7", features = ["serde", "v4"] } -reqwest = "0.11" +reqwest = { version = "0.11", features = ["stream"] } regex = "1.10" tokio = { version = "1.36", features = ["sync", "fs"] } sha2 = "0.10" futures = "0.3" rand = "0.8" async-trait = "0.1" +bytes = "1.5.0" +tokio-util = "0.7" +futures-util = "0.3.30" +tokio-stream = "0.1.14" [dev-dependencies] dotenv = "0.15" -tokio = { version = "1.36", features = ["rt", "macros", "rt-multi-thread"] } \ No newline at end of file +tokio = { version = "1.36", features = ["rt", "macros", "rt-multi-thread"] } diff --git a/rust/src/auth/authenticator.rs b/rust/src/auth/authenticator.rs index 30141b9f..7b7ebc6b 100644 --- a/rust/src/auth/authenticator.rs +++ b/rust/src/auth/authenticator.rs @@ -4,7 +4,7 @@ use async_trait::async_trait; pub static DEFAULT_TOKEN_URL: &str = "https://account.gdata.de/realms/vaas-production/protocol/openid-connect/token"; -/// This trait has to be implemented by any authentication methods for VaaS. +/// This trait has to be implemented by any authentication methods for VaaS. #[async_trait] pub trait Authenticator { /// Return a valid token that can be used to authenticate against the VaaS service. diff --git a/rust/src/auth/authenticators/mod.rs b/rust/src/auth/authenticators/mod.rs index ade49b67..8d6de9cc 100644 --- a/rust/src/auth/authenticators/mod.rs +++ b/rust/src/auth/authenticators/mod.rs @@ -1,5 +1,5 @@ //! # Authenticators -//! +//! //! This module contains the different **OAuth2 Grant Types** that can be used to authenticate against the VaaS service. mod client_credentials; diff --git a/rust/src/connection.rs b/rust/src/connection.rs index 5e152efb..b831f372 100644 --- a/rust/src/connection.rs +++ b/rust/src/connection.rs @@ -2,14 +2,16 @@ use crate::error::{Error, VResult}; use crate::message::{ - MessageType, UploadUrl, Verdict, VerdictRequest, VerdictRequestForUrl, VerdictResponse, + MessageType, UploadUrl, Verdict, VerdictRequest, VerdictRequestForStream, VerdictRequestForUrl, + VerdictResponse, }; use crate::options::Options; use crate::sha256::Sha256; use crate::vaas_verdict::VaasVerdict; use crate::CancellationToken; +use bytes::Bytes; use futures::future::join_all; -use reqwest::Url; +use reqwest::{Body, Url}; use std::convert::TryFrom; use std::ops::Deref; use std::path::{Path, PathBuf}; @@ -113,6 +115,48 @@ impl Connection { VaasVerdict::try_from(response) } + /// Request a verdict for a SHA256 file hash. + pub async fn for_stream( + &self, + stream: S, + content_length: usize, + ct: &CancellationToken, + ) -> VResult + where + S: futures_util::stream::TryStream + Send + Sync + 'static, + S::Error: Into>, + Bytes: From, + { + let request = VerdictRequestForStream::new(self.session_id.clone()); + let guid = request.guid().to_string(); + + let response = Self::for_stream_request( + request, + self.ws_writer.clone(), + &mut self.result_channel.subscribe(), + ct, + ) + .await?; + + let verdict = Verdict::try_from(&response)?; + + match verdict { + Verdict::Unknown { upload_url } => { + Self::handle_unknown_stream( + stream, + content_length, + &guid, + response, + upload_url, + &mut self.result_channel.subscribe(), + ct, + ) + .await + } + _ => Err(Error::Cancelled), + } + } + /// Request verdicts for a list of SHA256 file hashes. /// The order of the output is the same order as the provided input. pub async fn for_sha256_list( @@ -183,6 +227,37 @@ impl Connection { VaasVerdict::try_from(resp) } + async fn handle_unknown_stream( + stream: S, + content_length: usize, + guid: &str, + response: VerdictResponse, + upload_url: UploadUrl, + result_channel: &mut ResultChannelRx, + ct: &CancellationToken, + ) -> Result + where + S: futures_util::stream::TryStream + Send + Sync + 'static, + S::Error: Into>, + Bytes: From, + { + let auth_token = response + .upload_token + .as_ref() + .ok_or(Error::MissingAuthToken)?; + let response = upload_stream(stream, content_length, upload_url, auth_token).await?; + + if response.status() != 200 { + return Err(Error::FailedUploadFile( + response.status(), + response.text().await.expect("failed to get payload"), + )); + } + + let resp = Self::wait_for_response(guid, result_channel, ct).await?; + VaasVerdict::try_from(resp) + } + /// Request a verdict for a list of files. /// The order of the output is the same order as the provided input. pub async fn for_file_list( @@ -216,6 +291,17 @@ impl Connection { Self::wait_for_response(&guid, result_channel, ct).await } + async fn for_stream_request( + request: VerdictRequestForStream, + ws_writer: WebSocketWriter, + result_channel: &mut ResultChannelRx, + ct: &CancellationToken, + ) -> VResult { + let guid = request.guid().to_string(); + ws_writer.lock().await.send_text(request.to_json()?).await?; + Self::wait_for_response(&guid, result_channel, ct).await + } + async fn wait_for_response( guid: &str, result_channel: &mut ResultChannelRx, @@ -307,6 +393,30 @@ async fn upload_file( Ok(response) } +async fn upload_stream( + stream: S, + content_length: usize, + upload_url: UploadUrl, + auth_token: &str, +) -> VResult +where + S: futures_util::stream::TryStream + Send + Sync + 'static, + S::Error: Into>, + Bytes: From, +{ + let client = reqwest::Client::new(); + let body = Body::wrap_stream(stream); + let response = client + .put(upload_url.deref()) + .body(body) + .header("Authorization", auth_token) + .header("Content-Length", content_length) + .send() + .await?; + + Ok(response) +} + impl Drop for Connection { fn drop(&mut self) { // Abort the spawned threads in the case that the connection diff --git a/rust/src/lib.rs b/rust/src/lib.rs index 3c8cb5ac..dc136a06 100644 --- a/rust/src/lib.rs +++ b/rust/src/lib.rs @@ -110,7 +110,6 @@ pub mod vaas_verdict; pub use crate::vaas::Vaas; pub use builder::Builder; pub use cancellation::CancellationToken; -pub use cancellation::*; pub use connection::Connection; pub use sha256::Sha256; pub use vaas_verdict::VaasVerdict; diff --git a/rust/src/message/kind.rs b/rust/src/message/kind.rs index dd09f4b9..5fb7ead3 100644 --- a/rust/src/message/kind.rs +++ b/rust/src/message/kind.rs @@ -8,4 +8,5 @@ pub enum Kind { VerdictResponse, Error, VerdictRequestForUrl, + VerdictRequestForStream, } diff --git a/rust/src/message/mod.rs b/rust/src/message/mod.rs index 21b831c5..b834ff2d 100644 --- a/rust/src/message/mod.rs +++ b/rust/src/message/mod.rs @@ -9,6 +9,7 @@ mod open_id_connect_token_response; mod upload_url; mod verdict; mod verdict_request; +mod verdict_request_for_stream; mod verdict_request_for_url; mod verdict_response; @@ -20,5 +21,6 @@ pub(super) use open_id_connect_token_response::OpenIdConnectTokenResponse; pub(super) use upload_url::UploadUrl; pub use verdict::Verdict; pub(super) use verdict_request::VerdictRequest; +pub(super) use verdict_request_for_stream::VerdictRequestForStream; pub(super) use verdict_request_for_url::VerdictRequestForUrl; pub(super) use verdict_response::VerdictResponse; diff --git a/rust/src/message/verdict_request_for_stream.rs b/rust/src/message/verdict_request_for_stream.rs new file mode 100644 index 00000000..deb8a3dc --- /dev/null +++ b/rust/src/message/verdict_request_for_stream.rs @@ -0,0 +1,27 @@ +use crate::error::VResult; +use crate::message::kind::Kind; +use serde::{Deserialize, Serialize}; + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct VerdictRequestForStream { + pub kind: Kind, + pub guid: String, + pub session_id: String, +} + +impl VerdictRequestForStream { + pub fn new(session_id: String) -> Self { + Self { + guid: uuid::Uuid::new_v4().to_string(), + kind: Kind::VerdictRequestForStream, + session_id, + } + } + + pub fn to_json(&self) -> VResult { + serde_json::to_string(self).map_err(|e| e.into()) + } + pub fn guid(&self) -> &str { + &self.guid + } +} diff --git a/rust/tests/real_api_integration_tests.rs b/rust/tests/real_api_integration_tests.rs index 35898803..3ea2f443 100644 --- a/rust/tests/real_api_integration_tests.rs +++ b/rust/tests/real_api_integration_tests.rs @@ -110,6 +110,34 @@ async fn from_sha256_single_malicious_hash() { ); } +#[tokio::test] +async fn from_http_response_stream_returns_malicious_verdict() { + let result = reqwest::get("http://eicar.eu/eicar.com.txt").await; + let vaas = get_vaas().await; + + let ct = CancellationToken::from_seconds(10); + let response = result.unwrap(); + let content_length: usize = response.content_length().unwrap() as usize; + let byte_stream = response.bytes_stream(); + let verdict = vaas.for_stream(byte_stream, content_length, &ct).await; + + assert_eq!(Verdict::Malicious, verdict.as_ref().unwrap().verdict); +} + +#[tokio::test] +async fn from_string_stream_returns_malicious_verdict() { + let eicar_string = "X5O!P%@AP[4\\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-FILE!$H+H*"; + + let stream: Vec> = vec![Ok(bytes::Bytes::from(eicar_string))]; + let stream = futures_util::stream::iter(stream); + + let vaas = get_vaas().await; + let ct = CancellationToken::from_seconds(10); + let verdict = vaas.for_stream(stream, eicar_string.len(), &ct).await; + + assert_eq!(Verdict::Malicious, verdict.as_ref().unwrap().verdict); +} + // #[tokio::test] // async fn from_sha256_single_pup_hash() { // let vaas = get_vaas().await;