diff --git a/clearmash/pipeline-spec.yaml b/clearmash/pipeline-spec.yaml index f59c1d6..3d08fd0 100644 --- a/clearmash/pipeline-spec.yaml +++ b/clearmash/pipeline-spec.yaml @@ -115,6 +115,7 @@ entities-sync: table: clearmash-entities item-id-column: item_id document-id-column: document_id + # sync to elasticsearch - run: ..datapackage_pipelines_mojp.common.processors.sync parameters: @@ -124,6 +125,15 @@ entities-sync: - run: dump.to_path parameters: out-path: ../data/clearmash/new-entities-sync + # dump geocode_location table + - run: dump.to_sql + parameters: + add-resource: dbs-geocode-location + table: geocode-location + document-id-column: id + location-column: geo_location + lat-column: + lan-column: entities-delete: description: "go over all items in ElasticSearch and delete any items which are not in entities or have display_allowed=false" @@ -149,3 +159,23 @@ entities-delete: id-field: item_id source: clearmash delete-all-input: true + +geocode: + # schedule: + # crontab: "0 0 * * *" + pipeline: + - run: ..datapackage_pipelines_mojp.common.processors.load_sql_resource + parameters: + input-resource: dbs-geocode-location + table: geocode_location + - run: ..datapackage_pipelines_mojp.common.processors.geocode + parameters: + input-resource: geocode_location + output-resource: entities-geo + geo-table: entities-geo + - run: dump.to_sql + parameters: + tables: + entities_geo: + resource-name: entities-geo + mode: update diff --git a/datapackage_pipelines_mojp/bagnowka/processors/convert.py b/datapackage_pipelines_mojp/bagnowka/processors/convert.py index 64fc780..c09803e 100644 --- a/datapackage_pipelines_mojp/bagnowka/processors/convert.py +++ b/datapackage_pipelines_mojp/bagnowka/processors/convert.py @@ -33,7 +33,8 @@ def bagnowka_row_to_dbs_row(self, bagnowka_row): "content_html_he": "", "related_documents": {}, "source_doc": bagnowka_row, - "images": bagnowka_row["pictures"] + "images": bagnowka_row["pictures"], + "geo_location": "" } return dbs_row diff --git a/datapackage_pipelines_mojp/clearmash/pipeline_funcs.py b/datapackage_pipelines_mojp/clearmash/pipeline_funcs.py index bba38a5..fa9607e 100644 --- a/datapackage_pipelines_mojp/clearmash/pipeline_funcs.py +++ b/datapackage_pipelines_mojp/clearmash/pipeline_funcs.py @@ -1,6 +1,7 @@ import logging, os from datapackage_pipelines_mojp.clearmash.processors.download import CLEARMASH_DOWNLOAD_SCHEMA from datapackage_pipelines_mojp.clearmash.processors.add_entity_ids import Processor as AddEntityIdsProcesor +from datapackage_pipelines_mojp.common.constants import GEOCODE_LOCATION_SCHEMA def get_override_item_ids_where(): @@ -28,3 +29,6 @@ def entities_schema(): def entity_ids_schema(): return AddEntityIdsProcesor._get_schema() + +def geocode_location_schema(): + return GEOCODE_LOCATION diff --git a/datapackage_pipelines_mojp/clearmash/processors/convert.py b/datapackage_pipelines_mojp/clearmash/processors/convert.py index 7d2c7d7..269a499 100644 --- a/datapackage_pipelines_mojp/clearmash/processors/convert.py +++ b/datapackage_pipelines_mojp/clearmash/processors/convert.py @@ -30,6 +30,7 @@ def _filter_resource(self, resource_descriptor, resource_data): if self._doc_show_filter(dbs_row): self._add_related_documents(dbs_row, cm_row) self._populate_image_fields(dbs_row, cm_row) + self._add_geocode_location(dbs_row, cm_row) yield dbs_row self._stats["num_converted_rows"] += 1 else: @@ -112,6 +113,18 @@ def _cm_row_to_dbs_row(self, cm_row): def _get_collection(self, cm_row): return cm_row["collection"] + def _add_geocode_location(self, dbs_row, cm_row): + # TODO: for collections other than 'places', get + # place from title and/or other fileds (maybe by comparing words in title to list of places?) + if self._get_collection(cm_row) == "places": + if dbs_row["title_en"]: + dbs_row["geo_location"] = dbs_row["title_en"] + else: + dbs_row["geo_location"] = "" + else: + dbs_row["geo_location"] = "" + + def _get_images_from_parsed_doc(self, item_id, parsed_doc): images = [] all_child_docs = self._get_clearmash_api().child_documents.get_for_parsed_doc(parsed_doc) diff --git a/datapackage_pipelines_mojp/common/constants.py b/datapackage_pipelines_mojp/common/constants.py index de8b331..d12729e 100644 --- a/datapackage_pipelines_mojp/common/constants.py +++ b/datapackage_pipelines_mojp/common/constants.py @@ -32,7 +32,7 @@ "description": COLLECTION_FIELD_DESCRIPTION}, {"name": "source_doc", "type": "object"}, {"name": "title", "type": "object", - "description": "languages other then he/en, will be flattened on elasticsearch to content_html_LANG"}, + "description": "languages other then he/en, will be flattened on elasticsearch to content_html_LANG"}, {"name": "title_he", "type": "string"}, {"name": "title_en", "type": "string"}, {"name": "content_html", "type": "object", @@ -45,7 +45,8 @@ "description": "url to the main thumbnail image"}, {"name": "related_documents", "type": "object", "description": "related documents of different types (source-specific)"}, - {"name": "images", "type": "array"},]} + {"name": "images", "type": "array"}, + {"name": "geo_location", "type": "string"}]} DBS_DOCS_SYNC_LOG_TABLE_SCHEMA = {"fields": [{"name": "source", "type": "string"}, {'name': 'id', 'type': 'string'}, @@ -54,3 +55,8 @@ {"name": "collection", "type": "string", "description": COLLECTION_FIELD_DESCRIPTION}, {"name": "sync_msg", "type": "string"}]} + +GEOCODE_LOCATION_SCHEMA = {"fields": [{"name": "document_id", "type": "sting"}, + {"name": "geo_location", "type": "string"}, + {"name": "lan", "type": "string"}, + {"name": "lan", "type": "string"}]} diff --git a/datapackage_pipelines_mojp/common/processors/geocode.py b/datapackage_pipelines_mojp/common/processors/geocode.py new file mode 100644 index 0000000..63e84c5 --- /dev/null +++ b/datapackage_pipelines_mojp/common/processors/geocode.py @@ -0,0 +1,123 @@ +from datapackage_pipelines.wrapper import ingest, spew +from datapackage_pipelines_mojp.common.constants import GEOCODE_LOCATION_SCHEMA +import requests, logging, datetime, os +import geocoder +from sqlalchemy import create_engine, MetaData, or_ +from sqlalchemy.orm import sessionmaker + + +# attributes in entity details that might contain the full address in a single string +LOCATION_ADDRESS_FIELDS = ["location"] + +# simple mapping of fields to their possible sources in the data +# we will take the first item from the list of possible sources that has some data +LOCATION_FIELDS_MAP = {"street": ["street"], + "house_number": ["street_number", "house_number"], + "city": ["city"], + "country": ["country"], + "po_box": ["pob", "zipcode", "postal_code", "pob_postal_code"],} + + +class GeoCodeEntities(object): + + def __init__(self, parameters, datapackage, resources): + self.parameters = parameters + self.datapackage = datapackage + self.resources = resources + self.locations_cache = {} + + @staticmethod + def get_schema(): + return GEOCODE_LOCATION_SCHEMA + + def initialize_db_session(self): + connection_string = os.environ.get("DPP_DB_ENGINE") + assert connection_string is not None, \ + "Couldn't connect to DB - " \ + "Please set your '%s' environment variable" % "DPP_DB_ENGINE" + engine = create_engine(connection_string) + return sessionmaker(bind=engine)() + + # returns a location string to geocode by + # returns None if no suitable location string is found + def get_location_string(self, entity_details): + for field in LOCATION_ADDRESS_FIELDS: + if entity_details.get(field) and len(entity_details[field]) > 3: + # for this type of fields, the address is the best match + return entity_details[field] + address = {} + for field, possible_sources in LOCATION_FIELDS_MAP.items(): + address[field] = "" + for source in possible_sources: + if entity_details.get(source) and len(entity_details[source]) > 0: + address[field] = entity_details[source] + break + location = "{street} {house_number}, {city}, {country}, {po_box}".format(**address) + return location if len(location) > 15 else None + + def filter_resources(self): + for resource_descriptor, resource in zip(self.datapackage["resources"], self.resources): + logging.info(resource_descriptor) + if resource_descriptor["name"] == self.parameters["output-resource"]: + yield self.filter_resource(resource) + else: + yield resource + + def geocoder_google(self, location): + return geocoder.google(location) + + def get_entity_location_details_from_db(self, session, table, entity_id, location): + got_entity_row, old_location, location_lat, location_lng = False, None, None, None + if table is not None: + if location: + rows = session.query(table).filter(or_(table.c.entity_id == entity_id, table.c.location == location)) + else: + rows = session.query(table).filter(or_(table.c.entity_id == entity_id)) + got_location_row = False + for row in rows: + if row.entity_id == entity_id: + old_location = row.location + got_entity_row = True + if row.location == location: + location_lat, location_lng = row.lat, row.lng + got_location_row = True + if not got_entity_row and not got_location_row: + raise Exception("Unexpected row: {}".format(row)) + return got_entity_row, old_location, location_lat, location_lng + + def filter_resource(self, resource): + session = self.initialize_db_session() + meta = MetaData(bind=session.connection()) + meta.reflect() + table = meta.tables.get(self.parameters["geo-table"]) + for row in resource: + entity_id, location = row["id"], self.get_location_string(row["details"]) + has_row, old_location, lat, lng = self.get_entity_location_details_from_db(session, table, entity_id, location) + if (not has_row # new entity - not geocoded, will be inserted + or (has_row and not location and old_location) # previously had a location, now doesn't + or (has_row and location != old_location)): # location changed + # need to update DB + if location and (not lat or not lng): + if location in self.locations_cache: + lat, lng = self.locations_cache[location] + else: + # new location, need to geocode + g = self.geocoder_google(location) + if g.ok: + lat, lng = g.latlng + else: + lat, lng = None, None + self.locations_cache[location] = lat, lng + # only yield items which need to be updated in DB + yield {"entity_id": entity_id, "location": location, + "lat": lat, "lng": lng} + + +if __name__ == "__main__": + parameters, datapackage, resources = ingest() + for i, resource in enumerate(datapackage["resources"]): + if resource["name"] == parameters["input-resource"]: + datapackage["resources"][i] = {"name": parameters["output-resource"], + "path": parameters["output-resource"]+".csv", + "schema": GeoCodeEntities.get_schema()} + spew(datapackage, GeoCodeEntities(parameters, datapackage, resources).filter_resources()) \ No newline at end of file diff --git a/datapackage_pipelines_mojp/common/processors/sync.py b/datapackage_pipelines_mojp/common/processors/sync.py index f5e0406..44e8596 100644 --- a/datapackage_pipelines_mojp/common/processors/sync.py +++ b/datapackage_pipelines_mojp/common/processors/sync.py @@ -22,7 +22,8 @@ def _get_schema(cls): def _filter_row(self, row): if not self._pre_validate_row(row): - self._warn_once("rows are skipped because they failed pre validation") + self._warn_once( + "rows are skipped because they failed pre validation") return None else: logging.info("{source}:{collection},{id}@{version}".format( @@ -36,6 +37,7 @@ def _filter_row(self, row): self._populate_language_fields(new_doc, row) self._populate_related_documents(new_doc, row) self._add_title_related_fields(new_doc) + self._add_geocode_location_fields(new_doc, row) self._validate_collection(new_doc) self._validate_slugs(new_doc) with temp_loglevel(logging.ERROR): @@ -49,7 +51,8 @@ def _filter_row(self, row): else: return self._add_doc(new_doc) except Exception: - logging.exception("unexpected exception, row={}".format(original_row)) + logging.exception( + "unexpected exception, row={}".format(original_row)) raise def _get_sync_response(self, doc, sync_msg): @@ -68,20 +71,24 @@ def _update_doc(self, new_doc, old_doc): self._update_doc_slugs(new_doc, old_doc) with temp_loglevel(logging.ERROR): self._es.index(index=self._idx, doc_type=constants.PIPELINES_ES_DOC_TYPE, - id="{}_{}".format(new_doc["source"], new_doc["source_id"]), - body=new_doc) + id="{}_{}".format( + new_doc["source"], new_doc["source_id"]), + body=new_doc) return self._get_sync_response(new_doc, "updated doc in ES") - def _update_doc_slugs(self, new_doc, old_doc): # aggregate the new and old doc slugs - so that we will never delete any existing slugs, only add new ones for lang in iso639.languages.part1: - old_slug = old_doc["slug_{}".format(lang)] if "slug_{}".format(lang) in old_doc else None + old_slug = old_doc["slug_{}".format(lang)] if "slug_{}".format( + lang) in old_doc else None if old_slug: - new_slug = new_doc["slug_{}".format(lang)] if "slug_{}".format(lang) in new_doc else None + new_slug = new_doc["slug_{}".format(lang)] if "slug_{}".format( + lang) in new_doc else None if new_slug: - new_slug = [new_slug] if isinstance(new_slug, str) else new_slug - old_slug = [old_slug] if isinstance(old_slug, str) else old_slug + new_slug = [new_slug] if isinstance( + new_slug, str) else new_slug + old_slug = [old_slug] if isinstance( + old_slug, str) else old_slug for s in old_slug: if s not in new_slug: new_slug.append(s) @@ -100,16 +107,18 @@ def _pre_validate_row(self, row): content_html_he = row.get("content_html_he", "") content_html_en = row.get("content_html_en", "") if ((content_html_he is not None and len(content_html_he) > 0) - or (content_html_en is not None and len(content_html_en) > 0)): + or (content_html_en is not None and len(content_html_en) > 0)): return True else: - self._warn_once("rows are skipped because they are missing content_html in hebrew or english (or both)") + self._warn_once( + "rows are skipped because they are missing content_html in hebrew or english (or both)") return False def _validate_collection(self, new_doc): if "collection" not in new_doc or new_doc["collection"] not in constants.ALL_KNOWN_COLLECTIONS: new_doc["collection"] = constants.COLLECTION_UNKNOWN - self._warn_once("rows get collection=unknown because they don't have a collection or the collection is unknown") + self._warn_once( + "rows get collection=unknown because they don't have a collection or the collection is unknown") def _initialize_new_doc(self, row, source_doc): # the source doc is used as the base for the final es doc @@ -143,7 +152,8 @@ def _add_slug(self, new_doc, title, lang): slug_parts.append(slug_collection) slug_parts.append(title.lower()) slugify = Slugify(translate=None, safe_chars='_') - slug = slugify(u'_'.join([p.replace("_", "-") for p in slug_parts])) + slug = slugify(u'_'.join([p.replace("_", "-") + for p in slug_parts])) new_doc["slug_{}".format(lang)] = slug def _validate_slugs(self, new_doc): @@ -159,7 +169,8 @@ def _validate_slugs(self, new_doc): # ensure every doc has at least 1 slug (in any language) if len(slugs) == 0: # add english slug comprised of doc id - self._warn_once("docs are added a default slug (probably due to missing title)") + self._warn_once( + "docs are added a default slug (probably due to missing title)") self._add_slug(new_doc, new_doc["source_id"], "en") slug = new_doc["slug_en"] slug = self._ensure_slug_uniqueness(slug, new_doc) @@ -169,8 +180,10 @@ def _validate_slugs(self, new_doc): new_doc["slugs"] = slugs def _ensure_slug_uniqueness(self, slug, doc): - body = {"query": {"constant_score": {"filter": {"term": {"slugs": slug}}}}} - results = self._es.search(index=self._idx, doc_type=constants.PIPELINES_ES_DOC_TYPE, body=body, ignore_unavailable=True) + body = {"query": {"constant_score": { + "filter": {"term": {"slugs": slug}}}}} + results = self._es.search( + index=self._idx, doc_type=constants.PIPELINES_ES_DOC_TYPE, body=body, ignore_unavailable=True) for hit in results["hits"]["hits"]: if hit["_id"] != "{}_{}".format(doc["source"], doc["source_id"]): return self._ensure_slug_uniqueness("{}-{}".format(slug, doc["source_id"]), doc) @@ -181,7 +194,8 @@ def _add_title_related_fields(self, new_doc): if "title_{}".format(lang) in new_doc: title = new_doc["title_{}".format(lang)] # lower-case title - new_doc["title_{}_lc".format(lang)] = title.lower() if title is not None else "" + new_doc["title_{}_lc".format(lang)] = title.lower( + ) if title is not None else "" # slug self._add_slug(new_doc, title, lang) # ensure there is a value for all suggest supported langs @@ -204,11 +218,20 @@ def _populate_language_fields(self, new_doc, row): # delete the combined json lang field from the new_doc del new_doc[lang_field] + def _add_geocode_location_fields(self, new_doc, row): + if "geo_location" in new_doc: + new_doc["location"] = [{"document_id": row.get("id")}, + {"geo_location": row["geo_location"]}, + {"lan": "", "lat": ""}] + del new_doc["geo_location"] + return new_doc["location"] + def _populate_related_documents(self, new_doc, row): if "related_documents" in new_doc: for k, v in new_doc["related_documents"].items(): new_doc["related_documents_{}".format(k)] = v del new_doc["related_documents"] + if __name__ == '__main__': CommonSyncProcessor.main() diff --git a/tests/clearmash/test_convert.py b/tests/clearmash/test_convert.py index 9329bfc..dd3ca73 100644 --- a/tests/clearmash/test_convert.py +++ b/tests/clearmash/test_convert.py @@ -124,7 +124,8 @@ def test_clearmash_convert(): "main_image_url": "", "main_thumbnail_image_url": "", "keys": ['source_doc'], - "images": [],}) + "images": [], + "geo_location": ""}) assert_dict(resource[1], {"collection": "places"}) assert_dict(resource[2], {"collection": "movies"}) assert_dict(resource[3], {"collection": "personalities"}) @@ -158,3 +159,18 @@ def test_clearmash_convert_place_with_related_photoUnits(): assert resource[0]["images"][:3] == [image("031017ece2cc49d2ba73311e336408a2"), image("1245931e49264167a801a8f31a24eaed"), image("49efc3eb16e44689b5dd9b4b078201ec"),] + +def test_geocode_location(): + # places sude have location + entity_ids = [{"item_id": 233953, "collection": "places"},] + resource = get_clearmash_convert_resource_data(get_downloaded_docs(entity_ids)) + assert_dict(resource[0], {'title_en': 'Moscow'}) + assert_dict(resource[0], {'geo_location': 'Moscow'}) + # other collections should have an empty string + entity_ids = [{"item_id": 203884, "collection": "photoUnits"},] + resource = get_clearmash_convert_resource_data(get_downloaded_docs(entity_ids)) + assert_dict(resource[0], {'title_en': 'The Rema Synagogue, Cracow, Poland. Model. Permanent Exhibitio'}) + assert_dict(resource[0], {'geo_location': ''}) + + + diff --git a/tests/test_sync.py b/tests/test_sync.py index 434a33e..8bf5815 100644 --- a/tests/test_sync.py +++ b/tests/test_sync.py @@ -62,7 +62,7 @@ def test_sync(): 'main_image_url', 'source_id', 'parsed_doc', 'title_he', 'document_id', 'content_html_he', 'title_he_suggest', 'item_url', 'last_synced', 'hours_to_next_download', 'last_downloaded', 'display_allowed', - 'images'}}) + 'images', 'location'}}) def test_sync_with_invalid_collection(): @@ -162,3 +162,12 @@ def test_sync_images(): assert doc["images"][:3] == [get_clearmash_image("031017ece2cc49d2ba73311e336408a2"), get_clearmash_image("1245931e49264167a801a8f31a24eaed"), get_clearmash_image("49efc3eb16e44689b5dd9b4b078201ec"),] + +def test_sync_locations(): + es = given_empty_elasticsearch_instance() + # an entity with some images + entity_ids = [{"item_id": 233953, "collection": "places"},] + input_doc = get_clearmash_convert_resource_data(get_clearmash_downloaded_docs(entity_ids))[0] + assert_sync_processor([input_doc]) + doc = es_doc(es, "clearmash", "233953") + assert doc["location"] == [{'document_id': '233953'}, {'geo_location': 'Moscow'}, {'lan': '', 'lat': ''}]