diff --git a/mPyPl/core.py b/mPyPl/core.py index ed9f668..bf5b40e 100644 --- a/mPyPl/core.py +++ b/mPyPl/core.py @@ -29,11 +29,15 @@ def apply(datastream, src_field, dst_field, func,eval_strategy=None): """ Applies a function to the specified field of the stream and stores the result in the specified field. Sample usage: `[1,2,3] | as_field('f1') | apply('f1','f2',lambda x: x*x) | select_field('f2') | as_list` + If `dst_field` is `None`, function is just executed on the source field(s), and result is not stored. + This is useful when there are side effects. """ def applier(x): - x[dst_field] = (lambda : __fnapply(x,src_field,func)) if lazy_strategy(eval_strategy) else __fnapply(x,src_field,func) - if eval_strategy: - x.set_eval_strategy(dst_field,eval_strategy) + r = (lambda : __fnapply(x,src_field,func)) if lazy_strategy(eval_strategy) else __fnapply(x,src_field,func) + if dst_field is not None and dst_field!='': + x[dst_field]=r + if eval_strategy: + x.set_eval_strategy(dst_field,eval_strategy) return x return datastream | select(applier) diff --git a/mPyPl/sink.py b/mPyPl/sink.py index 90b1177..825aa13 100644 --- a/mPyPl/sink.py +++ b/mPyPl/sink.py @@ -3,15 +3,18 @@ # Different sinks to consume mdict streams +from pipe import * import csv import json +@Pipe def write_csv(l,filename): - with open(filename,'wb') as f: + with open(filename,'a') as f: w = csv.writer(f) for x in l: w.writerow(x) +@Pipe def write_json(l,filename): with open(filename,'w') as f: f.write(json.dumps(l))