Skip to content

Commit

Permalink
Merge pull request #2 from MAAP-Project/add-query-service-support
Browse files Browse the repository at this point in the history
Add support for query endpoint
  • Loading branch information
bsatoriu authored Aug 14, 2019
2 parents 60b8bc8 + 6b766a8 commit 6ffde78
Show file tree
Hide file tree
Showing 4 changed files with 351 additions and 2 deletions.
1 change: 1 addition & 0 deletions maap.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ algorithm_build = http://%(maap_host)s/api/algorithm/build
job_status = http://%(maap_host)s/api/job/status
wmts = http://%(maap_host)s/api/wmts
tiler_endpoint = https://8e9mu91qr6.execute-api.us-east-1.amazonaws.com/production
query_endpoint = https://%(maap_host)s/api/query/

[aws]
aws_access_key_id = ${AWS_ACCESS_KEY_ID}
Expand Down
5 changes: 5 additions & 0 deletions maap/errors.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
class QueryTimeout(Exception):
pass

class QueryFailure(Exception):
pass
93 changes: 91 additions & 2 deletions maap/maap.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,19 @@
import requests
import json
import urllib.parse
import time
from mapboxgl.utils import *
from mapboxgl.viz import *
from datetime import datetime

import xml.etree.ElementTree as ET
from .Result import Collection, Granule
from .Dictlist import Dictlist
from .xmlParser import XmlDictConfig
from maap.utils.Presenter import Presenter
from .errors import QueryTimeout, QueryFailure

logger = logging.getLogger(__name__)

try:
from configparser import ConfigParser
Expand Down Expand Up @@ -48,6 +53,7 @@ def __init__(self):
self._WMTS = self.config.get("service", "wmts")
self._TILER_ENDPOINT = self.config.get("service", "tiler_endpoint")
self._MAAP_HOST = self.config.get("service", "maap_host")
self._QUERY_ENDPOINT = self.config.get("service", "query_endpoint")

self._AWS_ACCESS_KEY = os.environ.get("AWS_ACCESS_KEY_ID") or self.config.get("aws", "aws_access_key_id")
self._AWS_ACCESS_SECRET = os.environ.get("AWS_SECRET_ACCESS_KEY") or self.config.get("aws", "aws_secret_access_key")
Expand Down Expand Up @@ -101,7 +107,7 @@ def _get_search_results(self, url, limit, **kwargs):
:param kwargs: search parameters
:return: list of results (<Instance of Result>)
"""
logging.info("======== Waiting for response ========")
logger.info("======== Waiting for response ========")

page_num = 1
results = []
Expand Down Expand Up @@ -168,7 +174,6 @@ def getCallFromEarthdataQuery(self, query, variable_name='maap', limit=1000):

return result


def searchCollection(self, limit=100, **kwargs):
"""
Search the CMR collections
Expand All @@ -195,6 +200,90 @@ def getJobStatus(self, jobid):
)
return response

def executeQuery(self, src, query={}, poll_results=True, timeout=180, wait_interval=.5, max_redirects=5):
"""
Helper to execute query and poll results URL until results are returned
or timeout is reached.
src -- a dict-like object stipulating which dataset is to be queried.
Object must contain 'Collection' key. 'Collection' value must
contain 'ShortName' and 'VersionId' entries. Granule-related value
must contain a 'Collection' entry, complying with aforementioned
'Collection' object requirements.
query -- dict-like object describing parameters for query (default {}).
Currently supported parameters:
- where -- a dict-like object mapping fields to required values,
used for filtering query
- bbox -- optional GeoJSON-compliant bounding box (minX, minY,
maxX, maxY) by which to filter data (default [], meaning no
filter)
- fields -- optional list of fields to return in query response
(default [], returning all fields)
poll_results -- system will poll for results and return results response
if True, otherwise will return response from Query Service (default
True)
timeout -- max number of seconds to wait for response, only used if
results=True (default 180)
wait_interval -- number of seconds to wait between each poll for
results, only used if results=True (default .5)
max_redirectss -- max number of redirects to follow when scheduling
execution (default 5)
"""
url = self._QUERY_ENDPOINT
redirect_count = 0
while True:
response = requests.post(
url=url,
headers=dict(Accept='application/json'),
json=dict(src=src, query=query),
allow_redirects=False
)

# By default, requests follows POST redirects with GET request.
# Instead, we'll make the POST again to the new URL.
redirect_url = response.headers.get('Location', url)
if (redirect_url is not url and response.is_redirect and redirect_count < max_redirects):
logger.debug(f'Received redirect at {url}. Retrying query at {redirect_url}')
url = redirect_url
redirect_count += 1
else:
break

if not poll_results:
# Return the response of query execution
return response

response.raise_for_status()
if (response.is_redirect):
raise requests.HTTPError(
'Received redirect as query execution response '
'Is your the "query_endpoint" configuration correct?'
f'\n{response.status_code}: {response.text}'
)
execution = response.json()
results = execution['results']

# Poll results
start = datetime.now()
while (datetime.now() - start).seconds < timeout:
r = requests.get(url=results)

if r.status_code == 200:
# Return the response of query results
if r.headers.get('x-amz-meta-failed'):
raise QueryFailure(
f'The backing query service failed to process query:\n{r.text}'
)
return r

if r.status_code == 404:
continue

r.raise_for_status()
time.sleep(wait_interval)

raise QueryTimeout('Query results did not appear within {} seconds'.format(timeout))

def _get_browse(self, granule_ur):
response = requests.get(
url='{}/GetTile'.format(self._WMTS),
Expand Down
Loading

0 comments on commit 6ffde78

Please sign in to comment.