From 436cfce36d118da31e4d73a3e8f13c0f6ab1bfdb Mon Sep 17 00:00:00 2001 From: zhuwq Date: Wed, 18 Dec 2024 22:08:52 -0800 Subject: [PATCH] add cut template --- examples/california/cut_templates_cc.py | 694 +++++++++++++++--------- examples/california/run_cctorch.py | 107 ++++ examples/california/submit_adloc.py | 2 +- examples/california/submit_cctorch.py | 117 ++++ examples/california/submit_gamma.py | 2 +- examples/california/submit_phasenet.py | 2 +- scripts/load_cloud_templates.py | 76 +++ 7 files changed, 753 insertions(+), 247 deletions(-) create mode 100644 examples/california/run_cctorch.py create mode 100644 examples/california/submit_cctorch.py create mode 100644 scripts/load_cloud_templates.py diff --git a/examples/california/cut_templates_cc.py b/examples/california/cut_templates_cc.py index 57b2470..d00f1c6 100644 --- a/examples/california/cut_templates_cc.py +++ b/examples/california/cut_templates_cc.py @@ -1,4 +1,5 @@ # %% +import argparse import json import multiprocessing as mp import os @@ -11,7 +12,6 @@ import obspy import pandas as pd from adloc.eikonal2d import calc_traveltime, init_eikonal2d -from args import parse_args from pyproj import Proj from sklearn.neighbors import NearestNeighbors from tqdm import tqdm @@ -22,7 +22,8 @@ # %% def fillin_missing_picks(picks, events, stations, config): - reference_t0 = config["reference_t0"] + # reference_t0 = config["reference_t0"] + reference_t0 = pd.Timestamp(config["reference_t0"]) vp_vs_ratio = config["vp_vs_ratio"] min_phase_score = config["min_phase_score"] @@ -75,7 +76,8 @@ def fillin_missing_picks(picks, events, stations, config): def predict_full_picks(picks, events, stations, config): vp_vs_ratio = config["vp_vs_ratio"] - reference_t0 = config["reference_t0"] + # reference_t0 = config["reference_t0"] + reference_t0 = pd.Timestamp(config["reference_t0"]) eikonal = config["eikonal"] min_phase_score = config["min_phase_score"] @@ -148,7 +150,8 @@ def extract_template_numpy( lock, ): - reference_t0 = config["reference_t0"] + # reference_t0 = config["reference_t0"] + reference_t0 = pd.Timestamp(config["reference_t0"]) template_array = np.memmap(template_fname, dtype=np.float32, mode="r+", shape=tuple(config["template_shape"])) traveltime_array = np.memmap(traveltime_fname, dtype=np.float32, mode="r+", shape=tuple(config["traveltime_shape"])) @@ -157,6 +160,11 @@ def extract_template_numpy( ) traveltime_mask = np.memmap(traveltime_mask_fname, dtype=bool, mode="r+", shape=tuple(config["traveltime_shape"])) + # template_array = np.zeros(tuple(config["template_shape"]), dtype=np.float32) + # traveltime_array = np.zeros(tuple(config["traveltime_shape"]), dtype=np.float32) + # traveltime_index_array = np.zeros(tuple(config["traveltime_shape"]), dtype=np.float32) + # traveltime_mask = np.zeros(tuple(config["traveltime_shape"]), dtype=bool) + for picks in picks_group: waveforms_dict = {} @@ -171,23 +179,28 @@ def extract_template_numpy( event = events.loc[idx_eve] ENZ = pick["ENZ"].split(",") + for c in ENZ: if c not in waveforms_dict: - with fsspec.open(c, "rb", anon=True) as f: - stream = obspy.read(f) - stream.merge(fill_value="latest") - if len(stream) > 1: - print(f"More than one trace: {stream}") - trace = stream[0] - if trace.stats.sampling_rate != config["sampling_rate"]: - if trace.stats.sampling_rate % config["sampling_rate"] == 0: - trace.decimate(int(trace.stats.sampling_rate / config["sampling_rate"])) - else: - trace.resample(config["sampling_rate"]) - # trace.detrend("linear") - # trace.taper(max_percentage=0.05, 
type="cosine") - trace.filter("bandpass", freqmin=2.0, freqmax=12.0, corners=4, zerophase=True) - waveforms_dict[c] = trace + try: + with fsspec.open(c, "rb", anon=True) as f: + stream = obspy.read(f) + stream.merge(fill_value="latest") + if len(stream) > 1: + print(f"More than one trace: {stream}") + trace = stream[0] + if trace.stats.sampling_rate != config["sampling_rate"]: + if trace.stats.sampling_rate % config["sampling_rate"] == 0: + trace.decimate(int(trace.stats.sampling_rate / config["sampling_rate"])) + else: + trace.resample(config["sampling_rate"]) + # trace.detrend("linear") + # trace.taper(max_percentage=0.05, type="cosine") + trace.filter("bandpass", freqmin=2.0, freqmax=12.0, corners=4, zerophase=True) + waveforms_dict[c] = trace + except Exception as e: + print(f"Error: {e}") + continue else: trace = waveforms_dict[c] @@ -222,7 +235,7 @@ def extract_template_numpy( traveltime_mask[idx_pick, ic, 0] = True trace_data = trace.data[begin_time_index:end_time_index].astype(np.float32) - template_array[idx_pick, ic, 0, : len(trace_data)] = trace_data + template_array[idx_pick, ic, 0, : len(trace_data)] = trace_data # - np.mean(trace_data) if lock is not None: with lock: @@ -278,14 +291,16 @@ def generate_pairs(picks, events, stations, max_pair_dist=10, max_neighbors=50, # %% -def cut_templates(root_path, region, config): +def cut_templates(jdays, root_path, region, config, bucket, protocol, token): + + # %% + fs = fsspec.filesystem(protocol, token=token) # %% # data_path = f"{region}/adloc" # result_path = f"{region}/cctorch" - - data_path = f"{region}/adloc_gamma" - result_path = f"{region}/cctorch_ca" + data_path = f"{region}/adloc2" + result_path = f"{region}/cctorch" if not os.path.exists(f"{root_path}/{result_path}"): os.makedirs(f"{root_path}/{result_path}") @@ -305,6 +320,12 @@ def cut_templates(root_path, region, config): # time_before_s = 0.3 # time_after_s = 4.0 - time_before_s # no_overlapping = False + # DEBUG + # time_before_p = 1.0 + # time_after_p = 5.0 - time_before_p + # time_before_s = 1.0 + # time_after_s = 5.0 - time_before_s + # no_overlapping = True time_window = max((time_before_p + time_after_p), (time_before_s + time_after_s)) nt = int(round(time_window * sampling_rate)) @@ -342,37 +363,11 @@ def cut_templates(root_path, region, config): } ) - # %% - stations = pd.read_csv(f"{root_path}/{data_path}/ransac_stations.csv") - stations.sort_values(by=["latitude", "longitude"], inplace=True) - print(f"{len(stations) = }") - print(stations.iloc[:5]) - - # %% - events = pd.read_csv(f"{root_path}/{data_path}/ransac_events.csv", parse_dates=["time"]) - events.rename(columns={"time": "event_time"}, inplace=True) - events["event_time"] = pd.to_datetime(events["event_time"], utc=True) - reference_t0 = events["event_time"].min() - events["event_timestamp"] = events["event_time"].apply(lambda x: (x - reference_t0).total_seconds()) - print(f"{len(events) = }") - print(events.iloc[:5]) - # %% lon0 = (config["minlongitude"] + config["maxlongitude"]) / 2 lat0 = (config["minlatitude"] + config["maxlatitude"]) / 2 proj = Proj(f"+proj=sterea +lon_0={lon0} +lat_0={lat0} +units=km") - # %% - stations[["x_km", "y_km"]] = stations.apply( - lambda x: pd.Series(proj(longitude=x.longitude, latitude=x.latitude)), axis=1 - ) - stations["z_km"] = stations["elevation_m"].apply(lambda x: -x / 1e3) - - events[["x_km", "y_km"]] = events.apply( - lambda x: pd.Series(proj(longitude=x.longitude, latitude=x.latitude)), axis=1 - ) - events["z_km"] = events["depth_km"] - xmin, ymin = 
proj(config["minlongitude"], config["minlatitude"]) xmax, ymax = proj(config["maxlongitude"], config["maxlatitude"]) # zmin, zmax = config["mindepth"], config["maxdepth"] @@ -408,218 +403,429 @@ def cut_templates(root_path, region, config): } eikonal = init_eikonal2d(eikonal) - # %% - # picks = pd.read_csv(f"{root_path}/{region}/adloc/ransac_picks.csv") - picks = pd.read_csv(f"{root_path}/{data_path}/ransac_picks.csv") - picks = picks[picks["adloc_mask"] == 1] - picks["phase_time"] = pd.to_datetime(picks["phase_time"], utc=True) - min_phase_score = picks["phase_score"].min() - - picks = picks.merge(events[["event_index", "event_timestamp"]], on="event_index") - # picks = picks.merge(stations[["station_id", "station_term_time"]], on="station_id") - picks = picks.merge(stations[["station_id", "station_term_time_p", "station_term_time_s"]], on="station_id") - picks["station_term_time"] = picks.apply( - lambda x: x[f"station_term_time_{x['phase_type'].lower()}"], axis=1 - ) ## Separate P and S station term - picks.drop(columns=["station_term_time_p", "station_term_time_s"], inplace=True) - picks["phase_timestamp"] = picks["phase_time"].apply(lambda x: (x - reference_t0).total_seconds()) - picks["traveltime"] = ( - picks["phase_timestamp"] - picks["event_timestamp"] - picks["station_term_time"] - ) ## Separate P and S station term - - picks = fillin_missing_picks( - picks, - events, - stations, - config={"reference_t0": reference_t0, "vp_vs_ratio": vp_vs_ratio, "min_phase_score": min_phase_score}, - ) - if "dist_km" not in picks: - picks = picks.merge(stations[["station_id", "x_km", "y_km", "z_km"]], on="station_id") - picks.rename(columns={"x_km": "station_x_km", "y_km": "station_y_km", "z_km": "station_z_km"}, inplace=True) - picks = picks.merge(events[["event_index", "x_km", "y_km", "z_km"]], on="event_index") - picks.rename(columns={"x_km": "event_x_km", "y_km": "event_y_km", "z_km": "event_z_km"}, inplace=True) - picks["dist_km"] = np.linalg.norm( - picks[["event_x_km", "event_y_km", "event_z_km"]].values - - picks[["station_x_km", "station_y_km", "station_z_km"]].values, - axis=-1, + for jday in jdays: + year, jday = jday.split(".") + year, jday = int(year), int(jday) + + if not os.path.exists(f"{root_path}/{result_path}/{year:04d}"): + os.makedirs(f"{root_path}/{result_path}/{year:04d}") + + # %% + # stations = pd.read_csv(f"{root_path}/{data_path}/ransac_stations.csv") + # stations = pd.read_csv("adloc_stations.csv") + station_json = f"{region}/network/stations.json" + if protocol == "file": + stations = pd.read_json(f"{root_path}/{station_json}", orient="index") + else: + with fs.open(f"{bucket}/{station_json}", "r") as fp: + stations = pd.read_json(fp, orient="index") + stations["station_id"] = stations.index + stations.sort_values(by=["latitude", "longitude"], inplace=True) + print(f"{len(stations) = }") + print(stations.iloc[:5]) + + # %% + # events = pd.read_csv(f"{root_path}/{data_path}/ransac_events.csv", parse_dates=["time"]) + # events = pd.read_csv("adloc_events.csv", parse_dates=["time"]) + if protocol == "file": + events = pd.read_csv( + f"{root_path}/{data_path}/{year:04d}/adloc_events_{jday:03d}.csv", parse_dates=["time"] + ) + else: + with fs.open(f"{bucket}/{data_path}/{year:04d}/adloc_events_{jday:03d}.csv", "r") as fp: + events = pd.read_csv(fp, parse_dates=["time"]) + + events.rename(columns={"time": "event_time"}, inplace=True) + events["event_time"] = pd.to_datetime(events["event_time"], utc=True) + reference_t0 = events["event_time"].min() + 
events["event_timestamp"] = events["event_time"].apply(lambda x: (x - reference_t0).total_seconds()) + print(f"{len(events) = }") + print(events.iloc[:5]) + + # %% + stations[["x_km", "y_km"]] = stations.apply( + lambda x: pd.Series(proj(longitude=x.longitude, latitude=x.latitude)), axis=1 ) - picks.drop( - columns=["station_x_km", "station_y_km", "station_z_km", "event_x_km", "event_y_km", "event_z_km"], - inplace=True, + stations["z_km"] = stations["elevation_m"].apply(lambda x: -x / 1e3) + if "station_term_time_p" not in stations: + stations["station_term_time_p"] = 0.0 + if "station_term_time_s" not in stations: + stations["station_term_time_s"] = 0.0 + + events[["x_km", "y_km"]] = events.apply( + lambda x: pd.Series(proj(longitude=x.longitude, latitude=x.latitude)), axis=1 ) - - print(f"{len(picks) = }") - picks = picks[picks["dist_km"] < config["max_epicenter_dist_km"]] - print(f"{len(picks) = } with dist_km < {config['max_epicenter_dist_km']} km") - - # %% Reindex event and station to make it continuous - stations["idx_sta"] = np.arange(len(stations)) - events["idx_eve"] = np.arange(len(events)) - picks = picks.merge(events[["event_index", "idx_eve"]], on="event_index") - picks = picks.merge(stations[["station_id", "idx_sta"]], on="station_id") - - # %% - stations.to_csv(f"{root_path}/{result_path}/cctorch_stations.csv", index=False) - events.to_csv(f"{root_path}/{result_path}/cctorch_events.csv", index=False) - picks.to_csv(f"{root_path}/{result_path}/cctorch_picks.csv", index=False) - - # %% - pair_fname = f"{root_path}/{result_path}/pairs.txt" - config["pair_file"] = pair_fname - - # %% - nch = 3 ## [E,N,Z] components - nt = config["nt"] - npk = len(picks) - nev = len(events) - nst = len(stations) - print(f"npk: {npk}, nev: {nev}, nch: {nch}, nst: {nst}, nt: {nt}") - template_shape = (npk, 3, 1, nt) - traveltime_shape = (npk, 3, 1) - config["template_shape"] = template_shape - config["traveltime_shape"] = traveltime_shape - - template_fname = f"{root_path}/{result_path}/template.dat" - traveltime_fname = f"{root_path}/{result_path}/traveltime.dat" - traveltime_index_fname = f"{root_path}/{result_path}/traveltime_index.dat" - traveltime_mask_fname = f"{root_path}/{result_path}/traveltime_mask.dat" - config["template_file"] = template_fname - config["traveltime_file"] = traveltime_fname - config["traveltime_index_file"] = traveltime_index_fname - config["traveltime_mask_file"] = traveltime_mask_fname - - template_array = np.memmap(template_fname, dtype=np.float32, mode="w+", shape=template_shape) - traveltime_array = np.memmap(traveltime_fname, dtype=np.float32, mode="w+", shape=traveltime_shape) - traveltime_index_array = np.memmap(traveltime_index_fname, dtype=np.float32, mode="w+", shape=traveltime_shape) - traveltime_mask = np.memmap(traveltime_mask_fname, dtype=bool, mode="w+", shape=traveltime_shape) - - with open(f"{root_path}/{result_path}/config.json", "w") as f: - json.dump(config, f, indent=4, sort_keys=True) - - # %% - config["reference_t0"] = reference_t0 - events = events[["idx_eve", "x_km", "y_km", "z_km", "event_index", "event_time", "event_timestamp"]] - stations = stations[["idx_sta", "x_km", "y_km", "z_km", "station_id", "component", "network", "station"]] - picks = picks[ - [ - "idx_eve", - "idx_sta", - "phase_type", - "phase_score", - "phase_time", - "phase_timestamp", - "phase_source", - "station_id", + events["z_km"] = events["depth_km"] + + # %% + # picks = pd.read_csv(f"{root_path}/{region}/adloc/ransac_picks.csv") + # picks = 
pd.read_csv(f"{root_path}/{data_path}/ransac_picks.csv") + # picks = pd.read_csv("adloc_picks.csv") + if protocol == "file": + picks = pd.read_csv(f"{root_path}/{data_path}/{year:04d}/adloc_picks_{jday:03d}.csv") + else: + with fs.open(f"{bucket}/{data_path}/{year:04d}/adloc_picks_{jday:03d}.csv", "r") as fp: + picks = pd.read_csv(fp) + + picks = picks[picks["adloc_mask"] == 1] + picks["phase_time"] = pd.to_datetime(picks["phase_time"], utc=True) + min_phase_score = picks["phase_score"].min() + + picks = picks.merge(events[["event_index", "event_timestamp"]], on="event_index") + # picks = picks.merge(stations[["station_id", "station_term_time"]], on="station_id") + picks = picks.merge(stations[["station_id", "station_term_time_p", "station_term_time_s"]], on="station_id") + picks["station_term_time"] = picks.apply( + lambda x: x[f"station_term_time_{x['phase_type'].lower()}"], axis=1 + ) ## Separate P and S station term + picks.drop(columns=["station_term_time_p", "station_term_time_s"], inplace=True) + picks["phase_timestamp"] = picks["phase_time"].apply(lambda x: (x - reference_t0).total_seconds()) + picks["traveltime"] = ( + picks["phase_timestamp"] - picks["event_timestamp"] - picks["station_term_time"] + ) ## Separate P and S station term + + picks = fillin_missing_picks( + picks, + events, + stations, + config={"reference_t0": reference_t0, "vp_vs_ratio": vp_vs_ratio, "min_phase_score": min_phase_score}, + ) + if "dist_km" not in picks: + picks = picks.merge(stations[["station_id", "x_km", "y_km", "z_km"]], on="station_id") + picks.rename(columns={"x_km": "station_x_km", "y_km": "station_y_km", "z_km": "station_z_km"}, inplace=True) + picks = picks.merge(events[["event_index", "x_km", "y_km", "z_km"]], on="event_index") + picks.rename(columns={"x_km": "event_x_km", "y_km": "event_y_km", "z_km": "event_z_km"}, inplace=True) + picks["dist_km"] = np.linalg.norm( + picks[["event_x_km", "event_y_km", "event_z_km"]].values + - picks[["station_x_km", "station_y_km", "station_z_km"]].values, + axis=-1, + ) + picks.drop( + columns=["station_x_km", "station_y_km", "station_z_km", "event_x_km", "event_y_km", "event_z_km"], + inplace=True, + ) + + print(f"{len(picks) = }") + picks = picks[picks["dist_km"] < config["max_epicenter_dist_km"]] + print(f"{len(picks) = } with dist_km < {config['max_epicenter_dist_km']} km") + + # %% Reindex event and station to make it continuous + stations["idx_sta"] = np.arange(len(stations)) + events["idx_eve"] = np.arange(len(events)) + picks = picks.merge(events[["event_index", "idx_eve"]], on="event_index") + picks = picks.merge(stations[["station_id", "idx_sta"]], on="station_id") + + # %% + # stations.to_csv(f"{root_path}/{result_path}/cctorch_stations.csv", index=False) + # events.to_csv(f"{root_path}/{result_path}/cctorch_events.csv", index=False) + # picks.to_csv(f"{root_path}/{result_path}/cctorch_picks.csv", index=False) + # if not os.path.exists(f"{root_path}/{result_path}/{year:04d}"): + # os.makedirs(f"{root_path}/{result_path}/{year:04d}") + stations.to_csv(f"{root_path}/{result_path}/{year:04d}/cctorch_stations_{jday:03d}.csv", index=False) + events.to_csv(f"{root_path}/{result_path}/{year:04d}/cctorch_events_{jday:03d}.csv", index=False) + # picks.to_csv(f"{root_path}/{result_path}/{year:04d}/cctorch_picks_{jday:03d}.csv", index=False) + + # %% + # pair_fname = f"{root_path}/{result_path}/pairs.txt" + pair_fname = f"{root_path}/{result_path}/{year:04d}/pairs_{jday:03d}.txt" + config["pair_file"] = pair_fname + + # %% + nch = 3 ## [E,N,Z] components 
+ nt = config["nt"] + npk = len(picks) + nev = len(events) + nst = len(stations) + print(f"npk: {npk}, nev: {nev}, nch: {nch}, nst: {nst}, nt: {nt}") + template_shape = (npk, 3, 1, nt) + traveltime_shape = (npk, 3, 1) + config["template_shape"] = template_shape + config["traveltime_shape"] = traveltime_shape + + # template_fname = f"{root_path}/{result_path}/template.dat" + # traveltime_fname = f"{root_path}/{result_path}/traveltime.dat" + # traveltime_index_fname = f"{root_path}/{result_path}/traveltime_index.dat" + # traveltime_mask_fname = f"{root_path}/{result_path}/traveltime_mask.dat" + template_fname = f"{root_path}/{result_path}/{year:04d}/template_{jday:03d}.dat" + traveltime_fname = f"{root_path}/{result_path}/{year:04d}/traveltime_{jday:03d}.dat" + traveltime_index_fname = f"{root_path}/{result_path}/{year:04d}/traveltime_index_{jday:03d}.dat" + traveltime_mask_fname = f"{root_path}/{result_path}/{year:04d}/traveltime_mask_{jday:03d}.dat" + + config["template_file"] = template_fname + config["traveltime_file"] = traveltime_fname + config["traveltime_index_file"] = traveltime_index_fname + config["traveltime_mask_file"] = traveltime_mask_fname + + template_array = np.memmap(template_fname, dtype=np.float32, mode="w+", shape=template_shape) + traveltime_array = np.memmap(traveltime_fname, dtype=np.float32, mode="w+", shape=traveltime_shape) + traveltime_index_array = np.memmap(traveltime_index_fname, dtype=np.float32, mode="w+", shape=traveltime_shape) + traveltime_mask = np.memmap(traveltime_mask_fname, dtype=bool, mode="w+", shape=traveltime_shape) + + # with open(f"{root_path}/{result_path}/config.json", "w") as f: + with open(f"{root_path}/{result_path}/{year:04d}/config_{jday:03d}.json", "w") as f: + json.dump(config, f, indent=4, sort_keys=True) + + # %% + config["reference_t0"] = reference_t0.strftime("%Y-%m-%dT%H:%M:%S.%fZ") + events = events[["idx_eve", "x_km", "y_km", "z_km", "event_index", "event_time", "event_timestamp"]] + stations = stations[["idx_sta", "x_km", "y_km", "z_km", "station_id", "component", "network", "station"]] + picks = picks[ + [ + "idx_eve", + "idx_sta", + "phase_type", + "phase_score", + "phase_time", + "phase_timestamp", + "phase_source", + "station_id", + ] ] - ] - events.set_index("idx_eve", inplace=True) - stations.set_index("idx_sta", inplace=True) - picks.sort_values(by=["idx_eve", "idx_sta", "phase_type"], inplace=True) - picks["idx_pick"] = np.arange(len(picks)) + events.set_index("idx_eve", inplace=True) + stations.set_index("idx_sta", inplace=True) + picks.sort_values(by=["idx_eve", "idx_sta", "phase_type"], inplace=True) + picks["idx_pick"] = np.arange(len(picks)) + + # picks.to_csv(f"{root_path}/{result_path}/cctorch_picks.csv", index=False) + picks.to_csv(f"{root_path}/{result_path}/{year:04d}/cctorch_picks_{jday:03d}.csv", index=False) + + ############################# CLOUD ######################################### + # dirs = sorted(glob(f"{root_path}/{region}/waveforms/????/???/??"), reverse=True) + protocol = "gs" + bucket = "quakeflow_catalog" + folder = "SC" + token_json = "application_default_credentials.json" + with open(token_json, "r") as fp: + token = json.load(fp) + fs = fsspec.filesystem(protocol=protocol, token=token) + # year = 2019 + mseeds_df = [] + for folder in ["SC", "NC"]: + with fs.open(f"{bucket}/{folder}/mseed_list/{year}_3c.txt", "r") as f: + mseeds = f.readlines() + mseeds = [x.strip("\n") for x in mseeds] + mseeds = pd.DataFrame(mseeds, columns=["ENZ"]) + if folder == "SC": + mseeds["fname"] = 
mseeds["ENZ"].apply(lambda x: x.split("/")[-1]) + mseeds["network"] = mseeds["fname"].apply(lambda x: x[:2]) + mseeds["station"] = mseeds["fname"].apply(lambda x: x[2:7].strip("_")) + mseeds["instrument"] = mseeds["fname"].apply(lambda x: x[7:9]) + mseeds["location"] = mseeds["fname"].apply(lambda x: x[10:12].strip("_")) + mseeds["year"] = mseeds["fname"].apply(lambda x: x[13:17]) + mseeds["jday"] = mseeds["fname"].apply(lambda x: x[17:20]) + if folder == "NC": + mseeds["fname"] = mseeds["ENZ"].apply(lambda x: x.split("/")[-1]) + mseeds["network"] = mseeds["fname"].apply(lambda x: x.split(".")[1]) + mseeds["station"] = mseeds["fname"].apply(lambda x: x.split(".")[0]) + mseeds["instrument"] = mseeds["fname"].apply(lambda x: x.split(".")[2][:-1]) + mseeds["location"] = mseeds["fname"].apply(lambda x: x.split(".")[3]) + mseeds["year"] = mseeds["fname"].apply(lambda x: x.split(".")[5]) + mseeds["jday"] = mseeds["fname"].apply(lambda x: x.split(".")[6]) + mseeds_df.append(mseeds) + mseeds_df = pd.concat(mseeds_df) + print(mseeds_df.head()) + print(mseeds_df.tail()) + + picks["network"] = picks["station_id"].apply(lambda x: x.split(".")[0]) + picks["station"] = picks["station_id"].apply(lambda x: x.split(".")[1]) + picks["location"] = picks["station_id"].apply(lambda x: x.split(".")[2]) + picks["instrument"] = picks["station_id"].apply(lambda x: x.split(".")[3]) + picks["year"] = picks["phase_time"].dt.strftime("%Y") + picks["jday"] = picks["phase_time"].dt.strftime("%j") + + mseeds_df = mseeds_df[(mseeds_df["year"].astype(int) == year) & (mseeds_df["jday"].astype(int) == jday)] + picks = picks[(picks["year"].astype(int) == year) & (picks["jday"].astype(int) == jday)] + + picks = picks.merge(mseeds_df, on=["network", "station", "location", "instrument", "year", "jday"]) + picks.drop(columns=["fname", "station_id", "network", "location", "instrument", "year", "jday"], inplace=True) + + if len(picks) == 0: + print(f"No picks found for {year:04d}/{jday:03d}") + continue + + # #### + # out = picks.drop(columns=["ENZ"]) + # out.to_csv(f"{root_path}/{result_path}/{year:04d}/cctorch_picks_{jday:03d}.csv", index=False) + # events.to_csv(f"{root_path}/{result_path}/{year:04d}/cctorch_events_{jday:03d}.csv", index=False) + # stations.to_csv(f"{root_path}/{result_path}/{year:04d}/cctorch_stations_{jday:03d}.csv", index=False) + + ########################################################################### + + picks_group = picks.copy() + picks_group = picks_group.groupby("ENZ") + + ############################################################ + + # ncpu = min(32, mp.cpu_count()) + ncpu = 32 + # nsplit = min(ncpu * 8, len(picks_group)) + nsplit = len(picks_group) + print(f"Using {ncpu} cores") + + pbar = tqdm(total=nsplit, desc="Cutting templates") + ctx = mp.get_context("spawn") + + with ctx.Manager() as manager: + lock = manager.Lock() + with ctx.Pool(ncpu) as pool: + jobs = [] + + group_chunk = np.array_split(list(picks_group.groups.keys()), nsplit) + picks_group_chunk = [[picks_group.get_group(g) for g in group] for group in group_chunk] + + for picks_group in picks_group_chunk: + + job = pool.apply_async( + extract_template_numpy, + ( + template_fname, + traveltime_fname, + traveltime_index_fname, + traveltime_mask_fname, + picks_group, + events, + config, + lock, + ), + callback=lambda x: pbar.update(), + ) + jobs.append(job) + + pool.close() + pool.join() + for job in jobs: + out = job.get() + if out is not None: + print(out) + + pbar.close() + + # pairs = generate_pairs( + # picks, + # events, + # 
stations, + # max_pair_dist=config["max_pair_dist_km"], + # fname=config["pair_file"], + # ) + + if protocol == "gs": + fs.put( + f"{root_path}/{result_path}/{year:04d}/cctorch_picks_{jday:03d}.csv", + f"{bucket}/{result_path}/{year:04d}/cctorch_picks_{jday:03d}.csv", + ) + fs.put( + f"{root_path}/{result_path}/{year:04d}/cctorch_events_{jday:03d}.csv", + f"{bucket}/{result_path}/{year:04d}/cctorch_events_{jday:03d}.csv", + ) + # fs.put( + # f"{root_path}/{result_path}/{year:04d}/cctorch_stations_{jday:03d}.csv", + # f"{bucket}/{result_path}/{year:04d}/cctorch_stations_{jday:03d}.csv", + # ) + fs.put( + f"{root_path}/{result_path}/{year:04d}/config_{jday:03d}.json", + f"{bucket}/{result_path}/{year:04d}/config_{jday:03d}.json", + ) + fs.put( + f"{root_path}/{result_path}/{year:04d}/template_{jday:03d}.dat", + f"{bucket}/{result_path}/{year:04d}/template_{jday:03d}.dat", + ) + print( + f"{root_path}/{result_path}/{year:04d}/template_{jday:03d}.npy -> {bucket}/{result_path}/{year:04d}/template_{jday:03d}.npy" + ) + + +def parse_args(): + parser = argparse.ArgumentParser(description="Run Gamma on NCEDC/SCEDC data") + parser.add_argument("--num_nodes", type=int, default=1) + parser.add_argument("--node_rank", type=int, default=0) + parser.add_argument("--year", type=int, default=2023) + parser.add_argument("--root_path", type=str, default="local") + parser.add_argument("--region", type=str, default="Cal") + parser.add_argument("--bucket", type=str, default="quakeflow_catalog") + return parser.parse_args() - picks.to_csv(f"{root_path}/{result_path}/cctorch_picks.csv", index=False) - ############################# CLOUD ######################################### - # dirs = sorted(glob(f"{root_path}/{region}/waveforms/????/???/??"), reverse=True) +# %% +if __name__ == "__main__": + # %% protocol = "gs" - bucket = "quakeflow_catalog" - folder = "SC" - token_json = "application_default_credentials.json" + token_json = f"application_default_credentials.json" with open(token_json, "r") as fp: token = json.load(fp) - fs = fsspec.filesystem(protocol=protocol, token=token) - year = 2019 - with fs.open(f"{bucket}/{folder}/mseed_list/{year}_3c.txt", "r") as f: - mseeds = f.readlines() - mseeds = [x.strip("\n") for x in mseeds] - mseeds = pd.DataFrame(mseeds, columns=["ENZ"]) - mseeds["fname"] = mseeds["ENZ"].apply(lambda x: x.split("/")[-1]) - mseeds["network"] = mseeds["fname"].apply(lambda x: x[:2]) - mseeds["station"] = mseeds["fname"].apply(lambda x: x[2:7].strip("_")) - mseeds["instrument"] = mseeds["fname"].apply(lambda x: x[7:9]) - mseeds["location"] = mseeds["fname"].apply(lambda x: x[10:12].strip("_")) - mseeds["year"] = mseeds["fname"].apply(lambda x: x[13:17]) - mseeds["jday"] = mseeds["fname"].apply(lambda x: x[17:20]) - - picks["network"] = picks["station_id"].apply(lambda x: x.split(".")[0]) - picks["station"] = picks["station_id"].apply(lambda x: x.split(".")[1]) - picks["location"] = picks["station_id"].apply(lambda x: x.split(".")[2]) - picks["instrument"] = picks["station_id"].apply(lambda x: x.split(".")[3]) - picks["year"] = picks["phase_time"].dt.strftime("%Y") - picks["jday"] = picks["phase_time"].dt.strftime("%j") - picks = picks.merge(mseeds, on=["network", "station", "location", "instrument", "year", "jday"]) - picks.drop(columns=["fname", "station_id", "network", "location", "instrument", "year", "jday"], inplace=True) - - picks_group = picks.copy() - picks_group = picks_group.groupby("ENZ") - - ############################################################ - - ncpu = min(16, 
mp.cpu_count()) - nsplit = min(ncpu * 2, len(picks_group)) - print(f"Using {ncpu} cores") - - pbar = tqdm(total=nsplit, desc="Cutting templates") - ctx = mp.get_context("spawn") - - with ctx.Manager() as manager: - lock = manager.Lock() - with ctx.Pool(ncpu) as pool: - jobs = [] - - group_chunk = np.array_split(list(picks_group.groups.keys()), nsplit) - picks_group_chunk = [[picks_group.get_group(g) for g in group] for group in group_chunk] - - for picks_group in picks_group_chunk: - - job = pool.apply_async( - extract_template_numpy, - ( - template_fname, - traveltime_fname, - traveltime_index_fname, - traveltime_mask_fname, - picks_group, - events, - config, - lock, - ), - callback=lambda x: pbar.update(), - ) - jobs.append(job) - pool.close() - pool.join() - # for job in jobs: - # out = job.get() - # if out is not None: - # print(out) - - pbar.close() - - pairs = generate_pairs( - picks, - events, - stations, - max_pair_dist=config["max_pair_dist_km"], - fname=config["pair_file"], - ) + fs = fsspec.filesystem(protocol, token=token) -# %% -if __name__ == "__main__": + # # %% + # args = parse_args() + # root_path = args.root_path + # region = args.region + + # with open(f"{root_path}/{region}/config.json", "r") as fp: + # config = json.load(fp) + + # config.update(config["cctorch"]) # %% args = parse_args() - root_path = args.root_path region = args.region + root_path = args.root_path + bucket = args.bucket + num_nodes = args.num_nodes + node_rank = args.node_rank + year = args.year - with open(f"{root_path}/{region}/config.json", "r") as fp: + # %% + # with fs.open(f"{bucket}/{region}/config.json", "r") as fp: + # config = json.load(fp) + with open("config.json", "r") as fp: config = json.load(fp) - - config.update(config["cctorch"]) + config["world_size"] = num_nodes + config.update(vars(args)) # %% print(json.dumps(config, indent=4, sort_keys=True)) - cut_templates(root_path, region, config) + # cut_templates(root_path, region, config) + + # %% + # events = [] + # picks = [] + # jobs = [] + # ctx = mp.get_context("spawn") + # ncpu = min(32, mp.cpu_count()) + # years = [2023] + # num_days = sum([366 if (year % 4 == 0 and year % 100 != 0) or year % 400 == 0 else 365 for year in years]) + # pbar = tqdm(total=num_days, desc="Loading data") + # with ctx.Pool(processes=ncpu) as pool: + # for year in years: + # num_jday = 366 if (year % 4 == 0 and year % 100 != 0) or year % 400 == 0 else 365 + # for jday in range(1, num_jday + 1): + # cut_templates(year, jday, root_path, region, config, bucket, protocol, token) + # # job = pool.apply_async( + # # cut_templates, + # # args=(year, jday, root_path, region, config, bucket, protocol, token), + # # callback=lambda x: pbar.update(), + # # ) + # # jobs.append(job) + + # pool.close() + # pool.join() + + # pbar.close() + + # %% + # years = [2023] + # years = [2019] + # jdays = [] + # for year in years: + # num_jday = 366 if (year % 4 == 0 and year % 100 != 0) or year % 400 == 0 else 365 + # jdays.extend([f"{year}.{i:03d}" for i in range(1, num_jday + 1)]) + + num_jday = 366 if (year % 4 == 0 and year % 100 != 0) or year % 400 == 0 else 365 + jdays = [f"{year}.{i:03d}" for i in range(1, num_jday + 1)] + + jdays = np.array_split(jdays, num_nodes)[node_rank] + # jdays = ["2019.185"] + print(f"{node_rank}/{num_nodes}: {jdays[0] = }, {jdays[-1] = }") + + cut_templates(jdays, root_path, region, config, bucket, protocol, token) diff --git a/examples/california/run_cctorch.py b/examples/california/run_cctorch.py new file mode 100644 index 0000000..c789a3f --- 
/dev/null +++ b/examples/california/run_cctorch.py @@ -0,0 +1,107 @@ +# %% +import os + +import torch +from args import parse_args + +args = parse_args() +# %% +root_path = args.root_path +region = args.region + +data_path = f"{region}/cctorch" +result_path = f"{region}/cctorch/ccpairs" + +# data_path = f"{region}/cctorch_ca" +# result_path = f"{region}/cctorch_ca/ccpairs" + +# data_path = f"{region}/cctorch_gamma" +# result_path = f"{region}/cctorch_gamma/ccpairs" + +if not os.path.exists(f"{root_path}/{result_path}"): + os.makedirs(f"{root_path}/{result_path}") + + +## based on GPU memory + +batch = 1_024 +block_size1 = 1000_000 +block_size2 = 1000_000 + +if args.dtct_pair: + dt_ct = f"{root_path}/{region}/hypodd/dt.ct" + pair_list = f"{root_path}/{region}/hypodd/pairs.txt" + lines = [] + with open(dt_ct, "r") as fp: + for line in fp: + if line.startswith("#"): + ev1, ev2 = line.split()[1:3] + if ev1 > ev2: + ev1, ev2 = ev2, ev1 + lines.append(f"{ev1},{ev2}\n") + + print(f"Number of pairs from hypodd dt.ct: {len(lines)}") + with open(f"{root_path}/{region}/hypodd/pairs.txt", "w") as fp: + fp.writelines(lines) + base_cmd = f"../CCTorch/run.py --pair_list={root_path}/{region}/hypodd/pairs.txt --data_path1={root_path}/{region}/cctorch/template.dat --data_format1=memmap --config={root_path}/{region}/cctorch/config.json --batch_size={batch} --block_size1={block_size1} --block_size2={block_size2} --result_path={root_path}/{result_path}" + +else: + base_cmd = ( + f"../CCTorch/run.py --pair_list={root_path}/{data_path}/pairs.txt --data_path1={root_path}/{data_path}/template.dat --data_format1=memmap " + f"--data_list1={root_path}/{data_path}/cctorch_picks.csv " + f"--events_csv={root_path}/{data_path}/cctorch_events.csv --picks_csv={root_path}/{data_path}/cctorch_picks.csv --stations_csv={root_path}/{data_path}/cctorch_stations.csv " + f"--config={root_path}/{data_path}/config.json --batch_size={batch} --block_size1={block_size1} --block_size2={block_size2} " + f"--result_path={root_path}/{result_path}" + ) + + +if torch.cuda.is_available(): + device = "cuda" + num_gpu = torch.cuda.device_count() +elif torch.backends.mps.is_available(): + device = "mps" + num_gpu = 0 +else: + device = "cpu" + num_gpu = 0 + +if num_gpu > 0: + cmd = f"torchrun --standalone --nproc_per_node {num_gpu} {base_cmd} --device={device}" +else: + cmd = f"python {base_cmd} --device={device}" +print(cmd) +os.system(cmd) + +# %% +for rank in range(num_gpu): + if not os.path.exists(f"{root_path}/{result_path}/CC_{rank:03d}_{num_gpu:03d}.csv"): + continue + if rank == 0: + cmd = f"cat {root_path}/{result_path}/CC_{rank:03d}_{num_gpu:03d}.csv > {root_path}/{data_path}/dtcc.csv" + else: + cmd = ( + f"tail -n +2 {root_path}/{result_path}/CC_{rank:03d}_{num_gpu:03d}.csv >> {root_path}/{data_path}/dtcc.csv" + ) + print(cmd) + os.system(cmd) + + +cmd = f"cat {root_path}/{result_path}/CC_*_{num_gpu:03d}_dt.cc > {root_path}/{data_path}/dt.cc" +print(cmd) +os.system(cmd) + +# # %% +# os.chdir(f"{root_path}/{region}/cctorch") +# source_file = f"ccpairs/CC_{num_gpu:03d}_dt.cc" +# target_file = f"dt.cc" +# print(f"{source_file} -> {target_file}") +# if os.path.lexists(target_file): +# os.remove(target_file) +# os.symlink(source_file, target_file) + +# source_file = f"ccpairs/CC_{num_gpu:03d}.csv" +# target_file = f"dtcc.csv" +# print(f"{source_file} -> {target_file}") +# if os.path.lexists(target_file): +# os.remove(target_file) +# os.symlink(source_file, target_file) diff --git a/examples/california/submit_adloc.py 
b/examples/california/submit_adloc.py index c83b4c1..5f7b9a9 100644 --- a/examples/california/submit_adloc.py +++ b/examples/california/submit_adloc.py @@ -24,7 +24,7 @@ def parse_args(): setup=""" echo "Begin setup." echo export WANDB_API_KEY=$WANDB_API_KEY >> ~/.bashrc -pip install -U h5py tqdm wandb pandas numpy scipy scikit-learn +pip install -U h5py tqdm wandb pandas scipy scikit-learn numpy==1.26.4 pip install -U fsspec gcsfs pip install -U obspy pyproj pip install -e /opt/ADLoc diff --git a/examples/california/submit_cctorch.py b/examples/california/submit_cctorch.py new file mode 100644 index 0000000..074c573 --- /dev/null +++ b/examples/california/submit_cctorch.py @@ -0,0 +1,117 @@ +import argparse +import time +from concurrent.futures import ThreadPoolExecutor + +import sky + + +# NUM_NODES = 8 +def parse_args(): + parser = argparse.ArgumentParser() + parser.add_argument("--num_nodes", type=int, default=32) + parser.add_argument("--year", type=int, default=2023) + parser.add_argument("--region", type=str, default="CA") + return parser.parse_args() + + +args = parse_args() +NUM_NODES = args.num_nodes +YEAR = args.year +REGION = args.region + +task = sky.Task( + name="run_adloc", + setup=""" +echo "Begin setup." +echo export WANDB_API_KEY=$WANDB_API_KEY >> ~/.bashrc +pip install -U h5py tqdm wandb pandas scipy scikit-learn numpy==1.26.4 +pip install -U fsspec gcsfs s3fs +pip install -U obspy pyproj +pip install -e /opt/ADLoc +""", + run=""" +num_nodes=`echo "$SKYPILOT_NODE_IPS" | wc -l` +master_addr=`echo "$SKYPILOT_NODE_IPS" | head -n1` +if [ "$SKYPILOT_NODE_RANK" == "0" ]; then + ls -al /opt + ls -al /data + ls -al ./ +fi +python cut_templates_cc.py --num_node $NUM_NODES --node_rank $NODE_RANK --year $YEAR +""", + workdir=".", + num_nodes=1, + envs={"NUM_NODES": NUM_NODES, "NODE_RANK": 0, "YEAR": YEAR}, +) + +task.set_file_mounts( + { + "/opt/ADLoc": "../../ADLoc", + }, +) +# task.set_storage_mounts({ +# '/remote/imagenet/': sky.Storage(name='my-bucket', +# source='/local/imagenet'), +# }) +task.set_resources( + sky.Resources( + cloud=sky.GCP(), + region="us-west1", # GCP + # region="us-west-2", # AWS + accelerators=None, + cpus=16, + disk_tier="low", + disk_size=50, # GB + memory=None, + use_spot=False, + ), +) + +# for NODE_RANK in range(NUM_NODES): +# task.update_envs({"NODE_RANK": NODE_RANK}) +# cluster_name = f"cctorch-{NODE_RANK:02d}" +# print(f"Launching cluster {cluster_name}-{NUM_NODES}...") +# sky.jobs.launch( +# task, +# name=f"{cluster_name}", +# ) + +jobs = [] +try: + sky.status(refresh=True) +except Exception as e: + print(e) + +with ThreadPoolExecutor(max_workers=NUM_NODES) as executor: + for NODE_RANK in range(NUM_NODES): + + task.update_envs({"NODE_RANK": NODE_RANK}) + cluster_name = f"cctorch-{YEAR}-{NODE_RANK:02d}" + + status = sky.status(cluster_names=[f"{cluster_name}"], refresh=True) + if len(status) > 0: + if status[0]["status"].value == "INIT": + sky.down(f"{cluster_name}") + if (not status[0]["to_down"]) and (not status[0]["status"].value == "INIT"): + sky.autostop(f"{cluster_name}", idle_minutes=10, down=True) + print(f"Cluster {cluster_name}/{NUM_NODES} already exists.") + continue + + status = sky.status(cluster_names=[f"{cluster_name}"]) + if len(status) == 0: + print(f"Launching cluster {cluster_name}/{NUM_NODES}...") + jobs.append( + executor.submit( + sky.launch, + task, + cluster_name=f"{cluster_name}", + idle_minutes_to_autostop=10, + down=True, + detach_setup=True, + detach_run=True, + ) + ) + time.sleep(5) + +for job in jobs: + 
print(job.result()) diff --git a/examples/california/submit_gamma.py b/examples/california/submit_gamma.py index ebdfe76..477a3fa 100644 --- a/examples/california/submit_gamma.py +++ b/examples/california/submit_gamma.py @@ -24,7 +24,7 @@ def parse_args(): setup=""" echo "Begin setup." echo export WANDB_API_KEY=$WANDB_API_KEY >> ~/.bashrc -pip install -U h5py tqdm wandb pandas numpy scipy +pip install -U h5py tqdm wandb pandas scipy numpy==1.26.4 pip install -U fsspec gcsfs pip install -U obspy pyproj pip install -e /opt/GaMMA diff --git a/examples/california/submit_phasenet.py b/examples/california/submit_phasenet.py index 00abe76..7eed14a 100644 --- a/examples/california/submit_phasenet.py +++ b/examples/california/submit_phasenet.py @@ -44,7 +44,7 @@ def parse_args(): setup=""" echo "Begin setup." echo export WANDB_API_KEY=$WANDB_API_KEY >> ~/.bashrc -pip install h5py tqdm wandb pandas numpy scipy +pip install h5py tqdm wandb pandas scipy numpy==1.26.4 pip install fsspec gcsfs s3fs pip install obspy pyproj # pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu121 diff --git a/scripts/load_cloud_templates.py b/scripts/load_cloud_templates.py new file mode 100644 index 0000000..05829b9 --- /dev/null +++ b/scripts/load_cloud_templates.py @@ -0,0 +1,76 @@ +# %% +import json +import os +from concurrent.futures import ThreadPoolExecutor + +import fsspec +import matplotlib.pyplot as plt +import numpy as np +import pandas as pd +from tqdm import tqdm + +# %% +if __name__ == "__main__": + + # %% + result_path = "results/" + if not os.path.exists(result_path): + os.makedirs(result_path) + + # %% + protocol = "gs" + token_json = f"{os.environ['HOME']}/.config/gcloud/application_default_credentials.json" + with open(token_json, "r") as fp: + token = json.load(fp) + + bucket = "quakeflow_catalog" + folder = "Cal/cctorch" + + fs = fsspec.filesystem(protocol, token=token) + + # %% + def plot_templates(templates, events, picks): + templates = templates - np.nanmean(templates, axis=(-1), keepdims=True) + std = np.std(templates, axis=(-1), keepdims=True) + std[std == 0] = 1.0 + templates = templates / std + + plt.figure(figsize=(10, 10)) + plt.imshow(templates[:, -1, 0, :], origin="lower", aspect="auto", vmin=-0.3, vmax=0.3, cmap="RdBu_r") + plt.colorbar() + plt.show() + + # %% + years = [2023] + + for year in years: + num_jday = 366 if (year % 4 == 0 and year % 100 != 0) or year % 400 == 0 else 365 + + for jday in range(1, num_jday + 1): + + if not fs.exists(f"{bucket}/{folder}/{year}/template_{jday:03d}.dat"): + continue + + with fs.open(f"{bucket}/{folder}/{year}/cctorch_picks_{jday:03d}.csv", "r") as fp: + picks = pd.read_csv(fp, dtype=str) + with fs.open(f"{bucket}/{folder}/{year}/cctorch_events_{jday:03d}.csv", "r") as fp: + events = pd.read_csv(fp, dtype=str) + with fs.open(f"{bucket}/{folder}/{year}/config_{jday:03d}.json", "r") as fp: + config = json.load(fp) + template_file = fs.open(f"{bucket}/{folder}/{year}/template_{jday:03d}.dat", "rb") + templates = np.frombuffer(template_file.read(), dtype=np.float32).reshape(tuple(config["template_shape"])) + template_file.close() + + print(f"events: {len(events):,} ") + print(f"picks: {len(picks):,} ") + print(f"templates: {templates.shape}") + + picks.to_csv(f"{result_path}/picks_{year:04d}_{jday:03d}.csv", index=False) + events.to_csv(f"{result_path}/events_{year:04d}_{jday:03d}.csv", index=False) + np.save(f"{result_path}/templates_{year:04d}_{jday:03d}.npy", templates) + + plot_templates(templates, events, picks) 
+ + # break + +# %%
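
Note: the per-day template files written by cut_templates_cc.py are raw float32 memmaps; their shapes are only recorded in the matching config_{jday:03d}.json. Below is a minimal sketch of reading one day back from a local run. The year, jday, and the local/Cal/cctorch root are assumptions matching the script defaults, not fixed paths.

# Minimal sketch: load one day of templates produced by cut_templates_cc.py.
# Assumed layout: local/Cal/cctorch/<year>/..., i.e. root_path="local", region="Cal".
import json

import numpy as np
import pandas as pd

year, jday = 2023, 185  # hypothetical day
root = "local/Cal/cctorch"

with open(f"{root}/{year:04d}/config_{jday:03d}.json") as fp:
    config = json.load(fp)

# template.dat stores float32 samples with shape (npicks, 3 components, 1, nt)
templates = np.memmap(
    f"{root}/{year:04d}/template_{jday:03d}.dat",
    dtype=np.float32,
    mode="r",
    shape=tuple(config["template_shape"]),
)
picks = pd.read_csv(f"{root}/{year:04d}/cctorch_picks_{jday:03d}.csv")

# idx_pick in the picks csv indexes the first axis of the template array
print(len(picks), templates.shape)
print(templates[int(picks["idx_pick"].iloc[0])].shape)  # (3, 1, nt)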
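The per-day loop in cut_templates_cc.py leaves the generate_pairs call commented out, while run_cctorch.py still expects a pairs.txt of "event1,event2" lines (the same format the dt.ct branch writes). The following is a hedged sketch of distance-limited event pairing with the already-imported sklearn NearestNeighbors, only to illustrate the max_pair_dist / max_neighbors idea; the actual generate_pairs also takes picks and stations and may differ in detail.

# Sketch only: pair nearby events by hypocentral distance, capped at max_neighbors.
import numpy as np
import pandas as pd
from sklearn.neighbors import NearestNeighbors


def sketch_generate_pairs(events: pd.DataFrame, max_pair_dist=10, max_neighbors=50, fname="pairs.txt"):
    # events carries x_km / y_km / z_km columns; rows are ordered by idx_eve
    xyz = events[["x_km", "y_km", "z_km"]].values
    nbrs = NearestNeighbors(n_neighbors=min(max_neighbors, len(events))).fit(xyz)
    dist, neigh = nbrs.kneighbors(xyz)

    pairs = set()
    for i, (dists, js) in enumerate(zip(dist, neigh)):
        for d, j in zip(dists, js):
            if i == j or d > max_pair_dist:
                continue
            pairs.add((min(i, j), max(i, j)))  # unordered pairs, deduplicated

    with open(fname, "w") as fp:
        for i, j in sorted(pairs):
            fp.write(f"{i},{j}\n")
    return len(pairs)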