From 8a02f50da8a910c9f84f91c249b08f12a7718e7a Mon Sep 17 00:00:00 2001 From: Kip Parker Date: Wed, 22 Mar 2023 13:31:33 +0000 Subject: [PATCH 1/3] Update socrata plugin for python 3, make base url configurable --- README.md | 19 +++++++++++++++ ckanext/socrata/plugin.py | 50 ++++++++++++++++++++++++++++++++------- setup.py | 1 + 3 files changed, 62 insertions(+), 8 deletions(-) diff --git a/README.md b/README.md index acac378..9840f12 100644 --- a/README.md +++ b/README.md @@ -18,6 +18,25 @@ This extension provides a CKAN Harvest plugin that consumes metadata from Socrat ckan.plugins = harvest socrata_harvester +## Usage + +Create a new harvest source of type "Socrata" and enter the URL of the Socrata catalog you want to harvest from. The default base url to retrieve catalogues is "https://api.us.socrata.com/api/catalog/v1". You can provide a config object to the harvester to change this base url. For example: + +```json +{ + "base_api_endpoint": "https://api.eu.socrata.com/api/catalog/v1" +} +``` + +For local development, run + +```bash +ckan harvester gather-consumer +ckan harvester fetch-consumer +``` + +to see the harvest jobs being processed. + ## Copying and License This material is copyright (c) Open Knowledge International. 
diff --git a/ckanext/socrata/plugin.py b/ckanext/socrata/plugin.py index dbf4bf3..3fd23a4 100644 --- a/ckanext/socrata/plugin.py +++ b/ckanext/socrata/plugin.py @@ -2,14 +2,23 @@ import json import uuid -from urlparse import urlparse - import requests from dateutil.parser import parse -from simplejson.scanner import JSONDecodeError + +try: + from urllib.parse import urlparse +except ImportError: + from urlparse import urlparse + +try: + from json import JSONDecodeError +except ImportError: + from simplejson.scanner import JSONDecodeError + + from ckan import model -from ckan.lib.munge import munge_title_to_name, munge_tag +from ckan.lib.munge import munge_tag from ckan.plugins.core import implements import ckan.plugins.toolkit as toolkit from ckanext.harvest.interfaces import IHarvester @@ -209,7 +218,31 @@ def _build_package_dict(self, context, harvest_object): }] return package_dict + + def _set_config(self, config_str): + if config_str: + self.config = self.validate_config(config_str) + if 'base_api_endpoint' in self.config: + self.base_api_endpoint = self.config['base_api_endpoint'] + + log.debug('Using config: %r', self.config) + else: + self.config = {'base_api_endpoint':BASE_API_ENDPOINT} + + def validate_config(self, config): + if not config: + return config + config_obj = json.loads(config) + if 'base_api_endpoint' in config_obj: + try: + parsed = urlparse(config_obj['base_api_endpoint']) + except AttributeError: + raise ValueError('base_api_endpoint must be a valid URL') + if not parsed.scheme or not parsed.netloc: + raise ValueError('base_api_endpoint must be a valid URL') + return config_obj + def process_package(self, package, harvest_object): ''' Subclasses can override this method to perform additional processing on @@ -237,7 +270,7 @@ def _request_datasets_from_socrata(domain, limit=100, offset=0): api_request_url = \ '{0}?domains={1}&search_context={1}' \ '&only=datasets&limit={2}&offset={3}' \ - .format(BASE_API_ENDPOINT, domain, limit, offset) + 
.format(self.base_api_endpoint, domain, limit, offset) log.debug('Requesting {}'.format(api_request_url)) api_response = requests.get(api_request_url) @@ -266,8 +299,9 @@ def _page_datasets(domain, batch_number): _request_datasets_from_socrata(domain, batch_number, current_offset) if datasets is None or len(datasets) == 0: - raise StopIteration + break current_offset = current_offset + batch_number + log.debug(f'Continued with {current_offset}-{batch_number}') for dataset in datasets: yield dataset @@ -292,7 +326,7 @@ def _make_harvest_objs(datasets): log.debug('In SocrataHarvester gather_stage (%s)', harvest_job.source.url) - + self._set_config(harvest_job.source.config) domain = urlparse(harvest_job.source.url).hostname object_ids, guids = _make_harvest_objs(_page_datasets(domain, 100)) @@ -396,7 +430,7 @@ def import_stage(self, harvest_object): else: # We need to explicitly provide a package ID - package_dict['id'] = unicode(uuid.uuid4()) + package_dict['id'] = str(uuid.uuid4()) harvest_object.package_id = package_dict['id'] harvest_object.add() diff --git a/setup.py b/setup.py index 5e52857..88d389b 100644 --- a/setup.py +++ b/setup.py @@ -44,6 +44,7 @@ # Specify the Python versions you support here. In particular, ensure # that you indicate whether you support Python 2, Python 3 or both. 
'Programming Language :: Python :: 2.7', + 'Programming Language :: Python :: 3', ], From 3a51583b6b0b77ebf2fde1d3caf39a05d6eb5f22 Mon Sep 17 00:00:00 2001 From: Kip Parker Date: Wed, 22 Mar 2023 14:38:05 +0000 Subject: [PATCH 2/3] Name the data files (match the title) --- ckanext/socrata/plugin.py | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/ckanext/socrata/plugin.py b/ckanext/socrata/plugin.py index 3fd23a4..1606aa3 100644 --- a/ckanext/socrata/plugin.py +++ b/ckanext/socrata/plugin.py @@ -161,7 +161,8 @@ def _build_package_dict(self, context, harvest_object): 'owner_org': local_org, 'resources': [], } - + log.info(package_dict) + log.info(res) # Add tags package_dict['tags'] = \ [{'name': munge_tag(t)} @@ -214,20 +215,20 @@ def _build_package_dict(self, context, harvest_object): 'url': DOWNLOAD_ENDPOINT_TEMPLATE.format( domain=urlparse(harvest_object.source.url).hostname, resource_id=res['resource']['id']), - 'format': 'CSV' + 'format': 'CSV', + 'name': res['resource']['name'] }] return package_dict def _set_config(self, config_str): + self.config = {'base_api_endpoint':BASE_API_ENDPOINT} if config_str: - self.config = self.validate_config(config_str) + self.config = json.loads(config_str) if 'base_api_endpoint' in self.config: self.base_api_endpoint = self.config['base_api_endpoint'] log.debug('Using config: %r', self.config) - else: - self.config = {'base_api_endpoint':BASE_API_ENDPOINT} def validate_config(self, config): if not config: @@ -241,7 +242,7 @@ def validate_config(self, config): raise ValueError('base_api_endpoint must be a valid URL') if not parsed.scheme or not parsed.netloc: raise ValueError('base_api_endpoint must be a valid URL') - return config_obj + return config def process_package(self, package, harvest_object): ''' @@ -319,6 +320,7 @@ def _make_harvest_objs(datasets): extras=[HarvestObjectExtra( key='status', value='hi!')]) + log.debug('Content is {}'.format(json.dumps(d))) obj.save() 
obj_ids.append(obj.id) guids.append(d['resource']['id']) @@ -454,3 +456,5 @@ def import_stage(self, harvest_object): return False return True + + From 98d1cf400b58468025dbb4e280c73ebca13aef3a Mon Sep 17 00:00:00 2001 From: Kip Parker Date: Wed, 22 Mar 2023 16:27:50 +0000 Subject: [PATCH 3/3] remove logging --- ckanext/socrata/plugin.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/ckanext/socrata/plugin.py b/ckanext/socrata/plugin.py index 1606aa3..d51c592 100644 --- a/ckanext/socrata/plugin.py +++ b/ckanext/socrata/plugin.py @@ -161,8 +161,6 @@ def _build_package_dict(self, context, harvest_object): 'owner_org': local_org, 'resources': [], } - log.info(package_dict) - log.info(res) # Add tags package_dict['tags'] = \ [{'name': munge_tag(t)} @@ -320,7 +318,6 @@ def _make_harvest_objs(datasets): extras=[HarvestObjectExtra( key='status', value='hi!')]) - log.debug('Content is {}'.format(json.dumps(d))) obj.save() obj_ids.append(obj.id) guids.append(d['resource']['id'])