Merge pull request #343 from lincc-frameworks/save_to_parquet
Save to parquet
dougbrn authored Jan 11, 2024
2 parents 1644284 + fcf67da commit 3c20f13
Showing 6 changed files with 532 additions and 50 deletions.
82 changes: 57 additions & 25 deletions docs/tutorials/working_with_the_ensemble.ipynb
@@ -26,17 +26,17 @@
"import numpy as np\n",
"import pandas as pd\n",
"\n",
"np.random.seed(1) \n",
"np.random.seed(1)\n",
"\n",
"# Generate 10 astronomical objects\n",
"n_obj = 10\n",
"ids = 8000 + np.arange(n_obj)\n",
"names = ids.astype(str)\n",
"object_table = pd.DataFrame(\n",
" {\n",
" \"id\": ids, \n",
" \"id\": ids,\n",
" \"name\": names,\n",
" \"ddf_bool\": np.random.randint(0, 2, n_obj), # 0 if from deep drilling field, 1 otherwise\n",
" \"ddf_bool\": np.random.randint(0, 2, n_obj), # 0 if from deep drilling field, 1 otherwise\n",
" \"libid_cadence\": np.random.randint(1, 130, n_obj),\n",
" }\n",
")\n",
@@ -49,7 +49,7 @@
" {\n",
" \"id\": 8000 + (np.arange(num_points) % n_obj),\n",
" \"time\": np.arange(num_points),\n",
" \"flux\": np.random.random_sample(size=num_points)*10,\n",
" \"flux\": np.random.random_sample(size=num_points) * 10,\n",
" \"band\": np.repeat(all_bands, num_points / len(all_bands)),\n",
" \"error\": np.random.random_sample(size=num_points),\n",
" \"count\": np.arange(num_points),\n",
@@ -89,7 +89,8 @@
" flux_col=\"flux\",\n",
" err_col=\"error\",\n",
" band_col=\"band\",\n",
" npartitions=1)"
" npartitions=1,\n",
")"
]
},
{
@@ -124,18 +125,12 @@
"from tape.utils import ColumnMapper\n",
"\n",
"# columns assigned manually\n",
"col_map = ColumnMapper().assign(id_col=\"id\",\n",
" time_col=\"time\",\n",
" flux_col=\"flux\",\n",
" err_col=\"error\",\n",
" band_col=\"band\")\n",
"col_map = ColumnMapper().assign(\n",
" id_col=\"id\", time_col=\"time\", flux_col=\"flux\", err_col=\"error\", band_col=\"band\"\n",
")\n",
"\n",
"# Pass the ColumnMapper along to from_pandas\n",
"ens.from_pandas(\n",
" source_frame=source_table,\n",
" object_frame=object_table,\n",
" column_mapper=col_map,\n",
" npartitions=1)"
"ens.from_pandas(source_frame=source_table, object_frame=object_table, column_mapper=col_map, npartitions=1)"
]
},
{
@@ -616,8 +611,8 @@
"metadata": {},
"outputs": [],
"source": [
"ens.add_frame(ens.select_frame(\"stetson_j\"), \"stetson_j_result_1\") # Add result under new label\n",
"ens.drop_frame(\"stetson_j\") # Drop original label\n",
"ens.add_frame(ens.select_frame(\"stetson_j\"), \"stetson_j_result_1\") # Add result under new label\n",
"ens.drop_frame(\"stetson_j\") # Drop original label\n",
"\n",
"ens.select_frame(\"stetson_j_result_1\").compute()"
]
@@ -655,7 +650,7 @@
"ens.drop_frame(\"result_1\")\n",
"\n",
"try:\n",
" ens.select_frame(\"result_1\") # This should result in a KeyError since the frame has been dropped.\n",
" ens.select_frame(\"result_1\") # This should result in a KeyError since the frame has been dropped.\n",
"except Exception as e:\n",
" print(\"As expected, the frame 'result_1 was dropped.\\n\" + str(e))"
]
@@ -825,6 +820,50 @@
"We see that we now have a `Pandas.series` of `my_average_flux` result by object_id (lightcurve). In many cases, this may not be the ideal output for your function. This output is controlled by the `Dask` `meta` parameter. For more information on this parameter, you can read the `Dask` [documentation](https://blog.dask.org/2022/08/09/understanding-meta-keyword-argument). You may pass the `meta` parameter through `Ensemble.batch`, as shown above."
]
},
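For illustration, here is a minimal sketch of passing `meta` through `Ensemble.batch`, assuming a scalar-per-object function like the tutorial's `my_average_flux`; the exact `batch` signature and the returned object may differ across TAPE versions:

```python
import numpy as np
import pandas as pd

def my_average_flux(flux):
    # Reduce each lightcurve's flux array to a single value.
    return np.mean(flux)

# Declaring the result as a float-valued Series tells Dask what the lazy
# output will look like, so the task graph can be planned correctly.
res = ens.batch(my_average_flux, "flux", meta=pd.Series(dtype=float))
res.compute()  # materialize the per-object averages
```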
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"## Saving the Ensemble to Disk\n",
"\n",
"In some situations, you may find yourself running a given workflow many times. Due to the nature of lazy-computation, this will involve repeated execution of data I/O, pre-processing steps, initial analysis, etc. In these situations, it may be effective to instead save the ensemble state to disk after completion of these initial processing steps. To accomplish this, we can use the `Ensemble.save_ensemble` function."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"ens.save_ensemble(\n",
" \".\",\n",
" \"ensemble\",\n",
" additional_frames=[\"result_3\"],\n",
") # Saves object, source, and result_3 to disk"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"The above command creates an \"ensemble\" directory in the current working directory. This directory contains a subdirectory of parquet files for each `EnsembleFrame` object that was included in the `additional_frames` kwarg. Note that if `additional_frames` was set to True or False this would save all or none of the additional `EnsembleFrame` objects respectively, and that the object (unless it has no columns) and source frames are always saved.\n",
"\n",
"From here, we can just load the ensemble from disk."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"new_ens = Ensemble(client=ens.client) # use the same client\n",
"new_ens.from_ensemble(\"./ensemble\", additional_frames=True)\n",
"new_ens.select_frame(\"result_3\").head(5)"
]
},
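To sanity-check what was written, one could list the output directory. This is a sketch; subdirectory names follow the saved frame labels (e.g. `object`, `source`, `result_3`), but the exact layout depends on the TAPE version:

```python
import os

# Walk the saved ensemble directory; each saved frame gets its own
# subdirectory of parquet files.
for root, dirs, files in os.walk("./ensemble"):
    print(root, files)
```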
{
"cell_type": "code",
"execution_count": null,
@@ -838,13 +877,6 @@
"source": [
"ens.client.close() # Tear down the ensemble client"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {