From 323f462e49afebf4c0265ffb21356075cae1e2ae Mon Sep 17 00:00:00 2001
From: "Smith, Brent"
Date: Wed, 20 Nov 2024 02:14:07 +0000
Subject: [PATCH] merge with master; custom download scripts cloud awareness

---
 README.md                                     |   5 +
 docs/source/getting_started.rst               |   6 +
 docs/source/projects.rst                      |   3 +-
 pyspedas/projects/cluster/load_csa.py         |  54 +-
 pyspedas/projects/cluster/tests/uri_tests.py  | 646 ++++++++++++++++++
 .../maven/download_files_utilities.py         | 111 ++-
 pyspedas/projects/maven/kp_utilities.py       |  12 +-
 pyspedas/projects/maven/maven_kp_to_tplot.py  |  30 +-
 pyspedas/projects/maven/maven_load.py         |  48 +-
 pyspedas/projects/maven/orbit_time.py         |  16 +-
 pyspedas/projects/maven/read_iuvs_file.py     |  12 +-
 pyspedas/projects/maven/tests/uri_tests.py    | 524 ++++++++++++++
 pyspedas/projects/maven/utilities.py          | 105 ++-
 .../mms_get_local_ancillary_files.py          |  16 +-
 .../mec_ascii/mms_get_local_state_files.py    |  16 +-
 .../mms/mec_ascii/mms_get_state_data.py       |  48 +-
 .../mms/mec_ascii/mms_get_tetrahedron_qf.py   |  47 +-
 pyspedas/projects/mms/mms_get_local_files.py  |  38 +-
 pyspedas/projects/mms/mms_load_data.py        |  49 +-
 pyspedas/projects/mms/tests/load_uri_tests.py | 517 ++++++++++++++
 pyspedas/utilities/download.py                |   1 -
 .../utilities/tests/download_uri_tests.py     | 145 ++++
 22 files changed, 2323 insertions(+), 126 deletions(-)
 create mode 100644 pyspedas/projects/cluster/tests/uri_tests.py
 create mode 100644 pyspedas/projects/maven/tests/uri_tests.py
 create mode 100644 pyspedas/projects/mms/tests/load_uri_tests.py
 create mode 100644 pyspedas/utilities/tests/download_uri_tests.py

diff --git a/README.md b/README.md
index 722ba1f6..fa35772b 100644
--- a/README.md
+++ b/README.md
@@ -111,6 +111,11 @@ The recommended way of setting your local data directory is to set the `SPEDAS_D
 Mission specific data directories (e.g., `MMS_DATA_DIR` for MMS, `THM_DATA_DIR` for THEMIS) can also be set, and these will override `SPEDAS_DATA_DIR`
 
+## Cloud Repositories
+
+`SPEDAS_DATA_DIR` and mission specific data directories can also be the URI of a cloud repository (e.g., an S3 bucket). If the data directory is set to a URI, files are downloaded from the data server to the URI location and then streamed from there, without needing a local copy.
+
+To access the specified cloud repository, you must configure read and write permissions for it yourself. See [the AWS documentation](https://docs.aws.amazon.com/cli/v1/userguide/cli-configure-files.html) for how to prepare your AWS configuration and credentials.
 
 ## Usage

diff --git a/docs/source/getting_started.rst b/docs/source/getting_started.rst
index 22df2ce3..ecce5dce 100644
--- a/docs/source/getting_started.rst
+++ b/docs/source/getting_started.rst
@@ -31,6 +31,12 @@ By default, the data is stored in your pyspedas directory in a folder named 'pyd
 Mission specific data directories (e.g., **MMS_DATA_DIR** for MMS, **THM_DATA_DIR** for THEMIS) can also be set, and these will override **SPEDAS_DATA_DIR**.
 
+Cloud Repositories
+------------------------
+**SPEDAS_DATA_DIR** and mission specific data directories can also be the URI of a cloud repository (e.g., an S3 bucket). If the data directory is set to a URI, files are downloaded from the data server to the URI location and then streamed from there, without needing a local copy.
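+
+For example, a minimal sketch (the bucket name ``my-bucket`` is hypothetical; any
+fsspec-compatible URI works the same way)::
+
+    import os
+    # Must be set before PySPEDAS reads its configuration.
+    os.environ["SPEDAS_DATA_DIR"] = "s3://my-bucket"
+
+    import pyspedas
+    # Downloaded files now land in the bucket and are streamed from there.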
+
+To access the specified cloud repository, you must configure read and write permissions for it yourself. See `the AWS documentation <https://docs.aws.amazon.com/cli/v1/userguide/cli-configure-files.html>`_ for how to prepare your AWS configuration and credentials.
+
 Loading and Plotting Data
 ---------------------------
 You can load data into tplot variables by calling pyspedas.mission.instrument(), e.g.,

diff --git a/docs/source/projects.rst b/docs/source/projects.rst
index eed88992..e092bb62 100644
--- a/docs/source/projects.rst
+++ b/docs/source/projects.rst
@@ -8,7 +8,8 @@ Some key points that apply to most or all of these load routines:
 * PySPEDAS maintains a cache of previously downloaded data.  The cache location to use is
   controlled by the SPEDAS_DATA_DIR environment variable.  Many missions allow the user to
   set a data directory specific to that mission, overriding the global SPEDAS_DATA_DIR setting.  For example,
-  THM_DATA_DIR can be used to specify the local directory to use for the THEMIS mission.
+  THM_DATA_DIR can be used to specify the local directory to use for the THEMIS mission.
+  The cache location can be a local file directory or a URI location (e.g., an S3 bucket).
 
 * By default, PySPEDAS contacts the data server to get a list of filenames to fulfill the request,
   and compares the modification times on the server and locally cached files to determine
diff --git a/pyspedas/projects/cluster/load_csa.py b/pyspedas/projects/cluster/load_csa.py
index 262e4622..e97aa40d 100644
--- a/pyspedas/projects/cluster/load_csa.py
+++ b/pyspedas/projects/cluster/load_csa.py
@@ -22,6 +22,8 @@ from typing import List
 
 from .config import CONFIG
 
+from pyspedas.utilities.download import is_fsspec_uri
+import fsspec
 
 def cl_master_datatypes():
     """Return list of data types."""
@@ -193,9 +195,17 @@ def load_csa(trange:List[str]=['2001-02-01', '2001-02-03'],
     # Encode the url urllib.parse.quote
     url = base_url + (query_string)
 
-    local_path = CONFIG['local_data_dir']
-    Path(local_path).mkdir(parents=True, exist_ok=True)
-    out_gz = os.path.join(local_path, 'temp_cluster_file.tar.gz')  # Temp file name
+    local_path = CONFIG['local_data_dir']  # could be a URI
+    if is_fsspec_uri(local_path):
+        local_protocol, lpath = local_path.split("://")
+        local_fs = fsspec.filesystem(local_protocol, anon=False)
+
+        out_gz = '/'.join([local_path, 'temp_cluster_file.tar.gz'])  # Temp file name
+        fileobj = local_fs.open(out_gz, 'wb')
+    else:
+        Path(local_path).mkdir(parents=True, exist_ok=True)
+        out_gz = os.path.join(local_path, 'temp_cluster_file.tar.gz')  # Temp file name
+        fileobj = open(out_gz, 'wb')
 
     # Download the file.
     logging.info("Downloading Cluster data, please wait....")
@@ -211,24 +221,46 @@
     logging.info("Download complete.")
 
     # Open the downloaded file.
-    with open(out_gz, 'wb') as w:
+    with fileobj as w:
         w.write(r.content)
 
     # Extract the tar archive.
-    tar = tarfile.open(out_gz, "r:gz")
-    f = tar.getnames()
-    if sys.version_info >= (3, 12):
-        tar.extractall(path=local_path, filter='fully_trusted')
+    if is_fsspec_uri(out_gz):
+        # Cloud-Awareness: Opens byte stream for tarfile package.
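+        # tarfile.open() accepts any readable file-like object via fileobj, and
+        # with the default mode "r" it detects gzip compression transparently,
+        # so the archive never needs to exist as a local file.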
+ bo = local_fs.open(out_gz, "rb") + tar = tarfile.open(fileobj=bo) else: - tar.extractall(path=local_path) + tar = tarfile.open(out_gz, "r:gz") + f = tar.getnames() + + for member in tar.getmembers(): + if member.isfile(): + p = '/'.join([local_path, member.path]) + if is_fsspec_uri(p): + membo = local_fs.open(p, "wb") + else: + os.makedirs(str(Path(p).parent), exist_ok=True) + membo = open(p, "wb") + + # Python > 3.9 requirement from setup.py + # note: data is written after file is read into memory + # https://stackoverflow.com/a/62247729 + with tar.extractfile(member.path) as tarbo: + membo.write(tarbo.read()) + + membo.close() tar.close() # Remove the tar.gz file but keep the extracted. - os.remove(out_gz) + if is_fsspec_uri(out_gz): + local_fs.delete(out_gz) + else: + os.remove(out_gz) # Get unique set of files. f_set = set(f) # File list with full path. - out_files = [os.path.join(local_path, s) for s in list(f_set)] + sep = "/" if is_fsspec_uri(local_path) else os.path.sep + out_files = [sep.join([local_path, s]) for s in list(f_set)] out_files = sorted(out_files) if downloadonly: diff --git a/pyspedas/projects/cluster/tests/uri_tests.py b/pyspedas/projects/cluster/tests/uri_tests.py new file mode 100644 index 00000000..5e6249ca --- /dev/null +++ b/pyspedas/projects/cluster/tests/uri_tests.py @@ -0,0 +1,646 @@ +import os +import time +import requests +import unittest +import subprocess + +from pytplot import data_exists +from pyspedas.projects.cluster import load_csa, config + +#========================================================== + +# moto server mock details +localhost = "http://localhost:3000" +bucket_name = "test-bucket" + +# Set up mock AWS environment variables (fake credentials) +os.environ["AWS_ACCESS_KEY_ID"] = "test" +os.environ["AWS_SECRET_ACCESS_KEY"] = "test" +os.environ["AWS_DEFAULT_REGION"] = "us-east-1" + +# Set environment to use the local Moto S3 server +# S3 ENDPOINT for fsspec +# ENDPOINT URL for cdflib/boto3 +os.environ["AWS_S3_ENDPOINT"] = localhost +os.environ["AWS_ENDPOINT_URL"] = localhost + +class LoadTestCases(unittest.TestCase): + """ + Cloud Awareness Unit Tests + + Depends upon moto[server] package. Install via: + pip install moto[server] + + These tests essentially create a local mock-AWS server as a background + process at port 3000. + Note: The environment variables are used as mock credentials in order + to avoid having to pass the endpoint url to fsspec calls. 
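+
+    Each test below follows the same pattern: reset the mock bucket, point
+    CONFIG['local_data_dir'] at s3://test-bucket, load one datatype through
+    load_csa(), and assert that the expected tplot variable exists.
+
+    Example invocation (assuming moto[server] is installed):
+
+        python -m unittest pyspedas.projects.cluster.tests.uri_tests
+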
+ """ + + @classmethod + def setUpClass(cls): + # Start the moto server for S3 in the background + # https://github.com/getmoto/moto/issues/4418 + cls.moto_server = subprocess.Popen( + ["moto_server", "-p3000"], + stdout=subprocess.PIPE, stderr=subprocess.PIPE + ) + + # Allow the server to start properly + time.sleep(2) + + # Create a bucket using direct HTTP requests + response = requests.put(f"http://localhost:3000/{bucket_name}") + assert response.status_code == 200, "Bucket creation failed" + + @classmethod + def tearDownClass(cls): + # Terminate the moto server after tests + cls.moto_server.terminate() + cls.moto_server.communicate() + + def clean_data(self): + # reset moto server to original state + response = requests.post("http://localhost:3000/moto-api/reset") + assert response.status_code == 200, "Moto Server reset failed" + + # create bucket again + response = requests.put(f"http://localhost:3000/{bucket_name}") + assert response.status_code == 200, "Bucket creation failed" + + #========================================================================== + # Cloud Awareness Note: Cluster implementation does not stream data due to + # lack of functionality already present in PySPEDAS. + + # Adapted unit tests for AWS-specific URI testing. + def test_load_csa_CE_WBD_WAVEFORM_CDF_data(self): + self.clean_data() + config.CONFIG['local_data_dir'] = f"s3://{bucket_name}" + + trange = ['2012-11-06T02:19:00Z', '2012-11-06T02:19:59Z'] + wbd_data = load_csa(probes=['C1'], + trange=trange, + datatypes=['CE_WBD_WAVEFORM_CDF'], time_clip=True) + print(wbd_data) + self.assertTrue('WBD_Elec' in wbd_data) + self.assertTrue(data_exists('WBD_Elec')) + + def test_load_csa_CP_AUX_POSGSE_1M_data(self): + self.clean_data() + config.CONFIG['local_data_dir'] = f"s3://{bucket_name}" + + trange = ['2001-02-01T00:00:00Z', '2001-02-04T00:00:00Z'] + pos_data = load_csa(probes=['C1'], + trange=trange, + datatypes=['CP_AUX_POSGSE_1M'], time_clip=True) + self.assertTrue('sc_r_xyz_gse__C1_CP_AUX_POSGSE_1M' in pos_data) + self.assertTrue(data_exists('sc_r_xyz_gse__C1_CP_AUX_POSGSE_1M')) + + def test_load_csa_CP_AUX_POSGSE_1M_data_prefix_none(self): + self.clean_data() + config.CONFIG['local_data_dir'] = f"s3://{bucket_name}" + + trange = ['2001-02-01T00:00:00Z', '2001-02-04T00:00:00Z'] + pos_data = load_csa(probes=['C1'], + trange=trange, + prefix=None, + datatypes=['CP_AUX_POSGSE_1M'], time_clip=True) + self.assertTrue('sc_r_xyz_gse__C1_CP_AUX_POSGSE_1M' in pos_data) + self.assertTrue(data_exists('sc_r_xyz_gse__C1_CP_AUX_POSGSE_1M')) + + def test_load_csa_CP_AUX_POSGSE_1M_data_suffix_none(self): + self.clean_data() + config.CONFIG['local_data_dir'] = f"s3://{bucket_name}" + + trange = ['2001-02-01T00:00:00Z', '2001-02-04T00:00:00Z'] + pos_data = load_csa(probes=['C1'], + trange=trange, + suffix=None, + datatypes=['CP_AUX_POSGSE_1M'], time_clip=True) + self.assertTrue('sc_r_xyz_gse__C1_CP_AUX_POSGSE_1M' in pos_data) + self.assertTrue(data_exists('sc_r_xyz_gse__C1_CP_AUX_POSGSE_1M')) + + def test_load_csa_CP_AUX_POSGSE_1M_data_prefix_suffix(self): + self.clean_data() + config.CONFIG['local_data_dir'] = f"s3://{bucket_name}" + + trange = ['2001-02-01T00:00:00Z', '2001-02-04T00:00:00Z'] + pos_data = load_csa(probes=['C1'], + trange=trange, + prefix='pre_', + suffix='_suf', + datatypes=['CP_AUX_POSGSE_1M'], time_clip=True) + self.assertTrue('pre_sc_r_xyz_gse__C1_CP_AUX_POSGSE_1M_suf' in pos_data) + self.assertTrue(data_exists('pre_sc_r_xyz_gse__C1_CP_AUX_POSGSE_1M_suf')) + + def 
test_load_csa_CP_CIS_CODIF_HS_H1_MOMENTS_data(self): + self.clean_data() + config.CONFIG['local_data_dir'] = f"s3://{bucket_name}" + + trange = ['2001-02-01T00:00:00Z', '2001-02-04T00:00:00Z'] + mom_data = load_csa(probes=['C1'], + trange=trange, + datatypes=['CP_CIS-CODIF_HS_H1_MOMENTS'], time_clip=True) + print(mom_data) + self.assertTrue('density__C1_CP_CIS_CODIF_HS_H1_MOMENTS' in mom_data) + self.assertTrue(data_exists('density__C1_CP_CIS_CODIF_HS_H1_MOMENTS')) + + def test_load_csa_CP_CIS_CODIF_HS_He1_MOMENTS_data(self): + self.clean_data() + config.CONFIG['local_data_dir'] = f"s3://{bucket_name}" + + trange = ['2001-02-01T00:00:00Z', '2001-02-04T00:00:00Z'] + mom_data = load_csa(probes=['C1'], + trange=trange, + datatypes=['CP_CIS-CODIF_HS_He1_MOMENTS'], time_clip=True) + print(mom_data) + self.assertTrue('density__C1_CP_CIS_CODIF_HS_He1_MOMENTS' in mom_data) + self.assertTrue(data_exists('density__C1_CP_CIS_CODIF_HS_He1_MOMENTS')) + + def test_load_csa_CP_CIS_CODIF_HS_O1_MOMENTS_data(self): + self.clean_data() + config.CONFIG['local_data_dir'] = f"s3://{bucket_name}" + + trange = ['2001-02-01T00:00:00Z', '2001-02-04T00:00:00Z'] + mom_data = load_csa(probes=['C1'], + trange=trange, + datatypes=['CP_CIS-CODIF_HS_O1_MOMENTS'], time_clip=True) + print(mom_data) + self.assertTrue('density__C1_CP_CIS_CODIF_HS_O1_MOMENTS' in mom_data) + self.assertTrue(data_exists('density__C1_CP_CIS_CODIF_HS_O1_MOMENTS')) + + def test_load_csa_CP_CIS_CODIF_PAD_HS_H1_PF_data(self): + self.clean_data() + config.CONFIG['local_data_dir'] = f"s3://{bucket_name}" + + trange = ['2001-02-01T00:00:00Z', '2001-02-04T00:00:00Z'] + mom_data = load_csa(probes=['C1'], + trange=trange, + datatypes=['CP_CIS-CODIF_PAD_HS_H1_PF'], time_clip=True) + print(mom_data) + self.assertTrue('Differential_Particle_Flux__C1_CP_CIS_CODIF_PAD_HS_H1_PF' in mom_data) + self.assertTrue(data_exists('Differential_Particle_Flux__C1_CP_CIS_CODIF_PAD_HS_H1_PF')) + + def test_load_csa_CP_CIS_CODIF_PAD_HS_He1_PF_data(self): + self.clean_data() + config.CONFIG['local_data_dir'] = f"s3://{bucket_name}" + + trange = ['2001-02-01T00:00:00Z', '2001-02-04T00:00:00Z'] + mom_data = load_csa(probes=['C1'], + trange=trange, + datatypes=['CP_CIS-CODIF_PAD_HS_He1_PF'], time_clip=True) + print(mom_data) + self.assertTrue('Differential_Particle_Flux__C1_CP_CIS_CODIF_PAD_HS_He1_PF' in mom_data) + self.assertTrue(data_exists('Differential_Particle_Flux__C1_CP_CIS_CODIF_PAD_HS_He1_PF')) + + def test_load_csa_CP_CIS_CODIF_PAD_HS_O1_PF_data(self): + self.clean_data() + config.CONFIG['local_data_dir'] = f"s3://{bucket_name}" + + trange = ['2001-02-01T00:00:00Z', '2001-02-04T00:00:00Z'] + mom_data = load_csa(probes=['C1'], + trange=trange, + datatypes=['CP_CIS-CODIF_PAD_HS_O1_PF'], time_clip=True) + print(mom_data) + self.assertTrue('Differential_Particle_Flux__C1_CP_CIS_CODIF_PAD_HS_O1_PF' in mom_data) + self.assertTrue(data_exists('Differential_Particle_Flux__C1_CP_CIS_CODIF_PAD_HS_O1_PF')) + + def test_load_csa_CP_CIS_HIA_ONBOARD_MOMENTS_data(self): + self.clean_data() + config.CONFIG['local_data_dir'] = f"s3://{bucket_name}" + + trange = ['2001-02-01T00:00:00Z', '2001-02-04T00:00:00Z'] + mom_data = load_csa(probes=['C1'], + trange=trange, + datatypes=['CP_CIS-HIA_ONBOARD_MOMENTS'], time_clip=True) + print(mom_data) + self.assertTrue('density__C1_CP_CIS_HIA_ONBOARD_MOMENTS' in mom_data) + self.assertTrue(data_exists('density__C1_CP_CIS_HIA_ONBOARD_MOMENTS')) + + # This test uses a different time range from test_load_csa_CP_CIS_HIA_ONBOARD_MOMENTS_data, + # and 
loads for all four probes. + def test_load_csa_mom_data(self): + self.clean_data() + config.CONFIG['local_data_dir'] = f"s3://{bucket_name}" + + mom_data = load_csa(probes=['C1', 'C2', 'C3', 'C4'], + trange=['2003-08-17/16:40', '2003-08-17/16:45'], + datatypes=['CP_CIS-HIA_ONBOARD_MOMENTS'], time_clip=True) + self.assertTrue('density__C1_CP_CIS_HIA_ONBOARD_MOMENTS' in mom_data) + self.assertTrue(data_exists('density__C1_CP_CIS_HIA_ONBOARD_MOMENTS')) + + def test_load_csa_CP_CIS_HIA_PAD_HS_MAG_IONS_PF_data(self): + self.clean_data() + config.CONFIG['local_data_dir'] = f"s3://{bucket_name}" + + trange = ['2001-02-01T00:00:00Z', '2001-02-04T00:00:00Z'] + mom_data = load_csa(probes=['C1'], + trange=trange, + datatypes=['CP_CIS-HIA_PAD_HS_MAG_IONS_PF'], time_clip=True) + print(mom_data) + self.assertTrue('Differential_Particle_Flux__C1_CP_CIS_HIA_PAD_HS_MAG_IONS_PF' in mom_data) + self.assertTrue(data_exists('Differential_Particle_Flux__C1_CP_CIS_HIA_PAD_HS_MAG_IONS_PF')) + + def test_load_csa_CP_EDI_AEDC_data(self): + self.clean_data() + config.CONFIG['local_data_dir'] = f"s3://{bucket_name}" + + trange = ['2005-08-01T00:00:00Z', '2005-08-02T00:00:00Z'] + edi_data = load_csa(probes=['C1'], + trange=trange, + datatypes=['CP_EDI_AEDC'], time_clip=True) + print(edi_data) + self.assertTrue('counts_GDU1_PA_90__C1_CP_EDI_AEDC' in edi_data) + self.assertTrue(data_exists('counts_GDU1_PA_90__C1_CP_EDI_AEDC')) + + def test_load_csa_CP_EDI_MP_data(self): + self.clean_data() + config.CONFIG['local_data_dir'] = f"s3://{bucket_name}" + + trange = ['2001-02-01T00:00:00Z', '2001-02-04T00:00:00Z'] + edi_data = load_csa(probes=['C1'], + trange=trange, + datatypes=['CP_EDI_MP'], time_clip=True) + self.assertTrue('V_ed_xyz_gse__C1_CP_EDI_MP' in edi_data) + self.assertTrue(data_exists('V_ed_xyz_gse__C1_CP_EDI_MP')) + + def test_load_csa_CP_EDI_SPIN_data(self): + self.clean_data() + config.CONFIG['local_data_dir'] = f"s3://{bucket_name}" + + trange = ['2001-02-01T00:00:00Z', '2001-02-04T00:00:00Z'] + edi_data = load_csa(probes=['C1'], + trange=trange, + datatypes=['CP_EDI_SPIN'], time_clip=True) + self.assertTrue('V_ed_xyz_gse__C1_CP_EDI_SPIN' in edi_data) + self.assertTrue(data_exists('V_ed_xyz_gse__C1_CP_EDI_SPIN')) + + # multidimensional DEPEND_0 array for dsettings/caveat variables + def test_load_csa_CP_EFW_L2_E3D_INERT_data(self): + self.clean_data() + config.CONFIG['local_data_dir'] = f"s3://{bucket_name}" + + trange = ['2001-02-01T00:00:00Z', '2001-02-02T00:00:00Z'] + efw_data = load_csa(probes=['C1'], + trange=trange, + datatypes=['CP_EFW_L2_E3D_INERT'], time_clip=True) + self.assertTrue('E_Vec_xyz_ISR2__C1_CP_EFW_L2_E3D_INERT' in efw_data) + self.assertTrue(data_exists('E_Vec_xyz_ISR2__C1_CP_EFW_L2_E3D_INERT')) + + def test_load_csa_CP_EFW_L2_P_data(self): + self.clean_data() + config.CONFIG['local_data_dir'] = f"s3://{bucket_name}" + + trange = ['2001-02-01T00:00:00Z', '2001-02-04T00:00:00Z'] + efw_data = load_csa(probes=['C1'], + trange=trange, + datatypes=['CP_EFW_L2_P'], time_clip=True) + self.assertTrue('Spacecraft_potential__C1_CP_EFW_L2_P' in efw_data) + self.assertTrue(data_exists('Spacecraft_potential__C1_CP_EFW_L2_P')) + + def test_load_csa_CP_EFW_L2_V3D_INERT_data(self): + self.clean_data() + config.CONFIG['local_data_dir'] = f"s3://{bucket_name}" + + trange = ['2001-02-01T00:00:00Z', '2001-02-02T00:00:00Z'] + efw_data = load_csa(probes=['C1'], + trange=trange, + datatypes=['CP_EFW_L2_V3D_INERT'], time_clip=True) + self.assertTrue('v_drift_ISR2__C1_CP_EFW_L2_V3D_INERT' in efw_data) + 
self.assertTrue(data_exists('v_drift_ISR2__C1_CP_EFW_L2_V3D_INERT')) + + def test_load_csa_CP_EFW_L3_E3D_INERT_data(self): + self.clean_data() + config.CONFIG['local_data_dir'] = f"s3://{bucket_name}" + + trange = ['2001-02-01T00:00:00Z', '2001-02-02T00:00:00Z'] + efw_data = load_csa(probes=['C1'], + trange=trange, + datatypes=['CP_EFW_L3_E3D_INERT'], time_clip=True) + self.assertTrue('E_Vec_xyz_ISR2__C1_CP_EFW_L3_E3D_INERT' in efw_data) + self.assertTrue(data_exists('E_Vec_xyz_ISR2__C1_CP_EFW_L3_E3D_INERT')) + + def test_load_csa_CP_EFW_L3_P_data(self): + self.clean_data() + config.CONFIG['local_data_dir'] = f"s3://{bucket_name}" + + trange = ['2001-02-01T00:00:00Z', '2001-02-02T00:00:00Z'] + efw_data = load_csa(probes=['C1'], + trange=trange, + datatypes=['CP_EFW_L3_P'], time_clip=True) + self.assertTrue('Spacecraft_potential__C1_CP_EFW_L3_P' in efw_data) + self.assertTrue(data_exists('Spacecraft_potential__C1_CP_EFW_L3_P')) + + def test_load_csa_CP_EFW_L3_V3D_INERT_data(self): + self.clean_data() + config.CONFIG['local_data_dir'] = f"s3://{bucket_name}" + + trange = ['2001-02-01T00:00:00Z', '2001-02-02T00:00:00Z'] + efw_data = load_csa(probes=['C1'], + trange=trange, + datatypes=['CP_EFW_L3_V3D_INERT'], time_clip=True) + self.assertTrue('v_drift_ISR2__C1_CP_EFW_L3_V3D_INERT' in efw_data) + self.assertTrue(data_exists('v_drift_ISR2__C1_CP_EFW_L3_V3D_INERT')) + + def test_load_csa_CP_FGM_5VPS_data(self): + self.clean_data() + config.CONFIG['local_data_dir'] = f"s3://{bucket_name}" + + trange = ['2001-02-01T00:00:00Z', '2001-02-02T00:00:00Z'] + fgm_data = load_csa(probes=['C1'], + trange=trange, + datatypes=['CP_FGM_5VPS'], time_clip=True) + self.assertTrue('B_vec_xyz_gse__C1_CP_FGM_5VPS' in fgm_data) + self.assertTrue(data_exists('B_vec_xyz_gse__C1_CP_FGM_5VPS')) + + def test_load_csa_CP_FGM_FULL_data(self): + self.clean_data() + config.CONFIG['local_data_dir'] = f"s3://{bucket_name}" + + trange = ['2001-02-01T00:00:00Z', '2001-02-02T00:00:00Z'] + fgm_data = load_csa(probes=['C1'], + trange=trange, + datatypes=['CP_FGM_FULL'], time_clip=True) + self.assertTrue('B_vec_xyz_gse__C1_CP_FGM_FULL' in fgm_data) + self.assertTrue(data_exists('B_vec_xyz_gse__C1_CP_FGM_FULL')) + + def test_load_csa_CP_FGM_SPIN_data(self): + self.clean_data() + config.CONFIG['local_data_dir'] = f"s3://{bucket_name}" + + trange = ['2001-02-01T00:00:00Z', '2001-02-02T00:00:00Z'] + fgm_data = load_csa(probes=['C1'], + trange=trange, + datatypes=['CP_FGM_SPIN'], time_clip=True) + self.assertTrue('B_vec_xyz_gse__C1_CP_FGM_SPIN' in fgm_data) + self.assertTrue(data_exists('B_vec_xyz_gse__C1_CP_FGM_SPIN')) + + def test_load_csa_CP_PEA_MOMENTS_data(self): + self.clean_data() + config.CONFIG['local_data_dir'] = f"s3://{bucket_name}" + + trange = ['2001-02-01T00:00:00Z', '2001-02-02T00:00:00Z'] + mom_data = load_csa(probes=['C1'], + trange=trange, + datatypes=['CP_PEA_MOMENTS'], time_clip=True) + self.assertTrue('Data_Density__C1_CP_PEA_MOMENTS' in mom_data) + self.assertTrue(data_exists('Data_Density__C1_CP_PEA_MOMENTS')) + + # multidimensional DEPEND_0 array for dsettings/caveat variables + def test_load_csa_CP_PEA_PITCH_SPIN_DEFlux_data(self): + self.clean_data() + config.CONFIG['local_data_dir'] = f"s3://{bucket_name}" + + trange = ['2001-02-01T00:00:00Z', '2001-02-02T00:00:00Z'] + pea_data = load_csa(probes=['C1'], + trange=trange, + datatypes=['CP_PEA_PITCH_SPIN_DEFlux'], time_clip=True) + self.assertTrue('Data__C1_CP_PEA_PITCH_SPIN_DEFlux' in pea_data) + self.assertTrue(data_exists('Data__C1_CP_PEA_PITCH_SPIN_DEFlux')) 
+ + # multidimensional DEPEND_0 array for dsettings/caveat variables + def test_load_csa_CP_PEA_PITCH_SPIN_DPFlux_data(self): + self.clean_data() + config.CONFIG['local_data_dir'] = f"s3://{bucket_name}" + + trange = ['2001-02-01T00:00:00Z', '2001-02-02T00:00:00Z'] + pea_data = load_csa(probes=['C1'], + trange=trange, + datatypes=['CP_PEA_PITCH_SPIN_DPFlux'], time_clip=True) + self.assertTrue('Data__C1_CP_PEA_PITCH_SPIN_DPFlux' in pea_data) + self.assertTrue(data_exists('Data__C1_CP_PEA_PITCH_SPIN_DPFlux')) + + # multidimensional DEPEND_0 array for dsettings/caveat variables + def test_load_csa_CP_PEA_PITCH_SPIN_PSD_data(self): + self.clean_data() + config.CONFIG['local_data_dir'] = f"s3://{bucket_name}" + + trange = ['2001-02-01T00:00:00Z', '2001-02-02T00:00:00Z'] + pea_data = load_csa(probes=['C1'], + trange=trange, + datatypes=['CP_PEA_PITCH_SPIN_PSD'], time_clip=True) + self.assertTrue('Data__C1_CP_PEA_PITCH_SPIN_PSD' in pea_data) + self.assertTrue(data_exists('Data__C1_CP_PEA_PITCH_SPIN_PSD')) + + # compressed file error + @unittest.skip + def test_load_csa_CP_RAP_ESPCT6_data(self): + self.clean_data() + config.CONFIG['local_data_dir'] = f"s3://{bucket_name}" + + trange = ['2008-02-03T00:00:00Z', '2008-02-05T00:00:00Z'] + rap_data = load_csa(probes=['C1'], + trange=trange, + datatypes=['CP_RAP_ESPCT6'], time_clip=True) + print(rap_data) + self.assertTrue('Electron_Dif_flux__C1_CP_RAP_ESPCT6' in rap_data) + self.assertTrue(data_exists('Electron_Dif_flux__C1_CP_RAP_ESPCT6')) + + # multidimensional DEPEND_0 array for dsettings/caveat variables + def test_load_csa_CP_RAP_ESPCT6_R_data(self): + self.clean_data() + config.CONFIG['local_data_dir'] = f"s3://{bucket_name}" + + trange = ['2001-02-01T00:00:00Z', '2001-02-02T00:00:00Z'] + rap_data = load_csa(probes=['C1'], + trange=trange, + datatypes=['CP_RAP_ESPCT6_R'], time_clip=True) + self.assertTrue('Electron_Rate__C1_CP_RAP_ESPCT6_R' in rap_data) + self.assertTrue(data_exists('Electron_Rate__C1_CP_RAP_ESPCT6_R')) + + # multidimensional DEPEND_0 array for dsettings/caveat variables + def test_load_csa_CP_RAP_HSPCT_data(self): + self.clean_data() + config.CONFIG['local_data_dir'] = f"s3://{bucket_name}" + + trange = ['2001-02-01T00:00:00Z', '2001-02-02T00:00:00Z'] + rap_data = load_csa(probes=['C1'], + trange=trange, + datatypes=['CP_RAP_HSPCT'], time_clip=True) + self.assertTrue('Proton_Dif_flux__C1_CP_RAP_HSPCT' in rap_data) + self.assertTrue(data_exists('Proton_Dif_flux__C1_CP_RAP_HSPCT')) + + # multidimensional DEPEND_0 array for dsettings/caveat variables + def test_load_csa_CP_RAP_HSPCT_R_data(self): + self.clean_data() + config.CONFIG['local_data_dir'] = f"s3://{bucket_name}" + + trange = ['2001-02-01T00:00:00Z', '2001-02-02T00:00:00Z'] + rap_data = load_csa(probes=['C1'], + trange=trange, + datatypes=['CP_RAP_HSPCT_R'], time_clip=True) + self.assertTrue('Proton_Rate__C1_CP_RAP_HSPCT_R' in rap_data) + self.assertTrue(data_exists('Proton_Rate__C1_CP_RAP_HSPCT_R')) + + # multidimensional DEPEND_0 array for dsettings/caveat variables + def test_load_csa_CP_RAP_ISPCT_CNO_data(self): + self.clean_data() + config.CONFIG['local_data_dir'] = f"s3://{bucket_name}" + + trange = ['2001-02-01T00:00:00Z', '2001-02-02T00:00:00Z'] + rap_data = load_csa(probes=['C1'], + trange=trange, + datatypes=['CP_RAP_ISPCT_CNO'], time_clip=True) + self.assertTrue('CNO_Dif_flux__C1_CP_RAP_ISPCT_CNO' in rap_data) + self.assertTrue(data_exists('CNO_Dif_flux__C1_CP_RAP_ISPCT_CNO')) + + # multidimensional DEPEND_0 array for dsettings/caveat variables + def 
test_load_csa_CP_RAP_ISPCT_He_data(self): + self.clean_data() + config.CONFIG['local_data_dir'] = f"s3://{bucket_name}" + + trange = ['2001-02-01T00:00:00Z', '2001-02-02T00:00:00Z'] + rap_data = load_csa(probes=['C1'], + trange=trange, + datatypes=['CP_RAP_ISPCT_He'], time_clip=True) + self.assertTrue('Helium_Dif_flux__C1_CP_RAP_ISPCT_He' in rap_data) + self.assertTrue(data_exists('Helium_Dif_flux__C1_CP_RAP_ISPCT_He')) + + # This date returns data, but it's large enough (tens of megabytes that there's a significant + # chance of the server not completing the request. + @unittest.skip + def test_load_csa_CP_STA_CS_HBR_data(self): + self.clean_data() + config.CONFIG['local_data_dir'] = f"s3://{bucket_name}" + + trange = ['2001-01-31T00:00:00Z', '2001-01-31T23:59:59Z'] + sta_data = load_csa(probes=['C1'], + trange=trange, + datatypes=['CP_STA_CS_HBR'], time_clip=True) + print(sta_data) + self.assertTrue('Complex_Spectrum__C1_CP_STA_CS_HBR' in sta_data) + self.assertTrue(data_exists('Complex_Spectrum__C1_CP_STA_CS_HBR')) + + # This date returns data, but it's large enough (tens of megabytes that there's a significant + # chance of the server not completing the request. + @unittest.skip + def test_load_csa_CP_STA_CS_NBR_data(self): + self.clean_data() + config.CONFIG['local_data_dir'] = f"s3://{bucket_name}" + + trange = ['2001-01-31T00:00:00Z', '2001-01-31T23:59:59Z'] + sta_data = load_csa(probes=['C1'], + trange=trange, + datatypes=['CP_STA_CS_NBR'], time_clip=True) + print(sta_data) + self.assertTrue('Complex_Spectrum__C1_CP_STA_CS_NBR' in sta_data) + self.assertTrue(data_exists('Complex_Spectrum__C1_CP_STA_CS_NBR')) + + # compressed file error + @unittest.skip + def test_load_csa_CP_STA_CWF_GSE_data(self): + self.clean_data() + config.CONFIG['local_data_dir'] = f"s3://{bucket_name}" + + trange = ['2001-02-01T00:00:00Z', '2001-02-02T00:00:00Z'] + sta_data = load_csa(probes=['C1'], + trange=trange, + datatypes=['CP_STA_CWF_GSE'], time_clip=True) + self.assertTrue('B_vec_xyz_Instrument__C1_CP_STA_CWF_GSE' in sta_data) + self.assertTrue(data_exists('B_vec_xyz_Instrument__C1_CP_STA_CWF_GSE')) + + # 404 error, no data? + @unittest.skip + def test_load_csa_CP_STA_CWF_HBR_ISR2_data(self): + self.clean_data() + config.CONFIG['local_data_dir'] = f"s3://{bucket_name}" + + trange = ['2001-02-01T00:00:00Z', '2001-02-02T00:00:00Z'] + sta_data = load_csa(probes=['C1'], + trange=trange, + datatypes=['CP_STA_CWF_HBR_ISR2'], time_clip=True) + print(sta_data) + self.assertTrue('B_vec_xyz_Instrument__C1_CP_STA_CWF_GSE' in sta_data) + self.assertTrue(data_exists('B_vec_xyz_Instrument__C1_CP_STA_CWF_GSE')) + + # 404 error, no data? + @unittest.skip + def test_load_csa_CP_STA_CWF_NBR_ISR2_data(self): + self.clean_data() + config.CONFIG['local_data_dir'] = f"s3://{bucket_name}" + + trange = ['2001-02-01T00:00:00Z', '2001-02-02T00:00:00Z'] + sta_data = load_csa(probes=['C1'], + trange=trange, + datatypes=['CP_STA_CWF_NBR_ISR2'], time_clip=True) + print(sta_data) + self.assertTrue('B_vec_xyz_Instrument__C1_CP_STA_CWF_GSE' in sta_data) + self.assertTrue(data_exists('B_vec_xyz_Instrument__C1_CP_STA_CWF_GSE')) + + # Spectrogram variables have string-valued DEPEND_2, which is not ISTP compliant. 
+ @unittest.skip + def test_load_csa_CP_STA_PSD_data(self): + self.clean_data() + config.CONFIG['local_data_dir'] = f"s3://{bucket_name}" + + trange = ['2001-02-01T00:00:00Z', '2001-02-02T00:00:00Z'] + sta_data = load_csa(probes=['C1'], + trange=trange, + datatypes=['CP_STA_PSD'], time_clip=True) + print(sta_data) + #self.assertTrue('B_vec_xyz_Instrument__C1_CP_STA_CWF_GSE' in sta_data) + #self.assertTrue(data_exists('B_vec_xyz_Instrument__C1_CP_STA_CWF_GSE')) + + # data load seemed rather large; potential timeout issue (works separately) + @unittest.skip + def test_load_csa_CP_WBD_WAVEFORM_data(self): + self.clean_data() + config.CONFIG['local_data_dir'] = f"s3://{bucket_name}" + + trange = ['2001-02-04T13:40:00Z', '2001-02-04T13:49:59Z'] + wbd_data = load_csa(probes=['C1'], + trange=trange, + datatypes=['CP_WBD_WAVEFORM'], time_clip=True) + print(wbd_data) + self.assertTrue('E__C1_CP_WBD_WAVEFORM' in wbd_data) + self.assertTrue(data_exists('E__C1_CP_WBD_WAVEFORM')) + + def test_load_csa_CP_WHI_ELECTRON_DENSITY_data(self): + self.clean_data() + config.CONFIG['local_data_dir'] = f"s3://{bucket_name}" + + trange = ['2018-01-12T00:00:00Z', '2018-01-13T00:00:00Z'] + whi_data = load_csa(probes=['C1'], + trange=trange, + datatypes=['CP_WHI_ELECTRON_DENSITY'], time_clip=True) + print(whi_data) + self.assertTrue('Electron_Density__C1_CP_WHI_ELECTRON_DENSITY' in whi_data) + self.assertTrue(data_exists('Electron_Density__C1_CP_WHI_ELECTRON_DENSITY')) + + def test_load_csa_CP_CP_WHI_NATURAL_data(self): + self.clean_data() + config.CONFIG['local_data_dir'] = f"s3://{bucket_name}" + + trange = ['2001-02-01T00:00:00Z', '2001-02-02T00:00:00Z'] + whi_data = load_csa(probes=['C1'], + trange=trange, + datatypes=['CP_WHI_NATURAL'], time_clip=True) + self.assertTrue('Electric_Spectral_Power_Density__C1_CP_WHI_NATURAL' in whi_data) + self.assertTrue(data_exists('Electric_Spectral_Power_Density__C1_CP_WHI_NATURAL')) + + def test_load_csa_JP_AUX_PMP_data(self): + self.clean_data() + config.CONFIG['local_data_dir'] = f"s3://{bucket_name}" + + trange = ['2017-01-01T00:00:00Z', '2017-02-01T00:00:00Z'] + jp_data = load_csa(probes=['C1'], + trange=trange, + datatypes=['JP_AUX_PMP'], time_clip=True) + print(jp_data) + self.assertTrue('L_value__C1_JP_AUX_PMP' in jp_data) + self.assertTrue(data_exists('L_value__C1_JP_AUX_PMP')) + + def test_load_csa_JP_PSE_data(self): + self.clean_data() + config.CONFIG['local_data_dir'] = f"s3://{bucket_name}" + + trange = ['2017-01-01T00:00:00Z', '2018-01-02T00:00:00Z'] + jp_data = load_csa(probes=['C1'], + trange=trange, + datatypes=['JP_AUX_PSE'], time_clip=True) + print(jp_data) + self.assertTrue('sc_r1_xyz_gse__C1_JP_AUX_PSE' in jp_data) + self.assertTrue(data_exists('sc_r1_xyz_gse__C1_JP_AUX_PSE')) + +if __name__ == '__main__': + unittest.main(verbosity=2) diff --git a/pyspedas/projects/maven/download_files_utilities.py b/pyspedas/projects/maven/download_files_utilities.py index 0c9eca0e..3fcf2d7b 100644 --- a/pyspedas/projects/maven/download_files_utilities.py +++ b/pyspedas/projects/maven/download_files_utilities.py @@ -2,6 +2,8 @@ from .file_regex import maven_kp_l2_regex from .config import CONFIG +from pyspedas.utilities.download import is_fsspec_uri +import fsspec def get_filenames(query, public): """ @@ -96,7 +98,16 @@ def get_file_from_site(filename, public, data_dir): page = urllib.request.urlopen(public_url) logging.debug("get_file_from_site finished request to public url: %s", public_url) - with open(os.path.join(data_dir, filename), "wb") as code: + # Cloud 
Awareness: write page contents to URI + if is_fsspec_uri(data_dir): + protocol, path = data_dir.split("://") + fs = fsspec.filesystem(protocol) + + fo = fs.open("/".join([data_dir, filename]), "wb") + else: + fo = open(os.path.join(data_dir, filename), "wb") + + with fo as code: code.write(page.read()) return @@ -121,20 +132,33 @@ def get_orbit_files(): page_string = str(page.read()) toolkit_path = CONFIG["local_data_dir"] - orbit_files_path = os.path.join(toolkit_path, "orbitfiles") + # Cloud Awareness: operate on fsspec filesystem instead of os + if is_fsspec_uri(toolkit_path): + protocol, path = toolkit_path.split("://") + fs = fsspec.filesystem(protocol) - if not os.path.exists(toolkit_path): - os.mkdir(toolkit_path) + orbit_files_path = "/".join([toolkit_path, "orbitfiles"]) + fs.mkdirs(orbit_files_path) + else: + orbit_files_path = os.path.join(toolkit_path, "orbitfiles") + + if not os.path.exists(toolkit_path): + os.mkdir(toolkit_path) - if not os.path.exists(orbit_files_path): - os.mkdir(orbit_files_path) + if not os.path.exists(orbit_files_path): + os.mkdir(orbit_files_path) for matching_pattern in re.findall(pattern, page_string): filename = "maven_orb_rec" + matching_pattern logging.debug("get_orbit_files() making request to URL %s", orbit_files_url + filename) o_file = urllib.request.urlopen(orbit_files_url + filename) logging.debug("get_orbit_files() finished request to URL %s", orbit_files_url + filename) - with open(os.path.join(orbit_files_path, filename), "wb") as code: + + if is_fsspec_uri(toolkit_path): + fo = fs.open("/".join([orbit_files_path, filename]), "wb") + else: + fo = open(os.path.join(orbit_files_path, filename), "wb") + with fo as code: code.write(o_file.read()) merge_orbit_files() @@ -156,14 +180,29 @@ def merge_orbit_files(): import re toolkit_path = CONFIG["local_data_dir"] - orbit_files_path = os.path.join(toolkit_path, "orbitfiles") + + # Cloud Awareness: traverse fsspec-like filesystem instead of os + if is_fsspec_uri(toolkit_path): + protocol, path = toolkit_path.split("://") + fs = fsspec.filesystem(protocol) + + orbit_files_path = "/".join([toolkit_path, "orbitfiles"]) + fl = fs.listdir(orbit_files_path, detail=False) + fl = [f.rstrip("/").split("/")[-1] for f in fl] + fo = fs.open("/".join([orbit_files_path, "maven_orb_rec.orb"]), "w") + else: + orbit_files_path = os.path.join(toolkit_path, "orbitfiles") + + fl = os.listdir(orbit_files_path) + fo = open(os.path.join(orbit_files_path, "maven_orb_rec.orb"), "w") + pattern = "maven_orb_rec(_|)(|.{6})(|_.{9}).orb" orb_dates = [] orb_files = [] - for f in os.listdir(orbit_files_path): + for f in fl: x = re.match(pattern, f) if x is not None: - orb_files.append(os.path.join(orbit_files_path, f)) + orb_file = os.path.join(orbit_files_path, f) if not is_fsspec_uri(toolkit_path) else "/".join([orbit_files_path, f]) if x.group(2) != "": orb_dates.append(x.group(2)) else: @@ -171,10 +210,15 @@ def merge_orbit_files(): sorted_files = [x for (y, x) in sorted(zip(orb_dates, orb_files))] - with open(os.path.join(orbit_files_path, "maven_orb_rec.orb"), "w") as code: + with fo as code: skip_2_lines = False for o_file in sorted_files: - with open(o_file) as f: + if is_fsspec_uri(toolkit_path): + # assumes fsspec filesystem triggered above + fo_file = fs.open(o_file) + else: + fo_file = open(o_file) + with fo_file as f: if skip_2_lines: f.readline() f.readline() @@ -263,8 +307,26 @@ def set_new_data_root_dir(): # Get new preferred data download location for pyspedas project valid_path = input("Enter directory 
preference: ") - while not os.path.exists(valid_path): - valid_path = input("Specified path does not exist. Enter new path: ") + + # Cloud Awareness: had to rework for ensuring URI exists + exists = False + while not exists: + if is_fsspec_uri(valid_path): + if not "://" in valid_path: + valid_path = input("Specified path does not exist. Enter new path: ") + else: + protocol, path = valid_path.split("://") + fs = fsspec.filesystem(protocol) + + if fs.exists(valid_path): + exists = True + else: + valid_path = input("Specified path does not exist. Enter new path: ") + else: + if os.path.exists(valid_path): + exists = True + else: + valid_path = input("Specified path does not exist. Enter new path: ") download_path = valid_path logging.warning("Location of the mvn_toolkit_prefs file set to " + download_path) @@ -291,7 +353,17 @@ def get_new_files(files_on_site, data_dir, instrument, level): fos = files_on_site files_on_hd = [] - for dir, _, files in os.walk(data_dir): + + # Cloud Awareness: traverse fsspec filesystem + if is_fsspec_uri(data_dir): + protocol, path = data_dir.split("://") + fs = fsspec.filesystem(protocol) + + walk = fs.walk(data_dir) + else: + walk = os.walk(data_dir) + + for dir, _, files in walk: for f in files: if re.match("mvn_" + instrument + "_" + level + "_*", f): files_on_hd.append(f) @@ -322,6 +394,15 @@ def create_dir_if_needed(f, data_dir, level): else: year, month, _ = get_year_month_day_from_sci_file(f) + # Cloud Awareness: operate upon fsspec-like filesystem + if is_fsspec_uri(data_dir): + protocol, path = data_dir.split("://") + fs = fsspec.filesystem(protocol) + + full_path = "/".join([data_dir, year, month]) + fs.mkdirs(full_path) + return full_path + if not os.path.exists(os.path.join(data_dir, year, month)): os.makedirs(os.path.join(data_dir, year, month)) diff --git a/pyspedas/projects/maven/kp_utilities.py b/pyspedas/projects/maven/kp_utilities.py index 13993587..a7c8b542 100644 --- a/pyspedas/projects/maven/kp_utilities.py +++ b/pyspedas/projects/maven/kp_utilities.py @@ -141,11 +141,11 @@ def range_select(kp, time=None, parameter=None, maximum=None, minimum=None): return kp if minimum is None: - minimum = np.repeat(-np.Infinity, len(parameter)) + minimum = np.repeat(-np.inf, len(parameter)) elif not isinstance(minimum, list): minimum = [minimum] if maximum is None: - maximum = np.repeat(np.Infinity, len(parameter)) + maximum = np.repeat(np.inf, len(parameter)) elif not isinstance(maximum, list): maximum = [maximum] @@ -257,9 +257,9 @@ def range_select(kp, time=None, parameter=None, maximum=None, minimum=None): logging.warning("Applying only Time filtering") parameter = None elif minimum is None: - minimum = [-np.Infinity] # Unbounded below + minimum = [-np.inf] # Unbounded below elif maximum is None: - maximum = [np.Infinity] # Unbounded above + maximum = [np.inf] # Unbounded above else: pass # Range fully bounded if not isinstance(minimum, list): @@ -291,11 +291,11 @@ def range_select(kp, time=None, parameter=None, maximum=None, minimum=None): lmin = 0 lmax = 0 elif minimum is None: - minimum = np.repeat(-np.Infinity,len(parameter)) + minimum = np.repeat(-np.inf,len(parameter)) lmin = len(parameter) lmax = len(maximum) elif maximum is None: - maximum = np.repeat(np.Infinity, len(parameter)) + maximum = np.repeat(np.inf, len(parameter)) lmin = len(minimum) lmax = len(parameter) else: diff --git a/pyspedas/projects/maven/maven_kp_to_tplot.py b/pyspedas/projects/maven/maven_kp_to_tplot.py index 09578bb5..62ce1616 100644 --- 
a/pyspedas/projects/maven/maven_kp_to_tplot.py +++ b/pyspedas/projects/maven/maven_kp_to_tplot.py @@ -16,6 +16,8 @@ import builtins import os +from pyspedas.utilities.download import is_fsspec_uri +import fsspec def maven_kp_to_tplot( filename=None, @@ -172,8 +174,15 @@ def maven_kp_to_tplot( c_found = False r_found = False for f in filenames: + if is_fsspec_uri(f): + protocol, path = f.split("://") + fs = fsspec.filesystem(protocol) + + basename = f.rstrip("/").split("/")[-1] + else: + basename = os.path.basename(f) if ( - kp_regex.match(os.path.basename(f)).group("description") == "_crustal" + kp_regex.match(basename).group("description") == "_crustal" and not c_found ): name, inss = get_header_info(f) @@ -183,7 +192,7 @@ def maven_kp_to_tplot( crus_inst.extend(inss[1:]) c_found = True elif ( - kp_regex.match(os.path.basename(f)).group("description") == "" + kp_regex.match(basename).group("description") == "" and not r_found ): name, ins = get_header_info(f) @@ -268,12 +277,25 @@ def maven_kp_to_tplot( for filename in filenames: # Determine number of header lines nheader = 0 - with open(filename) as f: + if is_fsspec_uri(filename): + protocol, path = filename.split("://") + fs = fsspec.filesystem(protocol) + fo = fs.open(filename, "rt") + else: + fo = open(filename) + with fo as f: for line in f: if line.startswith("#"): nheader += 1 + if is_fsspec_uri(filename): + protocol, path = filename.split("://") + fs = fsspec.filesystem(protocol) + + basename = filename.rstrip("/").split("/")[-1] + else: + basename = os.path.basename(filename) if ( - kp_regex.match(os.path.basename(filename)).group("description") + kp_regex.match(basename).group("description") == "_crustal" ): temp_data.append( diff --git a/pyspedas/projects/maven/maven_load.py b/pyspedas/projects/maven/maven_load.py index 985c1077..22f8d64c 100644 --- a/pyspedas/projects/maven/maven_load.py +++ b/pyspedas/projects/maven/maven_load.py @@ -18,6 +18,8 @@ from .orbit_time import orbit_time from .maven_kp_to_tplot import maven_kp_to_tplot +from pyspedas.utilities.download import is_fsspec_uri +import fsspec def maven_filenames( filenames=None, @@ -131,9 +133,10 @@ def maven_filenames( if level == "iuvs": query_args.append("file_extension=tab") - data_dir = os.path.join( + sep = "/" if is_fsspec_uri(mvn_root_data_dir) else os.path.sep + data_dir = sep.join([ mvn_root_data_dir, "maven", "data", "sci", instrument, level - ) + ]) query = "&".join(query_args) @@ -161,9 +164,10 @@ def maven_filenames( query_args.append("level=insitu") query_args.append("start_date=" + start_date) query_args.append("end_date=" + end_date) - data_dir = os.path.join( + sep = "/" if is_fsspec_uri(mvn_root_data_dir) else os.path.sep + data_dir = sep.join([ mvn_root_data_dir, "maven", "data", "sci", "kp", "insitu" - ) + ]) query = "&".join(query_args) s = get_filenames(query, public) if not s: @@ -203,7 +207,14 @@ def maven_file_groups(files): kp_regex, l2_regex = maven_kp_l2_regex() for f in files: - desc = l2_regex.match(os.path.basename(f)).group("description") + if is_fsspec_uri(f): + protocol, path = f.split("://") + fs = fsspec.filesystem(protocol) + + basename = f.rstrip("/").split("/")[-1] + else: + basename = os.path.basename(f) + desc = l2_regex.match(basename).group("description") if desc not in result: result[desc] = [] result[desc].append(f) @@ -397,7 +408,8 @@ def load_data( else: full_path = create_dir_if_needed(f, data_dir, level) bn_files_to_load.append(f) - files_to_load.append(os.path.join(full_path, f)) + sep = "/" if is_fsspec_uri(full_path) 
else os.path.sep + files_to_load.append(sep.join([full_path, f])) except Exception as e: # todo: better handling of rse .tab files # rse files are .tab files (TAB delimited text files) that currently cannot be loaded into tplot @@ -495,7 +507,14 @@ def load_data( get_metadata=get_metadata, ) # Specifically for SWIA and SWEA data, make sure the plots have log axes and are spectrograms - instr = l2_regex.match(os.path.basename(cdf_dict[desc][0])).group( + if is_fsspec_uri(cdf_dict[desc][0]): + protocol, path = cdf_dict[desc][0].split("://") + fs = fsspec.filesystem(protocol) + + basename = cdf_dict[desc][0].rstrip("/").split("/")[-1] + else: + basename = os.path.basename(cdf_dict[desc][0]) + instr = l2_regex.match(basename).group( "instrument" ) if instr in ["swi", "swe"]: @@ -512,11 +531,16 @@ def load_data( else: # The description (part of the filename) is appended to the variable name suf = desc + suffix - created_vars = pytplot.sts_to_tplot( - sts_dict[desc], - prefix=prefix, - suffix=suf, - ) + try: + created_vars = pytplot.sts_to_tplot( + sts_dict[desc], + prefix=prefix, + suffix=suf, + ) + except FileNotFoundError: + logging.error("PyTplot Error: STS importer is not URI capable.") + logging.error("\tSkipping file as PyTplot cannot use this type of filesystem.") + continue loaded_tplot_vars.append(created_vars) # Remove the Decimal Day column, not really useful diff --git a/pyspedas/projects/maven/orbit_time.py b/pyspedas/projects/maven/orbit_time.py index e9eafd57..79518969 100644 --- a/pyspedas/projects/maven/orbit_time.py +++ b/pyspedas/projects/maven/orbit_time.py @@ -2,6 +2,8 @@ import os from .config import CONFIG +from pyspedas.utilities.download import is_fsspec_uri +import fsspec def month_to_num(month_string): """ @@ -70,10 +72,18 @@ def orbit_time(begin_orbit, end_orbit=None): """ toolkit_path = CONFIG["local_data_dir"] - orbit_files_path = os.path.join(toolkit_path, "orbitfiles") - orb_file = os.path.join(orbit_files_path, "maven_orb_rec.orb") + sep = "/" if is_fsspec_uri(toolkit_path) else os.path.sep + orbit_files_path = sep.join([toolkit_path, "orbitfiles"]) + orb_file = sep.join([orbit_files_path, "maven_orb_rec.orb"]) + if is_fsspec_uri(toolkit_path): + protocol, path = toolkit_path.split("://") + fs = fsspec.filesystem(protocol) - with open(orb_file, "r") as f: + fileobj = fs.open(orb_file, "r") + else: + fileobj = open(orb_file, "r") + + with fileobj as f: if end_orbit is None: end_orbit = begin_orbit orbit_num = [] diff --git a/pyspedas/projects/maven/read_iuvs_file.py b/pyspedas/projects/maven/read_iuvs_file.py index 7a0dbb41..e2abae57 100644 --- a/pyspedas/projects/maven/read_iuvs_file.py +++ b/pyspedas/projects/maven/read_iuvs_file.py @@ -6,6 +6,9 @@ import numpy as np import collections +from pyspedas.utilities.download import is_fsspec_uri +import fsspec + def read_iuvs_file(file): """ Read an IUVS file and return a dictionary containing the data. 
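
The same URI-aware branch recurs in every file this patch touches: test whether the path is a URI, route it through fsspec if so, and fall back to the builtin `open()` otherwise. A minimal sketch of that pattern in isolation (the helper name `uri_open` is hypothetical and not part of this patch; `is_fsspec_uri` is the utility the patch itself imports):

```python
import fsspec
from pyspedas.utilities.download import is_fsspec_uri

def uri_open(path, mode="rb"):
    # Route URIs (e.g., s3://bucket/key) through fsspec; plain local paths
    # fall back to the builtin open(), mirroring the branches in this patch.
    if is_fsspec_uri(path):
        protocol, _ = path.split("://")
        fs = fsspec.filesystem(protocol)
        return fs.open(path, mode)
    return open(path, mode)
```
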
@@ -19,7 +22,14 @@ def read_iuvs_file(file): iuvs_dict = {} periapse_num = 0 occ_num = 0 - with open(file) as f: + if is_fsspec_uri(file): + protocol, path = file.split("://") + fs = fsspec.filesystem(protocol) + + fileobj = fs.open(file, "r") + else: + fileobj = open(file, "r") + with fileobj as f: line = f.readline() while line != "": if line.startswith("*"): diff --git a/pyspedas/projects/maven/tests/uri_tests.py b/pyspedas/projects/maven/tests/uri_tests.py new file mode 100644 index 00000000..096bb345 --- /dev/null +++ b/pyspedas/projects/maven/tests/uri_tests.py @@ -0,0 +1,524 @@ +import os, sys +import time +import requests +import unittest +import datetime as dt +import subprocess + +from pytplot import data_exists, tplot_names +from pyspedas import maven +from pyspedas.projects.maven import config, \ + utilities, \ + maven_kp_to_tplot, \ + download_files_utilities +import fsspec +#========================================================== + +# moto server mock details +localhost = "http://localhost:3000" +bucket_name = "test-bucket" + +# Set up mock AWS environment variables (fake credentials) +os.environ["AWS_ACCESS_KEY_ID"] = "test" +os.environ["AWS_SECRET_ACCESS_KEY"] = "test" +os.environ["AWS_DEFAULT_REGION"] = "us-east-1" + +# Set environment to use the local Moto S3 server +# S3 ENDPOINT for fsspec +# ENDPOINT URL for cdflib/boto3 +os.environ["AWS_S3_ENDPOINT"] = localhost +os.environ["AWS_ENDPOINT_URL"] = localhost + +# We need sleep time to avoid "HTTP Error 429: Too Many Requests" +sleep_time = 30 + +class LoadTestCases(unittest.TestCase): + """ + Cloud Awareness Unit Tests + + Depends upon moto[server] package. Install via: + pip install moto[server] + + These tests essentially create a local mock-AWS server as a background + process at port 3000. + Note: The environment variables are used as mock credentials in order + to avoid having to pass the endpoint url to fsspec calls. 
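+
+    The MAVEN tests additionally sleep between cases (sleep_time) to avoid
+    "HTTP Error 429: Too Many Requests" from the data server.
+
+    Example invocation (assuming moto[server] is installed):
+
+        python -m unittest pyspedas.projects.maven.tests.uri_tests
+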
+ """ + + @classmethod + def setUpClass(cls): + # Start the moto server for S3 in the background + # https://github.com/getmoto/moto/issues/4418 + cls.moto_server = subprocess.Popen( + ["moto_server", "-p3000"], + stdout=subprocess.PIPE, stderr=subprocess.PIPE + ) + + # Allow the server to start properly + time.sleep(2) + + # Create a bucket using direct HTTP requests + response = requests.put(f"http://localhost:3000/{bucket_name}") + assert response.status_code == 200, "Bucket creation failed" + + @classmethod + def tearDownClass(cls): + # Terminate the moto server after tests + cls.moto_server.terminate() + cls.moto_server.communicate() + + def get_kp_dict(self): + config.CONFIG["local_data_dir"] = f"s3://{bucket_name}" + + data = maven.kp() + fn = [ + '/'.join([ + config.CONFIG["local_data_dir"], + "maven/data/sci/kp/insitu/2016/01/mvn_kp_insitu_20160101_v20_r01.tab", + ]), + '/'.join([ + config.CONFIG["local_data_dir"], + "maven/data/sci/kp/insitu/2016/01/mvn_kp_insitu_20160102_v20_r01.tab", + ]), + ] + + return maven_kp_to_tplot.maven_kp_to_tplot(filename=fn, notplot=True) + + def clean_data(self): + # reset moto server to original state + response = requests.post("http://localhost:3000/moto-api/reset") + assert response.status_code == 200, "Moto Server reset failed" + + # create bucket again + response = requests.put(f"http://localhost:3000/{bucket_name}") + assert response.status_code == 200, "Bucket creation failed" + + #========================================================================== + # Cloud Awareness Note: Cluster implementation does not stream data due to + # lack of functionality already present in PySPEDAS. + + # Adapted unit tests for AWS-specific URI testing. + def test_get_merge_orbit_files(self): + config.CONFIG["local_data_dir"] = f"s3://{bucket_name}" + + download_files_utilities.get_orbit_files() + download_files_utilities.merge_orbit_files() + orbfilepath = "/".join([ + config.CONFIG["local_data_dir"], "orbitfiles", "maven_orb_rec.orb" + ]) + + # assert file exists + fs = fsspec.filesystem("s3") + self.assertTrue(fs.exists(orbfilepath)) + + def test_load_kp_data(self): + self.clean_data() + config.CONFIG["local_data_dir"] = f"s3://{bucket_name}" + + data = maven.kp() + self.assertTrue(data_exists("mvn_kp::spacecraft::geo_x")) + time.sleep(sleep_time) + + def test_load_kp_spdf_data(self): + self.clean_data() + config.CONFIG["local_data_dir"] = f"s3://{bucket_name}" + + data = maven.kp(spdf=True) + self.assertTrue(data_exists("LPW_Electron_density")) + time.sleep(sleep_time) + + def test_load_kp_iuvs_occ_data(self): + self.clean_data() + config.CONFIG["local_data_dir"] = f"s3://{bucket_name}" + + data = maven.kp(trange=["2016-01-18","2016-01-19"],iuvs=True) + self.assertTrue(data_exists("mvn_kp::spacecraft::geo_x")) + dt1 = dt.datetime.strptime("2016-01-18", "%Y-%m-%d") + dt2 = dt.datetime.strptime("2016-01-19", "%Y-%m-%d") + fnames = utilities.get_latest_iuvs_files_from_date_range(dt1,dt2) + self.assertTrue(len(fnames) > 0) + self.assertTrue("mvn_kp_iuvs_occ-02533_20160118T125134_v13_r01.tab" in fnames[0]) + time.sleep(sleep_time) + + def test_load_kp_iuvs_periapse_data(self): + self.clean_data() + config.CONFIG["local_data_dir"] = f"s3://{bucket_name}" + + data = maven.kp(trange=["2015-03-07","2015-03-08"],iuvs=True) + self.assertTrue(data_exists("mvn_kp::spacecraft::geo_x")) + dt1 = dt.datetime.strptime("2015-03-07", "%Y-%m-%d") + dt2 = dt.datetime.strptime("2015-03-08", "%Y-%m-%d") + # fnames = get_latest_iuvs_files_from_date_range(dt1,dt2) + # 
print(fnames) + #self.assertTrue(len(fnames) > 0) + #self.assertTrue("mvn_kp_iuvs_00850_20150308T221253_v13_r01.tab" in fnames[0]) + time.sleep(sleep_time) + + def test_load_kp_iuvs_corona_data(self): + self.clean_data() + config.CONFIG["local_data_dir"] = f"s3://{bucket_name}" + + data = maven.kp(trange=["2016-01-14","2016-01-15"],iuvs=True) + self.assertTrue(data_exists("mvn_kp::spacecraft::geo_x")) + dt1 = dt.datetime.strptime("2016-01-07", "%Y-%m-%d") + dt2 = dt.datetime.strptime("2016-01-08", "%Y-%m-%d") + # fnames = get_latest_iuvs_files_from_date_range(dt1,dt2) + # print(fnames) + #self.assertTrue(len(fnames) > 0) + #self.assertTrue("mvn_kp_iuvs_00850_20150308T221253_v13_r01.tab" in fnames[0]) + time.sleep(sleep_time) + + def test_kp_param_errors(self): + from pyspedas.projects.maven.kp_utilities import param_list, param_range, range_select + # bad value in kp dict + kp = {} + kp["foo"] = "bar" + with self.assertLogs(level="WARNING") as log: + param_list = param_list(kp) + self.assertTrue("unexpected value type" in log.output[0]) + kp_insitu = {} + kp_iuvs = {} + kp_insitu["TimeString"] = ["1970-01-01", "1970-01-02"] + kp_insitu["Orbit"] = [0,1] + param_range(kp_insitu) + + kp_iuvs["TimeString"] = ["1971-01-01", "1971-01-02"] + kp_iuvs["Orbit"] = [10, 11] + with self.assertLogs(level="WARNING") as log: + param_range(kp_insitu, kp_iuvs) + self.assertTrue("No overlap" in log.output[0]) + + with self.assertLogs(level="WARNING") as log: + range_select(kp_iuvs) + self.assertTrue("*****ERROR*****" in log.output[0]) + i = len(log.output) + range_select(kp_iuvs,parameter=0) + self.assertTrue("*****ERROR*****" in log.output[i]) + i = len(log.output) + range_select(kp_iuvs,time=["1970-01-01"]) + i = len(log.output) + range_select(kp_iuvs, time=["1970-01-01", 0]) + self.assertTrue("*****ERROR*****" in log.output[i]) + i = len(log.output) + + def test_kp_utilities(self): + self.clean_data() + config.CONFIG["local_data_dir"] = f"s3://{bucket_name}" + + from pyspedas.projects.maven.kp_utilities import ( + param_list, + param_range, + range_select, + get_inst_obs_labels, + ) + from pyspedas.projects.maven.kp_utilities import find_param_from_index + import collections + + kp = self.get_kp_dict() + self.assertTrue(type(kp) is collections.OrderedDict) + + param_list = param_list(kp) + self.assertTrue(len(param_list) > 0) + print(param_list) + + param_range = param_range(kp) + result = range_select(kp, [2440, 2445], [5], [1e9], [-1e9]) + self.assertTrue(len(result) > 0) + result = range_select(kp, ["2016-01-01 00:00:00","2016-01-02 00:00:00"], [5], [1e9], [-1e9]) + self.assertTrue(len(result) > 0) + labels = get_inst_obs_labels(kp, "LPW.EWAVE_LOW_FREQ") + self.assertTrue("LPW" in labels) + self.assertTrue("EWAVE_LOW_FREQ" in labels) + param = find_param_from_index(kp, 5) + self.assertTrue(param == "LPW.ELECTRON_DENSITY") + # no min + result = range_select(kp, ["2016-01-01 00:00:00","2016-01-02 00:00:00"], parameter=[5], maximum=[1e9]) + self.assertTrue(len(result) > 0) + # no max + result = range_select(kp, ["2016-01-01 00:00:00","2016-01-02 00:00:00"], parameter=[5], minimum=[1e9]) + self.assertTrue(len(result) > 0) + # no time, param as list with int + result = range_select(kp, parameter=[5], minimum=[-1e9], maximum=[1e9]) + # no time, param as list with int + result = range_select(kp, parameter=[5], minimum=[-1e9], maximum=[1e9]) + self.assertTrue(len(result) > 0) + # no time, param as list with string + result = range_select(kp, parameter=["LPW.ELECTRON_DENSITY"], minimum=[-1e9], maximum=[1e9]) + 
self.assertTrue(len(result) > 0) + # no time, param as list with string and int + result = range_select(kp, parameter=["LPW.ELECTRON_DENSITY",6], minimum=[-1e9, -1e9], maximum=[1e9,1e9]) + self.assertTrue(len(result) > 0) + # no time, parameter as scalar int + result = range_select(kp, parameter=5, minimum=[-1e9], maximum=[1e9]) + self.assertTrue(len(result) > 0) + # no time, parameter as scalar string + result = range_select(kp, parameter="LPW.ELECTRON_DENSITY", minimum=[-1e9], maximum=[1e9]) + self.assertTrue(len(result) > 0) + # no time, parameter, min, max as scalars + result = range_select(kp, parameter="LPW.ELECTRON_DENSITY", minimum=-1e9, maximum=1e9) + self.assertTrue(len(result) > 0) + # no time, scalar param, minimum=None + result = range_select(kp, parameter=5, maximum=[1e9]) + self.assertTrue(len(result) > 0) + # no time, scalar param, maximum=None + result = range_select(kp, parameter=5, maximum=[1e9]) + self.assertTrue(len(result) > 0) + # times, scalar param, scalar max/min + result = range_select(kp, time=[2440,2441], parameter=5, minimum=-1e9, maximum=[1e9]) + self.assertTrue(len(result) > 0) + # times, scalar param, no min + result = range_select(kp, time=[2440, 2441], parameter=5, maximum=[1e9]) + # times, scalar param, no max + result = range_select(kp, time=[2440, 2441], parameter=5, minimum = [1e9]) + + with self.assertLogs(level="WARNING") as log: + # mismatched time types + result = range_select(kp, [2440, "2020/04/01"], [5], [1e9], [-1e9]) + self.assertTrue("*****WARNING*****" in log.output[0]) + i = len(log.output) + # only one time + result = range_select(kp, [2440], [5], [1e9], [-1e9]) + self.assertTrue("*****WARNING*****" in log.output[i]) + i = len(log.output) + # parameter but no max/min + result = range_select(kp, ["2016-01-01 00:00:00","2016-01-02 00:00:00"], parameter=[5]) + self.assertTrue("*****ERROR*****" in log.output[i]) + i = len(log.output) + # len(min) doesn't match param + result = range_select(kp, ["2016-01-01 00:00:00","2016-01-02 00:00:00"], parameter=[5], minimum=[-1e9,-1e9], maximum=[1e9]) + self.assertTrue("*****ERROR*****" in log.output[i]) + i = len(log.output) + # len(max) doesn't match param + result = range_select(kp, ["2016-01-01 00:00:00","2016-01-02 00:00:00"], parameter=[5], minimum=[-1e9], maximum=[1e9, 1e9]) + self.assertTrue("*****ERROR*****" in log.output[i]) + i = len(log.output) + self.assertTrue(len(result) > 0) + # len(min) doesn't match param, no max + result = range_select(kp, ["2016-01-01 00:00:00","2016-01-02 00:00:00"], parameter=[5], minimum=[-1e9,-1e9]) + self.assertTrue(len(result) > 0) + self.assertTrue("*****ERROR*****" in log.output[i]) + i = len(log.output) + # len(max) doesn't match param, no min + result = range_select(kp, ["2016-01-01 00:00:00","2016-01-02 00:00:00"], parameter=[5], maximum=[1e9, 1e9]) + self.assertTrue("*****ERROR*****" in log.output[i]) + i = len(log.output) + # no time, param as list with string, int, and float + result = range_select(kp, parameter=["LPW.ELECTRON_DENSITY", 6, 1.2], minimum=[-1e9, -1e9, -1e9], maximum=[1e9, 1e9, 1e9]) + self.assertTrue(len(result) > 0) + self.assertTrue("*****ERROR*****" in log.output[i]) + i = len(log.output) + # no time, param as scalar float + result = range_select(kp, parameter=1.2, minimum=[-1e9], maximum=[1e9]) + self.assertTrue(len(result) > 0) + self.assertTrue("*****ERROR*****" in log.output[i]) + i = len(log.output) + # no time, no param + result = range_select(kp, minimum=[-1e9], maximum=[1e9]) + self.assertTrue(len(result) > 0) + 
self.assertTrue("*****ERROR*****" in log.output[i]) + i = len(log.output) + # no time, scalar param, no bounds + result = range_select(kp, parameter=5) + self.assertTrue(len(result) > 0) + self.assertTrue("*****ERROR*****" in log.output[i]) + i = len(log.output) + # no time, len(min) doesn't match param + result = range_select(kp, parameter=[5], minimum=[-1e9,-1e9], maximum=[1e9]) + self.assertTrue("*****ERROR*****" in log.output[i]) + i = len(log.output) + # no time, len(max) doesn't match param + result = range_select(kp, parameter=[5], minimum=[-1e9], maximum=[1e9, 1e9]) + self.assertTrue("*****ERROR*****" in log.output[i]) + i = len(log.output) + self.assertTrue(len(result) > 0) + # no time, len(min) doesn't match param, no max + result = range_select(kp, parameter=[5], minimum=[-1e9,-1e9]) + self.assertTrue(len(result) > 0) + self.assertTrue("*****ERROR*****" in log.output[i]) + i = len(log.output) + # no time, len(max) doesn't match param, no min + result = range_select(kp, parameter=[5], maximum=[1e9, 1e9]) + self.assertTrue("*****ERROR*****" in log.output[i]) + i = len(log.output) + # only one time, no param + result = range_select(kp, [2440]) + self.assertTrue("*****ERROR*****" in log.output[i]) + i = len(log.output) + # Malformed times, no parameter + result = range_select(kp, [{}, {}]) + self.assertTrue("*****ERROR*****" in log.output[i]) + i = len(log.output) + # Malformed times, parameter given + result = range_select(kp, [{}, {}], parameter=[5], minimum=[-1e9], maximum=[1e9]) + self.assertTrue("*****WARNING*****" in log.output[i]) + i = len(log.output) + # times, scalar param, len(min) doesn't match param + result = range_select(kp, time=[2440, 2441], parameter=5, minimum=[-1e9, -1e9], maximum=[1e9]) + self.assertTrue("*****ERROR*****" in log.output[i]) + i = len(log.output) + # times, scalar param, len(max) doesn't match param + result = range_select(kp, time=[2440, 2441], parameter=5, minimum=[-1e9], maximum=[1e9, 1e9]) + self.assertTrue("*****ERROR*****" in log.output[i]) + i = len(log.output) + # times, scalar param, no bounds + result = range_select(kp, time=[2440, 2441], parameter=5) + self.assertTrue("*****ERROR*****" in log.output[i]) + i = len(log.output) + # invalid parameter value + a,b = get_inst_obs_labels(kp, "foo") + self.assertTrue("*****ERROR*****" in log.output[i]) + i = len(log.output) + # too many components + a,b = get_inst_obs_labels(kp, "foo.bar.baz") + self.assertTrue("*****ERROR*****" in log.output[i]) + i = len(log.output) + # Invalid numeric index + ind = find_param_from_index(kp,999) + self.assertTrue("*****ERROR*****" in log.output[i]) + i = len(log.output) + + @unittest.skip # BROKEN + def test_get_file_from_site_private(self): + f = 'mvn_mag_l2_2016002ss1s_20160102_v01_r01.xml' + public=False + full_path='maven_data/maven/data/sci/mag/l2/2016/01' + # We don't have credentials to the private site yet, so this will fail. 
+ try: + download_files_utilities.get_file_from_site(f, public, full_path) + except Exception as e: + pass + + + def test_load_mag_data(self): + self.clean_data() + config.CONFIG["local_data_dir"] = f"s3://{bucket_name}" + + from pyspedas.projects.maven.utilities import get_l2_files_from_date + + data = maven.mag(datatype="ss1s") + self.assertTrue(len(tplot_names("OB_B*"))>0) + dt1 = dt.datetime.strptime("2016-01-01/12:00:00", "%Y-%m-%d/%H:%M:%S") + files = get_l2_files_from_date(dt1, "mag") + self.assertTrue(len(files) > 0) + time.sleep(sleep_time) + + @unittest.skip + # Cloud Awareness: This extends the prior test to check for the STS file + # types in kp data. It should report an error due to + # PyTplot not currently cloud aware. + def test_load_mag_sts_data(self): + self.clean_data() + config.CONFIG["local_data_dir"] = f"s3://{bucket_name}" + + with self.assertLogs(level="ERROR") as log: + data = maven.mag() # slow due to the large amount of data + self.assertIn( + "PyTplot Error: STS importer is not URI capable.", + log.output[0] + ) + + def test_load_mag_data_private(self): + self.clean_data() + config.CONFIG["local_data_dir"] = f"s3://{bucket_name}" + + from pyspedas.projects.maven.utilities import get_l2_files_from_date + + # We don't have credentials to the private site yet, so this is expected to fail + try: + data = maven.mag(datatype="ss1s", public=False) + except Exception as e: + pass + + @unittest.skip # BROKEN + def test_load_mag_byorbit_data(self): + data = maven.mag(trange=[500, 501], datatype="ss1s") + self.assertTrue(len(tplot_names("OB_B*"))>0) + time.sleep(sleep_time) + + def test_load_sta_data(self): + self.clean_data() + config.CONFIG["local_data_dir"] = f"s3://{bucket_name}" + + # No datatype means "load everything" + data = maven.sta() + self.assertTrue(data_exists("hkp_raw_2a-hkp")) + self.assertTrue(data_exists("hkp_2a-hkp")) + self.assertTrue(data_exists("data_d0-32e4d16a8m")) + self.assertTrue(data_exists("theta_d1-32e4d16a8m")) + time.sleep(sleep_time) + + def test_load_sta_hkp_data(self): + self.clean_data() + config.CONFIG["local_data_dir"] = f"s3://{bucket_name}" + + data = maven.sta(datatype=["2a"]) + self.assertTrue(data_exists("hkp_raw_2a-hkp")) + self.assertTrue(data_exists("hkp_2a-hkp")) + time.sleep(sleep_time) + + def test_load_swea_data(self): + self.clean_data() + config.CONFIG["local_data_dir"] = f"s3://{bucket_name}" + + data = maven.swea() + self.assertTrue(data_exists("diff_en_fluxes_svyspec")) + time.sleep(sleep_time) + + def test_load_swia_data(self): + self.clean_data() + config.CONFIG["local_data_dir"] = f"s3://{bucket_name}" + + data = maven.swia() + self.assertTrue(data_exists("spectra_diff_en_fluxes_onboardsvyspec")) + time.sleep(sleep_time) + + def test_load_sep_data(self): + self.clean_data() + config.CONFIG["local_data_dir"] = f"s3://{bucket_name}" + + data = maven.sep() + self.assertTrue(data_exists("f_ion_flux_tot_s2-cal-svy-full")) + time.sleep(sleep_time) + + def test_load_lpw_data(self): + self.clean_data() + config.CONFIG["local_data_dir"] = f"s3://{bucket_name}" + + data = maven.lpw() + self.assertTrue(data_exists("mvn_lpw_lp_iv_l2_lpiv")) + time.sleep(sleep_time) + + def test_load_euv_data(self): + self.clean_data() + config.CONFIG["local_data_dir"] = f"s3://{bucket_name}" + + data = maven.euv() + self.assertTrue(data_exists("mvn_euv_calib_bands_bands")) + time.sleep(sleep_time) + + def test_load_rse_data(self): + self.clean_data() + config.CONFIG["local_data_dir"] = f"s3://{bucket_name}" + + data = maven.rse() + 
self.assertTrue(data_exists("mvn_kp::spacecraft::altitude")) + time.sleep(sleep_time) + + def test_load_iuv_data(self): + self.clean_data() + config.CONFIG["local_data_dir"] = f"s3://{bucket_name}" + + data = maven.iuv() + self.assertTrue(data_exists("mvn_kp::spacecraft::altitude")) + time.sleep(sleep_time) + + def test_load_ngi_data(self): + self.clean_data() + config.CONFIG["local_data_dir"] = f"s3://{bucket_name}" + + data = maven.ngi() + self.assertTrue(data_exists("mvn_kp::spacecraft::altitude")) + time.sleep(sleep_time) + +if __name__ == '__main__': + unittest.main(verbosity=2) \ No newline at end of file diff --git a/pyspedas/projects/maven/utilities.py b/pyspedas/projects/maven/utilities.py index 874d1b14..05ae0883 100644 --- a/pyspedas/projects/maven/utilities.py +++ b/pyspedas/projects/maven/utilities.py @@ -6,7 +6,8 @@ import numpy as np import collections - +from pyspedas.utilities.download import is_fsspec_uri +import fsspec def remove_inst_tag(df): """ @@ -43,9 +44,10 @@ def get_latest_files_from_date_range(date1, date2): from datetime import timedelta mvn_root_data_dir = utils.get_root_data_dir() - maven_data_dir = os.path.join( + sep = "/" if is_fsspec_uri(mvn_root_data_dir) else os.path.sep + maven_data_dir = sep.join([ mvn_root_data_dir, "maven", "data", "sci", "kp", "insitu" - ) + ]) # Each file starts at midnight, so lets cut off the hours and just pay attention to the days date1 = date1.replace(hour=0, minute=0, second=0) @@ -62,13 +64,26 @@ def get_latest_files_from_date_range(date1, date2): year = str(current_date.year) month = str("%02d" % current_date.month) day = str("%02d" % current_date.day) - full_path = os.path.join(maven_data_dir, year, month) - if os.path.exists(full_path): + full_path = sep.join([maven_data_dir, year, month]) + listdir = [] + if is_fsspec_uri(full_path): + protocol, path = maven_data_dir.split("://") + fs = fsspec.filesystem(protocol) + + exists = fs.exists(full_path) + if exists: + listdir = fs.listdir(full_path, detail=False) + # fsspec alternative to os.path.basename + listdir = [f.rstrip("/").split("/")[-1] for f in listdir] + else: + exists = os.path.exists(full_path) + if exists: listdir = os.listdir(full_path) + if exists: # Grab only the most recent version/revision of regular and crustal insitu files for each # day insitu = {} c_insitu = {} - for f in os.listdir(full_path): + for f in listdir: # logging.warning(f) if kp_regex.match(f).group("day") == day and not kp_regex.match( f @@ -96,7 +111,7 @@ def get_latest_files_from_date_range(date1, date2): most_recent_insitu = [ f for f in insitu.keys() if max_r in f and max_v in f ] - filenames.append(os.path.join(full_path, most_recent_insitu[0])) + filenames.append(sep.join([full_path, most_recent_insitu[0]])) if c_insitu: # Get max version @@ -110,7 +125,7 @@ def get_latest_files_from_date_range(date1, date2): most_recent_c_insitu = [ f for f in c_insitu.keys() if c_max_r in f and c_max_v in f ] - filenames.append(os.path.join(full_path, most_recent_c_insitu[0])) + filenames.append(sep.join([full_path, most_recent_c_insitu[0]])) filenames = sorted(filenames) return filenames @@ -132,9 +147,10 @@ def get_latest_iuvs_files_from_date_range(date1, date2): kp_regex, l2_regex = maven_kp_l2_regex() mvn_root_data_dir = utils.get_root_data_dir() - maven_data_dir = os.path.join( + sep = "/" if is_fsspec_uri(mvn_root_data_dir) else os.path.sep + maven_data_dir = sep.join([ mvn_root_data_dir, "maven", "data", "sci", "kp", "iuvs" - ) + ]) # Each file starts at midnight, so lets cut off the hours 
and just pay attention to the days
date1 = date1.replace(hour=0, minute=0, second=0)
@@ -150,11 +166,24 @@ def get_latest_iuvs_files_from_date_range(date1, date2):
month = str("%02d" % current_date.month)
day = str("%02d" % current_date.day)
# TODO: Fix the path after we fix the iuvs regex
- full_path = os.path.join(maven_data_dir,"occ-", "02")
- if os.path.exists(full_path):
+ full_path = sep.join([maven_data_dir,"occ-", "02"])
+ listdir = []
+ if is_fsspec_uri(full_path):
+ protocol, path = maven_data_dir.split("://")
+ fs = fsspec.filesystem(protocol)
+
+ exists = fs.exists(full_path)
+ if exists:
+ listdir = fs.listdir(full_path, detail=False)
+ # fsspec alternative to os.path.basename
+ listdir = [f.rstrip("/").split("/")[-1] for f in listdir]
+ else:
+ exists = os.path.exists(full_path)
+ if exists: listdir = os.listdir(full_path)
+ if exists:
basenames = []
# Obtain a list of all the basenames for the day
- for f in os.listdir(full_path):
+ for f in listdir:
if kp_regex.match(f).group("day") == day:
description = kp_regex.match(f).group("description")
year = kp_regex.match(f).group("year")
@@ -169,7 +198,7 @@ def get_latest_iuvs_files_from_date_range(date1, date2):
for bn in basenames:
version = 0
revision = 0
- for f in os.listdir(full_path):
+ for f in listdir:
description = kp_regex.match(f).group("description")
year = kp_regex.match(f).group("year")
month = kp_regex.match(f).group("month")
@@ -181,7 +210,7 @@ def get_latest_iuvs_files_from_date_range(date1, date2):
if int(v) > int(version):
version = v
- for f in os.listdir(full_path):
+ for f in listdir:
description = kp_regex.match(f).group("description")
year = kp_regex.match(f).group("year")
month = kp_regex.match(f).group("month")
@@ -195,7 +224,7 @@ def get_latest_iuvs_files_from_date_range(date1, date2):
if int(version) > 0:
seq = (bn, "v" + str(version), "r" + str(revision) + ".tab")
- files_to_return.append(os.path.join(full_path, "_".join(seq)))
+ files_to_return.append(sep.join([full_path, "_".join(seq)]))
files_to_return = sorted(files_to_return)
return files_to_return
@@ -214,9 +243,8 @@ def get_l2_files_from_date(date1, instrument):
kp_regex, l2_regex = maven_kp_l2_regex()
mvn_root_data_dir = utils.get_root_data_dir()
- maven_data_dir = os.path.join(
- mvn_root_data_dir, "maven", "data", "sci", instrument, "l2"
- )
+ sep = "/" if is_fsspec_uri(mvn_root_data_dir) else os.path.sep
+ maven_data_dir = sep.join([mvn_root_data_dir, "maven", "data", "sci", instrument, "l2"])
# Each file starts at midnight, so lets cut off the hours and just pay attention to the days
date1 = date1.replace(hour=0, minute=0, second=0)
@@ -226,11 +254,25 @@ def get_l2_files_from_date(date1, instrument):
year = str(date1.year)
month = str("%02d" % date1.month)
day = str("%02d" % date1.day)
- full_path = os.path.join(maven_data_dir, year, month)
- if os.path.exists(full_path):
- for f in os.listdir(full_path):
+ full_path = sep.join([maven_data_dir, year, month])
+ listdir = []
+ if is_fsspec_uri(mvn_root_data_dir):
+ protocol, path = mvn_root_data_dir.split("://")
+ fs = fsspec.filesystem(protocol)
+
+ exists = fs.exists(full_path)
+ if exists:
+ listdir = fs.listdir(full_path, detail=False)
+ # fsspec alternative to os.path.basename
+ listdir = [f.rstrip("/").split("/")[-1] for f in listdir]
+ else:
+ exists = os.path.exists(full_path)
+ if exists: listdir = os.listdir(full_path)
+
+ if exists:
+ for f in listdir:
if l2_regex.match(f).group("day") == day:
- filenames.append(os.path.join(full_path, f))
+
filenames.append(sep.join([full_path, f])) filenames = sorted(filenames) return filenames @@ -248,7 +290,14 @@ def get_header_info(filename): """ # Determine number of header lines nheader = 0 - with open(filename) as f: + if is_fsspec_uri(filename): + protocol, path = filename.split("://") + fs = fsspec.filesystem(protocol) + fo = fs.open(filename, "rt") + else: + fo = open(filename) + + with fo as f: for line in f: if line.startswith("#"): nheader += 1 @@ -257,7 +306,13 @@ def get_header_info(filename): read_param_list = False start_temp = False index_list = [] - with open(filename) as fin: + if is_fsspec_uri(filename): + protocol, path = filename.split("://") + fs = fsspec.filesystem(protocol) + fo = fs.open(filename, "rt") + else: + fo = open(filename) + with fo as fin: icol = -2 # Counting header lines detailing column names iname = 1 # for counting seven lines with name info ncol = -1 # Dummy value to allow reading of early headerlines? diff --git a/pyspedas/projects/mms/mec_ascii/mms_get_local_ancillary_files.py b/pyspedas/projects/mms/mec_ascii/mms_get_local_ancillary_files.py index ea134d73..b44e7633 100644 --- a/pyspedas/projects/mms/mec_ascii/mms_get_local_ancillary_files.py +++ b/pyspedas/projects/mms/mec_ascii/mms_get_local_ancillary_files.py @@ -5,6 +5,8 @@ import pandas as pd from pyspedas.projects.mms.mms_config import CONFIG +from pyspedas.utilities.download import is_fsspec_uri +import fsspec def mms_get_local_ancillary_files(filetype='tetrahedron_qf', trange=None): """ @@ -39,16 +41,24 @@ def mms_get_local_ancillary_files(filetype='tetrahedron_qf', trange=None): # and FILETYPE is either DEFATT, PREDATT, DEFEPH, PREDEPH in uppercase # and start/endDate is YYYYDOY # and version is Vnn (.V00, .V01, etc..) - dir_pattern = os.sep.join([CONFIG['local_data_dir'], 'ancillary', 'mms', f'{filetype}']) + sep = "/" if is_fsspec_uri(CONFIG["local_data_dir"]) else os.path.sep + dir_pattern = sep.join([CONFIG['local_data_dir'], 'ancillary', 'mms', f'{filetype}']) file_pattern = f'MMS_DEFQ_???????_???????.V??' files_in_trange = [] out_files = [] - files = glob.glob(os.sep.join([dir_pattern, file_pattern])) + if is_fsspec_uri(dir_pattern): + protocol, path = dir_pattern.split("://") + fs = fsspec.filesystem(protocol) + + files = fs.glob(sep.join([dir_pattern, file_pattern])) + files = [sep.join([protocol+"://", file]) for file in files] + else: + files = glob.glob(sep.join([dir_pattern, file_pattern])) for file in files: - filename = os.path.basename(file) + filename = os.path.basename(file) if not is_fsspec_uri(dir_pattern) else file.split("/")[-1] try: date_parts = filename.split('_') start_time_str = date_parts[2] diff --git a/pyspedas/projects/mms/mec_ascii/mms_get_local_state_files.py b/pyspedas/projects/mms/mec_ascii/mms_get_local_state_files.py index 05d12d88..79363a23 100644 --- a/pyspedas/projects/mms/mec_ascii/mms_get_local_state_files.py +++ b/pyspedas/projects/mms/mec_ascii/mms_get_local_state_files.py @@ -5,6 +5,8 @@ import pandas as pd from pyspedas.projects.mms.mms_config import CONFIG +from pyspedas.utilities.download import is_fsspec_uri +import fsspec def mms_get_local_state_files(probe='1', level='def', filetype='eph', trange=None): """ @@ -44,16 +46,24 @@ def mms_get_local_state_files(probe='1', level='def', filetype='eph', trange=Non # and FILETYPE is either DEFATT, PREDATT, DEFEPH, PREDEPH in uppercase # and start/endDate is YYYYDOY # and version is Vnn (.V00, .V01, etc..) 
- dir_pattern = os.sep.join([CONFIG['local_data_dir'], 'ancillary', f'mms{probe}', f'{level}{filetype}']) + sep = "/" if is_fsspec_uri(CONFIG["local_data_dir"]) else os.path.sep + dir_pattern = sep.join([CONFIG['local_data_dir'], 'ancillary', f'mms{probe}', f'{level}{filetype}']) file_pattern = f'MMS{probe}_{level.upper()}{filetype.upper()}_???????_???????.V??' files_in_trange = [] out_files = [] - files = glob.glob(os.sep.join([dir_pattern, file_pattern])) + if is_fsspec_uri(dir_pattern): + protocol, path = dir_pattern.split("://") + fs = fsspec.filesystem(protocol) + + files = fs.glob(sep.join([dir_pattern, file_pattern])) + files = [sep.join([protocol+"://", file]) for file in files] + else: + files = glob.glob(sep.join([dir_pattern, file_pattern])) for file in files: - filename = os.path.basename(file) + filename = os.path.basename(file) if not is_fsspec_uri(dir_pattern) else file.split("/")[-1] try: date_parts = filename.split('_') start_time_str = date_parts[2] diff --git a/pyspedas/projects/mms/mec_ascii/mms_get_state_data.py b/pyspedas/projects/mms/mec_ascii/mms_get_state_data.py index d58e7fd6..affe0c0d 100644 --- a/pyspedas/projects/mms/mec_ascii/mms_get_state_data.py +++ b/pyspedas/projects/mms/mec_ascii/mms_get_state_data.py @@ -10,6 +10,8 @@ from pyspedas.projects.mms.mec_ascii.mms_load_eph_tplot import mms_load_eph_tplot from pyspedas.projects.mms.mec_ascii.mms_load_att_tplot import mms_load_att_tplot +from pyspedas.utilities.download import is_fsspec_uri +import fsspec def mms_get_state_data(probe='1', trange=['2015-10-16', '2015-10-17'], datatypes=['pos', 'vel'], level='def', no_download=False, pred_or_def=True, @@ -55,6 +57,8 @@ def mms_get_state_data(probe='1', trange=['2015-10-16', '2015-10-17'], return_vars = [] + sep = "/" if is_fsspec_uri(local_data_dir) else os.path.sep + for probe_id in probe: # probe will need to be a string from now on probe_id = str(probe_id) @@ -65,7 +69,7 @@ def mms_get_state_data(probe='1', trange=['2015-10-16', '2015-10-17'], files_in_interval = [] out_files = [] - out_dir = os.sep.join([local_data_dir, 'ancillary', 'mms'+probe_id, level+filetype]) + out_dir = sep.join([local_data_dir, 'ancillary', 'mms'+probe_id, level+filetype]) if CONFIG['no_download'] != True and no_download != True: # predicted doesn't support start_date/end_date @@ -105,12 +109,26 @@ def mms_get_state_data(probe='1', trange=['2015-10-16', '2015-10-17'], files_in_interval = http_json['files'] for file in files_in_interval: - out_file = os.sep.join([out_dir, file['file_name']]) + out_file = sep.join([out_dir, file['file_name']]) - if os.path.exists(out_file) and str(os.stat(out_file).st_size) == str(file['file_size']): - out_files.append(out_file) - http_request.close() - continue + if is_fsspec_uri(out_file): + protocol, path = out_file.split("://") + fs = fsspec.filesystem(protocol) + + exists = fs.exists(out_file) + else: + exists = os.path.exists(out_file) + + if exists: + if is_fsspec_uri(out_file): + f_size = fs.size(out_file) + else: + f_size = os.stat(out_file).st_size + + if str(f_size) == str(file['file_size']): + out_files.append(out_file) + http_request.close() + continue if user is None: download_url = 'https://lasp.colorado.edu/mms/sdc/public/files/api/v1/download/ancillary?file=' + file['file_name'] @@ -128,11 +146,21 @@ def mms_get_state_data(probe='1', trange=['2015-10-16', '2015-10-17'], with open(ftmp.name, 'wb') as f: copyfileobj(fsrc.raw, f) - if not os.path.exists(out_dir): - os.makedirs(out_dir) + if is_fsspec_uri(out_dir): + protocol, path = 
out_dir.split("://") + fs = fsspec.filesystem(protocol) + + fs.makedirs(out_dir, exist_ok=True) + + # if the download was successful, put at URI specified + fs.put(ftmp.name, out_file) + else: + if not os.path.exists(out_dir): + os.makedirs(out_dir) + + # if the download was successful, copy to data directory + copy(ftmp.name, out_file) - # if the download was successful, copy to data directory - copy(ftmp.name, out_file) out_files.append(out_file) fsrc.close() ftmp.close() diff --git a/pyspedas/projects/mms/mec_ascii/mms_get_tetrahedron_qf.py b/pyspedas/projects/mms/mec_ascii/mms_get_tetrahedron_qf.py index b20fb750..dc3bcdb0 100644 --- a/pyspedas/projects/mms/mec_ascii/mms_get_tetrahedron_qf.py +++ b/pyspedas/projects/mms/mec_ascii/mms_get_tetrahedron_qf.py @@ -9,6 +9,8 @@ from pyspedas.projects.mms.mec_ascii.mms_get_local_ancillary_files import mms_get_local_ancillary_files from pyspedas.projects.mms.mec_ascii.mms_load_qf_tplot import mms_load_qf_tplot +from pyspedas.utilities.download import is_fsspec_uri +import fsspec def mms_get_tetrahedron_qf(trange=['2015-10-16', '2015-10-17'], no_download=False, suffix='', always_prompt=False): @@ -44,7 +46,8 @@ def mms_get_tetrahedron_qf(trange=['2015-10-16', '2015-10-17'], no_download=Fals out_files = [] - out_dir = os.sep.join([local_data_dir, 'ancillary', 'mms', 'tetrahedron_qf']) + sep = "/" if is_fsspec_uri(local_data_dir) else os.path.sep + out_dir = sep.join([local_data_dir, 'ancillary', 'mms', 'tetrahedron_qf']) if CONFIG['no_download'] != True and no_download != True: dates_for_query = '&start_date='+start_time_str+'&end_date='+end_time_str @@ -68,12 +71,26 @@ def mms_get_tetrahedron_qf(trange=['2015-10-16', '2015-10-17'], no_download=Fals files_in_interval = http_json['files'] for file in files_in_interval: - out_file = os.sep.join([out_dir, file['file_name']]) + out_file = sep.join([out_dir, file['file_name']]) - if os.path.exists(out_file) and str(os.stat(out_file).st_size) == str(file['file_size']): - out_files.append(out_file) - http_request.close() - continue + if is_fsspec_uri(out_file): + protocol, path = out_file.split("://") + fs = fsspec.filesystem(protocol) + + exists = fs.exists(out_file) + else: + exists = os.path.exists(out_file) + + if exists: + if is_fsspec_uri(out_file): + f_size = fs.size(out_file) + else: + f_size = os.stat(out_file).st_size + + if str(f_size) == str(file['file_size']): + out_files.append(out_file) + http_request.close() + continue if user is None: download_url = 'https://lasp.colorado.edu/mms/sdc/public/files/api/v1/download/ancillary?file=' + file['file_name'] @@ -91,11 +108,21 @@ def mms_get_tetrahedron_qf(trange=['2015-10-16', '2015-10-17'], no_download=Fals with open(ftmp.name, 'wb') as f: copyfileobj(fsrc.raw, f) - if not os.path.exists(out_dir): - os.makedirs(out_dir) + if is_fsspec_uri(out_dir): + protocol, _ = out_dir.split("://") + fs = fsspec.filesystem(protocol) + + fs.makedirs(out_dir, exist_ok=True) + + # if the download was successful, put at URI specified + fs.put(ftmp.name, out_file) + else: + if not os.path.exists(out_dir): + os.makedirs(out_dir) + + # if the download was successful, copy to data directory + copy(ftmp.name, out_file) - # if the download was successful, copy to data directory - copy(ftmp.name, out_file) out_files.append(out_file) fsrc.close() ftmp.close() diff --git a/pyspedas/projects/mms/mms_get_local_files.py b/pyspedas/projects/mms/mms_get_local_files.py index cae9dab0..de6b13b1 100644 --- a/pyspedas/projects/mms/mms_get_local_files.py +++ 
b/pyspedas/projects/mms/mms_get_local_files.py
@@ -8,6 +8,8 @@
from dateutil.parser import parse
from datetime import timedelta
+from pyspedas.utilities.download import is_fsspec_uri
+import fsspec
def mms_get_local_files(probe, instrument, data_rate, level, datatype, trange, mirror=False):
"""
@@ -51,6 +53,8 @@ def mms_get_local_files(probe, instrument, data_rate, level, datatype, trange, m
else:
data_dir = CONFIG['local_data_dir']
+ sep = "/" if is_fsspec_uri(data_dir) else os.path.sep
+
# directory and file name search patterns
# -assume directories are of the form:
# (srvy, SITL): spacecraft/instrument/rate/level[/datatype]/year/month/
@@ -65,27 +69,39 @@ def mms_get_local_files(probe, instrument, data_rate, level, datatype, trange, m
if datatype == '' or datatype is None:
level_and_dtype = level
else:
- level_and_dtype = os.sep.join([level, datatype])
+ level_and_dtype = sep.join([level, datatype])
for date in days:
if data_rate == 'brst':
- local_dir = os.sep.join([data_dir, 'mms'+probe, instrument, data_rate, level_and_dtype, date.strftime('%Y'), date.strftime('%m'), date.strftime('%d')])
+ local_dir = sep.join([data_dir, 'mms'+probe, instrument, data_rate, level_and_dtype, date.strftime('%Y'), date.strftime('%m'), date.strftime('%d')])
else:
- local_dir = os.sep.join([data_dir, 'mms'+probe, instrument, data_rate, level_and_dtype, date.strftime('%Y'), date.strftime('%m')])
+ local_dir = sep.join([data_dir, 'mms'+probe, instrument, data_rate, level_and_dtype, date.strftime('%Y'), date.strftime('%m')])
if os.name == 'nt':
- full_path = os.sep.join([re.escape(local_dir)+os.sep, file_name])
+ full_path = sep.join([re.escape(local_dir)+os.sep, file_name])
else:
- full_path = os.sep.join([re.escape(local_dir), file_name])
+ full_path = sep.join([re.escape(local_dir), file_name])
# check for extra /'s in the path
if '//' in full_path:
full_path = full_path.replace('//', '/')
+ if is_fsspec_uri(data_dir):
+ # Cloud Awareness: the replacement above removes the expected :// URI pattern
+ full_path = full_path.replace(":/", "://")
+
+ protocol, path = data_dir.split("://")
+ fs = fsspec.filesystem(protocol)
+
+ walk = fs.walk(data_dir)
+ else:
+ walk = os.walk(data_dir)
regex = re.compile(full_path)
- for root, dirs, files in os.walk(data_dir):
+ for root, dirs, files in walk:
for file in files:
- this_file = os.sep.join([root, file])
+ this_file = sep.join([root, file])
+ if is_fsspec_uri(data_dir):
+ this_file = protocol + "://" + this_file
if CONFIG['debug_mode']: logging.info('Checking ' + this_file)
if CONFIG['debug_mode']: logging.info('against: ' + full_path)
@@ -115,7 +131,13 @@ def mms_get_local_files(probe, instrument, data_rate, level, datatype, trange, m
local_file = file.replace(mirror_dir, local_dir)
if CONFIG['debug_mode']: logging.info('Copying ' + file + ' to ' + local_file)
- shutil.copyfile(file, local_file)
+ if is_fsspec_uri(local_dir):
+ protocol, path = local_dir.split("://")
+ fs = fsspec.filesystem(protocol)
+
+ fs.put_file(file, local_file)
+ else:
+ shutil.copyfile(file, local_file)
local_files_copied.append(local_file)
local_files = local_files_copied
diff --git a/pyspedas/projects/mms/mms_load_data.py b/pyspedas/projects/mms/mms_load_data.py
index 822747a7..38fffc71 100644
--- a/pyspedas/projects/mms/mms_load_data.py
+++ b/pyspedas/projects/mms/mms_load_data.py
@@ -18,6 +18,8 @@
from .mms_file_filter import mms_file_filter
from .mms_load_data_spdf import mms_load_data_spdf
+from pyspedas.utilities.download import is_fsspec_uri
+import fsspec
def
mms_load_data(trange=['2015-10-16', '2015-10-17'], probe='1', data_rate='srvy', level='l2', instrument='fgm', datatype='', varformat=None, exclude_format=None, prefix='', suffix='', get_support_data=False, time_clip=False, @@ -127,23 +129,34 @@ def mms_load_data(trange=['2015-10-16', '2015-10-17'], probe='1', data_rate='srv for file in files_in_interval: file_date = parse(file['timetag']) + sep = "/" if is_fsspec_uri(CONFIG["local_data_dir"]) else os.path.sep if dtype == '': - out_dir = os.sep.join([CONFIG['local_data_dir'], 'mms'+prb, instrument, drate, lvl, file_date.strftime('%Y'), file_date.strftime('%m')]) + out_dir = sep.join([CONFIG['local_data_dir'], 'mms'+prb, instrument, drate, lvl, file_date.strftime('%Y'), file_date.strftime('%m')]) else: - out_dir = os.sep.join([CONFIG['local_data_dir'], 'mms'+prb, instrument, drate, lvl, dtype, file_date.strftime('%Y'), file_date.strftime('%m')]) + out_dir = sep.join([CONFIG['local_data_dir'], 'mms'+prb, instrument, drate, lvl, dtype, file_date.strftime('%Y'), file_date.strftime('%m')]) if drate.lower() == 'brst': - out_dir = os.sep.join([out_dir, file_date.strftime('%d')]) + out_dir = sep.join([out_dir, file_date.strftime('%d')]) - out_file = os.sep.join([out_dir, file['file_name']]) + out_file = sep.join([out_dir, file['file_name']]) if CONFIG['debug_mode']: logging.info('File: ' + file['file_name'] + ' / ' + file['timetag']) - if os.path.exists(out_file) and str(os.stat(out_file).st_size) == str(file['file_size']): - if not download_only: logging.info('Loading ' + out_file) - out_files.append(out_file) - file_found = True - continue + if is_fsspec_uri(CONFIG["local_data_dir"]): + protocol, path = out_file.split("://") + fs = fsspec.filesystem(protocol) + + if fs.exists(out_file) and str(fs.size(out_file)) == str(file["file_size"]): + if not download_only: logging.info('Streaming ' + out_file) + out_files.append(out_file) + file_found = True + continue + else: + if os.path.exists(out_file) and str(os.stat(out_file).st_size) == str(file['file_size']): + if not download_only: logging.info('Loading ' + out_file) + out_files.append(out_file) + file_found = True + continue if user is None: download_url = 'https://lasp.colorado.edu/mms/sdc/public/files/api/v1/download/science?file=' + file['file_name'] @@ -160,11 +173,21 @@ def mms_load_data(trange=['2015-10-16', '2015-10-17'], probe='1', data_rate='srv with open(ftmp.name, 'wb') as f: copyfileobj(fsrc.raw, f) - if not os.path.exists(out_dir): - os.makedirs(out_dir) + if is_fsspec_uri(CONFIG["local_data_dir"]): + protocol, path = out_dir.split("://") + fs = fsspec.filesystem(protocol) + + fs.makedirs(out_dir, exist_ok=True) + + # if the download was successful, put at URI specified + fs.put(ftmp.name, out_file) + else: + if not os.path.exists(out_dir): + os.makedirs(out_dir) + + # if the download was successful, copy to data directory + copy(ftmp.name, out_file) - # if the download was successful, copy to data directory - copy(ftmp.name, out_file) out_files.append(out_file) file_found = True fsrc.close() diff --git a/pyspedas/projects/mms/tests/load_uri_tests.py b/pyspedas/projects/mms/tests/load_uri_tests.py new file mode 100644 index 00000000..3d303812 --- /dev/null +++ b/pyspedas/projects/mms/tests/load_uri_tests.py @@ -0,0 +1,517 @@ +import os +import time +import numpy as np +import logging +import requests +import unittest +import subprocess + +from pytplot import data_exists, del_data, tplot, get_data +from pyspedas.projects.mms import mms_config, mms_load_state, \ + 
mms_load_tetrahedron_qf, mms_load_mec, \ + mms_load_fgm, mms_load_scm, \ + mms_load_hpca, mms_load_edp, \ + mms_load_edi, mms_load_aspoc, \ + mms_load_dsp + # mms_load_fpi, mms_load_feeps +import pyspedas +from pyspedas import tdpwrspc +from pyspedas.projects.mms.hpca.mms_hpca_calc_anodes import mms_hpca_calc_anodes +from pyspedas.projects.mms.hpca.mms_hpca_spin_sum import mms_hpca_spin_sum +from pyspedas.projects.mms.hpca.mms_get_hpca_info import mms_get_hpca_info +#========================================================== + +# moto server mock details +localhost = "http://localhost:3000" +bucket_name = "test-bucket" + +# Set up mock AWS environment variables (fake credentials) +os.environ["AWS_ACCESS_KEY_ID"] = "test" +os.environ["AWS_SECRET_ACCESS_KEY"] = "test" +os.environ["AWS_DEFAULT_REGION"] = "us-east-1" + +# Set environment to use the local Moto S3 server +# S3 ENDPOINT for fsspec +# ENDPOINT URL for cdflib/boto3 +os.environ["AWS_S3_ENDPOINT"] = localhost +os.environ["AWS_ENDPOINT_URL"] = localhost + +class LoadTestCases(unittest.TestCase): + """ + Cloud Awareness Unit Tests + + Depends upon moto[server] package. Install via: + pip install moto[server] + + These tests essentially create a local mock-AWS server as a background + process at port 3000. + Note: The environment variables are used as mock credentials in order + to avoid having to pass the endpoint url to fsspec calls. + """ + + @classmethod + def setUpClass(cls): + # Start the moto server for S3 in the background + # https://github.com/getmoto/moto/issues/4418 + cls.moto_server = subprocess.Popen( + ["moto_server", "-p3000"], + stdout=subprocess.PIPE, stderr=subprocess.PIPE + ) + + # Allow the server to start properly + time.sleep(2) + + # Create a bucket using direct HTTP requests + response = requests.put(f"http://localhost:3000/{bucket_name}") + assert response.status_code == 200, "Bucket creation failed" + + @classmethod + def tearDownClass(cls): + # Terminate the moto server after tests + cls.moto_server.terminate() + cls.moto_server.communicate() + + def clean_data(self): + # reset moto server to original state + response = requests.post("http://localhost:3000/moto-api/reset") + assert response.status_code == 200, "Moto Server reset failed" + + # create bucket again + response = requests.put(f"http://localhost:3000/{bucket_name}") + assert response.status_code == 200, "Bucket creation failed" + + #========================================================================== + # Cloud Awareness Note: Cluster implementation does not stream data due to + # lack of functionality already present in PySPEDAS. + + # Adapted unit tests for AWS-specific URI testing. 
+ def test_state_load_eph_no_update(self): + self.clean_data() + mms_config.CONFIG['local_data_dir'] = f"s3://{bucket_name}" + + data = mms_load_state(datatypes=['pos', 'vel']) # ensure the files are stored locally + del_data('*') # remove the current tplot vars + data = mms_load_state(datatypes=['pos', 'vel'], no_update=True) # ensure the files are stored locally + self.assertTrue(data_exists('mms1_defeph_pos')) + self.assertTrue(data_exists('mms1_defeph_vel')) + self.assertTrue('mms1_defeph_pos' in data) + self.assertTrue('mms1_defeph_vel' in data) + + def test_state_load_eph_data(self): + self.clean_data() + mms_config.CONFIG['local_data_dir'] = f"s3://{bucket_name}" + + data = mms_load_state(datatypes=['pos', 'vel']) + self.assertTrue(data_exists('mms1_defeph_pos')) + self.assertTrue(data_exists('mms1_defeph_vel')) + tplot(['mms1_defeph_pos', 'mms1_defeph_vel'], display=False) + self.assertTrue('mms1_defeph_pos' in data) + self.assertTrue('mms1_defeph_vel' in data) + + def test_state_load_eph_multiprobe_data(self): + self.clean_data() + mms_config.CONFIG['local_data_dir'] = f"s3://{bucket_name}" + + data = mms_load_state(datatypes=['pos', 'vel'],probe=['1','2','3','4']) + self.assertTrue(data_exists('mms1_defeph_pos')) + self.assertTrue(data_exists('mms1_defeph_vel')) + tplot(['mms1_defeph_pos', 'mms1_defeph_vel'], display=False) + self.assertTrue('mms1_defeph_pos' in data) + self.assertTrue('mms1_defeph_vel' in data) + self.assertTrue('mms2_defeph_pos' in data) + self.assertTrue('mms2_defeph_vel' in data) + self.assertTrue('mms3_defeph_pos' in data) + self.assertTrue('mms3_defeph_vel' in data) + self.assertTrue('mms4_defeph_pos' in data) + self.assertTrue('mms4_defeph_vel' in data) + + def test_state_load_tqf(self): + self.clean_data() + mms_config.CONFIG['local_data_dir'] = f"s3://{bucket_name}" + + data = mms_load_tetrahedron_qf() + self.assertTrue(data_exists('mms_tetrahedron_qf')) + tplot('mms_tetrahedron_qf',display=False) + self.assertTrue('mms_tetrahedron_qf' in data) + + def test_state_load_tqf_no_update(self): + self.clean_data() + mms_config.CONFIG['local_data_dir'] = f"s3://{bucket_name}" + + data = mms_load_tetrahedron_qf() # Ensure that some data is downloaded + del_data('mms_tetrahedron_qf') # Delete the tplot variable + data = mms_load_tetrahedron_qf(no_update=True) # Ensure that it can be loaded from previously downloaded files + self.assertTrue(data_exists('mms_tetrahedron_qf')) + self.assertTrue('mms_tetrahedron_qf' in data) + tplot('mms_tetrahedron_qf',display=False) + + def test_state_load_att_data(self): + self.clean_data() + mms_config.CONFIG['local_data_dir'] = f"s3://{bucket_name}" + + data = mms_load_state(trange=['2015-10-16', '2015-10-16/06:00'], datatypes=['spinras', 'spindec']) + self.assertTrue(data_exists('mms1_defatt_spinras')) + self.assertTrue(data_exists('mms1_defatt_spindec')) + self.assertTrue('mms1_defatt_spinras' in data) + self.assertTrue('mms1_defatt_spindec' in data) + tplot(['mms1_defatt_spinras', 'mms1_defatt_spindec'], display=False) + + def test_mec_load_default_data(self): + self.clean_data() + mms_config.CONFIG['local_data_dir'] = f"s3://{bucket_name}" + + data = mms_load_mec(trange=['2015-10-16', '2015-10-16/01:00'], available=True) + data = mms_load_mec(trange=['2015-10-16', '2015-10-16/01:00']) + self.assertTrue(data_exists('mms1_mec_r_sm')) + tplot(['mms1_mec_r_sm'], display=False) + + def test_mec_load_spdf_data(self): + self.clean_data() + mms_config.CONFIG['local_data_dir'] = f"s3://{bucket_name}" + + data = 
mms_load_mec(trange=['2015-10-16', '2015-10-16/01:00'], spdf=True) + self.assertTrue(data_exists('mms1_mec_r_sm')) + + def test_mec_load_suffix(self): + self.clean_data() + mms_config.CONFIG['local_data_dir'] = f"s3://{bucket_name}" + + data = mms_load_mec(trange=['2015-10-16', '2015-10-16/01:00'], suffix='_test') + self.assertTrue(data_exists('mms1_mec_r_sm_test')) + + def test_fgm_regression_multi_imports_spdf(self): + self.clean_data() + mms_config.CONFIG['local_data_dir'] = f"s3://{bucket_name}" + + data = mms_load_fgm(data_rate='brst', trange=['2015-10-16/13:06', '2015-10-16/13:10'], spdf=True) + t1, d1 = get_data('mms1_fgm_b_gse_brst_l2') + data = mms_load_fgm(data_rate='brst', trange=['2015-10-16/13:06', '2015-10-16/13:10'], spdf=True) + t2, d2 = get_data('mms1_fgm_b_gse_brst_l2') + self.assertTrue(t1.shape == t2.shape) + self.assertTrue(d1.shape == d2.shape) + + def test_fgm_load_default_data(self): + self.clean_data() + mms_config.CONFIG['local_data_dir'] = f"s3://{bucket_name}" + + data = mms_load_fgm(trange=['2015-10-16', '2015-10-16/01:00'],available=True) + data = mms_load_fgm(trange=['2015-10-16', '2015-10-16/01:00']) + self.assertTrue(data_exists('mms1_fgm_b_gse_srvy_l2')) + self.assertTrue(data_exists('Epoch')) + self.assertTrue(data_exists('Epoch_state')) + tplot(['mms1_fgm_b_gse_srvy_l2'], display=False) + + def test_fgm_load_default_data_exclude(self): + self.clean_data() + mms_config.CONFIG['local_data_dir'] = f"s3://{bucket_name}" + + # Capture all log messages of level INFO or above + with self.assertLogs(level=logging.INFO) as captured: + # assertLogs fails if there are no log messages, so we make sure there's at least one + logging.info("Dummy log message") + data = mms_load_fgm(trange=['2015-10-16', '2015-10-16/01:00'],exclude_format='*rdeltahalf*',available=True) + data = mms_load_fgm(trange=['2015-10-16', '2015-10-16/01:00'],exclude_format='*rdeltahalf*') + self.assertTrue(data_exists('mms1_fgm_b_gse_srvy_l2')) + self.assertTrue(data_exists('Epoch')) + self.assertTrue(data_exists('Epoch_state')) + tplot(['mms1_fgm_b_gse_srvy_l2'], display=False) + # Assert that none of the log messages contain the string "rdeltahalf" + logging.info("Captured log messages:") + for rec in captured.records: + logging.info(rec.msg) + self.assertTrue("rdeltahalf" not in rec.msg) + + def test_fgm_load_spdf_data(self): + self.clean_data() + mms_config.CONFIG['local_data_dir'] = f"s3://{bucket_name}" + + data = mms_load_fgm(data_rate='brst', trange=['2015-10-16/13:06', '2015-10-16/13:10'], spdf=True) + self.assertTrue(data_exists('mms1_fgm_b_gse_brst_l2')) + + def test_fgm_load_suffix(self): + self.clean_data() + mms_config.CONFIG['local_data_dir'] = f"s3://{bucket_name}" + + data = mms_load_fgm(data_rate='brst', trange=['2015-10-16/13:06', '2015-10-16/13:10'], suffix='_test') + self.assertTrue(data_exists('mms1_fgm_b_gse_brst_l2_test')) + + def test_fgm_load_multiple_sc(self): + data = mms_load_fgm(probe=[1, 2, 3, 4], data_rate='brst', trange=['2015-10-16/13:06', '2015-10-16/13:10']) + self.assertTrue(data_exists('mms1_fgm_b_gse_brst_l2')) + self.assertTrue(data_exists('mms2_fgm_b_gse_brst_l2')) + self.assertTrue(data_exists('mms3_fgm_b_gse_brst_l2')) + self.assertTrue(data_exists('mms4_fgm_b_gse_brst_l2')) + + def test_fgm_load_brst_data(self): + self.clean_data() + mms_config.CONFIG['local_data_dir'] = f"s3://{bucket_name}" + + data = mms_load_fgm(data_rate='brst', trange=['2015-10-16/13:06', '2015-10-16/13:10']) + self.assertTrue(data_exists('mms1_fgm_b_gse_brst_l2')) + 
tplot(['mms1_fgm_b_gse_brst_l2'], display=False) + + def test_fgm_load_data_no_update(self): + self.clean_data() + mms_config.CONFIG['local_data_dir'] = f"s3://{bucket_name}" + + data = mms_load_fgm(trange=['2015-10-16', '2015-10-16/01:00']) # make sure the files exist locally + del_data('*') + data = mms_load_fgm(trange=['2015-10-16', '2015-10-16/01:00'], no_update=True) # load the file from the local cache + self.assertTrue(data_exists('mms1_fgm_b_gse_srvy_l2')) + + def test_scm_brst_dpwrspc_data(self): + self.clean_data() + mms_config.CONFIG['local_data_dir'] = f"s3://{bucket_name}" + + data = mms_load_scm(probe=4, data_rate='brst', datatype='scb', trange=['2015-10-01/10:48:16', '2015-10-01/10:49:16']) + tdpwrspc('mms4_scm_acb_gse_scb_brst_l2', notmvariance=True) + self.assertTrue(data_exists('mms4_scm_acb_gse_scb_brst_l2')) + self.assertTrue(data_exists('mms4_scm_acb_gse_scb_brst_l2_x_dpwrspc')) + self.assertTrue(data_exists('mms4_scm_acb_gse_scb_brst_l2_y_dpwrspc')) + self.assertTrue(data_exists('mms4_scm_acb_gse_scb_brst_l2_z_dpwrspc')) + tplot(['mms4_scm_acb_gse_scb_brst_l2', + 'mms4_scm_acb_gse_scb_brst_l2_x_dpwrspc', + 'mms4_scm_acb_gse_scb_brst_l2_y_dpwrspc', + 'mms4_scm_acb_gse_scb_brst_l2_z_dpwrspc'], display=False) + + def test_scm_load_default_data(self): + self.clean_data() + mms_config.CONFIG['local_data_dir'] = f"s3://{bucket_name}" + + data = mms_load_scm(trange=['2015-10-16', '2015-10-16/01:00'], available=True) + data = mms_load_scm(trange=['2015-10-16', '2015-10-16/01:00']) + self.assertTrue(data_exists('mms1_scm_acb_gse_scsrvy_srvy_l2')) + + def test_scm_load_schb(self): + self.clean_data() + mms_config.CONFIG['local_data_dir'] = f"s3://{bucket_name}" + + data = pyspedas.mms.scm(probe=4, data_rate='brst', datatype='schb', trange=['2015-10-01/10:48:16', '2015-10-01/10:49:16']) + self.assertTrue(data_exists('mms4_scm_acb_gse_schb_brst_l2')) + + def test_scm_load_suffix(self): + self.clean_data() + mms_config.CONFIG['local_data_dir'] = f"s3://{bucket_name}" + + data = mms_load_scm(trange=['2015-10-16', '2015-10-16/01:00'], suffix='_test') + self.assertTrue(data_exists('mms1_scm_acb_gse_scsrvy_srvy_l2_test')) + + def test_scm_load_multiple_sc(self): + self.clean_data() + mms_config.CONFIG['local_data_dir'] = f"s3://{bucket_name}" + + data = mms_load_scm(probe=['1', '2', '3', '4'], trange=['2017-12-15', '2017-12-16']) + # self.assertTrue(data_exists('mms1_scm_acb_gse_scsrvy_srvy_l2')) + # self.assertTrue(data_exists('mms2_scm_acb_gse_scsrvy_srvy_l2')) + self.assertTrue(data_exists('mms3_scm_acb_gse_scsrvy_srvy_l2')) + self.assertTrue(data_exists('mms4_scm_acb_gse_scsrvy_srvy_l2')) + + def test_scm_load_brst_data(self): + self.clean_data() + mms_config.CONFIG['local_data_dir'] = f"s3://{bucket_name}" + + data = mms_load_scm(data_rate='brst', trange=['2015-10-16/13:06', '2015-10-16/13:10'], datatype='scb') + self.assertTrue(data_exists('mms1_scm_acb_gse_scb_brst_l2')) + + def test_scm_available(self): + self.clean_data() + mms_config.CONFIG['local_data_dir'] = f"s3://{bucket_name}" + + files = mms_load_scm(data_rate='brst', trange=['2015-10-16/13:06', '2015-10-16/13:10'], datatype='scb', available=True) + self.assertTrue(len(files) == 2) + + def test_hpca_load_default_data(self): + self.clean_data() + mms_config.CONFIG['local_data_dir'] = f"s3://{bucket_name}" + + data = mms_load_hpca(trange=['2015-10-16', '2015-10-16/01:00'], available=True) + data = mms_load_hpca(trange=['2015-10-16', '2015-10-16/01:00']) + self.assertTrue(data_exists('mms1_hpca_hplus_number_density')) + 
tplot(['mms1_hpca_hplus_number_density'], display=False) + + def test_hpca_load_spdf_data(self): + self.clean_data() + mms_config.CONFIG['local_data_dir'] = f"s3://{bucket_name}" + + data = mms_load_hpca(trange=['2015-10-16', '2015-10-16/01:00'], spdf=True) + self.assertTrue(data_exists('mms1_hpca_hplus_number_density')) + tplot(['mms1_hpca_hplus_number_density'], display=False) + + def test_hpca_load_ion_omni_suffix(self): + self.clean_data() + mms_config.CONFIG['local_data_dir'] = f"s3://{bucket_name}" + + del_data('*') + data = mms_load_hpca(probe=2, trange=['2016-08-09/09:10', '2016-08-09/10:10:00'], datatype='ion', data_rate='brst', suffix='_brst') + mms_hpca_calc_anodes(fov=[0, 360], probe=2, suffix='_brst') + mms_hpca_spin_sum(probe=2, suffix='_brst', avg=True) + self.assertTrue(data_exists('mms2_hpca_hplus_flux_brst_elev_0-360_spin')) + tplot(['mms2_hpca_hplus_flux_brst_elev_0-360_spin'], display=False) + + def test_hpca_load_ion_omni(self): + self.clean_data() + mms_config.CONFIG['local_data_dir'] = f"s3://{bucket_name}" + + del_data('*') + data = mms_load_hpca(trange=['2016-10-16', '2016-10-16/6:00'], datatype='ion') + mms_hpca_calc_anodes(fov=[0, 360], probe='1') + mms_hpca_spin_sum() + self.assertTrue(data_exists('mms1_hpca_hplus_flux_elev_0-360_spin')) + self.assertTrue(data_exists('mms1_hpca_heplus_flux_elev_0-360_spin')) + self.assertTrue(data_exists('mms1_hpca_heplusplus_flux_elev_0-360_spin')) + self.assertTrue(data_exists('mms1_hpca_oplus_flux_elev_0-360_spin')) + tplot(['mms1_hpca_hplus_flux_elev_0-360_spin', + 'mms1_hpca_heplus_flux_elev_0-360_spin', + 'mms1_hpca_heplusplus_flux_elev_0-360_spin', + 'mms1_hpca_oplus_flux_elev_0-360_spin'], display=False) + + def test_hpca_center_fast_moments_data(self): + self.clean_data() + mms_config.CONFIG['local_data_dir'] = f"s3://{bucket_name}" + + data = mms_load_hpca(trange=['2015-10-16/14:00', '2015-10-16/15:00']) + centered = mms_load_hpca(trange=['2015-10-16/14:00', '2015-10-16/15:00'], center_measurement=True, suffix='_centered') + + t, d = get_data('mms1_hpca_hplus_ion_bulk_velocity') + c, d = get_data('mms1_hpca_hplus_ion_bulk_velocity_centered') + self.assertTrue(np.round(c[0]-t[0], decimals=3) == 5.0) + + def test_hpca_center_brst_moments_data(self): + self.clean_data() + mms_config.CONFIG['local_data_dir'] = f"s3://{bucket_name}" + + data = mms_load_hpca(trange=['2015-10-16/13:06', '2015-10-16/13:07'], data_rate='brst') + centered = mms_load_hpca(trange=['2015-10-16/13:06', '2015-10-16/13:07'], data_rate='brst', center_measurement=True, suffix='_centered') + + t, d = get_data('mms1_hpca_hplus_ion_bulk_velocity') + c, d = get_data('mms1_hpca_hplus_ion_bulk_velocity_centered') + self.assertTrue(np.round(c[0]-t[0], decimals=3) == 5.0) + + def test_hpca_info(self): + info = mms_get_hpca_info() + self.assertTrue(list(info.keys()) == ['elevation', 't_spin', 't_sweep', 'azimuth_energy_offset']) + + def test_edp_load_default_data(self): + self.clean_data() + mms_config.CONFIG['local_data_dir'] = f"s3://{bucket_name}" + + data = mms_load_edp(trange=['2015-10-16', '2015-10-16/01:00'], available=True) + data = mms_load_edp(trange=['2015-10-16', '2015-10-16/01:00']) + self.assertTrue(data_exists('mms1_edp_dce_gse_fast_l2')) + tplot(['mms1_edp_dce_gse_fast_l2'], display=False) + + def test_edp_load_hfesp_data(self): + self.clean_data() + mms_config.CONFIG['local_data_dir'] = f"s3://{bucket_name}" + + data = mms_load_edp(trange=['2015-10-16', '2015-10-16/01:00'], datatype='hfesp', data_rate='srvy') + 
self.assertTrue(data_exists('mms1_edp_hfesp_srvy_l2')) + tplot(['mms1_edp_hfesp_srvy_l2'], display=False) + + def test_edp_load_spdf_data(self): + self.clean_data() + mms_config.CONFIG['local_data_dir'] = f"s3://{bucket_name}" + + data = mms_load_edp(trange=['2015-10-16', '2015-10-16/01:00'], spdf=True) + self.assertTrue(data_exists('mms1_edp_dce_gse_fast_l2')) + tplot(['mms1_edp_dce_gse_fast_l2'], display=False) + + def test_edp_load_suffix(self): + self.clean_data() + mms_config.CONFIG['local_data_dir'] = f"s3://{bucket_name}" + + data = mms_load_edp(trange=['2015-10-16', '2015-10-16/01:00'], suffix='_test') + self.assertTrue(data_exists('mms1_edp_dce_gse_fast_l2')) + + def test_edp_load_brst_data(self): + self.clean_data() + mms_config.CONFIG['local_data_dir'] = f"s3://{bucket_name}" + + data = mms_load_edp(data_rate='brst', trange=['2015-10-16/13:06', '2015-10-16/13:10']) + self.assertTrue(data_exists('mms1_edp_dce_gse_brst_l2')) + tplot(['mms1_edp_dce_gse_brst_l2'], display=False) + + def test_edi_load_default_data(self): + self.clean_data() + mms_config.CONFIG['local_data_dir'] = f"s3://{bucket_name}" + + data = mms_load_edi(trange=['2016-10-17/13:00', '2016-10-17/14:00'], available=True) + data = mms_load_edi(trange=['2016-10-17/13:00', '2016-10-17/14:00']) + self.assertTrue(data_exists('mms1_edi_e_gse_srvy_l2')) + tplot(['mms1_edi_e_gse_srvy_l2'], display=False) + + def test_edi_load_spdf_data(self): + self.clean_data() + mms_config.CONFIG['local_data_dir'] = f"s3://{bucket_name}" + + data = mms_load_edi(trange=['2016-10-17/13:00', '2016-10-17/14:00'], spdf=True) + self.assertTrue(data_exists('mms1_edi_e_gse_srvy_l2')) + tplot(['mms1_edi_e_gse_srvy_l2'], display=False) + + def test_edi_load_suffix(self): + self.clean_data() + mms_config.CONFIG['local_data_dir'] = f"s3://{bucket_name}" + + data = mms_load_edi(trange=['2016-10-17/13:00', '2016-10-17/14:00'], suffix='_test') + self.assertTrue(data_exists('mms1_edi_e_gse_srvy_l2_test')) + tplot(['mms1_edi_e_gse_srvy_l2_test'], display=False) + + def test_aspoc_load_default_data(self): + self.clean_data() + mms_config.CONFIG['local_data_dir'] = f"s3://{bucket_name}" + + data = mms_load_aspoc(trange=['2015-10-16', '2015-10-16/01:00'], available=True) + data = mms_load_aspoc(trange=['2015-10-16', '2015-10-16/01:00']) + self.assertTrue(data_exists('mms1_aspoc_ionc_l2')) + tplot(['mms1_aspoc_ionc_l2'], display=False) + + def test_aspoc_load_spdf_data(self): + self.clean_data() + mms_config.CONFIG['local_data_dir'] = f"s3://{bucket_name}" + + data = mms_load_aspoc(trange=['2015-10-16', '2015-10-16/01:00'], spdf=True) + self.assertTrue(data_exists('mms1_aspoc_ionc_l2')) + + def test_aspoc_load_suffix(self): + self.clean_data() + mms_config.CONFIG['local_data_dir'] = f"s3://{bucket_name}" + + data = mms_load_aspoc(trange=['2015-10-16', '2015-10-16/01:00'], suffix='_test') + self.assertTrue(data_exists('mms1_aspoc_ionc_l2_test')) + + def test_dsp_load_epsd_bpsd_data(self): + self.clean_data() + mms_config.CONFIG['local_data_dir'] = f"s3://{bucket_name}" + + data = mms_load_dsp(trange=['2015-08-01','2015-08-02'], datatype=['epsd', 'bpsd'], level='l2', data_rate='fast') + self.assertTrue(data_exists('mms1_dsp_epsd_omni')) + self.assertTrue(data_exists('mms1_dsp_bpsd_omni')) + tplot(['mms1_dsp_epsd_omni', 'mms1_dsp_bpsd_omni'], display=False) + + def test_dsp_load_bpsd_data(self): + self.clean_data() + mms_config.CONFIG['local_data_dir'] = f"s3://{bucket_name}" + + data = mms_load_dsp(trange=['2015-10-16','2015-10-17'], datatype='bpsd', 
level='l2', data_rate='fast', available=True) + data = mms_load_dsp(trange=['2015-10-16','2015-10-17'], datatype='bpsd', level='l2', data_rate='fast') + self.assertTrue(data_exists('mms1_dsp_bpsd_omni_fast_l2')) + tplot(['mms1_dsp_bpsd_omni_fast_l2'], display=False) + + def test_dsp_load_epsd_spdf(self): + self.clean_data() + mms_config.CONFIG['local_data_dir'] = f"s3://{bucket_name}" + + data = mms_load_dsp(trange=['2015-08-01','2015-08-02'], datatype='epsd', level='l2', data_rate='fast', spdf=True) + self.assertTrue(data_exists('mms1_dsp_epsd_omni')) + tplot(['mms1_dsp_epsd_omni'], display=False) + + def test_dsp_load_epsd_suffix(self): + self.clean_data() + mms_config.CONFIG['local_data_dir'] = f"s3://{bucket_name}" + + data = mms_load_dsp(trange=['2015-08-01','2015-08-02'], datatype='epsd', level='l2', data_rate='fast', suffix='_test') + self.assertTrue(data_exists('mms1_dsp_epsd_omni_test')) + tplot(['mms1_dsp_epsd_omni_test'], display=False) + +if __name__ == '__main__': + unittest.main() diff --git a/pyspedas/utilities/download.py b/pyspedas/utilities/download.py index 36a6764f..6ed740db 100644 --- a/pyspedas/utilities/download.py +++ b/pyspedas/utilities/download.py @@ -205,7 +205,6 @@ def download_file( # Cloud Awareness if is_fsspec_uri(url): - #dt_fmt = "%a, %d %b %Y %H:%M:%S GMT" protocol, path = url.split("://") remote_fs = fsspec.filesystem(protocol, anon=False) remote_modtime = remote_fs.info(path)["LastModified"] diff --git a/pyspedas/utilities/tests/download_uri_tests.py b/pyspedas/utilities/tests/download_uri_tests.py new file mode 100644 index 00000000..d37d77c7 --- /dev/null +++ b/pyspedas/utilities/tests/download_uri_tests.py @@ -0,0 +1,145 @@ +import os +import time +import requests +import unittest +import subprocess + +from pyspedas.utilities.download import download + +class DownloadTestCases(unittest.TestCase): + """ + Cloud Awareness Unit Tests + + Depends upon moto[server] package. Install via: + pip install moto[server] + + These tests essentially create a local mock-AWS server as a background + process at port 3000. + Note: The environment variables are used as mock credentials in order + to avoid having to pass the endpoint url to fsspec calls. + """ + + @classmethod + def setUpClass(cls): + # Start the moto server for S3 in the background + # https://github.com/getmoto/moto/issues/4418 + cls.moto_server = subprocess.Popen( + ["moto_server", "-p3000"], + stdout=subprocess.PIPE, stderr=subprocess.PIPE + ) + + # Allow the server to start properly + time.sleep(2) + + # Set up mock AWS environment variables (fake credentials) + os.environ["AWS_ACCESS_KEY_ID"] = "test" + os.environ["AWS_SECRET_ACCESS_KEY"] = "test" + os.environ["AWS_DEFAULT_REGION"] = "us-east-1" + + # Set environment to use the local Moto S3 server + # S3 ENDPOINT for fsspec + # ENDPOINT URL for cdflib's boto3 + os.environ["AWS_S3_ENDPOINT"] = "http://localhost:3000" + os.environ["AWS_ENDPOINT_URL"] = "http://localhost:3000" + + # Create a bucket using direct HTTP requests + bucket_name = "test-bucket" + response = requests.put(f"http://localhost:3000/{bucket_name}") + assert response.status_code == 200, "Bucket creation failed" + + @classmethod + def tearDownClass(cls): + # Terminate the moto server after tests + cls.moto_server.terminate() + cls.moto_server.communicate() + + #========================================================================== + # Adapted unit tests (from download_tests.py) for AWS-specific URI testing. 
+ def test_local_path(self): + # Remote/AWS details + bucket_name = "test-bucket" + s3_url = f"s3://{bucket_name}" + + # Include mock AWS credentials and endpoints + os.environ["AWS_ACCESS_KEY_ID"] = "test" + os.environ["AWS_SECRET_ACCESS_KEY"] = "test" + os.environ["AWS_DEFAULT_REGION"] = "us-east-1" + os.environ["AWS_S3_ENDPOINT"] = "http://localhost:3000" + os.environ["AWS_ENDPOINT_URL"] = "http://localhost:3000" + + # specifying local_path changes the local data directory + files = download( + local_path=s3_url + "/psp_data/spc/13", + remote_file="https://spdf.gsfc.nasa.gov/pub/data/psp/sweap/spc/l3/l3i/2019/psp_swp_spc_l3i_20190401_v01.cdf" + ) + + self.assertTrue(len(files) == 1) + self.assertTrue(files[0] == s3_url + "/psp_data/spc/13/psp_swp_spc_l3i_20190401_v01.cdf") + + def test_remote_path(self): + # Remote/AWS details + bucket_name = "test-bucket" + s3_url = f"s3://{bucket_name}" + + # Include mock AWS credentials and endpoints + os.environ["AWS_ACCESS_KEY_ID"] = "test" + os.environ["AWS_SECRET_ACCESS_KEY"] = "test" + os.environ["AWS_DEFAULT_REGION"] = "us-east-1" + os.environ["AWS_S3_ENDPOINT"] = "http://localhost:3000" + os.environ["AWS_ENDPOINT_URL"] = "http://localhost:3000" + + # download (if not already downloaded from prior tests) + files = download( + local_path=s3_url + "/psp/sweap/spc/l3/l3i/2019", + remote_file="https://spdf.gsfc.nasa.gov/pub/data/psp/sweap/spc/l3/l3i/2019/psp_swp_spc_l3i_20190401_v01.cdf" + ) + self.assertTrue(len(files) == 1) + self.assertTrue(files[0] == s3_url + "/psp/sweap/spc/l3/l3i/2019/psp_swp_spc_l3i_20190401_v01.cdf") + + # stream from remote + with self.assertLogs(level='INFO') as log: + files = download( + remote_path=s3_url, + remote_file="/psp/sweap/spc/l3/l3i/2019/psp_swp_spc_l3i_20190401_v01.cdf" + ) + self.assertIn("Streaming from remote", log.output[0]) + + def test_force_download(self): + # Remote/AWS details + bucket_name = "test-bucket" + s3_url = f"s3://{bucket_name}" + + # Include mock AWS credentials and endpoints + os.environ["AWS_ACCESS_KEY_ID"] = "test" + os.environ["AWS_SECRET_ACCESS_KEY"] = "test" + os.environ["AWS_DEFAULT_REGION"] = "us-east-1" + os.environ["AWS_S3_ENDPOINT"] = "http://localhost:3000" + os.environ["AWS_ENDPOINT_URL"] = "http://localhost:3000" + + files = download( + local_path=s3_url + "/themis/tha/l1/state/2007", + remote_file="https://spdf.gsfc.nasa.gov/pub/data/themis/tha/l1/state/2007/tha_l1_state_20070217_v01.cdf" + ) + self.assertTrue(len(files) == 1) + print(files) + self.assertTrue(files[0] == s3_url + "/themis/tha/l1/state/2007/tha_l1_state_20070217_v01.cdf") + + # Download the same file without force_download, should not re-download + with self.assertLogs(level='INFO') as log: + files = download( + local_path=s3_url + "/themis/tha/l1/state/2007", + remote_file="https://spdf.gsfc.nasa.gov/pub/data/themis/tha/l1/state/2007/tha_l1_state_20070217_v01.cdf" + ) + self.assertIn("File is current", log.output[0]) + + # Download the same file with force_download, should re-download + with self.assertLogs(level='INFO') as log: + files = download( + local_path=s3_url + "/psp_data/spc/13", + remote_file="https://spdf.gsfc.nasa.gov/pub/data/psp/sweap/spc/l3/l3i/2019/psp_swp_spc_l3i_20190401_v01.cdf", + force_download=True + ) + self.assertIn("Downloading", log.output[0]) + +if __name__ == '__main__': + unittest.main(verbosity=2)
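
Reviewer note: the URI-branching idiom that recurs across this patch (maven/utilities.py, the mms_get_local_* helpers, mms_load_data.py) can be summarized in one standalone sketch. The cache_listing helper below is hypothetical and not part of the patch; it only restates the pattern: join path segments with "/" for URIs (os.path.sep otherwise), then go through fsspec for cloud stores and os for local directories.

import os
import fsspec
from pyspedas.utilities.download import is_fsspec_uri

def cache_listing(data_dir, *subdirs):
    # Hypothetical helper restating the patch's idiom; not part of the patch.
    sep = "/" if is_fsspec_uri(data_dir) else os.path.sep
    full_path = sep.join([data_dir, *subdirs])
    if is_fsspec_uri(full_path):
        protocol, _ = full_path.split("://")
        fs = fsspec.filesystem(protocol)
        if not fs.exists(full_path):
            return []
        # fsspec returns full paths; keep basenames to mirror os.listdir
        return [f.rstrip("/").split("/")[-1]
                for f in fs.listdir(full_path, detail=False)]
    if not os.path.exists(full_path):
        return []
    return os.listdir(full_path)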
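A minimal end-to-end run of the URI-aware download() against the moto mock used by these tests, assuming moto[server] is installed, a "moto_server -p3000" process is running, and the test-bucket bucket has been created as in setUpClass:

import os
from pyspedas.utilities.download import download

# Fake credentials and endpoints so fsspec/boto3 talk to the local moto server
os.environ["AWS_ACCESS_KEY_ID"] = "test"
os.environ["AWS_SECRET_ACCESS_KEY"] = "test"
os.environ["AWS_DEFAULT_REGION"] = "us-east-1"
os.environ["AWS_S3_ENDPOINT"] = "http://localhost:3000"
os.environ["AWS_ENDPOINT_URL"] = "http://localhost:3000"

# Mirrors test_local_path above: fetch one CDF from SPDF and cache it in the bucket
files = download(
    local_path="s3://test-bucket/psp_data/spc/13",
    remote_file="https://spdf.gsfc.nasa.gov/pub/data/psp/sweap/spc/l3/l3i/2019/psp_swp_spc_l3i_20190401_v01.cdf",
)
print(files)
# expected: ['s3://test-bucket/psp_data/spc/13/psp_swp_spc_l3i_20190401_v01.cdf']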