diff --git a/contrib/analysis/collect_config_data.py b/contrib/analysis/collect_config_data.py
index 6a7e612899..5041304dbb 100755
--- a/contrib/analysis/collect_config_data.py
+++ b/contrib/analysis/collect_config_data.py
@@ -321,6 +321,7 @@ def main(args, logger):
     database = dict()  # stores config data for all the pbench runs
 
     for res in result_list:
+        record = res.get()
         if record:
             run_id = record["run_id"]
 
@@ -361,7 +362,7 @@ def main(args, logger):
     print(df.nunique())
 
     # Covert dataframe to a csv file
-    df.to_csv(r"config.csv", sep=";", mode="a")
+    df.to_csv(r"config.csv", sep=";", mode="w")
 
     return 0
 
diff --git a/contrib/analysis/merge.py b/contrib/analysis/merge.py
index c9bd7c9949..62bfea5537 100755
--- a/contrib/analysis/merge.py
+++ b/contrib/analysis/merge.py
@@ -39,10 +39,10 @@ def main(args):
     thr_df = df[df["sample.measurement_type"] == "throughput"]
 
     # Covert dataframes to csv files
-    slat_df.to_csv(r"latency_slat.csv", sep=";", mode="a")
-    clat_df.to_csv(r"latency_clat.csv", sep=";", mode="a")
-    lat_df.to_csv(r"latency_lat.csv", sep=";", mode="a")
-    thr_df.to_csv(r"throughput_iops_sec.csv", sep=";", mode="a")
+    slat_df.to_csv(r"latency_slat.csv", sep=";", mode="w")
+    clat_df.to_csv(r"latency_clat.csv", sep=";", mode="w")
+    lat_df.to_csv(r"latency_lat.csv", sep=";", mode="w")
+    thr_df.to_csv(r"throughput_iops_sec.csv", sep=";", mode="w")
 
     return 0
 
diff --git a/contrib/analysis/merge_sos_and_perf_parallel.py b/contrib/analysis/merge_sos_and_perf_parallel.py
index 421b42564c..142f57bc1f 100755
--- a/contrib/analysis/merge_sos_and_perf_parallel.py
+++ b/contrib/analysis/merge_sos_and_perf_parallel.py
@@ -55,6 +55,7 @@ def es_data_gen(es, index, doc_type):
         index=index,
         doc_type=doc_type,
         scroll="1d",
+        request_timeout=3600,  # to prevent timeout errors (3600 is arbitrary)
     ):
         yield doc
 
@@ -177,6 +178,7 @@ def extract_clients(results_meta, es):
         index=run_index,
         doc_type="pbench-run-toc-entry",
         scroll="1m",
+        request_timeout=3600,  # to prevent timeout errors (3600 is arbitrary)
     ):
         src = doc["_source"]
         if src["parent"] == parent_dir_name:
@@ -329,45 +331,29 @@ def transform_result(source, pbench_runs, results_seen, stats):
     result["controller_dir"] = pbench_run["controller_dir"]
     result["sosreports"] = pbench_run["sosreports"]
 
-    # optional workload parameters
-    try:
-        result["benchmark.filename"] = ", ".join(
-            set((benchmark["filename"].split(",")))
-        )
-    except KeyError:
-        result["benchmark.filename"] = "/tmp/fio"
-    try:
-        result["benchmark.iodepth"] = benchmark["iodepth"]
-    except KeyError:
-        result["benchmark.iodepth"] = "32"
-    try:
-        result["benchmark.size"] = ", ".join(set((benchmark["size"].split(","))))
-    except KeyError:
-        result["benchmark.size"] = "4096M"
-    try:
-        result["benchmark.numjobs"] = ", ".join(set((benchmark["numjobs"].split(","))))
-    except KeyError:
-        result["benchmark.numjobs"] = "1"
-    try:
-        result["benchmark.ramp_time"] = benchmark["ramp_time"]
-    except KeyError:
-        result["benchmark.ramp_time"] = "none"
-    try:
-        result["benchmark.runtime"] = benchmark["runtime"]
-    except KeyError:
-        result["benchmark.runtime"] = "none"
-    try:
-        result["benchmark.sync"] = benchmark["sync"]
-    except KeyError:
-        result["benchmark.sync"] = "none"
-    try:
-        result["benchmark.time_based"] = benchmark["time_based"]
-    except KeyError:
-        result["benchmark.time_based"] = "none"
+    # optional workload parameters accounting for defaults if not found
+
+    result["benchmark.filename"] = sentence_setify(
+        benchmark.get("filename", "/tmp/fio")
+    )
+    result["benchmark.iodepth"] = benchmark.get("iodepth", "32")
+    result["benchmark.size"] = sentence_setify(benchmark.get("size", "4096M"))
+    result["benchmark.numjobs"] = sentence_setify(benchmark.get("numjobs", "1"))
+    result["benchmark.ramp_time"] = benchmark.get("ramp_time", "none")
+    result["benchmark.runtime"] = benchmark.get("runtime", "none")
+    result["benchmark.sync"] = benchmark.get("sync", "none")
+    result["benchmark.time_based"] = benchmark.get("time_based", "none")
 
     return result
 
 
+def sentence_setify(sentence: str) -> str:
+    """Splits input by ", " gets rid of duplicates and rejoins unique
+    items into original format. Effectively removes duplicates in input.
+    """
+    return ", ".join(set((sentence.split(", "))))
+
+
 def process_results(es, now, session, incoming_url, pool, pbench_runs, stats):
     """Intermediate generator for handling the fetching of the client names,
     disk names, and host names.
@@ -460,7 +446,9 @@ def main(args):
     ncpus = multiprocessing.cpu_count() - 1 if concurrency == 0 else concurrency
     pool = multiprocessing.Pool(ncpus) if ncpus != 1 else None
 
-    es = Elasticsearch([f"{es_host}:{es_port}"])
+    es = Elasticsearch(
+        [f"{es_host}:{es_port}"], timeout=60
+    )  # to prevent read timeout errors (60 is arbitrary)
 
     session = requests.Session()
     ua = session.headers["User-Agent"]