Allow processors to log and spew objects larger than 64K
akariv committed May 23, 2017
1 parent 4afffbe commit e317bc7
Showing 9 changed files with 173 additions and 5 deletions.
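The 64K figure comes from asyncio's stream reader: StreamReader.readline() raises ValueError once a line exceeds the reader's buffer limit, which defaults to 64 KiB. A minimal standalone sketch of that behaviour (illustrative only, not part of this commit; assumes a python3 on PATH):

import asyncio

async def main():
    # Child process emits a single stderr line just over the 64 KiB default limit.
    proc = await asyncio.create_subprocess_exec(
        'python3', '-c',
        "import sys; sys.stderr.write('z' * (64 * 1024) + '\\n')",
        stderr=asyncio.subprocess.PIPE)
    try:
        await proc.stderr.readline()
    except ValueError as exc:
        # The same error tasks.py now catches; readline() also discards the
        # buffered data, hence the "truncated" wording in the new log message.
        print('over-long line rejected:', exc)
    await proc.wait()

asyncio.run(main())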
1 change: 0 additions & 1 deletion .gitignore
@@ -8,7 +8,6 @@ __pycache__/

 # Distribution / packaging
 .Python
-env/
 build/
 develop-eggs/
 dist/
13 changes: 13 additions & 0 deletions datapackage_pipelines/lib/internal/sink.py
@@ -0,0 +1,13 @@
import collections

from datapackage_pipelines.wrapper import ingest, spew

params, dp, res_iter = ingest()


def sink(res_iter_):
    for res in res_iter_:
        collections.deque(res, maxlen=0)
        yield []

spew(dp, sink(res_iter))
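The collections.deque(res, maxlen=0) call is the standard idiom for exhausting an iterator without storing anything: a zero-length deque discards every item as it arrives, at C speed. A tiny standalone demonstration (not part of the commit):

import collections

rows = iter(range(1000000))
collections.deque(rows, maxlen=0)   # consume everything, keep nothing
print(next(rows, 'drained'))        # -> 'drained'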
14 changes: 13 additions & 1 deletion datapackage_pipelines/manager/tasks.py
@@ -10,11 +10,17 @@

 from .runners import runner_config

+SINK = os.path.join(os.path.dirname(__file__),
+                    '..', 'lib', 'internal', 'sink.py')

 async def enqueue_errors(step, process, queue):
     out = process.stderr
     while True:
-        line = await out.readline()
+        try:
+            line = await out.readline()
+        except ValueError:
+            logging.error('Received a too long log line (>64KB), truncated')
+            continue
         if line == b'':
             break
         line = line.decode('utf8').rstrip()
@@ -125,6 +131,12 @@ async def construct_process_pipeline(pipeline_steps, pipeline_cwd, errors):
     error_aggregator = \
         asyncio.ensure_future(dequeue_errors(error_queue, errors))

+    pipeline_steps.append({
+        'run': '(sink)',
+        'executor': SINK,
+        '_cache_hash': pipeline_steps[-1]['_cache_hash']
+    })
+
     for i, step in enumerate(pipeline_steps):

         new_rfd, wfd = os.pipe()
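Two fixes land in this file: over-long stderr lines no longer kill the error reader, and an internal '(sink)' step is appended to every pipeline so the final processor's output is consumed by an ordinary subprocess rather than by the manager itself — which is presumably what lets processors spew objects larger than 64K. A small sketch of the append (hypothetical step names and hashes):

# Hypothetical pipeline state just before the append in construct_process_pipeline.
pipeline_steps = [
    {'run': 'add_metadata', '_cache_hash': 'aaa111'},
    {'run': 'dump.to_zip', '_cache_hash': 'bbb222'},
]
SINK = 'datapackage_pipelines/lib/internal/sink.py'  # resolved via __file__ in tasks.py

pipeline_steps.append({
    'run': '(sink)',
    'executor': SINK,
    '_cache_hash': pipeline_steps[-1]['_cache_hash'],
})

assert pipeline_steps[-1]['_cache_hash'] == 'bbb222'  # cache key unchanged by the sink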
5 changes: 5 additions & 0 deletions tests/env/common/pipeline-common.py
@@ -0,0 +1,5 @@
from datapackage_pipelines.wrapper import ingest, spew

params, datapackage, res_iter = ingest()
datapackage['profile'] = 'tabular-data-package'
spew(datapackage, res_iter)
26 changes: 26 additions & 0 deletions tests/env/dummy/big-outputs.py
@@ -0,0 +1,26 @@
import logging
import itertools
import os

from datapackage_pipelines.wrapper import ingest, spew

params, dp, res_iter = ingest()

big_string = 'z'*64*1024

logging.info('Look at me %s', big_string)

dp['name'] = 'a'
dp['resources'].append({
    'name': 'aa%f' % os.getpid(),
    'path': 'data/bla.csv',
    'schema': {
        'fields': [
            {'name': 'a', 'type': 'string'}
        ]
    }
})

res = iter([{'a': big_string}])

spew(dp, itertools.chain(res_iter, [res]))
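This fixture exercises both paths the commit unblocks: it logs a 64 KiB string to stderr via logging.info, and it spews a resource row carrying the same string downstream.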
68 changes: 68 additions & 0 deletions tests/env/dummy/pipeline-spec.yaml
@@ -0,0 +1,68 @@
pipeline-test-basic:
  pipeline:
    -
      run: add_metadata
      parameters:
        name: 'al-treasury-spending'
        title: 'Albania Treasury Service'
        granularity: transactional
        countryCode: AL
        homepage: 'http://spending.data.al/en/treasuryservice/list/year/2014/inst_code/1005001'
    -
      run: add_resource
      parameters:
        name: "treasury"
        url: "https://raw.githubusercontent.com/openspending/fiscal-data-package-demos/master/al-treasury-spending/data/treasury.csv"
        schema:
          fields:
            -
              name: "Budget Institution"
              type: string
            -
              name: "Supplier"
              type: string
            -
              name: "Treasury Branch"
              type: string
            -
              name: "Value"
              type: number
            -
              name: "Date registered"
              type: date
            -
              name: "Date executed"
              type: date
            -
              name: "Receipt No"
              type: string
            -
              name: "Kategori Shpenzimi"
              type: string
            -
              name: "Receipt Description"
              type: string
    -
      run: stream_remote_resources
    -
      run: pipeline-test-supplier-titleize
      parameters:
        key: Supplier
    -
      run: ..extract-year
      parameters:
        from-key: "Date executed"
        to-key: "Year"
    -
      run: ..common.pipeline-common
    -
      run: dump.to_zip
      parameters:
        out-file: dump.zip


pipeline-test-big-outputs:
  pipeline:
    - run: big-outputs
    - run: big-outputs

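Note that pipeline-test-big-outputs chains big-outputs twice, so the oversized row has to survive a processor-to-processor hop, not just the final sink.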
17 changes: 17 additions & 0 deletions tests/env/dummy/pipeline-test-supplier-titleize.py
@@ -0,0 +1,17 @@
from datapackage_pipelines.wrapper import ingest, spew

params, datapackage, res_iter = ingest()

key = params['key']


def process_resources(_res_iter):
    for res in _res_iter:
        def process_res(_res):
            for line in _res:
                if key in line:
                    line[key] = line[key].title()
                yield line
        yield process_res(res)

spew(datapackage, process_resources(res_iter))
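This processor shows the row-streaming pattern used throughout these tests: an outer generator walks resources while an inner generator rewrites rows one at a time, so nothing is buffered in memory. A self-contained sketch of the same pattern (hypothetical data, no pipeline wrapper):

def process_resources(resources, key):
    for resource in resources:
        def process_rows(rows):
            # Rewrite rows lazily; each row is yielded as soon as it is handled.
            for row in rows:
                if key in row:
                    row[key] = row[key].title()
                yield row
        yield process_rows(resource)

resources = [iter([{'Supplier': 'acme ltd'}, {'Supplier': 'bravo inc'}])]
for resource in process_resources(resources, 'Supplier'):
    print(list(resource))  # [{'Supplier': 'Acme Ltd'}, {'Supplier': 'Bravo Inc'}]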
27 changes: 27 additions & 0 deletions tests/env/extract-year.py
@@ -0,0 +1,27 @@
from datapackage_pipelines.wrapper import ingest, spew

params, datapackage, res_iter = ingest()

from_key = params['from-key']
to_key = params['to-key']


def process_resources(_res_iter):
    for res in _res_iter:
        def process_res(_res):
            for line in _res:
                if from_key in line:
                    line[to_key] = line[from_key].year
                yield line
        yield process_res(res)


for resource in datapackage['resources']:
    if len(list(filter(lambda field: field['name'] == from_key, resource.get('schema',{}).get('fields',[])))) > 0:
        resource['schema']['fields'].append({
            'name': to_key,
            'osType': 'date:fiscal-year',
            'type': 'integer'
        })

spew(datapackage, process_resources(res_iter))
7 changes: 4 additions & 3 deletions tests/test_main.py
@@ -5,7 +5,8 @@


 def test_pipeline():
-    '''Tests that what we want for open data is correct.'''
+    '''Tests a few pipelines.'''
     for spec in pipelines():
-        if spec.pipeline_id == './tests/env/dummy/pipeline-test':
-            execute_pipeline(spec, use_cache=False)
+        if spec.pipeline_id.startswith('./tests/env/dummy/pipeline-test'):
+            success, _ = execute_pipeline(spec, use_cache=False)
+            assert success