[WIP] #16 add geocode processor #88

Open · wants to merge 1 commit into base: master
30 changes: 30 additions & 0 deletions clearmash/pipeline-spec.yaml
@@ -115,6 +115,7 @@ entities-sync:
table: clearmash-entities
item-id-column: item_id
document-id-column: document_id

# sync to elasticsearch
- run: ..datapackage_pipelines_mojp.common.processors.sync
parameters:
@@ -124,6 +125,15 @@ entities-sync:
- run: dump.to_path
parameters:
out-path: ../data/clearmash/new-entities-sync
# dump geocode_location table
- run: dump.to_sql
Contributor: I think it will be better to have the sync processor update the geocode_entities table

parameters:
add-resource: dbs-geocode-location
table: geocode-location
document-id-column: id
location-column: geo_location
lat-column:
lan-column:
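
To illustrate the review comment above — having the sync processor maintain the geocode table itself rather than a separate dump.to_sql step — here is a rough Python sketch. The table name (geocode_entities) follows the comment; the helper function and the delete-then-insert upsert are assumptions for illustration only, not part of this PR.

```python
import os
from sqlalchemy import create_engine, MetaData

def upsert_geocode_row(document_id, geo_location):
    # Hypothetical helper the sync processor could call per row;
    # assumes a geocode_entities table already exists in the DPP database.
    engine = create_engine(os.environ["DPP_DB_ENGINE"])
    meta = MetaData()
    meta.reflect(bind=engine, only=["geocode_entities"])
    table = meta.tables["geocode_entities"]
    with engine.begin() as conn:
        # simple delete-then-insert in place of a dialect-specific upsert
        conn.execute(table.delete().where(table.c.document_id == document_id))
        conn.execute(table.insert().values(document_id=document_id,
                                           geo_location=geo_location,
                                           lat=None, lng=None))
```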

entities-delete:
description: "go over all items in ElasticSearch and delete any items which are not in entities or have display_allowed=false"
@@ -149,3 +159,23 @@ entities-delete:
id-field: item_id
source: clearmash
delete-all-input: true

geocode:
Contributor: this pipeline should be part of a common pipeline spec file, it's not specific to clearmash

# schedule:
# crontab: "0 0 * * *"
pipeline:
- run: ..datapackage_pipelines_mojp.common.processors.load_sql_resource
parameters:
input-resource: dbs-geocode-location
table: geocode_location
- run: ..datapackage_pipelines_mojp.common.processors.geocode
parameters:
input-resource: geocode_location
output-resource: entities-geo
geo-table: entities-geo
- run: dump.to_sql
parameters:
tables:
entities_geo:
resource-name: entities-geo
mode: update
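
For context, the geocode processor run by this pipeline resolves each geo_location string with the geocoder library. A minimal illustration of that call — assuming the geocoder package is installed and a Google geocoding API key is available to it — looks like this:

```python
import geocoder

# resolve a free-text location string to coordinates, as the processor does per row
g = geocoder.google("Krakow, Poland")
if g.ok:
    lat, lng = g.latlng  # e.g. [50.06..., 19.94...]
else:
    lat, lng = None, None  # geocoding failed; the processor stores empty coordinates
```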
3 changes: 2 additions & 1 deletion datapackage_pipelines_mojp/bagnowka/processors/convert.py
@@ -33,7 +33,8 @@ def bagnowka_row_to_dbs_row(self, bagnowka_row):
"content_html_he": "",
"related_documents": {},
"source_doc": bagnowka_row,
"images": bagnowka_row["pictures"]
"images": bagnowka_row["pictures"],
"geo_location": ""
}
return dbs_row

4 changes: 4 additions & 0 deletions datapackage_pipelines_mojp/clearmash/pipeline_funcs.py
@@ -1,6 +1,7 @@
import logging, os
from datapackage_pipelines_mojp.clearmash.processors.download import CLEARMASH_DOWNLOAD_SCHEMA
from datapackage_pipelines_mojp.clearmash.processors.add_entity_ids import Processor as AddEntityIdsProcesor
from datapackage_pipelines_mojp.common.constants import GEOCODE_LOCATION_SCHEMA


def get_override_item_ids_where():
@@ -28,3 +29,6 @@ def entities_schema():

def entity_ids_schema():
return AddEntityIdsProcesor._get_schema()

def geocode_location_schema():
return GEOCODE_LOCATION_SCHEMA
13 changes: 13 additions & 0 deletions datapackage_pipelines_mojp/clearmash/processors/convert.py
@@ -30,6 +30,7 @@ def _filter_resource(self, resource_descriptor, resource_data):
if self._doc_show_filter(dbs_row):
self._add_related_documents(dbs_row, cm_row)
self._populate_image_fields(dbs_row, cm_row)
self._add_geocode_location(dbs_row, cm_row)
yield dbs_row
self._stats["num_converted_rows"] += 1
else:
@@ -112,6 +113,18 @@ def _cm_row_to_dbs_row(self, cm_row):
def _get_collection(self, cm_row):
return cm_row["collection"]

def _add_geocode_location(self, dbs_row, cm_row):
# TODO: for collections other than 'places', get
# place from title and/or other fields (maybe by comparing words in title to a list of places?)
if self._get_collection(cm_row) == "places":
if dbs_row["title_en"]:
dbs_row["geo_location"] = dbs_row["title_en"]
else:
dbs_row["geo_location"] = ""
else:
dbs_row["geo_location"] = ""


def _get_images_from_parsed_doc(self, item_id, parsed_doc):
images = []
all_child_docs = self._get_clearmash_api().child_documents.get_for_parsed_doc(parsed_doc)
10 changes: 8 additions & 2 deletions datapackage_pipelines_mojp/common/constants.py
@@ -32,7 +32,7 @@
"description": COLLECTION_FIELD_DESCRIPTION},
{"name": "source_doc", "type": "object"},
{"name": "title", "type": "object",
"description": "languages other then he/en, will be flattened on elasticsearch to content_html_LANG"},
"description": "languages other then he/en, will be flattened on elasticsearch to content_html_LANG"},
{"name": "title_he", "type": "string"},
{"name": "title_en", "type": "string"},
{"name": "content_html", "type": "object",
@@ -45,7 +45,8 @@
"description": "url to the main thumbnail image"},
{"name": "related_documents", "type": "object",
"description": "related documents of different types (source-specific)"},
{"name": "images", "type": "array"},]}
{"name": "images", "type": "array"},
{"name": "geo_location", "type": "string"}]}

DBS_DOCS_SYNC_LOG_TABLE_SCHEMA = {"fields": [{"name": "source", "type": "string"},
{'name': 'id', 'type': 'string'},
@@ -54,3 +55,8 @@
{"name": "collection", "type": "string",
"description": COLLECTION_FIELD_DESCRIPTION},
{"name": "sync_msg", "type": "string"}]}

GEOCODE_LOCATION_SCHEMA = {"fields": [{"name": "document_id", "type": "string"},
{"name": "geo_location", "type": "string"},
{"name": "lat", "type": "string"},
{"name": "lng", "type": "string"}]}
123 changes: 123 additions & 0 deletions datapackage_pipelines_mojp/common/processors/geocode.py
@@ -0,0 +1,123 @@
from datapackage_pipelines.wrapper import ingest, spew
from datapackage_pipelines_mojp.common.constants import GEOCODE_LOCATION_SCHEMA
import requests, logging, datetime, os
import geocoder
from sqlalchemy import create_engine, MetaData, or_
from sqlalchemy.orm import sessionmaker


# attributes in entity details that might contain the full address in a single string
LOCATION_ADDRESS_FIELDS = ["location"]

# simple mapping of fields to their possible sources in the data
# we will take the first item from the list of possible sources that has some data
LOCATION_FIELDS_MAP = {"street": ["street"],
"house_number": ["street_number", "house_number"],
"city": ["city"],
"country": ["country"],
"po_box": ["pob", "zipcode", "postal_code", "pob_postal_code"],}
Contributor: this stuff is specific to budget key - it's irrelevant for us

Contributor Author: I thought maybe to leave it in case we'd like to extend location to other items in the future, like a detailed address of a photo subject (e.g. a painting, a synagogue, a person's residence and so on). But I guess it could be added if and when needed.

Contributor: all this logic is specific only to open budget; even if we did need street numbers and the like, this code will not work.


class GeoCodeEntities(object):

def __init__(self, parameters, datapackage, resources):
self.parameters = parameters
self.datapackage = datapackage
self.resources = resources
self.locations_cache = {}

@staticmethod
def get_schema():
return GEOCODE_LOCATION_SCHEMA

def initialize_db_session(self):
connection_string = os.environ.get("DPP_DB_ENGINE")
assert connection_string is not None, \
"Couldn't connect to DB - " \
"Please set your '%s' environment variable" % "DPP_DB_ENGINE"
engine = create_engine(connection_string)
return sessionmaker(bind=engine)()

# returns a location string to geocode by
# returns None if no suitable location string is found
def get_location_string(self, entity_details):
Contributor: this function is not needed, we get the location string as-is from the source data

Contributor Author: should I change this processor to extend the BaseProcessor, like the rest?

Contributor: would be better to have all processors use similar logic, but not critical

for field in LOCATION_ADDRESS_FIELDS:
if entity_details.get(field) and len(entity_details[field]) > 3:
# for this type of fields, the address is the best match
return entity_details[field]
address = {}
for field, possible_sources in LOCATION_FIELDS_MAP.items():
address[field] = ""
for source in possible_sources:
if entity_details.get(source) and len(entity_details[source]) > 0:
address[field] = entity_details[source]
break
location = "{street} {house_number}, {city}, {country}, {po_box}".format(**address)
return location if len(location) > 15 else None

def filter_resources(self):
for resource_descriptor, resource in zip(self.datapackage["resources"], self.resources):
logging.info(resource_descriptor)
if resource_descriptor["name"] == self.parameters["output-resource"]:
yield self.filter_resource(resource)
else:
yield resource

def geocoder_google(self, location):
return geocoder.google(location)

def get_entity_location_details_from_db(self, session, table, entity_id, location):
got_entity_row, old_location, location_lat, location_lng = False, None, None, None
if table is not None:
if location:
rows = session.query(table).filter(or_(table.c.entity_id == entity_id, table.c.location == location))
else:
rows = session.query(table).filter(or_(table.c.entity_id == entity_id))
got_location_row = False
for row in rows:
if row.entity_id == entity_id:
old_location = row.location
got_entity_row = True
if row.location == location:
location_lat, location_lng = row.lat, row.lng
got_location_row = True
if not got_entity_row and not got_location_row:
raise Exception("Unexpected row: {}".format(row))
return got_entity_row, old_location, location_lat, location_lng

def filter_resource(self, resource):
session = self.initialize_db_session()
meta = MetaData(bind=session.connection())
meta.reflect()
table = meta.tables.get(self.parameters["geo-table"])
for row in resource:
entity_id, location = row["id"], self.get_location_string(row["details"])
has_row, old_location, lat, lng = self.get_entity_location_details_from_db(session, table, entity_id, location)
if (not has_row # new entity - not geocoded, will be inserted
or (has_row and not location and old_location) # previously had a location, now doesn't
or (has_row and location != old_location)): # location changed
# need to update DB
if location and (not lat or not lng):
if location in self.locations_cache:
lat, lng = self.locations_cache[location]
else:
# new location, need to geocode
g = self.geocoder_google(location)
if g.ok:
lat, lng = g.latlng
else:
lat, lng = None, None
self.locations_cache[location] = lat, lng
# only yield items which need to be updated in DB
yield {"entity_id": entity_id, "location": location,
"lat": lat, "lng": lng}


if __name__ == "__main__":
parameters, datapackage, resources = ingest()
for i, resource in enumerate(datapackage["resources"]):
if resource["name"] == parameters["input-resource"]:
datapackage["resources"][i] = {"name": parameters["output-resource"],
"path": parameters["output-resource"]+".csv",
"schema": GeoCodeEntities.get_schema()}
spew(datapackage, GeoCodeEntities(parameters, datapackage, resources).filter_resources())
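
Taking the review comments above together — the location string arrives as-is in geo_location, so the address-assembly logic and LOCATION_FIELDS_MAP are unnecessary — the core of filter_resource could reduce to something like the sketch below. This is only an illustration against GEOCODE_LOCATION_SCHEMA as defined in this PR; whether it should extend BaseProcessor, as asked in the thread, is left open.

```python
import geocoder

def geocode_rows(resource, locations_cache):
    """Sketch: yield one geocode_location row per input row, caching lookups per location string."""
    for row in resource:
        location = row.get("geo_location") or None
        lat, lng = None, None
        if location:
            if location in locations_cache:
                lat, lng = locations_cache[location]
            else:
                g = geocoder.google(location)
                if g.ok:
                    lat, lng = g.latlng
                locations_cache[location] = (lat, lng)
        yield {"document_id": row.get("document_id"), "geo_location": location,
               "lat": lat, "lng": lng}
```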
57 changes: 40 additions & 17 deletions datapackage_pipelines_mojp/common/processors/sync.py
@@ -22,7 +22,8 @@ def _get_schema(cls):

def _filter_row(self, row):
if not self._pre_validate_row(row):
self._warn_once("rows are skipped because they failed pre validation")
self._warn_once(
"rows are skipped because they failed pre validation")
return None
else:
logging.info("{source}:{collection},{id}@{version}".format(
@@ -36,6 +37,7 @@ def _filter_row(self, row):
self._populate_language_fields(new_doc, row)
self._populate_related_documents(new_doc, row)
self._add_title_related_fields(new_doc)
self._add_geocode_location_fields(new_doc, row)
self._validate_collection(new_doc)
self._validate_slugs(new_doc)
with temp_loglevel(logging.ERROR):
@@ -49,7 +51,8 @@ def _filter_row(self, row):
else:
return self._add_doc(new_doc)
except Exception:
logging.exception("unexpected exception, row={}".format(original_row))
logging.exception(
"unexpected exception, row={}".format(original_row))
raise

def _get_sync_response(self, doc, sync_msg):
Expand All @@ -68,20 +71,24 @@ def _update_doc(self, new_doc, old_doc):
self._update_doc_slugs(new_doc, old_doc)
with temp_loglevel(logging.ERROR):
self._es.index(index=self._idx, doc_type=constants.PIPELINES_ES_DOC_TYPE,
id="{}_{}".format(new_doc["source"], new_doc["source_id"]),
body=new_doc)
id="{}_{}".format(
new_doc["source"], new_doc["source_id"]),
body=new_doc)
return self._get_sync_response(new_doc, "updated doc in ES")


def _update_doc_slugs(self, new_doc, old_doc):
# aggregate the new and old doc slugs - so that we will never delete any existing slugs, only add new ones
for lang in iso639.languages.part1:
old_slug = old_doc["slug_{}".format(lang)] if "slug_{}".format(lang) in old_doc else None
old_slug = old_doc["slug_{}".format(lang)] if "slug_{}".format(
lang) in old_doc else None
if old_slug:
new_slug = new_doc["slug_{}".format(lang)] if "slug_{}".format(lang) in new_doc else None
new_slug = new_doc["slug_{}".format(lang)] if "slug_{}".format(
lang) in new_doc else None
if new_slug:
new_slug = [new_slug] if isinstance(new_slug, str) else new_slug
old_slug = [old_slug] if isinstance(old_slug, str) else old_slug
new_slug = [new_slug] if isinstance(
new_slug, str) else new_slug
old_slug = [old_slug] if isinstance(
old_slug, str) else old_slug
for s in old_slug:
if s not in new_slug:
new_slug.append(s)
@@ -100,16 +107,18 @@ def _pre_validate_row(self, row):
content_html_he = row.get("content_html_he", "")
content_html_en = row.get("content_html_en", "")
if ((content_html_he is not None and len(content_html_he) > 0)
or (content_html_en is not None and len(content_html_en) > 0)):
or (content_html_en is not None and len(content_html_en) > 0)):
return True
else:
self._warn_once("rows are skipped because they are missing content_html in hebrew or english (or both)")
self._warn_once(
"rows are skipped because they are missing content_html in hebrew or english (or both)")
return False

def _validate_collection(self, new_doc):
if "collection" not in new_doc or new_doc["collection"] not in constants.ALL_KNOWN_COLLECTIONS:
new_doc["collection"] = constants.COLLECTION_UNKNOWN
self._warn_once("rows get collection=unknown because they don't have a collection or the collection is unknown")
self._warn_once(
"rows get collection=unknown because they don't have a collection or the collection is unknown")

def _initialize_new_doc(self, row, source_doc):
# the source doc is used as the base for the final es doc
@@ -143,7 +152,8 @@ def _add_slug(self, new_doc, title, lang):
slug_parts.append(slug_collection)
slug_parts.append(title.lower())
slugify = Slugify(translate=None, safe_chars='_')
slug = slugify(u'_'.join([p.replace("_", "-") for p in slug_parts]))
slug = slugify(u'_'.join([p.replace("_", "-")
for p in slug_parts]))
new_doc["slug_{}".format(lang)] = slug

def _validate_slugs(self, new_doc):
@@ -159,7 +169,8 @@ def _validate_slugs(self, new_doc):
# ensure every doc has at least 1 slug (in any language)
if len(slugs) == 0:
# add english slug comprised of doc id
self._warn_once("docs are added a default slug (probably due to missing title)")
self._warn_once(
"docs are added a default slug (probably due to missing title)")
self._add_slug(new_doc, new_doc["source_id"], "en")
slug = new_doc["slug_en"]
slug = self._ensure_slug_uniqueness(slug, new_doc)
@@ -169,8 +180,10 @@ def _validate_slugs(self, new_doc):
new_doc["slugs"] = slugs

def _ensure_slug_uniqueness(self, slug, doc):
body = {"query": {"constant_score": {"filter": {"term": {"slugs": slug}}}}}
results = self._es.search(index=self._idx, doc_type=constants.PIPELINES_ES_DOC_TYPE, body=body, ignore_unavailable=True)
body = {"query": {"constant_score": {
"filter": {"term": {"slugs": slug}}}}}
results = self._es.search(
index=self._idx, doc_type=constants.PIPELINES_ES_DOC_TYPE, body=body, ignore_unavailable=True)
for hit in results["hits"]["hits"]:
if hit["_id"] != "{}_{}".format(doc["source"], doc["source_id"]):
return self._ensure_slug_uniqueness("{}-{}".format(slug, doc["source_id"]), doc)
@@ -181,7 +194,8 @@ def _add_title_related_fields(self, new_doc):
if "title_{}".format(lang) in new_doc:
title = new_doc["title_{}".format(lang)]
# lower-case title
new_doc["title_{}_lc".format(lang)] = title.lower() if title is not None else ""
new_doc["title_{}_lc".format(lang)] = title.lower(
) if title is not None else ""
# slug
self._add_slug(new_doc, title, lang)
# ensure there is a value for all suggest supported langs
@@ -204,11 +218,20 @@ def _populate_language_fields(self, new_doc, row):
# delete the combined json lang field from the new_doc
del new_doc[lang_field]

def _add_geocode_location_fields(self, new_doc, row):
if "geo_location" in new_doc:
new_doc["location"] = [{"document_id": row.get("id")},
{"geo_location": row["geo_location"]},
{"lan": "", "lat": ""}]
del new_doc["geo_location"]
return new_doc["location"]

def _populate_related_documents(self, new_doc, row):
if "related_documents" in new_doc:
for k, v in new_doc["related_documents"].items():
new_doc["related_documents_{}".format(k)] = v
del new_doc["related_documents"]


if __name__ == '__main__':
CommonSyncProcessor.main()