
Ingest subset of HLS data for EJ story #146

Closed
4 of 7 tasks
anayeaye opened this issue Jun 13, 2022 · 5 comments


anayeaye commented Jun 13, 2022

Epic

#89

Description

Ingest STAC metadata for a small subset of HLS data that have been uploaded directly to IMPACT-owned bucket(s) into new hls(l|s)30-ej collections, to unblock the dashboard environmental justice story.

Strategy/notes

New collections

  • Copy the existing HLS(L|S)30 collections and update the id and description to differentiate them from the LP DAAC hosted collections (append -ej); a minimal sketch of this copy follows the list
  • These new collections should not include dashboard:* properties.
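
A minimal sketch of that collection copy, for illustration only: the local file names are assumptions, as is the assumption that the dashboard:* keys live at the top level of the collection json.

# hedged sketch: copy an existing HLS collection json into a new -ej collection
import json

with open("hls-l30-002.json") as f:          # assumed local copy of the existing collection
    collection = json.load(f)

collection["id"] = f"{collection['id']}-ej"  # differentiate from the LP DAAC hosted collection
collection["description"] += " Subset copied to an IMPACT-owned bucket for the EJ story."

# drop dashboard:* properties (assumed to live at the top level of the collection json)
for key in [k for k in collection if k.startswith("dashboard:")]:
    del collection[key]

with open("hls-l30-002-ej.json", "w") as f:
    json.dump(collection, f)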

New Items
Walk the buckets in UAH

  • Parse the STAC item id from the filename and fetch item metadata
  • Copy the current delta-backend-staging metadata record with that id
  • Update the item id and collection id with (-ej) and update the href for all assets to match the UAH bucket (this is necessary because item ids must be unique across the entire pgstac catalog); a minimal sketch of this per-item update follows the list
  • Load the new metadata record with pypgstac
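
A minimal sketch of the per-item update, assuming the item record has already been fetched from delta-backend-staging as a dict; the to_ej_item helper name, the bucket constant, and the prefix layout are illustrative, not the final implementation.

# hedged sketch: rename a copied item record and point its assets at the UAH bucket
import json
import os

UAH_BUCKET = "covid-eo-data"

def to_ej_item(item: dict, ej_collection_id: str, uah_prefix: str) -> dict:
    item = json.loads(json.dumps(item))          # cheap deep copy of the staging record
    item["id"] = f"{item['id']}-ej"              # item ids must be unique catalog-wide
    item["collection"] = ej_collection_id
    for asset in item.get("assets", {}).values():
        filename = os.path.basename(asset["href"])
        asset["href"] = f"s3://{UAH_BUCKET}/{uah_prefix}/{filename}"
    return item

The resulting records can then be written to ndjson and loaded with pypgstac.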

UAH account data location

  • HLSL30 s3://covid-eo-data/hlsl30-ej
  • HLSS30 s3://covid-eo-data/hlss30-ej

Rationale

This temporary workaround has two purposes:

  1. It allows the dynamic tiler to access these data without an Earthdata login. Currently the backend API only supports one set of session credentials; when these are overridden with Earthdata login credentials, UAH and MCP protected buckets are no longer accessible (see delta-backend issue 72 for the latest work to solve this issue).
  2. Subsetting HLS data to very small temporal and spatial ranges makes the data more compatible with the dashboard configuration, which does not yet support granular non-periodic data like HLS that need to be selected over variable, location-specific date ranges. The subsetted data will only cover a limited spatial range for specific dates.

Acceptance Criteria:

  • New environmental justice collections created with subsetted items
  • Stretch goal: GeoTIFFs have been reprocessed with the latest code to add CRS headers (a rough sketch of that fix follows this list)
  • Stretch goal: records are transferred to MCP (see Data transfer to MCP #145)
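
For context, the CRS-header fix is roughly the following; this is a sketch assuming rasterio is available and that the UTM EPSG code is derived from the MGRS tile id. The filename is illustrative, and the real fix lives in the reprocessing code.

# hedged sketch: write a missing CRS into an existing GeoTIFF in place
import rasterio
from rasterio.crs import CRS

with rasterio.open("HLS.S30.T15RYP.2021001.v2.0.B04.tif", "r+") as dataset:  # illustrative filename
    if dataset.crs is None:
        dataset.crs = CRS.from_epsg(32615)  # UTM zone 15N for MGRS tile 15RYP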

Checklist:

  • Epic Link
  • Detailed description
  • Concept diagrams
  • Assignee
@anayeaye

@freitagb @xhagrg @slesaad I think this work is probably custom enough that it won't lend itself well to the existing pipelines, so I am happy to start the work of manually copying and updating the metadata with some one-off code (unless you see a clearer path--please let me know if you do!). After we have the metadata ingested, do you think the COGs and metadata can be transferred to MCP using existing pipelines?


xhagrg commented Jun 14, 2022

If there are custom filename changes, etc., we will need to prepare the files accordingly. However, if we can define the collection details and the COGs are in one of the UAH buckets, we should be able to transfer the data, create the STAC records, and create the collection to hold them using the current publication process.


xhagrg commented Jun 14, 2022

> Parse the STAC item id from the filename and fetch item metadata
> Copy the current delta-backend-staging metadata record with that id

Do we need to do this? Can we not rely on the stac metadata generation process?

anayeaye self-assigned this Jun 15, 2022
@anayeaye

HLS EJ Subset Update:

@anayeaye

The subset of HLS data uploaded to the UAH covid-eo-data bucket is now loaded in the delta-backend staging stack. Both the Landsat (L30) and Sentinel (S30) collections have 2021 data in Louisiana and 2017 data in Puerto Rico (see the browser links to zoom in on the locations). Note that the band combinations are different for the L30 and S30 collections. The load for these tiles is a little slow, but hopefully this will be fixed soon by adding a CRS header to the files.

HLS S30

HLS L30

Ingest code
I don't know that this one-off script fits into the workflows for cloud-optimized-pipelines, but I think there are some useful patterns, so I am sharing it here. Once the collection is loaded, this script generally:

  1. Finds metadata jsons in key path
  2. Rewrites the original self link as a derived_from link and keeps the cite-as link
  3. Updates asset hrefs and confirms that the updated keys exist
  4. Validates item with pystac after updates and then uploads new metadata to s3
  5. Bulk inserts items in pgstac database
# python3.8
import base64
import json
import os

import boto3
from botocore.exceptions import ClientError

from pypgstac import pypgstac
from pystac.validation import validate_dict


def get_secret(secret_name: str, profile_name: str = None) -> dict:
    """Retrieve secrets from AWS Secrets Manager

    Args:
        secret_name (str): name of aws secrets manager secret containing database connection secrets
        profile_name (str, optional): optional name of aws profile for use in debugger only

    Returns:
        secrets (dict): decrypted secrets in dict
    """

    # Create a Secrets Manager client
    if profile_name:
        session = boto3.session.Session(profile_name=profile_name)
    else:
        session = boto3.session.Session()
    client = session.client(
        service_name='secretsmanager'
    )

    # In this sample we only handle the specific exceptions for the 'GetSecretValue' API.
    # See https://docs.aws.amazon.com/secretsmanager/latest/apireference/API_GetSecretValue.html
    # We rethrow the exception by default.

    try:
        get_secret_value_response = client.get_secret_value(
            SecretId=secret_name
        )
    except ClientError as e:
        raise e
    else:
        # Decrypts secret using the associated KMS key.
        # Depending on whether the secret is a string or binary, one of these fields will be populated.
        if 'SecretString' in get_secret_value_response:
            return json.loads(get_secret_value_response['SecretString'])
        else:
            return json.loads(base64.b64decode(get_secret_value_response['SecretBinary']))
            
def get_dsn_string(secret:dict) -> str:
    """Form database connection string from a dictionary of connection secrets

    Args:
        secret (dict): dictionary containing connection secrets including username, database name, host, and password

    Returns:
        dsn (str): full database data source name
    """
    try:
        return f"postgres://{secret['username']}:{secret['password']}@{secret['host']}:{secret['port']}/{secret['dbname']}"
    except Exception as e:
        raise e

# AWS
profile_name = "local-aws-profile-name"
secret_name = "the-secret-name"
# use a single session with the local profile for all S3 access
session = boto3.Session(profile_name=profile_name)
# resource interface to walk the bucket and manage metadata
s3 = session.resource("s3")
# client interface to verify that keys exist
s3_client = session.client("s3")
bucket_name = "covid-eo-data"
BUCKET = s3.Bucket(bucket_name)

# Collection specific config L30
collection_json_file = "docs-local/hls-l30-002-ej.json"
coll_prefix = "hlsl30-ej"

# Collection specific config S30
# collection_json_file = "docs-local/hls-s30-002-ej.json"
# coll_prefix = "hlss30-ej"

tmp_collection_file = collection_json_file.replace(".json", "-nd.json")
tmp_items_file = collection_json_file.replace(".json", "-items-nd.json")

# 2017 MGRS ID = 19QHA, 2021 MGRS ID = 15RYP
s3_prefixes = [f"{coll_prefix}/2017/19QHA", f"{coll_prefix}/2021/15RYP"]

dry_run = True

if __name__ == "__main__":

    with open(collection_json_file) as fl:
        collection = json.loads(fl.read())

    # Load connection info
    con_secrets = get_secret(secret_name, profile_name=profile_name)
    dsn = get_dsn_string(con_secrets)

    # Write the collection to a temporary ndjson file
    with open(tmp_collection_file, "w") as f:
        f.write(f"{json.dumps(collection)}\n")
        print(f"Collection written to {tmp_collection_file}")

    if not dry_run:
        # Load the collection into pgstac and remove the temporary file
        pypgstac.load(
            table="collections",
            file=tmp_collection_file,
            dsn=dsn,
            method="insert_ignore",  # insert_ignore avoids overwriting an existing collection; use upsert to replace
        )
        os.remove(tmp_collection_file)


    # Now find items in S3 and update metadata references
    updated_stac_items = []
    for prefix in s3_prefixes:

        # Find all stac metadata
        stac_objs = [i for i in BUCKET.objects.filter(Prefix=prefix) if '_stac.json' in i.key]
        for stac_obj in stac_objs:

            updated_metadata_key = stac_obj.key.replace(".json", "-ej.json")
            print(f"Creating and verifying {updated_metadata_key=}")

            obj = s3.Object(stac_obj.bucket_name, stac_obj.key)
            data = obj.get()['Body'].read().decode('utf-8')
            item = json.loads(data)

            # Validate source item
            try: 
                validate_dict(item)
            except Exception as e:
                print(f"WARNING source item is invalid {item['id']} exception={e}")
                raise
            
            # Get s3 path for item
            stac_metadata_filename = os.path.basename(obj.key)
            stac_item_prefix = f"{obj.key.replace(stac_metadata_filename, '')}"

            # Update item id
            updated_id = f"{item['id']}-ej"
            item["id"] = updated_id

            # Update collection references
            item["collection"] = collection["id"]

            # Update asset hrefs to point at the copies in the UAH bucket
            assets = item.get("assets")
            for asset in assets.values():
                asset_filename = os.path.basename(asset["href"])
                updated_prefix = os.path.join(stac_item_prefix, asset_filename)

                # Confirm the object exists in the target bucket before rewriting the href
                if "Contents" in s3_client.list_objects_v2(Bucket=bucket_name, Prefix=updated_prefix):
                    asset["href"] = f"s3://{bucket_name}/{updated_prefix}"
                else:
                    raise FileNotFoundError(f"{updated_prefix=} does not exist in {bucket_name}")

            # Update item links
            derived_from_link = next((link for link in item.get("links") if link["rel"]=="self"))
            derived_from_link["rel"] = "derived_from"
            cite_as_link = next((link for link in item.get("links") if link["rel"]=="cite-as"))
            item["links"] = [derived_from_link, cite_as_link]

            # Validate item
            try: 
                validate_dict(item)
            except Exception as e:
                print(f"WARNING updated item is invalid {item['id']} exception={e}")
                raise
        
            # Append updated metadata to array for bulk pgstac update
            updated_stac_items.append(item)

            if not dry_run:
                # Upload updated metadata to s3
                print(f"Uploading {updated_metadata_key}")
                s3_client.put_object(
                    Body=json.dumps(item),
                    Bucket=bucket_name,
                    Key=updated_metadata_key
                )

            del item

    # Write items to a temporary ndjson file, one item per line, sorted by datetime
    with open(tmp_items_file, "w") as f:
        f.write("\n".join([json.dumps(x) for x in sorted(updated_stac_items, key=lambda x: x["properties"]["datetime"])]))

    if not dry_run:
        # Load items into pgstac (after the file is closed) and then delete the temp file
        pypgstac.load(
            table="items",
            file=tmp_items_file,
            dsn=dsn,
            method="insert_ignore",  # insert_ignore avoids overwriting existing items; use upsert to replace
        )
        os.remove(tmp_items_file)
    
    print("fin.")
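
As a rough sanity check after a non-dry run, the new -ej items can be counted directly in pgstac. This is a sketch assuming psycopg2 is installed and the default pgstac schema/table layout; the placeholder DSN stands in for the one the script builds from secrets, and the -ej suffix filter is just illustrative.

# hedged sketch: count loaded -ej items in the pgstac items table
import psycopg2

dsn = "postgresql://username:password@host:5432/dbname"  # same DSN the script builds from secrets
with psycopg2.connect(dsn) as conn:
    with conn.cursor() as cur:
        cur.execute(
            "SELECT collection, count(*) FROM pgstac.items WHERE collection LIKE %s GROUP BY collection",
            ("%-ej",),
        )
        for collection_id, item_count in cur.fetchall():
            print(collection_id, item_count)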
