diff --git a/Dockerfile b/Dockerfile index 6528aae..eba13ff 100644 --- a/Dockerfile +++ b/Dockerfile @@ -13,11 +13,3 @@ RUN make test RUN make install WORKDIR /project -FROM python:3 as dagster -RUN apt-get update -RUN apt-get install -y unzip curl -RUN pip install dagster dagster-aws dagster-shell dagit -RUN curl "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" -o "awscliv2.zip" -RUN unzip awscliv2.zip -RUN ./aws/install -WORKDIR /project/etl \ No newline at end of file diff --git a/README.md b/README.md index 549b1b4..00d9b95 100644 --- a/README.md +++ b/README.md @@ -1,9 +1,23 @@ # xdb -Exchange database +Antelope Exchange database -This repo contains a deployable antelope server that is exposed via a REST API. All the exchange data are static, so a REST model is appropriate. +This is a REST HTTP server for hosting LCA data according to the [Antelope interface](https://github.com/AntelopeLCA/antelope). + This repo contains a deployable antelope server that is exposed via a REST API. All the exchange data are static, so a REST model is appropriate. + +The server is linked to an authentication and authorization mechanism that would evaluate each request in terms of the requester's access level. +Every query must be accompanied by an authorization token that has been computed as indicated in +[xdb_tokens.py](https://github.com/AntelopeLCA/antelope/blob/virtualize/antelope/xdb_tokens.py). + + + +## Run the server + +The +From the root directory, run: + + $ ANTELOPE_CATALOG_ROOT=/data/LCI/my_container uvicorn api:app --host 0.0.0.0 --reload + -The server should be linked to an authentication and authorization mechanism that would evaluate each request in terms of the requestor's access level. 
## config diff --git a/api/__init__.py b/api/__init__.py new file mode 100644 index 0000000..489dadd --- /dev/null +++ b/api/__init__.py @@ -0,0 +1 @@ +from .api import app diff --git a/api/antelope_api.md b/api/antelope_api.md index 591c66d..2d4b37f 100644 --- a/api/antelope_api.md +++ b/api/antelope_api.md @@ -31,14 +31,50 @@ A tuple of (origin, external reference) specifies a distinct entity. In the eve ### server-wide queries + APIv2_ROOT/ - return server metadata, incl list of origins APIv2_ROOT/origins - return list of known origins When I think of more, I'll put them here. + +### Basic Entity queries + +These types of queries do not depend on any particular interface access. + +Entity-specific queries: + + APIv2_ROOT/[origin]/[entity id] - return a thorough description of the entity + APIv2_ROOT/[origin]/[entity id]/reference - return a unitary reference* + APIv2_ROOT/[origin]/[process id]/references - return list of reference exchanges + APIv2_ROOT/[origin]/[flow id]/unit - unit string of the flow's reference quantity + APIv2_ROOT/[origin]/[flow id]/context - the flow's full context as a list (or empty list) + +* A quantity's reference is a unit (string); a flow's reference is a quantity record. Processes are constituted to have zero or more reference exchanges, though most have only one. If a process has a single reference exchange (or if a unitary reference is somehow designated), it will be returned; otherwise a 404 is returned with the message "No Reference" or "Multiple References". + +On the other hand, non-processes with unitary references can always be returned as single-entry lists, so the `references` query will never return an error for a valid entity. 
+ +The basic interface can also be used to compute LCIA results, provided (a) the xdb server has access to a background +implementation for the named process; (b) the xdb server recognizes the quantity and can query against it and (c) the +query credentials are authorized by the qdb that is consulted for the query. The query does NOT require background +or even exchange access to run this route, but if background access is not present, then only a summary LCIA result +(i.e. no details) will be returned. + +quantity is known to the local qdb: + + APIv2_ROOT/[origin]/[process id]/lcia/[quantity id] - perform LCIA on process LCI + APIv2_ROOT/[origin]/[process id]/[ref flow]/lcia/[quantity id] + +quantity is known in a remote qdb (xdb must be able to resolve the resource) + + APIv2_ROOT/[origin]/[process id]/lcia/[quantity origin]/[quantity id] + APIv2_ROOT/[origin]/[process id]/[ref flow]/lcia/[quantity origin]/[quantity id] + + ### Index queries Origin-specific queries: + APIv2_ROOT/[origin]/ - list entity records; query to search APIv2_ROOT/[origin]/processes APIv2_ROOT/[origin]/flows @@ -48,33 +84,27 @@ Origin-specific queries: # would these be better as /processes/count? 
APIv2_ROOT/[origin]/count - dict containing count of all entity types - APIv2_ROOT/[origin]/count/ - int reporting count of specified entity type + +These are not implemented at the API, but could be: + + APIv2_ROOT/[origin]/count/ - int reporting count of specified entity type APIv2_ROOT/[origin]/count/processes - /count/process synonym APIv2_ROOT/[origin]/count/flows - /count/flow synonym APIv2_ROOT/[origin]/count/quantities APIv2_ROOT/[origin]/count/contexts APIv2_ROOT/[origin]/count/flowables - APIv2_ROOT/[origin]/synonyms/[term] - list synonyms for the specified term - APIv2_ROOT/[origin]/synonyms?term=term - "" "" - APIv2_ROOT/[origin]/get_context/[term] - return canonical full context for term, as a list - APIv2_ROOT/[origin]/get_context?term=term - "" "" - Entity-specific queries: - APIv2_ROOT/[origin]/[entity id] - return a thorough description of the entity - APIv2_ROOT/[origin]/[entity id]/reference - return a unitary reference* - APIv2_ROOT/[origin]/[process id]/references - return list of reference exchanges - APIv2_ROOT/[origin]/[flow id]/unit - unit string of the flow's reference quantity - APIv2_ROOT/[origin]/[flow id]/context - the flow's full context as a list (or empty list) APIv2_ROOT/[origin]/[flow id]/targets - return reference exchanges containing the flow - APIv2_ROOT/[origin]/[context]/parent - context's parent or none - APIv2_ROOT/[origin]/[context]/sense - context's parent or none - APIv2_ROOT/[origin]/[context]/subcontexts - list of subcontexts -* A quantity's reference is a unit (string); a flow's reference is a quantity record. Processes are constituted to have zero or more reference exchanges, though most have only one. If a process has a single reference exchange (or if a unitary reference is somehow designated), it will be returned; otherwise a 404 is returned with the message "No Reference" or "Multiple References". 
+ APIv2_ROOT/[origin]/contexts/[context] - get_context() implementation - includes parent, sense, subcontexts -On the other hand, non-processes with unitary references can always be returned as single-entry lists, so the `references` query will never return an error for a valid entity. +These are not implemented at the API, but could be: + + APIv2_ROOT/[origin]/contexts/[context]/parent - context's parent or none + APIv2_ROOT/[origin]/contexts/[context]/sense - context's sense or none + APIv2_ROOT/[origin]/contexts/[context]/subcontexts - list of subcontexts ### Documentary queries @@ -144,88 +174,77 @@ Only in cases where processes have a single designated reference exchange, may t All background aspect queries return lists of exchanges, either reference exchanges (value always 1) or dependent exchanges (normalized to reference exchange). The "aspects" are as follows: - APIv2_ROOT/[origin]/[process id]/[ref flow]/consumers - [reference exchanges] - APIv2_ROOT/[origin]/[process id]/[ref flow]/dependencies - [exchange values] - APIv2_ROOT/[origin]/[process id]/[ref flow]/emissions - [exchange values] - APIv2_ROOT/[origin]/[process id]/[ref flow]/cutoffs - [exchange values] - APIv2_ROOT/[origin]/[process id]/[ref flow]/lci - [exchange values] - APIv2_ROOT/[origin]/[process id]/[ref flow]/sys_lci - [exchange values] - APIv2_ROOT/[origin]/[process id]/[ref flow]/foreground - [exchange values] - APIv2_ROOT/[origin]/[process id]/[ref flow]/ad - [exchange values] - APIv2_ROOT/[origin]/[process id]/[ref flow]/bf - [exchange values] + APIv2_ROOT/[origin]/[process id]/[ref flow]/consumers - [reference exchanges] + APIv2_ROOT/[origin]/[process id]/[ref flow]/dependencies - [IntermediateExchanges] + APIv2_ROOT/[origin]/[process id]/[ref flow]/emissions - [ElementaryExchanges] + APIv2_ROOT/[origin]/[process id]/[ref flow]/cutoffs - [CutoffExchanges] + APIv2_ROOT/[origin]/[process id]/[ref flow]/lci - [ElementaryExchanges] + [CutoffExchanges] + APIv2_ROOT/[origin]/[process 
id]/[ref flow]/sys_lci - [ElementaryExchanges] + [CutoffExchanges] + APIv2_ROOT/[origin]/[process id]/[ref flow]/foreground - [exchange values] + APIv2_ROOT/[origin]/[process id]/[ref flow]/ad - [IntermediateExchanges] + APIv2_ROOT/[origin]/[process id]/[ref flow]/bf - [ElementaryExchanges] + [CutoffExchanges] + APIv2_ROOT/[origin]/[process id]/[ref flow]/lci/[dep flow] - [ElementaryExchanges] Only flows terminated to *elementary* contexts are emissions; other flows (both unterminated and terminated to intermediate contexts) are "cutoffs". +### Quantity queries + +There are two layers to the quantity engine: the native layer, which includes only strict mappings and does not perform +any reconciliation, and the qdb layer, which does reconcile the native data with a set of canonical terms. The +native-layer queries are answered by the individual archives and their TermManagers, while the qdb-layer queries +are answered by the catalog's LciaEngine. The qdb layer is also (someday) going to be implemented by a stand-alone +qdb which implements a graph database. + +Some things worth noting: + * The native-layer quantity queries are a subset of the full quantity interface, i.e. some queries only make sense + in the context of reconciliation across data sources (for instance, the factors POST method). 
+ * The native layer is nominally read-only (comes from a static XDB data source), but the qdb layer can grow + * Identification of elementary flows is not reliable for native queries, only for qdb queries (because it depends + on reconciling contexts) + +**Native-layer quantity queries** + + APIv2_ROOT/[origin]/synonyms?term=term - list synonyms for the specified term + APIv2_ROOT/[origin]/contexts/[term] - return canonical full context for term + + APIv2_ROOT/[origin]/[flow id]/profile - list characterizations for the flow + APIv2_ROOT/[origin]/[flow id]/cf/[quantity id] - return the characterization value as a float (or 0.0) + + APIv2_ROOT/[origin]/[quantity id]/norm - return a normalization dict + APIv2_ROOT/[origin]/[quantity id]/factors - list characterizations for the quantity + APIv2_ROOT/[origin]/[quantity id]/convert/[flow id] - return a QuantityConversion + APIv2_ROOT/[origin]/[quantity id]/convert/[flowable]/[ref quantity] - return a QuantityConversion + + APIv2_ROOT/[origin]/[quantity id]/lcia {POST} - perform LCIA on POSTDATA = list of exchange refs + + ## Summary of return types: - * String - * Integer - * Float - * EntityRecord - origin, entity ID, entity type, name - * RichEntityRecord - EntityRecord + search key, search value (*for use in answering a search query*) - * Context - name, parent, sense - * Reference Exchange - origin, process, flow, direction, locale[, comment] - * Exchange - origin, process, flow, termination, locale[, comment] - * ExchangeValue - Exchange + value - * ExteriorFlow - origin, flow, direction, termination +Meta-types + * ServerMeta - info about the xdb + * OriginMeta - available / authorized interfaces + * OriginCount - maybe part of OriginMeta? 
+ +Basic/Index types + * Entity - origin, entity ID, entity type, properties + * FlowEntity - Entity + context, locale, referenceQuantity + * Context - name, parent, elementary, sense, subcontexts + +Exchange/Background types + * ExteriorFlow - origin, flow, direction (W/R/T interior), context + * Exchange - origin, process, flow, direction, termination, type, comment, str + * ReferenceExchange - reference=True, termination=None + * ReferenceValue - ReferenceExchange + value + * ExchangeValues - Exchange + multiple values, one per reference, + uncertainty + * AllocatedExchange - Exchange + ref_flow + value + uncertainty + +Quantity types + * Characterization - origin, flowable, ref quantity, query quantity, context, dict of locale:value + * Normalization - origin, quantity, dict of locale: value + * QuantityConversion - basically a QRResult: origin, flowable, ref qty, q qty, context, locale, value + * LciaDetailedResult + * LciaAggregation + * LciaResult I think that's all of them. -## A key question for Return Data - -For these queries, I have the decision of whether and how to state the entity's origin in the response. The client must know the origin because the origin is part of the request-- thus re-stating it wastes bandwidth? or is it not a concern bc of gzip? - -Query: `APIv2_ROOT/my.data.source/processes?name=aluminium` - -Option 1: explicit, full: - - [ - { - "origin": "my.data.source", - "entityId": "4xad", - "entityType": "process", - "name": "Aluminium casting plant" - }, - { - "origin": "my.data.source", - "entityId": "4xae", - "entityType": "process", - "name": "Aluminium smelting plant" - }, - ... - ] - -Option 1: explicit, nested - - { - "origin": "my.data.source", - "processes": [ - { - "entityId": "4xad", - "entityType": "process", - "name": "Aluminium casting plant" - }, - { - "entityId": "4xae", - "entityType": "process", - "name": "Aluminium smelting plant" - }, - .... 
- ] - } - -Option 3: unspecified (implicit, most compact): - - [ - { - "entityId": "4xad", - "entityType": "process", - "name": "Aluminium casting plant" - }, - { - "entityId": "4xae", - "entityType": "process", - "name": "Aluminium smelting plant" - }, - ... - ] diff --git a/api/api.py b/api/api.py index 09df2ce..1a38f01 100644 --- a/api/api.py +++ b/api/api.py @@ -1,10 +1,28 @@ -from models.response import Origin, Entity +from antelope_core.models import (OriginMeta, OriginCount, Entity, FlowEntity, Context, Exchange, ReferenceExchange, + ExchangeValues, ReferenceValue, DetailedLciaResult, SummaryLciaResult, + AllocatedExchange, Characterization, Normalizations, + generate_pydantic_exchanges) -from fastapi import FastAPI -from typing import List +from antelope_core.entities import MetaQuantityUnit +from antelope_core.auth import AuthorizationGrant, JwtGrant + +from .models.response import ServerMeta, PostTerm + +from .runtime import PUBLIC_ORIGINS, UNRESTRICTED_GRANTS, cat, search_entities, do_lcia, PUBKEYS +from .qdb import qdb_router + +from antelope import EntityNotFound, MultipleReferences, NoReference, check_direction, EXCHANGE_TYPES, IndexRequired + +from fastapi import FastAPI, HTTPException, Depends +from fastapi.security import OAuth2PasswordBearer +from jose import JWTError, jwt + +from typing import List, Optional import logging import os + + LOGLEVEL = os.environ.get('LOGLEVEL', default='WARNING').upper() logging.basicConfig(level=LOGLEVEL) @@ -14,28 +32,704 @@ description="API for the exchange database" ) -@app.get("/", response_model=List[Origin]) -def get_origin(): - return [{ - "id": 123, - "name": "fake origin", - "description": "origin is fake" - }] - - -@app.get("/{origin}/", response_model=List[Entity]) -def get_entities(origin: int): - return [{ - "entity_id": 123, - "entity_type": "entity type", - "name": f"belongs to {origin}" - }] - -@app.get("/{origin}/{entity}/", response_model=Entity) -def get_entity(origin: int, entity: int): - return 
{ - "entity_id": entity, - "entity_type": "entity type", - "name": f"belongs to origin:{origin}" - } +app.include_router(qdb_router) + + +oauth2_scheme = OAuth2PasswordBearer(auto_error=False, tokenUrl="token") # the tokenUrl argument appears to do nothing + +""" +Our implied auth database requires the following tables (basically all the stuff in runtime.py): + +PUBKEYS of public keys associated with valid token issuers (revocable / managed) +ORIGINS and their maintainers +ISSUES which pair an origin, issuer, and interface (enum), possibly with other auth metadata + +Issues with the master issuer can be considered "public" interfaces, because of a policy that the auth server will +issue a token for a public interface to any user (subject to usage constraints) + +[or: issues with None issuer do not require tokens] + +The db content is presently provided in digested form as PUBKEYS (containing 1 entry), PUBLIC_ORIGINS and +UNRESTRICTED_GRANTS, with all other grants being assumed to be delivered by JWT (nonstatically) + +qdb is mainly where usage is metered for public data + +but also, the issuing of tokens is metered- a free user will get say 15 or 25 sessions/month + + + + +""" + + +def get_token_grants(token: Optional[str]): + if token is None: + return [] + payload = jwt.decode(token, 'a fish', options={'verify_signature': False}) + try: + pub = PUBKEYS[payload['iss']] # this tells us the issuer signed this token + except KeyError: + raise HTTPException(401, detail='Issuer %s unknown' % payload['iss']) + try: + valid_payload = jwt.decode(token, pub, algorithms=['RS256']) + except JWTError: + raise HTTPException(401, detail='Token failed verification') + # however, we also need to test whether the issuer is trusted with the requested origin(s). 
otherwise one + # compromised key would allow a user to issue a token for any origin + return AuthorizationGrant.from_jwt(JwtGrant(**valid_payload)) + + +@app.get("/", response_model=ServerMeta) +def get_server_meta(token: Optional[str] = Depends(oauth2_scheme)): + grants = get_token_grants(token) + sm = ServerMeta.from_app(app) + for org in PUBLIC_ORIGINS: + sm.origins.append(org) + for org in sorted(set(k.origin for k in grants)): + sm.authorized_origins.append(org) + return sm + + +def _get_authorized_query(origin, token): + """ + The main point of this is to ask the auth server / oauth grant / etc what the supplied credentials authorize + with respect to the stated origin. + + The way I had imagined this was: + the service we offer is trusted [in two senses] computation in a private setting. The user has paid to activate + the container (like a holosuite / DS9) and they have access to: free stuff, ecoinvent, other subscriptions, + their private data, private data that's been shared with them. all that is specified by semantic origin, which + has the format: + + [origin]/query + [origin]/[dataset]/query + + and origin specifies the provider, resource, and scope (e.g. version) of the query. + + so, e.g. ecoinvent.3.4.cutoff/uuid0123-4567-.../exchanges/flow9876-5432-* + + Auth grants look like (see antelope_core.auth): + + AuthorizationGrant(ResponseModel): + auth: apiToken + origin: str + access: [one of READONLY_INTERFACE_TYPES] + # read: True # existence of the grant implies read access + values: bool # access to numerical data [exchange values; characterization results] + update: bool # permission to update data [within the specified interface] + + under consideration: + masq: bool # whether to conceal sub-origins + + Somehow we need / want to implement a sort of mapping where detailed origins are presented as higher-level + origins for external query. 
For instance, an XDB may have the following origins on hand: + - lcacommons.uslci.fy2020.q3 + - lcacommons.uslci.fy2020.q4 + - lcacommons.uslci.fy2021.q1 + but we would like to configure only one of them to be the origin of record in response to 'lcacommons.uslci' + queries. Ultimately this may depend on the user's authorization-- certain users may have access to all the + different versions, but others will only get 'lcacommons.uslci' + + We also need to decide whether to "masquerade" the true origin or not, i.e. if a user is authorized for + 'lcacommons.uslci' and the origin of record is 'lcacommons.uslci.fy2021.q1', do the queries include the true + origin or the authorized one? I would tend toward the true origin. this is the masq[uerade] question. + + :param origin: + :return: a catalog query, with an authorized_interfaces attribute that returns: a set of authorizations. spec tbd. + """ + auth_grants = get_token_grants(token) + grants = UNRESTRICTED_GRANTS + auth_grants + q = cat.query(origin, grants=grants) # XdbCatalog will return an XdbQuery which auto-enforces grants !! + # q.authorized_interfaces = set([k.split(':')[1] for k in cat.interfaces if k.startswith(origin)]) + return q + + +@app.get("/origins", response_model=List[str]) +def get_origins(token: Optional[str] = Depends(oauth2_scheme)): + auth_grants = get_token_grants(token) + all_origins = PUBLIC_ORIGINS + list(set(k.origin for k in auth_grants)) + return [org for org in all_origins if cat.known_origin(org)] + + +def _origin_meta(origin, token): + """ + It may well be that OriginMeta needs to include config information (at minimum, context hints)- in which + case the meta object should be constructed from resources, not from blackbox queries. 
we shall see + :param origin: + :return: + """ + is_lcia = _get_authorized_query(origin, token).is_lcia_engine() + return { + "origin": origin, + "is_lcia_engine": is_lcia, + "interfaces": list(set([k.split(':')[1] for k in cat.interfaces if k.startswith(origin)])) + } + + +@app.get("/{origin}/", response_model=List[OriginMeta]) +def get_origin(origin: str, token: Optional[str] = Depends(oauth2_scheme)): + """ + TODO: reconcile the AVAILABLE origins with the CONFIGURED origins and the AUTHORIZED origins + :param origin: + :param token: + :return: + """ + return [_origin_meta(org, token) for org in cat.origins if org.startswith(origin)] + + +@app.get("/{origin}/synonyms", response_model=List[str]) +@app.get("/{origin}/synonyms/{term}", response_model=List[str]) +def get_synonyms(origin:str, term: str, token: Optional[str] = Depends(oauth2_scheme)): + return _get_authorized_query(origin, token).synonyms(term) + + +@app.post("/{origin}/synonyms", response_model=List[str]) +def post_synonyms(origin:str, post_term: PostTerm, token: Optional[str] = Depends(oauth2_scheme)): + return _get_authorized_query(origin, token).synonyms(post_term.term) + + + +@app.get("/{origin}/count", response_model=List[OriginCount]) +def get_origin(origin:str, token: Optional[str] = Depends(oauth2_scheme)): + return list(_get_origin_counts(origin, token)) + + + +def _get_origin_counts(origin: str, token: Optional[str]): + for org in cat.origins: + if not org.startswith(origin): + continue + try: + q = _get_authorized_query(org, token) + yield { + 'origin': org, + 'count': { + 'processes': q.count('process'), + 'flows': q.count('flow'), + 'quantities': q.count('quantity'), + 'flowables': len(list(q.flowables())), + 'contexts': len(list(q.contexts())) + } + } + except IndexRequired: + pass + +''' +Index Interface +''' +@app.get("/{origin}/processes") # , response_model=List[Entity]) +def search_processes(origin:str, + name: Optional[str]=None, + classifications: Optional[str]=None, + 
spatialscope: Optional[str]=None, + comment: Optional[str]=None, + token: Optional[str] = Depends(oauth2_scheme)): + query = _get_authorized_query(origin, token) + kwargs = {'name': name, + 'classifications': classifications, + 'spatialscope': spatialscope, + 'comment': comment} + return list(search_entities(query, 'processes', **kwargs)) + +@app.get("/{origin}/flows", response_model=List[FlowEntity]) +def search_flows(origin:str, + name: Optional[str]=None, + casnumber: Optional[str]=None, + token: Optional[str] = Depends(oauth2_scheme)): + kwargs = {'name': name, + 'casnumber': casnumber} + query = _get_authorized_query(origin, token) + return list(search_entities(query, 'flows', **kwargs)) + +@app.get("/{origin}/quantities", response_model=List[Entity]) +def search_quantities(origin:str, + name: Optional[str]=None, + referenceunit: Optional[str]=None, + token: Optional[str] = Depends(oauth2_scheme)): + kwargs = {'name': name, + 'referenceunit': referenceunit} + query = _get_authorized_query(origin, token) + return list(search_entities(query, 'quantities', **kwargs)) + +@app.get("/{origin}/lcia_methods", response_model=List[Entity]) +@app.get("/{origin}/lciamethods", response_model=List[Entity]) +def search_lcia_methods(origin:str, + name: Optional[str]=None, + referenceunit: Optional[str]=None, + method: Optional[str]=None, + category: Optional[str]=None, + indicator: Optional[str]=None, + token: Optional[str] = Depends(oauth2_scheme)): + kwargs = {'name': name, + 'referenceunit': referenceunit, + 'method': method, + 'category': category, + 'indicator': indicator} + query = _get_authorized_query(origin, token) + return list(search_entities(query, 'lcia_methods', **kwargs)) + + +@app.get("/{origin}/lcia", response_model=List[Entity]) +def get_meta_quantities(origin, + name: Optional[str] = None, + method: Optional[str] = None, + token: Optional[str] = Depends(oauth2_scheme)): + kwargs = {'name': name, + 'method': method} + query = _get_authorized_query(origin, 
token) + return list(search_entities(query, 'quantities', unit=MetaQuantityUnit.unitstring, **kwargs)) + + +@app.get("/{origin}/contexts", response_model=List[Context]) +def get_contexts(origin: str, elementary: bool=None, sense=None, parent=None, + token: Optional[str] = Depends(oauth2_scheme)): + q = _get_authorized_query(origin, token) + if parent is not None: + parent = q.get_context(parent) + cxs = [Context.from_context(cx) for cx in q.contexts()] + if elementary is not None: + cxs = filter(lambda x: x.elementary == elementary, cxs) + if sense is not None: + cxs = filter(lambda x: x.sense == sense, cxs) + if parent is not None: + cxs = filter(lambda x: x.parent == parent, cxs) + return list(cxs) + + +@app.get("/{origin}/contexts/{context}", response_model=Context) +def get_context(origin: str, context: str, + token: Optional[str] = Depends(oauth2_scheme)): + q = _get_authorized_query(origin, token) + return Context.from_context(q.get_context(context)) + + +@app.get("/{origin}/{flow}/targets", response_model=List[ReferenceExchange]) +def get_targets(origin, flow, direction: str=None, + token: Optional[str] = Depends(oauth2_scheme)): + if direction is not None: + direction = check_direction(direction) + return list(ReferenceExchange.from_exchange(x) for x in _get_authorized_query(origin, token).targets(flow, direction=direction)) + + +''' +Basic interface +(Listed after index interface b/c route resolution precedence +''' +def _get_typed_entity(q, entity, etype=None): + try: + e = q.get(entity) + except EntityNotFound: + raise HTTPException(404, "Entity %s not found" % entity) + if e is None: + raise HTTPException(404, "Entity %s is None" % entity) + if etype is None or e.entity_type == etype: + return e + raise HTTPException(400, detail="entity %s is not a %s" % (entity, etype)) + + +@app.get("/{origin}/{entity}", response_model=Entity) +def get_entity(origin: str, entity: str, + token: Optional[str] = Depends(oauth2_scheme)): + query = 
_get_authorized_query(origin, token) + e = _get_typed_entity(query, entity) + if e.entity_type == 'process': + ent = Entity.from_entity(e) + ent.properties[e.reference_field] = [ReferenceExchange.from_exchange(x) for x in e.reference_entity] + elif e.entity_type == 'flow': + ent = FlowEntity.from_flow(e) + else: + ent = Entity.from_entity(e) + ent.properties[e.reference_field] = str(e.reference_entity) + for p in e.properties(): + try: + ent.properties[p] = e[p] + except KeyError as err: + ent.properties[p] = err + return ent + + +@app.get("/{origin}/processes/{entity}/", response_model=Entity) +def get_named_process(origin: str, entity: str, + token: Optional[str] = Depends(oauth2_scheme)): + query = _get_authorized_query(origin, token) + return _get_typed_entity(query, entity, 'process') + +@app.get("/{origin}/flows/{entity}/", response_model=Entity) +def get_named_flow(origin: str, entity: str, + token: Optional[str] = Depends(oauth2_scheme)): + query = _get_authorized_query(origin, token) + return _get_typed_entity(query, entity, 'flow') + +@app.get("/{origin}/quantities/{entity}/", response_model=Entity) +@app.get("/{origin}/flowproperties/{entity}/", response_model=Entity) +@app.get("/{origin}/flow_properties/{entity}/", response_model=Entity) +@app.get("/{origin}/lciamethods/{entity}/", response_model=Entity) +@app.get("/{origin}/lcia_methods/{entity}/", response_model=Entity) +def get_named_quantity(origin: str, entity: str, + token: Optional[str] = Depends(oauth2_scheme)): + query = _get_authorized_query(origin, token) + return _get_typed_entity(query, entity, 'quantity') + + +@app.get("/{origin}/{entity}/reference") # SHOOP +def get_unitary_reference(origin, entity, token: Optional[str] = Depends(oauth2_scheme)): + """ + Response model varies with entity type (!) 
+ Quantity: reference is unit + Flow: reference is quantity (unit) + Process: reference is ReferenceExchange or an exception + :param origin: + :param entity: + :param token: + :return: + """ + query = _get_authorized_query(origin, token) + ent = _get_typed_entity(query, entity) + if ent.entity_type == 'quantity': + return ent.unit + elif ent.entity_type == 'process': + try: + rx = ReferenceExchange.from_exchange(ent.reference()) + except MultipleReferences: + raise HTTPException(404, f"Process {entity} has multiple references") + except NoReference: + raise HTTPException(404, f"Process {entity} has no references") + return rx + + else: # if ent.entity_type == 'flow': + return Entity.from_entity(ent.reference_entity) + + +@app.get("/{origin}/processes/{entity}/references") +@app.get("/{origin}/flows/{entity}/references") +@app.get("/{origin}/quantities/{entity}/references") +@app.get("/{origin}/flowproperties/{entity}/references") +@app.get("/{origin}/{entity}/references") +def get_references(origin, entity, token: Optional[str] = Depends(oauth2_scheme)): + query = _get_authorized_query(origin, token) + ent = _get_typed_entity(query, entity) + if ent.entity_type == 'process': + return list(ReferenceValue.from_rx(rx) for rx in ent.references()) + else: + return [get_unitary_reference(origin, entity)] + + +@app.get("/{origin}/{entity}/uuid", response_model=str) +def get_uuid(origin, entity, token: Optional[str] = Depends(oauth2_scheme)): + query = _get_authorized_query(origin, token) + ent = _get_typed_entity(query, entity) + return ent.uuid + +@app.get("/{origin}/{entity}/properties", response_model=List[str]) # SHOOP +def get_properties(origin, entity, token: Optional[str] = Depends(oauth2_scheme)): + query = _get_authorized_query(origin, token) + ent = _get_typed_entity(query, entity) + return list(ent.properties()) + + + +@app.get("/{origin}/{entity}/doc/{item}") # SHOOP +def get_item(origin, entity, item, token: Optional[str] = Depends(oauth2_scheme)): + query = 
_get_authorized_query(origin, token) + ent = _get_typed_entity(query, entity) + return ent[item] + + +@app.get("/{origin}/flows/{entity}/unit") # SHOOP +@app.get("/{origin}/flowproperties/{entity}/unit") # SHOOP +@app.get("/{origin}/quantities/{entity}/unit") # SHOOP +@app.get("/{origin}/{entity}/unit") # SHOOP +def get_unit(origin, entity, token: Optional[str] = Depends(oauth2_scheme)): + query = _get_authorized_query(origin, token) + return getattr(_get_typed_entity(query, entity), 'unit') + + +@app.get("/{origin}/flows/{flow}/context", response_model=Context) +@app.get("/{origin}/{flow}/context", response_model=Context) +def get_flow_context(origin, flow, token: Optional[str] = Depends(oauth2_scheme)): + query = _get_authorized_query(origin, token) + f = _get_typed_entity(query, flow, 'flow') + return get_context(origin, f.context[-1]) # can't remember if flow is already context-equipped + + +def _get_rx_by_ref_flow(p, ref_flow): + """ + This is really a bad request because either a path param or a query param was missing + :param p: + :param ref_flow: could be None if p has a unitary reference + :return: + """ + try: + return p.reference(ref_flow) + except MultipleReferences: + raise HTTPException(400, f"Process {p} has multiple references") + except NoReference: + raise HTTPException(400, f"Process {p} has no references") + + +@app.get("/{origin}/{process}/lcia/{quantity}", response_model=List[DetailedLciaResult]) # SHOOP +@app.get("/{origin}/{process}/lcia/{qty_org}/{quantity}", response_model=List[DetailedLciaResult]) +@app.get("/{origin}/{process}/{ref_flow}/lcia/{quantity}", response_model=List[DetailedLciaResult]) +@app.get("/{origin}/{process}/{ref_flow}/lcia/{qty_org}/{quantity}", response_model=List[DetailedLciaResult]) +def get_remote_lcia(origin: str, process: str, quantity: str, ref_flow: str=None, qty_org: str=None, + token: Optional[str] = Depends(oauth2_scheme)): + """ + + :param origin: + :param process: + :param quantity: + :param ref_flow: 
[None] if process has multiple references, one must be specified + :param qty_org: [None] if + :param token: + :return: + """ + pq = _get_authorized_query(origin, token) + p = pq.get(process) + rx = _get_rx_by_ref_flow(p, ref_flow) + lci = list(p.lci(rx)) + + if qty_org is None: + try: + qq = cat.lcia_engine.get_canonical(quantity) + except EntityNotFound: + raise HTTPException(404, f"Quantity {quantity} not found") + query = _get_authorized_query(qq.origin, token) + else: + query = _get_authorized_query(qty_org, token) + qq = query.get_canonical(quantity) + + ress = do_lcia(query, qq, lci) + + if 'exchange' in pq.authorized_interfaces(): + return [DetailedLciaResult.from_lcia_result(p, res) for res in ress] + else: + return [SummaryLciaResult.from_lcia_result(p, res) for res in ress] + + +"""TO WRITE: +/{origin}/{flow}/locale ??? + +/{origin}/{flow}/emitters + +""" +''' +exchange interface +''' +@app.get("/{origin}/{process}/exchanges", response_model=List[Exchange]) # SHOOP +def get_exchanges(origin, process, type: str=None, flow: str=None, + token: Optional[str] = Depends(oauth2_scheme)): + if type and (type not in EXCHANGE_TYPES): + raise HTTPException(400, detail=f"Cannot understand type {type}") + query = _get_authorized_query(origin, token) + p = _get_typed_entity(query, process, 'process') + exch = p.exchanges(flow=flow) + return list(generate_pydantic_exchanges(exch, type=type)) + + +@app.get("/{origin}/{process}/exchanges/{flow}", response_model=List[ExchangeValues]) # SHOOP +def get_exchange_values(origin, process, flow: str, + token: Optional[str] = Depends(oauth2_scheme)): + query = _get_authorized_query(origin, token) + p = _get_typed_entity(query, process, 'process') + exch = p.exchange_values(flow=flow) + return list(ExchangeValues.from_ev(x) for x in exch) + + +@app.get("/{origin}/{process}/inventory", response_model=List[AllocatedExchange]) +@app.get("/{origin}/{process}/{ref_flow}/inventory", response_model=List[AllocatedExchange]) +def 
get_inventory(origin, process, ref_flow: str=None, + token: Optional[str] = Depends(oauth2_scheme)): + query = _get_authorized_query(origin, token) + p = _get_typed_entity(query, process, 'process') + rx = _get_rx_by_ref_flow(p, ref_flow) + + inv = p.inventory(rx) + return list(AllocatedExchange.from_inv(x, rx.flow.external_ref) for x in inv) + + +''' +background interface + +Routes that depend on topological ordering + +/{origin}/foreground [ReferenceExchange] +/{origin}/background [ReferenceExchange] +/{origin}/interior [ReferenceExchange] +/{origin}/exterior [ExteriorFlow] + +/{origin}/{process}/{ref_flow}/dependencies [AllocatedExchange]; term is node +/{origin}/{process}/{ref_flow}/emissions [AllocatedExchange]; term is context +## fair question whether cutoffs should include non-elementary contexts- +## related: are cutoffs a subset of emissions, disjoint, or intersect? +Standard usage: "emissions" are elementary, "cutoffs" are dangling. Problem is, +that predicates the response on having an LciaEngine to resolve the contexts, and +present Background impl does not: it asks its local _index. now we could just give +every term manager the set of default contexts... what other differences exist? 
+/{origin}/{process}/{ref_flow}/cutoffs [AllocatedExchange]; term is None + +/{origin}/{process}/{ref_flow}/consumers [ReferenceExchange] +/{origin}/{process}/{ref_flow}/foreground [one ReferenceExchange followed by AllocatedExchanges] + +/{origin}/{process}/{ref_flow}/ad [AllocatedExchange] +/{origin}/{process}/{ref_flow}/bf [AllocatedExchange] +/{origin}/{process}/{ref_flow}/lci [AllocatedExchange] +/{origin}/{process}/{ref_flow}/lci/{ext_flow} [AllocatedExchange] + +/{origin}/{process}/{ref_flow}/syslci [POST] [AllocatedExchange] + +''' + +@app.get('/{origin}/{process}/{ref_flow}/ad', response_model=List[AllocatedExchange]) +@app.get('/{origin}/{process}/ad', response_model=List[AllocatedExchange]) +def get_ad(origin: str, process: str, ref_flow: str=None, + token: Optional[str] = Depends(oauth2_scheme)): + query = _get_authorized_query(origin, token) + p = _get_typed_entity(query, process, 'process') + rf = _get_rx_by_ref_flow(p, ref_flow) + return list(AllocatedExchange.from_inv(x, ref_flow=rf.flow.external_ref) for x in p.ad(ref_flow=rf)) + + +@app.get('/{origin}/{process}/{ref_flow}/bf', response_model=List[AllocatedExchange]) +@app.get('/{origin}/{process}/bf', response_model=List[AllocatedExchange]) +def get_bf(origin: str, process: str, ref_flow: str=None, + token: Optional[str] = Depends(oauth2_scheme)): + query = _get_authorized_query(origin, token) + p = _get_typed_entity(query, process, 'process') + rf = _get_rx_by_ref_flow(p, ref_flow) + return list(AllocatedExchange.from_inv(x, ref_flow=rf.flow.external_ref) for x in p.bf(ref_flow=rf)) + + +@app.get('/{origin}/{process}/{ref_flow}/dependencies', response_model=List[AllocatedExchange]) +@app.get('/{origin}/{process}/dependencies', response_model=List[AllocatedExchange]) +def get_dependencies(origin: str, process: str, ref_flow: str=None, + token: Optional[str] = Depends(oauth2_scheme)): + query = _get_authorized_query(origin, token) + p = _get_typed_entity(query, process, 'process') + rf = 
_get_rx_by_ref_flow(p, ref_flow) + return list(AllocatedExchange.from_inv(x, ref_flow=rf.flow.external_ref) for x in p.dependencies(ref_flow=rf)) + + +@app.get('/{origin}/{process}/{ref_flow}/emissions', response_model=List[AllocatedExchange]) +@app.get('/{origin}/{process}/emissions', response_model=List[AllocatedExchange]) +def get_emissions(origin: str, process: str, ref_flow: str=None, + token: Optional[str] = Depends(oauth2_scheme)): + """ + This returns a list of elementary exchanges from the process+reference flow pair. + :param origin: + :param process: + :param ref_flow: + :param token: + :return: + """ + + query = _get_authorized_query(origin, token) + p = _get_typed_entity(query, process, 'process') + rf = _get_rx_by_ref_flow(p, ref_flow) + return list(AllocatedExchange.from_inv(x, ref_flow=rf.flow.external_ref) for x in p.emissions(ref_flow=rf)) + + +@app.get('/{origin}/{process}/{ref_flow}/cutoffs', response_model=List[AllocatedExchange]) +@app.get('/{origin}/{process}/cutoffs', response_model=List[AllocatedExchange]) +def get_cutoffs(origin: str, process: str, ref_flow: str=None, + token: Optional[str] = Depends(oauth2_scheme)): + query = _get_authorized_query(origin, token) + p = _get_typed_entity(query, process, 'process') + rf = _get_rx_by_ref_flow(p, ref_flow) + return list(AllocatedExchange.from_inv(x, ref_flow=rf.flow.external_ref) for x in p.cutoffs(ref_flow=rf)) + + +@app.get('/{origin}/{process}/{ref_flow}/lci', response_model=List[AllocatedExchange]) +@app.get('/{origin}/{process}/lci', response_model=List[AllocatedExchange]) +def get_lci(origin: str, process: str, ref_flow: str=None, + token: Optional[str] = Depends(oauth2_scheme)): + query = _get_authorized_query(origin, token) + p = _get_typed_entity(query, process, 'process') + rf = _get_rx_by_ref_flow(p, ref_flow) + return list(AllocatedExchange.from_inv(x, ref_flow=rf.flow.external_ref) for x in p.lci(ref_flow=rf)) + + +@app.get('/{origin}/{process}/{ref_flow}/consumers', 
response_model=List[AllocatedExchange]) +@app.get('/{origin}/{process}/consumers', response_model=List[AllocatedExchange]) +def get_consumers(origin: str, process: str, ref_flow: str=None, + token: Optional[str] = Depends(oauth2_scheme)): + query = _get_authorized_query(origin, token) + p = _get_typed_entity(query, process, 'process') + rf = _get_rx_by_ref_flow(p, ref_flow) + return list(ReferenceExchange.from_exchange(x) for x in p.consumers(ref_flow=rf)) + + +@app.get('/{origin}/{process}/{ref_flow}/foreground', response_model=List[ExchangeValues]) +@app.get('/{origin}/{process}/foreground', response_model=List[ExchangeValues]) +def get_foreground(origin: str, process: str, ref_flow: str=None, + token: Optional[str] = Depends(oauth2_scheme)): + query = _get_authorized_query(origin, token) + p = _get_typed_entity(query, process, 'process') + rf = _get_rx_by_ref_flow(p, ref_flow) + fg = p.foreground(ref_flow=rf) + rtn = [ReferenceValue.from_rx(next(fg))] + for dx in fg: + rtn.append(ExchangeValues.from_ev(dx)) + return rtn + + +''' +quantity interface +key question: is_lcia_engine() bool returns whether the source performs input harmonization, e.g. whether it supports +the POST routes: + APIv2ROOT/[origin]/[quantity id]/factors POST flow specs (or exterior flows?)- map flow spec to factors + APIv2ROOT/[origin]/[quantity]/lcia POST exchanges- returns an LciaResult + +Now, a source-specific quantity interface, which is necessary to expose non-harmonized quantity data from +xdb data sources. + +The main characteristic of these routes is that they use each archive's term manager instead of the catalog's quantity +manager. 
+ +Applicable routes: + + Origin-specific (implemented above) + APIv2_ROOT/[origin]/synonyms?term=term - list synonyms for the specified term + APIv2_ROOT/[origin]/contexts/[term] - return canonical full context for term + + Entity-specific + APIv2_ROOT/[origin]/[flow id]/profile - list characterizations for the flow + APIv2_ROOT/[origin]/[flow id]/cf/[quantity id] - return the characterization value as a float (or 0.0) + + APIv2_ROOT/[origin]/[quantity id]/norm - return a normalization dict + APIv2_ROOT/[origin]/[quantity id]/factors - list characterizations for the quantity + APIv2_ROOT/[origin]/[quantity id]/convert/[flow id] - return a QuantityConversion + APIv2_ROOT/[origin]/[quantity id]/convert/[flowable]/[ref quantity] - return a QuantityConversion + + APIv2_ROOT/[origin]/[quantity id]/lcia {POST} - perform LCIA on POSTDATA = list of exchange refs + +''' + +@app.get('/{origin}/{flow_id}/profile', response_model=List[Characterization]) +def get_flow_profile(origin: str, flow_id: str, quantity: str=None, context: str=None, + token: Optional[str] = Depends(oauth2_scheme)): + query = _get_authorized_query(origin, token) + f = _get_typed_entity(query, flow_id, 'flow') + if quantity is not None: + quantity = _get_typed_entity(query, quantity, 'quantity') + if context is not None: + context = query.get_context(context) + return [Characterization.from_cf(cf) for cf in f.profile(quantity=quantity, context=context)] + + +@app.get('/{origin}/{quantity_id}/norm', response_model=Normalizations) +def get_quantity_norms(origin:str, quantity_id: str, + token: Optional[str] = Depends(oauth2_scheme)): + query = _get_authorized_query(origin, token) + q = _get_typed_entity(query, quantity_id, 'quantity') + return Normalizations.from_q(q) + + +@app.get('/{origin}/{quantity_id}/factors', response_model=List[Characterization]) +@app.get('/{origin}/{quantity_id}/factors/{flowable}', response_model=List[Characterization]) +def get_quantity_norms(origin:str, quantity_id: str, 
flowable: str=None, + token: Optional[str] = Depends(oauth2_scheme)): + query = _get_authorized_query(origin, token) + q = _get_typed_entity(query, quantity_id, 'quantity') + enum = q.factors(flowable=flowable) + return list(Characterization.from_cf(cf) for cf in enum) diff --git a/api/models/response.py b/api/models/response.py index f710caa..b4277fa 100644 --- a/api/models/response.py +++ b/api/models/response.py @@ -1,18 +1,54 @@ +""" +DEPRECATED-- these models now live in antelope_core +""" from pydantic import BaseModel from pydantic.typing import List +from antelope_core.models import QuantityConversion -class ResponseModel(BaseModel): - # There's good reason for having this child class later on. - # It is to allow for global response model configuration via inheritance. - pass +class ServerMeta(BaseModel): + title: str + version: str + description: str + origins: List[str] + authorized_origins: List[str] + + @classmethod + def from_app(cls, app): + obj = cls(title=app.title, + version=app.version, + description=app.description, + origins=list(), + authorized_origins=list()) + return obj -class Entity(ResponseModel): - entity_id: int - entity_type: str - name: str -class Origin(ResponseModel): - id: int - name: str +class QdbMeta(BaseModel): + title: str description: str + + @classmethod + def from_cat(cls, cat): + lcia = cat.lcia_engine + return cls(title=lcia.__class__.__name__, description="Antelope LCIA implementation") + + +class PostTerm(BaseModel): + term: str + + +class PostFactors(BaseModel): + """ + Client POSTs a list of FlowSpecs; server returns a list of characterizations that match the specs, grouped + by flow external_ref (so as to be cached in the flow's chars_seen). + + The challenge here is with contexts: in order for lookup_cf to find the CF, it needs to be cached with a + local context; but in order for the results to be portable/reproducible, the QR results should report the + canonical contexts. 
So, we add a context field where we reproduce the posted context. + """ + flow_id: str + context: List[str] + factors: List[QuantityConversion] + + def add_qr_result(self, qrr): + self.factors.append(QuantityConversion.from_qrresult(qrr)) diff --git a/api/qdb.py b/api/qdb.py new file mode 100644 index 0000000..5a7597e --- /dev/null +++ b/api/qdb.py @@ -0,0 +1,257 @@ +""" +LCIA Operational Models + +0. Foreground Characterization (lcia on hand) +User has a foreground with a local catalog +User creates local flow refs in a model +user obtains a qdb address for an LCIA indicator (origin + external ref, or simple UUID) +user POSTS a list of FlowSpec models to /{origin}/{lcia_ref}/factors (or /qdb/{lcia_ref}/factors) +the LCIA engine returns a list of quantity relation results for the supplied flows, in the format: +flow_id; +[QRResults] +and for each QRresult, the context is the one PASSED, not the canonical one used for lookup. This ensures that the +client can map its contexts onto the results. The client can then cache all those scores on its flow refs and +perform its own local LCIA. + +0a. Foreground LCIA (lcia on hand) +Client requests an lci from an xdb (which is cached); characterizes the set of flows received via 0; and then +performs local LCIA on the background data + +1. Background LCIA aka "spooky action at a distance" (lcia on xdb) +Perfect for EPDs, proprietary data sets, and results reporting +xdb hosts the entity about which the score is desired +client specifies both the entity and the LCIA method by reference, either locally (external ref e.g. uuid) or fully +qualified (also with origin). +xdb generates the LCI and then performs local LCIA (as step 0a above) on its designated qdb. That qdb has to +recognize the query link, and the query must be authorized to use the qdb. 
[Note: this has only been achieved with +an integrated qdb, not yet with an 'air gapped' qdb- but presumably it would be equivalent to 0+0a] +The xdb receives the results and can decide what level of detail to return to the user. + +2. LCIA of exchanges (lcia on qdb) +User generates a list of exchanges from an LCI or other means, and POSTS the exchanges to the quantity database, which +intakes them and performs the LCIA and returns the results. Here again the LCIA Details must include FlowSpecs of the +inputs (e.g. context + locale), not the canonical/GLO values, so that the client can map them back to its posted +exchanges. The locale of the conversion is reported in the QRResult. + +""" + +from api.models.response import QdbMeta, PostFactors +from .runtime import cat, search_entities, do_lcia +from fastapi import APIRouter, HTTPException +from typing import List, Optional + +import re + +from antelope import EntityNotFound, UnknownOrigin, CatalogRef, ExchangeRef, ConversionReferenceMismatch, NoFactorsFound +from antelope_core.models import Entity, Context, Characterization, DetailedLciaResult, UnallocatedExchange, FlowSpec +from antelope_core.entities import MetaQuantityUnit, LcFlow, LcProcess + +lcia = cat.lcia_engine + +qdb_router = APIRouter(prefix="/qdb") + +@qdb_router.get("/", response_model=QdbMeta) +def get_qdb_meta(): + return QdbMeta.from_cat(cat) + + +@qdb_router.get("/synonyms", response_model=List[str]) +def get_synonyms(term: str): + return lcia.synonyms(term) + + +def _filter_canonical_quantities(**kwargs): + name = kwargs.pop('name', None) + + sargs = {k: v for k, v in filter(lambda x: x[1] is not None, kwargs.items())} + + for q in lcia.quantities(search=name): + for k, v in sargs.items(): + if not q.has_property(k): + continue + else: + if not bool(re.search(v, q[k], flags=re.IGNORECASE)): + continue + yield q + + +@qdb_router.get("/quantities", response_model=List[Entity]) +@qdb_router.get("/flowproperties", response_model=List[Entity]) 
+@qdb_router.get("/flow_properties", response_model=List[Entity]) +def search_quantities(name: Optional[str]=None, + unit: Optional[str]=None, + method: Optional[str]=None, + category: Optional[str]=None, + indicator: Optional[str]=None): + kwargs = {'name': name, + 'referenceunit': unit, + 'method': method, + 'category': category, + 'indicator': indicator} + return list(Entity.from_entity(k) for k in _filter_canonical_quantities(**kwargs)) + + +@qdb_router.get("/lcia_methods", response_model=List[Entity]) +@qdb_router.get("/lciamethods", response_model=List[Entity]) +def search_lcia_methods(name: Optional[str]=None, + unit: Optional[str]=None, + method: Optional[str]=None, + category: Optional[str]=None, + indicator: Optional[str]=None): + kwargs = {'name': name, + 'referenceunit': unit, + 'method': method, + 'category': category, + 'indicator': indicator} + + return list(Entity.from_entity(k) for k in filter(lambda x: x.is_lcia_method, + _filter_canonical_quantities(**kwargs))) + + +@qdb_router.get("/lcia", response_model=List[Entity]) +def get_meta_quantities(name: Optional[str] = None, + method: Optional[str] = None): + kwargs = {'name': name, + 'method': method} + query = cat.query('local.qdb') + return list(search_entities(query, 'quantities', unit=MetaQuantityUnit.unitstring, **kwargs)) + + +@qdb_router.get("/contexts", response_model=List[Context]) +def get_contexts(elementary: bool=None, sense=None, parent=None): + if parent is not None: + parent = lcia[parent] + cxs = [Context.from_context(cx) for cx in lcia.contexts()] + if elementary is not None: + cxs = filter(lambda x: x.elementary == elementary, cxs) + if sense is not None: + cxs = filter(lambda x: x.sense == sense, cxs) + if parent is not None: + cxs = filter(lambda x: x.parent == parent, cxs) + return list(cxs) + + +@qdb_router.get("/contexts/{context}", response_model=Context) +def get_contexts(context: str): + return Context.from_context(lcia[context]) + + +def _get_canonical(origin, quantity): + 
try: + if origin is None: + q = lcia.get_canonical(quantity) + else: + cat.query(origin).get(quantity) # registers it with qdb + q = lcia.get_canonical(quantity) + except EntityNotFound: + raise HTTPException(404, f"quantity {quantity} not found") + except UnknownOrigin: + raise HTTPException(404, f"Unknown origin {origin}") + return q + + +@qdb_router.get("/{quantity}/factors", response_model=List[Characterization]) +@qdb_router.get("/{origin}/{quantity}/factors", response_model=List[Characterization]) +@qdb_router.get("/{quantity}/factors/{flowable}", response_model=List[Characterization]) +@qdb_router.get("/{origin}/{quantity}/factors/{flowable}", response_model=List[Characterization]) +def factors_for_quantity(quantity: str, origin: str=None, flowable: str=None, context: str=None): + q = _get_canonical(origin, quantity) + if context is not None: + context = lcia[context] + return [Characterization.from_cf(cf) for cf in lcia.factors_for_quantity(q, flowable=flowable, context=context)] + + +@qdb_router.get("/{quantity}", response_model=Entity) +@qdb_router.get("/{origin}/{quantity}", response_model=Entity) +@qdb_router.get("/load/{origin}/{quantity}", response_model=Entity) +def load_quantity(quantity: str, origin: str=None): + q = _get_canonical(origin, quantity) + ent = Entity.from_entity(q) + for p in q.properties(): + ent.properties[p] = q[p] + return ent + + +def _lcia_exch_ref(p, x): + """ + This turns a provided inventory exchange into an input argument for LCIA + at the other end, the + :param p: + :param x: + :return: + """ + if p.origin is None: + p.origin = x.origin + if x.flow.external_ref is not None: + try: + flow = cat.get_qdb_entity(x.origin, x.flow.external_ref) + except KeyError: + """ + Need to use: external_ref, quantity_ref, flowable, context, locale. 
+ Need to think about including CAS number (or synonyms) in FlowSpec as optional params + """ + ref_q = _get_canonical(x.origin, x.flow.quantity_ref) + flow = CatalogRef.from_query(x.flow.external_ref, cat._qdb.query, 'flow', masquerade=x.origin, + name=x.flow.flowable, reference_entity=ref_q, + context=tuple(x.flow.context), locale=x.flow.locale) + cat.register_entity_ref(flow) + else: + # no ref, so nothing to anchor the flow to- we use it just for the lookup + ref_q = _get_canonical(x.origin, x.flow.quantity_ref) + flow = LcFlow.new(x.flow.flowable, ref_q, + context=tuple(x.flow.context), locale=x.flow.locale) + if x.type == 'context': + term = tuple(x.context) + else: + term = x.termination + return ExchangeRef(p, flow, x.direction, value=x.value, termination=term) + + +@qdb_router.post('/{quantity_id}/do_lcia', response_model=List[DetailedLciaResult]) +def post_lcia_exchanges(quantity_id: str, exchanges: List[UnallocatedExchange], locale: str=None): + """ + + no param origin: for now, let's say you can only post lcia to canonical quantities (i.e. 
/load first) + :param quantity_id: + :param exchanges: NOTE: the UnallocatedExchange model is identical to the ExchangeRef + :param locale: used by implementation + :return: + """ + q = _get_canonical(None, quantity_id) + p = LcProcess.new('LCIA POST') + inv = [_lcia_exch_ref(p, x) for x in exchanges] + ress = do_lcia(lcia, q, inv, locale=locale) + return [DetailedLciaResult.from_lcia_result(p, res) for res in ress] + + +@qdb_router.post('/{quantity_id}/factors', response_model=List[PostFactors]) +def post_flow_specs(quantity_id: str, flow_specs: List[FlowSpec]): + """ + + :param quantity_id: + :param flow_specs: + :return: + """ + qq = _get_canonical(None, quantity_id) + lookup_results = [] + if qq.unit == MetaQuantityUnit.unitstring and qq.has_property('impactCategories'): + """ + q is actually an LCIA method- we want to give back ALL the factors + This needs DRY (in runtime.py) + """ + qs = [_get_canonical(qq.origin, k) for k in qq['impactCategories']] + else: + qs = [qq] + for fs in flow_specs: + res = PostFactors(flow_id=fs.external_ref, context=fs.context, factors=[]) + for q in qs: + """ + This smacks of DRY too + """ + try: + qr = q.quantity_relation(fs.flowable, fs.quantity_ref, tuple(fs.context), fs.locale) + except (ConversionReferenceMismatch, NoFactorsFound): + continue + res.add_qr_result(qr) + lookup_results.append(res) + return lookup_results diff --git a/api/runtime.py b/api/runtime.py new file mode 100644 index 0000000..c741a5a --- /dev/null +++ b/api/runtime.py @@ -0,0 +1,60 @@ +from etl import run_static_catalog, CAT_ROOT +from fastapi import HTTPException +from antelope_core.models import Entity, FlowEntity +from antelope_core.auth import AuthorizationGrant +from antelope_core.entities import MetaQuantityUnit + +from antelope_manager.authorization import MASTER_ISSUER, open_public_key + + +_ETYPES = ('processes', 'flows', 'quantities', 'lcia_methods', 'contexts') # this should probably be an import + + +UNRESTRICTED_GRANTS = [ + 
AuthorizationGrant(user='public', origin='lcacommons', access='index', values=True, update=False), + AuthorizationGrant(user='public', origin='lcacommons', access='exchange', values=True, update=False), + AuthorizationGrant(user='public', origin='lcacommons', access='background', values=True, update=False), + AuthorizationGrant(user='public', origin='qdb', access='quantity', values=True, update=False), + AuthorizationGrant(user='public', origin='openlca', access='index', values=True, update=False), + AuthorizationGrant(user='public', origin='openlca', access='quantity', values=True, update=False), +] + + +PUBLIC_ORIGINS = list(set(k.origin for k in UNRESTRICTED_GRANTS)) + + +cat = run_static_catalog(CAT_ROOT, list(PUBLIC_ORIGINS)) + + +PUBKEYS = {MASTER_ISSUER: open_public_key()} + + +def search_entities(query, etype, count=50, **kwargs): + sargs = {k:v for k, v in filter(lambda x: x[1] is not None, kwargs.items())} + if etype not in _ETYPES: + raise HTTPException(404, "Invalid entity type %s" % etype) + try: + it = getattr(query, etype)(**sargs) + except AttributeError: + raise HTTPException(404, "Unknown entity type %s" % etype) + sargs.pop('unit', None) # special arg that gets passed to quantity method but does not work as a property + for e in it: + if not e.origin.startswith(query.origin): # return more-specific + continue + if etype == 'flows': + yield FlowEntity.from_flow(e, **sargs) + else: + yield Entity.from_entity(e, **sargs) + count -= 1 + if count <= 0: + break + + +def do_lcia(query, qq, lci, locale=None): + if qq.unit == MetaQuantityUnit.unitstring and qq.has_property('impactCategories'): + qs = [query.get_canonical(k) for k in qq['impactCategories']] + else: + qs = [qq] + + # check authorization for detailed Lcia + return [q.do_lcia(lci, locale=locale) for q in qs] diff --git a/etl/__init__.py b/etl/__init__.py index e69de29..93da055 100644 --- a/etl/__init__.py +++ b/etl/__init__.py @@ -0,0 +1,45 @@ +import os + +from antelope_core import 
FileAccessor +from patoolib import repack_archive + +from etl.solids.run_static_catalog import run_static_catalog + +from manager.antelope_manager.static.index_and_order import is_7z, IndexAndOrder + + +def de7zip(existing7z): + """ + A 2.57 GiB xml database (ei3.7.1 cutoff) occupies 59 MiB in 7z and 226MiB in zip. However, access times to the + 7z are very slow. + + This function uses patoolib.repack_archive to unpack an existing file from 7z and repack in standard zip. + + Requires ~10min for the above db (thinkpad i5-5300U @ 2.30GHz, prolly much faster on the cloud + + :param existing7z: + :return: + """ + if is_7z(existing7z): + basename, ext = os.path.splitext(existing7z) + newname = basename + '.zip' + repack_archive(existing7z, newname) + os.remove(existing7z) + + + +CONFIG_ORIGINS = TEST_ORIGINS = ('ecoinvent.3.7.1.cutoff', ) + +CAT_ROOT = os.environ.get('ANTELOPE_CATALOG_ROOT') ## help?? + + +def preprocess_resources(data_root, origins): + aws = FileAccessor(data_root) + assert set(aws.origins) == set(CONFIG_ORIGINS) + + for origin in origins: + src = next(aws.gen_sources(origin, 'exchange')) + aio = IndexAndOrder(aws, origin, src) + aio.run() + + diff --git a/etl/file_accessor.py b/etl/file_accessor.py deleted file mode 100644 index 4ddbcb3..0000000 --- a/etl/file_accessor.py +++ /dev/null @@ -1,56 +0,0 @@ -import os -import json -from antelope_core import LcResource - - -DEFAULT_PRIORITIES = { - 'exchange': 20, - 'index': 50, - 'background': 80 -} - - -class AwsFileAccessor(object): - - def __init__(self, load_path): - self._path = os.path.abspath(load_path) # this has the benefits of collapsing '..' 
and trimming trailing '/' - self._origins = os.listdir(self._path) - - @property - def origins(self): - for k in self._origins: - yield k - - @staticmethod - def get_config(source): - cfg = os.path.join(os.path.dirname(source), 'config.json') - if os.path.exists(cfg): - with open(cfg) as fp: - config = json.load(fp) - else: - config = dict() - return config - - def gen_sources(self, org, iface): - iface_path = os.path.join(self._path, org, iface) - if not os.path.exists(iface_path): - return - for ds_type in os.listdir(iface_path): - ds_path = os.path.join(iface_path, ds_type) - if not os.path.isdir(ds_path): - continue - for fn in os.listdir(ds_path): - if fn == 'config.json': - continue - # if we want to order sources, this is the place to do it - source = os.path.join(ds_path, fn) - yield source - - def create_resource(self, source): - if not source.startswith(self._path): - raise ValueError('Path not contained within our filespace') - rel_source = source[len(self._path)+1:] - org, iface, ds_type, fn = rel_source.split(os.path.sep) # note os.pathsep is totally different - cfg = self.get_config(source) - priority = cfg.pop('priority', DEFAULT_PRIORITIES[iface]) - return LcResource(org, source, ds_type, interfaces=iface, priority=priority, **cfg) diff --git a/etl/libs/__init__.py b/etl/libs/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/etl/libs/common.py b/etl/libs/common.py deleted file mode 100644 index d7b75fc..0000000 --- a/etl/libs/common.py +++ /dev/null @@ -1,55 +0,0 @@ -import os -import json - - -class AntelopeFileCrawler(object): - - def __init__(self, cat, load_path): - self._cat = cat - self._path = load_path - self._origins = os.listdir(load_path) - - @property - def origins(self): - for k in self._origins: - yield k - - @staticmethod - def _get_config(iface_path): - cfg = os.path.join(iface_path, 'config.json') - if os.path.exists(cfg): - with open(cfg) as fp: - config = json.load(fp) - else: - config = dict() - return config - 
- def _new_resources(self, org, iface): - iface_path = os.path.join(self._path, org, iface) - if not os.path.exists(iface_path): - return - config = self._get_config(iface_path) - priority = config.pop('priority', 50) - for ds_type in os.listdir(iface_path): - if ds_type == 'config.json': - continue - ds_path = os.path.join(iface_path, ds_type) - for i, fn in enumerate(os.listdir(ds_path)): - source = os.path.join(ds_path, fn) - ds_pri = priority + i - res = self._cat.new_resource(org, source, ds_type, interfaces=iface, priority=ds_pri, **config) - res.check(self._cat) - - def _load_origin(self, org): - """ - First we add exchange resources- the - :param org: - :return: - """ - for iface in ('exchange', 'index', 'background'): - self._new_resources(org, iface) - - def load_data(self): - for org in self.origins: - self._load_origin(org) - self._cat.query(org).check_bg() diff --git a/etl/libs/meter_reader.py b/etl/libs/meter_reader.py new file mode 100644 index 0000000..f35d7d3 --- /dev/null +++ b/etl/libs/meter_reader.py @@ -0,0 +1,61 @@ +""" +The course of action here is as follows: +Everything is already reduced to _get_authorized_query + +so here all we need to do is look at the authorization header, which should be "bearer cjXny.." where the token is +a JWT. Now- we had been assuming to use the origin specified in the JWT to determine which public key to use, but +that is not going to work, as we don't get the- well maybe we do. sure. we can look at the payload without validating +the signature, determine the query origin, and then check our public key list for that origin. + +Then we try to verify the token and if the signature matches, we parse the grants field for origins and interfaces. +We create a stack of auth grant pydantic objects, and then just query them for permission + +so.. we need a table of query counters. +and that is nontrivial, because the query isn't known at the time of auth- I mean, it is, but not when the token is +being validated. 
+ +The grant needs to be: userid, signer, +""" +from antelope import ValuesAccessRequired, UpdateAccessRequired +from antelope_core.auth import AuthorizationGrant + +from .models import QueryCounter, BillingCounter, UsageReport + + +class MeterReader(object): + """ + Keep a set of QueryCounters and allow them to increment + """ + def __init__(self): + self._counter = dict() # key = user,origin,interface; value = QueryCounter + self._billing = dict() # "", value = BillingCounter + + def _access(self, grant: AuthorizationGrant): + key = (grant.user, grant.origin, grant.access) + if key not in self._counter: + self._counter[key] = QueryCounter(user=grant.user, origin=grant.origin, interface=grant.access) + self._billing[key] = BillingCounter(user=grant.user, origin=grant.origin, interface=grant.access) + return self._counter[key] + + def access(self, grant: AuthorizationGrant): + count = self._access(grant).query_access() + if count % 1000 == 0: + print('grant %s passed %d queries' % (grant.display, count)) + + def values(self, grant): + if grant.values is False: + raise ValuesAccessRequired + self._access(grant).query_values() + + def update(self, grant): + if grant.update is False: + raise UpdateAccessRequired + self._access(grant).query_update() + + def invoice_user(self, user): + for k, counter in self._counter.items(): + if k.user == user: + billing = self._billing[k] + u = UsageReport.from_counters(counter, billing) + billing.apply(counter) + yield u diff --git a/etl/libs/models.py b/etl/libs/models.py new file mode 100644 index 0000000..ed72935 --- /dev/null +++ b/etl/libs/models.py @@ -0,0 +1,50 @@ +from pydantic import BaseModel + + +class QueryCounter(BaseModel): + user: str + origin: str + interface: str + access: int = 0 + values: int = 0 + update: int = 0 + + def query_access(self): + self.access += 1 + return self.access + + def query_values(self): + self.values += 1 + return self.query_access() + + def query_update(self): + self.update += 1 + return 
from pydantic import BaseModel


class QueryCounter(BaseModel):
    """Running tally of queries executed for one (user, origin, interface)."""
    user: str
    origin: str
    interface: str
    access: int = 0  # total number of queries
    values: int = 0  # queries that returned exchange values
    update: int = 0  # queries that performed updates

    def query_access(self):
        """Count one query; return the running total."""
        self.access += 1
        return self.access

    def query_values(self):
        """A values query also counts as an access."""
        self.values += 1
        return self.query_access()

    def query_update(self):
        """An update query also counts as an access."""
        self.update += 1
        return self.query_access()

    def usage(self):
        """Return (access, values, update) as a tuple."""
        return self.access, self.values, self.update


class UsageReport(QueryCounter):
    """Usage since the last invoice: counter totals minus already-billed totals."""
    @classmethod
    def from_counters(cls, counter, billing):
        # elementwise difference of (access, values, update); variables renamed from
        # (k, l) -- 'l' is too easily misread as the digit 1 (PEP 8 E741)
        unbilled = tuple(total - done for total, done in zip(counter.usage(), billing.billed()))
        return cls(user=counter.user, origin=counter.origin, interface=counter.interface,
                   access=unbilled[0], values=unbilled[1], update=unbilled[2])


class BillingCounter(BaseModel):
    """High-water mark of usage that has already been invoiced.

    Keep a stack of these and increment them UPON QUERY, not upon token validation.

    NOTE(review): MeterReader constructs this with user/origin/interface kwargs
    that are not declared here; pydantic silently ignores the extras -- confirm
    whether those fields should be added to the model.
    """
    access: int = 0
    values: int = 0
    update: int = 0

    def apply_query(self, qc):
        """Record qc's current totals as billed."""
        self.access = qc.access
        self.values = qc.values
        self.update = qc.update

    def billed(self):
        """Return (access, values, update) already invoiced, as a tuple."""
        return self.access, self.values, self.update
+ """ + + def _load_origin(self, cat, org): + """ + First we add exchange resources- the + :param org: + :return: + """ + for iface in ('exchange', 'index', 'background', 'quantity'): + for i, source in enumerate(self.gen_sources(org, iface)): + res = self.create_resource(source) + cat.add_resource(res) + res.check(cat) + try: + return cat.query(org).check_bg() + except BackgroundRequired: + return False + + def load_resources(self, cat, origin=None): + if origin is None: + status = [] + for org in self.origins: + status.append(self._load_origin(cat, org)) + else: + status = self._load_origin(cat, origin) + return status diff --git a/etl/libs/xdb_catalog.py b/etl/libs/xdb_catalog.py new file mode 100644 index 0000000..64a51d2 --- /dev/null +++ b/etl/libs/xdb_catalog.py @@ -0,0 +1,14 @@ +""" +An LcCatalog subclass that yields XdbQueries +""" + +from antelope_core import StaticCatalog +from .xdb_query import XdbQuery +from .meter_reader import MeterReader + +class XdbCatalog(StaticCatalog): + def __init__(self, *args, **kwargs): + super(XdbCatalog, self).__init__(*args, **kwargs) + self.meter = MeterReader() + + _query_type = XdbQuery diff --git a/etl/libs/xdb_query.py b/etl/libs/xdb_query.py new file mode 100644 index 0000000..fe2bb62 --- /dev/null +++ b/etl/libs/xdb_query.py @@ -0,0 +1,41 @@ +""" +A CatalogQuery subclass that enforces access limitations +""" + +from antelope_core.catalog_query import CatalogQuery +from antelope.interfaces.iexchange import EXCHANGE_VALUES_REQUIRED +from antelope.interfaces.ibackground import BACKGROUND_VALUES_REQUIRED + + +_VALUES_REQUIRED = EXCHANGE_VALUES_REQUIRED.union(BACKGROUND_VALUES_REQUIRED) + + +_AUTH_NOT_REQUIRED = {'is_lcia_engine'} + + +class InterfaceNotAuthorized(Exception): + pass + + +class XdbQuery(CatalogQuery): + def __init__(self, origin, catalog=None, grants=(), **kwargs): + super(XdbQuery, self).__init__(origin, catalog=catalog, **kwargs) + self._grants = {g.access: g for g in grants if 
"""
A CatalogQuery subclass that enforces access limitations
"""
from antelope_core.catalog_query import CatalogQuery
from antelope.interfaces.iexchange import EXCHANGE_VALUES_REQUIRED
from antelope.interfaces.ibackground import BACKGROUND_VALUES_REQUIRED

# queries in this set return exchange values and therefore require a values grant
_VALUES_REQUIRED = EXCHANGE_VALUES_REQUIRED.union(BACKGROUND_VALUES_REQUIRED)

# queries that may be answered without any authorization at all
_AUTH_NOT_REQUIRED = {'is_lcia_engine'}


class InterfaceNotAuthorized(Exception):
    """The requester holds no grant for the (origin, interface) being queried."""
    pass


class XdbQuery(CatalogQuery):
    """A CatalogQuery that meters usage and refuses interfaces without a grant."""
    def __init__(self, origin, catalog=None, grants=(), **kwargs):
        super(XdbQuery, self).__init__(origin, catalog=catalog, **kwargs)
        # we only store one grant per iface. so don't give us more.  A grant applies
        # when the queried origin equals, or is a sub-origin of, the granted origin.
        self._grants = {g.access: g for g in grants if origin.startswith(g.origin)}

    def authorized_interfaces(self):
        """Return the set of interface names the current grants cover."""
        return set(self._grants.keys())

    def _perform_query(self, itype, attrname, exc, *args, strict=False, **kwargs):
        """Check authorization and meter the query before delegating to the catalog.

        :raises InterfaceNotAuthorized: when no grant covers *itype* for this origin
        """
        if attrname not in _AUTH_NOT_REQUIRED:
            try:
                grant = self._grants[itype]
            except KeyError:
                # suppress the KeyError context: the missing dict key is an
                # implementation detail that only obscures the real error
                raise InterfaceNotAuthorized(self.origin, itype) from None

            if attrname in _VALUES_REQUIRED:
                self._catalog.meter.values(grant)
            else:
                self._catalog.meter.access(grant)

        return super(XdbQuery, self)._perform_query(itype, attrname, exc, *args, strict=strict, **kwargs)
from antelope_core.catalog import LcCatalog
from etl.libs.resource_loader import ResourceLoader
from dagster import solid, InputDefinition, OutputDefinition, String, List


@solid(input_defs=[InputDefinition(name="data_root", dagster_type=String),
                   InputDefinition(name="cat_root", dagster_type=String)],
       output_defs=[OutputDefinition(name="statuses", dagster_type=List)])
def construct_container_catalog(context, data_root, cat_root):
    """Build (or open) the catalog at cat_root and load every resource under data_root.

    :return: per-origin load statuses from ResourceLoader.load_resources
    """
    cat = LcCatalog(cat_root)
    loader = ResourceLoader(data_root)
    return loader.load_resources(cat)


# --- etl/solids/run_static_catalog.py (separate file in the patch) ------------
from antelope_core import add_antelope_providers
import sys

# NOTE(review): machine-specific path baked into the module -- this should come
# from configuration or an environment variable before deployment
sys.path.append('/data/GitHub/Antelope/olca-data')
import antelope_olca
from ..libs.xdb_catalog import XdbCatalog
# from dagster import solid, InputDefinition, OutputDefinition, String, List


# register the olca providers so XdbCatalog can resolve olca-backed resources
add_antelope_providers(antelope_olca)


# @solid(input_defs=[InputDefinition(name="cat_root", dagster_type=String),
#                    InputDefinition(name="config_origins", dagster_type=List[String])],
#        output_defs=[OutputDefinition(name="s_cat", dagster_type=XdbCatalog)])
def run_static_catalog(cat_root, config_origins):
    """Open the static catalog used to answer all HTTP queries.

    :param cat_root: filesystem root of the pre-built catalog
    :param config_origins: origins the container is configured to serve
        (currently unused -- see the commented-out assertion below)
    """
    s_cat = XdbCatalog(cat_root, strict_clookup=False)
    # for origin in config_origins:  # unclear what this should accomplish
    #     for iface in ('exchange', 'index', 'background'):
    #         assert ':'.join([origin, iface]) in s_cat.interfaces
    return s_cat  # use this to answer all HTTP queries


# --- requirements.txt (separate file in the patch) ----------------------------
# NOTE(review): the requirements pin VCS dependencies via git+git://, but GitHub
# disabled the unauthenticated git:// protocol in 2022 -- these should be changed
# to git+https://github.com/AntelopeLCA/<repo>@<branch>#egg=<name>.