Skip to content

Commit

Permalink
Receive the feed by chunks
Browse files Browse the repository at this point in the history
  • Loading branch information
jjnicola committed Oct 11, 2024
1 parent 5a18152 commit ff7a256
Showing 1 changed file with 198 additions and 45 deletions.
243 changes: 198 additions & 45 deletions src/manage_sql_nvts.c
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@
/**
* @brief Enable extra GNU functions.
*/
#define _GNU_SOURCE /* See feature_test_macros(7) */
#define _FILE_OFFSET_BITS 64
#include <stdio.h>

#include <gvm/base/nvti.h>
#include "glibconfig.h"
Expand Down Expand Up @@ -2147,18 +2150,111 @@ update_nvts_from_vts (element_t *get_vts_response,
return 0;
}


/**
* @brief Struct containing the stream buffer.
*/
struct FILESTREAM {
char *stream_buffer;
size_t size_of_buffer;
size_t last_read;
size_t last_write;
};

/**
* @brief Hook function to read the stream file cookie
*/
static ssize_t
readcookie (void *stream_cookie, char *buf, size_t size)
{
struct FILESTREAM *stream = stream_cookie;
size_t to_read = stream->last_write - stream->last_read;
if (to_read < 0)
to_read = 0;

if (to_read > size)
to_read = size;
memcpy (buf, &stream->stream_buffer[stream->last_read], to_read);

stream->last_read += to_read;
return to_read;
}

/**
* @brief Hook function to close the stream file cookie
*/
static int
closecookie(void *filestream)
{
struct FILESTREAM *stream = filestream;
g_free(stream->stream_buffer);
stream->size_of_buffer = 0;
stream->stream_buffer = NULL;
return 0;
}

/**
* @brief Hook function to write the stream file cookie
*/
static ssize_t
writecookie (void *stream_cookie, const char *buf, size_t size)
{
struct FILESTREAM *stream = stream_cookie;
size_t next_size = stream->last_write + size;
if (next_size > stream->size_of_buffer)
{
stream->size_of_buffer = next_size + GVM_JSON_PULL_PARSE_BUFFER_LIMIT;
stream->stream_buffer = g_realloc (stream->stream_buffer,
stream->size_of_buffer);
if (stream->stream_buffer == NULL)
{
g_message ("%s: Buffer overflow", __func__);
return 0;
}
}

memcpy (&(stream->stream_buffer[stream->last_write]), buf, size);
stream->last_write+=size;

return size;
}

/**
* @brief Move non read data to beggining of the buffer
*/
static int move_buffer_data(struct FILESTREAM *filestream){
char *auxbuf;
size_t non_read_chars_count = filestream->last_write - filestream->last_read;

auxbuf = g_malloc0 (sizeof(char) * filestream->size_of_buffer);
if (auxbuf == NULL)
return -1;

memcpy (auxbuf, &filestream->stream_buffer[filestream->last_read],
non_read_chars_count);
memset (filestream->stream_buffer, '\0', filestream->size_of_buffer);
memcpy (filestream->stream_buffer, auxbuf, non_read_chars_count);

filestream->last_read = 0;
filestream->last_write = non_read_chars_count;

g_free(auxbuf);

return 0;
}

/**
* @brief Update NVTs from Json response
* @brief Update NVTs from Json response chunk by chunk
*
* @param[in] get_vts_response Openvasd VTS response.
* @param[in] curl_hnd Curl handler to perform the request
* @param[in] res Struct containing the response chunks
* @param[in] scanner_feed_version Version of feed from scanner.
* @param[in] rebuild Whether we're rebuilding the tables.
*
* @return 0 success, 1 VT integrity check failed, -1 error
*/
static int
update_nvts_from_json_vts (gvm_json_pull_parser_t *parser,
gvm_json_pull_event_t *event,
update_nvts_from_json_vts (openvasd_connector_t *connector,
const gchar *scanner_feed_version,
int rebuild)
{
Expand Down Expand Up @@ -2206,36 +2302,112 @@ update_nvts_from_json_vts (gvm_json_pull_parser_t *parser,
vt_refs_batch = batch_start (vt_ref_insert_size);
vt_sevs_batch = batch_start (vt_sev_insert_size);

nvti_t *nvti = openvasd_parse_vt (parser,event);
while (nvti)
int running = 0;
openvasd_resp_t resp;
gvm_json_pull_event_t event;
gvm_json_pull_parser_t parser;
FILE *stream = NULL;
struct FILESTREAM *filestream;
nvti_t *nvti = NULL;
curlm_t curl_hnd = NULL;
stringstream res;

init_stringstream (&res);
resp = openvasd_get_vts_stream_init(connector, &curl_hnd, &res);
if (resp->code < 0)
{
if (nvti == NULL)
continue;
g_warning ("%s: failed to get VTs", __func__);
return -1;
}

if (nvti_creation_time (nvti) > feed_version_epoch)
count_new_vts += 1;
else
count_modified_vts += 1;
cookie_io_functions_t cookiehooks = {
.read = readcookie,
.write = writecookie,
.seek = NULL,
.close = closecookie,
};

insert_nvt (nvti, rebuild, vt_refs_batch, vt_sevs_batch);
filestream = g_malloc0 (sizeof(struct FILESTREAM));
filestream->size_of_buffer = GVM_JSON_PULL_PARSE_BUFFER_LIMIT;
filestream->stream_buffer =
g_malloc0 (sizeof(char) * filestream->size_of_buffer);

preferences = NULL;
if (update_preferences_from_json_nvt (nvti, &preferences))
stream = fopencookie (filestream, "a+", cookiehooks);

gvm_json_pull_parser_init_full (&parser, stream,
GVM_JSON_PULL_PARSE_BUFFER_LIMIT,
GVM_JSON_PULL_READ_BUFFER_SIZE * 8);
gvm_json_pull_event_init (&event);

// First run for initial data in the stream
running = openvasd_get_vts_stream (curl_hnd);
fwrite (res.ptr, 1, res.len, stream);
g_free (res.ptr);
init_stringstream (&res);
int break_flag = 0;
while (running)
{
size_t non_read_count = 0;
// Ensure a big chunk of data.
// Realloc is expensive therefore we realloc with bigger chuncks
while (running > 0 && res.len < GVM_JSON_PULL_READ_BUFFER_SIZE * 8)
running = openvasd_get_vts_stream (curl_hnd);

if (res.len > 0)
{
sql_rollback ();
return -1;
move_buffer_data (filestream);
fwrite (res.ptr, 1, res.len, stream);
g_free (res.ptr);
init_stringstream (&res);
}
if (rebuild == 0)
sql ("DELETE FROM nvt_preferences%s WHERE name LIKE '%s:%%';",
rebuild ? "_rebuild" : "",
nvti_oid (nvti));
insert_nvt_preferences_list (preferences, rebuild);
g_list_free_full (preferences, (GDestroyNotify) preference_free);

nvti_free (nvti);
nvti = openvasd_parse_vt (parser, event);
non_read_count = filestream->last_write - filestream->last_read;
// While streaming, parse some VTs and continue for a new chunk.
// If the stream is not running anymore, parse the remaining VTs.
while ((running && non_read_count > GVM_JSON_PULL_READ_BUFFER_SIZE * 8) || !running)
{
nvti = openvasd_parse_vt (&parser, &event);
if (nvti == NULL)
{
break_flag = 1;
break;
}
if (nvti_creation_time (nvti) > feed_version_epoch)
count_new_vts += 1;
else
count_modified_vts += 1;

insert_nvt (nvti, rebuild, vt_refs_batch, vt_sevs_batch);

preferences = NULL;
if (update_preferences_from_json_nvt (nvti, &preferences))
{
sql_rollback ();
return -1;
}
if (rebuild == 0)
sql ("DELETE FROM nvt_preferences%s WHERE name LIKE '%s:%%';",
rebuild ? "_rebuild" : "",
nvti_oid (nvti));
insert_nvt_preferences_list (preferences, rebuild);
g_list_free_full (preferences, (GDestroyNotify) preference_free);

g_free(nvti);
non_read_count = filestream->last_write - filestream->last_read;
}
if (break_flag)
break;
}

gvm_json_pull_event_cleanup(&event);
gvm_json_pull_parser_cleanup(&parser);
fclose(stream);

g_free (res.ptr);
openvasd_curl_handler_close (&curl_hnd);
openvasd_response_free (resp);


batch_end (vt_refs_batch);
batch_end (vt_sevs_batch);

Expand Down Expand Up @@ -2662,27 +2834,8 @@ update_nvt_cache_openvasd (gchar* openvasd_uuid, gchar *db_feed_version,
return -1;
}

resp = openvasd_get_vts (&connector);
if (resp->code != 200)
{
g_warning ("%s: failed to get VTs", __func__);
return -1;
}

FILE *stream;
gvm_json_pull_event_t event;
gvm_json_pull_parser_t parser;

stream = fmemopen (resp->body, strlen (resp->body), "r");
gvm_json_pull_parser_init (&parser, stream);
gvm_json_pull_event_init (&event);
ret = update_nvts_from_json_vts (&parser, &event, scanner_feed_version,
ret = update_nvts_from_json_vts (&connector, scanner_feed_version,
rebuild);
fclose(stream);
gvm_json_pull_parser_cleanup (&parser);

openvasd_response_free (resp);

if (ret)
{
openvasd_connector_free (&connector);
Expand Down

0 comments on commit ff7a256

Please sign in to comment.