added h5 as option in arrow fields; more robust error handling in atl24g
jpswinski committed Oct 16, 2024
1 parent 883e75d commit 91c68ce
Showing 4 changed files with 63 additions and 18 deletions.
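
Note: the new output format is selected through the arrow output parameters. Below is a hedged sketch of how a client request might ask for it once "h5" is accepted; the "output" block layout follows general SlideRule convention and is assumed here, and only the "h5" format string itself comes from this commit.

# Hypothetical request parameters (Python). Only the "h5" value for "format"
# is introduced by this commit; the surrounding layout is assumed.
parms = {
    "output": {
        "format": "h5",            # new option added by this commit
        "path": "atl24_bathy.h5"   # hypothetical output path
    }
}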
39 changes: 36 additions & 3 deletions datasets/bathy/docker/oceaneyes/runner.py
@@ -28,6 +28,7 @@
# ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

import pandas as pd
+import geopandas as gpd
import pyarrow as pa
import pyarrow.parquet as pq
import numpy as np
@@ -346,13 +347,31 @@ def runClassifier(classifier, classifier_func, num_processes=6):
print("Writing Parquet file: " + settings["filename"])

# HDF5
elif format == "hdf5" or format == "h5":
elif format == "h5":


+# build GeoDataFrame (default geometry is crs=SLIDERULE_EPSG)
+geometry = gpd.points_from_xy(df["lon_ph"], df["lat_ph"])
+gdf = gpd.GeoDataFrame(df, geometry=geometry, crs="EPSG:7912")
+
+# get CMR query information
+start_time = time.time()
+hull = gdf.unary_union.convex_hull
+cmr_polygon = ' '.join([f'{coord[1]} {coord[0]}' for coord in list(hull.exterior.coords)]) # lat1 lon1 lat2 lon2 ...
+cmr_begin_time = df["time_ns"].min()
+cmr_end_time = df["time_ns"].max()
+print(f'CMR Polygon: {cmr_polygon}')
+print(f'CMR Begin Time: {cmr_begin_time}')
+print(f'CMR End Time: {cmr_end_time}')
+print(f'calculations took {time.time() - start_time} seconds')

# helper function that adds a variable
def add_variable(group, name, data, dtype, attrs):
dataset = group.create_dataset(name, data=data, dtype=dtype)
for key in attrs:
dataset.attrs[key] = attrs[key]

# write h5 file
with h5py.File(settings["filename"], 'w') as hf:

# open granule information
@@ -603,24 +622,28 @@ def add_variable(group, name, data, dtype, attrs):
beam_group = hf.create_group(beam) # e.g. gt1r, gt2l, etc.
add_variable(beam_group, "index_ph", df['index_ph'], 'int32',
{'contentType':'physicalMeasurement',
+'coordinates': 'delta_time lat_ph lon_ph',
'description':'0-based index of the photon in the ATL03 heights group',
'long_name':'Photon index',
'source':'ATL03',
'units':'scalar'})
add_variable(beam_group, "index_seg", df["index_seg"], 'int32',
{'contentType':'physicalMeasurement',
+'coordinates': 'delta_time lat_ph lon_ph',
'description':'0-based index of the photon in the ATL03 geolocation group',
'long_name':'Segment index',
'source':'ATL03',
'units':'scalar'})
add_variable(beam_group, "delta_time", df["delta_time"], 'float64',
{'contentType':'physicalMeasurement',
+'coordinates': 'lat_ph lon_ph',
'description':'The transmit time of a given photon, measured in seconds from the ATLAS Standard Data Product Epoch. Note that multiple received photons associated with a single transmit pulse will have the same delta_time. The ATLAS Standard Data Products (SDP) epoch offset is defined within /ancillary_data/atlas_sdp_gps_epoch as the number of GPS seconds between the GPS epoch (1980-01-06T00:00:00.000000Z UTC) and the ATLAS SDP epoch. By adding the offset contained within atlas_sdp_gps_epoch to delta time parameters, the time in gps_seconds relative to the GPS epoch can be computed.',
'long_name':'Elapsed GPS seconds',
'source':'ATL03',
'units':'seconds since 2018-01-01'})
add_variable(beam_group, "lat_ph", df["lat_ph"], 'float64',
{'contentType':'modelResult',
+'coordinates': 'delta_time lon_ph',
'description':'Latitude of each received photon. Computed from the ECF Cartesian coordinates of the bounce point.',
'long_name':'Latitude',
'source':'ATL03',
@@ -630,6 +653,7 @@ def add_variable(group, name, data, dtype, attrs):
'valid_min': -90.0})
add_variable(beam_group, "lon_ph", df["lon_ph"], 'float64',
{'contentType':'modelResult',
+'coordinates': 'delta_time lat_ph',
'description':'Longitude of each received photon. Computed from the ECF Cartesian coordinates of the bounce point.',
'long_name':'Longitude',
'source':'ATL03',
@@ -639,54 +663,63 @@ def add_variable(group, name, data, dtype, attrs):
'valid_min': -180.0})
add_variable(beam_group, "x_atc", df["x_atc"], 'float32',
{'contentType':'modelResult',
+'coordinates': 'delta_time lat_ph lon_ph',
'description':'Along-track distance in a segment projected to the ellipsoid of the received photon, based on the Along-Track Segment algorithm. Total along track distance can be found by adding this value to the sum of segment lengths measured from the start of the most recent reference groundtrack.',
'long_name':'Distance from equator crossing',
'source':'ATL03',
'units':'meters'})
add_variable(beam_group, "y_atc", df["y_atc"], 'float32',
{'contentType':'modelResult',
+'coordinates': 'delta_time lat_ph lon_ph',
'description':'Across-track distance projected to the ellipsoid of the received photon from the reference ground track. This is based on the Along-Track Segment algorithm described in Section 3.1.',
'long_name':'Distance off RGT',
'source':'ATL03',
'units':'meters'})
add_variable(beam_group, "ellipse_h", df["ellipse_h"], 'float32',
{'contentType':'physicalMeasurement',
+'coordinates': 'delta_time lat_ph lon_ph',
'description':'Height of each received photon, relative to the WGS-84 ellipsoid including refraction correction. Note neither the geoid, ocean tide nor the dynamic atmosphere (DAC) corrections are applied to the ellipsoidal heights.',
'long_name':'Photon WGS84 height',
'source':'ATL03',
'units':'meters'})
add_variable(beam_group, "ortho_h", df["ortho_h"], 'float32',
{'contentType':'physicalMeasurement',
+'coordinates': 'delta_time lat_ph lon_ph',
'description':'Height of each received photon, relative to the geoid.',
'long_name':'Orthometric height',
'source':'ATL03',
'units':'meters'})
add_variable(beam_group, "depth", df["depth"], 'float32',
{'contentType':'modelResult',
+'coordinates': 'delta_time lat_ph lon_ph',
'description':'Depth of the received photon below the sea surface',
'long_name':'Depth',
'source':'ATL03',
'units':'meters'})
add_variable(beam_group, "sigma_thu", df["sigma_thu"], 'float32',
{'contentType':'physicalMeasurement',
+'coordinates': 'delta_time lat_ph lon_ph',
'description':'The combination of the aerial and subaqueous horizontal uncertainty for each received photon',
'long_name':'Total horizontal uncertainty',
'source':'ATL03',
'units':'meters'})
add_variable(beam_group, "sigma_tvu", df["sigma_tvu"], 'float32',
{'contentType':'modelResult',
+'coordinates': 'delta_time lat_ph lon_ph',
'description':'The combination of the aerial and subaqueous vertical uncertainty for each received photon',
'long_name':'Total vertical uncertainty',
'source':'ATL03',
'units':'meters'})
add_variable(beam_group, "flags", df["flags"], 'int32',
{'contentType':'modelResult',
-'description':'bit 0 - max sensor depth exceeded',
+'coordinates': 'delta_time lat_ph lon_ph',
+'description':'bit 0 - bathy mask boundary, bit 1 - max sensor depth exceeded, bit 2 - sea surface undetected, bit 3 - invalid kd, bit 4 - invalid wind speed, bit 5 - night flag',
'long_name':'Processing flags',
'source':'ATL03',
'units':'bit mask'})
add_variable(beam_group, "class_ph", df["ensemble"].astype(np.int16), 'int16',
add_variable(beam_group, "class_ph", df["ensemble"].astype(np.int8), 'int8',
{'contentType':'modelResult',
'coordinates': 'delta_time lat_ph lon_ph',
'description':'0 - unclassified, 1 - other, 40 - bathymetry, 41 - sea surface',
'long_name':'Photon classification',
'source':'ATL03',
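For reference, the CMR query computation added above can be run standalone. Below is a minimal sketch with invented sample photons: the column names (lon_ph, lat_ph, time_ns) and the EPSG:7912 CRS come from the diff, and note that unary_union is deprecated in GeoPandas 1.0+ in favor of union_all().

# Hedged, self-contained sketch of the CMR metadata step in runner.py;
# the sample coordinates and timestamps below are invented.
import geopandas as gpd
import pandas as pd

df = pd.DataFrame({
    "lon_ph":  [-80.10, -80.05, -80.02, -80.08],
    "lat_ph":  [ 25.00,  25.03,  25.01,  25.04],
    "time_ns": pd.to_datetime(["2022-01-01T00:00:00", "2022-01-01T00:00:01",
                               "2022-01-01T00:00:02", "2022-01-01T00:00:03"]),
})

# one point geometry per photon, ITRF2014 (EPSG:7912)
geometry = gpd.points_from_xy(df["lon_ph"], df["lat_ph"])
gdf = gpd.GeoDataFrame(df, geometry=geometry, crs="EPSG:7912")

# convex hull of all photons; exterior ring serialized as "lat1 lon1 lat2 lon2 ..."
hull = gdf.unary_union.convex_hull
cmr_polygon = ' '.join(f'{coord[1]} {coord[0]}' for coord in hull.exterior.coords)

cmr_begin_time = df["time_ns"].min()
cmr_end_time = df["time_ns"].max()
print(cmr_polygon, cmr_begin_time, cmr_end_time)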
34 changes: 21 additions & 13 deletions datasets/bathy/endpoints/atl24g.lua
@@ -211,44 +211,52 @@ end
-------------------------------------------------------
-- wait for dataframes to complete and write to file
-------------------------------------------------------
+local failed_processing_run = false
for beam,dataframe in pairs(dataframes) do
+local failed_dataframe = false
if dataframe:finished(ctimeout(), rspq) then
if dataframes[beam]:length() <= 0 then
userlog:alert(core.ERROR, core.RTE_ERROR, string.format("request <%s> on %s created an empty bathy dataframe for spot %d", rspq, resource, dataframe:meta("spot")))
elseif not dataframes[beam]:isvalid() then
userlog:alert(core.ERROR, core.RTE_ERROR, string.format("request <%s> on %s failed to create valid bathy dataframe for spot %d", rspq, resource, dataframe:meta("spot")))
-cleanup(crenv, transaction_id)
-return
+failed_dataframe = true
else
local spot = dataframe:meta("spot")
local output_filename = string.format("%s/bathy_spot_%d.parquet", crenv.host_sandbox_directory, spot)
local arrow_dataframe = arrow.dataframe(parms, dataframe)
if not arrow_dataframe then
userlog:alert(core.ERROR, core.RTE_ERROR, string.format("request <%s> on %s failed to create arrow dataframe for spot %d", rspq, resource, dataframe:meta("spot")))
-cleanup(crenv, transaction_id)
-return
+failed_dataframe = true
elseif not arrow_dataframe:export(output_filename, arrow.PARQUET) then
userlog:alert(core.ERROR, core.RTE_ERROR, string.format("request <%s> on %s failed to write dataframe for spot %d", rspq, resource, dataframe:meta("spot")))
-cleanup(crenv, transaction_id)
-return
+failed_dataframe = true
end
+if not failed_dataframe then
+userlog:alert(core.INFO, core.RTE_INFO, string.format("request <%s> dataframe for %s created", rspq, beam))
+outputs[beam] = string.format("%s/bathy_spot_%d.parquet", crenv.container_sandbox_mount, spot)
+end
-userlog:alert(core.INFO, core.RTE_INFO, string.format("request <%s> dataframe for %s created", rspq, beam))
-outputs[beam] = string.format("%s/bathy_spot_%d.parquet", crenv.container_sandbox_mount, spot)
-dataframe:destroy()
end
+-- cleanup to save memory
+dataframe:destroy()
else
userlog:alert(core.ERROR, core.RTE_TIMEOUT, string.format("request <%s> on %s timed out waiting for dataframe to complete on spot %d", rspq, resource, dataframe:meta("spot")))
-cleanup(crenv, transaction_id)
-return
+failed_dataframe = true
end
+failed_processing_run = failed_processing_run or failed_dataframe
end
+-- delay clean up and exit because race condition exists for
+-- dataframes that otherwise might not have finished yet
+if failed_processing_run then
+cleanup(crenv, transaction_id)
+return
+end

-------------------------------------------------------
-- wait for granule to complete and write to file
-------------------------------------------------------
if granule then
outputs["granule"] = string.format("%s/bathy_granule.json", crenv.container_sandbox_mount, "w")
local f = io.open(outputs["granule"])
outputs["granule"] = string.format("%s/bathy_granule.json", crenv.container_sandbox_mount)
local f = io.open(outputs["granule"], "w")
if f then
if granule:waiton(ctimeout(), rspq) then
f:write(json.encode(granule:export()))
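The intent of the Lua rework above, restated: a failure on one beam no longer triggers an immediate cleanup-and-return, which raced against sibling dataframes that had not finished yet; each failure now only sets a flag, every finished dataframe is still destroyed, and cleanup runs once after the loop. Here is a rough Python rendering of the new control flow (all names are hypothetical stand-ins, not SlideRule APIs):

# Illustration only; dataframes maps beam -> an object with finished(),
# length(), is_valid(), and destroy(), and export/cleanup/log are callables.
def process_beams(dataframes, export, cleanup, log):
    failed_processing_run = False
    for beam, dataframe in dataframes.items():
        failed_dataframe = False
        if dataframe.finished():
            if dataframe.length() <= 0:
                log("empty dataframe for " + beam)  # warned, but not fatal above
            elif not dataframe.is_valid():
                failed_dataframe = True             # flag it, keep draining the loop
            elif not export(dataframe):
                failed_dataframe = True             # export failed, flag it too
            dataframe.destroy()                     # always release memory per beam
        else:
            failed_dataframe = True                 # timed out waiting on this beam
        failed_processing_run = failed_processing_run or failed_dataframe
    # cleanup deferred to this point; returning early inside the loop raced
    # against dataframes that were still being produced
    if failed_processing_run:
        cleanup()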
5 changes: 4 additions & 1 deletion packages/arrow/ArrowFields.cpp
@@ -150,6 +150,7 @@ string convertToJson(const ArrowFields::format_t& v)
case ArrowFields::PARQUET: return "\"parquet\"";
case ArrowFields::GEOPARQUET: return "\"geoparquet\"";
case ArrowFields::CSV: return "\"csv\"";
+case ArrowFields::H5: return "\"h5\"";
default: throw RunTimeException(CRITICAL, RTE_ERROR, "invalid format: %d", static_cast<int>(v));
}
}
Expand All @@ -165,6 +166,7 @@ int convertToLua(lua_State* L, const ArrowFields::format_t& v)
case ArrowFields::PARQUET: lua_pushstring(L, "parquet"); break;
case ArrowFields::GEOPARQUET: lua_pushstring(L, "geoparquet"); break;
case ArrowFields::CSV: lua_pushstring(L, "csv"); break;
+case ArrowFields::H5: lua_pushstring(L, "h5"); break;
default: throw RunTimeException(CRITICAL, RTE_ERROR, "invalid format: %d", static_cast<int>(v));
}

@@ -187,7 +189,8 @@ void convertFromLua(lua_State* L, int index, ArrowFields::format_t& v)
else if(StringLib::match(str, "parquet")) v = ArrowFields::PARQUET;
else if(StringLib::match(str, "geoparquet")) v = ArrowFields::GEOPARQUET;
else if(StringLib::match(str, "csv")) v = ArrowFields::CSV;
-else throw RunTimeException(CRITICAL, RTE_ERROR, "format is an invalid value: %d", static_cast<int>(v));
+else if(StringLib::match(str, "h5")) v = ArrowFields::H5;
+else throw RunTimeException(CRITICAL, RTE_ERROR, "format is an invalid value: %s", str);
}
else if(!lua_isnil(L, index))
{
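Besides adding "h5", this hunk fixes the error path: the old throw formatted the still-unassigned enum value (%d with static_cast<int>(v)), whereas the fix reports the unmatched string itself. Below is a small Python analogue of that pattern; the numeric values mirror format_t in ArrowFields.h, and the enum entries before FEATHER are not visible in this diff, so they are omitted.

# Python analogue of convertFromLua's string-to-format mapping (illustrative).
FORMATS = {"feather": 1, "parquet": 2, "geoparquet": 3, "csv": 4, "h5": 5}

def parse_format(s):
    if s not in FORMATS:
        # report the offending string, not an uninitialized enum value
        raise ValueError(f"format is an invalid value: {s}")
    return FORMATS[s]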
3 changes: 2 additions & 1 deletion packages/arrow/ArrowFields.h
@@ -69,7 +69,8 @@ class ArrowFields: public FieldDictionary
FEATHER = 1,
PARQUET = 2,
GEOPARQUET = 3,
-CSV = 4
+CSV = 4,
+H5 = 5
} format_t;

/*--------------------------------------------------------------------
Expand Down
