From d5fd7c1f32852411d0a747ada826925fd97eeee1 Mon Sep 17 00:00:00 2001 From: Wesley Pettit Date: Mon, 12 Jun 2023 20:05:29 -0700 Subject: [PATCH] changes to test load test locally Signed-off-by: Wesley Pettit --- load_tests/load_test.py | 213 +++++++++++++++++++++------------------- 1 file changed, 111 insertions(+), 102 deletions(-) diff --git a/load_tests/load_test.py b/load_tests/load_test.py index 4b44d40f2..788b5a26d 100644 --- a/load_tests/load_test.py +++ b/load_tests/load_test.py @@ -102,7 +102,7 @@ def generate_task_definition(session, throughput, input_logger, s3_fluent_config # Task Environment Variables '$TASK_ROLE_ARN': os.environ['LOAD_TEST_TASK_ROLE_ARN'], - '$TASK_EXECUTION_ROLE_ARN': os.environ['LOAD_TEST_TASK_EXECUTION_ROLE_ARN'], + '$TASK_EXECUTION_ROLE_ARN': os.environ['TASK_EXECUTION_ROLE_ARN'], '$CUSTOM_S3_OBJECT_NAME': resource_resolver.resolve_s3_object_name(custom_config), # Plugin Specific Environment Variables @@ -142,44 +142,45 @@ def generate_task_definition(session, throughput, input_logger, s3_fluent_config if IS_TASK_DEFINITION_PRINTED: print("Registering task definition:") print(json.dumps(task_def, indent=4)) - session.client('ecs').register_task_definition( - **task_def - ) + # session.client('ecs').register_task_definition( + # **task_def + # ) else: print("Registering task definition") # With multiple codebuild projects running parallel, # Testing resources only needs to be created once def create_testing_resources(): - session = get_sts_boto_session() - - if OUTPUT_PLUGIN != 'cloudwatch': - client = session.client('cloudformation') - waiter = client.get_waiter('stack_exists') - waiter.wait( - StackName=TESTING_RESOURCES_STACK_NAME, - WaiterConfig={ - 'MaxAttempts': 60 - } - ) - waiter = client.get_waiter('stack_create_complete') - waiter.wait( - StackName=TESTING_RESOURCES_STACK_NAME - ) - else: - # scale up eks cluster - if PLATFORM == 'eks': - os.system(f'eksctl scale nodegroup --cluster={EKS_CLUSTER_NAME} --nodes={NUM_OF_EKS_NODES} ng') - while True: - time.sleep(90) - number_of_nodes = subprocess.getoutput("kubectl get nodes --no-headers=true | wc -l") - if(int(number_of_nodes) == NUM_OF_EKS_NODES): - break - # create namespace - os.system('kubectl apply -f ./load_tests/create_testing_resources/eks/namespace.yaml') - # Once deployment starts, it will wait until the stack creation is completed - os.chdir(f'./load_tests/{sys.argv[1]}/{PLATFORM}') - os.system('cdk deploy --require-approval never') + print('create_testing_resources') + # session = get_sts_boto_session() + + # if OUTPUT_PLUGIN != 'cloudwatch': + # client = session.client('cloudformation') + # waiter = client.get_waiter('stack_exists') + # waiter.wait( + # StackName=TESTING_RESOURCES_STACK_NAME, + # WaiterConfig={ + # 'MaxAttempts': 60 + # } + # ) + # waiter = client.get_waiter('stack_create_complete') + # waiter.wait( + # StackName=TESTING_RESOURCES_STACK_NAME + # ) + # else: + # # scale up eks cluster + # if PLATFORM == 'eks': + # os.system(f'eksctl scale nodegroup --cluster={EKS_CLUSTER_NAME} --nodes={NUM_OF_EKS_NODES} ng') + # while True: + # time.sleep(90) + # number_of_nodes = subprocess.getoutput("kubectl get nodes --no-headers=true | wc -l") + # if(int(number_of_nodes) == NUM_OF_EKS_NODES): + # break + # # create namespace + # os.system('kubectl apply -f ./load_tests/create_testing_resources/eks/namespace.yaml') + # # Once deployment starts, it will wait until the stack creation is completed + # os.chdir(f'./load_tests/{sys.argv[1]}/{PLATFORM}') + # os.system('cdk deploy --require-approval never') # For tests on ECS, we need to: # 1. generate and register task definitions based on templates at /load_tests/task_definitons @@ -195,7 +196,7 @@ def run_ecs_tests(): for input_logger in INPUT_LOGGERS: session = get_sts_boto_session() - client = session.client('ecs') + #client = session.client('ecs') # Delete corresponding testing data for a fresh start delete_testing_data(session) @@ -207,12 +208,12 @@ def run_ecs_tests(): for throughput in THROUGHPUT_LIST: os.environ['THROUGHPUT'] = throughput generate_task_definition(session, throughput, input_logger, s3_fluent_config_arn) - response = client.run_task( + print('''response = client.run_task( cluster=ecs_cluster_name, launchType='EC2', taskDefinition=f'{PREFIX}{OUTPUT_PLUGIN}-{throughput}-{input_logger["name"]}' - ) - names[f'{OUTPUT_PLUGIN}_{throughput}_task_arn'] = response['tasks'][0]['taskArn'] + )''') + names[f'{OUTPUT_PLUGIN}_{throughput}_task_arn'] = 'fake-task-arn' # Validation input type banner print(f'\nTest {input_logger["name"]} to {OUTPUT_PLUGIN} in progress...') @@ -224,10 +225,10 @@ def run_ecs_tests(): for throughput in THROUGHPUT_LIST: session = get_sts_boto_session() - client = session.client('ecs') - waiter = client.get_waiter('tasks_stopped') + # client = session.client('ecs') + #waiter = client.get_waiter('tasks_stopped') task_arn = names[f'{OUTPUT_PLUGIN}_{throughput}_task_arn'] - waiter.wait( + print('''waiter.wait( cluster=ecs_cluster_name, tasks=[ task_arn, @@ -235,21 +236,24 @@ def run_ecs_tests(): WaiterConfig={ 'MaxAttempts': 600 } - ) - response = client.describe_tasks( + )''') + print('''response = client.describe_tasks( cluster=ecs_cluster_name, tasks=[ task_arn, ] - ) + )''') print(f'task_arn={task_arn}') - print(f'response={response}') - check_app_exit_code(response) + # print(f'response={response}') + #check_app_exit_code(response) input_record = calculate_total_input_number(throughput) - start_time = response['tasks'][0]['startedAt'] - stop_time = response['tasks'][0]['stoppedAt'] - log_delay = get_log_delay(parse_time(stop_time)-parse_time(start_time)-LOGGER_RUN_TIME_IN_SECOND) - set_buffer(parse_time(stop_time)) + #start_time = response['tasks'][0]['startedAt'] + #stop_time = response['tasks'][0]['stoppedAt'] + # + #log_delay = get_log_delay(parse_time(stop_time)-parse_time(start_time)-LOGGER_RUN_TIME_IN_SECOND) + + #set_buffer(parse_time(stop_time)) + log_delay = 2 # Validate logs os.environ['LOG_SOURCE_NAME'] = input_logger["name"] @@ -391,6 +395,8 @@ def format_test_results_to_markdown(test_results): def parse_json_template(template, dict): data = template + print('parse_json_template') + print(dict) for key in dict: if(key[0] == '$'): data = data.replace(key, dict[key]) @@ -402,35 +408,37 @@ def parse_json_template(template, dict): # Returns s3 arn def publish_fluent_config_s3(session, input_logger): bucket_name = os.environ['S3_BUCKET_NAME'] - s3 = session.client('s3') - s3.upload_file( - input_logger["fluent_config_file_path"], - bucket_name, - f'{OUTPUT_PLUGIN}-test/{PLATFORM}/fluent-{input_logger["name"]}.conf', - ) + # s3 = session.client('s3') + # s3.upload_file( + # input_logger["fluent_config_file_path"], + # bucket_name, + # f'{OUTPUT_PLUGIN}-test/{PLATFORM}/fluent-{input_logger["name"]}.conf', + # ) return f'arn:aws:s3:::{bucket_name}/{OUTPUT_PLUGIN}-test/{PLATFORM}/fluent-{input_logger["name"]}.conf' # The following method is used to clear data between # testing batches def delete_testing_data(session): # All testing data related to the plugin option will be deleted - if OUTPUT_PLUGIN == 'cloudwatch': - # Delete associated cloudwatch log streams - client = session.client('logs') - response = client.describe_log_streams( - logGroupName=os.environ['CW_LOG_GROUP_NAME'] - ) - for stream in response["logStreams"]: - client.delete_log_stream( - logGroupName=os.environ['CW_LOG_GROUP_NAME'], - logStreamName=stream["logStreamName"] - ) - else: - # Delete associated s3 bucket objects - s3 = session.resource('s3') - bucket = s3.Bucket(os.environ['S3_BUCKET_NAME']) - s3_objects = bucket.objects.filter(Prefix=f'{OUTPUT_PLUGIN}-test/{PLATFORM}/') - s3_objects.delete() + print('delete_testing_data') + return + # if OUTPUT_PLUGIN == 'cloudwatch': + # # Delete associated cloudwatch log streams + # client = session.client('logs') + # response = client.describe_log_streams( + # logGroupName=os.environ['CW_LOG_GROUP_NAME'] + # ) + # for stream in response["logStreams"]: + # client.delete_log_stream( + # logGroupName=os.environ['CW_LOG_GROUP_NAME'], + # logStreamName=stream["logStreamName"] + # ) + # else: + # # Delete associated s3 bucket objects + # s3 = session.resource('s3') + # bucket = s3.Bucket(os.environ['S3_BUCKET_NAME']) + # s3_objects = bucket.objects.filter(Prefix=f'{OUTPUT_PLUGIN}-test/{PLATFORM}/') + # s3_objects.delete() def generate_daemonset_config(throughput): daemonset_config_dict = { @@ -480,22 +488,23 @@ def run_eks_tests(): p.wait() def delete_testing_resources(): + print('delete_testing_resources') # Create sts session - session = get_sts_boto_session() - - # All related testing resources will be destroyed once the stack is deleted - client = session.client('cloudformation') - client.delete_stack( - StackName=TESTING_RESOURCES_STACK_NAME - ) - # Empty s3 bucket - s3 = session.resource('s3') - bucket = s3.Bucket(os.environ['S3_BUCKET_NAME']) - bucket.objects.all().delete() - # scale down eks cluster - if PLATFORM == 'eks': - os.system('kubectl delete namespace load-test-fluent-bit-eks-ns') - os.system(f'eksctl scale nodegroup --cluster={EKS_CLUSTER_NAME} --nodes=0 ng') + # session = get_sts_boto_session() + + # # All related testing resources will be destroyed once the stack is deleted + # client = session.client('cloudformation') + # client.delete_stack( + # StackName=TESTING_RESOURCES_STACK_NAME + # ) + # # Empty s3 bucket + # s3 = session.resource('s3') + # bucket = s3.Bucket(os.environ['S3_BUCKET_NAME']) + # bucket.objects.all().delete() + # # scale down eks cluster + # if PLATFORM == 'eks': + # os.system('kubectl delete namespace load-test-fluent-bit-eks-ns') + # os.system(f'eksctl scale nodegroup --cluster={EKS_CLUSTER_NAME} --nodes=0 ng') def get_validated_input_prefix(input_logger): # Prefix used to form destination identifier @@ -510,25 +519,25 @@ def get_validated_input_prefix(input_logger): def get_sts_boto_session(): # STS credentials - sts_client = boto3.client('sts') - - # Call the assume_role method of the STSConnection object and pass the role - # ARN and a role session name. - assumed_role_object = sts_client.assume_role( - RoleArn=os.environ["LOAD_TEST_CFN_ROLE_ARN"], - RoleSessionName="load-test-cfn", - DurationSeconds=3600 - ) + # sts_client = boto3.client('sts') + + # # Call the assume_role method of the STSConnection object and pass the role + # # ARN and a role session name. + # assumed_role_object = sts_client.assume_role( + # RoleArn=os.environ["LOAD_TEST_CFN_ROLE_ARN"], + # RoleSessionName="load-test-cfn", + # DurationSeconds=3600 + # ) - # From the response that contains the assumed role, get the temporary - # credentials that can be used to make subsequent API calls - credentials=assumed_role_object['Credentials'] + # # From the response that contains the assumed role, get the temporary + # # credentials that can be used to make subsequent API calls + # credentials=assumed_role_object['Credentials'] # Create boto session return boto3.Session( - aws_access_key_id=credentials['AccessKeyId'], - aws_secret_access_key=credentials['SecretAccessKey'], - aws_session_token=credentials['SessionToken'] + aws_access_key_id='AccessKeyId', + aws_secret_access_key='SecretAccessKey', + aws_session_token='SessionToken' ) if sys.argv[1] == 'create_testing_resources':