Skip to content

Commit

Permalink
Rust/for stream (#375)
Browse files Browse the repository at this point in the history
* broken implementation

* stream API works

---------

Co-authored-by: Stefan Hausotte <[email protected]>
  • Loading branch information
unglaublicherdude and secana authored Mar 1, 2024
1 parent 6501f21 commit 0d71de6
Show file tree
Hide file tree
Showing 12 changed files with 218 additions and 65 deletions.
24 changes: 0 additions & 24 deletions rust/.devcontainer/Dockerfile

This file was deleted.

60 changes: 27 additions & 33 deletions rust/.devcontainer/devcontainer.json
Original file line number Diff line number Diff line change
@@ -1,37 +1,31 @@
// For format details, see https://aka.ms/devcontainer.json. For config options, see the README at:
// https://github.com/microsoft/vscode-dev-containers/tree/v0.187.0/containers/ubuntu
// For format details, see https://aka.ms/devcontainer.json. For config options, see the
// README at: https://github.com/devcontainers/templates/tree/main/src/rust
{
"name": "Ubuntu",
"build": {
"dockerfile": "Dockerfile",
// Update 'VARIANT' to pick an Ubuntu version: focal, bionic
"args": {
"VARIANT": "focal"
}
},
"mounts": [
"source=${localEnv:HOME}/.docker/config.json,target=/root/.docker/config.json,type=bind,consistency=cached",
"source=/var/run/docker.sock,target=/var/run/docker.sock,type=bind,consistency=consistent"
],
"runArgs": [
"--network=host"
],
// Set *default* container specific settings.json values on container create.
"settings": {},
// Add the IDs of extensions you want installed when the container is created.
"extensions": [
"nico-castell.linux-desktop-file",
"bungcip.better-toml",
"jeff-hykin.better-dockerfile-syntax",
"SixtyFPS.sixtyfps-vscode",
"Gruntfuggly.todo-tree",
"rust-lang.rust-analyzer",
"Swellaby.vscode-rust-test-adapter"
],
"name": "Rust",
// Or use a Dockerfile or Docker Compose file. More info: https://containers.dev/guide/dockerfile
"image": "mcr.microsoft.com/devcontainers/rust:1-1-bullseye"

// Use 'mounts' to make the cargo cache persistent in a Docker Volume.
// "mounts": [
// {
// "source": "devcontainer-cargo-cache-${devcontainerId}",
// "target": "/usr/local/cargo",
// "type": "volume"
// }
// ]

// Features to add to the dev container. More info: https://containers.dev/features.
// "features": {},

// Use 'forwardPorts' to make a list of ports inside the container available locally.
// "forwardPorts": [],

// Use 'postCreateCommand' to run commands after the container is created.
// "postCreateCommand": "uname -a",
// Comment out connect as root instead. More info: https://aka.ms/vscode-remote/containers/non-root.
"remoteUser": "vscode"
}
// "postCreateCommand": "rustc --version",

// Configure tool-specific properties.
// "customizations": {},

// Uncomment to connect as root instead. More info: https://aka.ms/dev-containers-non-root.
// "remoteUser": "root"
}
12 changes: 12 additions & 0 deletions rust/.github/dependabot.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# To get started with Dependabot version updates, you'll need to specify which
# package ecosystems to update and where the package manifests are located.
# Please see the documentation for more information:
# https://docs.github.com/github/administering-a-repository/configuration-options-for-dependency-updates
# https://containers.dev/guide/dependabot

version: 2
updates:
- package-ecosystem: "devcontainers"
directory: "/"
schedule:
interval: weekly
10 changes: 7 additions & 3 deletions rust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,22 @@ homepage = "https://github.com/GDATASoftwareAG/vaas"

[dependencies]
websockets = "0.3.0"
serde = { version = "1.0", features = ["derive"]}
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
thiserror = "1.0"
uuid = { version = "1.7", features = ["serde", "v4"] }
reqwest = "0.11"
reqwest = { version = "0.11", features = ["stream"] }
regex = "1.10"
tokio = { version = "1.36", features = ["sync", "fs"] }
sha2 = "0.10"
futures = "0.3"
rand = "0.8"
async-trait = "0.1"
bytes = "1.5.0"
tokio-util = "0.7"
futures-util = "0.3.30"
tokio-stream = "0.1.14"

[dev-dependencies]
dotenv = "0.15"
tokio = { version = "1.36", features = ["rt", "macros", "rt-multi-thread"] }
tokio = { version = "1.36", features = ["rt", "macros", "rt-multi-thread"] }
2 changes: 1 addition & 1 deletion rust/src/auth/authenticator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use async_trait::async_trait;
pub static DEFAULT_TOKEN_URL: &str =
"https://account.gdata.de/realms/vaas-production/protocol/openid-connect/token";

/// This trait has to be implemented by any authentication methods for VaaS.
/// This trait has to be implemented by any authentication methods for VaaS.
#[async_trait]
pub trait Authenticator {
/// Return a valid token that can be used to authenticate against the VaaS service.
Expand Down
2 changes: 1 addition & 1 deletion rust/src/auth/authenticators/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//! # Authenticators
//!
//!
//! This module contains the different **OAuth2 Grant Types** that can be used to authenticate against the VaaS service.
mod client_credentials;
Expand Down
114 changes: 112 additions & 2 deletions rust/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,16 @@
use crate::error::{Error, VResult};
use crate::message::{
MessageType, UploadUrl, Verdict, VerdictRequest, VerdictRequestForUrl, VerdictResponse,
MessageType, UploadUrl, Verdict, VerdictRequest, VerdictRequestForStream, VerdictRequestForUrl,
VerdictResponse,
};
use crate::options::Options;
use crate::sha256::Sha256;
use crate::vaas_verdict::VaasVerdict;
use crate::CancellationToken;
use bytes::Bytes;
use futures::future::join_all;
use reqwest::Url;
use reqwest::{Body, Url};
use std::convert::TryFrom;
use std::ops::Deref;
use std::path::{Path, PathBuf};
Expand Down Expand Up @@ -113,6 +115,48 @@ impl Connection {
VaasVerdict::try_from(response)
}

/// Request a verdict for a SHA256 file hash.
pub async fn for_stream<S>(
&self,
stream: S,
content_length: usize,
ct: &CancellationToken,
) -> VResult<VaasVerdict>
where
S: futures_util::stream::TryStream + Send + Sync + 'static,
S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
Bytes: From<S::Ok>,
{
let request = VerdictRequestForStream::new(self.session_id.clone());
let guid = request.guid().to_string();

let response = Self::for_stream_request(
request,
self.ws_writer.clone(),
&mut self.result_channel.subscribe(),
ct,
)
.await?;

let verdict = Verdict::try_from(&response)?;

match verdict {
Verdict::Unknown { upload_url } => {
Self::handle_unknown_stream(
stream,
content_length,
&guid,
response,
upload_url,
&mut self.result_channel.subscribe(),
ct,
)
.await
}
_ => Err(Error::Cancelled),
}
}

/// Request verdicts for a list of SHA256 file hashes.
/// The order of the output is the same order as the provided input.
pub async fn for_sha256_list(
Expand Down Expand Up @@ -183,6 +227,37 @@ impl Connection {
VaasVerdict::try_from(resp)
}

async fn handle_unknown_stream<S>(
stream: S,
content_length: usize,
guid: &str,
response: VerdictResponse,
upload_url: UploadUrl,
result_channel: &mut ResultChannelRx,
ct: &CancellationToken,
) -> Result<VaasVerdict, Error>
where
S: futures_util::stream::TryStream + Send + Sync + 'static,
S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
Bytes: From<S::Ok>,
{
let auth_token = response
.upload_token
.as_ref()
.ok_or(Error::MissingAuthToken)?;
let response = upload_stream(stream, content_length, upload_url, auth_token).await?;

if response.status() != 200 {
return Err(Error::FailedUploadFile(
response.status(),
response.text().await.expect("failed to get payload"),
));
}

let resp = Self::wait_for_response(guid, result_channel, ct).await?;
VaasVerdict::try_from(resp)
}

/// Request a verdict for a list of files.
/// The order of the output is the same order as the provided input.
pub async fn for_file_list(
Expand Down Expand Up @@ -216,6 +291,17 @@ impl Connection {
Self::wait_for_response(&guid, result_channel, ct).await
}

async fn for_stream_request(
request: VerdictRequestForStream,
ws_writer: WebSocketWriter,
result_channel: &mut ResultChannelRx,
ct: &CancellationToken,
) -> VResult<VerdictResponse> {
let guid = request.guid().to_string();
ws_writer.lock().await.send_text(request.to_json()?).await?;
Self::wait_for_response(&guid, result_channel, ct).await
}

async fn wait_for_response(
guid: &str,
result_channel: &mut ResultChannelRx,
Expand Down Expand Up @@ -307,6 +393,30 @@ async fn upload_file(
Ok(response)
}

async fn upload_stream<S>(
stream: S,
content_length: usize,
upload_url: UploadUrl,
auth_token: &str,
) -> VResult<reqwest::Response>
where
S: futures_util::stream::TryStream + Send + Sync + 'static,
S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
Bytes: From<S::Ok>,
{
let client = reqwest::Client::new();
let body = Body::wrap_stream(stream);
let response = client
.put(upload_url.deref())
.body(body)
.header("Authorization", auth_token)
.header("Content-Length", content_length)
.send()
.await?;

Ok(response)
}

impl Drop for Connection {
fn drop(&mut self) {
// Abort the spawned threads in the case that the connection
Expand Down
1 change: 0 additions & 1 deletion rust/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,6 @@ pub mod vaas_verdict;
pub use crate::vaas::Vaas;
pub use builder::Builder;
pub use cancellation::CancellationToken;
pub use cancellation::*;
pub use connection::Connection;
pub use sha256::Sha256;
pub use vaas_verdict::VaasVerdict;
1 change: 1 addition & 0 deletions rust/src/message/kind.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,5 @@ pub enum Kind {
VerdictResponse,
Error,
VerdictRequestForUrl,
VerdictRequestForStream,
}
2 changes: 2 additions & 0 deletions rust/src/message/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ mod open_id_connect_token_response;
mod upload_url;
mod verdict;
mod verdict_request;
mod verdict_request_for_stream;
mod verdict_request_for_url;
mod verdict_response;

Expand All @@ -20,5 +21,6 @@ pub(super) use open_id_connect_token_response::OpenIdConnectTokenResponse;
pub(super) use upload_url::UploadUrl;
pub use verdict::Verdict;
pub(super) use verdict_request::VerdictRequest;
pub(super) use verdict_request_for_stream::VerdictRequestForStream;
pub(super) use verdict_request_for_url::VerdictRequestForUrl;
pub(super) use verdict_response::VerdictResponse;
27 changes: 27 additions & 0 deletions rust/src/message/verdict_request_for_stream.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
use crate::error::VResult;
use crate::message::kind::Kind;
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct VerdictRequestForStream {
pub kind: Kind,
pub guid: String,
pub session_id: String,
}

impl VerdictRequestForStream {
pub fn new(session_id: String) -> Self {
Self {
guid: uuid::Uuid::new_v4().to_string(),
kind: Kind::VerdictRequestForStream,
session_id,
}
}

pub fn to_json(&self) -> VResult<String> {
serde_json::to_string(self).map_err(|e| e.into())
}
pub fn guid(&self) -> &str {
&self.guid
}
}
28 changes: 28 additions & 0 deletions rust/tests/real_api_integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,34 @@ async fn from_sha256_single_malicious_hash() {
);
}

#[tokio::test]
async fn from_http_response_stream_returns_malicious_verdict() {
let result = reqwest::get("http://eicar.eu/eicar.com.txt").await;
let vaas = get_vaas().await;

let ct = CancellationToken::from_seconds(10);
let response = result.unwrap();
let content_length: usize = response.content_length().unwrap() as usize;
let byte_stream = response.bytes_stream();
let verdict = vaas.for_stream(byte_stream, content_length, &ct).await;

assert_eq!(Verdict::Malicious, verdict.as_ref().unwrap().verdict);
}

#[tokio::test]
async fn from_string_stream_returns_malicious_verdict() {
let eicar_string = "X5O!P%@AP[4\\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-FILE!$H+H*";

let stream: Vec<Result<bytes::Bytes, std::io::Error>> = vec![Ok(bytes::Bytes::from(eicar_string))];
let stream = futures_util::stream::iter(stream);

let vaas = get_vaas().await;
let ct = CancellationToken::from_seconds(10);
let verdict = vaas.for_stream(stream, eicar_string.len(), &ct).await;

assert_eq!(Verdict::Malicious, verdict.as_ref().unwrap().verdict);
}

// #[tokio::test]
// async fn from_sha256_single_pup_hash() {
// let vaas = get_vaas().await;
Expand Down

0 comments on commit 0d71de6

Please sign in to comment.