Commit: update scedc

zhuwq0 committed Dec 29, 2023
1 parent 2a0f3df · commit b6841f4
Showing 8 changed files with 283 additions and 210 deletions.
13 changes: 10 additions & 3 deletions datasets/NCEDC/download_catalog.py
@@ -231,7 +231,7 @@ def read_phase_line(line):
tmp += timedelta(seconds=p_phase["second_of_p_arrival"])
p_phase["phase_time"] = tmp.strftime("%Y-%m-%dT%H:%M:%S.%f")
p_phase["phase_polarity"] = p_phase["p_polarity"]
p_phase["remark"] = p_phase["p_remark"]
p_phase["phase_remark"] = p_phase["p_remark"]
p_phase["phase_score"] = p_phase["p_weight_code"]
p_phase["phase_type"] = "P"
p_phase["location_residual_s"] = p_phase["p_travel_time_residual"]
@@ -258,7 +258,7 @@ def read_phase_line(line):
)
tmp += timedelta(seconds=s_phase["second_of_s_arrival"])
s_phase["phase_time"] = tmp.strftime("%Y-%m-%dT%H:%M:%S.%f")
s_phase["remark"] = s_phase["s_remark"]
s_phase["phase_remark"] = s_phase["s_remark"]
s_phase["phase_score"] = s_phase["s_weight_code"]
s_phase["phase_type"] = "S"
s_phase["location_residual_s"] = s_phase["s_travel_time_residual"]
@@ -322,7 +322,7 @@ def process(year):
"phase_time",
"phase_score",
"phase_polarity",
"remark",
"phase_remark",
"distance_km",
"azimuth",
"takeoff_angle",
@@ -364,6 +364,13 @@ def process(year):
phases_ps.to_csv(f"{result_path}/catalog/{phase_filename[:-2]}_ps.csv", index=False)
phases.to_csv(f"{result_path}/catalog/{phase_filename[:-2]}.csv", index=False)

# year, month = phase_filename.split("/")[-1].split(".")[0:2]
# if not os.path.exists(f"{result_path}/catalog/{year}"):
# os.makedirs(f"{result_path}/catalog/{year}")
# events.to_csv(f"{result_path}/catalog/{year}/{year}_{month}.event.csv", index=False)
# phases_ps.to_csv(f"{result_path}/catalog/{year}/{year}_{month}.phase_ps.csv", index=False)
# phases.to_csv(f"{result_path}/catalog/{year}/{year}_{month}.phase.csv", index=False)


if __name__ == "__main__":
ctx = mp.get_context("spawn")
1 change: 1 addition & 0 deletions datasets/SCEDC/.gitignore
@@ -0,0 +1 @@
*.h5
348 changes: 200 additions & 148 deletions datasets/SCEDC/convert_hdf5.py

Large diffs are not rendered by default.
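Since GitHub does not render this large diff, the sketch below is only a hypothetical illustration of this kind of conversion, packing per-event miniSEED waveforms into an HDF5 file with h5py and obspy. Every function name, group layout, and attribute is an assumption, not the file's actual contents.

# Hypothetical sketch -- not the real convert_hdf5.py
import h5py
import obspy

def write_event_to_hdf5(h5_path, event_id, mseed_files):
    """Append one event's waveform traces to an HDF5 file (assumed layout)."""
    with h5py.File(h5_path, "a") as fp:
        grp = fp.require_group(event_id)  # one group per event
        for mseed in mseed_files:
            for tr in obspy.read(mseed):  # each Trace becomes one dataset
                ds = grp.create_dataset(tr.id, data=tr.data.astype("float32"))
                ds.attrs["sampling_rate"] = tr.stats.sampling_rate
                ds.attrs["starttime"] = str(tr.stats.starttime)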

39 changes: 18 additions & 21 deletions datasets/SCEDC/convert_station.py
@@ -1,27 +1,24 @@
# %%
from datetime import datetime, timedelta, timezone
from pathlib import Path
import os
from datetime import timezone

import fsspec
import numpy as np
import obspy
import pandas as pd
from tqdm import tqdm

# %%
protocol = "s3"
bucket = "scedc-pds"
fs = fsspec.filesystem(protocol, anon=True)
input_protocol = "s3"
input_bucket = "scedc-pds"
input_fs = fsspec.filesystem(input_protocol, anon=True)

output_protocol = "gs"
output_token = f"{os.environ['HOME']}/.config/gcloud/application_default_credentials.json"
output_bucket = "quakeflow_dataset/SC"
output_fs = fsspec.filesystem(output_protocol, token=output_token)

# %%
catalog_path = "event_phases"
station_path = "FDSNstationXML"
waveform_path = "continuous_waveforms/"
dataset_path = Path("./dataset")
if not dataset_path.exists():
dataset_path.mkdir()
if not (dataset_path / "statioin").exists():
(dataset_path / "station").mkdir()
station_path = f"{input_bucket}/FDSNstationXML"


# %%
@@ -72,17 +69,17 @@ def parse_inventory_csv(inventory):

# %%
inv = obspy.Inventory()
for network in fs.glob(f"{bucket}/{station_path}/*"):
for network in input_fs.glob(f"{station_path}/*"):
print(f"Parse {network}")
# inv = obspy.Inventory()
for xml in tqdm(fs.glob(f"{network}/*.xml")):
with fs.open(xml) as f:
for xml in tqdm(input_fs.glob(f"{network}/*.xml")):
with input_fs.open(xml) as f:
inv += obspy.read_inventory(f)

stations = parse_inventory_csv(inv)
# stations.to_csv(dataset_path / "station" / f"{network.split('/')[-1]}.csv", index=False)

for network, sta in stations.groupby(["network"]):
print(network, sta)
sta.to_csv(dataset_path / "station" / f"{network}.csv", index=False)
print(network)
with output_fs.open(f"{output_bucket}/station/{network}.csv", "wb") as f:
sta.to_csv(f, index=False)

# %%
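
For reference, the revised script above streams station CSVs straight to GCS through fsspec instead of writing a local ./dataset directory. A minimal usage sketch of that output pattern, with the bucket and token path taken from this diff (the sample DataFrame is illustrative):

import os

import fsspec
import pandas as pd

token = f"{os.environ['HOME']}/.config/gcloud/application_default_credentials.json"
output_fs = fsspec.filesystem("gs", token=token)

df = pd.DataFrame({"network": ["CI"], "station": ["PASC"]})
with output_fs.open("quakeflow_dataset/SC/station/CI.csv", "wb") as f:
    df.to_csv(f, index=False)  # pandas >= 1.2 accepts binary handles
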
23 changes: 12 additions & 11 deletions datasets/SCEDC/download_catalog.py
@@ -27,8 +27,8 @@
# %%
## https://scedc.caltech.edu/data/stp/STP_Manual_v1.01.pdf
# Defining a function to parse event location information
parse_event_time = lambda x: datetime.strptime(":".join(x.split(":")[:-1]), "%Y/%m/%d,%H:%M") + timedelta(
seconds=float(x.split(":")[-1])
parse_event_time = lambda x: (
datetime.strptime(":".join(x.split(":")[:-1]), "%Y/%m/%d,%H:%M") + timedelta(seconds=float(x.split(":")[-1]))
)
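
For reference, the reshaped lambda above is behavior-preserving: the seconds field is split off and re-added as a timedelta, so STP event times parse with or without a fractional part. A quick check (the input strings are illustrative):

from datetime import datetime, timedelta

parse_event_time = lambda x: (
    datetime.strptime(":".join(x.split(":")[:-1]), "%Y/%m/%d,%H:%M") + timedelta(seconds=float(x.split(":")[-1]))
)

print(parse_event_time("2021/07/08,13:29:57.86"))  # 2021-07-08 13:29:57.860000
print(parse_event_time("2021/07/08,13:29:57"))     # 2021-07-08 13:29:57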


Expand All @@ -38,7 +38,8 @@ def parse_event_info(line):
event_info = {
"event_id": "ci" + fields[0],
"event_type": fields[1],
"event_time": parse_event_time(fields[3]),
# "event_time": parse_event_time(fields[3]),
"time": parse_event_time(fields[3]),
"latitude": float(fields[4]),
"longitude": float(fields[5]),
"depth_km": float(fields[6]),
@@ -51,7 +52,8 @@ def parse_event_info(line):
"event_id": fields[0],
"event_type": fields[1],
# "date": fields[2],
"event_time": parse_event_time(fields[2]),
# "event_time": parse_event_time(fields[2]),
"time": parse_event_time(fields[2]),
"latitude": float(fields[3]),
"longitude": float(fields[4]),
"depth_km": float(fields[5]),
@@ -82,7 +84,7 @@ def parse_phase_pick(line, event_id, event_time):
"phase_remark": fields[9],
"phase_score": float(fields[10]),
"distance_km": float(fields[11]),
"phase_time": event_time + timedelta(seconds=float(fields[12])),
"phase_time": (event_time + timedelta(seconds=float(fields[12]))),
"event_id": event_id,
}
if phase_pick["phase_polarity"][0] == ".":
@@ -112,7 +114,7 @@ def parse(jday):
continue

event = parse_event_info(event_line)
phases_ = [parse_phase_pick(line.strip(), event["event_id"], event["event_time"]) for line in fp]
phases_ = [parse_phase_pick(line.strip(), event["event_id"], event["time"]) for line in fp]
if len(phases_) == 0:
continue

@@ -145,9 +147,9 @@ def parse(jday):
phases = phases[phases.event_id.isin(event_ids)]

# add timezone utc to phase_time
events["event_time"] = events["event_time"].dt.tz_localize("UTC")
phases["phase_time"] = phases["phase_time"].dt.tz_localize("UTC")
phases_ps["phase_time"] = phases_ps["phase_time"].dt.tz_localize("UTC")
events["time"] = events["time"].apply(lambda x: x.strftime("%Y-%m-%dT%H:%M:%S.%f") + "+00:00")
phases["phase_time"] = phases["phase_time"].apply(lambda x: x.strftime("%Y-%m-%dT%H:%M:%S.%f") + "+00:00")
phases_ps["phase_time"] = phases_ps["phase_time"].apply(lambda x: x.strftime("%Y-%m-%dT%H:%M:%S.%f") + "+00:00")

with output_fs.open(f"{dataset_path}/{jday.split('/')[-2]}/{jday.split('/')[-1]}.event.csv", "w") as fp:
events.to_csv(fp, index=False)
@@ -168,8 +170,7 @@ def parse(jday):
file_list.append(jday)

file_list = sorted(file_list, reverse=True)

ncpu = mp.cpu_count()
ncpu = mp.cpu_count() * 2
pbar = tqdm(total=len(file_list))
with mp.get_context("spawn").Pool(ncpu) as pool:
for f in file_list:
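Note on the timestamp change in parse() above: tz_localize kept a timezone-aware datetime column, while the new code serializes to an explicit ISO-8601 string with a "+00:00" suffix before the CSV write, so the output no longer depends on pandas' datetime round-tripping. A minimal comparison (the sample timestamp is illustrative):

import pandas as pd

t = pd.Series(pd.to_datetime(["2023-12-29 01:02:03.450"]))
print(t.dt.tz_localize("UTC").iloc[0])
# 2023-12-29 01:02:03.450000+00:00  (aware Timestamp: the old behavior)
print(t.apply(lambda x: x.strftime("%Y-%m-%dT%H:%M:%S.%f") + "+00:00").iloc[0])
# 2023-12-29T01:02:03.450000+00:00  (plain string: the new behavior)
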
65 changes: 39 additions & 26 deletions datasets/SCEDC/download_waveform.py
@@ -26,9 +26,9 @@
result_path = f"{output_bucket}/SC"

# %%
catalog_path = f"{input_bucket}/event_phases"
station_path = f"{input_bucket}/FDSNstationXML"
waveform_path = f"{input_bucket}/continuous_waveforms"
catalog_path = f"{result_path}/catalog"
station_path = f"FDSNstationXML"
waveform_path = f"continuous_waveforms"


# %%
@@ -40,30 +40,30 @@ def cut_data(event, phases):
return 0

arrival_time = phases.loc[[event.event_id], "phase_time"].min()
begin_time = arrival_time - pd.Timedelta(seconds=30)
end_time = arrival_time + pd.Timedelta(seconds=90)
begin_time = arrival_time - pd.Timedelta(seconds=35)
end_time = arrival_time + pd.Timedelta(seconds=95)

for _, pick in phases.loc[[event.event_id]].iterrows():
outfile_path = f"{result_path}/waveform_mseed2/{event.time.year}/{event.time.year}.{event.time.dayofyear:03d}/{event.event_id}_{event.time.strftime('%Y%m%d%H%M%S')}"
outfile_path = f"{result_path}/waveform_mseed/{event.time.year}/{event.time.year}.{event.time.dayofyear:03d}/{event.event_id}_{begin_time.strftime('%Y%m%d%H%M%S')}"
outfile_name = f"{pick.network}.{pick.station}.{pick.location}.{pick.instrument}.mseed"
if output_fs.exists(f"{outfile_path}/{outfile_name}"):
continue

########### NCEDC ###########
inv_path = f"{station_path}/{pick.network}.info/{pick.network}.FDSN.xml/{pick.network}.{pick.station}.xml"
if not os.path.exists(inv_path):
continue
inv = obspy.read_inventory(str(inv_path))
# inv_path = f"{station_path}/{pick.network}.info/{pick.network}.FDSN.xml/{pick.network}.{pick.station}.xml"
# if not os.path.exists(inv_path):
# continue
# inv = obspy.read_inventory(str(inv_path))

begin_mseed_path = f"{waveform_path}/{pick.network}/{begin_time.year}/{begin_time.year}.{begin_time.dayofyear:03d}/{pick.station}.{pick.network}.{pick.instrument}?.{pick.location}.?.{begin_time.year}.{begin_time.dayofyear:03d}"
end_mseed_path = f"{waveform_path}/{pick.network}/{end_time.year}/{end_time.year}.{end_time.dayofyear:03d}/{pick.station}.{pick.network}.{pick.instrument}?.{pick.location}.?.{end_time.year}.{end_time.dayofyear:03d}"
# begin_mseed_path = f"{waveform_path}/{pick.network}/{begin_time.year}/{begin_time.year}.{begin_time.dayofyear:03d}/{pick.station}.{pick.network}.{pick.instrument}?.{pick.location}.?.{begin_time.year}.{begin_time.dayofyear:03d}"
# end_mseed_path = f"{waveform_path}/{pick.network}/{end_time.year}/{end_time.year}.{end_time.dayofyear:03d}/{pick.station}.{pick.network}.{pick.instrument}?.{pick.location}.?.{end_time.year}.{end_time.dayofyear:03d}"

########### SCEDC ###########
inv_path = f"{input_bucket}/{station_path}/{pick.network}/{pick.network}_{pick.station}.xml"
if not input_fs.exists(inv_path):
inv_path = f"{input_bucket}/{station_path}/unauthoritative-XML/{pick.network}.{pick.station}.xml"
if not input_fs.exists(inv_path):
print(f"{inv_path} not exists")
# print(f"{inv_path} not exists")
continue

with input_fs.open(inv_path) as f:
@@ -78,7 +78,9 @@ def cut_data(event, phases):
try:
st = obspy.Stream()
for mseed_path in set([begin_mseed_path, end_mseed_path]):
st += obspy.read(str(mseed_path))
for mseed in input_fs.glob(mseed_path):
with input_fs.open(mseed) as f:
st += obspy.read(f)
except Exception as e:
print(e)
continue
@@ -99,8 +101,8 @@ def cut_data(event, phases):
for tr in st:
tr.data = tr.data.astype("float32")

if not output_fs.exists(outfile_path):
output_fs.makedirs(outfile_path)
# if not output_fs.exists(outfile_path):
# output_fs.makedirs(outfile_path)

with output_fs.open(f"{outfile_path}/{outfile_name}", "wb") as f:
st.write(f, format="MSEED")
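
The cut_data() changes above replace local obspy.read(path) calls with fsspec globbing and streaming from the public scedc-pds bucket. A condensed sketch of that read pattern (the glob pattern is an assumption; the real script builds it from network, station, instrument, location, and day of year):

import fsspec
import obspy

def read_remote_mseed(fs, pattern):
    """Merge every miniSEED file matching an fsspec glob into one Stream."""
    st = obspy.Stream()
    for path in fs.glob(pattern):
        with fs.open(path) as f:
            st += obspy.read(f)
    return st

fs = fsspec.filesystem("s3", anon=True)  # scedc-pds allows anonymous access
st = read_remote_mseed(fs, "scedc-pds/continuous_waveforms/2023/2023_001/*.ms")
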
@@ -118,33 +120,44 @@ def cut_data(event, phases):

# %%
if __name__ == "__main__":
ncpu = min(mp.cpu_count(), 32)
event_list = sorted(list(glob(f"{catalog_path}/*.event.csv")))[::-1]
ncpu = min(mp.cpu_count() * 2, 32)
# event_list = sorted(list(glob(f"{catalog_path}/*.event.csv")))[::-1]
fs = fsspec.filesystem(output_protocol, token=output_token)
event_list = sorted(list(fs.glob(f"{result_path}/catalog/????/*.event.csv")), reverse=True)
start_year = "1967"
end_year = "2023"
tmp = []
for event_file in event_list:
if (
event_file.split("/")[-1].split(".")[0] >= start_year
and event_file.split("/")[-1].split(".")[0] <= end_year
## NCEDC
# event_file.split("/")[-1].split(".")[0] >= start_year
# and event_file.split("/")[-1].split(".")[0] <= end_year
## SCEDC
event_file.split("/")[-2] >= start_year
and event_file.split("/")[-2] <= end_year
):
tmp.append(event_file)
event_list = sorted(tmp, reverse=True)

for event_file in event_list:
print(event_file)
events = pd.read_csv(event_file, parse_dates=["time"])
phases = pd.read_csv(
f"{event_file.replace('event.csv', 'phase.csv')}",
parse_dates=["phase_time"],
keep_default_na=False,
)
# events = pd.read_csv(event_file, parse_dates=["time"])
with fs.open(event_file) as f:
events = pd.read_csv(f, parse_dates=["time"])
# phases = pd.read_csv(
# f"{event_file.replace('event.csv', 'phase.csv')}",
# parse_dates=["phase_time"],
# keep_default_na=False,
# )
with fs.open(event_file.replace("event.csv", "phase.csv")) as f:
phases = pd.read_csv(f, parse_dates=["phase_time"], keep_default_na=False)
phases = phases.loc[
phases.groupby(["event_id", "network", "station", "location", "instrument"]).phase_time.idxmin()
]
phases.set_index("event_id", inplace=True)

events = events[events.event_id.isin(phases.index)]

pbar = tqdm(events, total=len(events))
with mp.get_context("spawn").Pool(ncpu) as p:
for _, event in events.iterrows():
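The dispatch loop at the end of this file is truncated in this view. Below is a plausible completion of the spawn-Pool pattern, relying on the script's own cut_data, events, phases, ncpu, mp, and the pbar defined above; the details are assumed, not taken from the diff:

# Assumed completion of the truncated loop (illustrative only)
with mp.get_context("spawn").Pool(ncpu) as p:
    for _, event in events.iterrows():
        p.apply_async(cut_data, args=(event, phases), callback=lambda _: pbar.update())
    p.close()  # no more tasks
    p.join()   # wait for workers before the context manager exits
pbar.close()
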
1 change: 1 addition & 0 deletions datasets/SCEDC/extract_ps.py
3 changes: 2 additions & 1 deletion datasets/SCEDC/run.yaml
@@ -56,4 +56,5 @@ setup: |
run: |
echo "Begin run."
python download_catalog.py
# python download_catalog.py
python download_waveform.py
