tasklets.py
#
# Copyright 2008 The ndb Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""A tasklet decorator.
Tasklets are a way to write concurrently running functions without
threads; tasklets are executed by an event loop and can suspend
themselves blocking for I/O or some other operation using a yield
statement. The notion of a blocking operation is abstracted into the
Future class, but a tasklet may also yield an RPC in order to wait for
that RPC to complete.
The @tasklet decorator wraps a generator function so that when it is
called, a Future is returned while the generator is executed by the
event loop. Within the tasklet, any yield of a Future waits for and
returns the Future's result. For example::
@tasklet
def foo():
a = yield <some Future>
b = yield <another Future>
raise Return(a + b)
def main():
f = foo()
x = f.get_result()
print x
Note that blocking until the Future's result is available using
get_result() is somewhat inefficient (though not vastly -- it is not
busy-waiting). In most cases such code should be rewritten as a tasklet
instead::
@tasklet
def main_tasklet():
f = foo()
x = yield f
print x
Calling a tasklet automatically schedules it with the event loop::
def main():
f = main_tasklet()
eventloop.run() # Run until no tasklets left to do
f.done() # Returns True
As a special feature, if the wrapped function is not a generator
function, its return value is returned via the Future. This makes the
following two equivalent::
@tasklet
def foo():
return 42
@tasklet
def foo():
if False: yield # The presence of 'yield' makes foo a generator
raise Return(42) # Or, after PEP 380, return 42
This feature (inspired by Monocle) is handy when you are
implementing an interface that expects tasklets but have no need to
suspend -- there's no need to insert a dummy yield in order to make
the tasklet into a generator.
"""
import collections
import logging
import os
import sys
import types
import weakref
from .google_imports import apiproxy_stub_map
from .google_imports import apiproxy_rpc
from .google_imports import datastore
from .google_imports import datastore_errors
from .google_imports import datastore_pbs
from .google_imports import datastore_rpc
from .google_imports import namespace_manager
from .google_imports import callback as _request_callback
from . import eventloop
from . import utils
__all__ = ['Return', 'tasklet', 'synctasklet', 'toplevel', 'sleep',
'add_flow_exception', 'get_return_value',
'get_context', 'set_context',
'make_default_context', 'make_context',
'Future', 'MultiFuture', 'QueueFuture', 'SerialQueueFuture',
'ReducingFuture',
]
_logging_debug = utils.logging_debug
_CALLBACK_KEY = '__CALLBACK__'
# Python 2.5 compatibility.
if hasattr(weakref, 'WeakSet'):
_set = weakref.WeakSet
else:
_set = set
def _is_generator(obj):
"""Helper to test for a generator object.
NOTE: This tests for the (iterable) object returned by calling a
generator function, not for a generator function.
"""
return isinstance(obj, types.GeneratorType)
class _State(utils.threading_local):
"""Hold thread-local state."""
def __init__(self):
super(_State, self).__init__()
self.current_context = None
self.all_generators = _set()
self.all_pending = set()
def set_context(self, ctx):
self.current_context = ctx
def add_generator(self, gen):
if _request_callback and _CALLBACK_KEY not in os.environ:
_request_callback.SetRequestEndCallback(self.reset)
os.environ[_CALLBACK_KEY] = '1'
_logging_debug('all_generators: add %s', gen)
self.all_generators.add(gen)
def add_pending(self, fut):
if _request_callback and _CALLBACK_KEY not in os.environ:
_request_callback.SetRequestEndCallback(self.reset)
os.environ[_CALLBACK_KEY] = '1'
_logging_debug('all_pending: add %s', fut)
self.all_pending.add(fut)
def remove_pending(self, fut, status='success'):
if fut in self.all_pending:
_logging_debug('all_pending: %s: remove %s', status, fut)
self.all_pending.remove(fut)
else:
_logging_debug('all_pending: %s: not found %s', status, fut)
def clear_all_generators(self):
if self.all_generators:
_logging_debug('all_generators: clear %s', self.all_generators)
for gen in self.all_generators:
gen.close()
self.all_generators.clear()
else:
_logging_debug('all_generators: clear no-op')
def clear_all_pending(self):
if self.all_pending:
_logging_debug('all_pending: clear %s', self.all_pending)
self.all_pending.clear()
else:
_logging_debug('all_pending: clear no-op')
def dump_all_pending(self, verbose=False):
pending = []
for fut in self.all_pending:
if verbose:
line = fut.dump() + ('\n' + '-' * 40)
else:
line = fut.dump_stack()
pending.append(line)
return '\n'.join(pending)
def reset(self, unused_req_id):
self.current_context = None
ev = eventloop.get_event_loop()
ev.clear()
self.clear_all_pending()
self.clear_all_generators()
_state = _State()
# Tuple of exceptions that should not be logged (except in debug mode).
_flow_exceptions = ()
def add_flow_exception(exc):
"""Add an exception that should not be logged.
The argument must be a subclass of Exception.
"""
global _flow_exceptions
if not isinstance(exc, type) or not issubclass(exc, Exception):
raise TypeError('Expected an Exception subclass, got %r' % (exc,))
as_set = set(_flow_exceptions)
as_set.add(exc)
_flow_exceptions = tuple(as_set)
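# Example (illustrative sketch): applications can register their own control-
# flow exceptions so that tasklets raising them are not logged as warnings.
# MyRedirect is a hypothetical application-defined exception.
#
#   class MyRedirect(Exception):
#     pass
#
#   add_flow_exception(MyRedirect)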
def _init_flow_exceptions():
"""Internal helper to initialize _flow_exceptions.
This automatically adds webob.exc.HTTPException, if it can be imported.
"""
global _flow_exceptions
_flow_exceptions = ()
add_flow_exception(datastore_errors.Rollback)
try:
from webob import exc
except ImportError:
pass
else:
add_flow_exception(exc.HTTPException)
_init_flow_exceptions()
class Future(object):
"""A Future has 0 or more callbacks.
The callbacks will be called when the result is ready.
NOTE: This is somewhat inspired by, but not conformant to, the Future
interface defined by PEP 3148. It is also inspired by (and tries to be
somewhat compatible with) the App Engine-specific UserRPC and MultiRpc classes.
"""
# TODO: Trim the API; there are too many ways to do the same thing.
# TODO: Compare to Monocle's much simpler Callback class.
# Constants for state property.
IDLE = apiproxy_rpc.RPC.IDLE # Not yet running (unused)
RUNNING = apiproxy_rpc.RPC.RUNNING # Not yet completed.
FINISHING = apiproxy_rpc.RPC.FINISHING # Completed.
# XXX Add docstrings to all methods. Separate PEP 3148 API from RPC API.
_geninfo = None # Extra info about suspended generator.
def __init__(self, info=None):
# TODO: Make done a method, to match PEP 3148?
# pylint: disable=invalid-name
__ndb_debug__ = 'SKIP' # Hide this frame from self._where
self._info = info # Info from the caller about this Future's purpose.
self._where = utils.get_stack()
self._context = None
self._reset()
def _reset(self):
self._done = False
self._result = None
self._exception = None
self._traceback = None
self._callbacks = []
self._immediate_callbacks = []
_state.add_pending(self)
self._next = None # Links suspended Futures together in a stack.
# TODO: Add a __del__ that complains if neither get_exception() nor
# check_success() was ever called? What if it's not even done?
def __repr__(self):
if self._done:
if self._exception is not None:
state = 'exception %s: %s' % (self._exception.__class__.__name__,
self._exception)
else:
state = 'result %r' % (self._result,)
else:
state = 'pending'
line = '?'
for line in self._where:
if 'tasklets.py' not in line:
break
if self._info:
line += ' for %s' % self._info
if self._geninfo:
line += ' %s' % self._geninfo
return '<%s %x created by %s; %s>' % (
self.__class__.__name__, id(self), line, state)
def dump(self):
return '%s\nCreated by %s' % (self.dump_stack(),
'\n called by '.join(self._where))
def dump_stack(self):
lines = []
fut = self
while fut is not None:
lines.append(str(fut))
fut = fut._next
return '\n waiting for '.join(lines)
def add_callback(self, callback, *args, **kwds):
if self._done:
eventloop.queue_call(None, callback, *args, **kwds)
else:
self._callbacks.append((callback, args, kwds))
def add_immediate_callback(self, callback, *args, **kwds):
if self._done:
callback(*args, **kwds)
else:
self._immediate_callbacks.append((callback, args, kwds))
def set_result(self, result):
if self._done:
raise RuntimeError('Result cannot be set twice.')
self._result = result
self._done = True
_state.remove_pending(self)
for callback, args, kwds in self._immediate_callbacks:
callback(*args, **kwds)
for callback, args, kwds in self._callbacks:
eventloop.queue_call(None, callback, *args, **kwds)
def set_exception(self, exc, tb=None):
if not isinstance(exc, BaseException):
raise TypeError('exc must be an Exception; received %r' % exc)
if self._done:
raise RuntimeError('Exception cannot be set twice.')
self._exception = exc
self._traceback = tb
self._done = True
_state.remove_pending(self, status='fail')
for callback, args, kwds in self._immediate_callbacks:
callback(*args, **kwds)
for callback, args, kwds in self._callbacks:
eventloop.queue_call(None, callback, *args, **kwds)
def done(self):
return self._done
@property
def state(self):
# This is just for compatibility with UserRPC and MultiRpc.
# A Future is considered running as soon as it is created.
if self._done:
return self.FINISHING
else:
return self.RUNNING
def wait(self):
if self._done:
return
ev = eventloop.get_event_loop()
while not self._done:
if not ev.run1():
logging.info('Deadlock in %s', self)
logging.info('All pending Futures:\n%s', _state.dump_all_pending())
_logging_debug('All pending Futures (verbose):\n%s',
_state.dump_all_pending(verbose=True))
self.set_exception(RuntimeError('Deadlock waiting for %s' % self))
def get_exception(self):
self.wait()
return self._exception
def get_traceback(self):
self.wait()
return self._traceback
def check_success(self):
self.wait()
if self._exception is not None:
raise self._exception.__class__, self._exception, self._traceback
def get_result(self):
self.check_success()
return self._result
# TODO: Have a tasklet that does this
@classmethod
def wait_any(cls, futures):
# TODO: Flatten MultiRpcs.
waiting_on = set(futures)
ev = eventloop.get_event_loop()
while waiting_on:
for f in waiting_on:
if f.state == cls.FINISHING:
return f
ev.run1()
return None
# TODO: Have a tasklet that does this
@classmethod
def wait_all(cls, futures):
# TODO: Flatten MultiRpcs.
waiting_on = set(futures)
ev = eventloop.get_event_loop()
while waiting_on:
waiting_on = set(f for f in waiting_on if f.state == cls.RUNNING)
ev.run1()
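# Example (illustrative sketch): wait_any() runs the event loop until one of
# the given Futures completes and returns that Future; wait_all() runs it
# until all of them are done. fut1 and fut2 are hypothetical Futures.
#
#   first = Future.wait_any([fut1, fut2])
#   Future.wait_all([fut1, fut2])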
def _help_tasklet_along(self, ns, ds_conn, gen, val=None, exc=None, tb=None):
# XXX Docstring
info = utils.gen_info(gen)
# pylint: disable=invalid-name
__ndb_debug__ = info
try:
save_context = get_context()
save_namespace = namespace_manager.get_namespace()
save_ds_connection = datastore._GetConnection()
try:
set_context(self._context)
if ns != save_namespace:
namespace_manager.set_namespace(ns)
if ds_conn is not save_ds_connection:
datastore._SetConnection(ds_conn)
if exc is not None:
_logging_debug('Throwing %s(%s) into %s',
exc.__class__.__name__, exc, info)
value = gen.throw(exc.__class__, exc, tb)
else:
_logging_debug('Sending %r to %s', val, info)
value = gen.send(val)
self._context = get_context()
finally:
ns = namespace_manager.get_namespace()
ds_conn = datastore._GetConnection()
set_context(save_context)
if save_namespace != ns:
namespace_manager.set_namespace(save_namespace)
if save_ds_connection is not ds_conn:
datastore._SetConnection(save_ds_connection)
except StopIteration, err:
result = get_return_value(err)
_logging_debug('%s returned %r', info, result)
self.set_result(result)
return
except GeneratorExit:
# In Python 2.5, this derives from Exception, but we don't want
# to handle it like other Exception instances. So we catch and
# re-raise it immediately. See issue 127. http://goo.gl/2p5Pn
# TODO: Remove when Python 2.5 is no longer supported.
raise
except Exception, err:
_, _, tb = sys.exc_info()
if isinstance(err, _flow_exceptions):
# Flow exceptions aren't logged except in "heavy debug" mode,
# and then only at DEBUG level, without a traceback.
_logging_debug('%s raised %s(%s)',
info, err.__class__.__name__, err)
elif utils.DEBUG and logging.getLogger().level < logging.DEBUG:
# In "heavy debug" mode, log a warning with traceback.
# (This is the same condition as used in utils.logging_debug().)
logging.warning('%s raised %s(%s)',
info, err.__class__.__name__, err, exc_info=True)
else:
# Otherwise, log a warning without a traceback.
logging.warning('%s raised %s(%s)', info, err.__class__.__name__, err)
self.set_exception(err, tb)
return
else:
_logging_debug('%s yielded %r', info, value)
if isinstance(value, (apiproxy_stub_map.UserRPC,
datastore_rpc.MultiRpc)):
# TODO: Tail recursion if the RPC is already complete.
eventloop.queue_rpc(value, self._on_rpc_completion,
value, ns, ds_conn, gen)
return
if isinstance(value, Future):
# TODO: Tail recursion if the Future is already done.
if self._next:
raise RuntimeError('Future has already completed yet next is %r' %
self._next)
self._next = value
self._geninfo = utils.gen_info(gen)
_logging_debug('%s is now blocked waiting for %s', self, value)
value.add_callback(self._on_future_completion, value, ns, ds_conn, gen)
return
if isinstance(value, (tuple, list)):
# Arrange for yield to return a list of results (not Futures).
info = 'multi-yield from %s' % utils.gen_info(gen)
mfut = MultiFuture(info)
try:
for subfuture in value:
mfut.add_dependent(subfuture)
mfut.complete()
except GeneratorExit:
raise
except Exception, err:
_, _, tb = sys.exc_info()
mfut.set_exception(err, tb)
mfut.add_callback(self._on_future_completion, mfut, ns, ds_conn, gen)
return
if _is_generator(value):
# TODO: emulate PEP 380 here?
raise NotImplementedError('Cannot defer to another generator.')
raise RuntimeError('A tasklet should not yield a plain value: '
'%.200s yielded %.200r' % (info, value))
def _on_rpc_completion(self, rpc, ns, ds_conn, gen):
try:
result = rpc.get_result()
except GeneratorExit:
raise
except Exception, err:
_, _, tb = sys.exc_info()
self._help_tasklet_along(ns, ds_conn, gen, exc=err, tb=tb)
else:
self._help_tasklet_along(ns, ds_conn, gen, result)
def _on_future_completion(self, future, ns, ds_conn, gen):
if self._next is future:
self._next = None
self._geninfo = None
_logging_debug('%s is no longer blocked waiting for %s', self, future)
exc = future.get_exception()
if exc is not None:
self._help_tasklet_along(ns, ds_conn, gen,
exc=exc, tb=future.get_traceback())
else:
val = future.get_result() # This won't raise an exception.
self._help_tasklet_along(ns, ds_conn, gen, val)
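# Example (illustrative sketch): a Future can also be driven by hand, without
# the @tasklet decorator. Callbacks added before completion are queued on the
# event loop once set_result() is called.
#
#   fut = Future('manual example')
#   fut.add_callback(lambda: None)  # Runs via the event loop once done.
#   fut.set_result(42)
#   assert fut.done() and fut.get_result() == 42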
def sleep(dt):
"""Public function to sleep some time.
Example:
yield tasklets.sleep(0.5) # Sleep for half a sec.
"""
fut = Future('sleep(%.3f)' % dt)
eventloop.queue_call(dt, fut.set_result, None)
return fut
class MultiFuture(Future):
"""A Future that depends on multiple other Futures.
This is used internally by 'v1, v2, ... = yield f1, f2, ...'; the
semantics (e.g. error handling) are constrained by that use case.
The protocol from the caller's POV is::
mf = MultiFuture()
mf.add_dependent(<some other Future>) -OR- mf.putq(<some value>)
mf.add_dependent(<some other Future>) -OR- mf.putq(<some value>)
.
. (More mf.add_dependent() and/or mf.putq() calls)
.
mf.complete() # No more dependents will be added.
.
. (Time passes)
.
results = mf.get_result()
Now, results is a list of results from all dependent Futures in
the order in which they were added.
It is legal to add the same dependent multiple times.
Callbacks can be added at any point.
From a dependent Future POV, there's nothing to be done: a callback
is automatically added to each dependent Future which will signal
its completion to the MultiFuture.
Error handling: if any dependent future raises an error, it is
propagated to mf. To force an early error, you can call
mf.set_exception() instead of mf.complete(). After this you can't
call mf.add_dependent() or mf.putq() any more.
"""
def __init__(self, info=None):
# pylint: disable=invalid-name
__ndb_debug__ = 'SKIP' # Hide this frame from self._where
self._full = False
self._dependents = set()
self._results = []
super(MultiFuture, self).__init__(info=info)
def __repr__(self):
# TODO: This may be invoked before __init__() returns,
# from Future.__init__(). Beware.
line = super(MultiFuture, self).__repr__()
lines = [line]
for fut in self._results:
lines.append(fut.dump_stack().replace('\n', '\n '))
return '\n waiting for '.join(lines)
# TODO: Maybe rename this method, since completion of a Future/RPC
# already means something else. But to what?
def complete(self):
if self._full:
raise RuntimeError('MultiFuture cannot complete twice.')
self._full = True
if not self._dependents:
self._finish()
# TODO: Maybe don't overload set_exception() with this?
def set_exception(self, exc, tb=None):
self._full = True
super(MultiFuture, self).set_exception(exc, tb)
def _finish(self):
if not self._full:
raise RuntimeError('MultiFuture cannot finish until completed.')
if self._dependents:
raise RuntimeError('MultiFuture cannot finish whilst waiting for '
'dependents %r' % self._dependents)
if self._done:
raise RuntimeError('MultiFuture done before finishing.')
try:
result = [r.get_result() for r in self._results]
except GeneratorExit:
raise
except Exception, err:
_, _, tb = sys.exc_info()
self.set_exception(err, tb)
else:
self.set_result(result)
def putq(self, value):
if isinstance(value, Future):
fut = value
else:
fut = Future()
fut.set_result(value)
self.add_dependent(fut)
def add_dependent(self, fut):
if isinstance(fut, list):
mfut = MultiFuture()
map(mfut.add_dependent, fut)
mfut.complete()
fut = mfut
elif not isinstance(fut, Future):
raise TypeError('Expected Future, received %s: %r' % (type(fut), fut))
if self._full:
raise RuntimeError('MultiFuture cannot add a dependent once complete.')
self._results.append(fut)
if fut not in self._dependents:
self._dependents.add(fut)
fut.add_callback(self._signal_dependent_done, fut)
def _signal_dependent_done(self, fut):
self._dependents.remove(fut)
if self._full and not self._dependents and not self._done:
self._finish()
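# Example (illustrative sketch): MultiFuture is what backs the
# 'a, b = yield f1, f2' form inside tasklets, but it can also be driven by
# hand. make_fut() is a hypothetical producer of Futures.
#
#   mf = MultiFuture('example')
#   for f in (make_fut(1), make_fut(2)):
#     mf.add_dependent(f)
#   mf.complete()              # No more dependents will be added.
#   results = mf.get_result()  # Runs the event loop until all are done.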
class QueueFuture(Future):
"""A Queue following the same protocol as MultiFuture.
However, instead of returning results as a list, it lets you
retrieve results as soon as they are ready, one at a time, using
getq(). The Future itself finishes with a result of None when the
last result is ready (regardless of whether it was retrieved).
The getq() method returns a Future which blocks until the next
result is ready, and then returns that result. Each getq() call
retrieves one unique result. Extra getq() calls made after the last
result has been returned get EOFError as their Future's
exception. (I.e., q.getq() returns a Future as always, but yielding
that Future raises EOFError.)
NOTE: Values can also be pushed directly via .putq(value). However
there is no flow control -- if the producer is faster than the
consumer, the queue will grow unbounded.
"""
# TODO: Refactor to share code with MultiFuture.
def __init__(self, info=None):
self._full = False
self._dependents = set()
self._completed = collections.deque()
self._waiting = collections.deque()
# Invariant: at least one of _completed and _waiting is empty.
# Also: _full and not _dependents <==> _done.
super(QueueFuture, self).__init__(info=info)
# TODO: __repr__
def complete(self):
if self._full:
raise RuntimeError('QueueFuture cannot complete twice.')
self._full = True
if not self._dependents:
self.set_result(None)
self._mark_finished()
def set_exception(self, exc, tb=None):
self._full = True
super(QueueFuture, self).set_exception(exc, tb)
if not self._dependents:
self._mark_finished()
def putq(self, value):
if isinstance(value, Future):
fut = value
else:
fut = Future()
fut.set_result(value)
self.add_dependent(fut)
def add_dependent(self, fut):
if not isinstance(fut, Future):
raise TypeError('fut must be a Future instance; received %r' % fut)
if self._full:
raise RuntimeError('QueueFuture cannot add dependent once complete.')
if fut not in self._dependents:
self._dependents.add(fut)
fut.add_callback(self._signal_dependent_done, fut)
def _signal_dependent_done(self, fut):
if not fut.done():
raise RuntimeError('Future not done before signalling dependent done.')
self._dependents.remove(fut)
exc = fut.get_exception()
tb = fut.get_traceback()
val = None
if exc is None:
val = fut.get_result()
if self._waiting:
waiter = self._waiting.popleft()
self._pass_result(waiter, exc, tb, val)
else:
self._completed.append((exc, tb, val))
if self._full and not self._dependents and not self._done:
self.set_result(None)
self._mark_finished()
def _mark_finished(self):
if not self.done():
raise RuntimeError('Future not done before marking as finished.')
while self._waiting:
waiter = self._waiting.popleft()
self._pass_eof(waiter)
def getq(self):
fut = Future()
if self._completed:
exc, tb, val = self._completed.popleft()
self._pass_result(fut, exc, tb, val)
elif self._full and not self._dependents:
self._pass_eof(fut)
else:
self._waiting.append(fut)
return fut
def _pass_eof(self, fut):
if not self._done:
raise RuntimeError('QueueFuture cannot pass EOF until done.')
exc = self.get_exception()
if exc is not None:
tb = self.get_traceback()
else:
exc = EOFError('Queue is empty')
tb = None
self._pass_result(fut, exc, tb, None)
def _pass_result(self, fut, exc, tb, val):
if exc is not None:
fut.set_exception(exc, tb)
else:
fut.set_result(val)
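# Example (illustrative sketch): a consumer tasklet can drain a QueueFuture
# with getq() until EOFError signals exhaustion; the producer side
# (q.putq(...) followed by q.complete()) is assumed to run elsewhere.
#
#   @tasklet
#   def drain(q):
#     values = []
#     while True:
#       try:
#         val = yield q.getq()
#       except EOFError:
#         break
#       values.append(val)
#     raise Return(values)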
class SerialQueueFuture(Future):
"""Like QueueFuture but maintains the order of insertion.
This class is used by Query operations.
Invariants:
- At least one of _queue and _waiting is empty.
- The Futures in _waiting are always pending.
(The Futures in _queue may be pending or completed.)
In the discussion below, add_dependent() is treated the same way as
putq().
If putq() is ahead of getq(), the situation is like this:
                 putq()
                    v
_queue: [f1, f2, ...]; _waiting: []
         ^
       getq()
Here, putq() appends a Future to the right of _queue, and getq()
removes one from the left.
If getq() is ahead of putq(), it's like this:
                    putq()
                       v
_queue: []; _waiting: [f1, f2, ...]
                                  ^
                                getq()
Here, putq() removes a Future from the left of _waiting, and getq()
appends one to the right.
When both are empty, putq() appends a Future to the right of _queue,
while getq() appends one to the right of _waiting.
Calling complete() or set_exception() signals that no more putq()
calls will be made.
If getq() is behind, subsequent getq() calls will eat up _queue
until it is empty, and after that will return a Future that passes
EOFError (note that getq() itself never raises EOFError). If getq()
is ahead when complete() is called, the Futures in _waiting are all
passed an EOFError exception (thereby eating up _waiting).
If, instead of complete(), set_exception() is called, the exception
and traceback set there will be used instead of EOFError.
"""
def __init__(self, info=None):
self._queue = collections.deque()
self._waiting = collections.deque()
super(SerialQueueFuture, self).__init__(info=info)
# TODO: __repr__
def complete(self):
while self._waiting:
waiter = self._waiting.popleft()
waiter.set_exception(EOFError('Queue is empty'))
# When the writer is complete the future will also complete. If there are
# still pending queued futures, these futures are themselves in the pending
# list, so they will eventually be executed.
self.set_result(None)
def set_exception(self, exc, tb=None):
super(SerialQueueFuture, self).set_exception(exc, tb)
while self._waiting:
waiter = self._waiting.popleft()
waiter.set_exception(exc, tb)
def putq(self, value):
if isinstance(value, Future):
fut = value
else:
if self._waiting:
waiter = self._waiting.popleft()
waiter.set_result(value)
return
fut = Future()
fut.set_result(value)
self.add_dependent(fut)
def add_dependent(self, fut):
if not isinstance(fut, Future):
raise TypeError('fut must be a Future instance; received %r' % fut)
if self._done:
raise RuntimeError('SerialQueueFuture cannot add dependent '
'once complete.')
if self._waiting:
waiter = self._waiting.popleft()
fut.add_callback(_transfer_result, fut, waiter)
else:
self._queue.append(fut)
def getq(self):
if self._queue:
fut = self._queue.popleft()
else:
fut = Future()
if self._done:
err = self.get_exception()
if err is not None:
tb = self.get_traceback()
else:
err = EOFError('Queue is empty')
tb = None
fut.set_exception(err, tb)
else:
self._waiting.append(fut)
return fut
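# Example (illustrative sketch): SerialQueueFuture hands results back in
# insertion order, so alternating putq() and getq() behaves like a FIFO.
#
#   sq = SerialQueueFuture('example')
#   sq.putq(1)
#   sq.putq(2)
#   sq.complete()
#   assert sq.getq().get_result() == 1
#   assert sq.getq().get_result() == 2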
def _transfer_result(fut1, fut2):
"""Helper to transfer result or errors from one Future to another."""
exc = fut1.get_exception()
if exc is not None:
tb = fut1.get_traceback()
fut2.set_exception(exc, tb)
else:
val = fut1.get_result()
fut2.set_result(val)
class ReducingFuture(Future):
"""A Queue following the same protocol as MultiFuture.
However the result, instead of being a list of results of dependent
Futures, is computed by calling a 'reducer' tasklet. The reducer tasklet
takes a list of values and returns a single value. It may be called
multiple times on sublists of values and should behave like
e.g. sum().
NOTE: The reducer input values may be reordered compared to the
order in which they were added to the queue.
"""
# TODO: Refactor to reuse some code with MultiFuture.
def __init__(self, reducer, info=None, batch_size=20):
self._reducer = reducer
self._batch_size = batch_size
self._full = False
self._dependents = set()
self._completed = collections.deque()
self._queue = collections.deque()
super(ReducingFuture, self).__init__(info=info)
# TODO: __repr__
def complete(self):
if self._full:
raise RuntimeError('ReducingFuture cannot complete twice.')
self._full = True
if not self._dependents:
self._mark_finished()
def set_exception(self, exc, tb=None):
self._full = True
self._queue.clear()
super(ReducingFuture, self).set_exception(exc, tb)
def putq(self, value):
if isinstance(value, Future):
fut = value
else:
fut = Future()
fut.set_result(value)
self.add_dependent(fut)
def add_dependent(self, fut):
if self._full:
raise RuntimeError('ReducingFuture cannot add dependent once complete.')
self._internal_add_dependent(fut)
def _internal_add_dependent(self, fut):
if not isinstance(fut, Future):
raise TypeError('fut must be a Future; received %r' % fut)
if fut not in self._dependents:
self._dependents.add(fut)
fut.add_callback(self._signal_dependent_done, fut)
def _signal_dependent_done(self, fut):
if not fut.done():
raise RuntimeError('Future not done before signalling dependent done.')
self._dependents.remove(fut)
if self._done:
return # Already done.
try:
val = fut.get_result()
except GeneratorExit:
raise
except Exception, err:
_, _, tb = sys.exc_info()
self.set_exception(err, tb)
return
self._queue.append(val)
if len(self._queue) >= self._batch_size:
todo = list(self._queue)
self._queue.clear()
try:
nval = self._reducer(todo)
except GeneratorExit:
raise
except Exception, err:
_, _, tb = sys.exc_info()
self.set_exception(err, tb)
return
if isinstance(nval, Future):
self._internal_add_dependent(nval)
else:
self._queue.append(nval)
if self._full and not self._dependents:
self._mark_finished()
def _mark_finished(self):
if not self._queue:
self.set_result(None)