Skip to content

Commit

Permalink
Change: remove FetchResult in favor of ScanResults
Browse files Browse the repository at this point in the history
Instead of using a tuple of (Status, [Result]) a struct ScanResults is
introduced. This will help us to create bulk retrieving operations on
implementations that makes sense.

Although the benefits are currently minimal when we introduce a
scheduler for scans it will make it easier to handle results in that
fashion.
  • Loading branch information
nichtsfrei committed Feb 28, 2024
1 parent 9e751be commit b5be432
Show file tree
Hide file tree
Showing 10 changed files with 110 additions and 68 deletions.
2 changes: 1 addition & 1 deletion rust/openvasd/src/controller/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ impl ScanDeleter for NoOpScanner {

#[async_trait]
impl ScanResultFetcher for NoOpScanner {
async fn fetch_results<I>(&self, _: I) -> Result<crate::scan::FetchResult, Error>
async fn fetch_results<I>(&self, _: I) -> Result<crate::scan::ScanResults, Error>
where
I: AsRef<str> + Send,
{
Expand Down
24 changes: 18 additions & 6 deletions rust/openvasd/src/controller/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ mod tests {
use super::context::Context;
use crate::{
controller::{ClientIdentifier, ContextBuilder, NoOpScanner},
scan::{ScanDeleter, ScanStarter, ScanStopper},
scan::{ScanDeleter, ScanResults, ScanStarter, ScanStopper},
storage::file,
};
use async_trait::async_trait;
Expand Down Expand Up @@ -203,8 +203,8 @@ mod tests {
impl crate::scan::ScanResultFetcher for FakeScanner {
async fn fetch_results<I>(
&self,
_id: I,
) -> Result<crate::scan::FetchResult, crate::scan::Error>
id: I,
) -> Result<crate::scan::ScanResults, crate::scan::Error>
where
I: AsRef<str> + Send,
{
Expand All @@ -216,7 +216,11 @@ mod tests {
..Default::default()
};
*count += 1;
Ok((status, vec![]))
Ok(ScanResults {
id: id.as_ref().to_string(),
status,
results: vec![],
})
}
1..=99 => {
let status = models::Status {
Expand All @@ -232,15 +236,23 @@ mod tests {
});
}
*count += 1;
Ok((status, results))
Ok(ScanResults {
id: id.as_ref().to_string(),
status,
results,
})
}
_ => {
*count += 1;
let status = models::Status {
status: models::Phase::Succeeded,
..Default::default()
};
Ok((status, vec![]))
Ok(ScanResults {
id: id.as_ref().to_string(),
status,
results: vec![],
})
}
}
}
Expand Down
8 changes: 7 additions & 1 deletion rust/openvasd/src/controller/results.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ where
tracing::trace!("aborting");
break;
}
// TODO change this use a scan scheduler later on
// so that we don't have to iterate through all scans but just the ones that are
// actually running
let scans = ctx.db.get_scan_ids().await;
if let Err(e) = scans {
tracing::warn!("Failed to get scans: {e}");
Expand All @@ -43,6 +46,9 @@ where
tracing::trace!("{id} skipping status = {}", status.status);
}
Ok(_) => {
// TODO change fetch results to deliver all results of a given
// subset of ids so that it can decide itself if it gets results in
// bulk or per element.
let results = ctx.scanner.fetch_results(id.clone()).await;
match results {
Ok(fr) => {
Expand All @@ -51,7 +57,7 @@ where
// store them in the database.
// When this happens we effectively lost the results
// and need to escalate this.
ctx.db.append_fetched_result(id, fr).await.unwrap();
ctx.db.append_fetched_result(vec![fr]).await.unwrap();
}
Err(crate::scan::Error::Poisoned) => {
quit_on_poison::<()>();
Expand Down
23 changes: 18 additions & 5 deletions rust/openvasd/src/scan.rs → rust/openvasd/src/scan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,17 @@ use std::{path::PathBuf, sync::PoisonError, time::Duration};

use async_trait::async_trait;

/// The result of a fetch operation
pub type FetchResult = (models::Status, Vec<models::Result>);
/// Contains results of a scan as well as identification factors and statuses.
///
/// It is usually returned on fetch_results which gets all results of all running scans for further
/// processing.
#[derive(Debug, Default, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
pub struct ScanResults {
/// Identification of
pub id: String,
pub status: models::Status,
pub results: Vec<models::Result>,
}

impl From<osp::Error> for Error {
fn from(value: osp::Error) -> Self {
Expand Down Expand Up @@ -107,7 +116,7 @@ pub trait ScanDeleter {
#[async_trait]
pub trait ScanResultFetcher {
/// Fetches the results of a scan and combines the results with response
async fn fetch_results<I>(&self, id: I) -> Result<FetchResult, Error>
async fn fetch_results<I>(&self, id: I) -> Result<ScanResults, Error>
where
I: AsRef<str> + Send + 'static;
}
Expand Down Expand Up @@ -159,14 +168,18 @@ impl ScanDeleter for OSPDWrapper {

#[async_trait]
impl ScanResultFetcher for OSPDWrapper {
async fn fetch_results<I>(&self, id: I) -> Result<FetchResult, Error>
async fn fetch_results<I>(&self, id: I) -> Result<ScanResults, Error>
where
I: AsRef<str> + Send + 'static,
{
let rtimeout = self.r_timeout;
self.spawn_blocking(move |socket| {
osp::get_delete_scan_results(socket, rtimeout, id)
.map(|r| (r.clone().into(), r.into()))
.map(|r| ScanResults {
id: r.clone().id,
status: r.clone().into(),
results: r.into(),
})
.map_err(Error::from)
})
.await
Expand Down
Empty file.
74 changes: 40 additions & 34 deletions rust/openvasd/src/storage/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,32 +178,34 @@ impl<S> AppendFetchResult for Storage<S>
where
S: infisto::base::IndexedByteStorage + std::marker::Sync + std::marker::Send + Clone + 'static,
{
async fn append_fetched_result(
&self,
id: &str,
(status, results): FetchResult,
) -> Result<(), Error> {
let key = format!("results_{}", id);
self.update_status(id, status).await?;

let storage = Arc::clone(&self.storage);
tokio::task::spawn_blocking(move || {
let storage = &mut storage.write().unwrap();
let mut serialized_results = Vec::with_capacity(results.len());
let ilen = match storage.indices(&key) {
Ok(x) => x.len(),
Err(_) => 0,
};
for (i, mut result) in results.into_iter().enumerate() {
result.id = ilen + i;
let bytes = serde_json::to_vec(&result)?;
serialized_results.push(bytes);
}
storage.append_all(&key, &serialized_results)?;
Ok(())
})
.await
.unwrap()
async fn append_fetched_result(&self, results: Vec<ScanResults>) -> Result<(), Error> {
for r in results {
let id = &r.id;
let status = r.status;
let key = format!("results_{}", id);
self.update_status(id, status).await?;

let storage = Arc::clone(&self.storage);
tokio::task::spawn_blocking(move || {
let storage = &mut storage.write().unwrap();
let results = r.results;
let mut serialized_results = Vec::with_capacity(results.len());
let ilen = match storage.indices(&key) {
Ok(x) => x.len(),
Err(_) => 0,
};
for (i, mut result) in results.into_iter().enumerate() {
result.id = ilen + i;
let bytes = serde_json::to_vec(&result)?;
serialized_results.push(bytes);
}
storage.append_all(&key, &serialized_results)?;
Ok::<_, Error>(())
})
.await
.unwrap()?
}
Ok(())
}
}
#[async_trait]
Expand Down Expand Up @@ -630,21 +632,25 @@ mod tests {
assert_eq!(scans.len(), ids.len());
let status = models::Status::default();
let results = vec![models::Result::default()];
storage
.append_fetched_result("42", (status, results))
.await
.unwrap();
let results = vec![ScanResults {
id: "42".to_string(),
status,
results,
}];
storage.append_fetched_result(results).await.unwrap();

let status = models::Status {
status: models::Phase::Running,
..Default::default()
};

let results = vec![models::Result::default()];
storage
.append_fetched_result("42", (status.clone(), results))
.await
.unwrap();
let results = vec![ScanResults {
id: "42".to_string(),
status: status.clone(),
results,
}];
storage.append_fetched_result(results).await.unwrap();
let stored_status = storage.get_status("42").await.unwrap();
assert_eq!(status, stored_status);
let range: Vec<String> = storage
Expand Down
35 changes: 20 additions & 15 deletions rust/openvasd/src/storage/inmemory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,21 +215,22 @@ impl<E> AppendFetchResult for Storage<E>
where
E: crate::crypt::Crypt + Send + Sync + 'static,
{
async fn append_fetched_result(
&self,
id: &str,
(status, results): FetchResult,
) -> Result<(), Error> {
async fn append_fetched_result(&self, results: Vec<ScanResults>) -> Result<(), Error> {
let mut scans = self.scans.write().await;
let progress = scans.get_mut(id).ok_or(Error::NotFound)?;
progress.status = status;
let mut len = progress.results.len();
for mut result in results {
result.id = len;
len += 1;
let bytes = serde_json::to_vec(&result)?;
progress.results.push(self.crypter.encrypt(bytes).await);
for r in results {
let id = &r.id;
let progress = scans.get_mut(id).ok_or(Error::NotFound)?;
progress.status = r.status;
let mut len = progress.results.len();
let results = r.results;
for mut result in results {
result.id = len;
len += 1;
let bytes = serde_json::to_vec(&result)?;
progress.results.push(self.crypter.encrypt(bytes).await);
}
}

Ok(())
}
}
Expand Down Expand Up @@ -496,9 +497,13 @@ mod tests {
let scan = Scan::default();
let id = scan.scan_id.clone();
storage.insert_scan(scan).await.unwrap();
let fetch_result = (models::Status::default(), vec![models::Result::default()]);
let fetch_result = ScanResults {
id: id.clone(),
status: models::Status::default(),
results: vec![models::Result::default()],
};
storage
.append_fetched_result(&id, fetch_result)
.append_fetched_result(vec![fetch_result])
.await
.unwrap();
let results: Vec<_> = storage
Expand Down
4 changes: 2 additions & 2 deletions rust/openvasd/src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::{collections::HashMap, sync::Arc};

use async_trait::async_trait;

use crate::{controller::ClientHash, crypt, scan::FetchResult};
use crate::{controller::ClientHash, crypt, scan::ScanResults};

#[derive(Debug)]
pub enum Error {
Expand Down Expand Up @@ -150,7 +150,7 @@ pub trait ScanStorer {
///
/// This is used when a scan is started and the results are fetched from ospd.
pub trait AppendFetchResult {
async fn append_fetched_result(&self, id: &str, results: FetchResult) -> Result<(), Error>;
async fn append_fetched_result(&self, results: Vec<ScanResults>) -> Result<(), Error>;
}

#[async_trait]
Expand Down
6 changes: 3 additions & 3 deletions rust/openvasd/src/storage/redis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use redis_storage::{
use storage::{item::PerItemDispatcher, Dispatcher, Field};
use tokio::sync::RwLock;

use crate::{controller::ClientHash, scan::FetchResult};
use crate::{controller::ClientHash, scan::ScanResults};

use super::{AppendFetchResult, Error, NVTStorer, ProgressGetter, ScanIDClientMapper, ScanStorer};

Expand Down Expand Up @@ -282,7 +282,7 @@ impl<T> AppendFetchResult for Storage<T>
where
T: super::Storage + std::marker::Sync,
{
async fn append_fetched_result(&self, id: &str, results: FetchResult) -> Result<(), Error> {
self.underlying.append_fetched_result(id, results).await
async fn append_fetched_result(&self, results: Vec<ScanResults>) -> Result<(), Error> {
self.underlying.append_fetched_result(results).await
}
}
2 changes: 1 addition & 1 deletion rust/osp/src/response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -563,7 +563,7 @@ impl Default for Scan {
}
}
}

// TODO when traits moved to models create From for ScanResults
impl From<Scan> for models::Status {
fn from(value: Scan) -> Self {
let phase: models::Phase = match value.status {
Expand Down

0 comments on commit b5be432

Please sign in to comment.