Skip to content

Commit

Permalink
Overhaul Data Aggregation Pipeline
Browse files Browse the repository at this point in the history
Replaces previous contributed data aggregation files with the following:

pbench_combined_data.py

This contains the PbenchCombinedData class which serves as a wrapper object for processing data sources given and storing it along with diagnostic information. Has methods for adding run, result, client name, and disk host data. These methods use the Filter objects provided that specify the processing.

This also contains the Filter abstract class that serves as a template for custom filters on any data sources. Filter class allows you to specify required and optional fields from JSON docs for run and result data. Enables the specification of a diagnostic method to run more complex validation checks on the data source. And a apply_filter method that filters down the data source to the required components to be returned.

This also contains the PbenchCombinedDataCollection class which serves as a wrapper class for multiple PbenchCombinedData objects. While the PCD class is used for 1 run and it’s associated result data, this collection class is used for the processing and storing of all data to be aggregated. It has methods that keep statistics of valid and invalid data, and implements multiprocessing techniques to process the data. It has a method for outputting the data in csv files. The method that gets called to perform the aggregation is called aggregate_data that takes in a list of months to aggregate data for. It processes run data first. Then stores base result data onto a queue. Multiple worker processes read off the queue and add run, diskhost and client name data to it and put it on another queue. Multiple worker processes then read from that queue and add sos data to it, and these complete data are stored and then outputted.

sos_collection.py

This contains the Sosreport and SosCollection classes that respectively wrap the Sosreport data processing given the sos tarball and processing for all the sosreports. The Sosreport class has methods for extracting the desired information out of the exploded tarball, and takes in the path to the tarball on the local system and does the exploding and extracting. Since the sosreports need to be downloaded from a remote server, the SosCollection class has methods for opening a ssh and sftp client to download the desired Sosreport and then perform the processing, given the data of a PbenchCombinedData object.

aggregate_data.py

This is the script that is called to execute the aggregation. It creates a PbenchCombinedDataCollection object, and calls its aggregate_data method on a default if the last 12 months and then it’s emit_csv method to output the data. The months can be changed with cli flags.

The command to run the processing is:

./aggregate_data.py elasticsearch.intlab.perf-infra.lab.eng.rdu2.redhat.com 10081 http://pbench.perf.lab.eng.bos.redhat.com intlab-011.ctrl.perf-infra.lab.eng.rdu2.redhat.com

This will run it with all available CPU’s for the multiprocessing and aggregate the last 12 months worth of data.

I recommend adding the —months_e flag that specifies how many months prior to now to end the aggregation. Add it with as many months so that it ends in 2021-12. Because from what I’ve noticed 2022 has all invalid data according to hifza’s checks and millions of records to go through, which cause the program to use a lot of memory doing nothing inevitably crashing because it’s out of memory.

NOTE: the processing here is slightly different from the one Hifza was using. The use of the ClientCount filter is the difference and dramatically reduces the set of ‘valid’ data. If you would like to have Hifza’s original processing, remove ClientCount from list for run from line 267 of pbench_combined_data.py. And comment line 961 of the same file.

----
The work was performed in a series of 21 commits, what follows are the
set of messages from the original commits.

1st commit message:
----

Use argparse for cli parsing & Add record limiting functionality

Refactor naive cli argument parsing to use python's argparse module.
Add cli option to limit number of records, which defaults to 10, and
modify code accordingly.
Note: With this implementation there is no way to
run for all records as of yet.

commit message #2:
----

Refactor PbenchRun class into PbenchCombinedData and PbenchCombinedDataCollection

Since ultimate goal is to store all data in one object type, renamed class
accordingly. Created a corresponding collection object that tracks various
statistics about the objects inserted into it.

Update PbenchCombinedDataCollection to separately store invalid records

Add an internal dictionary for storing invalid records, so that diagnostic
data stored on it isn't lost.

Add some diagnostic check classes for result-data, refactor structure of abstract DiagnosticCheck and inheriting concrete classes

Fixed DiagnosticCheck and concrete inheriting class' structures

Updata structure, so that tracker_names now retrievable from a
DiagnosticCheck object, without the need to pass in a doc to perform
the check on. That means we can pass the check classes into the Collection object,
and can get all the different issues we will track as objects are inserted, and can
initialize the trackers for all such issues.

Succesfully reimplements run data processing and adding as it was originally in new structure

Use check class instances instead of class names in check list

Since some checks require extra information to perform the check, apart from
the 'doc' passed in as input to the diagnostic method, decided to have those
checks take more args in their constructor, to preserve the ability to simply
loop over checks and perform them in the compact manner used so far.
This means that list of check classes must now be instances of those classes,
so that those checks that those checks that require args can be given them.

Add basic result data merging without disknames,hostnames,clientnames to new class structure

Added methods that now handle the error-checking and merging of 'pure' result data
from a result doc to the PbenchCOmbinedData and Collection version classes. Thus can now simply
call add_run and add_result methods on a collection Object and the error-checking,
stats tracking, merging all happen internally in the class objects.
Note: error-checking for result data needs to be done in the collection
class as compared to other checks which happen in the PbenchCombinedDataCollection
class, because need to know associated PCD object which has associated run ahead of time,
and the existence of such an object is part of the checks performed.

Reimplement ability to process all data for the past year & set limit default to all

Set default limit value to be -1 meaning all. Now default is all data processed and
to limit need to pass --limit flag in cli.

Implement host and disk name data collection

Add method in PbenchCombinedData class to add host and disk names for data collected.
This should only be calles after run and base result data already added. Calls this method
in collection's add_result method after result data added, to ensure correct order of
insertion. Associated check classes implemented and performed accordingly.

Implement client name adding in new class structure

Add method in PbenchCombinedData class to add client name data
to object. Needs to be performed after run and base result data
already added. Call this method in the collection's add_result
method, after base result and disk and host names added. Associated check
classes implemented and performed accordingly.

Add Documentation and deletes Old code

Adds documentation for all new classes created: PbenchCombinedData,
PbenchCombinedDataCollection, DiagnosticCheck and concrete inheritors of
DiagnosticCheck. Also adds documentation for remaining functions in
merge_sos_and_perf_parallel.py. All documentation adheres to the
numpy/scipy style. All old code functionality reimplemented under new
structure so deleted.

commit message #3:
----

Internalize run and result data collection for given month.

Make merging and collecting of run and result data for a given month
the collect_data method in the PbenchCombinedDataCollection class.
Make the es_data_gen function also a method in this class. Rename
merge_sos_and_perf_parallel to aggregate_data. aggregate_data now
only gives the collection object a month to collect data on and all
processing happens inside the Class object.

commit message distributed-system-analysis#4:
----

Fix for comments by Peter on aggregate_data.py

Change _month_gen function signature, definition and docs to reflect
2 params. One for end_time and start_months_prior duration specifying
how many months prior to start data collection from. Update function
call in main appropriately.

Add type hints and documentation to main function. Remove comments
regarding multiprocessing. Fix user agent string to reflect filename
by using argparse.

Add optional arument for argparse to specify months_prior to use for
_month_gen, have default be 12 months.

Fix program name retrieval in main

CHange parse_arguments to return ArgumentParser Object instead of
Namespace, and create Namespace args in main function, because
prog name only accessible from parser.

add check to main to ensure record limiting works

Add print_report and emit_csv methods to Collection Object

Adds print_report method to print tracker information, instead of
using print on object. Adds emit_csv method, which writes important
data collected to csv files in a subdirectory. Writes valid, invalid,
trackers, diskhost, client data to separate csv files.

make csv folder using current working directory instead of hardcoding it

Finish Fixing comments by peter, add some comments to code

commit message distributed-system-analysis#5:
----

Implement multiprocessing capability

Uses pathos because dill can better serialize objects. Creates
a ProcessPool and stores it in an attribute of collection object.
Makes Collection object take in no of cpus to use in initialization,
so pool can be initialized. Adds methods to add one month asynchornously,
or a list of months. Add methods to merge dictionaries in a special way,
and combine another Collection object's data into its own. This because
self is returned from the worker processes.

commit message distributed-system-analysis#6:
----

fix jenkins errors

commit message distributed-system-analysis#7:
----

Fix 1 multiprocessing issue

CHange default cpu count to 0, which means all but 1 cpus of the system.
Change ProcessPool initialization to work when cpus to use is 1.

commit message distributed-system-analysis#8:
----

start fio result hostname refactor

commit message distributed-system-analysis#9:
----

Verify aggregation behavior

commit message distributed-system-analysis#10:
----

Implement ClientCount check

This should replace the ClientHostAggregteResultCheck and
the last check of SosreportCheck where it checks if the
sos reports have different hostnames. invalid run ids are those
that have more than 2 measurement_idx values for any measurement_title.
It also checks for run ids not in any result data (ie with no corresponding
results and thus invalid)

commit message distributed-system-analysis#11:
----

Reverse order of month string generation in _month_gen

YYYY-MM strings now generated starting with the most recent month
and going backwards and ending the start_months_prior number of months
before.

commit message distributed-system-analysis#12:
----

Change * use in result index generation to a loop

commit message distributed-system-analysis#13:
----

Use asyncio module to replace pathos

Use async function calls to process each month, as opposed to
using multiple processes. This to see if record limiting can
work under this method instead.

commit message distributed-system-analysis#14:
----

Table Multiprocessing/asyncio on month processing

Tables the use of multiprocessing/asyncio to speedup
processing of months by doing it in parallel for later,
since running into a myriad of issues.

commit message distributed-system-analysis#15:
----

Test ssh and sftp client & Begin refactoring of sos collection

commit message distributed-system-analysis#16:
----

Finish Sosreport collection refactor - no changes to processing logic

commit message distributed-system-analysis#17:
----

print timings

commit message distributed-system-analysis#18:
----

Modify Check class into Filter class

Filter class implements improved handling of custom required and
optional field/property specifications from run and result docs.
Factors out filtering logic, making it easily changeable.

commit message distributed-system-analysis#19:
----

Add documentation and wrap up changes

commit message distributed-system-analysis#20:
----

Finish documentation and changes

commit message distributed-system-analysis#21:
----

Address flake8 errors and warnings
  • Loading branch information
Rohit Mohnani authored and portante committed Aug 25, 2022
1 parent 5de49f2 commit 0c18972
Show file tree
Hide file tree
Showing 8 changed files with 2,355 additions and 1,004 deletions.
195 changes: 195 additions & 0 deletions contrib/analysis/aggregate_data.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
#!/usr/bin/env python3

import argparse
import requests
import time

from datetime import datetime
from dateutil import rrule
from dateutil.relativedelta import relativedelta
from pbench_combined_data import PbenchCombinedDataCollection
from elasticsearch1 import Elasticsearch


def _year_month_gen(
end_time: datetime, start_months_prior: int, end_months_prior: int
) -> str:
"""Generate YYYY-MM stings for months specified.
For all months inclusive, generate YYYY-MM strings starting at the
month of the end_time, and ending start_months_prior before the end_time.
Parameters
----------
end_time : datetime
The time for the last month desired
start_months_prior : int
Number of months before the end_time to end
string generation.
Yields
------
month_str : str
month string in the YYYY-MM format
"""
start = end_time - relativedelta(months=start_months_prior)
first_month = start.replace(day=1)
last_month = (
end_time + relativedelta(day=31) - relativedelta(months=end_months_prior)
)
reverse_months = sorted(
rrule.rrule(rrule.MONTHLY, dtstart=first_month, until=last_month), reverse=True
)
for date in reverse_months:
yield f"{date.year:04}-{date.month:02}"


def main(parser: argparse.ArgumentParser) -> None:
"""Given cli args, sets up Elasticsearch and aggregates all data.
Creates Elasticsearch instance and PbenchCombinedDataCollection
object, which it then adds all the data to for the months generated
from _month_gen given the args passed in.
Parameters
----------
parser : argparse.ArgumentParser
arguments passed in stored with easily accessible
variable names, can be accessed by calling parse_args on it.
Returns
-------
None
"""
args = parser.parse_args()

if args.profile_memory_usage:
from guppy import hpy

memprof = hpy()
else:
memprof = None

if memprof:
print(f"Initial memory profile ... {memprof.heap()}", flush=True)

es = Elasticsearch(
[f"{args.es_host}:{args.es_port}"], timeout=200
) # to prevent read timeout errors (60 is arbitrary)

session = requests.Session()
ua = session.headers["User-Agent"]
session.headers.update({"User-Agent": f"{ua} -- {parser.prog}"})
pbench_data = PbenchCombinedDataCollection(
args.url_prefix,
args.sos_host_server,
session,
es,
args.record_limit,
args.cpu_n,
)

scan_start = time.time()
end_time = datetime.utcfromtimestamp(scan_start)

# pbench_data.sync_add_months(
# _year_month_gen(end_time, args.start_months_prior, args.end_months_prior)
# )

pbench_data.aggregate_data(
_year_month_gen(end_time, args.start_months_prior, args.end_months_prior)
)

scan_end = time.time()
duration = scan_end - scan_start

pbench_data.print_report()
pbench_data.emit_csv()
print(f"--- merging run and result data took {duration:0.2f} seconds", flush=True)

if memprof:
print(
f"Final memory profile ... {memprof.heap()}",
flush=True,
)


def parse_arguments() -> argparse.ArgumentParser:
"""Specifies Command Line argument parsing.
Gives help info when running this file as to what arguments needed, etc.
Adds optional flags to change execution of code.
Returns
-------
parser : argparse.ArgumentParser
arguments passed in stored with easily accessible
variable names, can be accessed by calling parse_args on it.
"""
parser = argparse.ArgumentParser(description="Host and Server Information")
parser.add_argument(
"es_host", action="store", type=str, help="Elasticsearch host name"
)
parser.add_argument(
"es_port", action="store", type=int, help="Elasticsearch port number"
)
parser.add_argument(
"url_prefix",
action="store",
type=str,
help="Pbench server url prefix to extract host and disk names",
)
parser.add_argument(
"sos_host_server",
action="store",
type=str,
help="Sosreport host server to access sosreport info",
)
parser.add_argument(
"--cpu",
action="store",
dest="cpu_n",
type=int,
default=0,
help="Number of CPUs to be used",
)
parser.add_argument(
"--limit",
action="store",
dest="record_limit",
type=int,
default=-1,
help="Number of desired acceptable results for processing",
) # Temporarily used -1 as default and it meaning all
parser.add_argument(
"--profile",
action="store_true",
dest="profile_memory_usage",
help="Want memory usage profile",
)
parser.add_argument(
"--months_s",
action="store",
dest="start_months_prior",
type=int,
default=12, # default to 12 months worth of data
help="Number of months prior to now at which to start data collection",
)
parser.add_argument(
"--months_e",
action="store",
dest="end_months_prior",
type=int,
default=0, # setting so have usable data for testing
help="Number of months prior to now at which to end data collection",
)
return parser


# point of entry
if __name__ == "__main__":
args = parse_arguments()
main(args)
Loading

0 comments on commit 0c18972

Please sign in to comment.