Skip to content

Commit

Permalink
add retry logic
Browse files Browse the repository at this point in the history
Signed-off-by: Amit Galitzky <[email protected]>
  • Loading branch information
amitgalitz committed Jun 27, 2022
1 parent 46b1a89 commit a1a5366
Showing 1 changed file with 60 additions and 56 deletions.
116 changes: 60 additions & 56 deletions dataGeneration/generate-cosine-data-multi-entity.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,62 +152,66 @@ def post_log_stream(index_value, time_intervals, sample_per_interval, max_number
# Fragment of post_log_stream (pre-retry version shown in this diff).
# Indentation reconstructed: the scraped diff flattened leading whitespace.
# j: first host index this thread is responsible for; index: running
# (host, service) entity counter, kept in [min_number, max_number].
j = (int)(min_number / service_number)
index = j * service_number - 1

try:
    # Outer loop: one iteration per host assigned to this thread.
    # NOTE(review): keep_loop / bulk_payload / count / startTs / totalCount /
    # lastTotalCount are bound earlier in the function, outside this view.
    while keep_loop and j < host_number:
        host_str = host_prefix + str(j)
        for l in range(service_number):
            process_str = process_prefix + str(l)
            index += 1
            # index can be [min_number, max_number]
            if index < min_number:
                continue
            if index > max_number:
                # Past this thread's shard: stop both loops.
                keep_loop = False
                break
            nextTs = startTs
            # Fresh RNG per entity, seeded from the module-level RNG so each
            # (host, service) series gets an independent noise stream.
            prb = Random()
            prb.seed(random.randint(0, 100000000))
            cosine_p = cosine_params[index]
            data_index = 0
            for i in range(0, time_intervals):
                ts = nextTs.strftime(dtFormat)
                for k in range(0, sample_per_interval):
                    # generate_val presumably returns (cpu, mem) cosine+noise
                    # samples — defined outside this view; confirm signature.
                    data = generate_val(cosine_p[1], cosine_p[0], 2, data_index,
                                        50, 5, prb)
                    bulk_payload.append(
                        {
                            index_name: index_value,
                            "_source":
                            {
                                timestamp_name: ts,
                                cpu_name: data[0],
                                mem_name: data[1],
                                host_name: host_str,
                                process_name: process_str
                            }
                        }
                    )
                    count += 1
                    data_index += 1
                    if count >= batch_size:
                        # Flush a full batch to the cluster.
                        post_log(bulk_payload, thread_index)
                        bulk_payload = list()  # reset list
                        totalCount += count
                        count = 0
                # increment by ingestion_frequency (in seconds) after looping through each host multiple samples
                nextTs += datetime.timedelta(seconds=INGESTION_FREQUENCY)
                if totalCount - lastTotalCount > 1000000:
                    # report progress every 1 million inserts
                    print("totalCount {} thread_index {}".format(totalCount,
                                                                 thread_index))
                    lastTotalCount = totalCount
        j += 1

    # Flush any partial batch left over after the loops finish.
    if len(bulk_payload) > 0:
        post_log(bulk_payload, thread_index)
        bulk_payload = list()
except Error as err:
    # NOTE(review): `Error` is not a builtin — presumably imported from the
    # client library earlier in the file; verify, otherwise this raises
    # NameError instead of catching. A failure here abandons the remaining
    # hosts entirely (no retry in this version).
    print("error: {0}".format(err))
# Fragment of post_log_stream (retry version introduced by this commit).
# Indentation reconstructed: the scraped diff flattened leading whitespace.
# Wraps the ingestion loop so a client failure re-creates the client and
# resumes from the current host index j, up to 10 retries.
retries = 0
while keep_loop and retries < 10 and j < host_number:
    try:
        # NOTE(review): this inner condition duplicates the outer retry-loop
        # condition; the inner while alone drives normal progress.
        while keep_loop and j < host_number:
            host_str = host_prefix + str(j)
            for l in range(service_number):
                process_str = process_prefix + str(l)
                index += 1
                # index can be [min_number, max_number]
                # NOTE(review): after an exception, index is NOT recomputed
                # from j, so a retry replays the inner service loop with a
                # partially-advanced index — cosine_params selection and the
                # min/max bounds can drift. Consider index = j * service_number - 1
                # on retry; confirm intended semantics.
                if index < min_number:
                    continue
                if index > max_number:
                    keep_loop = False
                    break
                nextTs = startTs
                # Per-entity RNG seeded from the module-level RNG.
                prb = Random()
                prb.seed(random.randint(0, 100000000))
                cosine_p = cosine_params[index]
                data_index = 0
                for i in range(0, time_intervals):
                    ts = nextTs.strftime(dtFormat)
                    for k in range(0, sample_per_interval):
                        data = generate_val(cosine_p[1], cosine_p[0], 2, data_index,
                                            50, 5, prb)
                        bulk_payload.append(
                            {
                                index_name: index_value,
                                "_source":
                                {
                                    timestamp_name: ts,
                                    cpu_name: data[0],
                                    mem_name: data[1],
                                    host_name: host_str,
                                    process_name: process_str
                                }
                            }
                        )
                        count += 1
                        data_index += 1
                        if count >= batch_size:
                            # Flush a full batch to the cluster.
                            post_log(bulk_payload, thread_index)
                            bulk_payload = list()  # reset list
                            totalCount += count
                            count = 0
                    # increment by ingestion_frequency (in seconds) after looping through each host multiple samples
                    nextTs += datetime.timedelta(seconds=INGESTION_FREQUENCY)
                    if totalCount - lastTotalCount > 1000000:
                        # report progress every 1 million inserts
                        print("totalCount {} thread_index {}".format(totalCount,
                                                                     thread_index))
                        lastTotalCount = totalCount
            j += 1

        # Flush any partial batch once all hosts are done.
        if len(bulk_payload) > 0:
            post_log(bulk_payload, thread_index)
            bulk_payload = list()
    except Error as err:
        # NOTE(review): `Error` is not a builtin — presumably imported from
        # the client library; verify, otherwise this raises NameError.
        # NOTE(review): bulk_payload is not cleared before retrying, so any
        # documents buffered when the failure hit are re-sent on the next
        # attempt — possible duplicates; consider resetting it here.
        print("error: {0}".format(err))
        retries += 1
        # Re-create this thread's client before retrying, on the assumption
        # the failure was a broken connection — confirm create_client semantics.
        client[thread_index] = create_client(SECURITY, URL)

def split(a, n):
k, m = divmod(len(a), n)
Expand Down

0 comments on commit a1a5366

Please sign in to comment.