diff --git a/.gitignore b/.gitignore index 7fdea58..d61ae81 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,7 @@ *.pyc *.egg-info + +build/ +imposm/parser/pbf/OSMPBF.so +imposm/parser/pbf/osm.pb.cc +imposm/parser/pbf/osm.pb.h \ No newline at end of file diff --git a/imposm/parser/pbf/multiproc.py b/imposm/parser/pbf/multiproc.py index b5a0048..77a86d1 100644 --- a/imposm/parser/pbf/multiproc.py +++ b/imposm/parser/pbf/multiproc.py @@ -42,13 +42,16 @@ class PBFMultiProcParser(object): relations_tag_filter = None def __init__(self, pool_size, nodes_queue=None, ways_queue=None, - relations_queue=None, coords_queue=None, marshal_elem_data=False): + relations_queue=None, coords_queue=None, marshal_elem_data=False, + with_metadata=False): self.pool_size = pool_size self.nodes_callback = nodes_queue.put if nodes_queue else None self.ways_callback = ways_queue.put if ways_queue else None self.relations_callback = relations_queue.put if relations_queue else None self.coords_callback = coords_queue.put if coords_queue else None self.marshal = marshal_elem_data + self.with_metadata = with_metadata + def parse(self, filename): pos_queue = multiprocessing.JoinableQueue(32) pool = [] diff --git a/imposm/parser/pbf/parser.py b/imposm/parser/pbf/parser.py index d4f8d01..0b1a33d 100644 --- a/imposm/parser/pbf/parser.py +++ b/imposm/parser/pbf/parser.py @@ -1,11 +1,11 @@ # Copyright 2011 Omniscale GmbH & Co. KG -# +# # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -35,20 +35,22 @@ class PBFParser(object): """ OSM PBF parser. - :param xxx_callback: + :param xxx_callback: callback functions for coords, nodes, ways and relations. Each callback function gets called with a list of multiple elements. - + :param xxx_filter: functions that can manipulate the tag dictionary. Nodes and relations without tags will not passed to the callback. - + :param marshal: return the data as a marshaled string """ def __init__(self, nodes_callback=None, ways_callback=None, - relations_callback=None, coords_callback=None, nodes_tag_filter=None, - ways_tag_filter=None, relations_tag_filter=None, marshal=False): + relations_callback=None, coords_callback=None, + nodes_tag_filter=None, ways_tag_filter=None, + relations_tag_filter=None, marshal=False, + with_metadata=False): self.nodes_callback = nodes_callback self.ways_callback = ways_callback self.relations_callback = relations_callback @@ -57,24 +59,28 @@ def __init__(self, nodes_callback=None, ways_callback=None, self.ways_tag_filter = ways_tag_filter self.relations_tag_filter = relations_tag_filter self.marshal = marshal - + self.with_metadata = with_metadata + if self.with_metadata: + raise Exception("PBFParser doesn't support parsing of metadata. " + "You set with_metadata on True") + def parse(self, filename, offset, size): """ Parse primitive block from `filename`. - + :param filename: path to PBF file :param offset: byte offset of the primitive block to parse :param size: size in bytes of the primitive block to parse """ reader = PrimitiveBlockParser(filename, offset, size) - + if self.nodes_callback or self.coords_callback: self.handle_nodes(reader) if self.ways_callback: self.handle_ways(reader) if self.relations_callback: self.handle_relations(reader) - + def handle_nodes(self, reader): nodes = [] coords = [] @@ -142,14 +148,14 @@ def decoded_stringtable(stringtable): class PrimitiveBlockParser(object): """ Low level PBF primitive block parser. - + Parses a single primitive block and handles OSM PBF internals like dense nodes, delta encoding, stringtables, etc. - + :param filename: path to PBF file :param offset: byte offset of the primitive block to parse :param size: size in bytes of the primitive block to parse - + """ def __init__(self, filename, blob_pos, blob_size): self.pos = filename, blob_pos, blob_size @@ -158,17 +164,17 @@ def __init__(self, filename, blob_pos, blob_size): self.primitive_block.ParseFromString(data) self.primitivegroup = self.primitive_block.primitivegroup self.stringtable = decoded_stringtable(self.primitive_block.stringtable.s) - + def __repr__(self): return '' % (self.pos, ) - + def _get_tags(self, element, pos): tags = {} key = None value = None keyflag = False if pos >= len(element): - return {}, pos + return {}, pos while True: key_val = element[pos] pos += 1 @@ -182,11 +188,11 @@ def _get_tags(self, element, pos): tags[self.stringtable[key]] = self.stringtable[value] keyflag = False return tags, pos - + def nodes(self): """ Return an iterator for all *nodes* in this primitive block. - + :rtype: iterator of ``(osm_id, tags, (lon, lat))`` tuples """ for group in self.primitivegroup: @@ -216,11 +222,11 @@ def nodes(self): for i in xrange(len(keys)): tags.append((self.stringtable[keys[i]], self.stringtable[vals[i]])) yield (node.id, tags, (node.lon, node.lat)) - + def ways(self): """ Return an iterator for all *ways* in this primitive block. - + :rtype: iterator of ``(osm_id, tags, [ref1, ref2, ...])`` tuples """ for group in self.primitivegroup: @@ -230,7 +236,7 @@ def ways(self): keys = way.keys vals = way.vals delta_refs = way.refs - + tags = {} for i in xrange(len(keys)): tags[self.stringtable[keys[i]]] = self.stringtable[vals[i]] @@ -240,13 +246,13 @@ def ways(self): ref += delta refs.append(ref) yield (way.id, tags, refs) - + def relations(self): """ Return an iterator for all *relations* in this primitive block. - + :rtype: iterator of ``(osm_id, tags, [(ref1, type, role), ...])`` tuples - + """ for group in self.primitivegroup: relations = group.relations @@ -266,13 +272,13 @@ def relations(self): for i in xrange(len(keys)): tags[self.stringtable[keys[i]]] = self.stringtable[vals[i]] yield (relation.id, tags, members) - + class PBFHeader(object): def __init__(self, filename, blob_pos, blob_size): data = read_blob_data(filename, blob_pos, blob_size) self.header_block = OSMPBF.HeaderBlock() self.header_block.ParseFromString(data) - + def required_features(self): return set(self.header_block.required_features) @@ -284,7 +290,7 @@ def read_blob_data(filename, blob_pos, blob_size): with open(filename, 'rb') as f: f.seek(blob_pos) blob_data = f.read(blob_size) - + blob = OSMPBF.Blob() blob.ParseFromString(blob_data) raw_data = blob.raw @@ -297,10 +303,10 @@ def read_blob_data(filename, blob_pos, blob_size): class PBFFile(object): """ OSM PBF file reader. - + Parses the low-level file structure with header sizes, offsets and blob headers. - + :param filename: path to the PBF file """ def __init__(self, filename): @@ -310,7 +316,7 @@ def __init__(self, filename): header_offsets = self._skip_header() self.header = PBFHeader(self.filename, header_offsets['blob_pos'], header_offsets['blob_size']) self.check_features() - + def check_features(self): missing_features = self.header.required_features().difference(SUPPORTED_FEATURES) if missing_features: @@ -318,32 +324,32 @@ def check_features(self): '%s requires features not implemented by this parser: %s' % (self.filename, ', '.join(missing_features)) ) - + def _skip_header(self): return self.blob_offsets().next() - + def seek(self, pos): self.next_blob_pos = pos - + def rewind(self): self.next_blob_pos = self.prev_blob_pos - + def blob_offsets(self): """ Returns an iterator of the blob offsets in this file. - + Each offsets is stored in a dictionary with: - + - `filename` the path of this PBF file. - `blob_pos` the byte offset - `blob_size` the size of this blob in bytes """ while True: self.file.seek(self.next_blob_pos) - + blob_header_size = self._blob_header_size() if not blob_header_size: break - + blob_size = self._blob_size(self.file.read(blob_header_size)) blob_pos = self.next_blob_pos + 4 + blob_header_size blob_header_pos=self.next_blob_pos, @@ -354,22 +360,22 @@ def blob_offsets(self): blob_header_pos=blob_header_pos, prev_blob_header_pos=prev_blob_header_pos, filename=self.filename) - + def primitive_block_parsers(self): """ Returns an iterator of PrimitiveBlockParser. """ for pos in self.blob_offsets(): yield PrimitiveBlockParser(self.filename, pos['blob_pos'], pos['blob_size']) - + def _blob_size(self, data): blob_header = OSMPBF.BlobHeader() blob_header.ParseFromString(data) return blob_header.datasize - + def _blob_header_size(self): bytes = self.file.read(4) - if bytes: + if bytes: return struct.unpack('!i', bytes)[0] return None @@ -393,5 +399,5 @@ def read_pbf(filename): times = t.repeat(r,n) avrg_times = [] for time in times: - avrg_times.append(time/n) + avrg_times.append(time/n) print "avrg time/call: %f" %(min(avrg_times)) diff --git a/imposm/parser/simple.py b/imposm/parser/simple.py index 09c74ee..de31cf0 100644 --- a/imposm/parser/simple.py +++ b/imposm/parser/simple.py @@ -1,11 +1,11 @@ # Copyright 2011 Omniscale GmbH & Co. KG -# +# # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -25,23 +25,25 @@ class OSMParser(object): """ High-level OSM parser. - + :param concurrency: number of parser processes to start. Defaults to the number of CPUs. - :param xxx_callback: + :param xxx_callback: callback functions for coords, nodes, ways and relations. Each callback function gets called with a list of multiple elements. See :ref:`callback concepts `. - + :param xxx_filter: functions that can manipulate the tag dictionary. Nodes and relations without tags will not passed to the callback. See :ref:`tag filter concepts `. - + """ - def __init__(self, concurrency=None, nodes_callback=None, ways_callback=None, - relations_callback=None, coords_callback=None, nodes_tag_filter=None, - ways_tag_filter=None, relations_tag_filter=None, marshal_elem_data=False): + def __init__(self, concurrency=None, nodes_callback=None, + ways_callback=None, relations_callback=None, + coords_callback=None, nodes_tag_filter=None, + ways_tag_filter=None, relations_tag_filter=None, + marshal_elem_data=False, with_metadata=False): self.concurrency = concurrency or default_concurrency() assert self.concurrency >= 1 self.nodes_callback = nodes_callback @@ -52,7 +54,8 @@ def __init__(self, concurrency=None, nodes_callback=None, ways_callback=None, self.ways_tag_filter = ways_tag_filter self.relations_tag_filter = relations_tag_filter self.marshal_elem_data = marshal_elem_data - + self.with_metadata = with_metadata + def parse(self, filename): """ Parse the given file. Detects the filetype based on the file suffix. @@ -64,14 +67,14 @@ def parse(self, filename): return self.parse_xml_file(filename) else: raise NotImplementedError('unknown file extension') - + def parse_pbf_file(self, filename): """ Parse a PBF file. """ from imposm.parser.pbf.multiproc import PBFMultiProcParser return self._parse(filename, PBFMultiProcParser) - + def parse_xml_file(self, filename): """ Parse a XML file. @@ -80,7 +83,7 @@ def parse_xml_file(self, filename): from imposm.parser.xml.multiproc import XMLMultiProcParser with fileinput(filename) as input: return self._parse(input, XMLMultiProcParser) - + def _parse(self, input, parser_class): queues_callbacks = {} if self.coords_callback: @@ -99,13 +102,14 @@ def _parse(self, input, parser_class): def parse_it(): setproctitle('imposm parser') queues = dict([(type, q) for type, (q, c) in queues_callbacks.items()]) - + parser = parser_class(self.concurrency, ways_queue=queues.get('ways'), coords_queue=queues.get('coords'), nodes_queue=queues.get('nodes'), relations_queue=queues.get('relations'), - marshal_elem_data=self.marshal_elem_data + marshal_elem_data=self.marshal_elem_data, + with_metadata=self.with_metadata ) parser.nodes_tag_filter = self.nodes_tag_filter parser.ways_tag_filter = self.ways_tag_filter @@ -113,10 +117,10 @@ def parse_it(): parser.parse(input) for q in queues.values(): q.put(None) - + proc = multiprocessing.Process(target=parse_it) proc.start() - + while queues_callbacks: processed = False for items_type, (queue, callback) in queues_callbacks.items(): diff --git a/imposm/parser/test/test_simple_parser.py b/imposm/parser/test/test_simple_parser.py index 24e6fa6..36ff3c6 100644 --- a/imposm/parser/test/test_simple_parser.py +++ b/imposm/parser/test/test_simple_parser.py @@ -1,12 +1,12 @@ # -:- encoding: utf8 -:- # Copyright 2011 Omniscale GmbH & Co. KG -# +# # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -15,6 +15,7 @@ import os from imposm.parser import OSMParser +from imposm.parser.util import OSMMetadata from nose.tools import eq_ class ParserTestBase(object): @@ -27,7 +28,7 @@ def __init__(self): self.coords = [] self.ways = [] self.relations = [] - + def parse_nodes(self, nodes): self.nodes.extend(nodes) def parse_coords(self, coords): @@ -46,54 +47,66 @@ def parse(self): nodes_tag_filter=self.nodes_filter, ways_tag_filter=self.ways_filter, relations_tag_filter=self.relations_filter, + with_metadata=True ) osm_filename = os.path.join(os.path.dirname(__file__), self.osm_filename) parser.parse(osm_filename) - + def test_parse_result(self): self.parse() eq_(len(self.nodes), 1) eq_(self.nodes[0], - (2, {'name': 'test', 'created_by': 'hand'}, (10.0, 51.0))) - + (2, {'name': 'test', 'created_by': 'hand'}, (10.0, 51.0), + OSMMetadata(1, 1, '2011-12-16T13:24:15Z', 1, 'testbot')) + ) + eq_(len(self.coords), 2) eq_(self.coords[0], (1, 10.0, 50.0)) eq_(self.coords[1], (2, 10.0, 51.0)) - + eq_(len(self.ways), 1) eq_(self.ways[0], - (3, {'highway': 'primary'}, [1, 2])) + (3, {'highway': 'primary'}, [1, 2], + OSMMetadata(3, 5, '2010-07-16T17:36:18Z', 2, 'testbot')) + ) eq_(len(self.relations), 1) eq_(self.relations[0], - (4, {'name': u'ܵlåû†é'}, [(123, 'way', 'outer'), (124, 'way', 'inner')])) + (4, {'name': u'ܵlåû†é'}, [(123, 'way', 'outer'), (124, 'way', 'inner')], + OSMMetadata(2, 4, '2010-05-20T19:38:47Z', 1, 'testbot')) + ) + class ParserTestBaseWithFilter(ParserTestBase): def nodes_filter(self, tags): for tag in tags.keys(): if tag != 'name': del tags[tag] - + ways_filter = nodes_filter def relations_filter(self, tags): tags.clear() - + def test_parse_result(self): self.parse() eq_(len(self.nodes), 1) eq_(self.nodes[0], - (2, {'name': 'test'}, (10.0, 51.0))) - + (2, {'name': 'test'}, (10.0, 51.0), + OSMMetadata(1, 1, '2011-12-16T13:24:15Z', 1, 'testbot')) + ) + eq_(len(self.coords), 2) eq_(self.coords[0], (1, 10.0, 50.0)) eq_(self.coords[1], (2, 10.0, 51.0)) - + eq_(len(self.ways), 1) eq_(self.ways[0], - (3, {}, [1, 2])) + (3, {}, [1, 2], + OSMMetadata(3, 5, '2010-07-16T17:36:18Z', 2, 'testbot')) + ) eq_(len(self.relations), 0) - + class TestXML(ParserTestBase): osm_filename = 'test.osm' diff --git a/imposm/parser/util.py b/imposm/parser/util.py index aae1f24..95e8887 100644 --- a/imposm/parser/util.py +++ b/imposm/parser/util.py @@ -1,11 +1,11 @@ # Copyright 2011 Omniscale GmbH & Co. KG -# +# # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -15,6 +15,7 @@ import contextlib import multiprocessing import subprocess +import collections try: from setproctitle import setproctitle @@ -47,5 +48,9 @@ def estimate_records(files): if f.endswith('.pbf'): fsize *= 15 # observed pbf compression factor on osm data records += fsize/200 - - return int(records) \ No newline at end of file + + return int(records) + +OSMMetadata = collections.namedtuple('OSMMetadata', [ + 'version', 'changeset', 'timestamp', 'user_id', 'user_name' +]) diff --git a/imposm/parser/xml/multiproc.py b/imposm/parser/xml/multiproc.py index ab6b92a..8f7153d 100644 --- a/imposm/parser/xml/multiproc.py +++ b/imposm/parser/xml/multiproc.py @@ -79,7 +79,8 @@ class XMLMultiProcParser(object): relations_tag_filter = None def __init__(self, pool_size, nodes_queue=None, ways_queue=None, - relations_queue=None, coords_queue=None, marshal_elem_data=False): + relations_queue=None, coords_queue=None, marshal_elem_data=False, + with_metadata=False): self.pool_size = pool_size self.pool = [] self.nodes_callback = nodes_queue.put if nodes_queue else None @@ -90,6 +91,7 @@ def __init__(self, pool_size, nodes_queue=None, ways_queue=None, self.mmap_pool = MMapPool(pool_size*8, xml_chunk_size*8) self.mmap_queue = multiprocessing.JoinableQueue(8) self.marshal_elem_data = marshal_elem_data + self.with_metadata = with_metadata def parse(self, stream): assert not self.pool @@ -102,6 +104,7 @@ def parse(self, stream): ways_tag_filter=self.ways_tag_filter, relations_tag_filter=self.relations_tag_filter, marshal_elem_data=self.marshal_elem_data, + with_metadata=self.with_metadata ) self.pool.append(proc) proc.start() diff --git a/imposm/parser/xml/parser.py b/imposm/parser/xml/parser.py index cf8b5aa..744267c 100644 --- a/imposm/parser/xml/parser.py +++ b/imposm/parser/xml/parser.py @@ -1,11 +1,11 @@ # Copyright 2011 Omniscale GmbH & Co. KG -# +# # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -16,12 +16,16 @@ from marshal import dumps +from imposm.parser.util import OSMMetadata from imposm.parser.xml.util import log_file_on_exception, iterparse + class XMLParser(object): def __init__(self, nodes_callback=None, ways_callback=None, - relations_callback=None, coords_callback=None, nodes_tag_filter=None, - ways_tag_filter=None, relations_tag_filter=None, marshal_elem_data=False): + relations_callback=None, coords_callback=None, + nodes_tag_filter=None, ways_tag_filter=None, + relations_tag_filter=None, marshal_elem_data=False, + with_metadata=False): self.nodes_callback = nodes_callback self.ways_callback = ways_callback self.relations_callback = relations_callback @@ -30,7 +34,24 @@ def __init__(self, nodes_callback=None, ways_callback=None, self.ways_tag_filter = ways_tag_filter self.relations_tag_filter = relations_tag_filter self.marshal_elem_data = marshal_elem_data - + self.with_metadata = with_metadata + + def _get_int_or_none(self, obj, key): + if key in obj: + return int(obj[key]) + else: + return None + + def get_metadata(self, elem): + meta = OSMMetadata( + version=self._get_int_or_none(elem.attrib, 'version'), + changeset=self._get_int_or_none(elem.attrib, 'changeset'), + timestamp=elem.attrib.get('timestamp'), + user_id=self._get_int_or_none(elem.attrib, 'uid'), + user_name=elem.attrib.get('user') + ) + return meta + def parse(self, xml): with log_file_on_exception(xml): coords = [] @@ -41,7 +62,7 @@ def parse(self, xml): refs = [] members = [] root, context = iterparse(xml) - + for event, elem in context: if event == 'start': continue if elem.tag == 'tag': @@ -56,6 +77,9 @@ def parse(self, xml): if tags and self.nodes_callback: if self.marshal_elem_data: nodes.append((osmid, dumps((tags, (x, y)), 2))) + elif self.with_metadata: + nodes.append((osmid, tags, (x, y), + self.get_metadata(elem))) else: nodes.append((osmid, tags, (x, y))) tags = {} @@ -70,6 +94,9 @@ def parse(self, xml): if self.ways_callback: if self.marshal_elem_data: ways.append((osm_id, dumps((tags, refs), 2))) + elif self.with_metadata: + ways.append((osm_id, tags, refs, + self.get_metadata(elem))) else: ways.append((osm_id, tags, refs)) refs = [] @@ -80,12 +107,19 @@ def parse(self, xml): self.relations_tag_filter(tags) if tags and self.relations_callback: if self.marshal_elem_data: - relations.append((osm_id, dumps((tags, members), 2))) + relations.append( + (osm_id, dumps((tags, members), 2)) + ) + elif self.with_metadata: + relations.append( + (osm_id, tags, members, + self.get_metadata(elem)) + ) else: relations.append((osm_id, tags, members)) members = [] tags = {} - + if len(coords) >= 512: self.coords_callback(coords) coords = [] @@ -100,7 +134,7 @@ def parse(self, xml): ways = [] root.clear() - + if self.coords_callback: self.coords_callback(coords) if self.nodes_callback: