diff --git a/src/nominatim_db/indexer/indexer.py b/src/nominatim_db/indexer/indexer.py index 15d355144..b641f8323 100644 --- a/src/nominatim_db/indexer/indexer.py +++ b/src/nominatim_db/indexer/indexer.py @@ -145,34 +145,30 @@ async def _index(self, runner: runners.Runner, batch: int = 1) -> int: if total_tuples > 0: async with await psycopg.AsyncConnection.connect( - self.dsn, row_factory=psycopg.rows.dict_row) as aconn: + self.dsn, row_factory=psycopg.rows.dict_row) as aconn,\ + QueryPool(self.dsn, self.num_threads, autocommit=True) as pool: fetcher_time = 0.0 + tstart = time.time() async with aconn.cursor(name='places') as cur: - await cur.execute(runner.sql_get_objects()) - - async with QueryPool(self.dsn, self.num_threads, autocommit=True) as pool: - places_task = asyncio.create_task(_fetch_next_batch(cur, runner)) - places = await places_task - while places is not None: - # asynchronously query the next batch - places_task = asyncio.create_task(_fetch_next_batch(cur, runner)) - - # And insert the current batch - for idx in range(0, len(places), batch): - part = places[idx:idx + batch] - LOG.debug("Processing places: %s", str(part)) - await pool.put_query(*runner.index_places(part)) - progress.add(len(part)) - - # get the results for the next batch - tstart = time.time() - places = await places_task - fetcher_time += time.time() - tstart + places = [] + async for place in cur.stream(runner.sql_get_objects()): + fetcher_time += time.time() - tstart - LOG.info("Wait time: fetcher: %.2fs, pool: %.2fs", - fetcher_time, pool.wait_time) + places.append(place) + + if len(places) >= batch: + LOG.debug("Processing places: %s", str(places)) + await pool.put_query(*runner.index_places(places)) + progress.add(len(places)) + places = [] + tstart = time.time() + if places: + await pool.put_query(*runner.index_places(places)) + + LOG.info("Wait time: fetcher: %.2fs, pool: %.2fs", + fetcher_time, pool.wait_time) return progress.done() @@ -187,20 +183,3 @@ def _prepare_indexing(self, runner: runners.Runner) -> int: total_tuples = execute_scalar(conn, runner.sql_count_objects()) LOG.debug("Total number of rows: %i", total_tuples) return cast(int, total_tuples) - - -async def _fetch_next_batch(cursor: psycopg.AsyncCursor[psycopg.rows.DictRow], - runner: runners.Runner) -> Optional[List[psycopg.rows.DictRow]]: - ids = await cursor.fetchmany(100) - - if not ids: - return None - - sql, params = runner.get_place_details(ids) - - if sql: - async with cursor.connection.cursor() as cur: - await cur.execute(sql, params) - return await cur.fetchall() - - return ids diff --git a/src/nominatim_db/indexer/runners.py b/src/nominatim_db/indexer/runners.py index b774bbf9e..82f35e581 100644 --- a/src/nominatim_db/indexer/runners.py +++ b/src/nominatim_db/indexer/runners.py @@ -33,15 +33,16 @@ class Runner(Protocol): def name(self) -> str: ... def sql_count_objects(self) -> Query: ... def sql_get_objects(self) -> Query: ... - def get_place_details(self, ids: Sequence[DictRow]) -> Tuple[Query, Sequence[Any]]: ... def index_places(self, places: Sequence[DictRow]) -> Tuple[Query, Sequence[Any]]: ... +SELECT_SQL = pysql.SQL("""SELECT place_id, extra.* + FROM placex, LATERAL placex_indexing_prepare(placex) as extra """) +UPDATE_LINE = "(%s, %s::hstore, %s::hstore, %s::int, %s::jsonb)" + class AbstractPlacexRunner: """ Returns SQL commands for indexing of the placex table. """ - SELECT_SQL = pysql.SQL('SELECT place_id FROM placex ') - UPDATE_LINE = "(%s, %s::hstore, %s::hstore, %s::int, %s::jsonb)" def __init__(self, rank: int, analyzer: AbstractAnalyzer) -> None: self.rank = rank @@ -56,14 +57,7 @@ def _index_sql(self, num_places: int) -> pysql.Composed: name = v.name, linked_place_id = v.linked_place_id FROM (VALUES {}) as v(id, name, addr, linked_place_id, ti) WHERE place_id = v.id - """).format(_mk_valuelist(AbstractPlacexRunner.UPDATE_LINE, num_places)) - - - def get_place_details(self, ids: Sequence[DictRow]) -> Tuple[Query, Sequence[Any]]: - return ("""SELECT place_id, extra.* - FROM placex, LATERAL placex_indexing_prepare(placex) as extra - WHERE place_id = ANY(%s)""", - ([p['place_id'] for p in ids], )) + """).format(_mk_valuelist(UPDATE_LINE, num_places)) def index_places(self, places: Sequence[DictRow]) -> Tuple[Query, Sequence[Any]]: @@ -89,9 +83,9 @@ def sql_count_objects(self) -> pysql.Composed: """).format(pysql.Literal(self.rank)) def sql_get_objects(self) -> pysql.Composed: - return self.SELECT_SQL + pysql.SQL( - """WHERE indexed_status > 0 and rank_address = {} - ORDER BY geometry_sector + return SELECT_SQL + pysql.SQL( + """WHERE placex.indexed_status > 0 and placex.rank_address = {} + ORDER BY placex.geometry_sector """).format(pysql.Literal(self.rank)) @@ -111,10 +105,10 @@ def sql_count_objects(self) -> Query: """).format(pysql.Literal(self.rank)) def sql_get_objects(self) -> Query: - return self.SELECT_SQL + pysql.SQL( - """WHERE indexed_status > 0 and rank_search = {} - and class = 'boundary' and type = 'administrative' - ORDER BY partition, admin_level + return SELECT_SQL + pysql.SQL( + """WHERE placex.indexed_status > 0 and placex.rank_search = {} + and placex.class = 'boundary' and placex.type = 'administrative' + ORDER BY placex.partition, placex.admin_level """).format(pysql.Literal(self.rank)) @@ -134,19 +128,14 @@ def sql_count_objects(self) -> Query: return """SELECT count(*) FROM location_property_osmline WHERE indexed_status > 0""" + def sql_get_objects(self) -> Query: - return """SELECT place_id + return """SELECT place_id, get_interpolation_address(address, osm_id) as address FROM location_property_osmline WHERE indexed_status > 0 ORDER BY geometry_sector""" - def get_place_details(self, ids: Sequence[DictRow]) -> Tuple[Query, Sequence[Any]]: - return ("""SELECT place_id, get_interpolation_address(address, osm_id) as address - FROM location_property_osmline WHERE place_id = ANY(%s)""", - ([p['place_id'] for p in ids], )) - - @functools.lru_cache(maxsize=1) def _index_sql(self, num_places: int) -> pysql.Composed: return pysql.SQL("""UPDATE location_property_osmline @@ -184,10 +173,6 @@ def sql_get_objects(self) -> Query: ORDER BY country_code, postcode""" - def get_place_details(self, ids: Sequence[DictRow]) -> Tuple[Query, Sequence[Any]]: - return '', [] - - def index_places(self, places: Sequence[DictRow]) -> Tuple[Query, Sequence[Any]]: return (pysql.SQL("""UPDATE location_postcode SET indexed_status = 0 WHERE place_id IN ({})""")