simplify row fetching of indexer
lonvia committed Jul 13, 2024
1 parent f2540f3 commit 7343259
Showing 2 changed files with 33 additions and 69 deletions.
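
In short, the indexer.py diff below drops the two-step fetch, where _fetch_next_batch() collected a batch of ids and runner.get_place_details() then loaded the details for that batch, and instead streams fully prepared rows from a single query, handing fixed-size batches straight to the query pool. Below is a minimal, self-contained sketch of that streaming pattern with psycopg 3; the DSN, the SELECT statement and the process() helper are placeholders, and the real code uses a server-side cursor and Nominatim's QueryPool rather than a plain coroutine.

    import asyncio
    from typing import List

    import psycopg
    from psycopg.rows import dict_row, DictRow


    async def process(rows: List[DictRow]) -> None:
        # Placeholder for the indexing work; the real code sends
        # pool.put_query(*runner.index_places(rows)) to a separate pool
        # of connections, because the streaming connection below is
        # still busy delivering rows.
        print(f"would index {len(rows)} rows")


    async def stream_in_batches(dsn: str, batch: int = 10) -> None:
        async with await psycopg.AsyncConnection.connect(dsn, row_factory=dict_row) as aconn:
            async with aconn.cursor() as cur:
                places: List[DictRow] = []
                # stream() yields rows one at a time instead of
                # materialising the whole result with fetchall().
                async for place in cur.stream(
                        "SELECT place_id FROM placex WHERE indexed_status > 0"):
                    places.append(place)
                    if len(places) >= batch:
                        await process(places)
                        places = []
                if places:   # flush the last, possibly partial, batch
                    await process(places)


    # Example invocation (the DSN is an assumption):
    # asyncio.run(stream_in_batches("dbname=nominatim"))
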
59 changes: 19 additions & 40 deletions src/nominatim_db/indexer/indexer.py
@@ -145,34 +145,30 @@ async def _index(self, runner: runners.Runner, batch: int = 1) -> int:

if total_tuples > 0:
async with await psycopg.AsyncConnection.connect(
self.dsn, row_factory=psycopg.rows.dict_row) as aconn:
self.dsn, row_factory=psycopg.rows.dict_row) as aconn,\
QueryPool(self.dsn, self.num_threads, autocommit=True) as pool:
fetcher_time = 0.0
tstart = time.time()
async with aconn.cursor(name='places') as cur:
await cur.execute(runner.sql_get_objects())

async with QueryPool(self.dsn, self.num_threads, autocommit=True) as pool:
places_task = asyncio.create_task(_fetch_next_batch(cur, runner))
places = await places_task
while places is not None:
# asynchronously query the next batch
places_task = asyncio.create_task(_fetch_next_batch(cur, runner))

# And insert the current batch
for idx in range(0, len(places), batch):
part = places[idx:idx + batch]
LOG.debug("Processing places: %s", str(part))
await pool.put_query(*runner.index_places(part))
progress.add(len(part))

# get the results for the next batch
tstart = time.time()
places = await places_task
fetcher_time += time.time() - tstart
places = []
async for place in cur.stream(runner.sql_get_objects()):
fetcher_time += time.time() - tstart

LOG.info("Wait time: fetcher: %.2fs, pool: %.2fs",
fetcher_time, pool.wait_time)
places.append(place)

if len(places) >= batch:
LOG.debug("Processing places: %s", str(places))
await pool.put_query(*runner.index_places(places))
progress.add(len(places))
places = []

tstart = time.time()

if places:
await pool.put_query(*runner.index_places(places))

LOG.info("Wait time: fetcher: %.2fs, pool: %.2fs",
fetcher_time, pool.wait_time)

return progress.done()

@@ -187,20 +187,3 @@ def _prepare_indexing(self, runner: runners.Runner) -> int:
total_tuples = execute_scalar(conn, runner.sql_count_objects())
LOG.debug("Total number of rows: %i", total_tuples)
return cast(int, total_tuples)


async def _fetch_next_batch(cursor: psycopg.AsyncCursor[psycopg.rows.DictRow],
runner: runners.Runner) -> Optional[List[psycopg.rows.DictRow]]:
ids = await cursor.fetchmany(100)

if not ids:
return None

sql, params = runner.get_place_details(ids)

if sql:
async with cursor.connection.cursor() as cur:
await cur.execute(sql, params)
return await cur.fetchall()

return ids
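
The batches above are handed to QueryPool, which is not part of this diff. From its use here it is assumed to be a small set of worker connections that consume queries from a shared asyncio queue, so the indexing UPDATEs run concurrently while the main connection keeps streaming; the wait_time attribute suggests it also records how long put_query() had to block. A generic stand-in under those assumptions, not the real class:

    import asyncio
    from typing import Any, Sequence

    import psycopg


    class MiniQueryPool:
        # Generic stand-in for QueryPool: num_workers connections consume
        # (sql, params) jobs from one queue.  Must be created inside a
        # running event loop because it spawns its worker tasks here.
        def __init__(self, dsn: str, num_workers: int) -> None:
            self.queue: asyncio.Queue = asyncio.Queue(maxsize=2 * num_workers)
            self.workers = [asyncio.create_task(self._worker(dsn))
                            for _ in range(num_workers)]

        async def _worker(self, dsn: str) -> None:
            async with await psycopg.AsyncConnection.connect(dsn, autocommit=True) as conn:
                while True:
                    sql, params = await self.queue.get()
                    try:
                        await conn.execute(sql, params)
                    finally:
                        self.queue.task_done()

        async def put_query(self, sql: Any, params: Sequence[Any]) -> None:
            # Blocks when the queue is full, which throttles the producer.
            await self.queue.put((sql, params))

        async def finish(self) -> None:
            # Wait for outstanding jobs, then stop the workers.
            await self.queue.join()
            for worker in self.workers:
                worker.cancel()
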
43 changes: 14 additions & 29 deletions src/nominatim_db/indexer/runners.py
@@ -33,15 +33,16 @@ class Runner(Protocol):
def name(self) -> str: ...
def sql_count_objects(self) -> Query: ...
def sql_get_objects(self) -> Query: ...
def get_place_details(self, ids: Sequence[DictRow]) -> Tuple[Query, Sequence[Any]]: ...
def index_places(self, places: Sequence[DictRow]) -> Tuple[Query, Sequence[Any]]: ...


SELECT_SQL = pysql.SQL("""SELECT place_id, extra.*
FROM placex, LATERAL placex_indexing_prepare(placex) as extra """)
UPDATE_LINE = "(%s, %s::hstore, %s::hstore, %s::int, %s::jsonb)"

class AbstractPlacexRunner:
""" Returns SQL commands for indexing of the placex table.
"""
SELECT_SQL = pysql.SQL('SELECT place_id FROM placex ')
UPDATE_LINE = "(%s, %s::hstore, %s::hstore, %s::int, %s::jsonb)"

def __init__(self, rank: int, analyzer: AbstractAnalyzer) -> None:
self.rank = rank
@@ -56,14 +56,7 @@ def _index_sql(self, num_places: int) -> pysql.Composed:
name = v.name, linked_place_id = v.linked_place_id
FROM (VALUES {}) as v(id, name, addr, linked_place_id, ti)
WHERE place_id = v.id
""").format(_mk_valuelist(AbstractPlacexRunner.UPDATE_LINE, num_places))


def get_place_details(self, ids: Sequence[DictRow]) -> Tuple[Query, Sequence[Any]]:
return ("""SELECT place_id, extra.*
FROM placex, LATERAL placex_indexing_prepare(placex) as extra
WHERE place_id = ANY(%s)""",
([p['place_id'] for p in ids], ))
""").format(_mk_valuelist(UPDATE_LINE, num_places))


def index_places(self, places: Sequence[DictRow]) -> Tuple[Query, Sequence[Any]]:
@@ -89,9 +83,9 @@ def sql_count_objects(self) -> pysql.Composed:
""").format(pysql.Literal(self.rank))

def sql_get_objects(self) -> pysql.Composed:
return self.SELECT_SQL + pysql.SQL(
"""WHERE indexed_status > 0 and rank_address = {}
ORDER BY geometry_sector
return SELECT_SQL + pysql.SQL(
"""WHERE placex.indexed_status > 0 and placex.rank_address = {}
ORDER BY placex.geometry_sector
""").format(pysql.Literal(self.rank))


@@ -111,10 +105,10 @@ def sql_count_objects(self) -> Query:
""").format(pysql.Literal(self.rank))

def sql_get_objects(self) -> Query:
return self.SELECT_SQL + pysql.SQL(
"""WHERE indexed_status > 0 and rank_search = {}
and class = 'boundary' and type = 'administrative'
ORDER BY partition, admin_level
return SELECT_SQL + pysql.SQL(
"""WHERE placex.indexed_status > 0 and placex.rank_search = {}
and placex.class = 'boundary' and placex.type = 'administrative'
ORDER BY placex.partition, placex.admin_level
""").format(pysql.Literal(self.rank))


@@ -134,19 +128,14 @@ def sql_count_objects(self) -> Query:
return """SELECT count(*) FROM location_property_osmline
WHERE indexed_status > 0"""


def sql_get_objects(self) -> Query:
return """SELECT place_id
return """SELECT place_id, get_interpolation_address(address, osm_id) as address
FROM location_property_osmline
WHERE indexed_status > 0
ORDER BY geometry_sector"""


def get_place_details(self, ids: Sequence[DictRow]) -> Tuple[Query, Sequence[Any]]:
return ("""SELECT place_id, get_interpolation_address(address, osm_id) as address
FROM location_property_osmline WHERE place_id = ANY(%s)""",
([p['place_id'] for p in ids], ))


@functools.lru_cache(maxsize=1)
def _index_sql(self, num_places: int) -> pysql.Composed:
return pysql.SQL("""UPDATE location_property_osmline
@@ -184,10 +173,6 @@ def sql_get_objects(self) -> Query:
ORDER BY country_code, postcode"""


def get_place_details(self, ids: Sequence[DictRow]) -> Tuple[Query, Sequence[Any]]:
return '', []


def index_places(self, places: Sequence[DictRow]) -> Tuple[Query, Sequence[Any]]:
return (pysql.SQL("""UPDATE location_postcode SET indexed_status = 0
WHERE place_id IN ({})""")
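
The runners.py side makes the matching simplification in SQL: get_place_details() is deleted and sql_get_objects() now pulls the per-row preparation into the main query, either through the LATERAL call in SELECT_SQL or, for interpolations, by calling get_interpolation_address() directly in the SELECT list, so the streamed rows already carry everything index_places() needs. A generic illustration of that LATERAL pattern follows; the table, the columns and the prepare_row() function are placeholders, not Nominatim's schema.

    import psycopg
    from psycopg.rows import dict_row

    # For every row of the driving table, the function in the LATERAL
    # clause is called with that row and its result columns are exposed
    # next to it, so no second "fetch the details" query is needed.
    LATERAL_DEMO = """
        SELECT t.id, extra.*
          FROM my_table t,
               LATERAL prepare_row(t) AS extra
         WHERE t.needs_work
         ORDER BY t.sort_key
    """


    def fetch_prepared(dsn: str) -> list:
        with psycopg.connect(dsn, row_factory=dict_row) as conn:
            return conn.execute(LATERAL_DEMO).fetchall()
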
