Skip to content

Commit

Permalink
modules/content-sqlite: [cleanup] refactor load/store
Browse files Browse the repository at this point in the history
Factor sqlite portion of load/store ooperations out of message handers
to their own functions.  This improves clarity and lets them be
used for other purposes.
  • Loading branch information
garlick committed Feb 28, 2020
1 parent 3a418eb commit 290a17e
Showing 1 changed file with 110 additions and 76 deletions.
186 changes: 110 additions & 76 deletions src/modules/content-sqlite/content-sqlite.c
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "src/common/libutil/blobref.h"
#include "src/common/libutil/cleanup.h"
#include "src/common/libutil/log.h"
#include "src/common/libutil/errno_safe.h"

const size_t lzo_buf_chunksize = 1024*1024;
const size_t compression_threshold = 256; /* compress blobs >= this size */
Expand Down Expand Up @@ -107,7 +108,7 @@ static void set_errno_from_sqlite_error (struct content_sqlite *ctx)
}
}

int grow_lzo_buf (struct content_sqlite *ctx, size_t size)
static int grow_lzo_buf (struct content_sqlite *ctx, size_t size)
{
size_t newsize = ctx->lzo_bufsize;
void *newbuf;
Expand All @@ -122,37 +123,26 @@ int grow_lzo_buf (struct content_sqlite *ctx, size_t size)
return 0;
}

void load_cb (flux_t *h,
flux_msg_handler_t *mh,
const flux_msg_t *msg,
void *arg)
/* Load blob from objects table, uncompressing if necessary.
* Returns 0 on success, -1 on error with errno set.
* On successful return, must call sqlite3_reset (ctx->load_stmt),
* which invalidates returned data.
*/
static int content_sqlite_load (struct content_sqlite *ctx,
const char *blobref,
const void **datap,
int *sizep)
{
struct content_sqlite *ctx = arg;
const char *blobref = "-";
int blobref_size;
uint8_t hash[BLOBREF_MAX_DIGEST_SIZE];
int hash_len;
const void *data = NULL;
int size = 0;
int uncompressed_size;
int rc = -1;

if (flux_request_decode_raw (msg,
NULL,
(const void **)&blobref,
&blobref_size) < 0) {
flux_log_error (h, "load: request decode failed");
goto done;
}
if (!blobref || blobref[blobref_size - 1] != '\0') {
errno = EPROTO;
flux_log_error (h, "load: malformed blobref");
goto done;
}
if ((hash_len = blobref_strtohash (blobref, hash, sizeof (hash))) < 0) {
errno = ENOENT;
flux_log_error (h, "load: unexpected foreign blobref");
goto done;
flux_log_error (ctx->h, "load: unexpected foreign blobref");
return -1;
}
if (sqlite3_bind_text (ctx->load_stmt,
1,
Expand All @@ -161,97 +151,89 @@ void load_cb (flux_t *h,
SQLITE_STATIC) != SQLITE_OK) {
log_sqlite_error (ctx, "load: binding key");
set_errno_from_sqlite_error (ctx);
goto done;
goto error;
}
if (sqlite3_step (ctx->load_stmt) != SQLITE_ROW) {
//log_sqlite_error (ctx, "load: executing stmt");
errno = ENOENT;
goto done;
goto error;
}
size = sqlite3_column_bytes (ctx->load_stmt, 0);
if (sqlite3_column_type (ctx->load_stmt, 0) != SQLITE_BLOB && size > 0) {
flux_log (h, LOG_ERR, "load: selected value is not a blob");
flux_log (ctx->h, LOG_ERR, "load: selected value is not a blob");
errno = EINVAL;
goto done;
goto error;
}
data = sqlite3_column_blob (ctx->load_stmt, 0);
if (sqlite3_column_type (ctx->load_stmt, 1) != SQLITE_INTEGER) {
flux_log (h, LOG_ERR, "load: selected value is not an integer");
flux_log (ctx->h, LOG_ERR, "load: selected value is not an integer");
errno = EINVAL;
goto done;
goto error;
}
uncompressed_size = sqlite3_column_int (ctx->load_stmt, 1);
if (uncompressed_size != -1) {
if (ctx->lzo_bufsize < uncompressed_size
&& grow_lzo_buf (ctx, uncompressed_size) < 0)
goto done;
goto error;
int r = LZ4_decompress_safe (data,
ctx->lzo_buf,
size,
uncompressed_size);
if (r < 0) {
errno = EINVAL;
goto done;
goto error;
}
if (r != uncompressed_size) {
flux_log (h, LOG_ERR, "load: blob size mismatch");
flux_log (ctx->h, LOG_ERR, "load: blob size mismatch");
errno = EINVAL;
goto done;
goto error;
}
data = ctx->lzo_buf;
size = uncompressed_size;
}
rc = 0;
done:
if (rc < 0) {
if (flux_respond_error (h, msg, errno, NULL) < 0)
flux_log_error (h, "load: flux_respond_error");
}
else {
if (flux_respond_raw (h, msg, data, size) < 0)
flux_log_error (h, "load: flux_respond_raw");
}
(void )sqlite3_reset (ctx->load_stmt);
*datap = data;
*sizep = size;
return 0;
error:
ERRNO_SAFE_WRAP (sqlite3_reset, ctx->load_stmt);
return -1;
}

void store_cb (flux_t *h,
flux_msg_handler_t *mh,
const flux_msg_t *msg,
void *arg)
/* Store blob to objects table, compressing if necessary.
* Blobref resulting from hash over 'data' is stored to 'blobref'.
* Returns 0 on success, -1 on error with errno set.
*/
static int content_sqlite_store (struct content_sqlite *ctx,
const void *data,
int size,
char *blobref,
int blobrefsz)
{
struct content_sqlite *ctx = arg;
const void *data;
int size, hash_len;
uint8_t hash[BLOBREF_MAX_DIGEST_SIZE];
char blobref[BLOBREF_MAX_STRING_SIZE] = "-";
int hash_len;
int uncompressed_size = -1;
int rc = -1;

if (flux_request_decode_raw (msg, NULL, &data, &size) < 0) {
flux_log_error (h, "store: request decode failed");
goto done;
}
if (size > ctx->blob_size_limit) {
errno = EFBIG;
goto done;
return -1;
}
if (blobref_hash (ctx->hashfun,
(uint8_t *)data,
size,
blobref,
sizeof (blobref)) < 0)
goto done;
blobrefsz) < 0)
return -1;
if ((hash_len = blobref_strtohash (blobref, hash, sizeof (hash))) < 0)
goto done;
return -1;
if (size >= compression_threshold) {
int r;
int out_len = LZ4_compressBound(size);
if (ctx->lzo_bufsize < out_len && grow_lzo_buf (ctx, out_len) < 0)
goto done;
return -1;
r = LZ4_compress_default (data, ctx->lzo_buf, size, out_len);
if (r == 0) {
errno = EINVAL;
goto done;
return -1;
}
uncompressed_size = size;
size = r;
Expand All @@ -264,14 +246,14 @@ void store_cb (flux_t *h,
SQLITE_STATIC) != SQLITE_OK) {
log_sqlite_error (ctx, "store: binding key");
set_errno_from_sqlite_error (ctx);
goto done;
goto error;
}
if (sqlite3_bind_int (ctx->store_stmt,
2,
uncompressed_size) != SQLITE_OK) {
log_sqlite_error (ctx, "store: binding size");
set_errno_from_sqlite_error (ctx);
goto done;
goto error;
}
if (sqlite3_bind_blob (ctx->store_stmt,
3,
Expand All @@ -280,25 +262,77 @@ void store_cb (flux_t *h,
SQLITE_STATIC) != SQLITE_OK) {
log_sqlite_error (ctx, "store: binding data");
set_errno_from_sqlite_error (ctx);
goto done;
goto error;
}
if (sqlite3_step (ctx->store_stmt) != SQLITE_DONE
&& sqlite3_errcode (ctx->db) != SQLITE_CONSTRAINT) {
log_sqlite_error (ctx, "store: executing stmt");
set_errno_from_sqlite_error (ctx);
goto done;
goto error;
}
rc = 0;
done:
if (rc < 0) {
if (flux_respond_error (h, msg, errno, NULL) < 0)
flux_log_error (h, "store: flux_respond_error");
sqlite3_reset (ctx->store_stmt);
return 0;
error:
ERRNO_SAFE_WRAP (sqlite3_reset, ctx->store_stmt);
return -1;
}

static void load_cb (flux_t *h,
flux_msg_handler_t *mh,
const flux_msg_t *msg,
void *arg)
{
struct content_sqlite *ctx = arg;
const char *blobref;
int blobref_size;
const void *data;
int size;

if (flux_request_decode_raw (msg,
NULL,
(const void **)&blobref,
&blobref_size) < 0) {
flux_log_error (h, "load: request decode failed");
goto error;
}
else {
if (flux_respond_raw (h, msg, blobref, strlen (blobref) + 1) < 0)
flux_log_error (h, "store: flux_respond_raw");
if (!blobref || blobref[blobref_size - 1] != '\0') {
errno = EPROTO;
flux_log_error (h, "load: malformed blobref");
goto error;
}
if (content_sqlite_load (ctx, blobref, &data, &size) < 0)
goto error;
if (flux_respond_raw (h, msg, data, size) < 0)
flux_log_error (h, "load: flux_respond_raw");
(void )sqlite3_reset (ctx->load_stmt);
return;
error:
if (flux_respond_error (h, msg, errno, NULL) < 0)
flux_log_error (h, "load: flux_respond_error");
}

void store_cb (flux_t *h,
flux_msg_handler_t *mh,
const flux_msg_t *msg,
void *arg)
{
struct content_sqlite *ctx = arg;
const void *data;
int size;
char blobref[BLOBREF_MAX_STRING_SIZE];

if (flux_request_decode_raw (msg, NULL, &data, &size) < 0) {
flux_log_error (h, "store: request decode failed");
goto error;
}
(void) sqlite3_reset (ctx->store_stmt);
if (content_sqlite_store (ctx, data, size, blobref, sizeof (blobref)) < 0)
goto error;
if (flux_respond_raw (h, msg, blobref, strlen (blobref) + 1) < 0)
flux_log_error (h, "store: flux_respond_raw");
return;
error:
if (flux_respond_error (h, msg, errno, NULL) < 0)
flux_log_error (h, "store: flux_respond_error");
}

void metaget_cb (flux_t *h,
Expand Down

0 comments on commit 290a17e

Please sign in to comment.