diff --git a/src/nectarchain/user_scripts/dmousadi/test_scripts/tests/linearity_test.py b/src/nectarchain/user_scripts/dmousadi/test_scripts/tests/linearity_test.py index ad32295e..7c6b8d52 100644 --- a/src/nectarchain/user_scripts/dmousadi/test_scripts/tests/linearity_test.py +++ b/src/nectarchain/user_scripts/dmousadi/test_scripts/tests/linearity_test.py @@ -1,39 +1,81 @@ -#don't forget to set environment variable NECTARCAMDATA +# don't forget to set environment variable NECTARCAMDATA -import numpy as np -import pathlib +import argparse import os +import pathlib +import pickle import sys -import matplotlib.pyplot as plt -from utils import filters, adc_to_pe, optical_density_390ns, trasmission_390ns, fit_function, err_ratio, err_sum, plot_parameters +import matplotlib.pyplot as plt +import numpy as np from lmfit.models import Model from test_tools_components import LinearityTestTool -import argparse -import pickle +from utils import ( + adc_to_pe, + err_ratio, + err_sum, + filters, + linear_fit_function, + optical_density_390ns, + plot_parameters, + trasmission_390ns, +) def get_args(): """ Parses command-line arguments for the linearity test script. - + Returns: argparse.ArgumentParser: The parsed command-line arguments. """ - - parser = argparse.ArgumentParser(description='Linearity test B-TEL-1390 & Intensity resolution B-TEL-1010. \n' - +'According to the nectarchain component interface, you have to set a NECTARCAMDATA environment variable in the folder where you have the data from your runs or where you want them to be downloaded.\n' - +'You have to give a list of runs (run numbers with spaces inbetween), a corresponding transmission list and an output directory to save the final plot.\n' - +'If the data is not in NECTARCAMDATA, the files will be downloaded through DIRAC.\n For the purposes of testing this script, default data is from the runs used for this test in the TRR document.\n' - +'You can optionally specify the number of events to be processed (default 500) and the number of pixels used (default 70).\n') - parser.add_argument('-r','--runlist', type=int,nargs='+', help='List of runs (numbers separated by space)', required=False, default = [i for i in range (3404,3424)]+[i for i in range(3435,3444)]) - parser.add_argument('-t','--trans', type=float,nargs='+', help='List of corresponding transmission for each run', required=False , default = trasmission_390ns) - parser.add_argument('-e','--evts', type = int, help='Number of events to process from each run. Default is 500', required=False, default=500) - parser.add_argument('-o','--output', type=str, help='Output directory. If none, plot will be saved in current directory', required=False, default='./') - parser.add_argument("--temp_output", help="Temporary output directory for GUI", default=None) - - return parser + parser = argparse.ArgumentParser( + description="Linearity test B-TEL-1390 & Intensity resolution B-TEL-1010. \n" + + "According to the nectarchain component interface, you have to set a NECTARCAMDATA environment variable in the folder where you have the data from your runs or where you want them to be downloaded.\n" + + "You have to give a list of runs (run numbers with spaces inbetween), a corresponding transmission list and an output directory to save the final plot.\n" + + "If the data is not in NECTARCAMDATA, the files will be downloaded through DIRAC.\n For the purposes of testing this script, default data is from the runs used for this test in the TRR document.\n" + + "You can optionally specify the number of events to be processed (default 500) and the number of pixels used (default 70).\n" + ) + parser.add_argument( + "-r", + "--runlist", + type=int, + nargs="+", + help="List of runs (numbers separated by space)", + required=False, + default=[i for i in range(3404, 3424)] + [i for i in range(3435, 3444)], + ) + parser.add_argument( + "-t", + "--trans", + type=float, + nargs="+", + help="List of corresponding transmission for each run", + required=False, + default=trasmission_390ns, + ) + parser.add_argument( + "-e", + "--evts", + type=int, + help="Number of events to process from each run. Default is 500", + required=False, + default=500, + ) + parser.add_argument( + "-o", + "--output", + type=str, + help="Output directory. If none, plot will be saved in current directory", + required=False, + default="./", + ) + parser.add_argument( + "--temp_output", help="Temporary output directory for GUI", default=None + ) + + return parser def main(): @@ -54,239 +96,354 @@ def main(): parser = get_args() args = parser.parse_args() - - - runlist = args.runlist - transmission=args.trans #corresponding transmission for above data + transmission = args.trans # corresponding transmission for above data nevents = args.evts - + output_dir = os.path.abspath(args.output) - temp_output= os.path.abspath(args.temp_output) if args.temp_output else None + temp_output = os.path.abspath(args.temp_output) if args.temp_output else None print(f"Output directory: {output_dir}") # Debug print print(f"Temporary output file: {temp_output}") # Debug print - sys.argv = sys.argv[:1] - #runlist = [3441] + # runlist = [3441] - charge = np.zeros((len(runlist),2)) - std = np.zeros((len(runlist),2)) - std_err = np.zeros((len(runlist),2)) + charge = np.zeros((len(runlist), 2)) + std = np.zeros((len(runlist), 2)) + std_err = np.zeros((len(runlist), 2)) index = 0 for run in runlist: print("PROCESSING RUN {}".format(run)) tool = LinearityTestTool( - progress_bar=True, run_number=run, events_per_slice = 999, max_events=nevents, log_level=20, window_width=14, overwrite=True - + progress_bar=True, + run_number=run, + events_per_slice=999, + max_events=nevents, + log_level=20, + window_width=14, + overwrite=True, ) tool.initialize() tool.setup() tool.start() output = tool.finish() - + charge[index], std[index], std_err[index], npixels = output index += 1 - - #print("FINAL",charge) - - #we assume that they overlap at 0.01 so they should have the same value - #normalise high gain and low gain using charge value at 0.01 - transmission=np.array(transmission) - norm_factor_hg = charge[np.argwhere((transmission<1.1e-2) & (transmission>9e-3)),0][0] - #print(norm_factor_hg) - norm_factor_lg = charge[np.argwhere((transmission<1.1e-2) & (transmission>9e-3)),1][0] - #print(norm_factor_lg) - charge_norm_hg = charge[:,0]/norm_factor_hg - charge_norm_lg = charge[:,1]/norm_factor_lg - std_norm_hg = std[:,0]/norm_factor_hg - std_norm_lg = std[:,1]/norm_factor_lg - - #true charge is transmission as percentage of hg charge at 0.01 - true_charge = transmission/0.01 * norm_factor_hg - - - - fig, axs = plt.subplots(3, 1, sharex='col', sharey='row', figsize=(10,11), gridspec_kw={'height_ratios': [4,2,2]}) + # print("FINAL",charge) + + # we assume that they overlap at 0.01 so they should have the same value + # normalise high gain and low gain using charge value at 0.01 + transmission = np.array(transmission) + norm_factor_hg = charge[ + np.argwhere((transmission < 1.1e-2) & (transmission > 9e-3)), 0 + ][0] + # print(norm_factor_hg) + norm_factor_lg = charge[ + np.argwhere((transmission < 1.1e-2) & (transmission > 9e-3)), 1 + ][0] + # print(norm_factor_lg) + charge_norm_hg = charge[:, 0] / norm_factor_hg + charge_norm_lg = charge[:, 1] / norm_factor_lg + std_norm_hg = std[:, 0] / norm_factor_hg + std_norm_lg = std[:, 1] / norm_factor_lg + + # true charge is transmission as percentage of hg charge at 0.01 + true_charge = transmission / 0.01 * norm_factor_hg + + fig, axs = plt.subplots( + 3, + 1, + sharex="col", + sharey="row", + figsize=(10, 11), + gridspec_kw={"height_ratios": [4, 2, 2]}, + ) axs[0].grid(True, which="both") axs[0].set_ylabel("Estimated charge [p.e.]") - axs[0].set_yscale('log') - axs[0].set_xscale('log') - axs[0].axvspan(10,1000,alpha=0.2,color="orange") + axs[0].set_yscale("log") + axs[0].set_xscale("log") + axs[0].axvspan(10, 1000, alpha=0.2, color="orange") axs[1].grid(True, which="both") axs[1].set_ylabel("Residuals [%]") - axs[1].set_xscale('log') - axs[1].set_ylim((-100,100)) - axs[1].axvspan(10,1000,alpha=0.2,color="orange") + axs[1].set_xscale("log") + axs[1].set_ylim((-100, 100)) + axs[1].axvspan(10, 1000, alpha=0.2, color="orange") axs[2].grid(True, which="both") axs[2].set_ylabel("HG/LG ratio") - axs[2].set_yscale('log') - axs[2].set_xscale('log') - axs[2].set_ylim((0.5,20)) - axs[2].axvspan(10,1000,alpha=0.2,color="orange") + axs[2].set_yscale("log") + axs[2].set_xscale("log") + axs[2].set_ylim((0.5, 20)) + axs[2].axvspan(10, 1000, alpha=0.2, color="orange") axs[2].set_xlabel("Illumination charge [p.e.]") + for channel, (channel_charge, channel_std, name) in enumerate( + zip( + [charge_norm_hg, charge_norm_lg], + [std_norm_hg, std_norm_lg], + ["High Gain", "Low Gain"], + ) + ): + yx = zip( + true_charge, channel_charge, channel_std, runlist + ) # sort by true charge + ch_sorted = np.array(sorted(yx)) + # print(ch_sorted) + + # linearity + model = Model(linear_fit_function) + params = model.make_params(a=100, b=0) + true = ch_sorted[:, 0] + + ch_charge = ch_sorted[:, 1] * norm_factor_hg[0] + ch_std = ch_sorted[:, 2] * norm_factor_hg[0] + ch_err = ch_std / np.sqrt(npixels) + + ch_fit = model.fit( + ch_charge[ + plot_parameters[name]["linearity_range"][0] : plot_parameters[name][ + "linearity_range" + ][1] + ], + params, + weights=1 + / ch_err[ + plot_parameters[name]["linearity_range"][0] : plot_parameters[name][ + "linearity_range" + ][1] + ], + x=true[ + plot_parameters[name]["linearity_range"][0] : plot_parameters[name][ + "linearity_range" + ][1] + ], + ) + # print(ch_fit.fit_report()) + + a = ch_fit.params["a"].value + b = ch_fit.params["b"].value + a_err = ch_fit.params["a"].stderr + b_err = ch_fit.params["b"].stderr + + axs[0].errorbar( + true, + ch_charge, + yerr=ch_err, + label=name, + ls="", + marker="o", + color=plot_parameters[name]["color"], + ) + axs[0].plot( + true[ + plot_parameters[name]["linearity_range"][0] : plot_parameters[name][ + "linearity_range" + ][1] + ], + fit_function( + true[ + plot_parameters[name]["linearity_range"][0] : plot_parameters[name][ + "linearity_range" + ][1] + ], + a, + b, + ), + color=plot_parameters[name]["color"], + ) + axs[0].text( + plot_parameters[name]["label_coords"][0], + plot_parameters[name]["label_coords"][0], + name, + va="top", + fontsize=20, + color=plot_parameters[name]["color"], + alpha=0.9, + ) + axs[0].text( + plot_parameters[name]["text_coords"][0], + plot_parameters[name]["text_coords"][1], + "y = ax + b\n" + + r"a$_{\rm %s}=%2.2f \pm %2.2f$ " + % (plot_parameters[name]["initials"], round(a, 2), round(a_err, 2)) + + "\n" + + r"b$_{\rm %s}=(%2.2f \pm %2.2f) $ p.e." + % (plot_parameters[name]["initials"], round(b, 2), round(b_err, 2)) + + "\n" + r"$\chi_{\mathrm{%s}}^2/{\mathrm{dof}} =%2.1f/%d$" + % ( + plot_parameters[name]["initials"], + round(ch_fit.chisqr, 1), + int(ch_fit.chisqr / ch_fit.redchi), + ), + backgroundcolor="white", + bbox=dict( + facecolor="white", + edgecolor=plot_parameters[name]["color"], + lw=3, + boxstyle="round,pad=0.5", + ), + ha="left", + va="top", + fontsize=15, + color="k", + alpha=0.9, + ) + # residuals - for channel, (channel_charge, channel_std, name) in enumerate(zip([charge_norm_hg,charge_norm_lg],[std_norm_hg,std_norm_lg],["High Gain","Low Gain"])): - yx = zip(true_charge,channel_charge,channel_std,runlist) #sort by true charge - - ch_sorted = np.array(sorted(yx)) - #print(ch_sorted) - - #linearity - model = Model(fit_function) - params = model.make_params(a=100, b=0) - true = ch_sorted[:,0] - - ch_charge = ch_sorted[:,1] * norm_factor_hg[0] - ch_std = ch_sorted[:,2] * norm_factor_hg[0] - ch_err = ch_std/np.sqrt(npixels) - - - ch_fit = model.fit(ch_charge[plot_parameters[name]["linearity_range"][0]:plot_parameters[name]["linearity_range"][1]], params, - weights=1/ch_err[plot_parameters[name]["linearity_range"][0]:plot_parameters[name]["linearity_range"][1]], - x=true[plot_parameters[name]["linearity_range"][0]:plot_parameters[name]["linearity_range"][1]]) - - #print(ch_fit.fit_report()) - - - a = ch_fit.params['a'].value - b = ch_fit.params['b'].value - a_err = ch_fit.params['a'].stderr - b_err = ch_fit.params['b'].stderr - - - - - axs[0].errorbar(true,ch_charge,yerr = ch_err, label = name, ls='',marker='o',color = plot_parameters[name]["color"]) - axs[0].plot(true[plot_parameters[name]["linearity_range"][0]:plot_parameters[name]["linearity_range"][1]], - fit_function(true[plot_parameters[name]["linearity_range"][0]:plot_parameters[name]["linearity_range"][1]],a,b),color = plot_parameters[name]["color"]) - - - - - axs[0].text(plot_parameters[name]["label_coords"][0], plot_parameters[name]["label_coords"][0], name, - va='top', fontsize=20, color=plot_parameters[name]["color"], alpha=0.9) - axs[0].text(plot_parameters[name]["text_coords"][0], plot_parameters[name]["text_coords"][1], 'y = ax + b\n' - + r'a$_{\rm %s}=%2.2f \pm %2.2f$ '%(plot_parameters[name]["initials"],round(a,2), - round(a_err,2)) - + '\n' + r'b$_{\rm %s}=(%2.2f \pm %2.2f) $ p.e.' %(plot_parameters[name]["initials"],round(b,2), - round(b_err,2)) - + "\n" r"$\chi_{\mathrm{%s}}^2/{\mathrm{dof}} =%2.1f/%d$" %(plot_parameters[name]["initials"],round(ch_fit.chisqr,1), - int(ch_fit.chisqr/ch_fit.redchi)), - backgroundcolor='white', bbox=dict(facecolor='white', edgecolor=plot_parameters[name]["color"], lw=3, - boxstyle='round,pad=0.5'), - ha='left', va='top', fontsize=15, color='k', alpha=0.9) - - #residuals - - pred = fit_function(true,a,b) #prediction - pred_err = a_err*true + b_err #a,b uncorrelated - - resid = (ch_charge - pred)/ch_charge - - numerator_err = err_sum(ch_err,pred_err) - - resid_err = err_ratio((ch_charge-pred),ch_charge,numerator_err,ch_std) - - axs[1].errorbar(true,resid*100, yerr=abs(resid_err)*100, ls='', marker = 'o',color = plot_parameters[name]["color"]) #percentage - axs[1].axhline(8, color='k', alpha=0.4, ls='--', lw=2) - axs[1].axhline(-8, color='k', alpha=0.4, ls='--', lw=2) - - - #hg/lg ratio - ratio = charge[:,0]/charge[:,1] - ratio_std = err_ratio(charge[:,0],charge[:,1],std[:,0],std[:,1]) - - yx = zip(true_charge,ratio,ratio_std) #sort by true charge - ratio_sorted = np.array(sorted(yx)) - true = ratio_sorted[:,0] - ratio = ratio_sorted[:,1] - ratio_std = ratio_sorted[:,2] + pred = fit_function(true, a, b) # prediction + pred_err = a_err * true + b_err # a,b uncorrelated - model = model = Model(fit_function) - params = model.make_params(a=100, b=0) - ratio_fit = model.fit(ratio[10:-4], params, weights=1/ratio_std[10:-4], x=true[10:-4]) + resid = (ch_charge - pred) / ch_charge + numerator_err = err_sum(ch_err, pred_err) - axs[2].set_ylabel("hg/lg") - axs[2].set_xlabel("charge(p.e.)") - axs[2].errorbar(true,ratio,yerr=ratio_std,ls='',color='C1',marker = 'o') - axs[2].plot(true[10:-4],fit_function(true[10:-4], ratio_fit.params['a'].value, ratio_fit.params['b'].value), ls='-',color='C1', alpha=0.9) - axs[2].text(8, 1, 'y = ax + b\n' - + r"$a_{\mathrm{HG/LG}}= (%2.1f\pm%2.1f)$ p.e.$^{-1}$ " %(round(ratio_fit.params['a'].value,1), - round(ratio_fit.params['a'].stderr,1)) - + "\n" r"$b_{\mathrm{HG/LG}}=%2.1f\pm%2.1f$" %(round(ratio_fit.params['b'].value,1), - round(ratio_fit.params['b'].stderr,1)) - + "\n" r"$\chi_{\mathrm{HG/LG}}^2/{\mathrm{dof}} =%2.1f/%d$" %(round(ratio_fit.chisqr,1), - int(ratio_fit.chisqr/ratio_fit.redchi)), - - backgroundcolor='white', bbox=dict(facecolor='white', edgecolor='C1', lw=3, - boxstyle='round,pad=0.5'), - ha='left', va='bottom', fontsize=11, color='k', alpha=0.9) + resid_err = err_ratio((ch_charge - pred), ch_charge, numerator_err, ch_std) + + axs[1].errorbar( + true, + resid * 100, + yerr=abs(resid_err) * 100, + ls="", + marker="o", + color=plot_parameters[name]["color"], + ) # percentage + axs[1].axhline(8, color="k", alpha=0.4, ls="--", lw=2) + axs[1].axhline(-8, color="k", alpha=0.4, ls="--", lw=2) + # hg/lg ratio + ratio = charge[:, 0] / charge[:, 1] + ratio_std = err_ratio(charge[:, 0], charge[:, 1], std[:, 0], std[:, 1]) + yx = zip(true_charge, ratio, ratio_std) # sort by true charge + ratio_sorted = np.array(sorted(yx)) + true = ratio_sorted[:, 0] + ratio = ratio_sorted[:, 1] + ratio_std = ratio_sorted[:, 2] + model = model = Model(fit_function) + params = model.make_params(a=100, b=0) + ratio_fit = model.fit( + ratio[10:-4], params, weights=1 / ratio_std[10:-4], x=true[10:-4] + ) - plt.savefig(os.path.join(output_dir,"linearity_test.png")) + axs[2].set_ylabel("hg/lg") + axs[2].set_xlabel("charge(p.e.)") + axs[2].errorbar(true, ratio, yerr=ratio_std, ls="", color="C1", marker="o") + axs[2].plot( + true[10:-4], + fit_function( + true[10:-4], ratio_fit.params["a"].value, ratio_fit.params["b"].value + ), + ls="-", + color="C1", + alpha=0.9, + ) + axs[2].text( + 8, + 1, + "y = ax + b\n" + + r"$a_{\mathrm{HG/LG}}= (%2.1f\pm%2.1f)$ p.e.$^{-1}$ " + % ( + round(ratio_fit.params["a"].value, 1), + round(ratio_fit.params["a"].stderr, 1), + ) + + "\n" + r"$b_{\mathrm{HG/LG}}=%2.1f\pm%2.1f$" + % ( + round(ratio_fit.params["b"].value, 1), + round(ratio_fit.params["b"].stderr, 1), + ) + + "\n" + r"$\chi_{\mathrm{HG/LG}}^2/{\mathrm{dof}} =%2.1f/%d$" + % (round(ratio_fit.chisqr, 1), int(ratio_fit.chisqr / ratio_fit.redchi)), + backgroundcolor="white", + bbox=dict(facecolor="white", edgecolor="C1", lw=3, boxstyle="round,pad=0.5"), + ha="left", + va="bottom", + fontsize=11, + color="k", + alpha=0.9, + ) + + plt.savefig(os.path.join(output_dir, "linearity_test.png")) if temp_output: - with open(os.path.join(args.temp_output, 'plot1.pkl'), 'wb') as f: + with open(os.path.join(args.temp_output, "plot1.pkl"), "wb") as f: pickle.dump(fig, f) - - - - - #charge resolution - charge_hg = charge[:,0] - std_hg = std[:,0] - std_hg_err = std_err[:,0] - charge_lg = charge[:,1] - std_lg = std[:,1] - std_lg_err = std_err[:,1] + # charge resolution + charge_hg = charge[:, 0] + std_hg = std[:, 0] + std_hg_err = std_err[:, 0] + charge_lg = charge[:, 1] + std_lg = std[:, 1] + std_lg_err = std_err[:, 1] fig = plt.figure() - charge_plot = np.linspace(3e-2,1e4) - stat_limit = 1/np.sqrt(charge_plot) #statistical limit - - hg_res = (std_hg/charge_hg) - hg_res_err = err_ratio(std_hg,charge_hg,std_hg_err,std_hg) - - lg_res = (std_lg/charge_lg) - lg_res_err = err_ratio(std_lg,charge_lg,std_lg_err,std_lg) - plt.xscale(u'log') - plt.yscale(u'log') - plt.plot(charge_plot,stat_limit,color='gray', ls='-', lw=3,alpha=0.8,label='Statistical limit ') - mask = charge_hg<3e2 - - plt.errorbar(charge_hg[mask], hg_res[mask], - xerr=std_hg[mask]/np.sqrt(npixels),yerr=hg_res_err[mask]/np.sqrt(npixels), - ls = '', marker ='o', label=None,color='C0') + charge_plot = np.linspace(3e-2, 1e4) + stat_limit = 1 / np.sqrt(charge_plot) # statistical limit + + hg_res = std_hg / charge_hg + hg_res_err = err_ratio(std_hg, charge_hg, std_hg_err, std_hg) + + lg_res = std_lg / charge_lg + lg_res_err = err_ratio(std_lg, charge_lg, std_lg_err, std_lg) + plt.xscale("log") + plt.yscale("log") + plt.plot( + charge_plot, + stat_limit, + color="gray", + ls="-", + lw=3, + alpha=0.8, + label="Statistical limit ", + ) + mask = charge_hg < 3e2 + + plt.errorbar( + charge_hg[mask], + hg_res[mask], + xerr=std_hg[mask] / np.sqrt(npixels), + yerr=hg_res_err[mask] / np.sqrt(npixels), + ls="", + marker="o", + label=None, + color="C0", + ) mask = np.invert(mask) - plt.errorbar(charge_lg[mask]*ratio_fit.params['b'].value,lg_res[mask], - xerr=std_lg[mask]/np.sqrt(npixels),yerr=lg_res_err[mask]/np.sqrt(npixels), ls = '', marker ='o', label=None,color='C0') - - plt.xlabel(r'Charge $\overline{Q}$ [p.e.]') - plt.ylabel(r'Charge resolution $\frac{\sigma_{Q}}{\overline{Q}}$') - - plt.xlim(3e-2,4e3) + plt.errorbar( + charge_lg[mask] * ratio_fit.params["b"].value, + lg_res[mask], + xerr=std_lg[mask] / np.sqrt(npixels), + yerr=lg_res_err[mask] / np.sqrt(npixels), + ls="", + marker="o", + label=None, + color="C0", + ) + + plt.xlabel(r"Charge $\overline{Q}$ [p.e.]") + plt.ylabel(r"Charge resolution $\frac{\sigma_{Q}}{\overline{Q}}$") + + plt.xlim(3e-2, 4e3) plt.legend(frameon=False) - plt.savefig(os.path.join(output_dir,"charge_resolution.png")) + plt.savefig(os.path.join(output_dir, "charge_resolution.png")) if temp_output: - with open(os.path.join(args.temp_output, 'plot2.pkl'), 'wb') as f: + with open(os.path.join(args.temp_output, "plot2.pkl"), "wb") as f: pickle.dump(fig, f) - plt.close('all') + plt.close("all") if __name__ == "__main__": - main() \ No newline at end of file + main() diff --git a/src/nectarchain/user_scripts/dmousadi/test_scripts/tests/pix_couple_tim_uncertainty_test.py b/src/nectarchain/user_scripts/dmousadi/test_scripts/tests/pix_couple_tim_uncertainty_test.py index 532ad9d6..637d5de2 100644 --- a/src/nectarchain/user_scripts/dmousadi/test_scripts/tests/pix_couple_tim_uncertainty_test.py +++ b/src/nectarchain/user_scripts/dmousadi/test_scripts/tests/pix_couple_tim_uncertainty_test.py @@ -1,33 +1,65 @@ -import matplotlib.pyplot as plt -import numpy as np -from test_tools_components import ToMPairsTool +import argparse import os -import sys import pickle +import sys -import argparse +import matplotlib.pyplot as plt +import numpy as np +from test_tools_components import ToMPairsTool def get_args(): """ Parses command-line arguments for the pix_couple_tim_uncertainty_test.py script. - + Returns: argparse.ArgumentParser: The parsed command-line arguments. """ - parser = argparse.ArgumentParser(description='Time resolution (timing uncertainty between couples of pixels) test B-TEL-1030.\n' - +'According to the nectarchain component interface, you have to set a NECTARCAMDATA environment variable in the folder where you have the data from your runs or where you want them to be downloaded.\n' - +'You have to give a list of runs (run numbers with spaces inbetween) and an output directory to save the final plot.\n' - +'If the data is not in NECTARCAMDATA, the files will be downloaded through DIRAC.\n For the purposes of testing this script, default data is from the runs used for this test in the TRR document.\n' - +'You can optionally specify the number of events to be processed (default 1000). Takes a lot of time.\n') - parser.add_argument('-r','--runlist', type=int, nargs='+', help='List of runs (numbers separated by space). You can put just one run, default 3292', required=False, default = [3292]) - parser.add_argument('-e','--evts', type = int, help='Number of events to process from each run. Default is 100. 1000 or more gives best results but takes some time', required=False, default=100) - parser.add_argument('-t','--pmt_transit_time', type=str, help='.csv file with pmt transit time corrections', required=False, default="/Users/dm277349/nectarchain_data/transit_time/hv_pmt_tom_correction_laser_measurement_per_pixel_fit_sqrt_hv_newmethod.csv") - parser.add_argument('-o','--output', type=str, help='Output directory. If none, plot will be saved in current directory', required=False, default='./') - parser.add_argument("--temp_output", help="Temporary output directory for GUI", default=None) - - return parser + parser = argparse.ArgumentParser( + description="Time resolution (timing uncertainty between couples of pixels) test B-TEL-1030.\n" + + "According to the nectarchain component interface, you have to set a NECTARCAMDATA environment variable in the folder where you have the data from your runs or where you want them to be downloaded.\n" + + "You have to give a list of runs (run numbers with spaces inbetween) and an output directory to save the final plot.\n" + + "If the data is not in NECTARCAMDATA, the files will be downloaded through DIRAC.\n For the purposes of testing this script, default data is from the runs used for this test in the TRR document.\n" + + "You can optionally specify the number of events to be processed (default 1000). Takes a lot of time.\n" + ) + parser.add_argument( + "-r", + "--runlist", + type=int, + nargs="+", + help="List of runs (numbers separated by space). You can put just one run, default 3292", + required=False, + default=[3292], + ) + parser.add_argument( + "-e", + "--evts", + type=int, + help="Number of events to process from each run. Default is 100. 1000 or more gives best results but takes some time", + required=False, + default=100, + ) + parser.add_argument( + "-t", + "--pmt_transit_time", + type=str, + help=".csv file with pmt transit time corrections", + required=False, + default="../transit_time/hv_pmt_tom_correction_laser_measurement_per_pixel_fit_sqrt_hv_newmethod.csv", + ) + parser.add_argument( + "-o", + "--output", + type=str, + help="Output directory. If none, plot will be saved in current directory", + required=False, + default="./", + ) + parser.add_argument( + "--temp_output", help="Temporary output directory for GUI", default=None + ) + return parser def main(): @@ -41,12 +73,9 @@ def main(): If a temporary output directory is specified, the plot is also saved to a pickle file in that directory for the gui to use. """ - parser = get_args() args = parser.parse_args() - - tt_path = "/Users/dm277349/nectarchain_data/transit_time/hv_pmt_tom_correction_laser_measurement_per_pixel_fit_sqrt_hv_newmethod.csv" runlist = args.runlist @@ -58,11 +87,10 @@ def main(): print(f"Output directory: {output_dir}") # Debug print print(f"Temporary output file: {temp_output}") # Debug print - sys.argv = sys.argv[:1] tom = [] - pixel_ids = [] #pixel ids for run + pixel_ids = [] # pixel ids for run tom_corrected = [] @@ -70,12 +98,17 @@ def main(): dt_corrected = [] pixel_pairs = [] - for run in runlist: - print("PROCESSING RUN {}".format(run)) tool = ToMPairsTool( - progress_bar=True, run_number=run, events_per_slice = 501 ,max_events=nevents, log_level=20, peak_height=10, window_width=16, overwrite=True + progress_bar=True, + run_number=run, + events_per_slice=501, + max_events=nevents, + log_level=20, + peak_height=10, + window_width=16, + overwrite=True, ) tool.initialize() tool.setup() @@ -87,45 +120,52 @@ def main(): dt_no_correction.append(output[3]) dt_corrected.append(output[4]) pixel_pairs.append(output[5]) - dt_no_correction = np.array(dt_no_correction[0]) dt_corrected = np.array(output[4]) - dt_corrected[abs(dt_corrected)>7]=np.nan - dt_corrected[abs(dt_no_correction)>7]=np.nan - std_corrected = np.nanstd(dt_corrected, axis = 1) - - fig, ax = plt.subplots(figsize=(10,10/1.61)) - plt.hist(std_corrected,range=(0,5),density=True, - histtype='step', lw=3, bins=200) - - plt.axvline(2, color='C4', alpha=0.8) - ax.text(2.1, 0.5, 'CTA requirement', color='C4', fontsize=20, - rotation=-90) - - - ax.annotate("", xy=(1.6, 0.25), xytext=(2, 0.25), color='C4', alpha=0.5, - transform=ax.transAxes, - arrowprops=dict(color='C4', alpha=0.7, lw=3, arrowstyle='->')) - - ax.annotate("", xy=(1.6, 0.75), xytext=(2, 0.75), color='C4', alpha=0.5, - transform=ax.transAxes, - arrowprops=dict(color='C4', alpha=0.7, lw=3, arrowstyle='->')) + dt_corrected[abs(dt_corrected) > 7] = np.nan + dt_corrected[abs(dt_no_correction) > 7] = np.nan + std_corrected = np.nanstd(dt_corrected, axis=1) + + fig, ax = plt.subplots(figsize=(10, 10 / 1.61)) + plt.hist(std_corrected, range=(0, 5), density=True, histtype="step", lw=3, bins=200) + + plt.axvline(2, color="C4", alpha=0.8) + ax.text(2.1, 0.5, "CTA requirement", color="C4", fontsize=20, rotation=-90) + + ax.annotate( + "", + xy=(1.6, 0.25), + xytext=(2, 0.25), + color="C4", + alpha=0.5, + transform=ax.transAxes, + arrowprops=dict(color="C4", alpha=0.7, lw=3, arrowstyle="->"), + ) + + ax.annotate( + "", + xy=(1.6, 0.75), + xytext=(2, 0.75), + color="C4", + alpha=0.5, + transform=ax.transAxes, + arrowprops=dict(color="C4", alpha=0.7, lw=3, arrowstyle="->"), + ) + + plt.xlabel("RMS of $\Delta t_{\mathrm{TOM}}$ for pairs of pixels [ns]") + plt.ylabel("Normalized entries") - plt.xlabel('RMS of $\Delta t_{\mathrm{TOM}}$ for pairs of pixels [ns]') - plt.ylabel('Normalized entries') - plt.gcf() - plt.savefig(os.path.join(output_dir,"pix_couple_tim_uncertainty.png")) + plt.savefig(os.path.join(output_dir, "pix_couple_tim_uncertainty.png")) if temp_output: - with open(os.path.join(args.temp_output, 'plot1.pkl'), 'wb') as f: + with open(os.path.join(args.temp_output, "plot1.pkl"), "wb") as f: pickle.dump(fig, f) - - plt.close('all') + plt.close("all") if __name__ == "__main__": - main() \ No newline at end of file + main() diff --git a/src/nectarchain/user_scripts/dmousadi/test_scripts/tests/test_tools_components.py b/src/nectarchain/user_scripts/dmousadi/test_scripts/tests/test_tools_components.py index d827779d..966a15a4 100644 --- a/src/nectarchain/user_scripts/dmousadi/test_scripts/tests/test_tools_components.py +++ b/src/nectarchain/user_scripts/dmousadi/test_scripts/tests/test_tools_components.py @@ -1,61 +1,56 @@ +import os +import pathlib +import random +from itertools import combinations + +import h5py +import hdf5plugin +import matplotlib.pyplot as plt +import numpy as np +import pandas as pd from astropy import units as u from ctapipe.containers import EventType, Field from ctapipe.core.traits import ComponentNameList, Integer from ctapipe_io_nectarcam import constants from ctapipe_io_nectarcam.containers import NectarCAMDataContainer -from itertools import combinations from lmfit.models import Model -from nectarchain.data.container import ( - NectarCAMContainer, -) +from scipy.interpolate import InterpolatedUnivariateSpline +from scipy.signal import find_peaks +from utils import adc_to_pe, argmedian + +from nectarchain.data.container import NectarCAMContainer from nectarchain.makers import EventsLoopNectarCAMCalibrationTool from nectarchain.makers.component import NectarCAMComponent from nectarchain.utils import ComponentUtils -import h5py -import hdf5plugin -import matplotlib.pyplot as plt -import numpy as np -import os -import pandas as pd -import pathlib -import random -from scipy.interpolate import InterpolatedUnivariateSpline -from scipy.signal import find_peaks -from utils import ( - adc_to_pe, - argmedian, - closest_value, -) -#overriding so we can have maxevents in the path +# overriding so we can have maxevents in the path def _init_output_path(self): - """ - Initializes the output path for the NectarCAMCalibrationTool. - - If `max_events` is `None`, the output file name will be in the format `{self.name}_run{self.run_number}.h5`. Otherwise, the file name will be in the format `{self.name}_run{self.run_number}_maxevents{self.max_events}.h5`. - - The output path is constructed by joining the `NECTARCAMDATA` environment variable (or `/tmp` if not set) with the `tests` subdirectory and the generated file name. - """ - - if self.max_events is None: - filename = f"{self.name}_run{self.run_number}.h5" - else: - filename = f"{self.name}_run{self.run_number}_maxevents{self.max_events}.h5" - self.output_path = pathlib.Path( - f"{os.environ.get('NECTARCAMDATA','/tmp')}/tests/{filename}" - ) + """ + Initializes the output path for the NectarCAMCalibrationTool. -EventsLoopNectarCAMCalibrationTool._init_output_path = _init_output_path + If `max_events` is `None`, the output file name will be in the format `{self.name}_run{self.run_number}.h5`. Otherwise, the file name will be in the format `{self.name}_run{self.run_number}_maxevents{self.max_events}.h5`. + + The output path is constructed by joining the `NECTARCAMDATA` environment variable (or `/tmp` if not set) with the `tests` subdirectory and the generated file name. + """ + + if self.max_events is None: + filename = f"{self.name}_run{self.run_number}.h5" + else: + filename = f"{self.name}_run{self.run_number}_maxevents{self.max_events}.h5" + self.output_path = pathlib.Path( + f"{os.environ.get('NECTARCAMDATA','/tmp')}/tests/{filename}" + ) +EventsLoopNectarCAMCalibrationTool._init_output_path = _init_output_path class ChargeContainer(NectarCAMContainer): """ This class contains fields that store various properties and data related to NectarCAM events, including: - + - `run_number`: The run number associated with the waveforms. - `npixels`: The number of effective pixels. - `pixels_id`: An array of pixel IDs. @@ -65,6 +60,7 @@ class ChargeContainer(NectarCAMContainer): - `charge_hg`: A 2D array of high gain charge values. - `charge_lg`: A 2D array of low gain charge values. """ + run_number = Field( type=np.uint16, description="run number associated to the waveforms", @@ -82,27 +78,27 @@ class ChargeContainer(NectarCAMContainer): ) event_id = Field(type=np.ndarray, dtype=np.uint32, ndim=1, description="event ids") - charge_hg = Field( type=np.ndarray, dtype=np.float64, ndim=2, description="The high gain charge" ) charge_lg = Field( type=np.ndarray, dtype=np.float64, ndim=2, description="The low gain charge" ) - + class ChargeComp(NectarCAMComponent): - + """ This class `ChargeComp` is a NectarCAMComponent that processes NectarCAM event data. It extracts the charge information from the waveforms of each event, handling cases of saturated or noisy events. The class has the following configurable parameters: - + - `window_shift`: The time in ns before the peak to extract the charge. - `window_width`: The duration of the charge extraction window in ns. - + The `__init__` method initializes important members of the component, such as timestamps, event type, event ids, pedestal and charge for both gain channels. The `__call__` method is the main processing logic, which is called for each event. It extracts the charge information for both high gain and low gain channels, handling various cases such as saturated events and events with no signal. The `finish` method collects all the processed data and returns a `ChargeContainer` object containing the run number, number of pixels, pixel IDs, UCTS timestamps, event types, event IDs, and the high and low gain charge values. """ + window_shift = Integer( default_value=6, help="the time in ns before the peak to extract charge", @@ -113,9 +109,6 @@ class ChargeComp(NectarCAMComponent): help="the duration of the extraction window in ns", ).tag(config=True) - - - def __init__(self, subarray, config=None, parent=None, *args, **kwargs): super().__init__( subarray=subarray, config=config, parent=parent, *args, **kwargs @@ -131,15 +124,13 @@ def __init__(self, subarray, config=None, parent=None, *args, **kwargs): self.__charge_hg = [] self.__charge_lg = [] - - ##This method need to be defined ! def __call__(self, event: NectarCAMDataContainer, *args, **kwargs): self.__event_id.append(np.uint32(event.index.event_id)) - - #print(event.index.event_id) - + + # print(event.index.event_id) + self.__event_type.append(event.trigger.event_type.value) self.__ucts_timestamp.append(event.nectarcam.tel[0].evt.ucts_timestamp) @@ -147,79 +138,70 @@ def __call__(self, event: NectarCAMDataContainer, *args, **kwargs): wfs.append(event.r0.tel[0].waveform[constants.HIGH_GAIN][self.pixels_id]) wfs.append(event.r0.tel[0].waveform[constants.LOW_GAIN][self.pixels_id]) - - - #####THE JOB IS HERE###### - for i, (pedestal,charge) in enumerate(zip([self.__pedestal_hg, self.__pedestal_lg],[self.__charge_hg,self.__charge_lg])): - wf = np.array(wfs[i],dtype=np.float16) - - index_peak = np.argmax(wf,axis=1) - index_peak[index_peak<20]=20 - index_peak[index_peak>40]=40 - + for i, (pedestal, charge) in enumerate( + zip( + [self.__pedestal_hg, self.__pedestal_lg], + [self.__charge_hg, self.__charge_lg], + ) + ): + wf = np.array(wfs[i], dtype=np.float16) + + index_peak = np.argmax(wf, axis=1) + index_peak[index_peak < 20] = 20 + index_peak[index_peak > 40] = 40 + signal_start = index_peak - self.window_shift signal_stop = index_peak + self.window_width - self.window_shift - - - - - if event.trigger.event_type==EventType.FLATFIELD: + + if event.trigger.event_type == EventType.FLATFIELD: integral = np.zeros(len(self.pixels_id)) - + for pix in range(len(self.pixels_id)): - #search for saturated events or events with no signal - ped = np.round(np.mean(wf[pix, 0:signal_start[pix]])) - - - peaks_sat = find_peaks(wf[pix,20:45],height=1000,plateau_size=self.window_width) - if len(peaks_sat[0])==1: - #saturated - - #print("saturated") - - signal_start[pix]=argmedian(wf[pix,20:45])+20-self.window_shift - signal_stop[pix]=signal_start[pix]+self.window_width - integral[pix] = np.sum(wf[pix,signal_start[pix]:signal_stop[pix]])-ped*(signal_stop[pix]-signal_start[pix]) - + # search for saturated events or events with no signal + ped = np.round(np.mean(wf[pix, 0 : signal_start[pix]])) + + peaks_sat = find_peaks( + wf[pix, 20:45], height=1000, plateau_size=self.window_width + ) + if len(peaks_sat[0]) == 1: + # saturated + + # print("saturated") + + signal_start[pix] = ( + argmedian(wf[pix, 20:45]) + 20 - self.window_shift + ) + signal_stop[pix] = signal_start[pix] + self.window_width + integral[pix] = np.sum( + wf[pix, signal_start[pix] : signal_stop[pix]] + ) - ped * (signal_stop[pix] - signal_start[pix]) + else: - peaks_signal = find_peaks(wf[pix],prominence=10) - if len(peaks_signal[0])>=12: - #print("noisy event") - integral[pix]=0 + peaks_signal = find_peaks(wf[pix], prominence=10) + if len(peaks_signal[0]) >= 12: + # print("noisy event") + integral[pix] = 0 - elif len(peaks_signal[0])<1: - #flat - integral[pix]=0 + elif len(peaks_signal[0]) < 1: + # flat + integral[pix] = 0 - else: + else: # x = np.linspace(0,signal_stop[pix]-signal_start[pix],signal_stop[pix]-signal_start[pix]) # spl = UnivariateSpline(x,y) # integral[pix] = spl.integral(0,signal_stop[pix]-signal_start[pix]) - - - integral[pix] = np.sum(wf[pix,signal_start[pix]:signal_stop[pix]])-ped*(signal_stop[pix]-signal_start[pix]) - - - chg = integral - - charge.append(chg) - - - - - - - - - + integral[pix] = np.sum( + wf[pix, signal_start[pix] : signal_stop[pix]] + ) - ped * (signal_stop[pix] - signal_start[pix]) + chg = integral + charge.append(chg) ##This method need to be defined ! def finish(self): - output = ChargeContainer( run_number=ChargeContainer.fields["run_number"].type(self._run_number), npixels=ChargeContainer.fields["npixels"].type(self._npixels), @@ -227,9 +209,10 @@ def finish(self): ucts_timestamp=ChargeContainer.fields["ucts_timestamp"].dtype.type( self.__ucts_timestamp ), - event_type=ChargeContainer.fields["event_type"].dtype.type(self.__event_type), + event_type=ChargeContainer.fields["event_type"].dtype.type( + self.__event_type + ), event_id=ChargeContainer.fields["event_id"].dtype.type(self.__event_id), - charge_hg=ChargeContainer.fields["charge_hg"].dtype.type( np.array(self.__charge_hg) ), @@ -238,15 +221,13 @@ def finish(self): ), ) - return output - class LinearityTestTool(EventsLoopNectarCAMCalibrationTool): """ This class, `LinearityTestTool`, is a subclass of `EventsLoopNectarCAMCalibrationTool`. It is responsible for performing a linearity test on NectarCAM data. The class has a `componentsList` attribute that specifies the list of NectarCAM components to be applied. - + The `finish` method is the main functionality of this class. It reads the charge data from the output file, calculates the mean charge, standard deviation, and standard error for both the high gain and low gain channels, and returns these values. This information can be used to assess the linearity of the NectarCAM system. """ @@ -258,61 +239,50 @@ class LinearityTestTool(EventsLoopNectarCAMCalibrationTool): help="List of Component names to be apply, the order will be respected", ).tag(config=True) - def finish(self, *args, **kwargs): - super().finish(return_output_component=True, *args, **kwargs) - - - mean_charge = [0,0] #per channel - std_charge = [0,0] - std_err = [0,0] + + mean_charge = [0, 0] # per channel + std_charge = [0, 0] + std_err = [0, 0] charge_hg = [] charge_lg = [] - - + output_file = h5py.File(self.output_path) - - + for thing in output_file: group = output_file[thing] - dataset = group['ChargeContainer'] + dataset = group["ChargeContainer"] data = dataset[:] - #print("data",data) + # print("data",data) for tup in data: try: npixels = tup[1] charge_hg.extend(tup[6]) charge_lg.extend(tup[7]) - + except: break - + output_file.close() - - - charge_pe_hg=np.array(charge_hg)/adc_to_pe - charge_pe_lg=np.array(charge_lg)/adc_to_pe - - for channel,charge in enumerate([charge_pe_hg,charge_pe_lg]): - - - pix_mean_charge = np.mean(charge, axis=0) #in pe - + + charge_pe_hg = np.array(charge_hg) / adc_to_pe + charge_pe_lg = np.array(charge_lg) / adc_to_pe + + for channel, charge in enumerate([charge_pe_hg, charge_pe_lg]): + pix_mean_charge = np.mean(charge, axis=0) # in pe + pix_std_charge = np.std(charge, axis=0) - - #average of all pixels + + # average of all pixels mean_charge[channel] = np.mean(pix_mean_charge) - + std_charge[channel] = np.mean(pix_std_charge) - #for the charge resolution + # for the charge resolution std_err[channel] = np.std(pix_std_charge) return mean_charge, std_charge, std_err, npixels - - - class ToMContainer(NectarCAMContainer): @@ -346,35 +316,45 @@ class ToMContainer(NectarCAMContainer): ) event_id = Field(type=np.ndarray, dtype=np.uint32, ndim=1, description="event ids") - charge_hg = Field( - type=np.ndarray, dtype=np.float64, ndim=2, description="The mean high gain charge per event" + type=np.ndarray, + dtype=np.float64, + ndim=2, + description="The mean high gain charge per event", ) - + # tom_mu = Field( # type=np.ndarray, dtype=np.float64, ndim=2, description="Time of maximum of signal fitted with gaussian" # ) - + # tom_sigma = Field( # type=np.ndarray, dtype=np.float64, ndim=2, description="Time of fitted maximum sigma" # ) tom_no_fit = Field( - type=np.ndarray, dtype=np.float64, ndim=2, description="Time of maximum from data (no fitting)" + type=np.ndarray, + dtype=np.float64, + ndim=2, + description="Time of maximum from data (no fitting)", + ) + good_evts = Field( + type=np.ndarray, + dtype=np.uint32, + ndim=1, + description="good (non cosmic ray) event ids", ) - good_evts = Field(type=np.ndarray, dtype=np.uint32, ndim=1, description="good (non cosmic ray) event ids") - class ToMComp(NectarCAMComponent): """ This class, `ToMComp`, is a component of the NectarCAM system that is responsible for processing waveform data. It has several configurable parameters, including the width and shift before the peak of the time window for charge extraction, the peak height threshold. - + The `__init__` method initializes some important component members, such as timestamps, event type, event ids, pedestal and charge values for both gain channels. The `__call__` method is the main entry point for processing an event. It extracts the waveform data, calculates the pedestal, charge, and time of maximum (ToM) for each pixel, and filters out events that do not meet the peak height threshold. The results are stored in various member variables, which are then returned in the `finish` method. - + The `finish` method collects the processed data from the member variables and returns a `ToMContainer` object, which contains the run number, number of pixels, pixel IDs, UCTS timestamps, event types, event IDs, high-gain charge, ToM without fitting, and IDs of good (non-cosmic ray) events. """ + window_shift = Integer( default_value=6, help="the time in ns before the peak to extract charge", @@ -399,94 +379,103 @@ def __init__(self, subarray, config=None, parent=None, *args, **kwargs): self.__ucts_timestamp = [] self.__event_type = [] self.__event_id = [] - + self.__charge_hg = [] self.__pedestal_hg = [] - # self.__tom_mu = [] - + # self.__tom_mu = [] + # self.__tom_sigma = [] self.__tom_no_fit = [] - + self.__good_evts = [] self.__ff_event_ind = -1 - - - ##This method need to be defined ! def __call__(self, event: NectarCAMDataContainer, *args, **kwargs): self.__event_id.append(np.uint32(event.index.event_id)) - - - - + self.__event_type.append(event.trigger.event_type.value) self.__ucts_timestamp.append(event.nectarcam.tel[0].evt.ucts_timestamp) - wfs = [] wfs.append(event.r0.tel[0].waveform[constants.HIGH_GAIN][self.pixels_id]) - - self.__ff_event_ind += 1 + self.__ff_event_ind += 1 #####THE JOB IS HERE###### - - for i, (pedestal,charge,tom_no_fit) in enumerate(zip([self.__pedestal_hg],[self.__charge_hg],[self.__tom_no_fit])): - wf = np.array(wfs[i]) #waveform per gain - index_peak = np.argmax(wf,axis=1) #tom per event/pixel - #use it to first find pedestal and then will filter out no signal events - index_peak[index_peak<20]=20 - index_peak[index_peak>40]=40 + + for i, (pedestal, charge, tom_no_fit) in enumerate( + zip([self.__pedestal_hg], [self.__charge_hg], [self.__tom_no_fit]) + ): + wf = np.array(wfs[i]) # waveform per gain + index_peak = np.argmax(wf, axis=1) # tom per event/pixel + # use it to first find pedestal and then will filter out no signal events + index_peak[index_peak < 20] = 20 + index_peak[index_peak > 40] = 40 signal_start = index_peak - self.window_shift signal_stop = index_peak + self.window_width - self.window_shift - - ped = np.array([np.mean(wf[pix, 0:signal_start[pix]]) for pix in range(len(self.pixels_id))]) - + + ped = np.array( + [ + np.mean(wf[pix, 0 : signal_start[pix]]) + for pix in range(len(self.pixels_id)) + ] + ) + pedestal.append(ped) - tom_no_fit_evt = np.zeros(len(self.pixels_id)) chg = np.zeros(len(self.pixels_id)) - #will calculate tom & charge like federica + # will calculate tom & charge like federica for pix in range(len(self.pixels_id)): - y = wf[pix] - ped[pix]#np.maximum(wf[pix] - ped[pix],np.zeros(len(wf[pix]))) - - + y = ( + wf[pix] - ped[pix] + ) # np.maximum(wf[pix] - ped[pix],np.zeros(len(wf[pix]))) + x = np.linspace(0, len(y), len(y)) xi = np.linspace(0, len(y), 251) ius = InterpolatedUnivariateSpline(x, y) yi = ius(xi) - peaks, _ = find_peaks(yi, height=self.peak_height, ) - #print(peaks) - peaks = peaks[xi[peaks]>20] - peaks = peaks[xi[peaks]<32] - #print(peaks) - - if len(peaks)>0: + peaks, _ = find_peaks( + yi, + height=self.peak_height, + ) + # print(peaks) + peaks = peaks[xi[peaks] > 20] + peaks = peaks[xi[peaks] < 32] + # print(peaks) + + if len(peaks) > 0: # find the max peak # max_peak = max(yi[peaks]) max_peak_index = np.argmax(yi[peaks], axis=0) - + # Check if there is not a peak but a plateaux - yi_rounded = np.around(yi[peaks]/max(yi[peaks]),1) - maxima_peak_index = np.argwhere(yi_rounded== np.amax(yi_rounded)) - - - #saturated events - if ((len(maxima_peak_index) > 1) and (min(xi[peaks[maxima_peak_index]]) > signal_start[pix]) and ( - max(xi[peaks[maxima_peak_index]]) < signal_stop[pix])): + yi_rounded = np.around(yi[peaks] / max(yi[peaks]), 1) + maxima_peak_index = np.argwhere(yi_rounded == np.amax(yi_rounded)) + + # saturated events + if ( + (len(maxima_peak_index) > 1) + and (min(xi[peaks[maxima_peak_index]]) > signal_start[pix]) + and (max(xi[peaks[maxima_peak_index]]) < signal_stop[pix]) + ): # saturated event - + max_peak_index = int(np.median(maxima_peak_index)) - # simple sum integration - x_max = closest_value(x, xi[peaks[max_peak_index]]) # find the maximum in the not splined array - charge_sum = y[(list(x).index(x_max) - (self.window_width - 10)):(list(x).index(x_max) + (self.window_width - 6))].sum() + x_max_pos = np.argmin( + np.abs(x - xi[peaks[max_peak_index]]) + ) # find the maximum in the not splined array + charge_sum = y[ + (x_max_pos - (self.window_width - 10)) : ( + x_max_pos + (self.window_width - 6) + ) + ].sum() # gaussian fit # change_grad_pos_left = 3 @@ -498,34 +487,38 @@ def __call__(self, event: NectarCAMDataContainer, *args, **kwargs): # y_fit = np.ma.compressed(mask) # mean = xi[peaks[max_peak_index]] # sigma = change_grad_pos_right + change_grad_pos_left - + # # fit # model = Model(gaus) # params = model.make_params(a=yi[peaks[max_peak_index]] * 3, mu=mean, sigma=sigma) # result = model.fit(y_fit, params, x=x_fit) - + # result_sigma = result.params['sigma'].value # result_mu = result.params['mu'].value - max_position_x_prefit = xi[peaks[max_peak_index]] - - elif ((len(maxima_peak_index) == 1) and (xi[peaks[max_peak_index]] > signal_start[pix]) and ( - xi[peaks[max_peak_index]]< signal_stop[pix])): - + elif ( + (len(maxima_peak_index) == 1) + and (xi[peaks[max_peak_index]] > signal_start[pix]) + and (xi[peaks[max_peak_index]] < signal_stop[pix]) + ): # simple sum integration - x_max = closest_value(x, xi[peaks[max_peak_index]]) # find the maximum in the not splined array - charge_sum = y[(list(x).index(x_max) - (self.window_width - 10)):(list(x).index(x_max) + (self.window_width - 6))].sum() - - + x_max_pos = np.argmin( + np.abs(x - xi[peaks[max_peak_index]]) + ) # find the maximum in the not splined array + charge_sum = y[ + (x_max_pos - (self.window_width - 10)) : ( + x_max_pos + (self.window_width - 6) + ) + ].sum() # gaussian fit # change_grad_pos_left = 3 # change_grad_pos_right = 3 # mean = xi[peaks[max_peak_index]] # sigma = change_grad_pos_right + change_grad_pos_left # define window for the gaussian fit - + # x_fit = xi[peaks[max_peak_index]-change_grad_pos_left:peaks[max_peak_index]+change_grad_pos_right] # y_fit = yi[peaks[max_peak_index]-change_grad_pos_left:peaks[max_peak_index]+change_grad_pos_right] # model = Model(gaus) @@ -535,63 +528,49 @@ def __call__(self, event: NectarCAMDataContainer, *args, **kwargs): max_position_x_prefit = xi[peaks[max_peak_index]] # result_sigma = result.params['sigma'].value # result_mu = result.params['mu'].value - - else: - # index_x_window_min = list(xi).index(closest_value(xi, signal_start[pix])) - charge_sum = y[signal_start[pix]:signal_start[pix] + self.window_width].sum() - - + charge_sum = y[ + signal_start[pix] : signal_start[pix] + self.window_width + ].sum() max_position_x_prefit = -1 # result_sigma = -1 # result_mu = -1 - else: - # If no maximum is found, the integration is done between 20 and 36 ns. - signal_start[pix]=20 - + signal_start[pix] = 20 + # index_x_window_min = list(xi).index(closest_value(xi, signal_start[pix])) - charge_sum = y[signal_start[pix]:signal_start[pix] + self.window_width].sum() + charge_sum = y[ + signal_start[pix] : signal_start[pix] + self.window_width + ].sum() max_position_x_prefit = -1 - + # result_sigma = -1 # result_mu = -1 - - + # tom_mu_evt[pix] = result_mu # tom_sigma_evt[pix] = result_sigma tom_no_fit_evt[pix] = max_position_x_prefit chg[pix] = charge_sum - - - # tom_mu.append(tom_mu_evt) # tom_sigma.append(tom_sigma_evt) tom_no_fit.append(tom_no_fit_evt) - #print("tom for event", tom_no_fit_evt) + # print("tom for event", tom_no_fit_evt) charge.append(chg) - - #is it a good event? - if np.max(chg)<10*np.mean(chg): - #print("is good evt") + # is it a good event? + if np.max(chg) < 10 * np.mean(chg): + # print("is good evt") self.__good_evts.append(self.__ff_event_ind) - - - - - ##This method need to be defined ! def finish(self): - output = ToMContainer( run_number=ToMContainer.fields["run_number"].type(self._run_number), npixels=ToMContainer.fields["npixels"].type(self._npixels), @@ -601,43 +580,29 @@ def finish(self): ), event_type=ToMContainer.fields["event_type"].dtype.type(self.__event_type), event_id=ToMContainer.fields["event_id"].dtype.type(self.__event_id), - - charge_hg=ToMContainer.fields["charge_hg"].dtype.type( - self.__charge_hg - ), - + charge_hg=ToMContainer.fields["charge_hg"].dtype.type(self.__charge_hg), # tom_mu=ToMContainer.fields["tom_mu"].dtype.type( # self.__tom_mu # ), - # tom_sigma=ToMContainer.fields["tom_sigma"].dtype.type( # self.__tom_sigma # ), - - tom_no_fit=ToMContainer.fields["tom_no_fit"].dtype.type( - self.__tom_no_fit - ), - - good_evts=ToMContainer.fields["good_evts"].dtype.type( - self.__good_evts - ), - + tom_no_fit=ToMContainer.fields["tom_no_fit"].dtype.type(self.__tom_no_fit), + good_evts=ToMContainer.fields["good_evts"].dtype.type(self.__good_evts), ) return output - - - class TimingResolutionTestTool(EventsLoopNectarCAMCalibrationTool): """ This class, `TimingResolutionTestTool`, is a subclass of `EventsLoopNectarCAMCalibrationTool` and is used to perform timing resolution tests on NectarCAM data. It reads the output data from the `ToMContainer` dataset and processes the charge, timing, and event information to calculate the timing resolution and mean charge in photoelectrons. - + The `finish()` method is the main entry point for this tool. It reads the output data from the HDF5 file, filters the data to remove cosmic ray events, and then calculates the timing resolution and mean charge per photoelectron. The timing resolution is calculated using a weighted mean and variance approach, with an option to use a bootstrapping method to estimate the error on the RMS value. - + The method returns the RMS of the timing resolution, the error on the RMS, and the mean charge in photoelectrons. """ + name = "TimingResolutionTestTool" componentsList = ComponentNameList( @@ -646,9 +611,6 @@ class TimingResolutionTestTool(EventsLoopNectarCAMCalibrationTool): help="List of Component names to be apply, the order will be respected", ).tag(config=True) - - - def _init_output_path(self): if self.max_events is None: filename = f"{self.name}_run{self.run_number}.h5" @@ -659,22 +621,21 @@ def _init_output_path(self): ) def finish(self, bootstrap=False, *args, **kwargs): - super().finish(return_output_component=True, *args, **kwargs) - - #tom_mu_all= output[0].tom_mu - #tom_sigma_all= output[0].tom_sigma + + # tom_mu_all= output[0].tom_mu + # tom_sigma_all= output[0].tom_sigma output_file = h5py.File(self.output_path) - + charge_all = [] tom_no_fit_all = [] good_evts = [] - + for thing in output_file: group = output_file[thing] - dataset = group['ToMContainer'] + dataset = group["ToMContainer"] data = dataset[:] - #print("data",data) + # print("data",data) for tup in data: try: npixels = tup[1] @@ -683,111 +644,104 @@ def finish(self, bootstrap=False, *args, **kwargs): good_evts.extend(tup[8]) except: break - + output_file.close() - - - tom_no_fit_all= np.array(tom_no_fit_all) - charge_all = np.array(charge_all) - #print(tom_no_fit_all) - #print(charge_all) - + tom_no_fit_all = np.array(tom_no_fit_all) + charge_all = np.array(charge_all) + # print(tom_no_fit_all) + # print(charge_all) - #clean cr events + # clean cr events good_evts = np.array(good_evts) - #print(good_evts) - charge=charge_all[good_evts] - mean_charge_pe = np.mean(np.mean(charge,axis=0))/58. - #tom_mu = np.array(tom_mu_all[good_evts]).reshape(len(good_evts),output[0].npixels) - - #tom_sigma = np.array(tom_sigma_all[good_evts]).reshape(len(good_evts),output[0].npixels) - tom_no_fit = np.array(tom_no_fit_all[good_evts]).reshape(len(good_evts),npixels) - #print(tom_no_fit) - #print(tom_no_fit) - - #rms_mu = np.zeros(output[0].npixels) + # print(good_evts) + charge = charge_all[good_evts] + mean_charge_pe = np.mean(np.mean(charge, axis=0)) / 58.0 + # tom_mu = np.array(tom_mu_all[good_evts]).reshape(len(good_evts),output[0].npixels) + + # tom_sigma = np.array(tom_sigma_all[good_evts]).reshape(len(good_evts),output[0].npixels) + tom_no_fit = np.array(tom_no_fit_all[good_evts]).reshape( + len(good_evts), npixels + ) + # print(tom_no_fit) + # print(tom_no_fit) + + # rms_mu = np.zeros(output[0].npixels) rms_no_fit = np.zeros(npixels) - - #rms_mu_err = np.zeros(output[0].npixels) + + # rms_mu_err = np.zeros(output[0].npixels) rms_no_fit_err = np.zeros(npixels) - - - #bootstrapping method - + + # bootstrapping method + for pix in range(npixels): - - for tom, rms, err in (zip([tom_no_fit[:,pix]],[rms_no_fit],[rms_no_fit_err])): - tom_pos = tom[tom>20] + for tom, rms, err in zip( + [tom_no_fit[:, pix]], [rms_no_fit], [rms_no_fit_err] + ): + tom_pos = tom[tom > 20] boot_rms = [] - - sample = tom_pos[tom_pos<32] - #print(sample) - bins = np.linspace(20,32,50) + + sample = tom_pos[tom_pos < 32] + # print(sample) + bins = np.linspace(20, 32, 50) hist_values, bin_edges = np.histogram(sample, bins=bins) - - + # Compute bin centers bin_centers = (bin_edges[:-1] + bin_edges[1:]) / 2 - - - + if bootstrap: - #print(len(sample)) - if len(sample)!=0: + # print(len(sample)) + if len(sample) != 0: for _ in range(1000): - bootsample = np.random.choice(sample,size=int(3/4*(len(sample))), replace=True) - # print(len(bootsample), bootsample.mean(), bootsample.std()) - boot_rms.append(bootsample.std()) + bootsample = np.random.choice( + sample, size=int(3 / 4 * (len(sample))), replace=True + ) + # print(len(bootsample), bootsample.mean(), bootsample.std()) + boot_rms.append(bootsample.std()) # simulated mean of rms bootrms_mean = np.mean(boot_rms) - # simulated standard deviation of rms + # simulated standard deviation of rms bootrms_std = np.std(boot_rms) else: bootrms_std = 0 bootrms_mean = 0 # print(bootrms_std) - err[pix]=bootrms_std + err[pix] = bootrms_std rms[pix] = bootrms_mean - - else: + else: try: - - weighted_mean = np.average(bin_centers, weights=hist_values) - #print("Weighted Mean:", weighted_mean) + # print("Weighted Mean:", weighted_mean) # Compute weighted variance - weighted_variance = np.average((bin_centers - weighted_mean)**2, weights=hist_values) - #print("Weighted Variance:", weighted_variance) + weighted_variance = np.average( + (bin_centers - weighted_mean) ** 2, weights=hist_values + ) + # print("Weighted Variance:", weighted_variance) # Compute RMS value (Standard deviation) rms[pix] = np.sqrt(weighted_variance) - #print("RMS:", rms[pix]) + # print("RMS:", rms[pix]) # Compute the total number of data points (sum of histogram values, i.e. N) N = np.sum(hist_values) - #print("Total number of events (N):", N) + # print("Total number of events (N):", N) # Error on the standard deviation err[pix] = rms[pix] / np.sqrt(2 * N) - #print("Error on RMS:", err[pix]) + # print("Error on RMS:", err[pix]) except: - #no data + # no data rms[pix] = np.nan - err[pix] = np.nan - - - - return rms_no_fit, rms_no_fit_err, mean_charge_pe - + err[pix] = np.nan + return rms_no_fit, rms_no_fit_err, mean_charge_pe class ToMPairsTool(EventsLoopNectarCAMCalibrationTool): """ - This class, `ToMPairsTool`, is an `EventsLoopNectarCAMCalibrationTool` that is used to process ToM (Time of maximum) data from NectarCAM. + This class, `ToMPairsTool`, is an `EventsLoopNectarCAMCalibrationTool` that is used to process ToM (Time of maximum) data from NectarCAM. The `finish` method has the following functionalities: @@ -798,7 +752,7 @@ class ToMPairsTool(EventsLoopNectarCAMCalibrationTool): The class has several configurable parameters, including the list of NectarCAM components to apply, the maximum number of events to process, and the output file path. """ - + name = "ToMPairsTool" componentsList = ComponentNameList( @@ -807,104 +761,117 @@ class ToMPairsTool(EventsLoopNectarCAMCalibrationTool): help="List of Component names to be apply, the order will be respected", ).tag(config=True) - - - - def finish(self, *args, **kwargs): - super().finish(return_output_component=True, *args[1:], **kwargs) tt_path = args[0] pmt_tt = pd.read_csv(tt_path)["tom_pmttt_delta_correction"].values - - tom_no_fit_all= [] - + + tom_no_fit_all = [] + pixels_id = [] - output_file = h5py.File(self.output_path) - - + for thing in output_file: group = output_file[thing] - dataset = group['ToMContainer'] + dataset = group["ToMContainer"] data = dataset[:] - #print("data",data) + # print("data",data) for tup in data: try: pixels_id.extend(tup[2]) tom_no_fit_all.extend(tup[7]) except: break - + output_file.close() pixels_id = np.array(pixels_id) tom_no_fit_all = np.array(tom_no_fit_all) - - #clean cr events - #good_evts = output[0].good_evts - #charge=charge_all[good_evts] - #mean_charge_pe = np.mean(np.mean(charge,axis=0))/58. - tom_no_fit = np.array(tom_no_fit_all,dtype=np.float64) #tom(event,pixel) - #tom_no_fit = tom_no_fit[np.all(tom_no_fit>0,axis=0)] - tom_corrected = -np.ones(tom_no_fit.shape,dtype=np.float128) #-1 for the ones that have tom beyond 0-60 + + # clean cr events + # good_evts = output[0].good_evts + # charge=charge_all[good_evts] + # mean_charge_pe = np.mean(np.mean(charge,axis=0))/58. + tom_no_fit = np.array(tom_no_fit_all, dtype=np.float64) # tom(event,pixel) + # tom_no_fit = tom_no_fit[np.all(tom_no_fit>0,axis=0)] + tom_corrected = -np.ones( + tom_no_fit.shape, dtype=np.float128 + ) # -1 for the ones that have tom beyond 0-60 iter = enumerate(pixels_id) - for i,pix in iter: - #print(pix, pmt_tt[pix]) - normal_values = [a and b for a,b in zip(tom_no_fit[:,i]>0,tom_no_fit[:,i]<60)] - - tom_corrected[normal_values,i] = tom_no_fit[:,i][normal_values]-pmt_tt[pix] - - #print(tom_corrected) - - pixel_ind = [i for i in range(len(pixels_id))] #dealing with indices of pixels in array - pixel_pairs = list(combinations(pixel_ind,2)) - dt_no_correction = np.zeros((len(pixel_pairs),tom_no_fit_all.shape[0])) - dt_corrected = np.zeros((len(pixel_pairs),tom_no_fit_all.shape[0])) - - for i,pair in enumerate(pixel_pairs): + for i, pix in iter: + # print(pix, pmt_tt[pix]) + normal_values = [ + a and b for a, b in zip(tom_no_fit[:, i] > 0, tom_no_fit[:, i] < 60) + ] + + tom_corrected[normal_values, i] = ( + tom_no_fit[:, i][normal_values] - pmt_tt[pix] + ) + + # print(tom_corrected) + + pixel_ind = [ + i for i in range(len(pixels_id)) + ] # dealing with indices of pixels in array + pixel_pairs = list(combinations(pixel_ind, 2)) + dt_no_correction = np.zeros((len(pixel_pairs), tom_no_fit_all.shape[0])) + dt_corrected = np.zeros((len(pixel_pairs), tom_no_fit_all.shape[0])) + + for i, pair in enumerate(pixel_pairs): pix1_ind = pixel_ind[pair[0]] pix2_ind = pixel_ind[pair[1]] - + for event in range(tom_no_fit_all.shape[0]): - cond_no_correction = (tom_no_fit[event,pix1_ind]>0 and tom_no_fit[event,pix1_ind]<60 - and tom_no_fit[event,pix2_ind]>0 and tom_no_fit[event,pix2_ind]<60) - cond_correction = (tom_corrected[event,pix1_ind]>0 and tom_corrected[event,pix1_ind]<60 - and tom_corrected[event,pix2_ind]>0 and tom_corrected[event,pix2_ind]<60) - - if cond_no_correction: #otherwise will be nan - - dt_no_correction[i,event] = tom_no_fit[event,pix1_ind] - tom_no_fit[event,pix2_ind] - + cond_no_correction = ( + tom_no_fit[event, pix1_ind] > 0 + and tom_no_fit[event, pix1_ind] < 60 + and tom_no_fit[event, pix2_ind] > 0 + and tom_no_fit[event, pix2_ind] < 60 + ) + cond_correction = ( + tom_corrected[event, pix1_ind] > 0 + and tom_corrected[event, pix1_ind] < 60 + and tom_corrected[event, pix2_ind] > 0 + and tom_corrected[event, pix2_ind] < 60 + ) + + if cond_no_correction: # otherwise will be nan + dt_no_correction[i, event] = ( + tom_no_fit[event, pix1_ind] - tom_no_fit[event, pix2_ind] + ) + else: - dt_no_correction[i,event] = np.nan - + dt_no_correction[i, event] = np.nan + if cond_correction: - - dt_corrected[i,event] = tom_corrected[event,pix1_ind] - tom_corrected[event,pix2_ind] - + dt_corrected[i, event] = ( + tom_corrected[event, pix1_ind] - tom_corrected[event, pix2_ind] + ) + else: - dt_corrected[i,event] = np.nan - + dt_corrected[i, event] = np.nan - - # rms_no_fit = np.zeros(output[0].npixels) # rms_no_fit_err = np.zeros(output[0].npixels) - - - return tom_no_fit,tom_corrected,pixels_id, dt_no_correction, dt_corrected, pixel_pairs - + + return ( + tom_no_fit, + tom_corrected, + pixels_id, + dt_no_correction, + dt_corrected, + pixel_pairs, + ) class PedestalContainer(NectarCAMContainer): """ Attributes of the PedestalContainer class that store various data related to the pedestal of a NectarCAM event. - + Attributes: run_number (np.uint16): The run number associated with the waveforms. npixels (np.uint16): The number of effective pixels. @@ -917,6 +884,7 @@ class PedestalContainer(NectarCAMContainer): rms_ped_hg (np.ndarray[np.float64]): The high gain pedestal RMS per event. rms_ped_lg (np.ndarray[np.float64]): The low gain pedestal RMS per event. """ + run_number = Field( type=np.uint16, description="run number associated to the waveforms", @@ -934,38 +902,47 @@ class PedestalContainer(NectarCAMContainer): ) event_id = Field(type=np.ndarray, dtype=np.uint32, ndim=1, description="event ids") - pedestal_hg = Field( - type=np.ndarray, dtype=np.float64, ndim=2, description="High gain pedestal per event" + type=np.ndarray, + dtype=np.float64, + ndim=2, + description="High gain pedestal per event", ) pedestal_lg = Field( - type=np.ndarray, dtype=np.float64, ndim=2, description="Low gain pedestal per event" + type=np.ndarray, + dtype=np.float64, + ndim=2, + description="Low gain pedestal per event", ) rms_ped_hg = Field( - type=np.ndarray, dtype=np.float64, ndim=2, description="High gain pedestal rms per event" + type=np.ndarray, + dtype=np.float64, + ndim=2, + description="High gain pedestal rms per event", ) rms_ped_lg = Field( - type=np.ndarray, dtype=np.float64, ndim=2, description="Low gain pedestal rms per event" + type=np.ndarray, + dtype=np.float64, + ndim=2, + description="Low gain pedestal rms per event", ) - - class PedestalComp(NectarCAMComponent): """ The `PedestalComp` class is a NectarCAMComponent that is responsible for processing the pedestal and RMS of the high and low gain waveforms for each event. - + The `__init__` method initializes the `PedestalComp` class. It sets up several member variables to store pedestal related data such as timestamps, event types, event IDs, pedestal and pedestal rms values for both gains. The `__call__` method is called for each event, and it processes the waveforms to calculate the pedestal and RMS for the high and low gain channels. The results are stored in the class attributes `__pedestal_hg`, `__pedestal_lg`, `__rms_ped_hg`, and `__rms_ped_lg`. - + The `finish` method is called at the end of processing, and it returns a `PedestalContainer` object containing the calculated pedestal and RMS values, as well as other event information. """ - + def __init__(self, subarray, config=None, parent=None, *args, **kwargs): super().__init__( subarray=subarray, config=config, parent=parent, *args, **kwargs @@ -975,8 +952,7 @@ def __init__(self, subarray, config=None, parent=None, *args, **kwargs): self.__ucts_timestamp = [] self.__event_type = [] self.__event_id = [] - - + self.__pedestal_hg = [] self.__pedestal_lg = [] @@ -984,53 +960,39 @@ def __init__(self, subarray, config=None, parent=None, *args, **kwargs): self.__rms_ped_hg = [] self.__rms_ped_lg = [] - - - - ##This method need to be defined ! def __call__(self, event: NectarCAMDataContainer, *args, **kwargs): self.__event_id.append(np.uint32(event.index.event_id)) - - - #print(event.trigger.event_type) - + + # print(event.trigger.event_type) + self.__event_type.append(event.trigger.event_type.value) self.__ucts_timestamp.append(event.nectarcam.tel[0].evt.ucts_timestamp) - wfs = [] wfs.append(event.r0.tel[0].waveform[constants.HIGH_GAIN][self.pixels_id]) wfs.append(event.r0.tel[0].waveform[constants.LOW_GAIN][self.pixels_id]) - - - #####THE JOB IS HERE###### - - for i, (pedestal,rms_pedestal) in enumerate(zip([self.__pedestal_hg,self.__pedestal_lg],[self.__rms_ped_hg,self.__rms_ped_lg])): - wf = np.array(wfs[i]) #waveform per gain - + for i, (pedestal, rms_pedestal) in enumerate( + zip( + [self.__pedestal_hg, self.__pedestal_lg], + [self.__rms_ped_hg, self.__rms_ped_lg], + ) + ): + wf = np.array(wfs[i]) # waveform per gain + ped = np.array([np.mean(wf[pix]) for pix in range(len(self.pixels_id))]) rms_ped = np.array([np.std(wf[pix]) for pix in range(len(self.pixels_id))]) - ped[ped>1000]=np.nan - rms_ped[ped>1000]=np.nan + ped[ped > 1000] = np.nan + rms_ped[ped > 1000] = np.nan pedestal.append(ped) rms_pedestal.append(rms_ped) - - - - - - - - ##This method need to be defined ! def finish(self): - output = PedestalContainer( run_number=PedestalContainer.fields["run_number"].type(self._run_number), npixels=PedestalContainer.fields["npixels"].type(self._npixels), @@ -1038,41 +1000,35 @@ def finish(self): ucts_timestamp=PedestalContainer.fields["ucts_timestamp"].dtype.type( self.__ucts_timestamp ), - event_type=PedestalContainer.fields["event_type"].dtype.type(self.__event_type), + event_type=PedestalContainer.fields["event_type"].dtype.type( + self.__event_type + ), event_id=PedestalContainer.fields["event_id"].dtype.type(self.__event_id), - pedestal_hg=PedestalContainer.fields["pedestal_hg"].dtype.type( self.__pedestal_hg ), - pedestal_lg=PedestalContainer.fields["pedestal_lg"].dtype.type( self.__pedestal_lg ), rms_ped_hg=PedestalContainer.fields["rms_ped_hg"].dtype.type( self.__rms_ped_hg ), - rms_ped_lg=PedestalContainer.fields["rms_ped_lg"].dtype.type( self.__rms_ped_lg ), - - ) return output - - - - class PedestalTool(EventsLoopNectarCAMCalibrationTool): """ - This class is a part of the PedestalTool, which is an EventsLoopNectarCAMCalibrationTool. + This class is a part of the PedestalTool, which is an EventsLoopNectarCAMCalibrationTool. The finish() method opens the output file, which is an HDF5 file, and extracts the `rms_ped_hg` (root mean square of the high gain pedestal) values from the `PedestalContainer` dataset. Finally, it closes the output file and returns the list of `rms_ped_hg` values. - + This method is used to post-process the output of the PedestalTool and extract specific information from the generated HDF5 file. """ + name = "PedestalTool" componentsList = ComponentNameList( @@ -1081,9 +1037,6 @@ class PedestalTool(EventsLoopNectarCAMCalibrationTool): help="List of Component names to be apply, the order will be respected", ).tag(config=True) - - - def _init_output_path(self): if self.max_events is None: filename = f"{self.name}_run{self.run_number}.h5" @@ -1093,40 +1046,33 @@ def _init_output_path(self): f"{os.environ.get('NECTARCAMDATA','/tmp')}/tests/{filename}" ) - - - def finish(self, *args, **kwargs): - super().finish(return_output_component=False, *args, **kwargs) - #print(self.output_path) + # print(self.output_path) output_file = h5py.File(self.output_path) - + rms_ped_hg = [] - + for thing in output_file: group = output_file[thing] - dataset = group['PedestalContainer'] + dataset = group["PedestalContainer"] data = dataset[:] - #print("data",data) + # print("data",data) for tup in data: try: rms_ped_hg.extend(tup[8]) except: break - - output_file.close() + output_file.close() - return rms_ped_hg - class UCTSContainer(NectarCAMContainer): """ Defines the fields for the UCTSContainer class, which is used to store various data related to UCTS events. - + The fields include: - `run_number`: The run number associated with the waveforms. - `npixels`: The number of effective pixels. @@ -1137,6 +1083,7 @@ class UCTSContainer(NectarCAMContainer): - `ucts_busy_counter`: The UCTS busy counter. - `ucts_event_counter`: The UCTS event counter. """ + run_number = Field( type=np.uint16, description="run number associated to the waveforms", @@ -1153,21 +1100,24 @@ class UCTSContainer(NectarCAMContainer): type=np.ndarray, dtype=np.uint8, ndim=1, description="trigger event type" ) event_id = Field(type=np.ndarray, dtype=np.uint32, ndim=1, description="event ids") - ucts_busy_counter = Field(type=np.ndarray, dtype=np.uint32, ndim=1, description="busy counter") - ucts_event_counter = Field(type=np.ndarray, dtype=np.uint32, ndim=1, description="event counter") - - + ucts_busy_counter = Field( + type=np.ndarray, dtype=np.uint32, ndim=1, description="busy counter" + ) + ucts_event_counter = Field( + type=np.ndarray, dtype=np.uint32, ndim=1, description="event counter" + ) + class UCTSComp(NectarCAMComponent): - """ The `__init__` method initializes the `UCTSComp` class, which is a NectarCAMComponent. It sets up several member variables to store UCTS related data, such as timestamps, event types, event IDs, busy counters, and event counters. - + The `__call__` method is called for each event, and it appends the UCTS-related data from the event to the corresponding member variables. - + The `finish` method creates and returns a `UCTSContainer` object, which is a container for the UCTS-related data that was collected during the event loop. """ + def __init__(self, subarray, config=None, parent=None, *args, **kwargs): super().__init__( subarray=subarray, config=config, parent=parent, *args, **kwargs @@ -1179,12 +1129,6 @@ def __init__(self, subarray, config=None, parent=None, *args, **kwargs): self.__event_id = [] self.__ucts_busy_counter = [] self.__ucts_event_counter = [] - - - - - - ##This method need to be defined ! def __call__(self, event: NectarCAMDataContainer, *args, **kwargs): @@ -1193,17 +1137,9 @@ def __call__(self, event: NectarCAMDataContainer, *args, **kwargs): self.__ucts_timestamp.append(event.nectarcam.tel[0].evt.ucts_timestamp) self.__ucts_busy_counter.append(event.nectarcam.tel[0].evt.ucts_busy_counter) self.__ucts_event_counter.append(event.nectarcam.tel[0].evt.ucts_event_counter) - - - - - - - ##This method need to be defined ! def finish(self): - output = UCTSContainer( run_number=UCTSContainer.fields["run_number"].type(self._run_number), npixels=UCTSContainer.fields["npixels"].type(self._npixels), @@ -1219,21 +1155,18 @@ def finish(self): ), event_type=UCTSContainer.fields["event_type"].dtype.type(self.__event_type), event_id=UCTSContainer.fields["event_id"].dtype.type(self.__event_id), - ) return output - - - class DeadtimeTestTool(EventsLoopNectarCAMCalibrationTool): """ - The `DeadtimeTestTool` class is an `EventsLoopNectarCAMCalibrationTool` that is used to test the deadtime of NectarCAM. - + The `DeadtimeTestTool` class is an `EventsLoopNectarCAMCalibrationTool` that is used to test the deadtime of NectarCAM. + The `finish` method is responsible for reading the data from the HDF5 file, extracting the relevant information (UCTS timestamps, event counters, and busy counters), and calculating the deadtime-related metrics. The method returns the UCTS timestamps, the time differences between consecutive UCTS timestamps, the event counters, the busy counters, the collected trigger rate, the total time, and the deadtime percentage. """ + name = "DeadtimeTestTool" componentsList = ComponentNameList( @@ -1242,23 +1175,20 @@ class DeadtimeTestTool(EventsLoopNectarCAMCalibrationTool): help="List of Component names to be apply, the order will be respected", ).tag(config=True) - - def finish(self, *args, **kwargs): - super().finish(return_output_component=False, *args, **kwargs) - #print(self.output_path) + # print(self.output_path) output_file = h5py.File(self.output_path) - - ucts_timestamps=[] - event_counter=[] - busy_counter=[] - + + ucts_timestamps = [] + event_counter = [] + busy_counter = [] + for thing in output_file: group = output_file[thing] - dataset = group['UCTSContainer'] + dataset = group["UCTSContainer"] data = dataset[:] - #print("data",data) + # print("data",data) for tup in data: try: ucts_timestamps.extend(tup[3]) @@ -1266,26 +1196,33 @@ def finish(self, *args, **kwargs): busy_counter.extend(tup[6]) except: break - #print(output_file.keys()) - #tom_mu_all= output[0].tom_mu - #tom_sigma_all= output[0].tom_sigma - #ucts_timestamps= np.array(output_file["ucts_timestamp"]) + # print(output_file.keys()) + # tom_mu_all= output[0].tom_mu + # tom_sigma_all= output[0].tom_sigma + # ucts_timestamps= np.array(output_file["ucts_timestamp"]) ucts_timestamps = np.array(ucts_timestamps).flatten() - #print(ucts_timestamps) + # print(ucts_timestamps) event_counter = np.array(event_counter).flatten() busy_counter = np.array(busy_counter).flatten() - #print(ucts_timestamps) - delta_t = [ucts_timestamps[i]-ucts_timestamps[i-1] for i in range(1,len(ucts_timestamps))] + # print(ucts_timestamps) + delta_t = [ + ucts_timestamps[i] - ucts_timestamps[i - 1] + for i in range(1, len(ucts_timestamps)) + ] # event_counter = np.array(output_file['ucts_event_counter']) # busy_counter=np.array(output_file['ucts_busy_counter']) output_file.close() - - time_tot = ((ucts_timestamps[-1] - ucts_timestamps[0]) * u.ns).to(u.s) - collected_trigger_rate = (event_counter[-1]+busy_counter[-1])/time_tot - deadtime_pc = busy_counter[-1]/(event_counter[-1]+busy_counter[-1])*100 - - - - return ucts_timestamps,delta_t,event_counter,busy_counter,collected_trigger_rate, time_tot, deadtime_pc - + time_tot = ((ucts_timestamps[-1] - ucts_timestamps[0]) * u.ns).to(u.s) + collected_trigger_rate = (event_counter[-1] + busy_counter[-1]) / time_tot + deadtime_pc = busy_counter[-1] / (event_counter[-1] + busy_counter[-1]) * 100 + + return ( + ucts_timestamps, + delta_t, + event_counter, + busy_counter, + collected_trigger_rate, + time_tot, + deadtime_pc, + ) diff --git a/src/nectarchain/user_scripts/dmousadi/test_scripts/tests/utils.py b/src/nectarchain/user_scripts/dmousadi/test_scripts/tests/utils.py index 5eb7c50b..de32db39 100644 --- a/src/nectarchain/user_scripts/dmousadi/test_scripts/tests/utils.py +++ b/src/nectarchain/user_scripts/dmousadi/test_scripts/tests/utils.py @@ -1,295 +1,386 @@ -from ctapipe.utils import get_dataset_path +import os -import numpy as np import matplotlib.pyplot as plt - - -from scipy.interpolate import interp1d -from traitlets.config import Config -from scipy.stats import expon, poisson -from lmfit.models import Model +import numpy as np from astropy import units as u +from ctapipe.utils import get_dataset_path from iminuit import Minuit -import os - - +from lmfit.models import Model +from scipy.interpolate import interp1d +from scipy.stats import expon, poisson +from traitlets.config import Config -config = Config(dict(NectarCAMEventSource=dict( +config = Config( + dict( + NectarCAMEventSource=dict( NectarCAMR0Corrections=dict( calibration_path=None, apply_flatfield=False, select_gain=False, - )))) + ) + ) + ) +) -#(filter, optical density, transmission) -#configurations with the same order as in the document -filters=np.array( +# (filter, optical density, transmission) +# configurations with the same order as in the document +filters = np.array( [ - [0.00, 0.15, 0.30, 0.45, 0.60, 0.75, 0.90, 1.00, 1.15, 1.30, 1.30, 1.45, 1.50, 1.60, 1.65, 1.80, 1.90, 2.00, 2.10, 2.15, 2.30, 2.30, 2.50, 2.60, 2.80, 3.00, 3.30, 3.50], - [0.000000, 0.213853, 0.370538, 0.584391, 0.739449, 0.953302, 1.109987, 1.344491, 1.558344, 1.715029, 1.792931, 2.006784, 2.089531, 2.163469, 2.303384, 2.460068, 2.532380, 2.695312, 2.828980, 2.909165, 3.065849, 3.137422, 3.434022, 3.434761, 3.882462, 4.039803, 4.488243, 4.784842], - [1.000000, 0.611149, 0.426052, 0.260381, 0.182201, 0.111352, 0.077627, 0.045239, 0.027647, 0.019274, 0.016109, 0.009845, 0.008137, 0.006863, 0.004973, 0.003467, 0.002935, 0.002017, 0.001483, 0.001233, 0.000859, 0.000729, 0.000368, 0.000367, 0.000131, 0.000091, 0.000032, 0.000016] + [ + 0.00, + 0.15, + 0.30, + 0.45, + 0.60, + 0.75, + 0.90, + 1.00, + 1.15, + 1.30, + 1.30, + 1.45, + 1.50, + 1.60, + 1.65, + 1.80, + 1.90, + 2.00, + 2.10, + 2.15, + 2.30, + 2.30, + 2.50, + 2.60, + 2.80, + 3.00, + 3.30, + 3.50, + ], + [ + 0.000000, + 0.213853, + 0.370538, + 0.584391, + 0.739449, + 0.953302, + 1.109987, + 1.344491, + 1.558344, + 1.715029, + 1.792931, + 2.006784, + 2.089531, + 2.163469, + 2.303384, + 2.460068, + 2.532380, + 2.695312, + 2.828980, + 2.909165, + 3.065849, + 3.137422, + 3.434022, + 3.434761, + 3.882462, + 4.039803, + 4.488243, + 4.784842, + ], + [ + 1.000000, + 0.611149, + 0.426052, + 0.260381, + 0.182201, + 0.111352, + 0.077627, + 0.045239, + 0.027647, + 0.019274, + 0.016109, + 0.009845, + 0.008137, + 0.006863, + 0.004973, + 0.003467, + 0.002935, + 0.002017, + 0.001483, + 0.001233, + 0.000859, + 0.000729, + 0.000368, + 0.000367, + 0.000131, + 0.000091, + 0.000032, + 0.000016, + ], ] ).T -#the next ones are in the order of the test linearity runs by federica -#maybe more practical because the runs will be in that order again -trasmission_390ns = np.array([0.002016919, -0.045238588, -0.0276475, -0.019273981, -0.008242516, -0.000368111, -9.12426E-05, -1, -0.611148605, -0.260380956, -0.111351889, -0.004972972, -0.001232637, -0.009844997, -0.426051788, -0.077627064, -0.006863271, -0.003466823, -0.000859312, -0.016109006, -0.182201004, -0.002935077, -0.001482586, -0.000367485, -0.008137092, -0.00013108, -1.64119E-05, -3.24906E-05, -0.000728749,]) - - -optical_density_390ns = np.array([2.69531155, -1.34449096, -1.55834414, -1.71502857, -2.08394019, -3.43402173, -4.03980251, -0.00000000, -0.21385318, -0.58439078, -0.95330241, -2.30338395, -2.90916473, -2.00678442, -0.37053761, -1.10998684, -2.16346885, -2.46006838, -3.06584916, -1.79293125, -0.73944923, -2.53238048, -2.82898001, -3.43476079, -2.08953077, -3.88246202, -4.78484233, -4.48824280, -3.13742221,]) - -adc_to_pe = 58. +# the next ones are in the order of the test linearity runs by federica +# maybe more practical because the runs will be in that order again +trasmission_390ns = np.array( + [ + 0.002016919, + 0.045238588, + 0.0276475, + 0.019273981, + 0.008242516, + 0.000368111, + 9.12426e-05, + 1, + 0.611148605, + 0.260380956, + 0.111351889, + 0.004972972, + 0.001232637, + 0.009844997, + 0.426051788, + 0.077627064, + 0.006863271, + 0.003466823, + 0.000859312, + 0.016109006, + 0.182201004, + 0.002935077, + 0.001482586, + 0.000367485, + 0.008137092, + 0.00013108, + 1.64119e-05, + 3.24906e-05, + 0.000728749, + ] +) + + +optical_density_390ns = np.array( + [ + 2.69531155, + 1.34449096, + 1.55834414, + 1.71502857, + 2.08394019, + 3.43402173, + 4.03980251, + 0.00000000, + 0.21385318, + 0.58439078, + 0.95330241, + 2.30338395, + 2.90916473, + 2.00678442, + 0.37053761, + 1.10998684, + 2.16346885, + 2.46006838, + 3.06584916, + 1.79293125, + 0.73944923, + 2.53238048, + 2.82898001, + 3.43476079, + 2.08953077, + 3.88246202, + 4.78484233, + 4.48824280, + 3.13742221, + ] +) + +adc_to_pe = 58.0 plot_parameters = { "High Gain": { - "linearity_range": [5,-5], - "text_coords":[0.1, 600], - "label_coords":[20, 400], - "color":"C0", - "initials":"HG" + "linearity_range": [5, -5], + "text_coords": [0.1, 600], + "label_coords": [20, 400], + "color": "C0", + "initials": "HG", }, "Low Gain": { - "linearity_range": [11,-1], - "text_coords":[0.3e2, 5], - "label_coords":[1.5e3, 9], - "color":"C4", - "initials":"LG" - } + "linearity_range": [11, -1], + "text_coords": [0.3e2, 5], + "label_coords": [1.5e3, 9], + "color": "C4", + "initials": "LG", + }, } -intensity_percent = np.array([13. , 15. , 16. , 16.5, - 20.6, 22. , - 25. , 35. , 33]) -intensity_to_charge =np.array([ 1.84110824, 2.09712394, 2.24217532, 2.37545181, - 10.32825426, 28.80655155, - 102.79668643, 191.47895686, 188]) +intensity_percent = np.array([13.0, 15.0, 16.0, 16.5, 20.6, 22.0, 25.0, 35.0, 33]) +intensity_to_charge = np.array( + [ + 1.84110824, + 2.09712394, + 2.24217532, + 2.37545181, + 10.32825426, + 28.80655155, + 102.79668643, + 191.47895686, + 188, + ] +) -source_ids_deadtime = [0 for i in range(3332,3342)] + [1 for i in range(3342,3350)] + [2 for i in range(3552,3562)] +source_ids_deadtime = ( + [0 for i in range(3332, 3342)] + + [1 for i in range(3342, 3350)] + + [2 for i in range(3552, 3562)] +) -deadtime_labels = {0:{'source':"random generator",'color':'blue'}, - 1:{'source':"nsb source", 'color':'green'}, - 2:{'source':"laser",'color':'red'} +deadtime_labels = { + 0: {"source": "random generator", "color": "blue"}, + 1: {"source": "nsb source", "color": "green"}, + 2: {"source": "laser", "color": "red"}, } -def pe_from_intensity_percentage (percent,percent_from_calibration=intensity_percent,known_charge=intensity_to_charge): +def pe_from_intensity_percentage( + percent, + percent_from_calibration=intensity_percent, + known_charge=intensity_to_charge, +): """ Converts a percentage of intensity to the corresponding charge value based on a known calibration. - + Args: percent (numpy.ndarray): The percentage of intensity to convert to charge. percent_from_calibration (numpy.ndarray, optional): The known percentages of intensity used in the calibration. Defaults to `intensity_percent`. known_charge (numpy.ndarray, optional): The known charge values corresponding to the calibration percentages. Defaults to `intensity_to_charge`. - + Returns: numpy.ndarray: The charge values corresponding to the input percentages. """ - #known values from laser calibration - - #runs are done with intensity of 15-35 percent + # known values from laser calibration + + # runs are done with intensity of 15-35 percent f = interp1d(percent_from_calibration, known_charge) charge = np.zeros(len(percent)) for i in range(len(percent)): - charge[i]=f(percent[i]) + charge[i] = f(percent[i]) return charge -#functions by federica -def fit_function(x,a,b): + +# functions by federica +def linear_fit_function(x, a, b): """ Computes a linear function of the form `a*x + b`. - + Args: x (float): The input value. a (float): The slope coefficient. b (float): The y-intercept. - + Returns: float: The result of the linear function. """ - return a*x + b + return a * x + b -def fit_function2(x,a,b,c): +def second_degree_fit_function(x, a, b, c): """ Computes a quadratic function of the form `a*(x**2) + b*x + c`. - + Args: x (float): The input value. a (float): The coefficient of the squared term. b (float): The coefficient of the linear term. c (float): The constant term. - + Returns: float: The result of the quadratic function. """ - return a*(x**2) + b*x + c + return a * (x**2) + b * x + c + -def fit_function3(x,a,b,c, d): +def third_degree_fit_function(x, a, b, c, d): """ Computes a function of the form `(a*x + b)/(1+c) + d`. - + Args: x (float): The input value. a (float): The coefficient of the linear term. b (float): The constant term in the numerator. c (float): The coefficient of the denominator. d (float): The constant term added to the result. - + Returns: float: The result of the function. """ - return (a*x + b)/(1+c) + d + return (a * x + b) / (1 + c) + d + -def fit_function_hv(x,a,b): +def fit_function_hv(x, a, b): """ Computes a function of the form `a/sqrt(x) + b`. - + Args: x (float): The input value. a (float): The coefficient of the term `1/sqrt(x)`. b (float): The constant term. - + Returns: float: The result of the function. """ - return a/np.sqrt(x)+b + return a / np.sqrt(x) + b -def err_ratio(nominator, denominator, err_norm, err_denom, cov_nom_den = 0): +def err_ratio(nominator, denominator, err_norm, err_denom, cov_nom_den=0): """ Computes the error ratio for a given nominator, denominator, and their respective errors. - + Args: nominator (float): The nominator value. denominator (float): The denominator value. err_norm (float): The error of the nominator. err_denom (float): The error of the denominator. cov_nom_den (float, optional): The covariance between the nominator and denominator. Defaults to 0. - + Returns: float: The error ratio. """ - delta_err2 = (err_norm/nominator)**2 + (err_denom/denominator)**2 -2*cov_nom_den/(nominator*denominator) - ratio = nominator/denominator - return np.sqrt(delta_err2)*ratio + delta_err2 = ( + (err_norm / nominator) ** 2 + + (err_denom / denominator) ** 2 + - 2 * cov_nom_den / (nominator * denominator) + ) + ratio = nominator / denominator + return np.sqrt(delta_err2) * ratio + def err_sum(err_a, err_b, cov_a_b=0): """ Computes the square root of the sum of the squares of `err_a` and `err_b`, plus twice the covariance `cov_a_b`. - + This function is used to calculate the combined error of two values, taking into account their individual errors and the covariance between them. - + Args: err_a (float): The error of the first value. err_b (float): The error of the second value. cov_a_b (float, optional): The covariance between the two values. Defaults to 0. - - Returns: - float: The combined error. - """ - return np.sqrt(err_a**2 + err_b**2 + 2*cov_a_b) -def closest_value(input_list, input_value): - """ - Returns the value in `input_list` that is closest to `input_value`. - - Args: - input_list (list): The list of values to search. - input_value (float): The value to find the closest match for. - - Returns: - float: The value in `input_list` that is closest to `input_value`. - """ - arr = np.asarray(input_list) - i = (np.abs(arr - input_value)).argmin() - return arr[i] - -def gaus(x,a,mu,sigma): - """ - Computes the Gaussian function with the given parameters. - - Args: - x (float): The input value. - a (float): The amplitude of the Gaussian function. - mu (float): The mean (center) of the Gaussian function. - sigma (float): The standard deviation of the Gaussian function. - Returns: - float: The value of the Gaussian function at the given input `x`. + float: The combined error. """ - return a*np.exp(-(x-mu)**2/(2*sigma**2)) + return np.sqrt(err_a**2 + err_b**2 + 2 * cov_a_b) -#from stackoverflow +# from stackoverflow def argmedian(x, axis=None): """ Returns the index of the median element in the input array `x` along the specified axis. - + If `axis` is `None`, the function returns the index of the median element in the flattened array. Otherwise, it computes the argmedian along the specified axis and returns an array of indices. - + Args: x (numpy.ndarray): The input array. axis (int or None, optional): The axis along which to compute the argmedian. If `None`, the argmedian is computed on the flattened array. - + Returns: int or numpy.ndarray: The index or indices of the median element(s) in the input array. """ @@ -298,20 +389,17 @@ def argmedian(x, axis=None): else: # Compute argmedian along specified axis return np.apply_along_axis( - lambda x: np.argpartition(x, len(x) // 2)[len(x) // 2], - axis=axis, arr=x + lambda x: np.argpartition(x, len(x) // 2)[len(x) // 2], axis=axis, arr=x ) - - def pe2photons(x): """ Converts the input value `x` from photons to photoelectrons (PE) by multiplying it by 4. - + Args: x (float): The input value in photons. - + Returns: float: The input value converted to photoelectrons. """ @@ -321,26 +409,26 @@ def pe2photons(x): def photons2pe(x): """ Converts the input value `x` from photoelectrons (PE) to photons by dividing it by 4. - + Args: x (float): The input value in photoelectrons. - + Returns: float: The input value converted to photons. """ return x / 4 -#from federica's notebook -class ExponentialFitter(): +# from federica's notebook +class ExponentialFitter: """ Represents an exponential fitter class that computes the expected distribution and the minus 2 log likelihood for a given dataset and exponential parameters. - + Attributes: datas (numpy.ndarray): The input data array. bin_edges (numpy.ndarray): The bin edges for the data. - + Methods: compute_expected_distribution(norm, loc, scale): Computes the expected distribution given the normalization, location, and scale parameters. @@ -349,58 +437,63 @@ class ExponentialFitter(): compute_minus2loglike(x): Computes the minus 2 log likelihood given the parameters in `x`. """ - def __init__(self,datas,bin_edges): + + def __init__(self, datas, bin_edges): self.datas = datas.copy() self.bin_edges = bin_edges.copy() - def compute_expected_distribution(self,norm,loc,scale): - cdf_low = expon.cdf(self.bin_edges[:-1],loc=loc,scale=scale) - cdf_up = expon.cdf(self.bin_edges[1:],loc=loc,scale=scale) + def compute_expected_distribution(self, norm, loc, scale): + cdf_low = expon.cdf(self.bin_edges[:-1], loc=loc, scale=scale) + cdf_up = expon.cdf(self.bin_edges[1:], loc=loc, scale=scale) delta_cdf = cdf_up - cdf_low - return norm*delta_cdf + return norm * delta_cdf - def expected_distribution(self,x): - return self.compute_expected_distribution(x[0],x[1],x[2]) + def expected_distribution(self, x): + return self.compute_expected_distribution(x[0], x[1], x[2]) - def compute_minus2loglike(self,x): - norm = x[0] - loc = x[1] + def compute_minus2loglike(self, x): + norm = x[0] + loc = x[1] scale = x[2] - expected = self.compute_expected_distribution(norm,loc,scale) - chi2_mask = expected>0. - minus2loglike = -2.*np.sum( poisson.logpmf(self.datas[chi2_mask],mu=expected[chi2_mask]) ) - minus2loglike0 = -2.*np.sum( poisson.logpmf(self.datas[chi2_mask],mu=self.datas[chi2_mask]) ) -# print(f'{minus2loglike0 = }') + expected = self.compute_expected_distribution(norm, loc, scale) + chi2_mask = expected > 0.0 + minus2loglike = -2.0 * np.sum( + poisson.logpmf(self.datas[chi2_mask], mu=expected[chi2_mask]) + ) + minus2loglike0 = -2.0 * np.sum( + poisson.logpmf(self.datas[chi2_mask], mu=self.datas[chi2_mask]) + ) + # print(f'{minus2loglike0 = }') return minus2loglike - minus2loglike0 - + def pois(x, A, R): """ Computes the expected distribution for a Poisson process with rate parameter `R`. - + Args: x (float): The input value. A (float): The amplitude parameter. R (float): The rate parameter. - + Returns: float: The expected distribution for the Poisson process. """ - '''poisson function, parameter r (rate) is the fit parameter''' - return A*np.exp(x*R) + """poisson function, parameter r (rate) is the fit parameter""" + return A * np.exp(x * R) -def deadtime_and_expo_fit(time_tot, deadtime_us,run, output_plot = None): +def deadtime_and_expo_fit(time_tot, deadtime_us, run, output_plot=None): """ Computes the deadtime and exponential fit parameters for a given dataset. - + Args: time_tot (float): The total time of the dataset. deadtime_us (float): The deadtime of the dataset in microseconds. run (int): The run number. output_plot (str, optional): The path to save the output plot. - + Returns: tuple: A tuple containing the following values: - deadtime (float): The deadtime of the dataset. @@ -415,86 +508,118 @@ def deadtime_and_expo_fit(time_tot, deadtime_us,run, output_plot = None): - first_bin_length (float): The length of the first bin. - tot_nr_events_histo (int): The total number of events in the histogram. """ - #function by federica + # function by federica total_delta_t_for_busy_time = time_tot data = deadtime_us pucts = np.histogram(data, bins=100, range=(1e-2, 50)) - - deadtime = (min(data[~np.isnan(data)])) - + + deadtime = min(data[~np.isnan(data)]) + first_nonemptybin = np.where(pucts[0] > 0)[0][0] - deadtime_err = (pucts[1][first_nonemptybin] - pucts[1][first_nonemptybin - 1]) /\ - np.sqrt(pucts[0][first_nonemptybin]) - deadtime_bin = (pucts[1][first_nonemptybin]) - deadtime_bin_length = (pucts[1][first_nonemptybin] - pucts[1][first_nonemptybin - 1]) - + deadtime_err = ( + pucts[1][first_nonemptybin] - pucts[1][first_nonemptybin - 1] + ) / np.sqrt(pucts[0][first_nonemptybin]) + deadtime_bin = pucts[1][first_nonemptybin] + deadtime_bin_length = pucts[1][first_nonemptybin] - pucts[1][first_nonemptybin - 1] + # the bins should be of integer width, because poisson is an integer distribution bins = np.arange(100000) - 0.5 - entries, bin_edges = np.histogram(data, bins=bins, range=[0, total_delta_t_for_busy_time]) + entries, bin_edges = np.histogram( + data, bins=bins, range=[0, total_delta_t_for_busy_time] + ) bin_middles = 0.5 * (bin_edges[1:] + bin_edges[:-1]) - first_bin_length = (bin_edges[1] - bin_edges[0]) - tot_nr_events_histo = (np.sum(entries)) + first_bin_length = bin_edges[1] - bin_edges[0] + tot_nr_events_histo = np.sum(entries) # second fit model = Model(pois) params = model.make_params(A=2e3, R=-0.001) result = model.fit(entries, params, x=bin_middles) - #print('**** FIT RESULTS RUN {} ****'.format(run)) - #print(result.fit_report()) - #print('chisqr = {}, redchi = {}'.format(result.chisqr, result.redchi)) + # print('**** FIT RESULTS RUN {} ****'.format(run)) + # print(result.fit_report()) + # print('chisqr = {}, redchi = {}'.format(result.chisqr, result.redchi)) - parameter_A_new = (result.params['A'].value) - parameter_R_new = (-1 * result.params['R'].value) - parameter_A_err_new = (result.params['A'].stderr) - parameter_R_err_new = (result.params['R'].stderr) + parameter_A_new = result.params["A"].value + parameter_R_new = -1 * result.params["R"].value + parameter_A_err_new = result.params["A"].stderr + parameter_R_err_new = result.params["R"].stderr plt.close() if output_plot: - fig, ax = plt.subplots(1, 1, figsize=(10, 10 / 1.61)) - plt.hist(data, bins= np.arange(1000) - 0.5, - alpha=0.8, - histtype='step', density=0, lw=2, label='Run {}'.format(run)) + plt.hist( + data, + bins=np.arange(1000) - 0.5, + alpha=0.8, + histtype="step", + density=0, + lw=2, + label="Run {}".format(run), + ) # # plot poisson-deviation with fitted parameter x_plot = np.arange(0, 1000) - plt.plot( x_plot, - pois(x_plot, result.params['A'].value, result.params['R'].value), - marker='', linestyle='-', lw=3, color='C3', alpha=0.85, - label=r'Fit result: %2.3f $\exp^{-{x}/({%2.0f}~\mu\mathrm{s})}$' % ( - result.params['A'].value, abs(1 / result.params['R'].value)), + pois(x_plot, result.params["A"].value, result.params["R"].value), + marker="", + linestyle="-", + lw=3, + color="C3", + alpha=0.85, + label=r"Fit result: %2.3f $\exp^{-{x}/({%2.0f}~\mu\mathrm{s})}$" + % (result.params["A"].value, abs(1 / result.params["R"].value)), ) - R = ((-1 * result.params['R'].value) * (1 / u.us)).to(u.kHz).value - - - R_stderr = ((result.params['R'].stderr) * (1 / u.us)).to(u.kHz).value - - ax.text(600, entries[1] / 1 , '$y = A \cdot \exp({-R \cdot x})$\n' - # + r'$A=%2.2f \pm %2.2f$'%(as_si((result.params['A'].value/1000)*1e3,2), as_si((result.params['A'].stderr/1000)*1e3,2)) - + r'$A=%2.2f \pm %2.2f$' % (result.params['A'].value, result.params['A'].stderr) - + '\n' + r'$R=(%2.2f \pm %2.2f)$ kHz' % (R, R_stderr) - + '\n' + r'$\chi^2_\nu = %2.2f$' % ((result.redchi)) - + '\n' + r'$\delta_{\mathrm{deadtime}} = %2.3f \, \mu$s' % (deadtime), - backgroundcolor='white', bbox=dict(facecolor='white', edgecolor='C3', lw=2, - boxstyle='round,pad=0.3'), - ha='left', va='top', fontsize=17, color='k', alpha=0.9) - - - plt.xlabel(r'$\Delta$T [$\mu$s]') - plt.ylabel('Entries') - plt.title('Run {}'.format(run), y=1.02) - plt.savefig(os.path.join(output_plot,"deadtime_and_expo_fit_{}.png".format(run))) - + R = ((-1 * result.params["R"].value) * (1 / u.us)).to(u.kHz).value + + R_stderr = ((result.params["R"].stderr) * (1 / u.us)).to(u.kHz).value + + ax.text( + 600, + entries[1] / 1, + "$y = A \cdot \exp({-R \cdot x})$\n" + # + r'$A=%2.2f \pm %2.2f$'%(as_si((result.params['A'].value/1000)*1e3,2), as_si((result.params['A'].stderr/1000)*1e3,2)) + + r"$A=%2.2f \pm %2.2f$" + % (result.params["A"].value, result.params["A"].stderr) + + "\n" + + r"$R=(%2.2f \pm %2.2f)$ kHz" % (R, R_stderr) + + "\n" + + r"$\chi^2_\nu = %2.2f$" % ((result.redchi)) + + "\n" + + r"$\delta_{\mathrm{deadtime}} = %2.3f \, \mu$s" % (deadtime), + backgroundcolor="white", + bbox=dict( + facecolor="white", edgecolor="C3", lw=2, boxstyle="round,pad=0.3" + ), + ha="left", + va="top", + fontsize=17, + color="k", + alpha=0.9, + ) - return deadtime, deadtime_bin, deadtime_err, deadtime_bin_length, \ - total_delta_t_for_busy_time, parameter_A_new, parameter_R_new, \ - parameter_A_err_new, parameter_R_err_new, \ - first_bin_length, tot_nr_events_histo + plt.xlabel(r"$\Delta$T [$\mu$s]") + plt.ylabel("Entries") + plt.title("Run {}".format(run), y=1.02) + plt.savefig( + os.path.join(output_plot, "deadtime_and_expo_fit_{}.png".format(run)) + ) + return ( + deadtime, + deadtime_bin, + deadtime_err, + deadtime_bin_length, + total_delta_t_for_busy_time, + parameter_A_new, + parameter_R_new, + parameter_A_err_new, + parameter_R_err_new, + first_bin_length, + tot_nr_events_histo, + )