From 868db97cce615c4755498bff2bf6bfe5eb466050 Mon Sep 17 00:00:00 2001
From: Amit Galitzky
Date: Mon, 27 Jun 2022 13:59:36 -0700
Subject: [PATCH] Update ingestion (#592)

Signed-off-by: Amit Galitzky
(cherry picked from commit 0bd6e0e6aa933e6cd6ada8dfe88762b6e0db441a)
---
 dataGeneration/README.md                 |  12 +-
 .../generate-cosine-data-multi-entity.py | 160 +++++++++---------
 dataGeneration/requirements.txt          |   6 +-
 3 files changed, 94 insertions(+), 84 deletions(-)

diff --git a/dataGeneration/README.md b/dataGeneration/README.md
index 51ad9a674..ecefb0b78 100644
--- a/dataGeneration/README.md
+++ b/dataGeneration/README.md
@@ -8,7 +8,7 @@ The following code in this directory can be used to easily ingest data into an O
 
 ### Python
 
-Python 3.7 or above is required
+Python 3.8 or above is required
 
 ### pip
 
@@ -22,7 +22,11 @@ pip install -r requirements.txt
 
 ### Quick Start
 
-In order to execute the script you must have a running OpenSearch cluster so you can supply an endpoint for the data to be ingested too. The current iteration of this data script creates data in a cosine pattern with anomalies injected with a random seed throughout. The dataset created will have two categorical fields to test a multi-entity AD (of type `keyword`) and two fields that can act as the two features fields (cpuTime and jvmGcTime). These two fields are of type `double`.
+In order to execute the script you must have a running OpenSearch cluster so that you can supply an endpoint for the data to be ingested to.
+
+The current iteration of this data script creates data in a cosine pattern, with anomalies injected throughout based on a random seed.
+
+The dataset created will have two categorical fields to test a multi-entity AD (`host` and `process`, both of type `keyword`) and two fields that can act as the feature fields (`cpuTime` and `jvmGcTime`, both of type `double`).
 
 ### Example Request:
 
@@ -43,8 +47,8 @@ In order to execute the script you must have a running OpenSearch cluster so you can supply an endpoint for the data to be ingested too.
 | --bulk-size | Number of documents per bulk request | 3000 | No
 | --ingestion-frequency | How often each respective document is indexed (in seconds) | 600 | No
 | --points | Total number of points in time ingested | 1008 | No
-| --number-of-host | number of 'host' entities | 1000 | No
-| --number-of-process | number of 'process' entities | 1000 | No
+| --number-of-host | number of 'host' entities (host is one of the categorical fields that define an entity) | 1000 | No
+| --number-of-process | number of 'process' entities (process is one of the categorical fields that define an entity) | 1000 | No
 | --number-of-historical-days | number of day of historical data to ingest | 2 | No
 | --username | username for authentication if security is true | admin | No
 | --password | password for authentication if security is true | admin | No
diff --git a/dataGeneration/generate-cosine-data-multi-entity.py b/dataGeneration/generate-cosine-data-multi-entity.py
index 8fc5bb888..c849a9d8b 100644
--- a/dataGeneration/generate-cosine-data-multi-entity.py
+++ b/dataGeneration/generate-cosine-data-multi-entity.py
@@ -78,10 +78,10 @@
 '''
 Generate index INDEX_NAME
 '''
-def create_index(es, INDEX_NAME, shard_number):
+def create_index(os, INDEX_NAME, shard_number):
     # First, delete the index if it exists
     print("Deleting index if it exists...")
-    es.indices.delete(index=INDEX_NAME, ignore=[400, 404])
+    os.indices.delete(index=INDEX_NAME, ignore=[400, 404])
 
     # Next, create the index
     print("Creating index \"{}\"...".format(INDEX_NAME))
@@ -112,9 +112,7 @@ def create_index(es, INDEX_NAME, shard_number):
             }
         }
     }
-
-    es.indices.create(index=INDEX_NAME, body=request_body)
-
+    os.indices.create(index=INDEX_NAME, body=request_body)
 '''
 Posts a document(s) to the index
 '''
@@ -154,62 +152,66 @@ def post_log_stream(index_value, time_intervals, sample_per_interval, max_number
     j = (int)(min_number / service_number)
     index = j * service_number - 1
-    try:
-        while keep_loop and j < host_number:
-            host_str = host_prefix + str(j)
-            for l in range(service_number):
-                process_str = process_prefix + str(l)
-                index += 1
-                # index can be [min_number, max_number]
-                if index < min_number:
-                    continue
-                if index > max_number:
-                    keep_loop = False
-                    break
-                nextTs = startTs
-                prb = Random()
-                prb.seed(random.randint(0, 100000000))
-                cosine_p = cosine_params[index]
-                data_index = 0
-                for i in range(0, time_intervals):
-                    ts = nextTs.strftime(dtFormat)
-                    for k in range(0, sample_per_interval):
-                        data = generate_val(cosine_p[1], cosine_p[0], 2, data_index,
-                                            50, 5, prb)
-                        bulk_payload.append(
-                            {
-                                index_name: index_value,
-                                "_source":
-                                    {
-                                        timestamp_name: ts,
-                                        cpu_name: data[0],
-                                        mem_name: data[1],
-                                        host_name: host_str,
-                                        process_name: process_str
-                                    }
-                            }
-                        )
-                        count += 1
-                        data_index += 1
-                        if count >= batch_size:
-                            post_log(bulk_payload, thread_index)
-                            bulk_payload = list()  # reset list
-                            totalCount += count
-                            count = 0
-                    # increment by ingestion_frequency (in seconds) after looping through each host multiple samples
-                    nextTs += datetime.timedelta(seconds=INGESTION_FREQUENCY)
-                    if totalCount - lastTotalCount > 1000000:
-                        # report progress every 1 million inserts
-                        print("totalCount {} thread_index {}".format(totalCount,
-                                                                     thread_index))
-                        lastTotalCount = totalCount
-            j += 1
-
-        if len(bulk_payload) > 0:
-            post_log(bulk_payload, thread_index)
-            bulk_payload = list()
-    except Error as err:
-        print("error: {0}".format(err))
+    retries = 0
+    while keep_loop and retries < 10 and j < host_number:
+        try:
+            while keep_loop and j < host_number:
+                host_str = host_prefix + str(j)
+                for l in range(service_number):
+                    process_str = process_prefix + str(l)
+                    index += 1
+                    # index can be [min_number, max_number]
+                    if index < min_number:
+                        continue
+                    if index > max_number:
+                        keep_loop = False
+                        break
+                    nextTs = startTs
+                    prb = Random()
+                    prb.seed(random.randint(0, 100000000))
+                    cosine_p = cosine_params[index]
+                    data_index = 0
+                    for i in range(0, time_intervals):
+                        ts = nextTs.strftime(dtFormat)
+                        for k in range(0, sample_per_interval):
+                            data = generate_val(cosine_p[1], cosine_p[0], 2, data_index,
+                                                50, 5, prb)
+                            bulk_payload.append(
+                                {
+                                    index_name: index_value,
+                                    "_source":
+                                        {
+                                            timestamp_name: ts,
+                                            cpu_name: data[0],
+                                            mem_name: data[1],
+                                            host_name: host_str,
+                                            process_name: process_str
+                                        }
+                                }
+                            )
+                            count += 1
+                            data_index += 1
+                            if count >= batch_size:
+                                post_log(bulk_payload, thread_index)
+                                bulk_payload = list()  # reset list
+                                totalCount += count
+                                count = 0
+                        # increment by ingestion_frequency (in seconds) after looping through each host multiple samples
+                        nextTs += datetime.timedelta(seconds=INGESTION_FREQUENCY)
+                        if totalCount - lastTotalCount > 1000000:
+                            # report progress every 1 million inserts
+                            print("totalCount {} thread_index {}".format(totalCount,
+                                                                         thread_index))
+                            lastTotalCount = totalCount
+                j += 1
+
+            if len(bulk_payload) > 0:
+                post_log(bulk_payload, thread_index)
+                bulk_payload = list()
+        except Error as err:
+            print("error: {0}".format(err))
+            retries += 1
+            client[thread_index] = create_client(SECURITY, URL)
 
 
 def split(a, n):
     k, m = divmod(len(a), n)
@@ -228,23 +230,20 @@ def create_cosine(total_entities, base_dimension, period, amplitude):
     return cosine_param
 
 '''
- Main entry method for script
+Create OpenSearch client
 '''
-def main():
-    global client
-    if SECURITY and URL.strip() == 'localhost':
-        for i in range(0, THREADS):
-            client.append(OpenSearch(
+def create_client(security, URL):
+    if security and URL.strip() == 'localhost':
+        return OpenSearch(
                 hosts=[URL],
                 use_ssl=True,
                 verify_certs=False,
                 http_auth=(USERNAME, PASSWORD),
                 scheme="https",
                 connection_class=RequestsHttpConnection
-            ))
-    elif SECURITY:
-        for i in range(0, THREADS):
-            client.append(OpenSearch(
+        )
+    elif security:
+        return OpenSearch(
                 hosts=[{'host': URL, 'port': 443}],
                 use_ssl=True,
                 verify_certs=False,
@@ -252,22 +251,30 @@ def main():
                 scheme="https",
                 port=443,
                 connection_class=RequestsHttpConnection
-            ))
+        )
     elif URL.strip() == 'localhost':
-        for i in range(0, THREADS):
-            client.append(OpenSearch(
+        return OpenSearch(
                 hosts=[{'host': URL, 'port': 9200}],
                 use_ssl=False,
                 verify_certs=False,
                 connection_class=RequestsHttpConnection
-            ))
+        )
     else:
-        es = OpenSearch(
+        return OpenSearch(
             hosts=[{'host': URL, 'port': 80}],
             use_ssl=False,
            verify_certs=False,
            connection_class=RequestsHttpConnection
         )
+
+'''
+ Main entry method for script
+'''
+def main():
+    global client
+    for i in range(0, THREADS):
+        client.append(create_client(SECURITY, URL))
+
     create_index(client[0], INDEX_NAME, SHARD_NUMBER)
 
     total_entities = HOST_NUMBER * PROCESS_NUMBER
@@ -291,4 +298,3 @@ def main():
 
 if __name__ == "__main__":
     main()
-
diff --git a/dataGeneration/requirements.txt b/dataGeneration/requirements.txt
index 1e37764ef..5e900e629 100644
--- a/dataGeneration/requirements.txt
+++ b/dataGeneration/requirements.txt
@@ -1,5 +1,5 @@
-numpy==1.21.2
-opensearch_py==1.1.0
+numpy==1.23.0
+opensearch_py==2.0.0
 retry==0.9.2
-scipy==1.7.1
+scipy==1.8.0
 urllib3==1.26.9
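
For reference, the sketch below shows how the pieces this patch touches can be exercised directly with opensearch_py 2.0.0: a client connecting to an assumed local, secured cluster (analogous to the localhost branches of the new `create_client` helper) and one bulk action in the same shape `post_log_stream` appends to `bulk_payload`. The endpoint, credentials, index name, timestamp field name, and entity values are assumptions for illustration; only `cpuTime`, `jvmGcTime`, `host`, and `process` come from the README above.

```python
# Sketch only -- host, credentials, index name, timestamp field, and entity values
# below are assumed for illustration, not taken from the script's configuration.
from opensearchpy import OpenSearch, RequestsHttpConnection, helpers

client = OpenSearch(
    hosts=[{"host": "localhost", "port": 9200}],  # assumed local cluster
    http_auth=("admin", "admin"),                 # assumed credentials
    use_ssl=True,
    verify_certs=False,
    connection_class=RequestsHttpConnection,
)

# One action in the shape the script builds: an index name plus a _source document
# with the two feature fields and the two categorical (entity) fields.
actions = [
    {
        "_index": "cosine-test-data",             # assumed index name
        "_source": {
            "timestamp": "2022-06-27T00:00:00Z",  # assumed timestamp field/format
            "cpuTime": 42.0,
            "jvmGcTime": 3.1,
            "host": "host_0",
            "process": "process_0",
        },
    },
]

# helpers.bulk wraps the _bulk API and sends the whole list in one request.
successes, errors = helpers.bulk(client, actions)
print("indexed {} documents, errors: {}".format(successes, errors))
```

If security is disabled, dropping `http_auth` and `use_ssl` corresponds to the plain-localhost branch of `create_client`.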