Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rust/for stream #375

Merged
merged 2 commits into from
Mar 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 0 additions & 24 deletions rust/.devcontainer/Dockerfile

This file was deleted.

60 changes: 27 additions & 33 deletions rust/.devcontainer/devcontainer.json
Original file line number Diff line number Diff line change
@@ -1,37 +1,31 @@
// For format details, see https://aka.ms/devcontainer.json. For config options, see the README at:
// https://github.com/microsoft/vscode-dev-containers/tree/v0.187.0/containers/ubuntu
// For format details, see https://aka.ms/devcontainer.json. For config options, see the
// README at: https://github.com/devcontainers/templates/tree/main/src/rust
{
"name": "Ubuntu",
"build": {
"dockerfile": "Dockerfile",
// Update 'VARIANT' to pick an Ubuntu version: focal, bionic
"args": {
"VARIANT": "focal"
}
},
"mounts": [
"source=${localEnv:HOME}/.docker/config.json,target=/root/.docker/config.json,type=bind,consistency=cached",
"source=/var/run/docker.sock,target=/var/run/docker.sock,type=bind,consistency=consistent"
],
"runArgs": [
"--network=host"
],
// Set *default* container specific settings.json values on container create.
"settings": {},
// Add the IDs of extensions you want installed when the container is created.
"extensions": [
"nico-castell.linux-desktop-file",
"bungcip.better-toml",
"jeff-hykin.better-dockerfile-syntax",
"SixtyFPS.sixtyfps-vscode",
"Gruntfuggly.todo-tree",
"rust-lang.rust-analyzer",
"Swellaby.vscode-rust-test-adapter"
],
"name": "Rust",
// Or use a Dockerfile or Docker Compose file. More info: https://containers.dev/guide/dockerfile
"image": "mcr.microsoft.com/devcontainers/rust:1-1-bullseye"

// Use 'mounts' to make the cargo cache persistent in a Docker Volume.
// "mounts": [
// {
// "source": "devcontainer-cargo-cache-${devcontainerId}",
// "target": "/usr/local/cargo",
// "type": "volume"
// }
// ]

// Features to add to the dev container. More info: https://containers.dev/features.
// "features": {},

// Use 'forwardPorts' to make a list of ports inside the container available locally.
// "forwardPorts": [],

// Use 'postCreateCommand' to run commands after the container is created.
// "postCreateCommand": "uname -a",
// Comment out connect as root instead. More info: https://aka.ms/vscode-remote/containers/non-root.
"remoteUser": "vscode"
}
// "postCreateCommand": "rustc --version",

// Configure tool-specific properties.
// "customizations": {},

// Uncomment to connect as root instead. More info: https://aka.ms/dev-containers-non-root.
// "remoteUser": "root"
}
12 changes: 12 additions & 0 deletions rust/.github/dependabot.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# To get started with Dependabot version updates, you'll need to specify which
# package ecosystems to update and where the package manifests are located.
# Please see the documentation for more information:
# https://docs.github.com/github/administering-a-repository/configuration-options-for-dependency-updates
# https://containers.dev/guide/dependabot

version: 2
updates:
- package-ecosystem: "devcontainers"
directory: "/"
schedule:
interval: weekly
10 changes: 7 additions & 3 deletions rust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,22 @@ homepage = "https://github.com/GDATASoftwareAG/vaas"

[dependencies]
websockets = "0.3.0"
serde = { version = "1.0", features = ["derive"]}
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
thiserror = "1.0"
uuid = { version = "1.7", features = ["serde", "v4"] }
reqwest = "0.11"
reqwest = { version = "0.11", features = ["stream"] }
regex = "1.10"
tokio = { version = "1.36", features = ["sync", "fs"] }
sha2 = "0.10"
futures = "0.3"
rand = "0.8"
async-trait = "0.1"
bytes = "1.5.0"
tokio-util = "0.7"
futures-util = "0.3.30"
tokio-stream = "0.1.14"

[dev-dependencies]
dotenv = "0.15"
tokio = { version = "1.36", features = ["rt", "macros", "rt-multi-thread"] }
tokio = { version = "1.36", features = ["rt", "macros", "rt-multi-thread"] }
2 changes: 1 addition & 1 deletion rust/src/auth/authenticator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use async_trait::async_trait;
pub static DEFAULT_TOKEN_URL: &str =
"https://account.gdata.de/realms/vaas-production/protocol/openid-connect/token";

/// This trait has to be implemented by any authentication methods for VaaS.
/// This trait has to be implemented by any authentication methods for VaaS.
#[async_trait]
pub trait Authenticator {
/// Return a valid token that can be used to authenticate against the VaaS service.
Expand Down
2 changes: 1 addition & 1 deletion rust/src/auth/authenticators/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//! # Authenticators
//!
//!
//! This module contains the different **OAuth2 Grant Types** that can be used to authenticate against the VaaS service.

mod client_credentials;
Expand Down
114 changes: 112 additions & 2 deletions rust/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,16 @@

use crate::error::{Error, VResult};
use crate::message::{
MessageType, UploadUrl, Verdict, VerdictRequest, VerdictRequestForUrl, VerdictResponse,
MessageType, UploadUrl, Verdict, VerdictRequest, VerdictRequestForStream, VerdictRequestForUrl,
VerdictResponse,
};
use crate::options::Options;
use crate::sha256::Sha256;
use crate::vaas_verdict::VaasVerdict;
use crate::CancellationToken;
use bytes::Bytes;
use futures::future::join_all;
use reqwest::Url;
use reqwest::{Body, Url};
use std::convert::TryFrom;
use std::ops::Deref;
use std::path::{Path, PathBuf};
Expand Down Expand Up @@ -113,6 +115,48 @@ impl Connection {
VaasVerdict::try_from(response)
}

/// Request a verdict for a SHA256 file hash.
pub async fn for_stream<S>(
&self,
stream: S,
content_length: usize,
ct: &CancellationToken,
) -> VResult<VaasVerdict>
where
S: futures_util::stream::TryStream + Send + Sync + 'static,
S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
Bytes: From<S::Ok>,
{
let request = VerdictRequestForStream::new(self.session_id.clone());
let guid = request.guid().to_string();

let response = Self::for_stream_request(
request,
self.ws_writer.clone(),
&mut self.result_channel.subscribe(),
ct,
)
.await?;

let verdict = Verdict::try_from(&response)?;

match verdict {
Verdict::Unknown { upload_url } => {
Self::handle_unknown_stream(
stream,
content_length,
&guid,
response,
upload_url,
&mut self.result_channel.subscribe(),
ct,
)
.await
}
_ => Err(Error::Cancelled),
}
}

/// Request verdicts for a list of SHA256 file hashes.
/// The order of the output is the same order as the provided input.
pub async fn for_sha256_list(
Expand Down Expand Up @@ -183,6 +227,37 @@ impl Connection {
VaasVerdict::try_from(resp)
}

async fn handle_unknown_stream<S>(
stream: S,
content_length: usize,
guid: &str,
response: VerdictResponse,
upload_url: UploadUrl,
result_channel: &mut ResultChannelRx,
ct: &CancellationToken,
) -> Result<VaasVerdict, Error>
where
S: futures_util::stream::TryStream + Send + Sync + 'static,
S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
Bytes: From<S::Ok>,
{
let auth_token = response
.upload_token
.as_ref()
.ok_or(Error::MissingAuthToken)?;
let response = upload_stream(stream, content_length, upload_url, auth_token).await?;

if response.status() != 200 {
return Err(Error::FailedUploadFile(
response.status(),
response.text().await.expect("failed to get payload"),
));
}

let resp = Self::wait_for_response(guid, result_channel, ct).await?;
VaasVerdict::try_from(resp)
}

/// Request a verdict for a list of files.
/// The order of the output is the same order as the provided input.
pub async fn for_file_list(
Expand Down Expand Up @@ -216,6 +291,17 @@ impl Connection {
Self::wait_for_response(&guid, result_channel, ct).await
}

async fn for_stream_request(
request: VerdictRequestForStream,
ws_writer: WebSocketWriter,
result_channel: &mut ResultChannelRx,
ct: &CancellationToken,
) -> VResult<VerdictResponse> {
let guid = request.guid().to_string();
ws_writer.lock().await.send_text(request.to_json()?).await?;
Self::wait_for_response(&guid, result_channel, ct).await
}

async fn wait_for_response(
guid: &str,
result_channel: &mut ResultChannelRx,
Expand Down Expand Up @@ -307,6 +393,30 @@ async fn upload_file(
Ok(response)
}

async fn upload_stream<S>(
stream: S,
content_length: usize,
upload_url: UploadUrl,
auth_token: &str,
) -> VResult<reqwest::Response>
where
S: futures_util::stream::TryStream + Send + Sync + 'static,
S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
Bytes: From<S::Ok>,
{
let client = reqwest::Client::new();
let body = Body::wrap_stream(stream);
let response = client
.put(upload_url.deref())
.body(body)
.header("Authorization", auth_token)
.header("Content-Length", content_length)
.send()
.await?;

Ok(response)
}

impl Drop for Connection {
fn drop(&mut self) {
// Abort the spawned threads in the case that the connection
Expand Down
1 change: 0 additions & 1 deletion rust/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,6 @@ pub mod vaas_verdict;
pub use crate::vaas::Vaas;
pub use builder::Builder;
pub use cancellation::CancellationToken;
pub use cancellation::*;
pub use connection::Connection;
pub use sha256::Sha256;
pub use vaas_verdict::VaasVerdict;
1 change: 1 addition & 0 deletions rust/src/message/kind.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,5 @@ pub enum Kind {
VerdictResponse,
Error,
VerdictRequestForUrl,
VerdictRequestForStream,
}
2 changes: 2 additions & 0 deletions rust/src/message/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ mod open_id_connect_token_response;
mod upload_url;
mod verdict;
mod verdict_request;
mod verdict_request_for_stream;
mod verdict_request_for_url;
mod verdict_response;

Expand All @@ -20,5 +21,6 @@ pub(super) use open_id_connect_token_response::OpenIdConnectTokenResponse;
pub(super) use upload_url::UploadUrl;
pub use verdict::Verdict;
pub(super) use verdict_request::VerdictRequest;
pub(super) use verdict_request_for_stream::VerdictRequestForStream;
pub(super) use verdict_request_for_url::VerdictRequestForUrl;
pub(super) use verdict_response::VerdictResponse;
27 changes: 27 additions & 0 deletions rust/src/message/verdict_request_for_stream.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
use crate::error::VResult;
use crate::message::kind::Kind;
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct VerdictRequestForStream {
pub kind: Kind,
pub guid: String,
pub session_id: String,
}

impl VerdictRequestForStream {
pub fn new(session_id: String) -> Self {
Self {
guid: uuid::Uuid::new_v4().to_string(),
kind: Kind::VerdictRequestForStream,
session_id,
}
}

pub fn to_json(&self) -> VResult<String> {
serde_json::to_string(self).map_err(|e| e.into())
}
pub fn guid(&self) -> &str {
&self.guid
}
}
28 changes: 28 additions & 0 deletions rust/tests/real_api_integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,34 @@ async fn from_sha256_single_malicious_hash() {
);
}

#[tokio::test]
async fn from_http_response_stream_returns_malicious_verdict() {
let result = reqwest::get("http://eicar.eu/eicar.com.txt").await;
let vaas = get_vaas().await;

let ct = CancellationToken::from_seconds(10);
let response = result.unwrap();
let content_length: usize = response.content_length().unwrap() as usize;
let byte_stream = response.bytes_stream();
let verdict = vaas.for_stream(byte_stream, content_length, &ct).await;

assert_eq!(Verdict::Malicious, verdict.as_ref().unwrap().verdict);
}

#[tokio::test]
async fn from_string_stream_returns_malicious_verdict() {
let eicar_string = "X5O!P%@AP[4\\PZX54(P^)7CC)7}$EICAR-STANDARD-ANTIVIRUS-TEST-FILE!$H+H*";

let stream: Vec<Result<bytes::Bytes, std::io::Error>> = vec![Ok(bytes::Bytes::from(eicar_string))];
let stream = futures_util::stream::iter(stream);

let vaas = get_vaas().await;
let ct = CancellationToken::from_seconds(10);
let verdict = vaas.for_stream(stream, eicar_string.len(), &ct).await;

assert_eq!(Verdict::Malicious, verdict.as_ref().unwrap().verdict);
}

// #[tokio::test]
// async fn from_sha256_single_pup_hash() {
// let vaas = get_vaas().await;
Expand Down
Loading