Update ingestion (#592)
Signed-off-by: Amit Galitzky <[email protected]>
(cherry picked from commit 0bd6e0e)
amitgalitz committed Jun 27, 2022
1 parent 2b638bd commit 868db97
Showing 3 changed files with 94 additions and 84 deletions.
12 changes: 8 additions & 4 deletions dataGeneration/README.md
@@ -8,7 +8,7 @@ The following code in this directory can be used to easily ingest data into an O

### Python

-Python 3.7 or above is required
+Python 3.8 or above is required
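
The script itself does not appear to guard against older interpreters, so if you want to fail fast you could add a check like this near the top of your own copy (a hypothetical addition, not part of this change):

```python
import sys

# Abort early on interpreters older than the documented minimum.
if sys.version_info < (3, 8):
    raise SystemExit("Python 3.8 or above is required")
```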

### pip

@@ -22,7 +22,11 @@ pip install -r requirements.txt

### Quick Start

-In order to execute the script you must have a running OpenSearch cluster so you can supply an endpoint for the data to be ingested too. The current iteration of this data script creates data in a cosine pattern with anomalies injected with a random seed throughout. The dataset created will have two categorical fields to test a multi-entity AD (of type `keyword`) and two fields that can act as the two features fields (cpuTime and jvmGcTime). These two fields are of type `double`.
+In order to execute the script, you must have a running OpenSearch cluster so that you can supply an endpoint for the data to be ingested to.
+
+The current iteration of this data script creates data in a cosine pattern, with anomalies injected using a random seed.
+
+The dataset created will have two categorical fields for testing a multi-entity AD (`host` and `process`, of type `keyword`) and two fields that can act as the feature fields (`cpuTime` and `jvmGcTime`, of type `double`).

### Example Request:

@@ -43,8 +47,8 @@ In order to execute the script you must have a running OpenSearch cluster so you
| --bulk-size | Number of documents per bulk request | 3000 | No
| --ingestion-frequency | How often each respective document is indexed (in seconds) | 600 | No
| --points | Total number of points in time ingested | 1008 | No
-| --number-of-host | number of 'host' entities | 1000 | No
-| --number-of-process | number of 'process' entities | 1000 | No
+| --number-of-host | number of 'host' entities (host is one of the categorical fields that an entity is defined by) | 1000 | No
+| --number-of-process | number of 'process' entities (process is one of the categorical fields that an entity is defined by) | 1000 | No
| --number-of-historical-days | number of days of historical data to ingest | 2 | No
| --username | username for authentication if security is true | admin | No
| --password | password for authentication if security is true | admin | No
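
To make the table and field descriptions above concrete, a single generated document looks roughly like the sketch below. The `host`, `process`, `cpuTime`, and `jvmGcTime` names come from this README; the timestamp field name, format, and example values are assumptions.

```python
# Rough shape of one ingested document per (host, process) entity at each timestamp.
# Field values are illustrative; the timestamp field name/format is an assumption.
example_doc = {
    "timestamp": "2022-06-27 12:00:00",  # points are --ingestion-frequency seconds apart
    "cpuTime": 52.3,                     # feature field 1 (double), cosine with injected anomalies
    "jvmGcTime": 47.8,                   # feature field 2 (double)
    "host": "host_1",                    # categorical field 1 (keyword)
    "process": "process_3",              # categorical field 2 (keyword)
}
```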
160 changes: 83 additions & 77 deletions dataGeneration/generate-cosine-data-multi-entity.py
@@ -78,10 +78,10 @@
'''
Generate index INDEX_NAME
'''
-def create_index(es, INDEX_NAME, shard_number):
+def create_index(os, INDEX_NAME, shard_number):
    # First, delete the index if it exists
    print("Deleting index if it exists...")
-    es.indices.delete(index=INDEX_NAME, ignore=[400, 404])
+    os.indices.delete(index=INDEX_NAME, ignore=[400, 404])

    # Next, create the index
    print("Creating index \"{}\"...".format(INDEX_NAME))
@@ -112,9 +112,7 @@ def create_index(es, INDEX_NAME, shard_number):
            }
        }
    }
-
-    es.indices.create(index=INDEX_NAME, body=request_body)
-
+    os.indices.create(index=INDEX_NAME, body=request_body)

'''
Posts a document(s) to the index
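
The `request_body` that `create_index` sends is collapsed in the hunk above. Based on the README's field description, it presumably defines a mapping along these lines; this is a sketch inferred from context, not the script's actual body, and the timestamp name, replica count, and date type are assumptions.

```python
# Hypothetical index mapping matching the README's description: two keyword entity
# fields, two double feature fields, plus a timestamp. Not copied from the script.
shard_number = 5  # the script passes SHARD_NUMBER into create_index

request_body = {
    "settings": {
        "index": {
            "number_of_shards": shard_number,
            "number_of_replicas": 1
        }
    },
    "mappings": {
        "properties": {
            "timestamp": {"type": "date"},
            "cpuTime": {"type": "double"},
            "jvmGcTime": {"type": "double"},
            "host": {"type": "keyword"},
            "process": {"type": "keyword"}
        }
    }
}
```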
@@ -154,62 +152,66 @@ def post_log_stream(index_value, time_intervals, sample_per_interval, max_number
    j = (int)(min_number / service_number)
    index = j * service_number - 1

-    try:
-        while keep_loop and j < host_number:
-            host_str = host_prefix + str(j)
-            for l in range(service_number):
-                process_str = process_prefix + str(l)
-                index += 1
-                # index can be [min_number, max_number]
-                if index < min_number:
-                    continue
-                if index > max_number:
-                    keep_loop = False
-                    break
-                nextTs = startTs
-                prb = Random()
-                prb.seed(random.randint(0, 100000000))
-                cosine_p = cosine_params[index]
-                data_index = 0
-                for i in range(0, time_intervals):
-                    ts = nextTs.strftime(dtFormat)
-                    for k in range(0, sample_per_interval):
-                        data = generate_val(cosine_p[1], cosine_p[0], 2, data_index,
-                                            50, 5, prb)
-                        bulk_payload.append(
-                            {
-                                index_name: index_value,
-                                "_source":
-                                {
-                                    timestamp_name: ts,
-                                    cpu_name: data[0],
-                                    mem_name: data[1],
-                                    host_name: host_str,
-                                    process_name: process_str
-                                }
-                            }
-                        )
-                        count += 1
-                        data_index += 1
-                        if count >= batch_size:
-                            post_log(bulk_payload, thread_index)
-                            bulk_payload = list() # reset list
-                            totalCount += count
-                            count = 0
-                    # increment by ingestion_frequency (in seconds) after looping through each host multiple samples
-                    nextTs += datetime.timedelta(seconds=INGESTION_FREQUENCY)
-                    if totalCount - lastTotalCount > 1000000:
-                        # report progress every 1 million inserts
-                        print("totalCount {} thread_index {}".format(totalCount,
-                                                                     thread_index))
-                        lastTotalCount = totalCount
-            j += 1
-
-        if len(bulk_payload) > 0:
-            post_log(bulk_payload, thread_index)
-            bulk_payload = list()
-    except Error as err:
-        print("error: {0}".format(err))
+    retries = 0
+    while keep_loop and retries < 10 and j < host_number:
+        try:
+            while keep_loop and j < host_number:
+                host_str = host_prefix + str(j)
+                for l in range(service_number):
+                    process_str = process_prefix + str(l)
+                    index += 1
+                    # index can be [min_number, max_number]
+                    if index < min_number:
+                        continue
+                    if index > max_number:
+                        keep_loop = False
+                        break
+                    nextTs = startTs
+                    prb = Random()
+                    prb.seed(random.randint(0, 100000000))
+                    cosine_p = cosine_params[index]
+                    data_index = 0
+                    for i in range(0, time_intervals):
+                        ts = nextTs.strftime(dtFormat)
+                        for k in range(0, sample_per_interval):
+                            data = generate_val(cosine_p[1], cosine_p[0], 2, data_index,
+                                                50, 5, prb)
+                            bulk_payload.append(
+                                {
+                                    index_name: index_value,
+                                    "_source":
+                                    {
+                                        timestamp_name: ts,
+                                        cpu_name: data[0],
+                                        mem_name: data[1],
+                                        host_name: host_str,
+                                        process_name: process_str
+                                    }
+                                }
+                            )
+                            count += 1
+                            data_index += 1
+                            if count >= batch_size:
+                                post_log(bulk_payload, thread_index)
+                                bulk_payload = list() # reset list
+                                totalCount += count
+                                count = 0
+                        # increment by ingestion_frequency (in seconds) after looping through each host multiple samples
+                        nextTs += datetime.timedelta(seconds=INGESTION_FREQUENCY)
+                        if totalCount - lastTotalCount > 1000000:
+                            # report progress every 1 million inserts
+                            print("totalCount {} thread_index {}".format(totalCount,
+                                                                         thread_index))
+                            lastTotalCount = totalCount
+                j += 1
+
+            if len(bulk_payload) > 0:
+                post_log(bulk_payload, thread_index)
+                bulk_payload = list()
+        except Error as err:
+            print("error: {0}".format(err))
+            retries += 1
+            client[thread_index] = create_client(SECURITY, URL)

def split(a, n):
    k, m = divmod(len(a), n)
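
The main behavioral change in the hunk above is that each ingestion thread now retries up to 10 times instead of giving up on the first error, recreating its client and resuming from where it stopped. A stripped-down, self-contained sketch of that pattern (all names here are illustrative stand-ins, not the script's actual helpers):

```python
import random

def make_client():
    """Stand-in for create_client(SECURITY, URL)."""
    return object()

def ingest_batch(client, batch):
    """Stand-in for post_log(); fails occasionally to exercise the retry path."""
    if random.random() < 0.05:
        raise ConnectionError("simulated bulk failure")

def ingest_with_retries(batches, max_retries=10):
    client = make_client()
    retries = 0
    i = 0  # progress survives across retries, as in the script's host/index counters
    while i < len(batches) and retries < max_retries:
        try:
            while i < len(batches):
                ingest_batch(client, batches[i])
                i += 1
        except ConnectionError as err:
            print("error: {0}".format(err))
            retries += 1
            client = make_client()  # rebuild the connection, then resume from batch i
    return i == len(batches)

print(ingest_with_retries([[n] for n in range(50)]))
```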
@@ -228,46 +230,51 @@ def create_cosine(total_entities, base_dimension, period, amplitude):
    return cosine_param

'''
-Main entry method for script
+Create OpenSearch client
'''
-def main():
-    global client
-    if SECURITY and URL.strip() == 'localhost':
-        for i in range(0, THREADS):
-            client.append(OpenSearch(
+def create_client(security, URL):
+    if security and URL.strip() == 'localhost':
+        return OpenSearch(
            hosts=[URL],
            use_ssl=True,
            verify_certs=False,
            http_auth=(USERNAME, PASSWORD),
            scheme="https",
            connection_class=RequestsHttpConnection
-            ))
-    elif SECURITY:
-        for i in range(0, THREADS):
-            client.append(OpenSearch(
+        )
+    elif security:
+        return OpenSearch(
            hosts=[{'host': URL, 'port': 443}],
            use_ssl=True,
            verify_certs=False,
            http_auth=(USERNAME, PASSWORD),
            scheme="https",
            port=443,
            connection_class=RequestsHttpConnection
-            ))
+        )
    elif URL.strip() == 'localhost':
-        for i in range(0, THREADS):
-            client.append(OpenSearch(
+        return OpenSearch(
            hosts=[{'host': URL, 'port': 9200}],
            use_ssl=False,
            verify_certs=False,
            connection_class=RequestsHttpConnection
-            ))
+        )
    else:
-        es = OpenSearch(
+        return OpenSearch(
            hosts=[{'host': URL, 'port': 80}],
            use_ssl=False,
            verify_certs=False,
            connection_class=RequestsHttpConnection
        )
+
+'''
+Main entry method for script
+'''
+def main():
+    global client
+    for i in range(0, THREADS):
+        client.append(create_client(SECURITY, URL))
+
    create_index(client[0], INDEX_NAME, SHARD_NUMBER)

    total_entities = HOST_NUMBER * PROCESS_NUMBER
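
A rough smoke test for the new `create_client` factory might look like the following (assumes `opensearch-py` is installed and an insecure cluster is listening on localhost:9200; only the no-security localhost branch from the change above is reproduced, and `info()` is just used as a connectivity check):

```python
# Hypothetical standalone check; not part of the script.
from opensearchpy import OpenSearch, RequestsHttpConnection

def create_client(security, url):
    # Simplified: only the no-security localhost branch from the change above.
    return OpenSearch(
        hosts=[{'host': url, 'port': 9200}],
        use_ssl=False,
        verify_certs=False,
        connection_class=RequestsHttpConnection
    )

if __name__ == "__main__":
    client = create_client(False, 'localhost')
    print(client.info())  # basic cluster info call to confirm the connection works
```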
@@ -291,4 +298,3 @@ def main():

if __name__ == "__main__":
    main()

6 changes: 3 additions & 3 deletions dataGeneration/requirements.txt
@@ -1,5 +1,5 @@
-numpy==1.21.2
-opensearch_py==1.1.0
+numpy==1.23.0
+opensearch_py==2.0.0
retry==0.9.2
-scipy==1.7.1
+scipy==1.8.0
urllib3==1.26.9
