Skip to content

Commit

Permalink
Merge pull request #344 from nasa-fornax/panstarrs_hipscat
Browse files Browse the repository at this point in the history
panstarrs_get_lightcurve using lsdb
  • Loading branch information
bsipocz authored Oct 23, 2024
2 parents d1c236e + fedbd7e commit ee9f4f1
Show file tree
Hide file tree
Showing 5 changed files with 136 additions and 297 deletions.
376 changes: 101 additions & 275 deletions light_curves/code_src/panstarrs_functions.py
Original file line number Diff line number Diff line change
@@ -1,236 +1,14 @@
import numpy as np
import pandas as pd
import requests
from astropy.table import Table
from tqdm.auto import tqdm

import lsdb
from dask.distributed import Client
from data_structures import MultiIndexDFObject

# code partially taken from https://ps1images.stsci.edu/ps1_dr2_api.html
def ps1cone(ra,dec,radius,table="mean",release="dr1",format="csv",columns=None,
baseurl="https://catalogs.mast.stsci.edu/api/v0.1/panstarrs", verbose=False,
**kw):
"""Do a cone search of the PS1 catalog
Parameters
----------
ra: float (degrees)
J2000 Right Ascension
dec: float (degrees)
J2000 Declination
radius: float (degrees)
Search radius (<= 0.5 degrees)
table: string
mean, stack, or detection
release: string
dr1 or dr2
format: string
csv, votable, json
columns: list of strings
list of column names to include (None means use defaults)
baseurl: stirng
base URL for the request
verbose: int
print info about request
**kw:
other parameters (e.g., 'nDetections.min':2)
Returns
-------
cone search results
"""

data = kw.copy()
data['ra'] = ra
data['dec'] = dec
data['radius'] = radius
return ps1search(table=table,release=release,format=format,columns=columns,
baseurl=baseurl, verbose=verbose, **data)


def ps1search(table="mean",release="dr1",format="csv",columns=None,
baseurl="https://catalogs.mast.stsci.edu/api/v0.1/panstarrs", verbose=False,
**kw):
"""Do a general search of the PS1 catalog (possibly without ra/dec/radius)
Parameters
----------
table: string
mean, stack, or detection
release: string
dr1 or dr2
format: string
csv, votable, json
columns: list of strings
list of column names to include (None means use defaults)
baseurl: string
base URL for the request
verbose: int
print info about request
**kw:
other parameters (e.g., 'nDetections.min':2). Note this is required!
Returns
-------
search results
"""

data = kw.copy()
if not data:
raise ValueError("You must specify some parameters for search")
checklegal(table,release)
if format not in ("csv","votable","json"):
raise ValueError("Bad value for format")
url = f"{baseurl}/{release}/{table}.{format}"
if columns:
# check that column values are legal
# create a dictionary to speed this up
dcols = {}
for col in ps1metadata(table,release)['name']:
dcols[col.lower()] = 1
badcols = []
for col in columns:
if col.lower().strip() not in dcols:
badcols.append(col)
if badcols:
raise ValueError('Some columns not found in table: {}'.format(', '.join(badcols)))
# two different ways to specify a list of column values in the API
# data['columns'] = columns
data['columns'] = '[{}]'.format(','.join(columns))

# either get or post works
# r = requests.post(url, data=data)
r = requests.get(url, params=data)

if verbose:
print(r.url)
r.raise_for_status()
if format == "json":
return r.json()
else:
return r.text


def checklegal(table,release):
"""Checks if this combination of table and release is acceptable
Raises a ValueError exception if there is problem
Parameters
----------
table: string
mean, stack, or detection
release: string
dr1 or dr2
"""

releaselist = ("dr1", "dr2")
if release not in ("dr1","dr2"):
raise ValueError("Bad value for release (must be one of {})".format(', '.join(releaselist)))
if release=="dr1":
tablelist = ("mean", "stack")
else:
tablelist = ("mean", "stack", "detection")
if table not in tablelist:
raise ValueError("Bad value for table (for {} must be one of {})".format(release, ", ".join(tablelist)))


def ps1metadata(table="mean",release="dr1",
baseurl="https://catalogs.mast.stsci.edu/api/v0.1/panstarrs"):
"""Return metadata for the specified catalog and table
Parameters
----------
table: string
mean, stack, or detection
release: string
dr1 or dr2
baseurl: string
base URL for the request
Returns
-------
astropy table with columns name, type, description
"""

checklegal(table,release)
url = f"{baseurl}/{release}/{table}/metadata"
r = requests.get(url)
r.raise_for_status()
v = r.json()
# convert to astropy table
tab = Table(rows=[(x['name'],x['type'],x['description']) for x in v],
names=('name','type','description'))
return tab


def addfilter(dtab):
"""Add filter name as column in detection table by translating filterID
This modifies the table in place. If the 'filter' column already exists,
the table is returned unchanged.
Parameters
----------
dtab: table
detection table
Returns
-------
dtab: table
"""
if 'filter' not in dtab.colnames:
# the filterID value goes from 1 to 5 for grizy
#id2filter = np.array(list('grizy'))
id2filter = np.array(['panstarrs g','panstarrs r','panstarrs i','panstarrs z','panstarrs y'])
dtab['filter'] = id2filter[(dtab['filterID']-1).data]
return dtab

def improve_filter_format(tab):
"""Add filter string to column name
Parameters
----------
tab:table
Returns
-------
tab: table
"""
for filter in 'grizy':
col = filter+'MeanPSFMag'
tab[col].format = ".4f"
tab[col][tab[col] == -999.0] = np.nan

return(tab)

def search_lightcurve(objid):
"""setup to pull light curve info
Parameters
----------
objid: string
Returns
-------
dresults: search results
"""
dconstraints = {'objID': objid}
dcolumns = ("""objID,detectID,filterID,obsTime,ra,dec,psfFlux,psfFluxErr,psfMajorFWHM,psfMinorFWHM,
psfQfPerfect,apFlux,apFluxErr,infoFlag,infoFlag2,infoFlag3""").split(',')
# strip blanks and weed out blank and commented-out values
dcolumns = [x.strip() for x in dcolumns]
dcolumns = [x for x in dcolumns if x and not x.startswith('#')]


#get the actual detections and light curve info for this target
dresults = ps1search(table='detection',release='dr2',columns=dcolumns,**dconstraints)

return(dresults)


#Do a panstarrs search
def panstarrs_get_lightcurves(sample_table, *, radius=1/3600):
"""Searches panstarrs for light curves from a list of input coordinates. This is the MAIN function.
# panstarrs light curves from hipscat catalog in S3 using lsdb
def panstarrs_get_lightcurves(sample_table, *, radius=1):
"""Searches panstarrs hipscat files for light curves from a list of input coordinates.
Parameters
----------
Expand All @@ -244,53 +22,101 @@ def panstarrs_get_lightcurves(sample_table, *, radius=1/3600):
df_lc : MultiIndexDFObject
the main data structure to store all light curves
"""

df_lc = MultiIndexDFObject()

#for all objects in our catalog
for row in tqdm(sample_table):
#doesn't take SkyCoord, convert to floats
ra = row["coord"].ra.deg
dec = row["coord"].dec.deg
lab = row["label"]
objectid = row["objectid"]

# see if there is an object in panSTARRS at this location. if not, continue to the next object.
results = ps1cone(ra,dec,radius,release='dr2')
if not results:
continue
tab = Table.read(results, format='ascii')

# improve the format of the table
tab = improve_filter_format(tab)

#in case there is more than one object within 1 arcsec, sort them by match distance
tab.sort('distance')

#take the closest match as the best match
objid = tab['objID'][0]

#get the actual detections and light curve info for this target
dresults = search_lightcurve(objid)
if not dresults:
continue

#fix the column names to include filter names
dtab = addfilter(Table.read(dresults, format='ascii'))

dtab.sort('obsTime')
dtab['psfFlux'][dtab['psfFlux'] == -999.0] = np.nan
dtab['psfFluxErr'][dtab['psfFluxErr'] == -999.0] = np.nan

#here is the light curve mixed from all 5 bands
t_panstarrs = dtab['obsTime']
flux_panstarrs = dtab['psfFlux']*1E3 # in mJy
err_panstarrs = dtab['psfFluxErr'] *1E3
filtername = dtab['filter']
#read in the panstarrs object table to lsdb
#this table will be used for cross matching with our sample's ra and decs
#but does not have light curve information
panstarrs_object = lsdb.read_hipscat(
's3://stpubdata/panstarrs/ps1/public/hipscat/otmo',
storage_options={'anon': True},
columns=[
"objID", # PS1 ID
"raMean", "decMean", # coordinates to use for cross-matching
"nStackDetections", # some other data to use
]
)
#read in the panstarrs light curves to lsdb
#panstarrs recommendation is not to index into this table with ra and dec
#but to use object ids from the above object table
panstarrs_detect = lsdb.read_hipscat(
's3://stpubdata/panstarrs/ps1/public/hipscat/detection',
storage_options={'anon': True},
columns=[
"objID", # PS1 object ID
"detectID", # PS1 detection ID
# light-curve stuff
"obsTime", "filterID", "psfFlux", "psfFluxErr",
],
)
#convert astropy table to pandas dataframe
#special care for the SkyCoords in the table
sample_df = pd.DataFrame({'objectid': sample_table['objectid'],
'ra_deg': sample_table['coord'].ra.deg,
'dec_deg':sample_table['coord'].dec.deg,
'label':sample_table['label']})

#convert dataframe to hipscat
sample_lsdb = lsdb.from_dataframe(
sample_df,
ra_column="ra_deg",
dec_column="dec_deg",
margin_threshold=10,
# Optimize partition size
drop_empty_siblings=True
)

#plan to cross match panstarrs object with my sample
#only keep the best match
matched_objects = panstarrs_object.crossmatch(
sample_lsdb,
radius_arcsec=radius,
n_neighbors=1,
suffixes=("", "")
)

#put this single object light curves into a pandas multiindex dataframe
dfsingle = pd.DataFrame(dict(flux=flux_panstarrs, err=err_panstarrs, time=t_panstarrs, objectid=objectid, band=filtername, label=lab)).set_index(["objectid","label", "band", "time"])
#then concatenate each individual df together
df_lc.append(dfsingle)

return(df_lc)
# plan to join that cross match with detections to get light-curves
matched_lc = matched_objects.join(
panstarrs_detect,
left_on="objID",
right_on="objID",
output_catalog_name="yang_ps_lc",
suffixes = ["",""]
)

# Create default local cluster
# here is where the actual work gets done
with Client():
#compute the cross match with object table
#and the join with the detections table
matched_df = matched_lc.compute()

#handle the case where there are no matches and return empty df
if len(matched_df["filterID"]) == 0:
return MultiIndexDFObject(data=pd.DataFrame())

#cleanup the filter names to the expected letters
filter_id_to_name = {
1: 'Pan-STARRS g',
2: 'Pan-STARRS r',
3: 'Pan-STARRS i',
4: 'Pan-STARRS z',
5: 'Pan-STARRS y'
}

get_name_from_filter_id = np.vectorize(filter_id_to_name.get)
filtername = get_name_from_filter_id(matched_df["filterID"])

#make the dataframe of light curves
#the data conversions are to change from pyarrow datatypes to numpy datatypes

df_lc = pd.DataFrame({
'flux': pd.to_numeric(matched_df['psfFlux'] * 1e3, errors='coerce').astype(np.float64),
'err': pd.to_numeric(matched_df['psfFluxErr'] * 1e3, errors='coerce').astype(np.float64),
'time': pd.to_numeric(matched_df['obsTime'], errors='coerce').astype(np.float64),
'objectid': matched_df['objectid'].astype(np.int64),
'band': filtername,
'label': matched_df['label'].astype(str)
}).set_index(["objectid", "label", "band", "time"])

return MultiIndexDFObject(data=df_lc)

Loading

0 comments on commit ee9f4f1

Please sign in to comment.