diff --git a/tests/apitests/python/library/base.py b/tests/apitests/python/library/base.py index acae0189c69c..5c92ad54aaa7 100644 --- a/tests/apitests/python/library/base.py +++ b/tests/apitests/python/library/base.py @@ -32,7 +32,7 @@ def _create_client(server, credential, debug, api_type="products"): if api_type in ('projectv2', 'artifact', 'repository', 'scanner', 'scan', 'scanall', 'preheat', 'quota', 'replication', 'registry', 'robot', 'gc', 'retention', 'immutable', 'system_cve_allowlist', 'configure', 'user', 'member', 'health', 'label', 'webhook', 'purge', 'audit_log', 'scan_data_export', - 'statistic', "system_info"): + 'statistic', "system_info", "jobservice", "schedule"): cfg = v2_swagger_client.Configuration() else: cfg = swagger_client.Configuration() @@ -81,7 +81,9 @@ def _create_client(server, credential, debug, api_type="products"): "audit_log": v2_swagger_client.AuditlogApi(v2_swagger_client.ApiClient(cfg)), "scan_data_export": v2_swagger_client.ScanDataExportApi(v2_swagger_client.ApiClient(cfg)), "statistic": v2_swagger_client.StatisticApi(v2_swagger_client.ApiClient(cfg)), - "system_info": v2_swagger_client.SysteminfoApi(v2_swagger_client.ApiClient(cfg)) + "system_info": v2_swagger_client.SysteminfoApi(v2_swagger_client.ApiClient(cfg)), + "jobservice": v2_swagger_client.JobserviceApi(v2_swagger_client.ApiClient(cfg)), + "schedule": v2_swagger_client.ScheduleApi(v2_swagger_client.ApiClient(cfg)), }.get(api_type,'Error: Wrong API type') def _assert_status_code(expect_code, return_code, err_msg = r"HTTPS status code s not as we expected. Expected {}, while actual HTTPS status code is {}."): diff --git a/tests/apitests/python/library/gc.py b/tests/apitests/python/library/gc.py index 89fe927c0dd0..831aa277ab44 100644 --- a/tests/apitests/python/library/gc.py +++ b/tests/apitests/python/library/gc.py @@ -91,6 +91,28 @@ def create_gc_schedule(self, schedule_type, is_delete_untagged, cron = None, exp base._assert_status_code(expect_status_code, status_code) return base._get_id_from_header(header) + def update_gc_schedule(self, schedule_type, cron = None, expect_status_code = 200, expect_response_body = None, **kwargs): + gc_schedule = v2_swagger_client.ScheduleObj() + gc_schedule.type = schedule_type + if cron is not None: + gc_schedule.cron = cron + + gc_job = v2_swagger_client.Schedule() + gc_job.schedule = gc_schedule + + try: + _, status_code, header = self._get_client(**kwargs).update_gc_schedule_with_http_info(gc_job) + except ApiException as e: + if e.status == expect_status_code: + if expect_response_body is not None and e.body.strip() != expect_response_body.strip(): + raise Exception(r"Update GC schedule response body is not as expected {} actual status is {}.".format(expect_response_body.strip(), e.body.strip())) + else: + return e.reason, e.body + else: + raise Exception(r"Update GC schedule result is not as expected {} actual status is {}.".format(expect_status_code, e.status)) + base._assert_status_code(expect_status_code, status_code) + return base._get_id_from_header(header) + def gc_now(self, is_delete_untagged=False, **kwargs): gc_id = self.create_gc_schedule('Manual', is_delete_untagged, **kwargs) return gc_id diff --git a/tests/apitests/python/library/jobservice.py b/tests/apitests/python/library/jobservice.py new file mode 100644 index 000000000000..25e6b3104f00 --- /dev/null +++ b/tests/apitests/python/library/jobservice.py @@ -0,0 +1,67 @@ +# -*- coding: utf-8 -*- + +import base +import v2_swagger_client +from v2_swagger_client.rest import ApiException + + +class Jobservice(base.Base): + + def __init__(self): + super(Jobservice, self).__init__(api_type="jobservice") + + def get_job_queues(self, expect_status_code=200, expect_response_body=None, **kwargs): + try: + return_data, status_code, _ = self._get_client(**kwargs).list_job_queues_with_http_info() + except ApiException as e: + base._assert_status_code(expect_status_code, e.status) + if expect_response_body is not None: + base._assert_status_body(expect_response_body, e.body) + return + base._assert_status_code(expect_status_code, status_code) + return dict(zip([job_queue.job_type for job_queue in return_data], return_data)) + + def action_pending_jobs(self, job_type, action, expect_status_code=200, expect_response_body=None, **kwargs): + try: + action_request = v2_swagger_client.ActionRequest(action=action) + return_data, status_code, _ = self._get_client(**kwargs).action_pending_jobs_with_http_info(job_type, action_request) + except ApiException as e: + base._assert_status_code(expect_status_code, e.status) + if expect_response_body is not None: + base._assert_status_body(expect_response_body, e.body) + return + base._assert_status_code(expect_status_code, status_code) + return return_data + + def get_worker_pools(self, expect_status_code=200, expect_response_body=None, **kwargs): + try: + return_data, status_code, _ = self._get_client(**kwargs).get_worker_pools_with_http_info() + except ApiException as e: + base._assert_status_code(expect_status_code, e.status) + if expect_response_body is not None: + base._assert_status_body(expect_response_body, e.body) + return + base._assert_status_code(expect_status_code, status_code) + return return_data + + def get_workers(self, pool_id, expect_status_code=200, expect_response_body=None, **kwargs): + try: + return_data, status_code, _ = self._get_client(**kwargs).get_workers_with_http_info(pool_id) + except ApiException as e: + base._assert_status_code(expect_status_code, e.status) + if expect_response_body is not None: + base._assert_status_body(expect_response_body, e.body) + return + base._assert_status_code(expect_status_code, status_code) + return return_data + + def stop_running_job(self, job_id, expect_status_code=200, expect_response_body=None, **kwargs): + try: + return_data, status_code, _ = self._get_client(**kwargs).stop_running_job_with_http_info(job_id) + except ApiException as e: + base._assert_status_code(expect_status_code, e.status) + if expect_response_body is not None: + base._assert_status_body(expect_response_body, e.body) + return + base._assert_status_code(expect_status_code, status_code) + return return_data diff --git a/tests/apitests/python/library/preheat.py b/tests/apitests/python/library/preheat.py index aeda8547432b..03e9f73856ca 100644 --- a/tests/apitests/python/library/preheat.py +++ b/tests/apitests/python/library/preheat.py @@ -16,7 +16,6 @@ def create_instance(self, name = None, description="It's a dragonfly instance", name = base._random_name("instance") instance = v2_swagger_client.Instance(name=name, description=description,vendor=vendor, endpoint=endpoint_url, auth_mode=auth_mode, enabled=enabled) - print("instance:",instance) try: _, status_code, header = self._get_client(**kwargs).create_instance_with_http_info(instance) except ApiException as e: @@ -36,7 +35,6 @@ def create_policy(self, project_name, project_id, provider_id, name = None, desc policy = v2_swagger_client.PreheatPolicy(name=name, project_id=project_id, provider_id=provider_id, description=description,filters=filters, trigger=trigger, enabled=enabled) - print("policy:",policy) try: data, status_code, header = self._get_client(**kwargs).create_policy_with_http_info(project_name, policy) except ApiException as e: diff --git a/tests/apitests/python/library/purge.py b/tests/apitests/python/library/purge.py index 2a157dcfd7c9..a49fe06d5f26 100644 --- a/tests/apitests/python/library/purge.py +++ b/tests/apitests/python/library/purge.py @@ -6,11 +6,11 @@ class Purge(base.Base): - + def __init__(self): super(Purge, self).__init__(api_type="purge") - def create_purge_schedule(self, type, cron, dry_run, audit_retention_hour=24, include_operations="create,delete,pull", expect_status_code=201, expect_response_body=None, **kwargs): + def create_purge_schedule(self, type, cron, dry_run=True, audit_retention_hour=24, include_operations="create,delete,pull", expect_status_code=201, expect_response_body=None, **kwargs): scheduleObj = v2_swagger_client.ScheduleObj(type=type) if cron is not None: scheduleObj.cron = cron @@ -102,4 +102,4 @@ def get_purge_schedule(self, expect_status_code=200, expect_response_body=None, base._assert_status_body(expect_response_body, e.body) return base._assert_status_code(expect_status_code, status_code) - return return_data \ No newline at end of file + return return_data diff --git a/tests/apitests/python/library/retention.py b/tests/apitests/python/library/retention.py index 04652110ece7..a67843d0072a 100644 --- a/tests/apitests/python/library/retention.py +++ b/tests/apitests/python/library/retention.py @@ -63,7 +63,7 @@ def get_retention_policy(self, retention_id, expect_status_code = 200, **kwargs) base._assert_status_code(expect_status_code, status_code) return policy - def update_retention_policy(self, retention_id, selector_repository="**", selector_tag="**", expect_status_code = 200, **kwargs): + def update_retention_policy(self, retention_id, project_id, selector_repository="**", selector_tag="**", cron="", expect_status_code = 200, **kwargs): policy=v2_swagger_client.RetentionPolicy( id=retention_id, algorithm='or', @@ -72,9 +72,7 @@ def update_retention_policy(self, retention_id, selector_repository="**", select disabled=False, action="retain", template="always", - params= { - - }, + params= {}, scope_selectors={ "repository": [ { @@ -96,7 +94,7 @@ def update_retention_policy(self, retention_id, selector_repository="**", select trigger= { "kind": "Schedule", "settings": { - "cron": "" + "cron": cron }, "references": { } diff --git a/tests/apitests/python/library/scan_all.py b/tests/apitests/python/library/scan_all.py index b25f960dc231..32f263bd3ce4 100644 --- a/tests/apitests/python/library/scan_all.py +++ b/tests/apitests/python/library/scan_all.py @@ -30,6 +30,27 @@ def create_scan_all_schedule(self, schedule_type, cron=None, expect_status_code= raise Exception(r"Create scan all schedule result is not as expected {} actual status is {}.".format(expect_status_code, e.status)) base._assert_status_code(expect_status_code, status_code) + def update_scan_all_schedule(self, schedule_type, cron=None, expect_status_code=200, expect_response_body=None, **kwargs): + schedule_obj = v2_swagger_client.ScheduleObj() + schedule_obj.type = schedule_type + if cron is not None: + schedule_obj.cron = cron + + schedule = v2_swagger_client.Schedule() + schedule.schedule = schedule_obj + + try: + _, status_code, _ = self._get_client(**kwargs).update_scan_all_schedule_with_http_info(schedule) + except ApiException as e: + if e.status == expect_status_code: + if expect_response_body is not None and e.body.strip() != expect_response_body.strip(): + raise Exception(r"Update scan all schedule response body is not as expected {} actual status is {}.".format(expect_response_body.strip(), e.body.strip())) + else: + return e.reason, e.body + else: + raise Exception(r"Update scan all schedule result is not as expected {} actual status is {}.".format(expect_status_code, e.status)) + base._assert_status_code(expect_status_code, status_code) + def scan_all_now(self, **kwargs): self.create_scan_all_schedule('Manual', **kwargs) diff --git a/tests/apitests/python/library/schedule.py b/tests/apitests/python/library/schedule.py new file mode 100644 index 000000000000..ae61e34d5735 --- /dev/null +++ b/tests/apitests/python/library/schedule.py @@ -0,0 +1,38 @@ +# -*- coding: utf-8 -*- + +import base +from v2_swagger_client.rest import ApiException + + +class Schedule(base.Base): + + def __init__(self): + super(Schedule, self).__init__(api_type="schedule") + + def get_schedule_paused(self, job_type, expect_status_code=200, expect_response_body=None, **kwargs): + try: + return_data, status_code, _ = self._get_client(**kwargs).get_schedule_paused_with_http_info(job_type) + except ApiException as e: + base._assert_status_code(expect_status_code, e.status) + if expect_response_body is not None: + base._assert_status_body(expect_response_body, e.body) + return + base._assert_status_code(expect_status_code, status_code) + return return_data + + def list_schedules(self, page_size=50, page=1, expect_status_code=200, expect_response_body=None, **kwargs): + try: + schedules, status_code, _ = self._get_client(**kwargs).list_schedules_with_http_info(page_size=50, page=1) + except ApiException as e: + base._assert_status_code(expect_status_code, e.status) + if expect_response_body is not None: + base._assert_status_body(expect_response_body, e.body) + return + base._assert_status_code(expect_status_code, status_code) + schedule_dict = {} + for schedule in schedules: + if schedule.vendor_id in [ None, -1]: + schedule_dict[schedule.vendor_type] = schedule + else: + schedule_dict["%s-%d" % (schedule.vendor_type, schedule.vendor_id)] = schedule + return schedule_dict diff --git a/tests/apitests/python/test_job_service_dashboard.py b/tests/apitests/python/test_job_service_dashboard.py new file mode 100644 index 000000000000..97bbc4615ab1 --- /dev/null +++ b/tests/apitests/python/test_job_service_dashboard.py @@ -0,0 +1,351 @@ +from __future__ import absolute_import +import time + +import unittest +import v2_swagger_client +from library import base +from testutils import harbor_server, ADMIN_CLIENT, suppress_urllib3_warning +from library.jobservice import Jobservice +from library.gc import GC +from library.purge import Purge +from library.user import User +from library.project import Project +from library.retention import Retention +from library.preheat import Preheat +from library.replication import Replication +from library.registry import Registry +from library.scan_all import ScanAll +from library.schedule import Schedule + + +class TestJobServiceDashboard(unittest.TestCase, object): + + @suppress_urllib3_warning + def setUp(self): + self.jobservice = Jobservice() + self.gc = GC() + self.purge = Purge() + self.user = User() + self.project = Project() + self.retention = Retention() + self.preheat = Preheat() + self.replication = Replication() + self.registry = Registry() + self.scan_all = ScanAll() + self.schedule = Schedule() + self.job_types = [ "GARBAGE_COLLECTION", "PURGE_AUDIT", "P2P_PREHEAT", "IMAGE_SCAN", "REPLICATION", "RETENTION", "SCAN_DATA_EXPORT", "SCHEDULER", "SLACK", "SYSTEM_ARTIFACT_CLEANUP", "WEBHOOK"] + self.cron_type = "Custom" + self.cron = "0 0 0 * * 0" + + def testJobQueues(self): + """ + Test case: + Job Service Dashboard Job Queues + Test step and expected result: + 1. List job queue; + 2. Pause GC Job and purge audit Job; + 3. Verify that the Job status is Paused; + 4. Run GC and purge audit; + 5. Verify pending jobs of job queues; + 6. Resume GC Job and purge audit Job; + 7. Verify pending jobs of job queues; + 8. Pause GC Job and purge audit Job; + 9. Verify that the Job status is Paused; + 10. Run GC and purge audit; + 11. Verify pending jobs of job queues; + 12. Stop GC Job and purge audit Job; + 13. Verify pending jobs of job queues; + 14. Run GC and purge audit; + 15. Verify pending jobs of job queues; + 16. Stop all Job; + 17. Verify pending jobs of job queues; + 18. Resume all Job; + """ + # 1. List job queue + job_queues = self.jobservice.get_job_queues() + self.assertSetEqual(set(self.job_types), set(job_queues.keys())) + + # 2. Pause GC Job and purge audit Job + self.jobservice.action_pending_jobs(self.job_types[0], "pause") + self.jobservice.action_pending_jobs(self.job_types[1], "pause") + + # 3. Verify that the Job status is Paused + job_queues = self.jobservice.get_job_queues() + self.assertTrue(job_queues[self.job_types[0]].paused) + self.assertTrue(job_queues[self.job_types[1]].paused) + + # 4. Run GC and purge audit + self.gc.gc_now() + self.purge.create_purge_schedule(type="Manual", cron=None, dry_run=False) + time.sleep(2) + + # 5. Verify pending jobs of job queues + self.verifyPendingJobs([self.job_types[0], self.job_types[1]]) + + # 6. Resume GC Job and purge audit Job + self.jobservice.action_pending_jobs(self.job_types[0], "resume") + self.jobservice.action_pending_jobs(self.job_types[1], "resume") + + # 7. Verify pending jobs of job queues + self.waitJobQueuesStopToComplete([self.job_types[0], self.job_types[1]]) + + # 8. Pause GC Job and purge audit Job; + self.jobservice.action_pending_jobs(self.job_types[0], "pause") + self.jobservice.action_pending_jobs(self.job_types[1], "pause") + + # 9. Verify that the Job status is Paused + job_queues = self.jobservice.get_job_queues() + self.assertTrue(job_queues[self.job_types[0]].paused) + self.assertTrue(job_queues[self.job_types[1]].paused) + + # 10. Run GC and purge audit + self.gc.gc_now() + self.purge.create_purge_schedule(type="Manual", cron=None, dry_run=False) + time.sleep(2) + + # 11. Verify pending jobs of job queues + self.verifyPendingJobs([self.job_types[0], self.job_types[1]]) + + # 12. Stop GC Job and purge audit Job + self.jobservice.action_pending_jobs(self.job_types[0], "stop") + self.jobservice.action_pending_jobs(self.job_types[1], "stop") + + # 13. Verify pending jobs of job queues + self.waitJobQueuesStopToComplete([self.job_types[0], self.job_types[1]]) + + # 14. Run GC and purge audit + self.gc.gc_now() + self.purge.create_purge_schedule(type="Manual", cron=None, dry_run=False) + time.sleep(2) + + # 15. Verify pending jobs of job queues + self.verifyPendingJobs([self.job_types[0], self.job_types[1]]) + + # 16. Stop all Job + self.jobservice.action_pending_jobs("all", "stop") + + # 17. Verify pending jobs of job queues + self.waitJobQueuesStopToComplete([self.job_types[0], self.job_types[1]]) + + # 18. Resume all Job + self.jobservice.action_pending_jobs("all", "resume") + job_queues = self.jobservice.get_job_queues() + self.assertFalse(job_queues[self.job_types[0]].paused) + self.assertFalse(job_queues[self.job_types[1]].paused) + + + def verifyPendingJobs(self, job_types): + job_queues = self.jobservice.get_job_queues() + for job_type in job_types: + self.assertTrue(job_queues[job_type].count > 0) + self.assertTrue(job_queues[job_type].latency > 0) + self.assertTrue(job_queues[job_type].count > 0) + self.assertTrue(job_queues[job_type].latency > 0) + + + def waitJobQueuesStopToComplete(self, job_types): + is_success = False + for i in range(10): + print("Wait for queues to be consumed:", i) + job_queues = self.jobservice.get_job_queues() + for job_type in job_types: + if job_queues[job_type].count is not None or job_queues[job_type].latency is not None: + is_success = False + break + else: + is_success = True + break + time.sleep(2) + self.assertTrue(is_success) + + + def testSchedules(self): + """ + Test case: + Job Service Dashboard Schedules + Test step and expected result: + 1. Create a new project; + 2. Create a retention policy triggered by schedule; + 3. Create a new distribution; + 4. Create a preheat policy triggered by schedule; + 5. Create a new registry; + 6. Create a replication policy triggered by schedule; + 7. Set up a schedule to scan all; + 8. Set up a schedule to GC; + 9. Set up a schedule to log rotation; + 10. Verify schedules; + 11. Pause all schedules; + 12. Verify schedules is Paused; + 13. Resume all schedules; + 14. Verify schedules is not Paused; + 15. Reset the schedule for scan all, GC, and log rotation; + 16. Verify schedules; + """ + # 1. Create a new project(PA) by user(UA) + project_id, project_name = self.project.create_project(metadata = {"public": "false"}) + + # 2. Create a retention policy + retention_id = self.retention.create_retention_policy(project_id, selector_repository="**", selector_tag="**") + self.retention.update_retention_policy(retention_id, project_id, cron=self.cron) + + # 3. Create a new distribution + _, distribution_name = self.preheat.create_instance(endpoint_url=base._random_name("https://")) + + # 4. Create a new preheat policy + distribution = self.preheat.get_instance(distribution_name) + _, preheat_policy_name = self.preheat.create_policy(project_name, project_id, distribution.id, trigger=r'{"type":"scheduled","trigger_setting":{"cron":"%s"}}' % (self.cron)) + preheat_policy = self.preheat.get_policy(project_name, preheat_policy_name) + + # 5. Create a new registry + registry_id, _ = self.registry.create_registry("https://" + harbor_server) + + # 6. Create a replication policy triggered by schedule + replication_id, _ = self.replication.create_replication_policy(dest_registry=v2_swagger_client.Registry(id=int(registry_id)), trigger=v2_swagger_client.ReplicationTrigger(type="scheduled",trigger_settings=v2_swagger_client.ReplicationTriggerSettings(cron=self.cron))) + + # 7. Set up a schedule to scan all + self.scan_all.create_scan_all_schedule(self.cron_type, cron=self.cron) + + # 8. Set up a schedule to GC + self.gc.create_gc_schedule(self.cron_type, is_delete_untagged=True, cron=self.cron) + + # 9. Set up a schedule to Log Rotation + self.purge.create_purge_schedule(self.cron_type, self.cron, True) + + # 10. Verify schedules + schedules = self.schedule.list_schedules(page_size=50, page=1) + # 10.1 Verify retention schedule + retention_schedule = schedules["%s-%d" % (self.job_types[5], retention_id)] + self.assertEqual(retention_schedule.cron, self.cron) + # 10.2 Verify preheat schedule + preheat_schedule = schedules["%s-%d" % (self.job_types[2], preheat_policy.id)] + self.assertEqual(preheat_schedule.cron, self.cron) + # 10.3 Verify replication schedule + replication_schedule = schedules["%s-%d" % (self.job_types[4], replication_id)] + self.assertEqual(replication_schedule.vendor_type, self.job_types[4]) + self.assertEqual(replication_schedule.cron, self.cron) + # 10.4 Verify scan all schedule + scan_all_schedule = schedules["SCAN_ALL"] + self.assertEqual(scan_all_schedule.cron, self.cron) + # 10.5 Verify GC schedule + gc_schedule = schedules[self.job_types[0]] + self.assertEqual(gc_schedule.cron, self.cron) + # 10.6 Verify log rotation + log_rotation_schedule = schedules["PURGE_AUDIT_LOG"] + self.assertEqual(log_rotation_schedule.cron, self.cron) + + # 11. Pause all schedules + self.jobservice.action_pending_jobs("scheduler", "pause") + + # 12. Verify schedules is Paused; + self.assertTrue(self.schedule.get_schedule_paused("all").paused) + + # 13. Resume all schedules + self.jobservice.action_pending_jobs("scheduler", "resume") + + # 14. Verify schedules is not Paused + self.assertFalse(self.schedule.get_schedule_paused("all").paused) + + # 15. Reset the schedule for scan all, GC, and log rotation + self.scan_all.update_scan_all_schedule("None", cron="") + self.gc.update_gc_schedule("None", cron="") + self.purge.update_purge_schedule("None", "") + + # 16. Verify schedules + schedules = self.schedule.list_schedules(page_size=50, page=1) + # 16.1 Verify retention schedule + retention_schedule = schedules["%s-%d" % (self.job_types[5], retention_id)] + self.assertEqual(retention_schedule.cron, self.cron) + # 16.2 Verify preheat schedule + preheat_schedule = schedules["%s-%d" % (self.job_types[2], preheat_policy.id)] + self.assertEqual(preheat_schedule.cron, self.cron) + # 16.3 Verify replication schedule + replication_schedule = schedules["%s-%d" % (self.job_types[4], replication_id)] + self.assertEqual(replication_schedule.vendor_type, self.job_types[4]) + self.assertEqual(replication_schedule.cron, self.cron) + # 16.4 Verify scan all schedule + self.assertNotIn("SCAN_ALL", schedules) + # 16.5 Verify GC schedule + self.assertNotIn(self.job_types[0], schedules) + # 16.6 Verify log rotation + self.assertNotIn("PURGE_AUDIT_LOG", schedules) + + + def testWorkers(self): + """ + Test case: + Job Service Dashboard Workers + Test step and expected result: + 1. Get worker pools; + 2. Get workers in current pool; + 3. Stop running job; + """ + # 1. Get worker pools + worker_pools = self.jobservice.get_worker_pools() + for worker_pool in worker_pools: + self.assertIsNotNone(worker_pool.pid) + self.assertIsNotNone(worker_pool.worker_pool_id) + self.assertIsNotNone(worker_pool.concurrency) + self.assertIsNotNone(worker_pool.start_at) + self.assertIsNotNone(worker_pool.heartbeat_at) + + # 2. Get workers in current pool + workers = self.jobservice.get_workers(worker_pool.worker_pool_id) + self.assertEqual(len(workers), worker_pool.concurrency) + for worker in workers: + self.assertIsNotNone(worker.id) + self.assertEqual(worker_pool.worker_pool_id, worker.pool_id) + + # 3. Stop running job + self.jobservice.stop_running_job("966a49aa2278b67d743f8aca") + + + def testJobServiceDashboardAPIPermission(self): + """ + Test case: + Log Rotaion Permission API + Test step and expected result: + 1. Create a new user(UA); + 2. User(UA) should not have permission to list job queue API; + 3. User(UA) should not have permission to action_pending_jobs API; + 4. User(UA) should not have permission to stop running job API; + 5. User(UA) should not have permission to get worker pools API; + 6. User(UA) should not have permission to get workers in current pool API; + 7. User(UA) should not have permission to list schedules API; + 8. User(UA) should have permission to get scheduler paused status API; + 9. Verify that the get scheduler paused status API parameter job_type only support all; + """ + expect_status_code = 403 + expect_response_body = "FORBIDDEN" + + # 1. Create a new user(UA) + user_password = "Aa123456" + _, user_name = self.user.create_user(user_password = user_password) + USER_CLIENT = dict(endpoint = ADMIN_CLIENT["endpoint"], username = user_name, password = user_password) + + # 2. User(UA) should not have permission to list job queue API + self.jobservice.get_job_queues(expect_status_code=expect_status_code, expect_response_body=expect_response_body, **USER_CLIENT) + + # 3. User(UA) should not have permission to action_pending_jobs API + self.jobservice.action_pending_jobs(self.job_types[0], "pause", expect_status_code=expect_status_code, expect_response_body=expect_response_body, **USER_CLIENT) + + # 4. User(UA) should not have permission to stop running job API + self.jobservice.stop_running_job("966a49aa2278b67d743f8aca", expect_status_code=expect_status_code, expect_response_body=expect_response_body, **USER_CLIENT) + + # 5. User(UA) should not have permission to get worker pools API + self.jobservice.get_worker_pools(expect_status_code=expect_status_code, expect_response_body=expect_response_body, **USER_CLIENT) + + # 6. User(UA) should not have permission to get workers in current pool API + self.jobservice.get_workers("13e0cfe999715102c47614ec", expect_status_code=expect_status_code, expect_response_body=expect_response_body, **USER_CLIENT) + + # 7. User(UA) should not have permission to list schedules API + self.schedule.list_schedules(expect_status_code=expect_status_code, expect_response_body=expect_response_body, **USER_CLIENT) + + # 8. User(UA) should have permission to get scheduler paused status API + self.schedule.get_schedule_paused("all") + + # 9. Verify that the get scheduler paused status API parameter job_type only support all + self.schedule.get_schedule_paused(self.job_types[0], expect_status_code=400, expect_response_body="job_type can only be 'all'") + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/robot-cases/Group0-BAT/API_DB.robot b/tests/robot-cases/Group0-BAT/API_DB.robot index da91d4ff64f4..9c5fff9d5968 100644 --- a/tests/robot-cases/Group0-BAT/API_DB.robot +++ b/tests/robot-cases/Group0-BAT/API_DB.robot @@ -181,4 +181,8 @@ Test Case - Log Forward Test Case - Scan Data Export [Tags] scan_data_export - Harbor API Test ./tests/apitests/python/test_scan_data_export.py \ No newline at end of file + Harbor API Test ./tests/apitests/python/test_scan_data_export.py + +Test Case - Job Service Dashboard + [Tags] job_service_dashboard + Harbor API Test ./tests/apitests/python/test_job_service_dashboard.py