diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..0d20b64 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +*.pyc diff --git a/COPYING b/COPYING new file mode 100644 index 0000000..05d3d4a --- /dev/null +++ b/COPYING @@ -0,0 +1,14 @@ +Copyright 2018, California Institute of Technology. + ALL RIGHTS RESERVED. + U.S. Government Sponsorship acknowledged. + +Any commercial use must be negotiated with the Office of Technology +Transfer t the California Institute of Technology. + +This software may be subject to U.S. export control laws and +regulations. By accepting this document, the user agrees to comply +with all applicable U.S. export laws and regulations. + +User has the responsibility to obtain export licenses, or other export +authority as may be required before exporting such information to +foreign countries or providing access to foreign persons. diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..ec8c6bd --- /dev/null +++ b/LICENSE @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. 
For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. 
The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. 
+ + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "{}" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright 2018 California Institute of Technology. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/README.md b/README.md new file mode 100644 index 0000000..ab4ffca --- /dev/null +++ b/README.md @@ -0,0 +1,217 @@ +# s1_qc_ingest +Sentinel1 Quality Control File Ingest and Crawler + +## Release +- current release: release-20170613 + +## Requirements +- HySDS +- Osaka + +## crawl_orbits.py +- crawl ESA QC web service for precise (S1-AUX_POEORB) and restituted (S1-AUX_RESORB) orbits +- compare catalog of orbit files with those ingested into dataset ES (elasticsearch) +- submit jobs for ingest for orbit files not ingested into dataset ES +- Usage: +``` +usage: crawl_orbits.py [-h] [--dataset_version DATASET_VERSION] [--tag TAG] + ds_es_url +crawl_orbits.py: error: too few arguments +``` +- Example: +``` +$ ./crawl_orbits.py http://100.64.134.71:9200 --tag dev +``` + +## crawl_cals.py +- crawl ESA QC web service for active calibration files (S1-AUX_CAL) +- compare catalog of calibration files with those ingested into dataset ES (elasticsearch) +- create HySDS dataset for calibration files not ingested into dataset ES +- create singleton HySDS dataset for list of active calibration files (S1-AUX_CAL_ACTIVE) +- Usage: +``` +usage: crawl_cals.py [-h] [--dataset_version DATASET_VERSION] [--tag TAG] + ds_es_url + +Crawl calibration files, create and ingest calibration datasets. + +positional arguments: + ds_es_url ElasticSearch URL for datasets, e.g. http://aria- + products.jpl.nasa.gov:9200 + +optional arguments: + -h, --help show this help message and exit + --dataset_version DATASET_VERSION + dataset version + --tag TAG PGE docker image tag (release, version, or branch) to + propagate +``` +- Example: +``` +$ ./crawl_cals.py http://100.64.134.71:9200 --tag dev +``` + +## create_orbit_ds.py +- create a HySDS dataset from a Sentinel1 precise or restituted orbit +- Usage: +``` +usage: create_orbit_ds.py [-h] [--dataset_version DATASET_VERSION] + orbit_file ds_es_url + +Create a HySDS dataset from a Sentinel1 precise or restituted orbit. + +positional arguments: + orbit_file Sentinel1 precise/restituted orbit file + ds_es_url ElasticSearch URL for datasets, e.g. 
http://aria- + products.jpl.nasa.gov:9200 + +optional arguments: + -h, --help show this help message and exit + --dataset_version DATASET_VERSION + dataset version +``` +- Example: +``` +$ wget --no-check-certificate https://qc.sentinel1.eo.esa.int/aux_poeorb/S1B_OPER_AUX_POEORB_OPOD_2017061 +4T111434_V20170524T225942_20170526T005942.EOF +--2017-06-14 16:59:08-- https://qc.sentinel1.eo.esa.int/aux_poeorb/S1B_OPER_AUX_POEORB_OPOD_20170614T111434_V20170524T225942_20170526T005942.EOF +Resolving qc.sentinel1.eo.esa.int (qc.sentinel1.eo.esa.int)... 131.176.235.71 +Connecting to qc.sentinel1.eo.esa.int (qc.sentinel1.eo.esa.int)|131.176.235.71|:443... connected. +WARNING: cannot verify qc.sentinel1.eo.esa.int's certificate, issued by '/C=GB/ST=Greater Manchester/L=Salford/O=COMODO CA Limited/CN=COMODO RSA Organizat +ion Validation Secure Server CA': + Unable to locally verify the issuer's authority. +HTTP request sent, awaiting response... 200 OK +Length: 4410158 (4.2M) [application/octet-stream] +Saving to: 'S1B_OPER_AUX_POEORB_OPOD_20170614T111434_V20170524T225942_20170526T005942.EOF' + +100%[================================================================================================================>] 4,410,158 998KB/s in 4.3s + +2017-06-14 16:59:14 (998 KB/s) - 'S1B_OPER_AUX_POEORB_OPOD_20170614T111434_V20170524T225942_20170526T005942.EOF' saved [4410158/4410158] + +$ ./create_orbit_ds.py S1B_OPER_AUX_POEORB_OPOD_20170614T111434_V20170524T225942_20170526T005942.EOF http://100.64.134.71:9200 --dataset_version v1.1 +[2017-06-14 16:59:45,332: INFO create_orbit_ds.py:create_orbit_ds] create date: 2017-06-14 11:14:34 +[2017-06-14 16:59:45,332: INFO create_orbit_ds.py:create_orbit_ds] validity start date: 2017-05-24 22:59:42 +[2017-06-14 16:59:45,332: INFO create_orbit_ds.py:create_orbit_ds] validity end date: 2017-05-26 00:59:42 +[2017-06-14 16:59:45,332: INFO create_orbit_ds.py:create_orbit_ds] sat: S1B +[2017-06-14 16:59:45,333: INFO create_orbit_ds.py:create_orbit_ds] sensor: SAR-C Sentinel1 +[2017-06-14 16:59:45,333: INFO create_orbit_ds.py:create_orbit_ds] platform: Sentinel-1B +[2017-06-14 16:59:45,333: INFO create_orbit_ds.py:create_orbit_ds] typ: orbit +[2017-06-14 16:59:45,333: INFO create_orbit_ds.py:create_orbit_ds] orbit_type: POEORB +[2017-06-14 16:59:45,333: INFO create_orbit_ds.py:create_orbit_ds] dataset: S1-AUX_POEORB +[2017-06-14 16:59:45,333: INFO create_orbit_ds.py:create_orbit_ds] met: { + "archive_filename": "S1B_OPER_AUX_POEORB_OPOD_20170614T111434_V20170524T225942_20170526T005942.EOF", + "creationTime": "2017-06-14T11:14:34", + "data_product_name": "S1B_OPER_AUX_POEORB_OPOD_20170614T111434_V20170524T225942_20170526T005942-v1.1", + "dataset": "S1-AUX_POEORB", + "platform": "Sentinel-1B", + "sensingStart": "2017-05-24T22:59:42", + "sensingStop": "2017-05-26T00:59:42", + "sensor": "SAR-C Sentinel1" +} +[2017-06-14 16:59:45,333: INFO create_orbit_ds.py:create_orbit_ds] dataset: { + "endtime": "2017-05-26T00:59:42", + "label": "S1B_OPER_AUX_POEORB_OPOD_20170614T111434_V20170524T225942_20170526T005942-v1.1", + "starttime": "2017-05-24T22:59:42", + "version": "v1.1" +} +[2017-06-14 16:59:45,341: INFO create_orbit_ds.py:create_orbit_ds] total, found_id: 1 S1B_OPER_AUX_POEORB_OPOD_20170614T111434_V20170524T225942_20170526T005942-v1.1 +[2017-06-14 16:59:45,341: INFO create_orbit_ds.py:create_orbit_ds] Found S1B_OPER_AUX_POEORB_OPOD_20170614T111434_V20170524T225942_20170526T005942-v1.1 in http://100.64.134.71:9200. Dedupping dataset. 
+``` + +## create_cal_ds.py +- create a HySDS dataset from a Sentinel1 calibration tar file +- Usage: +``` +usage: create_cal_ds.py [-h] [--dataset_version DATASET_VERSION] + cal_tar_file ds_es_url + +Create a HySDS dataset from a Sentinel1 calibration tar file. + +positional arguments: + cal_tar_file Sentinel1 calibration tar file + ds_es_url ElasticSearch URL for datasets, e.g. http://aria- + products.jpl.nasa.gov:9200 + +optional arguments: + -h, --help show this help message and exit + --dataset_version DATASET_VERSION + dataset version +``` +- Example: +``` +$ wget --no-check-certificate https://qc.sentinel1.eo.esa.int/aux_cal/S1A_AUX_CAL_V20160627T000000_G20170522T132042.SAFE +--2017-06-14 17:06:12-- https://qc.sentinel1.eo.esa.int/aux_cal/S1A_AUX_CAL_V20160627T000000_G20170522T132042.SAFE +Resolving qc.sentinel1.eo.esa.int (qc.sentinel1.eo.esa.int)... 131.176.235.71 +Connecting to qc.sentinel1.eo.esa.int (qc.sentinel1.eo.esa.int)|131.176.235.71|:443... connected. +WARNING: cannot verify qc.sentinel1.eo.esa.int's certificate, issued by '/C=GB/ST=Greater Manchester/L=Salford/O=COMODO CA Limited/CN=COMODO RSA Organization Validation Secure Server CA': + Unable to locally verify the issuer's authority. +HTTP request sent, awaiting response... 200 OK +Length: 493188 (482K) [application/x-gzip] +Saving to: 'S1A_AUX_CAL_V20160627T000000_G20170522T132042.SAFE' + +100%[================================================================================================================>] 493,188 251KB/s in 1.9s + +2017-06-14 17:06:15 (251 KB/s) - 'S1A_AUX_CAL_V20160627T000000_G20170522T132042.SAFE' saved [493188/493188] + +$ ./create_cal_ds.py S1A_AUX_CAL_V20160627T000000_G20170522T132042.SAFE http://100.64.134.71:9200 --dataset_version v1.1 +[2017-06-14 17:07:16,267: INFO/create_cal_ds] create date: 2017-05-22 13:20:42 +[2017-06-14 17:07:16,267: INFO/create_cal_ds] validity start date: 2016-06-27 00:00:00 +[2017-06-14 17:07:16,267: INFO/create_cal_ds] sat: S1A +[2017-06-14 17:07:16,268: INFO/create_cal_ds] sensor: SAR-C Sentinel1 +[2017-06-14 17:07:16,268: INFO/create_cal_ds] platform: Sentinel-1A +[2017-06-14 17:07:16,268: INFO/create_cal_ds] typ: auxiliary +[2017-06-14 17:07:16,268: INFO/create_cal_ds] aux_type: CAL +[2017-06-14 17:07:16,268: INFO/create_cal_ds] dataset: S1-AUX_CAL +[2017-06-14 17:07:16,268: INFO/create_cal_ds] met: { + "archive_filename": "S1A_AUX_CAL_V20160627T000000_G20170522T132042.SAFE", + "creationTime": "2017-05-22T13:20:42", + "data_product_name": "S1A_AUX_CAL_V20160627T000000_G20170522T132042-v1.1", + "dataset": "S1-AUX_CAL", + "platform": "Sentinel-1A", + "sensingStart": "2016-06-27T00:00:00", + "sensor": "SAR-C Sentinel1" +} +[2017-06-14 17:07:16,268: INFO/create_cal_ds] dataset: { + "label": "S1A_AUX_CAL_V20160627T000000_G20170522T132042-v1.1", + "starttime": "2016-06-27T00:00:00", + "version": "v1.1" +} +[2017-06-14 17:07:16,275: INFO/create_cal_ds] total, found_id: 1 S1A_AUX_CAL_V20160627T000000_G20170522T132042-v1.1 +[2017-06-14 17:07:16,275: INFO/create_cal_ds] Found S1A_AUX_CAL_V20160627T000000_G20170522T132042-v1.1 in http://100.64.134.71:9200. Dedupping dataset. +``` + +## cron_crawler.py +- cron script to submit Sentinel-1 crawler job +- Usage: +``` +usage: cron_crawler.py [-h] [--dataset_version DATASET_VERSION] [--tag TAG] + --type {orbit,calibration} + ds_es_url + +Cron script to submit Sentinel-1 crawler job. + +positional arguments: + ds_es_url ElasticSearch URL for datasets, e.g. 
http://aria- + products.jpl.nasa.gov:9200 + +optional arguments: + -h, --help show this help message and exit + --dataset_version DATASET_VERSION + dataset version + --tag TAG PGE docker image tag (release, version, or branch) to + propagate + --type {orbit,calibration} + Sentinel-1 QC file type to crawl +``` +- Example cron: +``` +# crawl for orbits +0,30 * * * * $HOME/verdi/bin/python $HOME/verdi/ops/s1_qc_ingest/cron_crawler.py \ + --type orbit --dataset_version v1.1 --tag release-20170613 + http://100.64.134.71:9200 > $HOME/verdi/log/s1_orbit_cron_crawler.log 2>&1 + +# crawl for active calibrations +15,45 * * * * $HOME/verdi/bin/python $HOME/verdi/ops/s1_qc_ingest/cron_crawler.py \ + --type calibration --dataset_version v1.1 --tag release-20170613 + http://100.64.134.71:9200 > $HOME/verdi/log/s1_calibration_cron_crawler.log 2>&1 +``` diff --git a/archived_history.txt b/archived_history.txt new file mode 100644 index 0000000..40292ed --- /dev/null +++ b/archived_history.txt @@ -0,0 +1,262 @@ +commit e95d7d81f3c61ec9a042d65c53acc5cd8dbb7e0d +Author: gmanipon +Date: Mon Sep 24 20:48:05 2018 +0000 + + clean up + +commit 8b6b8c298ad24dc2bfbe9fa7400ee317c1ddf0d1 +Author: gmanipon +Date: Wed Jun 27 22:13:10 2018 +0000 + + increase time limits + +commit 58d83ce15d3fa96350c0629750c521a385b1edd6 +Author: gmanipon +Date: Fri Jun 22 19:20:42 2018 +0000 + + differentiate soft/hard time limits and set according to expected execution times + +commit 9140967cae4eaf2b659e2da484f39f41df42b4a9 +Author: jlinick +Date: Wed Jun 20 14:23:51 2018 +0000 + + update filter for active calibrations + +commit c70435a782de97cc2f04b876e058762f36dba814 +Author: jlinick +Date: Wed Jun 20 14:08:00 2018 +0000 + + expose exceptions + +commit 01dd4d252b732891266d3f2d17c90458e11b81c0 +Author: jlinick +Date: Wed Jun 20 00:54:04 2018 +0000 + + pass renamed safe tar file + +commit 1383fac3bae0283637bdcee96f8088de0ae9b22d +Author: jlinick +Date: Wed Jun 20 00:47:26 2018 +0000 + + rename cal files + +commit 87f8df8d90ca68d5ecdab6314e2736d00795b4ad +Author: jlinick +Date: Wed Jun 20 00:25:57 2018 +0000 + + add correct suffix + +commit 4673731a3edbc0c0ce0b1fd35dd35c9c6672cbcc +Author: gmanipon +Date: Wed Jun 20 00:10:59 2018 +0000 + + update to handle changes to qc urls + +commit 3027608c609d6af00a70caae6d6402770daf9acc +Author: gmanipon +Date: Tue Jun 19 23:41:50 2018 +0000 + + handle infrastructure changes + +commit 3ede9fdb61a5b9f148e06218ea62968d78fdce21 +Author: jlinick +Date: Tue Jun 12 23:58:28 2018 +0000 + + update crawler time limit to 1000 sec + +commit 63112c4c478266cb78b8e3516dc43643f92cdbb7 +Author: Mohammed R Karim +Date: Mon Apr 30 21:48:14 2018 +0000 + + added time limit + +commit e3820b728b82761db8ff81fc489e50aa27af2af0 +Author: gmanipon +Date: Tue Nov 14 04:21:42 2017 +0000 + + link in AWS creds + +commit 169ca6460fe07fdf972bed573c78d279a411311f +Author: gmanipon +Date: Tue Nov 14 04:16:24 2017 +0000 + + pull fields + +commit ad4d2cd29af77748adc4cb5f7831f03aec0427bc +Author: gmanipon +Date: Tue Nov 14 04:06:20 2017 +0000 + + purge dataset from current dataset entry + +commit 9317b742d8e9d08a243347fd2750a0f57253e8ca +Author: gmanipon +Date: Tue Nov 14 03:09:32 2017 +0000 + + add dataset to purge since osaka will bomb on clobber + +commit 51ff844ac1b6a5ccc33fa4f5b9dc12396015318c +Author: gmanipon +Date: Wed Jun 14 17:11:17 2017 +0000 + + add README.md + +commit 03db31d3cb1bc8a8b15361cdff98d65ea1fcc487 +Author: gmanipon +Date: Tue Jun 13 18:29:29 2017 +0000 + + add ids of active calibration files to met + +commit 
d9c2c4d4dd245b92de671f767b37750bf2cd665e +Author: gmanipon +Date: Tue Jun 13 18:12:42 2017 +0000 + + add job-spec/hysds-io for S1 calibration file crawler/ingester + +commit 943508dd055bc25b0348eec213dec64ef00ddcc3 +Author: gmanipon +Date: Tue Jun 13 18:04:47 2017 +0000 + + remove timestamp + +commit fadba9b49927b691ea9838805fa245ef5bbc531e +Author: gmanipon +Date: Tue Jun 13 17:57:35 2017 +0000 + + update to create S1-AUX_CAL and S1-AUX_CAL_ACTIVE datasets in crawler + + Ensure active calibrations files can be queried by making it a dataset + that gets ingested last. + +commit d4ad0e70b04096c26d0d58b99dcad322ba46ff21 +Author: gmanipon +Date: Tue Jun 13 16:18:53 2017 +0000 + + add calibration dataset generation script + +commit f747513a48c625ff0ea69f1ab77de5f8a2c51d4f +Author: gmanipon +Date: Tue Jun 13 14:37:08 2017 +0000 + + add crawler for calibration urls + +commit 12d25f9606e64e9380bed3076ba48d6b2e83114e +Author: gmanipon +Date: Tue Jun 13 14:25:25 2017 +0000 + + use doc string for description + +commit 98ddd048fcd2c16be85fb2000ef92bfddfc733be +Author: gmanipon +Date: Tue Jun 13 14:22:19 2017 +0000 + + fix logger name + +commit ab19ae27963343304110824f9214f7f994f7b674 +Author: gmanipon +Date: Tue Jun 13 14:19:19 2017 +0000 + + allow specification of QC file type to crawl (orbit or calibration) + +commit b18769a16c2f2175c732bfbbff79e8e30ee5f1b5 +Author: gmanipon +Date: Tue Jun 13 14:07:45 2017 +0000 + + set archive_filename + +commit dfc9d6d43a1320e171551ec3eedbe9914458f9e5 +Author: gmanipon +Date: Tue Jun 13 07:13:49 2017 +0000 + + disable dedup for crawler job + +commit 20ff39a42a55937a8d56d754e755eeaa262be17d +Author: gmanipon +Date: Tue Jun 13 06:55:28 2017 +0000 + + fix cut-n-paste bug + +commit 5692d5131888ffe70aceaab03528cc4f39af8b80 +Author: gmanipon +Date: Tue Jun 13 06:39:52 2017 +0000 + + submit orbit ingest job to factotum-job_worker-large + +commit 07d3fcea011727ad71bb8274b7a3fe902b7673aa +Author: gmanipon +Date: Tue Jun 13 06:35:56 2017 +0000 + + set page limit on number of pages to go back + +commit fec51447e05c70ee0857e361c35cee6ddbf31a92 +Author: gmanipon +Date: Tue Jun 13 05:59:16 2017 +0000 + + fix bug in iteration stop detection + +commit 0cea3e1d6eae79acc39d8f5417f093fbde0316bc +Author: gmanipon +Date: Tue Jun 13 05:43:20 2017 +0000 + + ensure dataset id includes version; fix job names + +commit f5ffd9c40549dc8a431ace512b3b273f5e586699 +Author: gmanipon +Date: Tue Jun 13 05:32:42 2017 +0000 + + set correct path to executable + +commit b44a459c20734027b8d20a32cb3ddfd84eaed931 +Author: gmanipon +Date: Tue Jun 13 05:28:56 2017 +0000 + + place recommended-queues in job-specs + +commit 634773dd276eacdcacf15c06391394cc47172282 +Author: gmanipon +Date: Tue Jun 13 05:23:15 2017 +0000 + + set as individual submission type + +commit f00405445105fc0c600a09e4f02e8ae589790f93 +Author: gmanipon +Date: Tue Jun 13 05:09:38 2017 +0000 + + add cron script for submitting orbit crawler jobs + + Propagate tag and dataset version. 
+ +commit a7a3f2728e9cf19797c01346f257298a871b065a +Author: gmanipon +Date: Tue Jun 13 04:37:01 2017 +0000 + + clean up logging + +commit 3a9b0d5043814ba4fb27f99453004a002b9a24fb +Author: gmanipon +Date: Tue Jun 13 04:34:26 2017 +0000 + + submit to factotum-job_worker-small queue + +commit 88455e7f48bdc692c5327e43c0eaea0c2f7b5947 +Author: gmanipon +Date: Tue Jun 13 04:31:50 2017 +0000 + + add arg to specify release tag of PGE + +commit 872728956b642daa58475b566463ea6bfa7ce13f +Author: gmanipon +Date: Tue Jun 13 04:20:46 2017 +0000 + + remove deprecated field + +commit c0d606b300bd9bfabae82c20ddf5a0642fccf615 +Author: gmanipon +Date: Tue Jun 13 04:14:02 2017 +0000 + + initial implementation of crawler and job-spec/hysds-io configs + +commit c1c03b8681801c0ef57cd38c0bbcbd749e3bb96c +Author: gmanipon +Date: Mon Jun 12 21:44:33 2017 +0000 + + initial commit diff --git a/crawl_cals.py b/crawl_cals.py new file mode 100755 index 0000000..3a0b23d --- /dev/null +++ b/crawl_cals.py @@ -0,0 +1,279 @@ +#!/usr/bin/env python +""" +Crawl calibration files, create and ingest calibration datasets. +""" + +import os, sys, re, json, logging, traceback, requests, argparse, backoff, shutil +from datetime import datetime +from requests.packages.urllib3.exceptions import (InsecureRequestWarning, + InsecurePlatformWarning) +try: from html.parser import HTMLParser +except: from HTMLParser import HTMLParser + +from osaka.main import get, rmall + +from create_cal_ds import check_cal, create_cal_ds + + +# disable warnings for SSL verification +requests.packages.urllib3.disable_warnings(InsecureRequestWarning) +requests.packages.urllib3.disable_warnings(InsecurePlatformWarning) + + +# set logger +log_format = "[%(asctime)s: %(levelname)s/%(funcName)s] %(message)s" +logging.basicConfig(format=log_format, level=logging.INFO) + +class LogFilter(logging.Filter): + def filter(self, record): + if not hasattr(record, 'id'): record.id = '--' + return True + +logger = logging.getLogger('crawl_cals') +logger.setLevel(logging.INFO) +logger.addFilter(LogFilter()) + + +QC_SERVER = 'https://qc.sentinel1.eo.esa.int/' +DATA_SERVER = 'https://qc.sentinel1.eo.esa.int/' +#DATA_SERVER = 'http://aux.sentinel1.eo.esa.int/' + +CAL_RE = re.compile(r'(?PS1\w)_(?PAUX_CAL)_V(?P
\d{8}T\d{6})') + + +def cmdLineParse(): + """Command line parser.""" + + parser = argparse.ArgumentParser(description=__doc__) + parser.add_argument("ds_es_url", help="ElasticSearch URL for datasets, e.g. " + + "http://aria-products.jpl.nasa.gov:9200") + parser.add_argument("--dataset_version", help="dataset version", + default="v1.1", required=False) + parser.add_argument("--tag", help="PGE docker image tag (release, version, " + + "or branch) to propagate", + default="master", required=False) + return parser.parse_args() + + +class MyHTMLParser(HTMLParser): + + def __init__(self): + HTMLParser.__init__(self) + self.fileList = [] + self.pages = 0 + self.in_td = False + self.in_a = False + self.in_ul = False + + def handle_starttag(self, tag, attrs): + if tag == 'td': + self.in_td = True + elif tag == 'a' and self.in_td: + self.in_a = True + elif tag == 'ul': + for k,v in attrs: + if k == 'class' and v.startswith('pagination'): + self.in_ul = True + elif tag == 'li' and self.in_ul: + self.pages += 1 + + def handle_data(self,data): + if self.in_td and self.in_a: + if CAL_RE.search(data): + self.fileList.append(data.strip()) + + def handle_endtag(self, tag): + if tag == 'td': + self.in_td = False + self.in_a = False + elif tag == 'a' and self.in_td: + self.in_a = False + elif tag == 'ul' and self.in_ul: + self.in_ul = False + elif tag == 'html': + if self.pages == 0: + self.pages = 1 + else: + # decrement page back and page forward list items + self.pages -= 2 + + +@backoff.on_exception(backoff.expo, requests.exceptions.RequestException, + max_tries=8, max_value=32) +def session_get(session, url): + return session.get(url, verify=False) + + +def crawl_cals(dataset_version): + """Crawl for calibration urls.""" + + results = {} + session = requests.Session() + oType = 'calibration' + url = QC_SERVER + 'aux_cal' + page_limit = 100 + query = url + '/?adf__active=True' + + logger.info(query) + + logger.info('Querying for {0} calibration files'.format(oType)) + r = session_get(session, query) + r.raise_for_status() + parser = MyHTMLParser() + parser.feed(r.text) + logger.info("Found {} pages".format(parser.pages)) + + for res in parser.fileList: + id = "%s-%s" % (os.path.splitext(res)[0], dataset_version) + results[id] = os.path.join(url, res) + match = CAL_RE.search(res) + if not match: + raise RuntimeError("Failed to parse cal: {}".format(res)) + results[id] = os.path.join(DATA_SERVER, "product", "/".join(match.groups()), "{}.SAFE.TGZ".format(res)) + yield id, results[id] + + # page through and get more results + page = 2 + reached_end = False + while True: + page_query = "{}?page={}".format(query, page) + logger.info(page_query) + r = session_get(session, page_query) + r.raise_for_status() + page_parser = MyHTMLParser() + page_parser.feed(r.text) + for res in page_parser.fileList: + id = "%s-%s" % (os.path.splitext(res)[0], dataset_version) + if id in results or page >= page_limit: + reached_end = True + break + else: + match = CAL_RE.search(res) + if not match: + raise RuntimeError("Failed to parse cal: {}".format(res)) + results[id] = os.path.join(DATA_SERVER, "product", "/".join(match.groups()), "{}.SAFE.TGZ".format(res)) + yield id, results[id] + if reached_end: break + else: page += 1 + + # close session + session.close() + + #logger.info(json.dumps(results, indent=2, sort_keys=True)) + logger.info(len(results)) + + +def create_active_cal_ds(active_ids, dataset_version, root_ds_dir="."): + """Create active calibration files dataset.""" + + # set id + id = "S1_AUX_CAL_ACTIVE" + + # get 
metadata json + now = datetime.utcnow() + met = { + "creationTime": now.isoformat('T'), + "data_product_name": id, + "sensor": "SAR-C Sentinel1", + "dataset": "S1-AUX_CAL_ACTIVE", + "active_ids": active_ids, + } + logger.info("met: %s" % json.dumps(met, indent=2, sort_keys=True)) + + # get dataset json + ds = { + "version": dataset_version, + "label": met['data_product_name'], + "starttime": met['creationTime'], + } + + # create dataset dir + #ds_id = "%s-%04d%02d%02dT%02d%02d%02d" % (id, now.year, now.month, + # now.day, now.hour, now.minute, + # now.second) + ds_id = id + root_ds_dir = os.path.abspath(root_ds_dir) + ds_dir = os.path.join(root_ds_dir, ds_id) + if not os.path.isdir(ds_dir): os.makedirs(ds_dir, 0755) + + # dump dataset and met JSON + ds_file = os.path.join(ds_dir, "%s.dataset.json" % ds_id) + met_file = os.path.join(ds_dir, "%s.met.json" % ds_id) + with open(ds_file, 'w') as f: + json.dump(ds, f, indent=2, sort_keys=True) + with open(met_file, 'w') as f: + json.dump(met, f, indent=2, sort_keys=True) + + logger.info("created dataset %s" % ds_dir) + return id, ds_dir + + +@backoff.on_exception(backoff.expo, requests.exceptions.RequestException, + max_tries=8, max_value=32) +def purge_active_cal_ds(es_url, dataset_version): + """Purge active cal dataset.""" + + id = "S1_AUX_CAL_ACTIVE" + query = { + "query":{ + "bool":{ + "must": [ + { "term": { "_id": id } }, + ] + } + }, + "fields": [ "urls" ], + } + es_index = "grq_%s_s1-aux_cal_active" % dataset_version + if es_url.endswith('/'): + search_url = '%s%s/_search' % (es_url, es_index) + else: + search_url = '%s/%s/_search' % (es_url, es_index) + #logger.info("search_url: %s" % search_url) + r = requests.post(search_url, data=json.dumps(query)) + if r.status_code == 200: + result = r.json() + #logger.info(pformat(result)) + total = result['hits']['total'] + if total > 0: + hit = result['hits']['hits'][0]['fields'] + for url in hit['urls']: + if url.startswith('s3://'): rmall(url) + else: + logger.error("Failed to query %s:\n%s" % (es_url, r.text)) + logger.error("query: %s" % json.dumps(query, indent=2)) + logger.error("returned: %s" % r.text) + if r.status_code != 404: r.raise_for_status() + + +def crawl(ds_es_url, dataset_version, tag): + """Crawl for calibration files and create datasets if they don't exist in ES.""" + + active_ids = [] + for id, url in crawl_cals(dataset_version): + #logger.info("%s: %s" % (id, url)) + active_ids.append(id) + total, found_id = check_cal(ds_es_url, "grq", id) + if total > 0: + logger.info("Found %s." % id) + else: + logger.info("Missing %s. Creating dataset." % id) + cal_tar_file = os.path.basename(url) + get(url, cal_tar_file) + safe_tar_file = cal_tar_file.replace('.TGZ', '') + shutil.move(cal_tar_file, safe_tar_file) + create_cal_ds(safe_tar_file, ds_es_url, dataset_version) + purge_active_cal_ds(ds_es_url, dataset_version) + create_active_cal_ds(active_ids, dataset_version) + + +if __name__ == '__main__': + inps = cmdLineParse() + try: status = crawl(inps.ds_es_url, inps.dataset_version, inps.tag) + except Exception as e: + with open('_alt_error.txt', 'w') as f: + f.write("%s\n" % str(e)) + with open('_alt_traceback.txt', 'w') as f: + f.write("%s\n" % traceback.format_exc()) + raise + sys.exit(status) diff --git a/crawl_orbits.py b/crawl_orbits.py new file mode 100755 index 0000000..bdac054 --- /dev/null +++ b/crawl_orbits.py @@ -0,0 +1,277 @@ +#!/usr/bin/env python +""" +Crawl orbits and submit orbit dataset generation jobs. 
+"""
+
+import os, sys, re, json, logging, traceback, requests, argparse, backoff
+from datetime import datetime
+from pprint import pformat
+from requests.packages.urllib3.exceptions import (InsecureRequestWarning,
+                                                   InsecurePlatformWarning)
+try: from html.parser import HTMLParser
+except: from HTMLParser import HTMLParser
+
+from hysds_commons.job_utils import submit_mozart_job
+from hysds.celery import app
+
+
+# disable warnings for SSL verification
+requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
+requests.packages.urllib3.disable_warnings(InsecurePlatformWarning)
+
+
+# set logger
+log_format = "[%(asctime)s: %(levelname)s/%(funcName)s] %(message)s"
+logging.basicConfig(format=log_format, level=logging.INFO)
+
+class LogFilter(logging.Filter):
+    def filter(self, record):
+        if not hasattr(record, 'id'): record.id = '--'
+        return True
+
+logger = logging.getLogger('crawl_orbits')
+logger.setLevel(logging.INFO)
+logger.addFilter(LogFilter())
+
+
+QC_SERVER = 'https://qc.sentinel1.eo.esa.int/'
+DATA_SERVER = 'http://aux.sentinel1.eo.esa.int/'
+
+ORBITMAP = [('precise','aux_poeorb', 100),
+            ('restituted','aux_resorb', 100)]
+
+OPER_RE = re.compile(r'S1\w_OPER_AUX_(?P<type>\w+)_OPOD_(?P<yr>\d{4})(?P<mo>\d{2})(?P<dy>\d{2})')
+
+
+def cmdLineParse():
+    """Command line parser."""
+
+    parser = argparse.ArgumentParser(description=__doc__)
+    parser.add_argument("ds_es_url", help="ElasticSearch URL for datasets, e.g. " +
+                                          "http://aria-products.jpl.nasa.gov:9200")
+    parser.add_argument("--dataset_version", help="dataset version",
+                        default="v1.1", required=False)
+    parser.add_argument("--tag", help="PGE docker image tag (release, version, " +
+                                      "or branch) to propagate",
+                        default="master", required=False)
+    return parser.parse_args()
+
+
+class MyHTMLParser(HTMLParser):
+
+    def __init__(self):
+        HTMLParser.__init__(self)
+        self.fileList = []
+        self.pages = 0
+        self.in_td = False
+        self.in_a = False
+        self.in_ul = False
+
+    def handle_starttag(self, tag, attrs):
+        if tag == 'td':
+            self.in_td = True
+        elif tag == 'a' and self.in_td:
+            self.in_a = True
+        elif tag == 'ul':
+            for k,v in attrs:
+                if k == 'class' and v.startswith('pagination'):
+                    self.in_ul = True
+        elif tag == 'li' and self.in_ul:
+            self.pages += 1
+
+    def handle_data(self,data):
+        if self.in_td and self.in_a:
+            if OPER_RE.search(data):
+                self.fileList.append(data.strip())
+
+    def handle_endtag(self, tag):
+        if tag == 'td':
+            self.in_td = False
+            self.in_a = False
+        elif tag == 'a' and self.in_td:
+            self.in_a = False
+        elif tag == 'ul' and self.in_ul:
+            self.in_ul = False
+        elif tag == 'html':
+            if self.pages == 0:
+                self.pages = 1
+            else:
+                # decrement page back and page forward list items
+                self.pages -= 2
+
+
+@backoff.on_exception(backoff.expo, requests.exceptions.RequestException,
+                      max_tries=8, max_value=32)
+def session_get(session, url):
+    return session.get(url, verify=False)
+
+
+@backoff.on_exception(backoff.expo, requests.exceptions.RequestException,
+                      max_tries=8, max_value=32)
+def check_orbit(es_url, es_index, id):
+    """Query for orbits with specified input ID."""
+
+    query = {
+        "query":{
+            "bool":{
+                "must": [
+                    { "term": { "_id": id } },
+                ]
+            }
+        },
+        "fields": [],
+    }
+
+    if es_url.endswith('/'):
+        search_url = '%s%s/_search' % (es_url, es_index)
+    else:
+        search_url = '%s/%s/_search' % (es_url, es_index)
+    #logger.info("search_url: %s" % search_url)
+    r = requests.post(search_url, data=json.dumps(query))
+    if r.status_code == 200:
+        result = r.json()
+        #logger.info(pformat(result))
+        total = 
result['hits']['total'] + id = 'NONE' if total == 0 else result['hits']['hits'][0]['_id'] + else: + logger.error("Failed to query %s:\n%s" % (es_url, r.text)) + logger.error("query: %s" % json.dumps(query, indent=2)) + logger.error("returned: %s" % r.text) + if r.status_code == 404: total, id = 0, 'NONE' + else: r.raise_for_status() + return total, id + + +def crawl_orbits(dataset_version): + """Crawl for orbit urls.""" + + results = {} + session = requests.Session() + for spec in ORBITMAP: + oType = spec[0] + url = QC_SERVER + spec[1] + page_limit = spec[2] + query = url + '/' + + logger.info(query) + + logger.info('Querying for {0} orbits'.format(oType)) + r = session_get(session, query) + r.raise_for_status() + parser = MyHTMLParser() + parser.feed(r.text) + logger.info("Found {} pages".format(parser.pages)) + + for res in parser.fileList: + id = "%s-%s" % (os.path.splitext(res)[0], dataset_version) + match = OPER_RE.search(res) + if not match: + raise RuntimeError("Failed to parse orbit: {}".format(res)) + results[id] = os.path.join(DATA_SERVER, "/".join(match.groups()), "{}.EOF".format(res)) + yield id, results[id] + + # page through and get more results + page = 2 + reached_end = False + while True: + page_query = "{}?page={}".format(query, page) + logger.info(page_query) + r = session_get(session, page_query) + r.raise_for_status() + page_parser = MyHTMLParser() + page_parser.feed(r.text) + for res in page_parser.fileList: + id = "%s-%s" % (os.path.splitext(res)[0], dataset_version) + if id in results or page >= page_limit: + reached_end = True + break + else: + match = OPER_RE.search(res) + if not match: + raise RuntimeError("Failed to parse orbit: {}".format(res)) + results[id] = os.path.join(DATA_SERVER, "/".join(match.groups()), "{}.EOF".format(res)) + yield id, results[id] + if reached_end: break + else: page += 1 + + # close session + session.close() + + #logger.info(json.dumps(results, indent=2, sort_keys=True)) + logger.info(len(results)) + + +def submit_job(id, url, ds_es_url, tag, dataset_version): + """Submit job for orbit dataset generation.""" + + job_spec = "job-s1_orbit_ingest:%s" % tag + job_name = "%s-%s" % (job_spec, id) + job_name = job_name.lstrip('job-') + + #Setup input arguments here + rule = { + "rule_name": "s1_orbit_ingest", + "queue": "factotum-job_worker-large", + "priority": 0, + "kwargs":'{}' + } + params = [ + { + "name": "version_opt", + "from": "value", + "value": "--dataset_version", + }, + { + "name": "version", + "from": "value", + "value": dataset_version, + }, + { + "name": "orbit_url", + "from": "value", + "value": url, + }, + { + "name": "orbit_file", + "from": "value", + "value": os.path.basename(url), + }, + { + "name": "es_dataset_url", + "from": "value", + "value": ds_es_url, + } + ] + print("submitting orbit ingest job for %s" % id) + submit_mozart_job({}, rule, + hysdsio={"id": "internal-temporary-wiring", + "params": params, + "job-specification": job_spec}, + job_name=job_name) + + +def crawl(ds_es_url, dataset_version, tag): + """Crawl for orbits and submit job if they don't exist in ES.""" + + for id, url in crawl_orbits(dataset_version): + #logger.info("%s: %s" % (id, url)) + total, found_id = check_orbit(ds_es_url, "grq", id) + if total > 0: + logger.info("Found %s." % id) + #prods_found.append(acq_id) + else: + logger.info("Missing %s. Submitting job." 
% id)
+            #prods_missing.append(acq_id)
+            submit_job(id, url, ds_es_url, tag, dataset_version)
+
+
+if __name__ == '__main__':
+    inps = cmdLineParse()
+    try: status = crawl(inps.ds_es_url, inps.dataset_version, inps.tag)
+    except Exception as e:
+        with open('_alt_error.txt', 'w') as f:
+            f.write("%s\n" % str(e))
+        with open('_alt_traceback.txt', 'w') as f:
+            f.write("%s\n" % traceback.format_exc())
+        raise
+    sys.exit(status)
diff --git a/create_cal_ds.py b/create_cal_ds.py
new file mode 100755
index 0000000..22a4bfe
--- /dev/null
+++ b/create_cal_ds.py
@@ -0,0 +1,179 @@
+#!/usr/bin/env python
+"""
+Create a HySDS dataset from a Sentinel1 calibration tar file.
+"""
+
+import os, sys, time, re, json, requests, shutil, logging, traceback, argparse, backoff
+from requests.packages.urllib3.exceptions import (InsecureRequestWarning,
+                                                   InsecurePlatformWarning)
+from datetime import datetime, timedelta
+from pprint import pformat
+
+
+# set logger
+log_format = "[%(asctime)s: %(levelname)s/%(funcName)s] %(message)s"
+logging.basicConfig(format=log_format, level=logging.INFO)
+
+class LogFilter(logging.Filter):
+    def filter(self, record):
+        if not hasattr(record, 'id'): record.id = '--'
+        return True
+
+logger = logging.getLogger('create_cal_ds')
+logger.setLevel(logging.INFO)
+logger.addFilter(LogFilter())
+
+
+# regexes
+AUX_RE = re.compile(r'^(?P<sat>S1.+?)_AUX_(?P<type>.*?)_V(?P<vs_yr>\d{4})(?P<vs_mo>\d{2})(?P<vs_dy>\d{2})T(?P<vs_hh>\d{2})(?P<vs_mm>\d{2})(?P<vs_ss>\d{2})_G(?P<cr_yr>\d{4})(?P<cr_mo>\d{2})(?P<cr_dy>\d{2})T(?P<cr_hh>\d{2})(?P<cr_mm>\d{2})(?P<cr_ss>\d{2})-(?P<version>.+)$')
+PLATFORM_RE = re.compile(r'S1(.+?)_')
+
+
+@backoff.on_exception(backoff.expo, requests.exceptions.RequestException,
+                      max_tries=8, max_value=32)
+def check_cal(es_url, es_index, id):
+    """Query for calibration file with specified input ID."""
+
+    query = {
+        "query":{
+            "bool":{
+                "must": [
+                    { "term": { "_id": id } },
+                ]
+            }
+        },
+        "fields": [],
+    }
+
+    if es_url.endswith('/'):
+        search_url = '%s%s/_search' % (es_url, es_index)
+    else:
+        search_url = '%s/%s/_search' % (es_url, es_index)
+    #logger.info("search_url: %s" % search_url)
+    r = requests.post(search_url, data=json.dumps(query))
+    if r.status_code == 200:
+        result = r.json()
+        #logger.info(pformat(result))
+        total = result['hits']['total']
+        id = 'NONE' if total == 0 else result['hits']['hits'][0]['_id']
+    else:
+        logger.error("Failed to query %s:\n%s" % (es_url, r.text))
+        logger.error("query: %s" % json.dumps(query, indent=2))
+        logger.error("returned: %s" % r.text)
+        if r.status_code == 404: total, id = 0, 'NONE'
+        else: r.raise_for_status()
+    return total, id
+
+
+def get_dataset_json(met, version):
+    """Generate HySDS dataset JSON from met JSON."""
+
+    return {
+        "version": version,
+        "label": met['data_product_name'],
+        "starttime": met['sensingStart'],
+    }
+
+
+def create_dataset(ds, met, cal_tar_file, root_ds_dir="."):
+    """Create dataset. Return tuple of (dataset ID, dataset dir)."""
+
+    # create dataset dir
+    id = met['data_product_name']
+    root_ds_dir = os.path.abspath(root_ds_dir)
+    ds_dir = os.path.join(root_ds_dir, id)
+    if not os.path.isdir(ds_dir): os.makedirs(ds_dir, 0755)
+
+    # dump dataset and met JSON
+    ds_file = os.path.join(ds_dir, "%s.dataset.json" % id)
+    met_file = os.path.join(ds_dir, "%s.met.json" % id)
+    with open(ds_file, 'w') as f:
+        json.dump(ds, f, indent=2, sort_keys=True)
+    with open(met_file, 'w') as f:
+        json.dump(met, f, indent=2, sort_keys=True)
+
+    # copy calibration tar file
+    shutil.copy(cal_tar_file, ds_dir)
+
+    logger.info("created dataset %s" % ds_dir)
+    return id, ds_dir
+
+
+def create_cal_ds(cal_tar_file, ds_es_url, version="v1.1"):
+    """Create calibration dataset."""
+
+    # extract info from calibration tar filename
+    cal_tar_file_base = os.path.basename(cal_tar_file)
+    id = "%s-%s" % (os.path.splitext(cal_tar_file_base)[0], version)
+    match = AUX_RE.search(id)
+    if not match:
+        raise RuntimeError("Failed to extract info from calibration tar filename %s." % id)
+    info = match.groupdict()
+
+    # get dates
+    create_dt = datetime(*[int(info[i]) for i in ['cr_yr', 'cr_mo', 'cr_dy', 'cr_hh', 'cr_mm', 'cr_ss']])
+    valid_start = datetime(*[int(info[i]) for i in ['vs_yr', 'vs_mo', 'vs_dy', 'vs_hh', 'vs_mm', 'vs_ss']])
+    logger.info("create date: %s" % create_dt)
+    logger.info("validity start date: %s" % valid_start)
+
+    # get sat/platform and sensor
+    sensor = "SAR-C Sentinel1"
+    sat = info['sat']
+    if sat == "S1A": platform = "Sentinel-1A"
+    elif sat == "S1B": platform = "Sentinel-1B"
+    else: raise RuntimeError("Failed to recognize sat: %s" % sat)
+    logger.info("sat: %s" % sat)
+    logger.info("sensor: %s" % sensor)
+    logger.info("platform: %s" % platform)
+
+    # get calibration tar product type
+    typ = "auxiliary"
+    aux_type = info['type']
+    if aux_type == "CAL": dataset = "S1-AUX_CAL"
+    else: raise RuntimeError("Failed to recognize auxiliary type: %s" % aux_type)
+    logger.info("typ: %s" % typ)
+    logger.info("aux_type: %s" % aux_type)
+    logger.info("dataset: %s" % dataset)
+
+    # get metadata json
+    met = {
+        "creationTime": create_dt.isoformat('T'),
+        "data_product_name": id,
+        "sensingStart": valid_start.isoformat('T'),
+        "sensor": sensor,
+        "platform": platform,
+        "dataset": dataset,
+        "archive_filename": cal_tar_file_base,
+    }
+    logger.info("met: %s" % json.dumps(met, indent=2, sort_keys=True))
+
+    # get dataset json
+    ds = get_dataset_json(met, version)
+    logger.info("dataset: %s" % json.dumps(ds, indent=2, sort_keys=True))
+
+    # dedup dataset
+    total, found_id = check_cal(ds_es_url, "grq", id)
+    logger.info("total, found_id: %s %s" % (total, found_id))
+    if total > 0:
+        logger.info("Found %s in %s. Dedupping dataset." % (id, ds_es_url))
+        return
+
+    # create dataset
+    id, ds_dir = create_dataset(ds, met, cal_tar_file)
+
+
+if __name__ == "__main__":
+    parser = argparse.ArgumentParser(description=__doc__)
+    parser.add_argument("cal_tar_file", help="Sentinel1 calibration tar file")
+    parser.add_argument("ds_es_url", help="ElasticSearch URL for datasets, e.g. " +
+                                          "http://aria-products.jpl.nasa.gov:9200")
+    parser.add_argument("--dataset_version", help="dataset version",
+                        default="v1.1", required=False)
+    args = parser.parse_args()
+    try: create_cal_ds(args.cal_tar_file, args.ds_es_url, args.dataset_version)
+    except Exception as e:
+        with open('_alt_error.txt', 'a') as f:
+            f.write("%s\n" % str(e))
+        with open('_alt_traceback.txt', 'a') as f:
+            f.write("%s\n" % traceback.format_exc())
+        raise
diff --git a/create_orbit_ds.py b/create_orbit_ds.py
new file mode 100755
index 0000000..a57f918
--- /dev/null
+++ b/create_orbit_ds.py
@@ -0,0 +1,146 @@
+#!/usr/bin/env python
+"""
+Create a HySDS dataset from a Sentinel1 precise or restituted orbit.
+"""
+
+import os, sys, time, re, json, requests, shutil, logging, traceback, argparse
+from datetime import datetime, timedelta
+
+from crawl_orbits import check_orbit
+
+# set logger
+log_format = "[%(asctime)s: %(levelname)s/%(funcName)s] %(message)s"
+logging.basicConfig(format=log_format, level=logging.INFO)
+
+class LogFilter(logging.Filter):
+    def filter(self, record):
+        if not hasattr(record, 'id'): record.id = '--'
+        return True
+
+logger = logging.getLogger('create_orbit_ds')
+logger.setLevel(logging.INFO)
+logger.addFilter(LogFilter())
+
+
+# regexes
+ORBIT_RE = re.compile(r'^(?P<sat>S1.+?)_OPER_AUX_(?P<type>.*?)_OPOD_(?P<cr_yr>\d{4})(?P<cr_mo>\d{2})(?P<cr_dy>\d{2})T(?P<cr_hh>\d{2})(?P<cr_mm>\d{2})(?P<cr_ss>\d{2})_V(?P<vs_yr>\d{4})(?P<vs_mo>\d{2})(?P<vs_dy>\d{2})T(?P<vs_hh>\d{2})(?P<vs_mm>\d{2})(?P<vs_ss>\d{2})_(?P<ve_yr>\d{4})(?P<ve_mo>\d{2})(?P<ve_dy>\d{2})T(?P<ve_hh>\d{2})(?P<ve_mm>\d{2})(?P<ve_ss>\d{2})-(?P<version>.+)$')
+PLATFORM_RE = re.compile(r'S1(.+?)_')
+
+
+def get_dataset_json(met, version):
+    """Generate HySDS dataset JSON from met JSON."""
+
+    return {
+        "version": version,
+        "label": met['data_product_name'],
+        "starttime": met['sensingStart'],
+        "endtime": met['sensingStop'],
+    }
+
+
+def create_dataset(ds, met, orbit_file, root_ds_dir="."):
+    """Create dataset. Return tuple of (dataset ID, dataset dir)."""
+
+    # create dataset dir
+    id = met['data_product_name']
+    root_ds_dir = os.path.abspath(root_ds_dir)
+    ds_dir = os.path.join(root_ds_dir, id)
+    if not os.path.isdir(ds_dir): os.makedirs(ds_dir, 0755)
+
+    # dump dataset and met JSON
+    ds_file = os.path.join(ds_dir, "%s.dataset.json" % id)
+    met_file = os.path.join(ds_dir, "%s.met.json" % id)
+    with open(ds_file, 'w') as f:
+        json.dump(ds, f, indent=2, sort_keys=True)
+    with open(met_file, 'w') as f:
+        json.dump(met, f, indent=2, sort_keys=True)
+
+    # copy orbit file
+    shutil.copy(orbit_file, ds_dir)
+
+    logger.info("created dataset %s" % ds_dir)
+    return id, ds_dir
+
+
+def create_orbit_ds(orbit_file, ds_es_url, version="v1.1"):
+    """Create orbit dataset."""
+
+    # extract info from orbit filename
+    orbit_file_base = os.path.basename(orbit_file)
+    id = "%s-%s" % (os.path.splitext(orbit_file_base)[0], version)
+    match = ORBIT_RE.search(id)
+    if not match:
+        raise RuntimeError("Failed to extract info from orbit filename %s."
% id) + info = match.groupdict() + + # get dates + create_dt = datetime(*[int(info[i]) for i in ['cr_yr', 'cr_mo', 'cr_dy', 'cr_hh', 'cr_mm', 'cr_ss']]) + valid_start = datetime(*[int(info[i]) for i in ['vs_yr', 'vs_mo', 'vs_dy', 'vs_hh', 'vs_mm', 'vs_ss']]) + valid_end = datetime(*[int(info[i]) for i in ['ve_yr', 've_mo', 've_dy', 've_hh', 've_mm', 've_ss']]) + logger.info("create date: %s" % create_dt) + logger.info("validity start date: %s" % valid_start) + logger.info("validity end date: %s" % valid_end) + + # get sat/platform and sensor + sensor = "SAR-C Sentinel1" + sat = info['sat'] + if sat == "S1A": platform = "Sentinel-1A" + elif sat == "S1B": platform = "Sentinel-1B" + else: raise RuntimeError("Failed to recognize sat: %s" % sat) + logger.info("sat: %s" % sat) + logger.info("sensor: %s" % sensor) + logger.info("platform: %s" % platform) + + # get orbit product type + typ = "orbit" + orbit_type = info['type'] + if orbit_type == "POEORB": dataset = "S1-AUX_POEORB" + elif orbit_type == "RESORB": dataset = "S1-AUX_RESORB" + else: raise RuntimeError("Failed to recognize orbit type: %s" % orbit_type) + logger.info("typ: %s" % typ) + logger.info("orbit_type: %s" % orbit_type) + logger.info("dataset: %s" % dataset) + + # get metadata json + met = { + "creationTime": create_dt.isoformat('T'), + "data_product_name": id, + "sensingStart": valid_start.isoformat('T'), + "sensingStop": valid_end.isoformat('T'), + "sensor": sensor, + "platform": platform, + "dataset": dataset, + "archive_filename": orbit_file_base, + } + logger.info("met: %s" % json.dumps(met, indent=2, sort_keys=True)) + + # get dataset json + ds = get_dataset_json(met, version) + logger.info("dataset: %s" % json.dumps(ds, indent=2, sort_keys=True)) + + # dedup dataset + total, found_id = check_orbit(ds_es_url, "grq", id) + logger.info("total, found_id: %s %s" % (total, found_id)) + if total > 0: + logger.info("Found %s in %s. Dedupping dataset." % (id, ds_es_url)) + return + + # create dataset + id, ds_dir = create_dataset(ds, met, orbit_file) + + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description=__doc__) + parser.add_argument("orbit_file", help="Sentinel1 precise/restituted orbit file") + parser.add_argument("ds_es_url", help="ElasticSearch URL for datasets, e.g. " + + "http://aria-products.jpl.nasa.gov:9200") + parser.add_argument("--dataset_version", help="dataset version", + default="v1.1", required=False) + args = parser.parse_args() + try: create_orbit_ds(args.orbit_file, args.ds_es_url, args.dataset_version) + except Exception as e: + with open('_alt_error.txt', 'a') as f: + f.write("%s\n" % str(e)) + with open('_alt_traceback.txt', 'a') as f: + f.write("%s\n" % traceback.format_exc()) + raise diff --git a/cron_crawler.py b/cron_crawler.py new file mode 100755 index 0000000..79699ba --- /dev/null +++ b/cron_crawler.py @@ -0,0 +1,81 @@ +#!/usr/bin/env python +""" +Cron script to submit Sentinel-1 crawler job. +""" + +from __future__ import print_function + +import os, sys, json, requests, argparse +from datetime import datetime, timedelta +import argparse + +from hysds_commons.job_utils import submit_mozart_job +from hysds.celery import app + + +if __name__ == "__main__": + ''' + Main program that is run by cron to submit a Sentinel-1 crawler job + ''' + + parser = argparse.ArgumentParser(description=__doc__) + parser.add_argument("ds_es_url", help="ElasticSearch URL for datasets, e.g. 
" + + "http://aria-products.jpl.nasa.gov:9200") + parser.add_argument("--dataset_version", help="dataset version", + default="v1.1", required=False) + parser.add_argument("--tag", help="PGE docker image tag (release, version, " + + "or branch) to propagate", + default="master", required=False) + parser.add_argument("--type", help="Sentinel-1 QC file type to crawl", + choices=['orbit', 'calibration'], required=True) + args = parser.parse_args() + + ds_es_url = args.ds_es_url + dataset_version = args.dataset_version + tag = args.tag + qc_type = args.type + job_spec = "job-s1_%s_crawler:%s" % (qc_type, tag) + + job_name = job_spec + job_name = job_name.lstrip('job-') + + #Setup input arguments here + rule = { + "rule_name": job_name, + "queue": "factotum-job_worker-small", + "priority": 0, + "kwargs":'{}' + } + params = [ + { + "name": "version_opt", + "from": "value", + "value": "--dataset_version", + }, + { + "name": "version", + "from": "value", + "value": dataset_version, + }, + { + "name": "tag_opt", + "from": "value", + "value": "--tag", + }, + { + "name": "tag", + "from": "value", + "value": tag, + }, + { + "name": "es_dataset_url", + "from": "value", + "value": ds_es_url, + } + ] + print("submitting %s crawler job" % qc_type) + submit_mozart_job({}, rule, + hysdsio={"id": "internal-temporary-wiring", + "params": params, + "job-specification": job_spec}, + job_name=job_name, enable_dedup=False) diff --git a/docker/Dockerfile b/docker/Dockerfile new file mode 100644 index 0000000..a46c08a --- /dev/null +++ b/docker/Dockerfile @@ -0,0 +1,15 @@ +FROM hysds/pge-base + +MAINTAINER aria-dev "aria-dev@jpl.nasa.gov" +LABEL description="Sentinel1 orbit crawler/ingester PGE" + +USER ops + +# copy packages +COPY . /home/ops/verdi/ops/s1_qc_ingest +RUN set -ex \ + && source /home/ops/verdi/bin/activate \ + && sudo chown -R ops:ops /home/ops/verdi/ops/s1_qc_ingest + +WORKDIR /home/ops +CMD ["/bin/bash", "--login"] diff --git a/docker/hysds-io.json.s1_calibration_crawler b/docker/hysds-io.json.s1_calibration_crawler new file mode 100644 index 0000000..5ae1fce --- /dev/null +++ b/docker/hysds-io.json.s1_calibration_crawler @@ -0,0 +1,29 @@ +{ + "label" : "Sentinel-1 Auxiliary Calibration Crawler", + "allowed_accounts": [ "ops" ], + "submission_type":"individual", + "params" : [ + { + "name": "version_opt", + "from": "value", + "value": "--dataset_version" + }, + { + "name": "version", + "from": "submitter" + }, + { + "name": "tag_opt", + "from": "value", + "value": "--tag" + }, + { + "name": "tag", + "from": "submitter" + }, + { + "name": "es_dataset_url", + "from": "submitter" + } + ] +} diff --git a/docker/hysds-io.json.s1_orbit_crawler b/docker/hysds-io.json.s1_orbit_crawler new file mode 100644 index 0000000..439c00d --- /dev/null +++ b/docker/hysds-io.json.s1_orbit_crawler @@ -0,0 +1,29 @@ +{ + "label" : "Sentinel-1 Orbit Crawler", + "allowed_accounts": [ "ops" ], + "submission_type":"individual", + "params" : [ + { + "name": "version_opt", + "from": "value", + "value": "--dataset_version" + }, + { + "name": "version", + "from": "submitter" + }, + { + "name": "tag_opt", + "from": "value", + "value": "--tag" + }, + { + "name": "tag", + "from": "submitter" + }, + { + "name": "es_dataset_url", + "from": "submitter" + } + ] +} diff --git a/docker/hysds-io.json.s1_orbit_ingest b/docker/hysds-io.json.s1_orbit_ingest new file mode 100644 index 0000000..f332e06 --- /dev/null +++ b/docker/hysds-io.json.s1_orbit_ingest @@ -0,0 +1,28 @@ +{ + "label" : "Sentinel-1 Orbit Ingest", + "allowed_accounts": [ 
"ops" ], + "submission_type":"individual", + "params" : [ + { + "name": "version_opt", + "from": "value", + "value": "--dataset_version" + }, + { + "name": "version", + "from": "submitter" + }, + { + "name": "orbit_url", + "from": "submitter" + }, + { + "name": "orbit_file", + "from": "submitter" + }, + { + "name": "es_dataset_url", + "from": "submitter" + } + ] +} diff --git a/docker/job-spec.json.s1_calibration_crawler b/docker/job-spec.json.s1_calibration_crawler new file mode 100644 index 0000000..a34f136 --- /dev/null +++ b/docker/job-spec.json.s1_calibration_crawler @@ -0,0 +1,33 @@ +{ + "command": "/home/ops/verdi/ops/s1_qc_ingest/crawl_cals.py", + "imported_worker_files": { + "/home/ops/.aws": ["/home/ops/.aws", "ro"], + "/home/ops/.netrc": "/home/ops/.netrc" + }, + "recommended-queues" : [ "factotum-job_worker-small" ], + "disk_usage":"1GB", + "soft_time_limit": 1800, + "time_limit": 2100, + "params" : [ + { + "name": "version_opt", + "destination": "positional" + }, + { + "name": "version", + "destination": "positional" + }, + { + "name": "tag_opt", + "destination": "positional" + }, + { + "name": "tag", + "destination": "positional" + }, + { + "name": "es_dataset_url", + "destination": "positional" + } + ] +} diff --git a/docker/job-spec.json.s1_orbit_crawler b/docker/job-spec.json.s1_orbit_crawler new file mode 100644 index 0000000..ea66419 --- /dev/null +++ b/docker/job-spec.json.s1_orbit_crawler @@ -0,0 +1,33 @@ +{ + "command": "/home/ops/verdi/ops/s1_qc_ingest/crawl_orbits.py", + "imported_worker_files": { + "/home/ops/.aws": ["/home/ops/.aws", "ro"], + "/home/ops/.netrc": "/home/ops/.netrc" + }, + "recommended-queues" : [ "factotum-job_worker-small" ], + "disk_usage":"1GB", + "soft_time_limit": 1800, + "time_limit": 2100, + "params" : [ + { + "name": "version_opt", + "destination": "positional" + }, + { + "name": "version", + "destination": "positional" + }, + { + "name": "tag_opt", + "destination": "positional" + }, + { + "name": "tag", + "destination": "positional" + }, + { + "name": "es_dataset_url", + "destination": "positional" + } + ] +} diff --git a/docker/job-spec.json.s1_orbit_ingest b/docker/job-spec.json.s1_orbit_ingest new file mode 100644 index 0000000..c5f1894 --- /dev/null +++ b/docker/job-spec.json.s1_orbit_ingest @@ -0,0 +1,33 @@ +{ + "command": "/home/ops/verdi/ops/s1_qc_ingest/create_orbit_ds.py", + "imported_worker_files": { + "/home/ops/.aws": ["/home/ops/.aws", "ro"], + "/home/ops/.netrc": "/home/ops/.netrc" + }, + "recommended-queues" : [ "factotum-job_worker-large" ], + "disk_usage":"1GB", + "soft_time_limit": 300, + "time_limit": 600, + "params" : [ + { + "name": "version_opt", + "destination": "positional" + }, + { + "name": "version", + "destination": "positional" + }, + { + "name": "orbit_url", + "destination": "localize" + }, + { + "name": "orbit_file", + "destination": "positional" + }, + { + "name": "es_dataset_url", + "destination": "positional" + } + ] +}