Fix timeout errors and apply some enhancements
 * Add `timeout` parameter to the creation of the `Elasticsearch` object
   with the arbitrary value 60 (see the sketch after this list)
 * Add `request_timeout` parameter to `elasticsearch1.helpers.scan`
   function invocation with arbitrary value 3600
 * Change the `transform_result` function to use a loop with a single
   try-except block instead
 * Change mode from 'a' to 'w' for all DataFrame-to-CSV output files
 * Remove try-except blocks in favor of `.get()` with default values
 * Refactor to avoid using a loop
 * Fix up documentation
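
A minimal sketch of how the two timeout knobs fit together, assuming the `elasticsearch1` client named above; the host, port, index name, and query are placeholders rather than values from this repository:

```python
from elasticsearch1 import Elasticsearch
from elasticsearch1.helpers import scan

# Client-level timeout: applies to every request made through this client.
# The value 60 is arbitrary, as noted above.
es = Elasticsearch(["localhost:9200"], timeout=60)

# Per-call request_timeout: raises the limit for this scan only, while
# scroll="1d" keeps the search context alive between batches.
# The value 3600 is likewise arbitrary.
for doc in scan(
    es,
    query={"query": {"match_all": {}}},
    index="example-index",  # placeholder index name
    scroll="1d",
    request_timeout=3600,
):
    print(doc["_id"])
```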

Co-authored-by: Rohit Mohnani <[email protected]>
rmohnani and Rohit Mohnani authored Jul 3, 2022
1 parent e915c96 commit 1d2dd32
Showing 3 changed files with 30 additions and 41 deletions.
3 changes: 2 additions & 1 deletion contrib/analysis/collect_config_data.py
@@ -321,6 +321,7 @@ def main(args, logger):
database = dict() # stores config data for all the pbench runs

for res in result_list:

record = res.get()
if record:
run_id = record["run_id"]
@@ -361,7 +362,7 @@ def main(args, logger):
print(df.nunique())

# Convert dataframe to a csv file
df.to_csv(r"config.csv", sep=";", mode="a")
df.to_csv(r"config.csv", sep=";", mode="w")

return 0

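Worth noting on the mode change (here and in merge.py below): with pandas, `mode="a"` appends to an existing file, so rerunning the script stacks a second header row and duplicate rows under the old data, while `mode="w"` overwrites the file on each run. A small self-contained illustration with made-up data:

```python
import pandas as pd

df = pd.DataFrame({"run_id": ["one", "two"], "value": [1, 2]})  # made-up data

# New behavior: mode="w" replaces the file, so repeated runs leave exactly
# one header row and one copy of the data.
df.to_csv("example.csv", sep=";", mode="w")

# Old behavior: mode="a" would append below whatever is already in the file,
# including another header row on every rerun.
# df.to_csv("example.csv", sep=";", mode="a")
```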
8 changes: 4 additions & 4 deletions contrib/analysis/merge.py
@@ -39,10 +39,10 @@ def main(args):
thr_df = df[df["sample.measurement_type"] == "throughput"]

# Convert dataframes to csv files
slat_df.to_csv(r"latency_slat.csv", sep=";", mode="a")
clat_df.to_csv(r"latency_clat.csv", sep=";", mode="a")
lat_df.to_csv(r"latency_lat.csv", sep=";", mode="a")
thr_df.to_csv(r"throughput_iops_sec.csv", sep=";", mode="a")
slat_df.to_csv(r"latency_slat.csv", sep=";", mode="w")
clat_df.to_csv(r"latency_clat.csv", sep=";", mode="w")
lat_df.to_csv(r"latency_lat.csv", sep=";", mode="w")
thr_df.to_csv(r"throughput_iops_sec.csv", sep=";", mode="w")

return 0

60 changes: 24 additions & 36 deletions contrib/analysis/merge_sos_and_perf_parallel.py
@@ -55,6 +55,7 @@ def es_data_gen(es, index, doc_type):
index=index,
doc_type=doc_type,
scroll="1d",
request_timeout=3600, # to prevent timeout errors (3600 is arbitrary)
):
yield doc

@@ -177,6 +178,7 @@ def extract_clients(results_meta, es):
index=run_index,
doc_type="pbench-run-toc-entry",
scroll="1m",
request_timeout=3600, # to prevent timeout errors (3600 is arbitrary)
):
src = doc["_source"]
if src["parent"] == parent_dir_name:
@@ -329,45 +331,29 @@ def transform_result(source, pbench_runs, results_seen, stats):
result["controller_dir"] = pbench_run["controller_dir"]
result["sosreports"] = pbench_run["sosreports"]

# optional workload parameters
try:
result["benchmark.filename"] = ", ".join(
set((benchmark["filename"].split(",")))
)
except KeyError:
result["benchmark.filename"] = "/tmp/fio"
try:
result["benchmark.iodepth"] = benchmark["iodepth"]
except KeyError:
result["benchmark.iodepth"] = "32"
try:
result["benchmark.size"] = ", ".join(set((benchmark["size"].split(","))))
except KeyError:
result["benchmark.size"] = "4096M"
try:
result["benchmark.numjobs"] = ", ".join(set((benchmark["numjobs"].split(","))))
except KeyError:
result["benchmark.numjobs"] = "1"
try:
result["benchmark.ramp_time"] = benchmark["ramp_time"]
except KeyError:
result["benchmark.ramp_time"] = "none"
try:
result["benchmark.runtime"] = benchmark["runtime"]
except KeyError:
result["benchmark.runtime"] = "none"
try:
result["benchmark.sync"] = benchmark["sync"]
except KeyError:
result["benchmark.sync"] = "none"
try:
result["benchmark.time_based"] = benchmark["time_based"]
except KeyError:
result["benchmark.time_based"] = "none"
# optional workload parameters accounting for defaults if not found

result["benchmark.filename"] = sentence_setify(
benchmark.get("filename", "/tmp/fio")
)
result["benchmark.iodepth"] = benchmark.get("iodepth", "32")
result["benchmark.size"] = sentence_setify(benchmark.get("size", "4096M"))
result["benchmark.numjobs"] = sentence_setify(benchmark.get("numjobs", "1"))
result["benchmark.ramp_time"] = benchmark.get("ramp_time", "none")
result["benchmark.runtime"] = benchmark.get("runtime", "none")
result["benchmark.sync"] = benchmark.get("sync", "none")
result["benchmark.time_based"] = benchmark.get("time_based", "none")

return result


def sentence_setify(sentence: str) -> str:
"""Splits input by ", " gets rid of duplicates and rejoins unique
items into original format. Effectively removes duplicates in input.
"""
return ", ".join(set((sentence.split(", "))))


def process_results(es, now, session, incoming_url, pool, pbench_runs, stats):
"""Intermediate generator for handling the fetching of the client names, disk
names, and host names.
@@ -460,7 +446,9 @@ def main(args):
ncpus = multiprocessing.cpu_count() - 1 if concurrency == 0 else concurrency
pool = multiprocessing.Pool(ncpus) if ncpus != 1 else None

es = Elasticsearch([f"{es_host}:{es_port}"])
es = Elasticsearch(
[f"{es_host}:{es_port}"], timeout=60
) # to prevent read timeout errors (60 is arbitrary)

session = requests.Session()
ua = session.headers["User-Agent"]
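
Putting the `transform_result` refactor together: each optional fio parameter is now read with `dict.get()` and a default instead of a try-except, and comma-joined fields are de-duplicated by the new `sentence_setify` helper. A standalone sketch with invented benchmark data (only `iodepth` and `numjobs` are present, so the other field falls back to its default):

```python
def sentence_setify(sentence: str) -> str:
    # Same idea as the helper added above: split on ", ", drop duplicates,
    # and rejoin (set() does not preserve the original order).
    return ", ".join(set(sentence.split(", ")))

benchmark = {"iodepth": "16", "numjobs": "4, 4, 8"}  # invented sample data
result = {}

# Missing keys fall back to defaults instead of raising KeyError.
result["benchmark.iodepth"] = benchmark.get("iodepth", "32")
result["benchmark.numjobs"] = sentence_setify(benchmark.get("numjobs", "1"))
result["benchmark.size"] = sentence_setify(benchmark.get("size", "4096M"))

print(result)
# e.g. {'benchmark.iodepth': '16', 'benchmark.numjobs': '4, 8', 'benchmark.size': '4096M'}
# (order within de-duplicated fields may vary)
```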
