-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathdb.ss
335 lines (312 loc) · 15.4 KB
/
db.ss
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
;; User-level transactions on top of a single synchronous leveldb writebatch.
;;
;; This file defines transactional access to a simple key-value store.
;; All current transactions go into a current batch, which at some point is
;; triggered for commit. At that point, the batch waits for all current transactions
;; to be closed, while blocking any new transaction from being opened.
;; Thus, transactions must always be short, or there may be long delays,
;; deadlocks, and/or memory overflow. Also, transactions can only roll forward,
;; never roll back, so they must use suitable mechanisms for mutual exclusion
;; and validation before writing anything to the database.
;;
;; Multiple threads can read the database with db-key? and db-get,
;; seeing the state before the current transaction batch,
;; and contribute to the current transaction with db-put! and db-delete!.
;; Transactions ensure that all updates made within the transaction are
;; included in the same atomic batch. A transaction is scheduled for commit
;; with close-transaction; commit-transaction both closes it and waits for
;; it to be committed, and sync-transaction waits for an already closed
;; transaction to be committed.
;;
;; with-tx signals an error if there is already a (current-db-transaction);
;; if not, it opens a transaction, executes code while this transaction is the
;; (current-db-transaction), and closes it at the end, scheduling it for commit
;; without waiting for it. with-committed-tx additionally waits for the commit.
;;
;; This code was ported from my OCaml library legilogic_lib,
;; though I did refactor and enhance the code as I ported it.
;; Notably, in the OCaml variant, every function was defined twice,
;; as a client half sending a request and some server spaghetti code
;; processing it, with some data in between and plenty of promises
;; to communicate (like Gerbil std/misc/completion, just separating the
;; post! and wait! capabilities). Instead, in Gerbil, I use locking
;; so functions can be defined only once, at the cost of having to
;; explicitly use the fields of a struct to share state instead of
;; just nicely scoped variables.
;;
;; Also, I added a timer for deferred triggering of batches by transactions.
;; I thought I had implemented that in OCaml, but it wasn't in the master
;; branch of legicash-facts. This is necessary in Gambit, because Gambit lacks
;; the OCaml feature that allows an FFI function (in this case, the leveldb write)
;; to run in parallel with other OCaml code.
;; This parallelism in OCaml naturally allowed the workers to synchronize with
;; the speed of the batch commits, but that won't work on Gambit
;; until we debug the Gambit SMP support.
;;
;; Final difference from OCaml, we use parameters for dynamic binding of the
;; database context, where OCaml could only use global variables, static binding,
;; and/or an explicit reader monad.
(export #t)
(import
:clan/db/leveldb
:std/assert
:std/misc/completion :std/misc/list :std/misc/number
:std/sugar
:clan/base :clan/concurrency :std/misc/path :clan/path-config)
;; A connection to a leveldb database, together with the shared state used to
;; group user transactions into atomic batches that a manager thread commits.
;; Most mutable fields are accessed while holding mx (see with-db-lock).
(defstruct DbConnection
  (name ;; name, a string, from which the path to the leveldb storage is derived
   leveldb ;; leveldb handle
   mx ;; Mutex protecting this connection's mutable state
   txcounter ;; Nat: id of the next transaction to open
   blocked-transactions ;; (List DbTransaction) waiting for the triggered batch to be finalized
   open-transactions ;; (Table DbTransaction <- Nat): open transactions of the current batch, by txid
   pending-transactions ;; (List DbTransaction) already closed into the current batch
   hooks ;; (Table (<- Nat) <- Any): commit hooks, by name
   batch-id ;; Nat: id of the current batch
   batch ;; leveldb-writebatch accumulating the writes of the current batch
   batch-completion ;; completion posted with the batch id once the current batch is written
   manager ;; Thread running db-manager
   timer ;; (Or Thread '#f): armed deferred-trigger timer, if any
   ready? ;; Bool: #t when the manager is not busy writing a previous batch
   triggered?) ;; Bool: the current batch should be finalized as soon as possible
  constructor: :init!)
;; Initialize a fresh DbConnection called NAME around the open LEVELDB handle:
;; no transactions, an empty batch numbered 0, the previous (nonexistent) batch
;; considered done (ready? #t), and a db-manager thread to commit batches.
;; The manager thread is spawned only AFTER the fields are initialized:
;; db-manager reads (DbConnection-name self) when spawning its thread (to name it),
;; which would otherwise see an uninitialized field.
(defmethod {:init! DbConnection}
  (lambda (self name leveldb)
    (def mx (make-mutex name)) ;; Mutex
    (def txcounter 0) ;; Nat
    (def hooks (make-hash-table)) ;; (Table (<- Nat) <- Any)
    (def blocked-transactions []) ;; (List Transaction)
    (def open-transactions (make-hash-table)) ;; mutable (HashSet Transaction)
    (def pending-transactions []) ;; (List Transaction)
    (def batch-id 0) ;; Nat
    (def batch (leveldb-writebatch)) ;; leveldb-writebatch
    (def batch-completion (make-completion '(db-batch 0)))
    (def ready? #t) ;; Bool
    (def triggered? #f) ;; Bool
    (def timer #f) ;; (Or Thread '#f)
    (struct-instance-init!
     self name leveldb mx txcounter
     blocked-transactions open-transactions pending-transactions hooks
     batch-id batch batch-completion #f timer ;; manager is set just below
     ready? triggered?)
    (set! (DbConnection-manager self) (db-manager self)))) ;; Thread
;; The DbConnection used by default throughout this file; #f while none is installed.
(def current-db-connection (make-parameter #f))

;; Open the leveldb database NAME and wrap it in a fresh DbConnection.
;; NAME is resolved against persistent-directory unless it is already absolute,
;; and the parent directory of the storage is created if missing.
;; OPTS are the leveldb open options (default: leveldb-default-options).
(def (open-db-connection name (opts (leveldb-default-options)))
  (let (db-path (ensure-absolute-path name persistent-directory))
    (create-directory* (path-parent db-path))
    (DbConnection name (leveldb-open db-path opts))))

;; Like open-db-connection, but also install the new connection
;; as the (current-db-connection).
(def (open-db-connection! name (opts (leveldb-default-options)))
  (let (conn (open-db-connection name opts))
    (current-db-connection conn)))
;; Close the leveldb handle of C and send #f to its db-manager thread,
;; which makes the manager loop exit.
;; close-db-connection registers this as the 'close commit hook, so it runs
;; in the manager thread, right after a batch has been written.
(def (close-db-connection! c)
  (leveldb-close (DbConnection-leveldb c))
  (thread-send (DbConnection-manager c) #f))
;; Close C cleanly: under the lock, register the 'close commit hook and trigger
;; the current batch; then, outside the lock, wait for the manager thread to exit,
;; which happens once a batch has been committed and the hook has run.
(def (close-db-connection c)
  (with-db-lock (c)
    (register-commit-hook! 'close (lambda _ (close-db-connection! c)) c)
    (db-trigger! c))
  (thread-join! (DbConnection-manager c)))
;; Open the database NAME with leveldb options OPTS (default: leveldb-default-options),
;; call (fun c) with the new connection c also bound as the (current-db-connection),
;; and close the connection afterwards, even if FUN raises. Returns FUN's result.
;; OPTS is now passed on to open-db-connection; it used to be silently ignored.
(def (call-with-db-connection fun name (opts (leveldb-default-options)))
  (def c (open-db-connection name opts))
  (try
   (parameterize ((current-db-connection c))
     (fun c))
   (finally (close-db-connection c))))
;; (with-db-connection (c name [opts]) body ...): run body with c bound to
;; a connection to database NAME; see call-with-db-connection.
(defrule (with-db-connection (c name ...) body ...)
  (call-with-db-connection (lambda (c) body ...) name ...))
;; Make sure the (current-db-connection) is a connection to database NAME:
;; open and install one if there is none yet; otherwise assert that the
;; existing one was opened under that same NAME.
(def (ensure-db-connection name)
  (let (c (current-db-connection))
    (cond
     ((not c) (open-db-connection! name))
     (else (assert! (equal? (DbConnection-name c) name))))))
;; Request that the current batch of C be committed, because some transaction must be
;; committed, a deferred timer has fired since content was added, or the database is closing.
;; The batch is finalized on the spot only when the manager is idle (ready?) and
;; no transaction is still open; otherwise it is just flagged as triggered?.
;; ASSUMES YOU'RE HOLDING THE DB-LOCK
;; : <- DbConnection
(def (db-trigger! c)
  (cond
   ((not (DbConnection-ready? c))
    (set! (DbConnection-triggered? c) #t))
   ((positive? (hash-length (DbConnection-open-transactions c)))
    (set! (DbConnection-triggered? c) #t))
   (else
    (finalize-batch! c))))
;; Delay, in seconds, after which a timer armed by deferred-db-trigger! triggers the batch.
;; : Real
(def deferred-db-trigger-interval-in-seconds .02)
;; Arm a timer that will call db-trigger! on C after deferred-db-trigger-interval-in-seconds,
;; unless a timer is already armed.
;; When it fires, the timer only acts if the batch it was armed for is still the current
;; one (same batch-id). In that case it first disarms itself (resets the timer field to #f),
;; then calls db-trigger!.
;; Disarming matters. If transactions are still open when the timer fires, db-trigger! only
;; marks the batch as triggered?. The close-transaction of the last of them must then be able
;; to arm a new timer, so the batch eventually gets finalized. Without this, the timer field
;; stayed set until finalize-batch!, which then never came. The batch stayed triggered
;; forever, and open-transaction blocked every new transaction on it.
;; ASSUMES YOU'RE HOLDING THE DB-LOCK
;; : <- DbConnection
(def (deferred-db-trigger! c)
  (unless (DbConnection-timer c)
    (let (batch-id (DbConnection-batch-id c))
      (set! (DbConnection-timer c)
        (spawn/name/logged
         ['timer batch-id]
         (lambda ()
           (thread-sleep! deferred-db-trigger-interval-in-seconds)
           (with-db-lock (c)
             (when (equal? batch-id (DbConnection-batch-id c))
               (set! (DbConnection-timer c) #f)
               (db-trigger! c)))))))))
;; (with-db-lock (conn) body ...) evaluates body while holding the mutex of DbConnection conn.
;; (with-db-lock () body ...) does the same for the (current-db-connection).
;; The second rule must expand to ((current-db-connection)), i.e. the connection expression is
;; the parameter CALL. Writing (current-db-connection) would match the first rule with conn bound
;; to the parameter object itself, and DbConnection-mx would then be applied to a procedure.
;; NOTE(review): presumably the mutex is not reentrant — do not nest with-db-lock on the same
;; connection; confirm against with-lock in :clan/concurrency.
(defrules with-db-lock ()
  ((_ (conn) body ...) (with-lock (DbConnection-mx conn) (lambda () body ...)))
  ((_ () body ...) (with-db-lock ((current-db-connection)) body ...)))
;; Call (fun conn) while holding the lock of conn (default: the current connection).
(def (call-with-db-lock fun (conn (current-db-connection)))
  (with-db-lock (conn) (fun conn)))
;; A user transaction on DbConnection `connection`, numbered `txid`.
;; status: blocked open pending complete
;; When opening a transaction, it may be blocked at first, while the current batch
;; (already triggered) waits for its open transactions to close before being committed;
;; but by the time it is returned to the user, it is in open status.
;; When it is closed, it becomes pending until its batch is committed,
;; at which point it becomes complete and any thread sync'ing on it will be awakened.
(defstruct DbTransaction (connection txid status) transparent: #t)
;; The transaction used by default by db-get, db-key?, db-put!, db-put-many!, db-delete!,
;; close-transaction, commit-transaction and sync-transaction; #f when there is none.
(def current-db-transaction (make-parameter #f))
;; Return the completion to wait on for transaction TX to be committed:
;; the connection's current batch completion while TX is open or pending,
;; and #f otherwise (blocked, or already complete).
;; NOTE(review): for a pending TX whose batch has already been handed to the db-manager by
;; finalize-batch!, this is the completion of the FOLLOWING batch, which may take longer to be
;; posted (or never, if nothing triggers it) — verify before relying on it for such a TX.
(def (DbTransaction-completion tx)
  (let (conn (DbTransaction-connection tx))
    (with-db-lock (conn)
      (and (memq (DbTransaction-status tx) '(open pending))
           (DbConnection-batch-completion conn)))))
;; Block until COMPLETION is posted; #f means there is nothing to wait for.
;; : <- (OrFalse Completion)
(def (wait-completion completion)
  (if completion
    (completion-wait! completion)
    (void)))
;; Open Transaction
;; Open a new transaction on connection C (default: the current connection) and return it.
;; Under the lock, a fresh txid is allocated. If the current batch is both ready? and
;; triggered? (it only waits for its open transactions to close before being committed),
;; the new transaction is recorded as blocked. Otherwise it is added to the open set.
;; A blocked transaction then waits — after releasing the lock — on the batch completion
;; of that triggered batch. finalize-batch! moves blocked transactions into the open set
;; of the next batch, so the transaction is open by the time it is returned.
;; TODO: assert that the transaction_counter never wraps around?
;; Or check and block further transactions when it does, before resetting the counter?
;; TODO: commenting out the ready && triggered helps detect / enact deadlocks when running
;; tests, by having only one active transaction at a time; but then the hold can and
;; should be released as soon as "the" transaction is complete, unless we're already both
;; ready && triggered for the next batch commit. Have an option for that?
(def (open-transaction (c (current-db-connection)))
  (defvalues (transaction completion)
    (with-db-lock (c)
      (let* ((txid (post-increment! (DbConnection-txcounter c)))
             (blocked? (and (DbConnection-ready? c) (DbConnection-triggered? c)))
             (status (if blocked? 'blocked 'open))
             (transaction (DbTransaction c txid status)))
        (if blocked?
          (push! transaction (DbConnection-blocked-transactions c))
          (hash-put! (DbConnection-open-transactions c) txid transaction))
        ;; Only a blocked transaction has something to wait for.
        (values transaction (and blocked? (DbConnection-batch-completion c))))))
  (wait-completion completion) ;; wait without holding the lock
  transaction)
;; Run (fun tx) in a new transaction tx on connection c (default: the current connection),
;; with tx bound as the (current-db-transaction), then close tx even if fun raises.
;; With wait: #t, also wait for tx's batch to be committed before returning.
;; For now, let's
;; * Disallow nested transactions / auto-transactions. We want a clear transaction owner, and
;;   the type / signature of functions will ensure that there is always one.
;; * Return the result of the inner expression, after the transaction is closed but (unless
;;   wait: is true) not committed.
;;   If you need to synchronize on the transaction, be sure to return it or otherwise memorize it,
;;   or use after-commit from within the body.
;; When waiting, we wait on the completion returned by close-transaction itself. Querying the
;; transaction's completion again afterwards, as sync-transaction does, could yield the
;; completion of the following batch if this batch was finalized in between.
(def (call-with-tx fun (c #f) wait: (wait #f))
  (awhen (t (current-db-transaction))
    (error "Cannot nest transactions" t))
  (def tx (open-transaction (or c (current-db-connection))))
  (try
   (parameterize ((current-db-transaction tx))
     (fun tx))
   (finally
    (let (completion (close-transaction tx))
      (when wait (wait-completion completion))))))
;; (with-tx (tx [dbc]) body ...): run body in a new transaction bound to tx, on connection
;; dbc (default: the current connection); the transaction is closed but not waited for.
;; See call-with-tx.
(defrule (with-tx (tx dbc ...) body ...)
  (call-with-tx (lambda (tx) body ...) dbc ...))
;; Run body with no (current-db-transaction), e.g. around spawning a thread,
;; as after-commit does.
(defrule (without-tx body ...)
  (parameterize ((current-db-transaction #f)) body ...))
;; Like call-with-tx, but also wait until the transaction's batch is committed.
(def (call-with-committed-tx fun (c #f))
  (call-with-tx fun c wait: #t))
;; Like with-tx, but also wait until the transaction's batch is committed.
(defrule (with-committed-tx (tx dbc ...) body ...)
  (call-with-committed-tx (lambda (tx) body ...) dbc ...))
;; (after-commit (tx) body ...): spawn a background thread, with no current transaction,
;; that waits until the batch of transaction tx is committed, then runs body.
;; The completion to wait on is fetched right away in the calling thread, while tx is
;; typically still open (as when used from within a with-tx body). Fetching it later, in
;; the new thread, could return the completion of a later batch. It could also return #f:
;; DbTransaction-completion returns #f once tx is complete, so the wait goes through
;; wait-completion, which treats #f as nothing to wait for.
;; Do not use this while holding the db lock of tx's connection:
;; DbTransaction-completion acquires that lock.
(defrule (after-commit (tx) body ...)
  (let* ((the-tx tx)
         (completion (DbTransaction-completion the-tx)))
    (without-tx (spawn/name/logged
                 ['after-commit (DbTransaction-txid the-tx)]
                 (lambda () (wait-completion completion) body ...)))))
;; Close transaction TX (default: the current one), marking it as ready to be committed.
;; A blocked or open TX becomes pending: it leaves the open set, joins the pending list of
;; the current batch, and a deferred trigger of that batch is armed.
;; Returns the completion that will be posted when the batch is written, #f if TX was
;; already complete. Signals an error if TX is not a DbTransaction.
;; The system must otherwise ensure that the action that follows this promise
;; will be restarted by a new instance of this program in case the process crashes after this commit,
;; or is otherwise some client's responsibility to restart if the program acts as a server.
(def (close-transaction (tx (current-db-transaction)))
  (unless (DbTransaction? tx)
    (error "close-transaction: not a transaction" tx))
  (let ((conn (DbTransaction-connection tx))
        (id (DbTransaction-txid tx))
        (state (DbTransaction-status tx)))
    (with-db-lock (conn)
      (cond
       ((memq state '(blocked open))
        (set! (DbTransaction-status tx) 'pending)
        (hash-remove! (DbConnection-open-transactions conn) id)
        (push! tx (DbConnection-pending-transactions conn))
        (deferred-db-trigger! conn)
        (DbConnection-batch-completion conn))
       ((eq? state 'pending)
        (DbConnection-batch-completion conn))
       (else #f)))))
;; Close TRANSACTION (default: the current one), then block until its batch is committed.
(def (commit-transaction (transaction (current-db-transaction)))
  (let (completion (close-transaction transaction))
    (wait-completion completion)))
;; Block until TRANSACTION (default: the current one) is committed,
;; returning at once when there is nothing to wait for.
;; Never sync on the end of a transaction from within another transaction:
;; that other transaction might belong to the same batch, and you may deadlock.
;; Instead, sync on it in a background thread, which then runs the very same code
;; you would run when resuming the persistent activity; that code must be
;; effectively idempotent.
(def (sync-transaction (transaction (current-db-transaction)))
  (let (completion (DbTransaction-completion transaction))
    (wait-completion completion)))
;; Register, under NAME, a post-commit hook to be called with the batch id after a batch commits.
;; Registering again under the same NAME replaces the previous hook.
;; Hooks are stored on the connection and are not removed after running:
;; finalize-batch! passes all registered hooks along with every batch.
;; The hook is called synchronously (in the db-manager thread), but if you use asynchronous
;; message passing, it is possible that the hooks may be called out of order.
;; ASSUMES YOU'RE HOLDING THE DB-LOCK
;; Unit <- Any (<- Nat) DbConnection
(def (register-commit-hook! name hook (c (current-db-connection)))
  (hash-put! (DbConnection-hooks c) name hook))
;; Write options used by the db-manager for every batch. sync: #t makes leveldb flush the
;; write out of the OS buffer cache before leveldb-write returns. That matches this module's
;; contract of a single *synchronous* writebatch, whose completion means the batch was
;; committed to disk. With sync: #f, a committed batch could be lost on a machine crash.
(def leveldb-sync-write-options (leveldb-write-options sync: #t))
;; Spawn the manager thread of connection C. It loops on messages sent by finalize-batch!,
;; each a list [batch-id batch batch-completion hooks pending-transactions]. For each one it:
;; - writes the batch to leveldb;
;; - marks the batch's pending transactions complete;
;; - calls each hook with the batch id;
;; - posts the batch completion with the batch id;
;; - then, under the lock, finalizes the next batch right away if it is already triggered
;;   with no open transaction, or else marks the connection ready? for it.
;; A #f message (sent by close-db-connection!) ends the loop; any other message is an error.
;; Returns the Thread.
(def (db-manager c)
  (spawn/name/logged
   ['db-manager (DbConnection-name c)]
   (fun (db-manager-1)
     (let loop ()
       (match (thread-receive)
         ([batch-id batch batch-completion hooks pending-transactions]
          ;; TODO: run the leveldb-write in a different OS thread.
          (leveldb-write (DbConnection-leveldb c) batch leveldb-sync-write-options)
          (for-each (lambda (tx) (set! (DbTransaction-status tx) 'complete))
                    pending-transactions)
          (for-each (lambda (hook) (hook batch-id)) hooks)
          (completion-post! batch-completion batch-id)
          (with-db-lock (c)
            (if (and (DbConnection-triggered? c) (zero? (hash-length (DbConnection-open-transactions c))))
              (finalize-batch! c)
              (set! (DbConnection-ready? c) #t)))
          (loop))
         (#f (void))
         (x (error "db-manager: unexpected message" x)))))))
;; Hand the current batch of C over to the db-manager thread for writing, and start a new batch.
;; No system thread is forked here: the batch is sent to the existing manager with thread-send.
;; The manager posts the old batch-completion once the batch is written.
;; The new batch gets the next batch-id, an empty writebatch and a fresh completion, with no
;; pending transactions. It is not triggered?, has no timer, and is not ready? until the
;; manager is done writing the batch sent here.
;; Transactions that were blocked become open in the new batch.
;; Hooks are passed along but NOT cleared: they run again for every later batch.
;; ASSUMES YOU'RE HOLDING THE DB-LOCK (callers: db-trigger!, and db-manager under the lock)
(def (finalize-batch! c)
  ;; Snapshot the state of the batch being finalized before resetting it.
  (def batch-id (DbConnection-batch-id c))
  (def batch (DbConnection-batch c))
  (def batch-completion (DbConnection-batch-completion c))
  (def hooks (hash-values (DbConnection-hooks c)))
  (def blocked-transactions (DbConnection-blocked-transactions c))
  (def pending-transactions (DbConnection-pending-transactions c))
  ;; Start the next batch; its completion is tagged with the new batch id.
  (set! (DbConnection-batch-id c) (1+ batch-id))
  (set! (DbConnection-batch c) (leveldb-writebatch))
  (set! (DbConnection-batch-completion c) (make-completion `(db-batch , (DbConnection-batch-id c))))
  (set! (DbConnection-pending-transactions c) [])
  (set! (DbConnection-blocked-transactions c) [])
  (set! (DbConnection-ready? c) #f) ;; the manager is now busy with the old batch
  (set! (DbConnection-triggered? c) #f)
  (set! (DbConnection-timer c) #f) ;; any old timer ignores the new batch-id
  ;; Blocked transactions join the new batch as open transactions.
  (for-each (lambda (tx)
              (set! (DbTransaction-status tx) 'open)
              (hash-put! (DbConnection-open-transactions c) (DbTransaction-txid tx) tx))
            blocked-transactions)
  (thread-send (DbConnection-manager c) [batch-id batch batch-completion hooks pending-transactions]))
;; Return the id of the current (not yet committed) batch of connection C.
;; Useful in tests, and also, within a transaction, to get the id needed to prepare a hook,
;; e.g. to send newly committed but previously unsent messages.
(def (get-batch-id (c (current-db-connection)))
  (let (current-id (DbConnection-batch-id c))
    current-id))
;; Read KEY from the database of transaction TX, using read options OPTS.
;; Reads go straight to leveldb, so they see the last committed state,
;; not the writes still pending in the current batch.
(def (db-get key (tx (current-db-transaction)) (opts (leveldb-default-read-options)))
  (let* ((conn (DbTransaction-connection tx))
         (db (DbConnection-leveldb conn)))
    (leveldb-get db key opts)))
;; Test whether KEY is present in the database of transaction TX, using read options OPTS;
;; like db-get, this sees the last committed state.
(def (db-key? key (tx (current-db-transaction)) (opts (leveldb-default-read-options)))
  (let* ((conn (DbTransaction-connection tx))
         (db (DbConnection-leveldb conn)))
    (leveldb-key? db key opts)))
;; Record, in the current batch of TX's connection, a write of value V under key K.
(def (db-put! k v (tx (current-db-transaction)))
  (let (conn (DbTransaction-connection tx))
    (with-db-lock (conn)
      (leveldb-writebatch-put (DbConnection-batch conn) k v))))
;; Record, in the current batch of TX's connection, a write for each (k . v) pair of L,
;; all under a single acquisition of the lock.
(def (db-put-many! l (tx (current-db-transaction)))
  (let (conn (DbTransaction-connection tx))
    (with-db-lock (conn)
      (let (wb (DbConnection-batch conn))
        (for-each (lambda (entry)
                    (match entry
                      ([k . v] (leveldb-writebatch-put wb k v))))
                  l)))))
;; Record, in the current batch of TX's connection, a deletion of key K.
(def (db-delete! k (tx (current-db-transaction)))
  (let (conn (DbTransaction-connection tx))
    (with-db-lock (conn)
      (leveldb-writebatch-delete (DbConnection-batch conn) k))))
#;(trace! current-db-connection current-db-transaction
open-db-connection open-db-connection!
close-db-connection! close-db-connection call-with-db-connection
db-trigger! call-with-db-lock
open-transaction call-with-tx call-with-committed-tx close-transaction
commit-transaction register-commit-hook! db-manager finalize-batch!
get-batch-id db-get db-key? db-put! db-put-many! db-delete!)