diff --git a/src/test/rgw/bucket_notification/api.py b/src/test/rgw/bucket_notification/api.py index e7ec31f17111a..e84aa16edc753 100644 --- a/src/test/rgw/bucket_notification/api.py +++ b/src/test/rgw/bucket_notification/api.py @@ -247,12 +247,16 @@ def delete_all_topics(conn, tenant, cluster): if tenant == '': topics_result = admin(['topic', 'list'], cluster) topics_json = json.loads(topics_result[0]) + if 'topics' not in topics_json: + topics_json = topics_json.get('result',{}) for topic in topics_json['topics']: rm_result = admin(['topic', 'rm', '--topic', topic['name']], cluster) print(rm_result) else: topics_result = admin(['topic', 'list', '--tenant', tenant], cluster) topics_json = json.loads(topics_result[0]) + if 'topics' not in topics_json: + topics_json = topics_json.get('result',{}) for topic in topics_json['topics']: rm_result = admin(['topic', 'rm', '--tenant', tenant, '--topic', topic['name']], cluster) print(rm_result) diff --git a/src/test/rgw/bucket_notification/test_bn.py b/src/test/rgw/bucket_notification/test_bn.py index 359990b35319d..90ee33617fe86 100644 --- a/src/test/rgw/bucket_notification/test_bn.py +++ b/src/test/rgw/bucket_notification/test_bn.py @@ -4359,6 +4359,242 @@ def test_ps_s3_multiple_topics_notification(): http_server.close() +@attr('data_path_v2_test') +def test_ps_s3_list_topics_migration(): + """ test list topics on migration""" + if get_config_cluster() == 'noname': + return SkipTest('realm is needed for migration test') + + # Initialize connections and configurations + conn1 = connection() + tenant = 'kaboom1' + conn2 = connect_random_user(tenant) + bucket_name = gen_bucket_name() + topics = [f"{bucket_name}{TOPIC_SUFFIX}{i}" for i in range(1, 7)] + tenant_topics = [f"{tenant}_{topic}" for topic in topics] + + # Define topic names with version + topic_versions = { + "topic1_v2": f"{topics[0]}_v2", + "topic2_v2": f"{topics[1]}_v2", + "topic3_v1": f"{topics[2]}_v1", + "topic4_v1": f"{topics[3]}_v1", + "topic5_v1": f"{topics[4]}_v1", + "topic6_v1": f"{topics[5]}_v1", + "tenant_topic1_v2": f"{tenant_topics[0]}_v2", + "tenant_topic2_v1": f"{tenant_topics[1]}_v1", + "tenant_topic3_v1": f"{tenant_topics[2]}_v1" + } + + # Get necessary configurations + host = get_ip() + http_port = random.randint(10000, 20000) + endpoint_address = 'http://' + host + ':' + str(http_port) + endpoint_args = 'push-endpoint=' + endpoint_address + '&persistent=true' + zonegroup = get_config_zonegroup() + conf_cluster = get_config_cluster() + + # Make sure there are no leftover topics on v2 + zonegroup_modify_feature(enable=True, feature_name=zonegroup_feature_notification_v2) + delete_all_topics(conn1, '', conf_cluster) + delete_all_topics(conn2, tenant, conf_cluster) + + # Start v1 notification + # Make sure there are no leftover topics on v1 + zonegroup_modify_feature(enable=False, feature_name=zonegroup_feature_notification_v2) + delete_all_topics(conn1, '', conf_cluster) + delete_all_topics(conn2, tenant, conf_cluster) + + # Create s3 - v1 topics + topic_conf = PSTopicS3(conn1, topic_versions['topic3_v1'], zonegroup, endpoint_args=endpoint_args) + topic_arn3 = topic_conf.set_config() + topic_conf = PSTopicS3(conn1, topic_versions['topic4_v1'], zonegroup, endpoint_args=endpoint_args) + topic_arn4 = topic_conf.set_config() + topic_conf = PSTopicS3(conn1, topic_versions['topic5_v1'], zonegroup, endpoint_args=endpoint_args) + topic_arn5 = topic_conf.set_config() + topic_conf = PSTopicS3(conn1, topic_versions['topic6_v1'], zonegroup, endpoint_args=endpoint_args) + topic_arn6 = topic_conf.set_config() + tenant_topic_conf = PSTopicS3(conn2, topic_versions['tenant_topic2_v1'], zonegroup, endpoint_args=endpoint_args) + tenant_topic_arn2 = tenant_topic_conf.set_config() + tenant_topic_conf = PSTopicS3(conn2, topic_versions['tenant_topic3_v1'], zonegroup, endpoint_args=endpoint_args) + tenant_topic_arn3 = tenant_topic_conf.set_config() + + # Start v2 notification + zonegroup_modify_feature(enable=True, feature_name=zonegroup_feature_notification_v2) + + # Create s3 - v2 topics + topic_conf = PSTopicS3(conn1, topic_versions['topic1_v2'], zonegroup, endpoint_args=endpoint_args) + topic_arn1 = topic_conf.set_config() + topic_conf = PSTopicS3(conn1, topic_versions['topic2_v2'], zonegroup, endpoint_args=endpoint_args) + topic_arn2 = topic_conf.set_config() + tenant_topic_conf = PSTopicS3(conn2, topic_versions['tenant_topic1_v2'], zonegroup, endpoint_args=endpoint_args) + tenant_topic_arn1 = tenant_topic_conf.set_config() + + # Verify topics list + try: + # Verify no tenant topics + res, status = topic_conf.get_list() + assert_equal(status // 100, 2) + listTopicsResponse = res.get('ListTopicsResponse', {}) + listTopicsResult = listTopicsResponse.get('ListTopicsResult', {}) + topics = listTopicsResult.get('Topics', {}) + member = topics['member'] if topics else [] + assert_equal(len(member), 6) + + # Verify tenant topics + res, status = tenant_topic_conf.get_list() + assert_equal(status // 100, 2) + listTopicsResponse = res.get('ListTopicsResponse', {}) + listTopicsResult = listTopicsResponse.get('ListTopicsResult', {}) + topics = listTopicsResult.get('Topics', {}) + member = topics['member'] if topics else [] + assert_equal(len(member), 3) + finally: + # Cleanup created topics + topic_conf.del_config(topic_arn1) + topic_conf.del_config(topic_arn2) + topic_conf.del_config(topic_arn3) + topic_conf.del_config(topic_arn4) + topic_conf.del_config(topic_arn5) + topic_conf.del_config(topic_arn6) + tenant_topic_conf.del_config(tenant_topic_arn1) + tenant_topic_conf.del_config(tenant_topic_arn2) + tenant_topic_conf.del_config(tenant_topic_arn3) + + +@attr('basic_test') +def test_ps_s3_list_topics(): + """ test list topics""" + + # Initialize connections, topic names and configurations + conn1 = connection() + tenant = 'kaboom1' + conn2 = connect_random_user(tenant) + bucket_name = gen_bucket_name() + topic_name1 = bucket_name + TOPIC_SUFFIX + '1' + topic_name2 = bucket_name + TOPIC_SUFFIX + '2' + topic_name3 = bucket_name + TOPIC_SUFFIX + '3' + tenant_topic_name1 = tenant + "_" + topic_name1 + tenant_topic_name2 = tenant + "_" + topic_name2 + host = get_ip() + http_port = random.randint(10000, 20000) + endpoint_address = 'http://' + host + ':' + str(http_port) + endpoint_args = 'push-endpoint=' + endpoint_address + '&persistent=true' + zonegroup = get_config_zonegroup() + + # Make sure there are no leftover topics + delete_all_topics(conn1, '', get_config_cluster()) + delete_all_topics(conn2, tenant, get_config_cluster()) + + # Create s3 - v2 topics + topic_conf = PSTopicS3(conn1, topic_name1, zonegroup, endpoint_args=endpoint_args) + topic_arn1 = topic_conf.set_config() + topic_conf = PSTopicS3(conn1, topic_name2, zonegroup, endpoint_args=endpoint_args) + topic_arn2 = topic_conf.set_config() + topic_conf = PSTopicS3(conn1, topic_name3, zonegroup, endpoint_args=endpoint_args) + topic_arn3 = topic_conf.set_config() + tenant_topic_conf = PSTopicS3(conn2, tenant_topic_name1, zonegroup, endpoint_args=endpoint_args) + tenant_topic_arn1 = tenant_topic_conf.set_config() + tenant_topic_conf = PSTopicS3(conn2, tenant_topic_name2, zonegroup, endpoint_args=endpoint_args) + tenant_topic_arn2 = tenant_topic_conf.set_config() + + # Verify topics list + try: + # Verify no tenant topics + res, status = topic_conf.get_list() + assert_equal(status // 100, 2) + listTopicsResponse = res.get('ListTopicsResponse', {}) + listTopicsResult = listTopicsResponse.get('ListTopicsResult', {}) + topics = listTopicsResult.get('Topics', {}) + member = topics['member'] if topics else [] # version 2 + assert_equal(len(member), 3) + + # Verify topics for tenant + res, status = tenant_topic_conf.get_list() + assert_equal(status // 100, 2) + listTopicsResponse = res.get('ListTopicsResponse', {}) + listTopicsResult = listTopicsResponse.get('ListTopicsResult', {}) + topics = listTopicsResult.get('Topics', {}) + member = topics['member'] if topics else [] + assert_equal(len(member), 2) + finally: + # Cleanup created topics + topic_conf.del_config(topic_arn1) + topic_conf.del_config(topic_arn2) + topic_conf.del_config(topic_arn3) + tenant_topic_conf.del_config(tenant_topic_arn1) + tenant_topic_conf.del_config(tenant_topic_arn2) + +@attr('data_path_v2_test') +def test_ps_s3_list_topics_v1(): + """ test list topics on v1""" + if get_config_cluster() == 'noname': + return SkipTest('realm is needed') + + # Initialize connections and configurations + conn1 = connection() + tenant = 'kaboom1' + conn2 = connect_random_user(tenant) + bucket_name = gen_bucket_name() + topic_name1 = bucket_name + TOPIC_SUFFIX + '1' + topic_name2 = bucket_name + TOPIC_SUFFIX + '2' + topic_name3 = bucket_name + TOPIC_SUFFIX + '3' + tenant_topic_name1 = tenant + "_" + topic_name1 + tenant_topic_name2 = tenant + "_" + topic_name2 + host = get_ip() + http_port = random.randint(10000, 20000) + endpoint_address = 'http://' + host + ':' + str(http_port) + endpoint_args = 'push-endpoint=' + endpoint_address + '&persistent=true' + zonegroup = get_config_zonegroup() + conf_cluster = get_config_cluster() + + # Make sure there are no leftover topics + delete_all_topics(conn1, '', conf_cluster) + delete_all_topics(conn2, tenant, conf_cluster) + + # Make sure that we disable v2 + zonegroup_modify_feature(enable=False, feature_name=zonegroup_feature_notification_v2) + + # Create s3 - v1 topics + topic_conf = PSTopicS3(conn1, topic_name1, zonegroup, endpoint_args=endpoint_args) + topic_arn1 = topic_conf.set_config() + topic_conf = PSTopicS3(conn1, topic_name2, zonegroup, endpoint_args=endpoint_args) + topic_arn2 = topic_conf.set_config() + topic_conf = PSTopicS3(conn1, topic_name3, zonegroup, endpoint_args=endpoint_args) + topic_arn3 = topic_conf.set_config() + tenant_topic_conf = PSTopicS3(conn2, tenant_topic_name1, zonegroup, endpoint_args=endpoint_args) + tenant_topic_arn1 = tenant_topic_conf.set_config() + tenant_topic_conf = PSTopicS3(conn2, tenant_topic_name2, zonegroup, endpoint_args=endpoint_args) + tenant_topic_arn2 = tenant_topic_conf.set_config() + + # Verify topics list + try: + # Verify no tenant topics + res, status = topic_conf.get_list() + assert_equal(status // 100, 2) + listTopicsResponse = res.get('ListTopicsResponse', {}) + listTopicsResult = listTopicsResponse.get('ListTopicsResult', {}) + topics = listTopicsResult.get('Topics', {}) + member = topics['member'] if topics else [] + assert_equal(len(member), 3) + + # Verify tenant topics + res, status = tenant_topic_conf.get_list() + assert_equal(status // 100, 2) + listTopicsResponse = res.get('ListTopicsResponse', {}) + listTopicsResult = listTopicsResponse.get('ListTopicsResult', {}) + topics = listTopicsResult.get('Topics', {}) + member = topics['member'] if topics else [] + assert_equal(len(member), 2) + finally: + # Cleanup created topics + topic_conf.del_config(topic_arn1) + topic_conf.del_config(topic_arn2) + topic_conf.del_config(topic_arn3) + tenant_topic_conf.del_config(tenant_topic_arn1) + tenant_topic_conf.del_config(tenant_topic_arn2) + + @attr('basic_test') def test_ps_s3_topic_permissions(): """ test s3 topic set/get/delete permissions """