This repository has been archived by the owner on Apr 16, 2018. It is now read-only.
-
-
Notifications
You must be signed in to change notification settings - Fork 31
/
cassandra.coffee
3717 lines (3439 loc) · 149 KB
/
cassandra.coffee
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
###############################################################################
#
# SageMathCloud: A collaborative web-based interface to Sage, IPython, LaTeX and the Terminal.
#
# Copyright (C) 2014, William Stein
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
###############################################################################
#########################################################################
#
# Interface to the Cassandra Database.
#
# *ALL* DB queries (using CQL, etc.) should be in this file, with
# Cassandra/CQL-agnostic wrapper functions defined here. E.g.,
# to find out if an email address is available, define a function
# here that does the CQL query.
# Well, calling "select" is ok, but don't ever directly write
# CQL statements.
#
# fs=require('fs'); a = new (require("cassandra").Salvus)(keyspace:'salvus', hosts:['10.1.1.2:9160'], username:'salvus', password:fs.readFileSync('data/secrets/cassandra/salvus').toString().trim(), cb:console.log)
# fs=require('fs'); a = new (require("cassandra").Salvus)(keyspace:'salvus', hosts:['10.1.1.2:9160'], username:'hub', password:fs.readFileSync('data/secrets/cassandra/hub').toString().trim(), cb:console.log)
#
# fs=require('fs'); a = new (require("cassandra").Salvus)(keyspace:'salvus', hosts:['localhost:8403'], username:'salvus', password:fs.readFileSync('data/secrets/cassandra/salvus').toString().trim(), cb:console.log)
#
# a = new (require("cassandra").Salvus)(keyspace:'salvus', hosts:['localhost'], cb:console.log)
#
#########################################################################
# This is used for project servers. [[um, -- except it isn't actually used at all anywhere! (oct 11, 2013)]]
MAX_SCORE = 3
MIN_SCORE = -3 # if hit, server is considered busted.
# Recent time windows, in seconds, used for recently_modified_projects.
# Keys are the window descriptions; values are the window lengths (used as ttl's).
exports.RECENT_TIMES = RECENT_TIMES =
    short : 5*60
    day   : 60*60*24
    week  : 60*60*24*7
    month : 60*60*24*30    # 30 days (was 60*60*24*7*30, i.e., 30 *weeks*)

# The same windows as an array of {desc:<name>, ttl:<seconds>} objects.
RECENT_TIMES_ARRAY = ({desc:desc,ttl:ttl} for desc,ttl of RECENT_TIMES)
misc = require('misc')
misc_node = require('misc_node')
PROJECT_GROUPS = misc.PROJECT_GROUPS
{to_json, from_json, defaults} = misc
required = defaults.required
fs = require('fs')
assert = require('assert')
async = require('async')
winston = require('winston') # https://github.com/flatiron/winston
cql = require("cassandra-driver")
Client = cql.Client # https://github.com/datastax/nodejs-driver
uuid = require('node-uuid')
{EventEmitter} = require('events')
moment = require('moment')
_ = require('underscore')
CONSISTENCIES = (cql.types.consistencies[k] for k in ['any', 'one', 'two', 'three', 'quorum', 'localQuorum', 'eachQuorum', 'all'])
# Return the consistency level one step stronger than the given one, following
# the order of the CONSISTENCIES array (any, one, two, three, quorum,
# localQuorum, eachQuorum, all); 'all' maps to itself.
#
#   consistency -- undefined, a driver consistency code (an entry of
#                  cql.types.consistencies), or the *name* of a level such as 'quorum'.
#
# Returns a driver consistency code:
#   - `any` if no consistency is given,
#   - `localQuorum` if the given value is not a known level.
#
# NOTE: CONSISTENCIES holds driver codes, not names.  The previous version
# rewrote the numbers 1, 2, 3 to the strings 'one', 'two', 'three', which can
# never be found in CONSISTENCIES, so those inputs always fell through to
# localQuorum.  Numbers are now looked up as-is and names are translated to
# their code first.
higher_consistency = (consistency) ->
    if not consistency?
        return cql.types.consistencies.any
    if typeof(consistency) == 'string'
        # translate a level name into the driver's code (undefined if unknown name)
        consistency = cql.types.consistencies[consistency]
    i = CONSISTENCIES.indexOf(consistency)
    if i == -1
        # unknown -- ?
        return cql.types.consistencies.localQuorum
    else
        return CONSISTENCIES[Math.min(CONSISTENCIES.length-1,i+1)]
# The current time, as a Date object (ready to pass as a query parameter).
exports.now = now = -> new Date()

# Date objects for moments in the past.  Each helper is defined in terms of the
# next finer unit, bottoming out at milliseconds.
exports.milliseconds_ago = (ms) ->
    return new Date(Date.now() - ms)

exports.seconds_ago = (s) ->
    return exports.milliseconds_ago(1000*s)

exports.minutes_ago = (m) ->
    return exports.seconds_ago(60*m)

exports.hours_ago = (h) ->
    return exports.minutes_ago(60*h)

exports.days_ago = (d) ->
    return exports.hours_ago(24*d)
# inet type: see https://github.com/jorgebay/node-cassandra-cql/issues/61
# Convert a driver InetAddress object to its string form; any other value is
# returned unchanged.
exports.inet_to_str = (r) ->
    return r.toString() if r instanceof cql.types.InetAddress
    return r
#########################################################################
# Columns of a project: the fixed columns below followed by one column per
# entry of PROJECT_GROUPS.
PROJECT_COLUMNS = exports.PROJECT_COLUMNS = [
    'project_id'
    'account_id'
    'title'
    'last_edited'
    'description'
    'public'
    'bup_location'
    'size'
    'deleted'
    'hide_from_accounts'
].concat(PROJECT_GROUPS)

# The subset of project columns exported for public projects.
exports.PUBLIC_PROJECT_COLUMNS = [
    'project_id'
    'title'
    'last_edited'
    'description'
    'public'
    'bup_location'
    'size'
    'deleted'
]
# Create the database schema by running each statement of db_schema.cql.
#
# The file (read relative to the current working directory) is split on the
# literal text 'CREATE'; each non-empty piece is prefixed with "CREATE " again
# and executed, one after another.  A failing statement is printed but does not
# stop the run or get reported to cb.
#
#   conn -- object with a cql(query, vals, cb) method
#           NOTE(review): this is a *positional* call, whereas Cassandra.cql in
#           this file takes a single options object -- confirm what is passed here.
#   cb   -- cb(err), called when all pieces have been processed
exports.create_schema = (conn, cb) ->
    start = misc.walltime()
    schema = require('fs').readFileSync('db_schema.cql', 'utf8')
    run_piece = (piece, done) ->
        console.log(piece)
        if piece.length == 0
            done(null, 0)
            return
        conn.cql "CREATE "+piece, [], (e, r) ->
            if e
                console.log(e)
            done(null, 0)
    async.mapSeries schema.split('CREATE'), run_piece, (err, results) ->
        winston.info("created schema in #{misc.walltime()-start} seconds.")
        winston.info(err)
        cb(err)
# Base class for stores that map a uuid to a value inside a named namespace.
#
# Subclasses must set, in their constructor:
#   @cassandra -- the database object (provides update/select/delete/count)
#   @opts.name -- the namespace; every query is restricted to this name
#   @_table    -- the table to use
#   @_to_db    -- function converting a value to what is stored
#   @_from_db  -- function converting a stored value back
class UUIDStore
    # Store a value.  If no uuid is given a random (v4) one is generated.
    # Returns the uuid synchronously; opts.cb is called when the write finishes.
    # Throws (a string) if a uuid is given but is not a valid uuid string.
    set: (opts) ->
        opts = defaults opts,
            uuid        : undefined
            value       : undefined
            ttl         : 0
            consistency : undefined
            cb          : undefined
        if not opts.uuid?
            opts.uuid = uuid.v4()
        else
            if not misc.is_valid_uuid_string(opts.uuid)
                throw "invalid uuid #{opts.uuid}"
        @cassandra.update
            table       : @_table
            where       : {name:@opts.name, uuid:opts.uuid}
            set         : {value:@_to_db(opts.value)}
            ttl         : opts.ttl
            consistency : opts.consistency
            cb          : opts.cb
        return opts.uuid

    # cb(err, ttl): ttl is 0 if there is no ttl set; undefined if no object in table
    get_ttl: (opts) =>
        opts = defaults opts,
            uuid : required
            cb   : required
        @cassandra.select
            table     : @_table
            where     : {name:@opts.name, uuid:opts.uuid}
            columns   : ['ttl(value)']
            objectify : false
            cb        : (err, result) =>
                if err
                    opts.cb(err)
                else if result.length == 0
                    # no such object
                    opts.cb(undefined, undefined)
                else
                    # select converts a null ttl to undefined, so "no ttl set"
                    # must be detected with an existence check, not "== null".
                    opts.cb(undefined, result[0][0] ? 0)

    # change the ttl of an existing entry -- requires re-insertion, which wastes network bandwidth...
    _set_ttl: (opts) =>
        opts = defaults opts,
            uuid : required
            ttl  : 0         # no ttl
            cb   : undefined
        @get
            uuid : opts.uuid
            cb   : (err, value) =>
                if value?
                    @set
                        uuid  : opts.uuid
                        value : value      # note -- the implicit conversion between buf and string is *necessary*, sadly.
                        ttl   : opts.ttl
                        cb    : opts.cb
                else
                    opts.cb?(err)

    # Set ttls for all given uuids at once; expensive if needs to change ttl, but cheap otherwise.
    # uuids that are not in the table are silently ignored.
    set_ttls: (opts) =>
        opts = defaults opts,
            uuids : required    # array of strings/uuids
            ttl   : 0
            cb    : undefined   # cb(err)
        if opts.uuids.length == 0
            opts.cb?()
            return
        @cassandra.select
            table     : @_table
            columns   : ['ttl(value)', 'uuid']
            where     : {name:@opts.name, uuid:{'in':opts.uuids}}
            objectify : true
            cb        : (err, results) =>
                if err
                    opts.cb?(err)
                else
                    f = (r, cb) =>
                        # an entry without a ttl comes back as undefined; treat it as 0
                        if (r['ttl(value)'] ? 0) != opts.ttl
                            @_set_ttl
                                uuid : r.uuid
                                ttl  : opts.ttl
                                cb   : cb
                        else
                            cb()
                    # wrap the callback since opts.cb is optional
                    async.map results, f, (err) => opts.cb?(err)

    # Set ttl only for one ttl; expensive if needs to change ttl, but cheap otherwise.
    set_ttl: (opts) =>
        opts = defaults opts,
            uuid : required
            ttl  : 0         # no ttl
            cb   : undefined
        @set_ttls
            uuids : [opts.uuid]
            ttl   : opts.ttl
            cb    : opts.cb

    # cb(err, value): value is undefined if there is no (non-null) entry for the uuid.
    get: (opts) ->
        opts = defaults opts,
            uuid        : required
            consistency : undefined
            cb          : required
        if not misc.is_valid_uuid_string(opts.uuid)
            opts.cb("invalid uuid #{opts.uuid}")
            return   # do not also run the query (which would call cb a second time)
        @cassandra.select
            table       : @_table
            columns     : ['value']
            where       : {name:@opts.name, uuid:opts.uuid}
            consistency : opts.consistency
            cb          : (err, results) =>
                if err
                    opts.cb(err)
                else
                    if results.length == 0
                        opts.cb(false, undefined)
                    else
                        r = results[0][0]
                        if r == null
                            opts.cb(false, undefined)
                        else
                            opts.cb(false, @_from_db(r))

    # Delete the entry with the given uuid.
    delete: (opts) ->
        opts = defaults opts,
            uuid : required
            cb   : undefined
        if not misc.is_valid_uuid_string(opts.uuid)
            opts.cb?("invalid uuid #{opts.uuid}")
            return   # do not also run the query (which would call cb a second time)
        @cassandra.delete
            table : @_table
            where : {name:@opts.name, uuid:opts.uuid}
            cb    : opts.cb

    # Delete every entry in this store's namespace.
    delete_all: (opts={}) ->
        opts = defaults(opts,  cb:undefined)
        @cassandra.delete
            table : @_table
            where : {name:@opts.name}
            cb    : opts.cb

    # cb(err, number of entries in this store's namespace)
    length: (opts={}) ->
        opts = defaults(opts,  cb:undefined)
        @cassandra.count
            table : @_table
            where : {name:@opts.name}
            cb    : opts.cb

    # cb(err, obj) where obj maps each uuid in this namespace to its value.
    all: (opts={}) ->
        opts = defaults(opts,  cb:required)
        @cassandra.select
            table   : @_table
            columns : ['uuid', 'value']
            where   : {name:@opts.name},
            # fat arrow: the callback uses @_from_db, so it must keep this instance as `this`
            cb      : (err, results) =>
                if err
                    # results is not defined on error, so don't try to iterate it
                    opts.cb(err)
                    return
                obj = {}
                for r in results
                    obj[r[0]] = @_from_db(r[1])
                opts.cb(err, obj)
# A UUIDStore whose values are arbitrary JSON-able objects, kept in the
# 'uuid_value' table.
class UUIDValueStore extends UUIDStore
    # Example:
    # c = new (require("cassandra").Salvus)(keyspace:'test'); s = c.uuid_value_store(name:'sage')
    # uid = s.set(value:{host:'localhost', port:5000}, ttl:30, cb:console.log)
    # uid = u.set(value:{host:'localhost', port:5000})
    # u.get(uuid:uid, cb:console.log)
    #
    #   cassandra -- the database object
    #   opts.name -- (required) namespace of this store
    constructor: (cassandra, opts={}) ->
        @cassandra = cassandra
        @opts      = defaults(opts, name:required)
        @_table    = 'uuid_value'
        # values are serialized to/from JSON
        @_from_db  = from_json
        @_to_db    = to_json
# A UUIDStore whose values are binary blobs, kept in the 'uuid_blob' table.
class UUIDBlobStore extends UUIDStore
    # Example:
    # c = new (require("cassandra").Salvus)(keyspace:'salvus'); s = c.uuid_blob_store(name:'test')
    # b = new Buffer("hi\u0000there"); uuid = s.set(value:b, ttl:300, cb:console.log)
    # s.get(uuid: uuid, cb:(e,r) -> console.log(r))
    #
    #   cassandra -- the database object
    #   opts.name -- (required) namespace of this store
    constructor: (cassandra, opts={}) ->
        @cassandra = cassandra
        @opts      = defaults(opts, name:required)
        @_table    = 'uuid_blob'
        # values are written unchanged ...
        @_to_db    = (blob) -> blob
        # ... and wrapped in a Buffer (using 'hex' encoding) when read back
        @_from_db  = (stored) -> new Buffer(stored, 'hex')
# A store mapping arbitrary JSON-able keys to JSON-able values, inside a named
# namespace of the 'key_value' table.  Keys and values are stored as JSON strings.
class KeyValueStore
    #   c = new (require("cassandra").Salvus)(); d = c.key_value_store('test')
    #   d.set(key:[1,2], value:[465, {abc:123, xyz:[1,2]}], ttl:5)
    #   d.get(key:[1,2], console.log)   # but call it again in > 5 seconds and get nothing...
    #
    #   cassandra -- the database object
    #   opts.name -- (required) namespace of this store
    constructor: (@cassandra, opts={}) ->
        @opts = defaults(opts,  name:required)

    # Set the value for the given key, optionally with a ttl (in seconds; 0 = no ttl).
    set: (opts={}) =>
        opts = defaults opts,
            key         : undefined
            value       : undefined
            ttl         : 0
            consistency : undefined
            cb          : undefined
        @cassandra.update
            table       : 'key_value'
            where       : {name:@opts.name, key:to_json(opts.key)}
            set         : {value:to_json(opts.value)}
            ttl         : opts.ttl
            consistency : opts.consistency
            cb          : opts.cb

    # Get the value for the given key; the result is undefined unless exactly
    # one row matches.
    #
    # The write time is requested with the writetime(value) selector, in the same
    # way get_ttl of UUIDStore requests ttl(value).  (The previous version passed
    # a `timestamp` option that select does not declare and then read .value and
    # .timestamp off a scalar, so it could not work.)
    get: (opts={}) =>
        opts = defaults opts,
            key         : undefined
            timestamp   : false      # if specified, result is {value:the_value, timestamp:the_timestamp} instead of just value.
            consistency : undefined
            cb          : undefined  # cb(error, value)
        columns = ['value']
        if opts.timestamp
            # writetime is a 64-bit number (microseconds); what arrives here is
            # whatever select's value conversion produces for it.
            columns.push('writetime(value)')
        @cassandra.select
            table       : 'key_value'
            columns     : columns
            where       : {name:@opts.name, key:to_json(opts.key)}
            consistency : opts.consistency   # honored in both modes
            cb          : (error, results) =>
                if error
                    opts.cb?(error)
                else if results?.length != 1
                    opts.cb?(undefined, undefined)
                else if opts.timestamp
                    opts.cb?(undefined, {'value':from_json(results[0][0]), 'timestamp':results[0][1]})
                else
                    opts.cb?(undefined, from_json(results[0][0]))

    # Delete the entry with the given key.
    delete: (opts={}) ->
        opts = defaults(opts, key:undefined, cb:undefined)
        @cassandra.delete(table:'key_value', where:{name:@opts.name, key:to_json(opts.key)}, cb:opts.cb)

    # Delete every entry in this store's namespace.
    delete_all: (opts={}) ->
        opts = defaults(opts,  cb:undefined)
        @cassandra.delete(table:'key_value', where:{name:@opts.name}, cb:opts.cb)

    # cb(err, number of entries in this store's namespace)
    length: (opts={}) ->
        opts = defaults(opts,  cb:undefined)
        @cassandra.count(table:'key_value', where:{name:@opts.name}, cb:opts.cb)

    # cb(err, array of [key, value] pairs) for every entry in this namespace.
    all: (opts={}) =>
        opts = defaults(opts,  cb:undefined)
        @cassandra.select
            table   : 'key_value'
            columns : ['key', 'value']
            where   : {name:@opts.name}
            cb      : (error, results) =>
                if error
                    opts.cb?(error)
                else
                    opts.cb?(undefined, [from_json(r[0]), from_json(r[1])] for r in results)
# Convert individual entries in columns from cassandra formats to what we
# want to use everywhere in Salvus. For example, uuid's are converted to
# strings instead of their own special object type, since otherwise they
# convert to JSON incorrectly.
# Convert one column value from the driver's representation to plain values.
#
#   value -- a value as returned by the driver
#   json  -- if truthy, the converted value is additionally parsed with from_json
#
# Conversions:
#   null/undefined              --> undefined
#   Uuid, TimeUuid, InetAddress --> string
#   Integer, Long, BigDecimal   --> number
#   Array (set/list collection) --> array with each entry converted
#   plain object (map)          --> new object with each value converted
#   anything else               --> value.valueOf()
exports.from_cassandra = from_cassandra = (value, json) ->
    if not value?
        return undefined
    # see https://github.com/datastax/nodejs-driver/blob/master/doc/upgrade-guide-2.0.md
    if value instanceof cql.types.Uuid or value instanceof cql.types.TimeUuid or value instanceof cql.types.InetAddress
        value = value.toString()
    else if value instanceof cql.types.Integer or value instanceof cql.types.Long
        # toNumber rather than toInt: toInt keeps only the low 32 bits, which
        # silently corrupts large values (big counters, write timestamps).
        value = value.toNumber()
    else if value instanceof cql.types.BigDecimal
        value = value.toNumber()
    else if value instanceof Array    # a set/list collection -- http://www.datastax.com/documentation/developer/nodejs-driver/2.0/nodejs-driver/reference/collections.html
        value = (from_cassandra(x) for x in value)
    else if value.constructor == Object   # a map collection
        converted = {}
        for k, v of value
            converted[k] = from_cassandra(v)
        # actually use the converted map (it used to be built and then discarded)
        value = converted
    else
        value = value.valueOf()
    if json
        value = from_json(value)
    return value
# A connection to a Cassandra cluster together with generic, table-agnostic
# query helpers (select/update/delete/count, manually tracked counters) and
# factories for the key/value stores defined in this file.
#
# Emits 'error' (forwarded from the driver connection) and 'close'.
class exports.Cassandra extends EventEmitter
    # Options are documented inline below.  The constructor starts connecting
    # immediately; opts.cb(err, this) is called once when the first connection
    # attempt finishes.
    constructor: (opts={}) ->    # cb is called on connect
        opts = defaults opts,
            hosts           : ['localhost']
            cb              : undefined
            keyspace        : undefined
            username        : undefined
            password        : undefined
            query_timeout_s : 30    # any query that doesn't finish after this amount of time (due to cassandra/driver *bugs*) will be retried a few times (same as consistency causing retries)
            query_max_retry : 3     # max number of retries
            consistency     : undefined
            verbose         : false # quick hack for debugging...
            conn_timeout_ms : 4000  # Maximum time in milliseconds to wait for a connection from the pool.

        @keyspace = opts.keyspace
        @query_timeout_s = opts.query_timeout_s
        @query_max_retry = opts.query_max_retry

        if opts.hosts.length == 1
            # the default consistency won't work if there is only one node.
            opts.consistency = 1

        @consistency = opts.consistency  # the default consistency (for now)

        #winston.debug("connect using: #{JSON.stringify(opts)}")  # DEBUG ONLY!! output contains sensitive info (the password)!!!
        @_opts = opts
        @connect()

    # Shut down the current connection (if there is one) and connect again.
    # cb is called when the new connection attempt finishes.
    reconnect: (cb) =>
        winston.debug("reconnect to database server")
        if not @conn? or not @conn.shutdown?
            winston.debug("directly connecting")
            @connect(cb)
            return
        winston.debug("reconnect to database server -- first shutting down")
        @conn.shutdown (err) =>
            winston.debug("reconnect to database server -- shutdown returned #{err}")
            delete @conn
            @connect(cb)

    # Create a new driver client from the constructor options and connect it.
    # Any existing client is shut down first (without waiting for it).
    # cb(err) is called once when the connection attempt finishes.
    connect: (cb) =>
        winston.debug("connect: connecting to the database server")
        console.log("connecting...")
        opts = @_opts
        if @conn?
            @conn.shutdown?()
            delete @conn
        o =
            contactPoints : opts.hosts
            keyspace      : opts.keyspace
            queryOptions  :
                consistency : @consistency
                prepare     : true
                fetchSize   : 150000  # make huge so we get everything if possible.  If result will be big, use the stream:true option to cql/select.
            socketOptions :
                connectTimeout : opts.conn_timeout_ms
        if opts.username? and opts.password?
            o.authProvider = new cql.auth.PlainTextAuthProvider(opts.username, opts.password)
        @conn = new Client(o)

        if opts.verbose
            @conn.on 'log', (level, message) =>
                winston.debug('database connection event: %s -- %j', level, message)

        @conn.on 'error', (err) =>
            winston.error(err.name, err.message)
            @emit('error', err)

        @conn.connect (err) =>
            if err
                winston.debug("failed to connect to database -- #{err}")
            else
                winston.debug("connected to database")
            opts.cb?(err, @)
            # CRITICAL -- we must not call the callback multiple times; note that this
            # connect event happens even on *reconnect*, which will happen when the
            # database connection gets dropped, e.g., due to restarting the database,
            # network issues, etc.
            opts.cb = undefined

            # this callback is for convenience when re-connecting; it is told
            # about a failure to connect (it used to be called with no arguments).
            cb?(err)
            cb = undefined

    # Build the text of a WHERE clause (without the word WHERE) with ? placeholders,
    # appending the corresponding parameter values to the array vals.
    #
    #   where_key -- object mapping column name to either a plain value (meaning
    #                equality) or an object keyed by operators among
    #                '>', '<', '>=', '<=', '==', 'in', e.g. {a:5, b:{'>':2, '<=':7}}
    #   vals      -- array that the parameter values are pushed onto
    #   json      -- names of columns whose values are converted with to_json
    #
    # Every value must be defined (an assertion fails otherwise).
    _where: (where_key, vals, json=[]) ->
        where = "";
        for key, val of where_key
            equals_fallback = true
            for op in ['>', '<', '>=', '<=', '==', 'in', '']
                if op == '' and equals_fallback
                    # no operator matched, so val itself is the value to compare with
                    x = val
                    op = '=='
                else
                    assert(val?, "val must be defined -- there's a bug somewhere: _where(#{to_json(where_key)}, #{to_json(vals)}, #{to_json(json)})")
                    x = val[op]
                if x?
                    if key in json
                        x2 = to_json(x)
                    else
                        x2 = x
                    if op != ''
                        equals_fallback = false
                    if op == '=='
                        op = '=' # for cassandra
                    where += "#{key} #{op} ?"
                    vals.push(x2)
                    where += " AND "
        return where.slice(0,-4)    # slice off final AND.

    # Build the text of a SET clause (without the word SET), appending parameter
    # values to vals.  Undefined/null values become "key=null"; uuid strings and
    # booleans are written into the query text itself rather than passed as
    # parameters (see comments below).
    #
    #   properties -- object mapping column name to new value
    #   vals       -- array that the parameter values are pushed onto
    #   json       -- names of columns whose values are converted with to_json
    _set: (properties, vals, json=[]) ->
        set = "";
        for key, val of properties
            if key in json
                val = to_json(val)
            if val?
                if misc.is_valid_uuid_string(val)
                    # The Helenus driver is completely totally
                    # broken regarding uuid's (their own UUID type
                    # doesn't work at all). (as of April 15, 2013)  - TODO: revisit this since I'm not using Helenus anymore.
                    # This is of course scary/dangerous since what if x2 is accidentally a uuid!
                    set += "#{key}=#{val},"
                else if typeof(val) != 'boolean'
                    set += "#{key}=?,"
                    vals.push(val)
                else
                    # TODO: here we work around a driver bug :-(
                    set += "#{key}=#{val},"
            else
                set += "#{key}=null,"
        return set.slice(0,-1)

    # Shut down the connection (if any) and emit 'close'.
    # The client is shut down with `shutdown`, the same call reconnect/connect
    # use; this used to call a `close` method instead.
    close: () ->
        @conn?.shutdown?()
        @emit('close')

    ###########################################################################################
    # Set the count of entries in a table that we manually track.
    # (Note -- I tried implementing this by deleting the entry then updating and that made
    # the value *always* null no matter what.  So don't do that.)
    # Implemented as: read the current count, then add the difference.
    set_table_counter: (opts) =>
        opts = defaults opts,
            table : required
            value : required
            cb    : required

        current_value = undefined
        async.series([
            (cb) =>
                @get_table_counter
                    table : opts.table
                    cb    : (err, value) =>
                        current_value = value
                        cb(err)
            (cb) =>
                @update_table_counter
                    table : opts.table
                    delta : opts.value - current_value
                    cb    : cb
        ], opts.cb)

    # Modify the count of entries in a table that we manually track.
    # The default is to add 1.
    update_table_counter: (opts) =>
        opts = defaults opts,
            table : required
            delta : 1
            cb    : required
        query = "UPDATE counts SET count=count+? where table_name=?"
        @cql
            query : query
            # Long.fromNumber handles negative deltas; `new Long(n)` treats its
            # argument as the low 32 bits only (high bits 0), so a negative
            # delta became a huge positive number.
            vals  : [cql.types.Long.fromNumber(opts.delta), opts.table]
            cb    : opts.cb

    # Get count of entries in a table for which we manually maintain the count.
    # The count is 0 if nothing has been recorded for the table.
    get_table_counter: (opts) =>
        opts = defaults opts,
            table : required
            cb    : required  # cb(err, count)
        @select
            table     : 'counts'
            where     : {table_name : opts.table}
            columns   : ['count']
            objectify : false
            cb        : (err, result) =>
                if err
                    opts.cb(err)
                else
                    if result.length == 0
                        opts.cb(false, 0)
                    else
                        opts.cb(false, result[0][0])

    # Compute a count directly from the table.
    # ** This is highly inefficient in general and doesn't scale.  PAIN.  **
    count: (opts) ->
        opts = defaults opts,
            table       : required
            where       : {}         # conditions, in the format accepted by _where; {} = count everything
            consistency : undefined
            cb          : required   # cb(err, the count)

        query = "SELECT COUNT(*) FROM #{opts.table}"
        vals = []
        if not misc.is_empty_object(opts.where)
            where = @_where(opts.where, vals)
            query += " WHERE #{where}"

        @cql
            query       : query
            vals        : vals
            consistency : opts.consistency
            cb          : (err, results) =>
                if err
                    opts.cb(err)
                else
                    opts.cb(undefined, from_cassandra(results[0].get('count')))

    # Update (or create) the row matching `where`, setting the columns in `set`.
    update: (opts={}) ->
        opts = defaults opts,
            table       : required
            where       : required
            set         : {}
            ttl         : 0          # time to live, in seconds; 0 = no ttl
            cb          : undefined
            consistency : undefined  # default...
            json        : []         # list of columns to convert to JSON
        vals = []
        # NOTE: order matters -- the SET placeholders come first in the query,
        # so their values must be pushed onto vals first.
        set = @_set(opts.set, vals, opts.json)
        where = @_where(opts.where, vals, opts.json)
        @cql
            query       : "UPDATE #{opts.table} USING ttl #{opts.ttl} SET #{set} WHERE #{where}"
            vals        : vals
            consistency : opts.consistency
            cb          : opts.cb

    # Delete the rows matching `where` -- or only the columns listed in `thing`
    # (a string placed between DELETE and FROM).
    delete: (opts={}) ->
        opts = defaults opts,
            table : undefined
            where : {}
            thing : ''
            consistency : undefined  # default...
            cb    : undefined
        vals = []
        where = @_where(opts.where, vals)
        @cql
            query       : "DELETE #{opts.thing} FROM #{opts.table} WHERE #{where}"
            vals        : vals
            consistency : opts.consistency
            cb          : opts.cb

    # Query a table; every returned value is converted with from_cassandra.
    select: (opts={}) =>
        opts = defaults opts,
            table           : required    # string -- the table to query
            columns         : required    # list -- columns to extract
            where           : undefined   # object -- conditions to impose; undefined (or {}) = return everything
            cb              : required    # callback(error, results)
            objectify       : false       # if false results is a array of arrays (so less redundant); if true, array of objects (so keys redundant)
            limit           : undefined   # if defined, limit the number of results returned to this integer
            json            : []          # list of columns that should be converted from JSON format
            order_by        : undefined   # if given, adds an "ORDER BY opts.order_by"
            consistency     : undefined   # default...
            stream          : false
            allow_filtering : false

        vals = []
        query = "SELECT #{opts.columns.join(',')} FROM #{opts.table}"
        # An empty where object must not produce a dangling "WHERE" (invalid CQL).
        if opts.where? and not misc.is_empty_object(opts.where)
            where = @_where(opts.where, vals, opts.json)
            query += " WHERE #{where} "
        # CQL requires ORDER BY to come before LIMIT.
        if opts.order_by?
            query += " ORDER BY #{opts.order_by} "
        if opts.limit?
            query += " LIMIT #{opts.limit} "
        if opts.allow_filtering
            query += " ALLOW FILTERING"
        @cql
            query       : query
            vals        : vals
            consistency : opts.consistency
            stream      : opts.stream
            cb          : (error, results) =>
                if error
                    opts.cb(error); return
                if opts.objectify
                    x = (misc.pairs_to_obj([col,from_cassandra(r.get(col), col in opts.json)] for col in opts.columns) for r in results)
                else
                    x = ((from_cassandra(r.get(col), col in opts.json) for col in opts.columns) for r in results)
                opts.cb(undefined, x)

    # Exactly like select (above), but gives an error if there is not exactly one
    # row in the table that matches the condition.  Also, this returns the one
    # rather than an array of length 0.
    # NOTE: replaces opts.cb on the object that was passed in.
    select_one: (opts={}) =>
        cb = opts.cb
        opts.cb = (err, results) ->
            if err
                cb(err)
            else if results.length == 0
                cb("No row in table '#{opts.table}' matched condition '#{misc.to_json(opts.where)}'")
            else if results.length > 1
                cb("More than one row in table '#{opts.table}' matched condition '#{misc.to_json(opts.where)}'")
            else
                cb(false, results[0])
        @select(opts)

    # Run a single CQL query; cb(err, rows) where rows is an array of driver rows.
    # With stream:true the rows are read through the driver's stream interface
    # (fetch_size at a time) and collected, so cb is still called once with all rows.
    cql: (opts) =>
        opts = defaults opts,
            query       : required
            vals        : []
            consistency : @consistency
            stream      : false
            fetch_size  : 100     # only used for streaming
            cb          : undefined

        #winston.debug("cql: '#{misc.trunc(opts.query,100)}', consistency=#{opts.consistency}, stream=#{opts.stream}")
        # log any error, then pass everything on to the caller
        cb = (err, results) =>
            if err?
                winston.error("cql ERROR: ('#{opts.query}',params=#{misc.to_json(opts.vals).slice(0,512)}) error = #{err}")
            opts.cb?(err, results)

        if opts.stream
            stream_opts = {fetchSize: opts.fetch_size, autoPage:true, consistency: opts.consistency}
            stream = @conn.stream(opts.query, opts.vals, stream_opts)
            results = []
            last_time = misc.walltime()
            # thin arrow on purpose: `this` is the stream inside the handler
            stream.on 'readable', () ->
                while true
                    row = this.read()
                    if row
                        results.push(row)
                        if results.length % 250 == 0
                            t = misc.walltime()
                            if t - last_time > 1   # at most once per second
                                last_time = t
                                winston.debug("cql: '#{misc.trunc(opts.query,256)}' received #{results.length} results...")
                    else
                        break
            stream.on 'end', () =>
                cb(undefined, results)
            stream.on 'error', (err) =>
                cb(err)
        else
            @conn.execute opts.query, opts.vals, { consistency: opts.consistency }, (err, results) =>
                if not err and results?
                    results = results.rows
                cb(err, results)

    # Run a single CQL query with retries; cb(err, rows) is called at most once.
    # vals and consistency are optional: cql0(query, cb) and cql0(query, vals, cb) work.
    #
    # An attempt is retried (via misc.retry_until_success, at most
    # @query_max_retry tries) when the error text mentions "peration timed out" or
    # "any of its parents", or when no response arrives within @query_timeout_s
    # seconds -- in which case the connection is also re-established.
    cql0: (query, vals, consistency, cb) =>
        winston.debug("cql: '#{query}'")
        if typeof vals == 'function'
            cb = vals
            vals = []
            consistency = undefined
        if typeof consistency == 'function'
            cb = consistency
            consistency = undefined
        if not consistency?
            consistency = @consistency

        done = false  # set to true right before calling cb, so it can only be called once
        # g: one attempt; calls c(error) if the attempt should be retried, c() otherwise
        g = (c) =>
            @conn.execute query, vals, { consistency: consistency }, (error, results) =>
                if not error
                    error = undefined   # it comes back as null
                    if not results?  # should never happen
                        error = "no error but no results"
                if error?
                    winston.error("Query cql('#{query}',params=#{misc.to_json(vals).slice(0,1024)}) caused a CQL error:\n#{error}")
                # TODO - this test for "ResponseError: Operation timed out" is HORRIBLE.
                # The 'any of its parents' is because often when the server is loaded it rejects requests sometimes
                # with "no permissions. ... any of its parents".
                if error? and ("#{error}".indexOf("peration timed out") != -1 or "#{error}".indexOf("any of its parents") != -1)
                    winston.error(error)
                    winston.error("... so (probably) re-doing query")
                    c(error)
                else
                    if not error
                        rows = results.rows
                    if not done
                        done = true
                        cb?(error, rows)
                    c()

        # f: one attempt guarded by a timer; whichever of "attempt finished" and
        # "timer fired" happens first gets to call c.
        f = (c) =>
            failed = () =>
                m = "query #{query}, params=#{misc.to_json(vals).slice(0,1024)}, timed out with no response at all after #{@query_timeout_s} seconds -- will likely retry"
                winston.error(m)
                @reconnect (err) =>
                    c?(m)
                    c = undefined # ensure only called once
            _timer = setTimeout(failed, 1000*@query_timeout_s)
            g (err) =>
                clearTimeout(_timer)
                c?(err)
                c = undefined # ensure only called once

        # If a query fails due to "Operation timed out", then we will keep retrying, up to @query_max_retry times, with exponential backoff.
        # ** This is ABSOLUTELY critical, if we have a loaded system, slow nodes, want to use consistency level > 1, etc, **
        # since otherwise all our client code would have to do this...
        misc.retry_until_success
            f         : f
            max_tries : @query_max_retry
            max_delay : 15000
            factor    : 1.6
            cb        : (err) =>
                if err
                    err = "query failed even after #{@query_max_retry} attempts -- giving up -- #{err}"
                    winston.debug(err)
                if not done
                    done = true
                    cb?(err)

    # Factories for the stores defined in this file, all bound to this connection.
    key_value_store: (opts={}) -> # key_value_store(name:"the name")
        new KeyValueStore(@, opts)

    uuid_value_store: (opts={}) -> # uuid_value_store(name:"the name")
        new UUIDValueStore(@, opts)

    uuid_blob_store: (opts={}) -> # uuid_blob_store(name:"the name")
        new UUIDBlobStore(@, opts)

    # NOTE: sets opts.db on the object that was passed in.
    chunked_storage: (opts) =>  # id=uuid
        opts.db = @
        return new ChunkedStorage(opts)
class exports.Salvus extends exports.Cassandra
# Same options as the parent class; the keyspace defaults to 'salvus'.
# NOTE: the default is written into the opts object that was passed in.
constructor: (opts={}) ->
    # starts out empty; presumably filled in by the project-touching code -- TODO confirm
    @_touch_project_cache = {}
    opts.keyspace ?= 'salvus'
    super(opts)
#####################################
# The cluster status monitor
#####################################
# returns array [{host:'10.x.y.z', ..., other data about compute node}, ...]
# Read the JSON 'compute' column of the single row of the monitor_last table.
# cb(err, compute): an error is reported unless the table has exactly one row.
compute_status: (opts={}) =>
    opts = defaults(opts, cb: required)
    on_row = (err, row) =>
        if not err
            opts.cb(undefined, row[0])
        else
            opts.cb(err)
    @select_one
        table   : 'monitor_last'
        columns : ['compute']
        json    : ['compute']
        cb      : on_row
#####################################
# The log: we log important conceptually meaningful events
# here. This is something we will actually look at.
#####################################
# Record an event in the central_log table, keyed by the current time.
log: (opts={}) ->
    opts = defaults opts,
        event : required    # string
        value : required    # object (will be JSON'd)
        ttl   : undefined   # time to live of the log entry, in seconds; undefined = keep forever
        cb    : undefined
    @update
        table : 'central_log'
        set   : {event:opts.event, value:to_json(opts.value)}
        where : {'time':now()}
        ttl   : opts.ttl ? 0    # the ttl option used to be accepted but silently ignored
        cb    : opts.cb
# Fetch the central log.
# cb(err, array of {time:?, event:?, value:?}) with value parsed from JSON.
#
# NOTE: start_time and end_time are accepted but NOT yet used -- the whole log
# is returned (see the TODO below).
get_log: (opts={}) ->
    opts = defaults(opts,
        start_time : undefined
        end_time   : undefined
        cb         : required
    )

    # TODO -- implement restricting the range of times -- this
    # isn't trivial because I haven't implemented ranges in
    # @select yet, and I don't want to spend a lot of time on this
    # right now. maybe just write query using CQL.
    #
    # Until then no `where` is passed at all: an empty where object would
    # produce a dangling "WHERE" in the generated query.
    @select
        table   : 'central_log'
        columns : ['time', 'event', 'value']
        cb      : (error, results) ->
            # report through opts.cb -- there is no variable named `cb` in scope here
            if error
                opts.cb(error)
            else
                opts.cb(false, ({time:r[0], event:r[1], value:from_json(r[2])} for r in results))
##-- search
# all_users: cb(err, array of {first_name:?, last_name:?, account_id:?, search:'names and email thing to search'})
#
# No matter how often all_users is called, it is only updated at most once every 5 minutes, since it is expensive
# to scan the entire database, and the client will typically make numerous requests within seconds for
# different searches. When some time elapses and we get a search, if we have an old cached list in memory, we
# use it and THEN start computing a new one -- so user queries are always answered nearly instantly, but only
# repeated queries will give an up to date result.
#
# Of course, caching means that newly created accounts, or modified account names,
# may not show up in searches for up to 5 minutes (the cache lifetime). That's
# very acceptable.
#
# This obviously doesn't scale, and will need to be re-written to use some sort of indexing system, or
# possibly only allow searching on email address, or other ways. I don't know yet.
#
# cb(err, array of {account_id:?, first_name:?, last_name:?, search:?}), where
# search is the lower-cased "first_name last_name".
#
# Caching:
#   - while the cached list is fresh (5 minutes after it was computed), answer
#     from the cache and do nothing else;
#   - if there is a stale cached list, answer from it immediately and then
#     recompute in the background (unless a recomputation is already running);
#   - if there is no cached list yet, scan the accounts table and answer when
#     the scan finishes.
# cb is called exactly once in every case.
all_users: (cb) =>
    if @_all_users_fresh?
        cb(false, @_all_users); return
    # Remember whether this caller gets its answer from the (stale) cache right
    # now.  Deciding this later, when the scan finishes, is wrong: another scan
    # may have filled the cache in the meantime, and then a caller that was
    # never answered would never be called back.
    answered = @_all_users?
    if answered
        cb(false, @_all_users)
    if @_all_users_computing? and @_all_users?
        return
    @_all_users_computing = true
    @select
        table     : 'accounts'
        columns   : ['first_name', 'last_name', 'account_id']
        objectify : true
        stream    : true    # since result set may be huge
        #consistency : 1     # NO LONGER NEEDED due to streaming support (was: since we really want optimal speed, and missing something temporarily is ok.)
        cb        : (err, results) =>
            # the scan is over either way, so allow a later call to start a new one
            delete @_all_users_computing
            if err
                # results is not defined on error; keep any cached list as it is
                if not answered
                    cb(err)
                return
            v = []
            for r in results
                if not r.first_name?
                    r.first_name = ''
                if not r.last_name?
                    r.last_name = ''
                search = (r.first_name + ' ' + r.last_name).toLowerCase()
                obj = {account_id : r.account_id, first_name:r.first_name, last_name:r.last_name, search:search}
                v.push(obj)
            if not answered
                cb(false,v)
            @_all_users = v
            @_all_users_fresh = true
            f = () =>
                delete @_all_users_fresh
            setTimeout(f, 5*60000)   # cache for 5 minutes
user_search: (opts) =>
opts = defaults opts,
query : required # comma separated list of email addresses or strings such as 'foo bar' (find everything where foo and bar are in the name)
limit : undefined # limit on string queries; email query always returns 0 or 1 result per email address
cb : required # cb(err, list of {account_id:?, first_name:?, last_name:?, email_address:?}), where the
# email_address *only* occurs in search queries that are by email_address -- we do not reveal
# email addresses of users queried by name.
{string_queries, email_queries} = misc.parse_user_search(opts.query)
results = []
async.parallel([
(cb) =>
if email_queries.length == 0
cb(); return
# do email queries -- with exactly two targeted db queries (even if there are hundreds of addresses)
@select
table : 'email_address_to_account_id'
where : {email_address:{'in':email_queries}}
columns : ['account_id']
objectify : false
cb : (err, r) =>
if err
cb(err); return
if r.length == 0
cb(); return
@select
table : 'accounts'
columns : ['account_id', 'first_name', 'last_name', 'email_address']
where : {'account_id':{'in':(x[0] for x in r)}}
objectify : true
cb : (err, r) =>
if err
cb(err)
else
for x in r