Skip to content

Commit

Permalink
do not process places in batches
Browse files Browse the repository at this point in the history
  • Loading branch information
lonvia committed Jul 13, 2024
1 parent 83edee1 commit b6cdd90
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 38 deletions.
24 changes: 14 additions & 10 deletions src/nominatim_db/indexer/indexer.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
"""
Main work horse for indexing (computing addresses) the database.
"""
from typing import cast
from typing import cast, List, Any
import logging
import time

Expand Down Expand Up @@ -148,22 +148,26 @@ async def _index(self, runner: runners.Runner, batch: int = 1) -> int:
fetcher_time = 0.0
tstart = time.time()
async with aconn.cursor(name='places') as cur:
places = []
query = runner.index_places_query(batch)
params: List[Any] = []
num_places = 0
async for place in cur.stream(runner.sql_get_objects()):
fetcher_time += time.time() - tstart

places.append(place)
params.extend(runner.index_places_params(place))
num_places += 1

if len(places) >= batch:
LOG.debug("Processing places: %s", str(places))
await pool.put_query(*runner.index_places(places))
progress.add(len(places))
places = []
if num_places >= batch:
LOG.debug("Processing places: %s", str(params))
await pool.put_query(query, params)
progress.add(num_places)
params = []
num_places = 0

tstart = time.time()

if places:
await pool.put_query(*runner.index_places(places))
if num_places > 0:
await pool.put_query(runner.index_places_query(num_places), params)

LOG.info("Wait time: fetcher: %.2fs, pool: %.2fs",
fetcher_time, pool.wait_time)
Expand Down
52 changes: 24 additions & 28 deletions src/nominatim_db/indexer/runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,7 @@
Mix-ins that provide the actual commands for the indexer for various indexing
tasks.
"""
from typing import Any, List, Tuple, Sequence
import functools
from typing import Any, Sequence

from psycopg import sql as pysql
from psycopg.abc import Query
Expand All @@ -33,7 +32,8 @@ class Runner(Protocol):
def name(self) -> str: ...
def sql_count_objects(self) -> Query: ...
def sql_get_objects(self) -> Query: ...
def index_places(self, places: Sequence[DictRow]) -> Tuple[Query, Sequence[Any]]: ...
def index_places_query(self, batch_size: int) -> Query: ...
def index_places_params(self, place: DictRow) -> Sequence[Any]: ...


SELECT_SQL = pysql.SQL("""SELECT place_id, extra.*
Expand All @@ -49,25 +49,22 @@ def __init__(self, rank: int, analyzer: AbstractAnalyzer) -> None:
self.analyzer = analyzer


@functools.lru_cache(maxsize=1)
def _index_sql(self, num_places: int) -> pysql.Composed:
def index_places_query(self, batch_size: int) -> Query:
    """ Create the bulk-update statement that writes back `batch_size`
        indexed placex rows in a single round trip.
    """
    # One VALUES tuple per place; _mk_valuelist repeats UPDATE_LINE
    # batch_size times so the parameter count matches the batch.
    template = pysql.SQL(
        """ UPDATE placex
            SET indexed_status = 0, address = v.addr, token_info = v.ti,
                name = v.name, linked_place_id = v.linked_place_id
            FROM (VALUES {}) as v(id, name, addr, linked_place_id, ti)
            WHERE place_id = v.id
        """)
    return template.format(_mk_valuelist(UPDATE_LINE, batch_size))


def index_places(self, places: Sequence[DictRow]) -> Tuple[Query, Sequence[Any]]:
values: List[Any] = []
for place in places:
for field in ('place_id', 'name', 'address', 'linked_place_id'):
values.append(place[field])
values.append(_analyze_place(place, self.analyzer))

return (self._index_sql(len(places)), values)
def index_places_params(self, place: DictRow) -> Sequence[Any]:
    """ Return the query parameters for a single placex row, in the
        order expected by the VALUES list of index_places_query().
    """
    params = [place[column]
              for column in ('place_id', 'name', 'address', 'linked_place_id')]
    # Token info is computed on the fly by the configured analyzer.
    params.append(_analyze_place(place, self.analyzer))
    return tuple(params)


class RankRunner(AbstractPlacexRunner):
Expand Down Expand Up @@ -136,22 +133,17 @@ def sql_get_objects(self) -> Query:
ORDER BY geometry_sector"""


@functools.lru_cache(maxsize=1)
def _index_sql(self, num_places: int) -> pysql.Composed:
def index_places_query(self, batch_size: int) -> Query:
    """ Create the bulk-update statement that writes back `batch_size`
        indexed interpolation lines in a single round trip.
    """
    # Each row contributes (place_id, address, token_info); the casts
    # in the value template restore the column types lost via VALUES.
    row_template = "(%s, %s::hstore, %s::jsonb)"
    template = pysql.SQL("""UPDATE location_property_osmline
                            SET indexed_status = 0, address = v.addr, token_info = v.ti
                            FROM (VALUES {}) as v(id, addr, ti)
                            WHERE place_id = v.id
                         """)
    return template.format(_mk_valuelist(row_template, batch_size))

def index_places(self, places: Sequence[DictRow]) -> Tuple[Query, Sequence[Any]]:
values: List[Any] = []
for place in places:
values.extend((place[x] for x in ('place_id', 'address')))
values.append(_analyze_place(place, self.analyzer))

return (self._index_sql(len(places)), values)
def index_places_params(self, place: DictRow) -> Sequence[Any]:
    """ Return the query parameters for a single interpolation line,
        in the order expected by index_places_query().
    """
    token_info = _analyze_place(place, self.analyzer)
    return (place['place_id'], place['address'], token_info)



Expand All @@ -173,7 +165,11 @@ def sql_get_objects(self) -> Query:
ORDER BY country_code, postcode"""


def index_places(self, places: Sequence[DictRow]) -> Tuple[Query, Sequence[Any]]:
return (pysql.SQL("""UPDATE location_postcode SET indexed_status = 0
WHERE place_id IN ({})""")
.format(pysql.SQL(',').join((pysql.Literal(i['place_id']) for i in places))), [])
def index_places_query(self, batch_size: int) -> Query:
    """ Create the statement that marks `batch_size` postcode rows
        as done, matching them by place_id.
    """
    # IN-list with one placeholder per place in the batch.
    placeholders = pysql.SQL(',').join([pysql.Placeholder()] * batch_size)
    return pysql.SQL("""UPDATE location_postcode SET indexed_status = 0
                        WHERE place_id IN ({})""").format(placeholders)


def index_places_params(self, place: DictRow) -> Sequence[Any]:
    """ Return the query parameters for a single postcode row:
        only its place_id is needed.
    """
    place_id = place['place_id']
    return (place_id,)

0 comments on commit b6cdd90

Please sign in to comment.