Skip to content

Commit

Permalink
Merge pull request #685 from cole-miller/revert-exec-sql-suspend
Browse files Browse the repository at this point in the history
Revert suspending inside handle_exec_sql
  • Loading branch information
cole-miller authored Aug 15, 2024
2 parents 7dea935 + 2d0d71b commit 6e24f8c
Show file tree
Hide file tree
Showing 8 changed files with 9 additions and 101 deletions.
3 changes: 1 addition & 2 deletions src/conn.c
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,6 @@ static void raft_connect(struct conn *c)
/* Close the connection without actually closing the transport, since
* the stream will be used by raft */
c->closed = true;
gateway__close(&c->gateway);
closeCb(&c->transport);
}

Expand Down Expand Up @@ -312,7 +311,7 @@ int conn__start(struct conn *c,
c->transport.data = c;
c->uv_transport = uv_transport;
c->close_cb = close_cb;
gateway__init(&c->gateway, config, loop, registry, raft, seed);
gateway__init(&c->gateway, config, registry, raft, seed);
rv = buffer__init(&c->read);
if (rv != 0) {
goto err_after_transport_init;
Expand Down
39 changes: 3 additions & 36 deletions src/gateway.c
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

void gateway__init(struct gateway *g,
struct config *config,
uv_loop_t *loop,
struct registry *registry,
struct raft *raft,
struct id_state seed)
Expand All @@ -35,10 +34,6 @@ void gateway__init(struct gateway *g,
g->protocol = DQLITE_PROTOCOL_VERSION;
g->client_id = 0;
g->random_state = seed;
g->defer = (uv_timer_t){};
if (loop != NULL) {
uv_timer_init(loop, &g->defer);
}
}

void gateway__leader_close(struct gateway *g, int reason)
Expand Down Expand Up @@ -101,11 +96,6 @@ void gateway__leader_close(struct gateway *g, int reason)
void gateway__close(struct gateway *g)
{
tracef("gateway close");

if (g->defer.loop != NULL) {
uv_close((uv_handle_t *)&g->defer, NULL);
}

if (g->leader == NULL) {
stmt__registry_close(&g->stmts);
return;
Expand Down Expand Up @@ -728,14 +718,6 @@ static void handle_exec_sql_next(struct gateway *g,
struct handle *req,
bool done);

static void handle_exec_sql_next_defer_cb(uv_timer_t *t)
{
struct gateway *g = t->data;
PRE(g != NULL && g->req != NULL);
PRE(g->req->type == DQLITE_REQUEST_EXEC_SQL);
handle_exec_sql_next(g, g->req, true);
}

static void handle_exec_sql_cb(struct exec *exec, int status)
{
tracef("handle exec sql cb status %d", status);
Expand All @@ -745,27 +727,12 @@ static void handle_exec_sql_cb(struct exec *exec, int status)
req->exec_count += 1;
sqlite3_finalize(exec->stmt);

if (status != SQLITE_DONE) {
if (status == SQLITE_DONE) {
handle_exec_sql_next(g, req, true);
} else {
assert(g->leader != NULL);
failure(req, status, error_message(g->leader->conn, status));
g->req = NULL;
return;
}

/* It would be valid to always invoke handle_exec_sql_next directly
* here. But that can lead to bounded recursion when we have several
* `;`-separated statements in a row that do not generate rows. To make
* sure the stack depth stays under control, we defer have the event
* loop invoke handle_exec_sql_next itself on the next iteration, but
* only if there is a prior call to handle_exec_sql_next above us on
* the stack. We also invoke handle_exec_sql_next directly if the
* gateway doesn't have access to an event loop (this is only the case
* in the unit tests). */
if (exec->async || g->defer.loop == NULL) {
handle_exec_sql_next(g, req, true);
} else {
g->defer.data = g;
uv_timer_start(&g->defer, handle_exec_sql_next_defer_cb, 0, 0);
}
}

Expand Down
12 changes: 0 additions & 12 deletions src/gateway.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,22 +35,10 @@ struct gateway {
uint64_t protocol; /* Protocol format version */
uint64_t client_id;
struct id_state random_state; /* For generating IDs */
/* The EXEC_SQL request handler uses this to defer work to the next
* loop iteration, to avoid recursion when processing multi-statement
* SQL strings. */
uv_timer_t defer;
};

/**
* Initialize a gateway.
*
* Passing NULL for the `loop` is permitted. Currently the loop is only used
* optionally to break potential recursion when handling an EXEC_SQL request
* that containss multiple `;`-separated statements.
*/
void gateway__init(struct gateway *g,
struct config *config,
uv_loop_t *loop,
struct registry *registry,
struct raft *raft,
struct id_state seed);
Expand Down
5 changes: 0 additions & 5 deletions src/leader.c
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,6 @@ static void leaderApplyFramesCb(struct raft_apply *req,
finish:
l->inflight = NULL;
l->db->tx_id = 0;
l->exec->async = true;
leaderExecDone(l->exec);
}

Expand Down Expand Up @@ -440,8 +439,6 @@ static void execBarrierCb(struct barrier *barrier, int status)
struct exec *req = barrier->data;
struct leader *l = req->leader;

req->async = req->barrier.async;

if (status != 0) {
l->exec->status = status;
leaderExecDone(l->exec);
Expand Down Expand Up @@ -508,7 +505,6 @@ static void raftBarrierCb(struct raft_barrier *req, int status)
return;
}
barrier->cb = NULL;
barrier->async = true;
cb(barrier, rv);
}

Expand All @@ -518,7 +514,6 @@ int leader__barrier(struct leader *l, struct barrier *barrier, barrier_cb cb)
int rv;
if (!needsBarrier(l)) {
tracef("not needed");
barrier->async = false;
cb(barrier, 0);
return 0;
}
Expand Down
9 changes: 1 addition & 8 deletions src/leader.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,6 @@ struct barrier {
void *data;
struct leader *leader;
struct raft_barrier req;
/* When the callback is invoked, this field is `true` if raft_barrier
* was called and `false` if the callback was invoked immediately. */
bool async;
barrier_cb cb;
};

Expand All @@ -66,12 +63,8 @@ struct exec {
uint64_t id;
int status;
queue queue;
pool_work_t work;
/* When the callback is invoked, this field is `true` if raft_barrier
* or raft_apply was called and `false` if the callback was invoked
* immediately. */
bool async;
exec_cb cb;
pool_work_t work;
};

/**
Expand Down
26 changes: 0 additions & 26 deletions test/integration/test_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -145,29 +145,3 @@ TEST(client, querySql, setUp, tearDown, 0, client_params)

return MUNIT_OK;
}

/* Stress test of an EXEC_SQL with many ';'-separated statements. */
TEST(client, semicolons, setUp, tearDown, 0, NULL)
{
struct fixture *f = data;
(void)params;

static const char trivial_stmt[] = "CREATE TABLE IF NOT EXISTS foo (n INT);";

size_t n = 1000;
size_t unit = sizeof(trivial_stmt) - 1;
char *sql = munit_malloc(n * unit);
char *p = sql;
for (size_t i = 0; i < n; i++) {
memcpy(p, trivial_stmt, unit);
p += unit;
}
sql[n * unit - 1] = '\0';

uint64_t last_insert_id;
uint64_t rows_affected;
EXEC_SQL(sql, &last_insert_id, &rows_affected);

free(sql);
return MUNIT_OK;
}
8 changes: 2 additions & 6 deletions test/unit/test_concurrency.c
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,8 @@ struct connection {
struct request_open open; \
struct response_db db; \
struct id_state seed = { { 1 } }; \
gateway__init(&c->gateway, \
CLUSTER_CONFIG(0), \
NULL, \
CLUSTER_REGISTRY(0), \
CLUSTER_RAFT(0), \
seed); \
gateway__init(&c->gateway, CLUSTER_CONFIG(0), \
CLUSTER_REGISTRY(0), CLUSTER_RAFT(0), seed); \
c->handle.data = &c->context; \
rc = buffer__init(&c->request); \
munit_assert_int(rc, ==, 0); \
Expand Down
8 changes: 2 additions & 6 deletions test/unit/test_gateway.c
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,8 @@ struct connection {
struct id_state seed = { { 1 } }; \
config = CLUSTER_CONFIG(i); \
config->page_size = 512; \
gateway__init(&c->gateway, \
config, \
NULL, \
CLUSTER_REGISTRY(i), \
CLUSTER_RAFT(i), \
seed); \
gateway__init(&c->gateway, config, CLUSTER_REGISTRY(i), \
CLUSTER_RAFT(i), seed); \
c->handle.data = &c->context; \
rc = buffer__init(&c->buf1); \
munit_assert_int(rc, ==, 0); \
Expand Down

0 comments on commit 6e24f8c

Please sign in to comment.