Skip to content

Commit

Permalink
WIP: mariadb-async
Browse files Browse the repository at this point in the history
  • Loading branch information
lpereira committed Sep 5, 2024
1 parent c8cb993 commit 3098a06
Show file tree
Hide file tree
Showing 3 changed files with 127 additions and 55 deletions.
127 changes: 93 additions & 34 deletions src/samples/techempower/database.c
Original file line number Diff line number Diff line change
Expand Up @@ -19,28 +19,36 @@
*/

#include <assert.h>
#include <string.h>
#include <mysql.h>
#include <sqlite3.h>
#include <stdarg.h>
#include <stddef.h>
#include <stdlib.h>
#include <stdarg.h>
#include <string.h>

#include "database.h"
#include "lwan-status.h"

/* Including "lwan.h" introduces namespace conflicts (linked-list
* functions from CCAN and MySQL headers.) */
struct lwan_request;
int lwan_request_await_read(struct lwan_request *r, int fd);
int lwan_request_await_write(struct lwan_request *r, int fd);
int lwan_request_await_read_write(struct lwan_request *r, int fd);

struct db_stmt {
bool (*bind)(const struct db_stmt *stmt,
struct db_row *rows);
bool (*bind)(const struct db_stmt *stmt, struct db_row *rows);
bool (*step)(const struct db_stmt *stmt, va_list ap);
void (*finalize)(struct db_stmt *stmt);
const char *param_signature;
const char *result_signature;
void *ctx;
};

struct db {
void (*disconnect)(struct db *db);
struct db_stmt *(*prepare)(const struct db *db,
void *ctx,
const char *sql,
const char *param_signature,
const char *result_signature);
Expand All @@ -58,13 +66,13 @@ struct db_stmt_mysql {
MYSQL_STMT *stmt;
MYSQL_BIND *param_bind;
MYSQL_BIND *result_bind;
int db_fd;
bool must_execute_again;
bool results_are_bound;
MYSQL_BIND param_result_bind[];
};

static bool db_stmt_bind_mysql(const struct db_stmt *stmt,
struct db_row *rows)
static bool db_stmt_bind_mysql(const struct db_stmt *stmt, struct db_row *rows)
{
struct db_stmt_mysql *stmt_mysql = (struct db_stmt_mysql *)stmt;
const char *signature = stmt->param_signature;
Expand Down Expand Up @@ -95,16 +103,55 @@ static bool db_stmt_bind_mysql(const struct db_stmt *stmt,
return !mysql_stmt_bind_param(stmt_mysql->stmt, stmt_mysql->param_bind);
}

static bool db_stmt_step_mysql(const struct db_stmt *stmt,
va_list ap)
static bool db_stmt_step_mysql(const struct db_stmt *stmt, va_list ap)
{
struct db_stmt_mysql *stmt_mysql = (struct db_stmt_mysql *)stmt;

if (stmt_mysql->must_execute_again) {
stmt_mysql->must_execute_again = false;
stmt_mysql->results_are_bound = false;
if (mysql_stmt_execute(stmt_mysql->stmt))
return false;

int status;

mysql_stmt_execute_start(&status, stmt_mysql->stmt);
while (status) {
/* FIXME: Handle MYSQL_WAIT_TIMEOUT and MYSQL_WAIT_EXCEPT
* properly.
*
* TIMEOUT is handled by calling mysql_get_timeout_value()
* and passing it to the await functions; that's not currently
* supported in Lwan, at least not easily (one could set a
* timeout by hand, similar to how lwan_request_sleep() does,
* but instead of yielding with CONN_CORO_SUSPEND, one would
* forward the arguments to the actual called await function;
* upon return, one would check what caused the coroutine to
* be resumed, and return -ETIMEDOUT or something.
*
* EXCEPT is handled like EPOLLPRI. Currently unsupported
* in Lwan too, although it's easier to implement than timeout.
*
* For now, ignore both flags. */
status &= ~(MYSQL_WAIT_TIMEOUT | MYSQL_WAIT_EXCEPT);

switch (status) {
case MYSQL_WAIT_READ:
lwan_request_await_read(stmt_mysql->base.ctx,
stmt_mysql->db_fd);
break;
case MYSQL_WAIT_WRITE:
lwan_request_await_write(stmt_mysql->base.ctx,
stmt_mysql->db_fd);
break;
case MYSQL_WAIT_READ | MYSQL_WAIT_WRITE:
/* FIXME: how do we know what will cause the coroutine
* to resume, a read or a write? */
lwan_request_await_read_write(stmt_mysql->base.ctx,
stmt_mysql->db_fd);
break;
}

status = mysql_stmt_execute_cont(&status, stmt_mysql->stmt, status);
}
}

if (!stmt_mysql->results_are_bound) {
Expand Down Expand Up @@ -154,15 +201,16 @@ static void db_stmt_finalize_mysql(struct db_stmt *stmt)
free(stmt_mysql);
}

static struct db_stmt *
db_prepare_mysql(const struct db *db,
const char *sql,
const char *param_signature,
const char *result_signature)
static struct db_stmt *db_prepare_mysql(const struct db *db,
void *ctx,
const char *sql,
const char *param_signature,
const char *result_signature)
{
const struct db_mysql *db_mysql = (const struct db_mysql *)db;
const size_t n_bounds = strlen(param_signature) + strlen(result_signature);
struct db_stmt_mysql *stmt_mysql = malloc(sizeof(*stmt_mysql) + n_bounds * sizeof(MYSQL_BIND));
struct db_stmt_mysql *stmt_mysql =
malloc(sizeof(*stmt_mysql) + n_bounds * sizeof(MYSQL_BIND));

if (!stmt_mysql)
return NULL;
Expand All @@ -175,19 +223,24 @@ db_prepare_mysql(const struct db *db,
goto out_close_stmt;

assert(strlen(param_signature) == mysql_stmt_param_count(stmt_mysql->stmt));
assert(strlen(result_signature) == mysql_stmt_field_count(stmt_mysql->stmt));
assert(strlen(result_signature) ==
mysql_stmt_field_count(stmt_mysql->stmt));

stmt_mysql->base.bind = db_stmt_bind_mysql;
stmt_mysql->base.step = db_stmt_step_mysql;
stmt_mysql->base.finalize = db_stmt_finalize_mysql;
stmt_mysql->param_bind = &stmt_mysql->param_result_bind[0];
stmt_mysql->result_bind = &stmt_mysql->param_result_bind[strlen(param_signature)];
stmt_mysql->result_bind =
&stmt_mysql->param_result_bind[strlen(param_signature)];
stmt_mysql->must_execute_again = true;
stmt_mysql->results_are_bound = false;

stmt_mysql->base.param_signature = param_signature;
stmt_mysql->base.result_signature = result_signature;

stmt_mysql->db_fd = mysql_get_socket(db_mysql->con);
stmt_mysql->base.ctx = ctx;

memset(stmt_mysql->param_result_bind, 0, n_bounds * sizeof(MYSQL_BIND));

return (struct db_stmt *)stmt_mysql;
Expand Down Expand Up @@ -231,6 +284,11 @@ struct db *db_connect_mysql(const char *host,
if (mysql_set_character_set(db_mysql->con, "utf8"))
goto error;

if (mysql_optionsv(db_mysql->con, MYSQL_OPT_NONBLOCK, 0)) {
lwan_status_error("Could not enable non-blocking mode");
goto error;
}

db_mysql->base.disconnect = db_disconnect_mysql;
db_mysql->base.prepare = db_prepare_mysql;

Expand All @@ -254,8 +312,7 @@ struct db_stmt_sqlite {
sqlite3_stmt *sqlite;
};

static bool db_stmt_bind_sqlite(const struct db_stmt *stmt,
struct db_row *rows)
static bool db_stmt_bind_sqlite(const struct db_stmt *stmt, struct db_row *rows)
{
const struct db_stmt_sqlite *stmt_sqlite =
(const struct db_stmt_sqlite *)stmt;
Expand All @@ -270,8 +327,8 @@ static bool db_stmt_bind_sqlite(const struct db_stmt *stmt,

switch (signature[row]) {
case 's':
ret = sqlite3_bind_text(stmt_sqlite->sqlite, (int)row + 1, r->u.s, -1,
NULL);
ret = sqlite3_bind_text(stmt_sqlite->sqlite, (int)row + 1, r->u.s,
-1, NULL);
break;
case 'i':
ret = sqlite3_bind_int(stmt_sqlite->sqlite, (int)row + 1, r->u.i);
Expand All @@ -287,8 +344,7 @@ static bool db_stmt_bind_sqlite(const struct db_stmt *stmt,
return true;
}

static bool db_stmt_step_sqlite(const struct db_stmt *stmt,
va_list ap)
static bool db_stmt_step_sqlite(const struct db_stmt *stmt, va_list ap)
{
const struct db_stmt_sqlite *stmt_sqlite =
(const struct db_stmt_sqlite *)stmt;
Expand Down Expand Up @@ -328,11 +384,11 @@ static void db_stmt_finalize_sqlite(struct db_stmt *stmt)
free(stmt_sqlite);
}

static struct db_stmt *
db_prepare_sqlite(const struct db *db,
const char *sql,
const char *param_signature,
const char *result_signature)
static struct db_stmt *db_prepare_sqlite(const struct db *db,
void *ctx,
const char *sql,
const char *param_signature,
const char *result_signature)
{
const struct db_sqlite *db_sqlite = (const struct db_sqlite *)db;
struct db_stmt_sqlite *stmt_sqlite = malloc(sizeof(*stmt_sqlite));
Expand All @@ -354,6 +410,8 @@ db_prepare_sqlite(const struct db *db,
stmt_sqlite->base.param_signature = param_signature;
stmt_sqlite->base.result_signature = result_signature;

stmt_sqlite->base.ctx = ctx;

return (struct db_stmt *)stmt_sqlite;
}

Expand Down Expand Up @@ -414,10 +472,11 @@ inline void db_stmt_finalize(struct db_stmt *stmt) { stmt->finalize(stmt); }

inline void db_disconnect(struct db *db) { db->disconnect(db); }

inline struct db_stmt *db_prepare_stmt(const struct db *db,
const char *sql,
const char *param_signature,
const char *result_signature)
inline struct db_stmt *db_prepare_stmt_ctx(const struct db *db,
void *ctx,
const char *sql,
const char *param_signature,
const char *result_signature)
{
return db->prepare(db, sql, param_signature, result_signature);
return db->prepare(db, ctx, sql, param_signature, result_signature);
}
13 changes: 12 additions & 1 deletion src/samples/techempower/database.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,21 @@ struct db_row {
};


struct db_stmt *db_prepare_stmt(const struct db *db,
struct db_stmt *db_prepare_stmt_ctx(const struct db *db,
void *ctx,
const char *sql,
const char *param_signature,
const char *result_signature);

static inline struct db_stmt *db_prepare_stmt(const struct db *db,
const char *sql,
const char *param_signature,
const char *result_signature)
{
return db_prepare_stmt_ctx(db, NULL, sql, param_signature,
result_signature);
}

void db_stmt_finalize(struct db_stmt *stmt);

bool db_stmt_bind(const struct db_stmt *stmt, struct db_row *rows);
Expand Down
42 changes: 22 additions & 20 deletions src/samples/techempower/techempower.c
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,12 @@
#include <stdlib.h>
#include <string.h>

#include "lwan-private.h"
#include "int-to-str.h"
#include "lwan-cache.h"
#include "lwan-config.h"
#include "lwan-template.h"
#include "lwan-mod-lua.h"
#include "int-to-str.h"
#include "lwan-private.h"
#include "lwan-template.h"

#include "database.h"
#include "json.h"
Expand Down Expand Up @@ -63,6 +63,8 @@ struct Fortune {
int id;
char *message;
} item;

struct lwan_request *request;
};

DEFINE_ARRAY_TYPE_INLINEFIRST(fortune_array, struct Fortune)
Expand Down Expand Up @@ -219,7 +221,8 @@ static inline bool db_query(struct db_stmt *stmt, struct db_json *out)

LWAN_HANDLER(db)
{
struct db_stmt *stmt = db_prepare_stmt(get_db(), random_number_query, "i", "ii");
struct db_stmt *stmt =
db_prepare_stmt_ctx(get_db(), request, random_number_query, "i", "ii");
struct db_json db_json;

if (UNLIKELY(!stmt)) {
Expand All @@ -237,7 +240,7 @@ LWAN_HANDLER(db)
request->flags |= RESPONSE_NO_EXPIRES;

return json_response_obj(response, db_json_desc, N_ELEMENTS(db_json_desc),
&db_json);
&db_json);
}

static long get_number_of_queries(struct lwan_request *request)
Expand All @@ -253,7 +256,8 @@ LWAN_HANDLER(queries)
enum lwan_http_status ret = HTTP_INTERNAL_ERROR;
long queries = get_number_of_queries(request);

struct db_stmt *stmt = db_prepare_stmt(get_db(), random_number_query, "i", "ii");
struct db_stmt *stmt =
db_prepare_stmt_ctx(get_db(), request, random_number_query, "i", "ii");
if (UNLIKELY(!stmt))
return HTTP_INTERNAL_ERROR;

Expand Down Expand Up @@ -283,10 +287,8 @@ struct db_json_cached {
struct db_json db_json;
};

static struct cache_entry *cached_queries_new(const void *keyptr,
void *context,
void *create_ctx
__attribute__((unused)))
static struct cache_entry *
cached_queries_new(const void *keyptr, void *context, void *create_ctx)
{
struct db_json_cached *entry;
struct db_stmt *stmt;
Expand All @@ -296,7 +298,8 @@ static struct cache_entry *cached_queries_new(const void *keyptr,
if (UNLIKELY(!entry))
return NULL;

stmt = db_prepare_stmt(get_db(), cached_random_number_query, "i", "ii");
stmt = db_prepare_stmt_ctx(get_db(), create_ctx, cached_random_number_query,
"i", "ii");
if (UNLIKELY(!stmt)) {
free(entry);
return NULL;
Expand Down Expand Up @@ -327,8 +330,8 @@ LWAN_HANDLER(cached_queries)
int key = (int)lwan_random_uint64() % 10000;
int error;

jc = (struct db_json_cached *)cache_get_and_ref_entry(
cached_queries_cache, (void *)(intptr_t)key, &error);
jc = (struct db_json_cached *)cache_get_and_ref_entry_with_ctx(
cached_queries_cache, (void *)(intptr_t)key, request, &error);

qj.queries[i] = jc->db_json;

Expand Down Expand Up @@ -392,7 +395,8 @@ static int fortune_list_generator(struct coro *coro, void *data)
struct fortune_array fortunes;
struct db_stmt *stmt;

stmt = db_prepare_stmt(get_db(), fortune_query, "", "is");
stmt = db_prepare_stmt_ctx(get_db(), fortune->request, fortune_query, "",
"is");
if (UNLIKELY(!stmt))
return 0;

Expand Down Expand Up @@ -426,7 +430,7 @@ static int fortune_list_generator(struct coro *coro, void *data)

LWAN_HANDLER(fortunes)
{
struct Fortune fortune;
struct Fortune fortune = {.request = request};

lwan_strbuf_grow_to(response->buffer, 1500);

Expand Down Expand Up @@ -484,11 +488,9 @@ int main(void)
if (!fortune_tpl)
lwan_status_critical("Could not compile fortune templates");

cached_queries_cache = cache_create_full(cached_queries_new,
cached_queries_free,
hash_int_new,
NULL,
3600 /* 1 hour */);
cached_queries_cache =
cache_create_full(cached_queries_new, cached_queries_free, hash_int_new,
NULL, 3600 /* 1 hour */);
if (!cached_queries_cache)
lwan_status_critical("Could not create cached queries cache");
/* Pre-populate the cache and make it read-only to avoid locking in the fast
Expand Down

0 comments on commit 3098a06

Please sign in to comment.