diff --git a/src/nominatim_db/indexer/indexer.py b/src/nominatim_db/indexer/indexer.py index 66ebbb47c..eb79ed819 100644 --- a/src/nominatim_db/indexer/indexer.py +++ b/src/nominatim_db/indexer/indexer.py @@ -7,7 +7,7 @@ """ Main work horse for indexing (computing addresses) the database. """ -from typing import cast +from typing import cast, List, Any import logging import time @@ -148,22 +148,26 @@ async def _index(self, runner: runners.Runner, batch: int = 1) -> int: fetcher_time = 0.0 tstart = time.time() async with aconn.cursor(name='places') as cur: - places = [] + query = runner.index_places_query(batch) + params: List[Any] = [] + num_places = 0 async for place in cur.stream(runner.sql_get_objects()): fetcher_time += time.time() - tstart - places.append(place) + params.extend(runner.index_places_params(place)) + num_places += 1 - if len(places) >= batch: - LOG.debug("Processing places: %s", str(places)) - await pool.put_query(*runner.index_places(places)) - progress.add(len(places)) - places = [] + if num_places >= batch: + LOG.debug("Processing places: %s", str(params)) + await pool.put_query(query, params) + progress.add(num_places) + params = [] + num_places = 0 tstart = time.time() - if places: - await pool.put_query(*runner.index_places(places)) + if num_places > 0: + await pool.put_query(runner.index_places_query(num_places), params) LOG.info("Wait time: fetcher: %.2fs, pool: %.2fs", fetcher_time, pool.wait_time) diff --git a/src/nominatim_db/indexer/runners.py b/src/nominatim_db/indexer/runners.py index 82f35e581..0c0d1d16f 100644 --- a/src/nominatim_db/indexer/runners.py +++ b/src/nominatim_db/indexer/runners.py @@ -8,8 +8,7 @@ Mix-ins that provide the actual commands for the indexer for various indexing tasks. """ -from typing import Any, List, Tuple, Sequence -import functools +from typing import Any, Sequence from psycopg import sql as pysql from psycopg.abc import Query @@ -33,7 +32,8 @@ class Runner(Protocol): def name(self) -> str: ... def sql_count_objects(self) -> Query: ... def sql_get_objects(self) -> Query: ... - def index_places(self, places: Sequence[DictRow]) -> Tuple[Query, Sequence[Any]]: ... + def index_places_query(self, batch_size: int) -> Query: ... + def index_places_params(self, place: DictRow) -> Sequence[Any]: ... SELECT_SQL = pysql.SQL("""SELECT place_id, extra.* @@ -49,25 +49,22 @@ def __init__(self, rank: int, analyzer: AbstractAnalyzer) -> None: self.analyzer = analyzer - @functools.lru_cache(maxsize=1) - def _index_sql(self, num_places: int) -> pysql.Composed: + def index_places_query(self, batch_size: int) -> Query: return pysql.SQL( """ UPDATE placex SET indexed_status = 0, address = v.addr, token_info = v.ti, name = v.name, linked_place_id = v.linked_place_id FROM (VALUES {}) as v(id, name, addr, linked_place_id, ti) WHERE place_id = v.id - """).format(_mk_valuelist(UPDATE_LINE, num_places)) + """).format(_mk_valuelist(UPDATE_LINE, batch_size)) - def index_places(self, places: Sequence[DictRow]) -> Tuple[Query, Sequence[Any]]: - values: List[Any] = [] - for place in places: - for field in ('place_id', 'name', 'address', 'linked_place_id'): - values.append(place[field]) - values.append(_analyze_place(place, self.analyzer)) - - return (self._index_sql(len(places)), values) + def index_places_params(self, place: DictRow) -> Sequence[Any]: + return (place['place_id'], + place['name'], + place['address'], + place['linked_place_id'], + _analyze_place(place, self.analyzer)) class RankRunner(AbstractPlacexRunner): @@ -136,22 +133,17 @@ def sql_get_objects(self) -> Query: ORDER BY geometry_sector""" - @functools.lru_cache(maxsize=1) - def _index_sql(self, num_places: int) -> pysql.Composed: + def index_places_query(self, batch_size: int) -> Query: return pysql.SQL("""UPDATE location_property_osmline SET indexed_status = 0, address = v.addr, token_info = v.ti FROM (VALUES {}) as v(id, addr, ti) WHERE place_id = v.id - """).format(_mk_valuelist("(%s, %s::hstore, %s::jsonb)", num_places)) - + """).format(_mk_valuelist("(%s, %s::hstore, %s::jsonb)", batch_size)) - def index_places(self, places: Sequence[DictRow]) -> Tuple[Query, Sequence[Any]]: - values: List[Any] = [] - for place in places: - values.extend((place[x] for x in ('place_id', 'address'))) - values.append(_analyze_place(place, self.analyzer)) - return (self._index_sql(len(places)), values) + def index_places_params(self, place: DictRow) -> Sequence[Any]: + return (place['place_id'], place['address'], + _analyze_place(place, self.analyzer)) @@ -173,7 +165,11 @@ def sql_get_objects(self) -> Query: ORDER BY country_code, postcode""" - def index_places(self, places: Sequence[DictRow]) -> Tuple[Query, Sequence[Any]]: - return (pysql.SQL("""UPDATE location_postcode SET indexed_status = 0 - WHERE place_id IN ({})""") - .format(pysql.SQL(',').join((pysql.Literal(i['place_id']) for i in places))), []) + def index_places_query(self, batch_size: int) -> Query: + return pysql.SQL("""UPDATE location_postcode SET indexed_status = 0 + WHERE place_id IN ({})""")\ + .format(pysql.SQL(',').join((pysql.Placeholder() for _ in range(batch_size)))) + + + def index_places_params(self, place: DictRow) -> Sequence[Any]: + return (place['place_id'], )