Commit d5fd7c1

changes to test load test locally

Signed-off-by: Wesley Pettit <[email protected]>
PettitWesley committed Jun 13, 2023
1 parent 2cb10d8 commit d5fd7c1
Showing 1 changed file with 111 additions and 102 deletions.
213 changes: 111 additions & 102 deletions load_tests/load_test.py
@@ -102,7 +102,7 @@ def generate_task_definition(session, throughput, input_logger, s3_fluent_config
 
         # Task Environment Variables
         '$TASK_ROLE_ARN': os.environ['LOAD_TEST_TASK_ROLE_ARN'],
-        '$TASK_EXECUTION_ROLE_ARN': os.environ['LOAD_TEST_TASK_EXECUTION_ROLE_ARN'],
+        '$TASK_EXECUTION_ROLE_ARN': os.environ['TASK_EXECUTION_ROLE_ARN'],
         '$CUSTOM_S3_OBJECT_NAME': resource_resolver.resolve_s3_object_name(custom_config),
 
         # Plugin Specific Environment Variables
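
Note: with this rename, local runs must export TASK_EXECUTION_ROLE_ARN rather than LOAD_TEST_TASK_EXECUTION_ROLE_ARN. Since os.environ[...] raises a bare KeyError when a variable is unset, a fail-fast lookup makes misconfiguration easier to spot; a minimal sketch (require_env is an illustrative helper, not part of this commit):

    import os

    def require_env(name):
        # Illustrative helper: name the missing variable instead of
        # surfacing a bare KeyError from os.environ[name].
        value = os.environ.get(name)
        if value is None:
            raise SystemExit(f'missing required environment variable: {name}')
        return value

    task_execution_role_arn = require_env('TASK_EXECUTION_ROLE_ARN')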
@@ -142,44 +142,45 @@ def generate_task_definition(session, throughput, input_logger, s3_fluent_config
     if IS_TASK_DEFINITION_PRINTED:
         print("Registering task definition:")
         print(json.dumps(task_def, indent=4))
-        session.client('ecs').register_task_definition(
-            **task_def
-        )
+        # session.client('ecs').register_task_definition(
+        #     **task_def
+        # )
     else:
         print("Registering task definition")
 
 # With multiple codebuild projects running parallel,
 # Testing resources only needs to be created once
 def create_testing_resources():
-    session = get_sts_boto_session()
-
-    if OUTPUT_PLUGIN != 'cloudwatch':
-        client = session.client('cloudformation')
-        waiter = client.get_waiter('stack_exists')
-        waiter.wait(
-            StackName=TESTING_RESOURCES_STACK_NAME,
-            WaiterConfig={
-                'MaxAttempts': 60
-            }
-        )
-        waiter = client.get_waiter('stack_create_complete')
-        waiter.wait(
-            StackName=TESTING_RESOURCES_STACK_NAME
-        )
-    else:
-        # scale up eks cluster
-        if PLATFORM == 'eks':
-            os.system(f'eksctl scale nodegroup --cluster={EKS_CLUSTER_NAME} --nodes={NUM_OF_EKS_NODES} ng')
-            while True:
-                time.sleep(90)
-                number_of_nodes = subprocess.getoutput("kubectl get nodes --no-headers=true | wc -l")
-                if(int(number_of_nodes) == NUM_OF_EKS_NODES):
-                    break
-            # create namespace
-            os.system('kubectl apply -f ./load_tests/create_testing_resources/eks/namespace.yaml')
-        # Once deployment starts, it will wait until the stack creation is completed
-        os.chdir(f'./load_tests/{sys.argv[1]}/{PLATFORM}')
-        os.system('cdk deploy --require-approval never')
+    print('create_testing_resources')
+    # session = get_sts_boto_session()
+
+    # if OUTPUT_PLUGIN != 'cloudwatch':
+    #     client = session.client('cloudformation')
+    #     waiter = client.get_waiter('stack_exists')
+    #     waiter.wait(
+    #         StackName=TESTING_RESOURCES_STACK_NAME,
+    #         WaiterConfig={
+    #             'MaxAttempts': 60
+    #         }
+    #     )
+    #     waiter = client.get_waiter('stack_create_complete')
+    #     waiter.wait(
+    #         StackName=TESTING_RESOURCES_STACK_NAME
+    #     )
+    # else:
+    #     # scale up eks cluster
+    #     if PLATFORM == 'eks':
+    #         os.system(f'eksctl scale nodegroup --cluster={EKS_CLUSTER_NAME} --nodes={NUM_OF_EKS_NODES} ng')
+    #         while True:
+    #             time.sleep(90)
+    #             number_of_nodes = subprocess.getoutput("kubectl get nodes --no-headers=true | wc -l")
+    #             if(int(number_of_nodes) == NUM_OF_EKS_NODES):
+    #                 break
+    #         # create namespace
+    #         os.system('kubectl apply -f ./load_tests/create_testing_resources/eks/namespace.yaml')
+    #     # Once deployment starts, it will wait until the stack creation is completed
+    #     os.chdir(f'./load_tests/{sys.argv[1]}/{PLATFORM}')
+    #     os.system('cdk deploy --require-approval never')
 
 # For tests on ECS, we need to:
 # 1. generate and register task definitions based on templates at /load_tests/task_definitons
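
Commenting out register_task_definition (and the other AWS calls in this commit) is in effect a hand-rolled dry-run mode. A toggle would keep both paths without re-editing the file for each local run; a sketch assuming an invented LOAD_TEST_DRY_RUN flag:

    import os

    # Hypothetical switch: real AWS calls run only when the flag is unset.
    DRY_RUN = os.environ.get('LOAD_TEST_DRY_RUN', '').lower() in ('1', 'true')

    def register_task_definition(session, task_def):
        if DRY_RUN:
            print('dry-run: skipping register_task_definition')
            return
        session.client('ecs').register_task_definition(**task_def)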
@@ -195,7 +196,7 @@ def run_ecs_tests():
     for input_logger in INPUT_LOGGERS:
         session = get_sts_boto_session()
 
-        client = session.client('ecs')
+        # client = session.client('ecs')
 
         # Delete corresponding testing data for a fresh start
         delete_testing_data(session)
@@ -207,12 +208,12 @@ def run_ecs_tests():
         for throughput in THROUGHPUT_LIST:
             os.environ['THROUGHPUT'] = throughput
             generate_task_definition(session, throughput, input_logger, s3_fluent_config_arn)
-            response = client.run_task(
-                cluster=ecs_cluster_name,
-                launchType='EC2',
-                taskDefinition=f'{PREFIX}{OUTPUT_PLUGIN}-{throughput}-{input_logger["name"]}'
-            )
-            names[f'{OUTPUT_PLUGIN}_{throughput}_task_arn'] = response['tasks'][0]['taskArn']
+            print('''response = client.run_task(
+                cluster=ecs_cluster_name,
+                launchType='EC2',
+                taskDefinition=f'{PREFIX}{OUTPUT_PLUGIN}-{throughput}-{input_logger["name"]}'
+            )''')
+            names[f'{OUTPUT_PLUGIN}_{throughput}_task_arn'] = 'fake-task-arn'
 
         # Validation input type banner
         print(f'\nTest {input_logger["name"]} to {OUTPUT_PLUGIN} in progress...')
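
Instead of printing the run_task call and pinning 'fake-task-arn', botocore's Stubber can serve a canned response from a real ECS client, which keeps the response-handling code on its normal path. A sketch; the region, cluster, and ARN values are placeholders:

    import boto3
    from botocore.stub import Stubber

    ecs = boto3.client('ecs', region_name='us-west-2')
    stubber = Stubber(ecs)
    # Queue one canned run_task response; Stubber validates it
    # against the ECS service model.
    stubber.add_response(
        'run_task',
        {'tasks': [{'taskArn': 'arn:aws:ecs:us-west-2:123456789012:task/fake'}]},
    )
    stubber.activate()
    response = ecs.run_task(cluster='load-test-cluster', launchType='EC2',
                            taskDefinition='example-task-def')
    task_arn = response['tasks'][0]['taskArn']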
@@ -224,32 +225,35 @@
 
         for throughput in THROUGHPUT_LIST:
             session = get_sts_boto_session()
-            client = session.client('ecs')
-            waiter = client.get_waiter('tasks_stopped')
+            # client = session.client('ecs')
+            # waiter = client.get_waiter('tasks_stopped')
             task_arn = names[f'{OUTPUT_PLUGIN}_{throughput}_task_arn']
-            waiter.wait(
-                cluster=ecs_cluster_name,
-                tasks=[
-                    task_arn,
-                ],
-                WaiterConfig={
-                    'MaxAttempts': 600
-                }
-            )
-            response = client.describe_tasks(
-                cluster=ecs_cluster_name,
-                tasks=[
-                    task_arn,
-                ]
-            )
-            print(f'response={response}')
-            check_app_exit_code(response)
+            print('''waiter.wait(
+                cluster=ecs_cluster_name,
+                tasks=[
+                    task_arn,
+                ],
+                WaiterConfig={
+                    'MaxAttempts': 600
+                }
+            )''')
+            print('''response = client.describe_tasks(
+                cluster=ecs_cluster_name,
+                tasks=[
+                    task_arn,
+                ]
+            )''')
+            print(f'task_arn={task_arn}')
+            # print(f'response={response}')
+            # check_app_exit_code(response)
             input_record = calculate_total_input_number(throughput)
-            start_time = response['tasks'][0]['startedAt']
-            stop_time = response['tasks'][0]['stoppedAt']
-            log_delay = get_log_delay(parse_time(stop_time)-parse_time(start_time)-LOGGER_RUN_TIME_IN_SECOND)
-            set_buffer(parse_time(stop_time))
+            # start_time = response['tasks'][0]['startedAt']
+            # stop_time = response['tasks'][0]['stoppedAt']
+            #
+            # log_delay = get_log_delay(parse_time(stop_time)-parse_time(start_time)-LOGGER_RUN_TIME_IN_SECOND)
+
+            # set_buffer(parse_time(stop_time))
+            log_delay = 2
 
             # Validate logs
             os.environ['LOG_SOURCE_NAME'] = input_logger["name"]
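
An alternative to printing each call and hard-coding log_delay = 2 is a small fake client that returns the fields the downstream math expects (startedAt, stoppedAt, an exit code), so the commented-out timing arithmetic and check_app_exit_code(response) could run unchanged. A sketch; the class and its canned values are illustrative:

    from datetime import datetime, timedelta, timezone

    class FakeECSClient:
        # Illustrative local stand-in for session.client('ecs').
        def get_waiter(self, name):
            class _NoOpWaiter:
                def wait(self, **kwargs):
                    pass  # nothing to poll locally
            return _NoOpWaiter()

        def describe_tasks(self, **kwargs):
            stopped = datetime.now(timezone.utc)
            return {'tasks': [{
                'taskArn': 'fake-task-arn',
                'startedAt': stopped - timedelta(seconds=600),
                'stoppedAt': stopped,
                'containers': [{'exitCode': 0}],
            }]}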
@@ -391,6 +395,8 @@ def format_test_results_to_markdown(test_results):
 
 def parse_json_template(template, dict):
     data = template
+    print('parse_json_template')
+    print(dict)
     for key in dict:
         if(key[0] == '$'):
             data = data.replace(key, dict[key])
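
For context on the two new prints: parse_json_template replaces each '$'-prefixed key of the dict with its value wherever it appears in the template string, which is how the task-definition placeholders earlier in this diff are resolved. A small usage example (the role ARN is a placeholder):

    template = '{"executionRoleArn": "$TASK_EXECUTION_ROLE_ARN"}'
    rendered = parse_json_template(template, {
        '$TASK_EXECUTION_ROLE_ARN': 'arn:aws:iam::123456789012:role/example',
    })
    print(rendered)
    # -> {"executionRoleArn": "arn:aws:iam::123456789012:role/example"}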
@@ -402,35 +408,37 @@ def parse_json_template(template, dict):
 
 # Returns s3 arn
 def publish_fluent_config_s3(session, input_logger):
     bucket_name = os.environ['S3_BUCKET_NAME']
-    s3 = session.client('s3')
-    s3.upload_file(
-        input_logger["fluent_config_file_path"],
-        bucket_name,
-        f'{OUTPUT_PLUGIN}-test/{PLATFORM}/fluent-{input_logger["name"]}.conf',
-    )
+    # s3 = session.client('s3')
+    # s3.upload_file(
+    #     input_logger["fluent_config_file_path"],
+    #     bucket_name,
+    #     f'{OUTPUT_PLUGIN}-test/{PLATFORM}/fluent-{input_logger["name"]}.conf',
+    # )
     return f'arn:aws:s3:::{bucket_name}/{OUTPUT_PLUGIN}-test/{PLATFORM}/fluent-{input_logger["name"]}.conf'
 
 # The following method is used to clear data between
 # testing batches
 def delete_testing_data(session):
     # All testing data related to the plugin option will be deleted
-    if OUTPUT_PLUGIN == 'cloudwatch':
-        # Delete associated cloudwatch log streams
-        client = session.client('logs')
-        response = client.describe_log_streams(
-            logGroupName=os.environ['CW_LOG_GROUP_NAME']
-        )
-        for stream in response["logStreams"]:
-            client.delete_log_stream(
-                logGroupName=os.environ['CW_LOG_GROUP_NAME'],
-                logStreamName=stream["logStreamName"]
-            )
-    else:
-        # Delete associated s3 bucket objects
-        s3 = session.resource('s3')
-        bucket = s3.Bucket(os.environ['S3_BUCKET_NAME'])
-        s3_objects = bucket.objects.filter(Prefix=f'{OUTPUT_PLUGIN}-test/{PLATFORM}/')
-        s3_objects.delete()
+    print('delete_testing_data')
+    return
+    # if OUTPUT_PLUGIN == 'cloudwatch':
+    #     # Delete associated cloudwatch log streams
+    #     client = session.client('logs')
+    #     response = client.describe_log_streams(
+    #         logGroupName=os.environ['CW_LOG_GROUP_NAME']
+    #     )
+    #     for stream in response["logStreams"]:
+    #         client.delete_log_stream(
+    #             logGroupName=os.environ['CW_LOG_GROUP_NAME'],
+    #             logStreamName=stream["logStreamName"]
+    #         )
+    # else:
+    #     # Delete associated s3 bucket objects
+    #     s3 = session.resource('s3')
+    #     bucket = s3.Bucket(os.environ['S3_BUCKET_NAME'])
+    #     s3_objects = bucket.objects.filter(Prefix=f'{OUTPUT_PLUGIN}-test/{PLATFORM}/')
+    #     s3_objects.delete()
 
 def generate_daemonset_config(throughput):
     daemonset_config_dict = {
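
The S3 branch that delete_testing_data now skips can also be exercised offline with moto, which fakes S3 in-process. A sketch assuming moto is installed (moto 5 exposes mock_aws; older releases use mock_s3):

    import boto3
    from moto import mock_aws  # assumption: moto >= 5

    @mock_aws
    def test_prefix_scoped_delete():
        s3 = boto3.resource('s3', region_name='us-east-1')
        bucket = s3.create_bucket(Bucket='fake-load-test-bucket')
        bucket.put_object(Key='cloudwatch-test/ecs/fluent-stdstream.conf',
                          Body=b'[INPUT]')
        # The same prefix-scoped delete the commented-out branch performs:
        bucket.objects.filter(Prefix='cloudwatch-test/ecs/').delete()
        assert list(bucket.objects.all()) == []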
@@ -480,22 +488,23 @@ def run_eks_tests():
     p.wait()
 
 def delete_testing_resources():
+    print('delete_testing_resources')
     # Create sts session
-    session = get_sts_boto_session()
-
-    # All related testing resources will be destroyed once the stack is deleted
-    client = session.client('cloudformation')
-    client.delete_stack(
-        StackName=TESTING_RESOURCES_STACK_NAME
-    )
-    # Empty s3 bucket
-    s3 = session.resource('s3')
-    bucket = s3.Bucket(os.environ['S3_BUCKET_NAME'])
-    bucket.objects.all().delete()
-    # scale down eks cluster
-    if PLATFORM == 'eks':
-        os.system('kubectl delete namespace load-test-fluent-bit-eks-ns')
-        os.system(f'eksctl scale nodegroup --cluster={EKS_CLUSTER_NAME} --nodes=0 ng')
+    # session = get_sts_boto_session()
+
+    # # All related testing resources will be destroyed once the stack is deleted
+    # client = session.client('cloudformation')
+    # client.delete_stack(
+    #     StackName=TESTING_RESOURCES_STACK_NAME
+    # )
+    # # Empty s3 bucket
+    # s3 = session.resource('s3')
+    # bucket = s3.Bucket(os.environ['S3_BUCKET_NAME'])
+    # bucket.objects.all().delete()
+    # # scale down eks cluster
+    # if PLATFORM == 'eks':
+    #     os.system('kubectl delete namespace load-test-fluent-bit-eks-ns')
+    #     os.system(f'eksctl scale nodegroup --cluster={EKS_CLUSTER_NAME} --nodes=0 ng')
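
One detail for when this teardown is re-enabled: delete_stack returns as soon as deletion begins. CloudFormation's stack_delete_complete waiter blocks until the stack is actually gone, which avoids racing a follow-up run; a sketch (function name and region are illustrative):

    import boto3

    def delete_stack_and_wait(stack_name, region='us-west-2'):
        cfn = boto3.client('cloudformation', region_name=region)
        cfn.delete_stack(StackName=stack_name)
        # Poll until CloudFormation reports the stack fully deleted.
        cfn.get_waiter('stack_delete_complete').wait(StackName=stack_name)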

@@ -510,25 +519,25 @@ def get_validated_input_prefix(input_logger):

 def get_sts_boto_session():
     # STS credentials
-    sts_client = boto3.client('sts')
-
-    # Call the assume_role method of the STSConnection object and pass the role
-    # ARN and a role session name.
-    assumed_role_object = sts_client.assume_role(
-        RoleArn=os.environ["LOAD_TEST_CFN_ROLE_ARN"],
-        RoleSessionName="load-test-cfn",
-        DurationSeconds=3600
-    )
+    # sts_client = boto3.client('sts')
+
+    # # Call the assume_role method of the STSConnection object and pass the role
+    # # ARN and a role session name.
+    # assumed_role_object = sts_client.assume_role(
+    #     RoleArn=os.environ["LOAD_TEST_CFN_ROLE_ARN"],
+    #     RoleSessionName="load-test-cfn",
+    #     DurationSeconds=3600
+    # )
 
-    # From the response that contains the assumed role, get the temporary
-    # credentials that can be used to make subsequent API calls
-    credentials=assumed_role_object['Credentials']
+    # # From the response that contains the assumed role, get the temporary
+    # # credentials that can be used to make subsequent API calls
+    # credentials=assumed_role_object['Credentials']
 
     # Create boto session
     return boto3.Session(
-        aws_access_key_id=credentials['AccessKeyId'],
-        aws_secret_access_key=credentials['SecretAccessKey'],
-        aws_session_token=credentials['SessionToken']
+        aws_access_key_id='AccessKeyId',
+        aws_secret_access_key='SecretAccessKey',
+        aws_session_token='SessionToken'
     )
 
 if sys.argv[1] == 'create_testing_resources':
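
The placeholder strings handed to boto3.Session above never reach AWS only because every call site in this change is stubbed or commented out; any real request made with them would fail. For local runs with real (non-assumed) credentials, the default credential chain is a simpler stand-in; a sketch with an invented region fallback:

    import os
    import boto3

    def get_local_boto_session():
        # Sketch: rely on the default chain (env vars, ~/.aws/credentials,
        # or an instance/task role) instead of placeholder strings.
        return boto3.Session(region_name=os.environ.get('AWS_REGION', 'us-west-2'))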
