Skip to content

Commit

Permalink
Merge pull request ceph#60788 from oshrey16/tests-rgw-ps-list-topics-op
Browse files Browse the repository at this point in the history
tests/rgw/notifications: Add tests for RGWPSListTopicsOp::execute()
  • Loading branch information
yuvalif authored Dec 19, 2024
2 parents ed261a0 + 65614c4 commit 9349823
Show file tree
Hide file tree
Showing 2 changed files with 240 additions and 0 deletions.
4 changes: 4 additions & 0 deletions src/test/rgw/bucket_notification/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -247,12 +247,16 @@ def delete_all_topics(conn, tenant, cluster):
if tenant == '':
topics_result = admin(['topic', 'list'], cluster)
topics_json = json.loads(topics_result[0])
if 'topics' not in topics_json:
topics_json = topics_json.get('result',{})
for topic in topics_json['topics']:
rm_result = admin(['topic', 'rm', '--topic', topic['name']], cluster)
print(rm_result)
else:
topics_result = admin(['topic', 'list', '--tenant', tenant], cluster)
topics_json = json.loads(topics_result[0])
if 'topics' not in topics_json:
topics_json = topics_json.get('result',{})
for topic in topics_json['topics']:
rm_result = admin(['topic', 'rm', '--tenant', tenant, '--topic', topic['name']], cluster)
print(rm_result)
Expand Down
236 changes: 236 additions & 0 deletions src/test/rgw/bucket_notification/test_bn.py
Original file line number Diff line number Diff line change
Expand Up @@ -4359,6 +4359,242 @@ def test_ps_s3_multiple_topics_notification():
http_server.close()


@attr('data_path_v2_test')
def test_ps_s3_list_topics_migration():
""" test list topics on migration"""
if get_config_cluster() == 'noname':
return SkipTest('realm is needed for migration test')

# Initialize connections and configurations
conn1 = connection()
tenant = 'kaboom1'
conn2 = connect_random_user(tenant)
bucket_name = gen_bucket_name()
topics = [f"{bucket_name}{TOPIC_SUFFIX}{i}" for i in range(1, 7)]
tenant_topics = [f"{tenant}_{topic}" for topic in topics]

# Define topic names with version
topic_versions = {
"topic1_v2": f"{topics[0]}_v2",
"topic2_v2": f"{topics[1]}_v2",
"topic3_v1": f"{topics[2]}_v1",
"topic4_v1": f"{topics[3]}_v1",
"topic5_v1": f"{topics[4]}_v1",
"topic6_v1": f"{topics[5]}_v1",
"tenant_topic1_v2": f"{tenant_topics[0]}_v2",
"tenant_topic2_v1": f"{tenant_topics[1]}_v1",
"tenant_topic3_v1": f"{tenant_topics[2]}_v1"
}

# Get necessary configurations
host = get_ip()
http_port = random.randint(10000, 20000)
endpoint_address = 'http://' + host + ':' + str(http_port)
endpoint_args = 'push-endpoint=' + endpoint_address + '&persistent=true'
zonegroup = get_config_zonegroup()
conf_cluster = get_config_cluster()

# Make sure there are no leftover topics on v2
zonegroup_modify_feature(enable=True, feature_name=zonegroup_feature_notification_v2)
delete_all_topics(conn1, '', conf_cluster)
delete_all_topics(conn2, tenant, conf_cluster)

# Start v1 notification
# Make sure there are no leftover topics on v1
zonegroup_modify_feature(enable=False, feature_name=zonegroup_feature_notification_v2)
delete_all_topics(conn1, '', conf_cluster)
delete_all_topics(conn2, tenant, conf_cluster)

# Create s3 - v1 topics
topic_conf = PSTopicS3(conn1, topic_versions['topic3_v1'], zonegroup, endpoint_args=endpoint_args)
topic_arn3 = topic_conf.set_config()
topic_conf = PSTopicS3(conn1, topic_versions['topic4_v1'], zonegroup, endpoint_args=endpoint_args)
topic_arn4 = topic_conf.set_config()
topic_conf = PSTopicS3(conn1, topic_versions['topic5_v1'], zonegroup, endpoint_args=endpoint_args)
topic_arn5 = topic_conf.set_config()
topic_conf = PSTopicS3(conn1, topic_versions['topic6_v1'], zonegroup, endpoint_args=endpoint_args)
topic_arn6 = topic_conf.set_config()
tenant_topic_conf = PSTopicS3(conn2, topic_versions['tenant_topic2_v1'], zonegroup, endpoint_args=endpoint_args)
tenant_topic_arn2 = tenant_topic_conf.set_config()
tenant_topic_conf = PSTopicS3(conn2, topic_versions['tenant_topic3_v1'], zonegroup, endpoint_args=endpoint_args)
tenant_topic_arn3 = tenant_topic_conf.set_config()

# Start v2 notification
zonegroup_modify_feature(enable=True, feature_name=zonegroup_feature_notification_v2)

# Create s3 - v2 topics
topic_conf = PSTopicS3(conn1, topic_versions['topic1_v2'], zonegroup, endpoint_args=endpoint_args)
topic_arn1 = topic_conf.set_config()
topic_conf = PSTopicS3(conn1, topic_versions['topic2_v2'], zonegroup, endpoint_args=endpoint_args)
topic_arn2 = topic_conf.set_config()
tenant_topic_conf = PSTopicS3(conn2, topic_versions['tenant_topic1_v2'], zonegroup, endpoint_args=endpoint_args)
tenant_topic_arn1 = tenant_topic_conf.set_config()

# Verify topics list
try:
# Verify no tenant topics
res, status = topic_conf.get_list()
assert_equal(status // 100, 2)
listTopicsResponse = res.get('ListTopicsResponse', {})
listTopicsResult = listTopicsResponse.get('ListTopicsResult', {})
topics = listTopicsResult.get('Topics', {})
member = topics['member'] if topics else []
assert_equal(len(member), 6)

# Verify tenant topics
res, status = tenant_topic_conf.get_list()
assert_equal(status // 100, 2)
listTopicsResponse = res.get('ListTopicsResponse', {})
listTopicsResult = listTopicsResponse.get('ListTopicsResult', {})
topics = listTopicsResult.get('Topics', {})
member = topics['member'] if topics else []
assert_equal(len(member), 3)
finally:
# Cleanup created topics
topic_conf.del_config(topic_arn1)
topic_conf.del_config(topic_arn2)
topic_conf.del_config(topic_arn3)
topic_conf.del_config(topic_arn4)
topic_conf.del_config(topic_arn5)
topic_conf.del_config(topic_arn6)
tenant_topic_conf.del_config(tenant_topic_arn1)
tenant_topic_conf.del_config(tenant_topic_arn2)
tenant_topic_conf.del_config(tenant_topic_arn3)


@attr('basic_test')
def test_ps_s3_list_topics():
""" test list topics"""

# Initialize connections, topic names and configurations
conn1 = connection()
tenant = 'kaboom1'
conn2 = connect_random_user(tenant)
bucket_name = gen_bucket_name()
topic_name1 = bucket_name + TOPIC_SUFFIX + '1'
topic_name2 = bucket_name + TOPIC_SUFFIX + '2'
topic_name3 = bucket_name + TOPIC_SUFFIX + '3'
tenant_topic_name1 = tenant + "_" + topic_name1
tenant_topic_name2 = tenant + "_" + topic_name2
host = get_ip()
http_port = random.randint(10000, 20000)
endpoint_address = 'http://' + host + ':' + str(http_port)
endpoint_args = 'push-endpoint=' + endpoint_address + '&persistent=true'
zonegroup = get_config_zonegroup()

# Make sure there are no leftover topics
delete_all_topics(conn1, '', get_config_cluster())
delete_all_topics(conn2, tenant, get_config_cluster())

# Create s3 - v2 topics
topic_conf = PSTopicS3(conn1, topic_name1, zonegroup, endpoint_args=endpoint_args)
topic_arn1 = topic_conf.set_config()
topic_conf = PSTopicS3(conn1, topic_name2, zonegroup, endpoint_args=endpoint_args)
topic_arn2 = topic_conf.set_config()
topic_conf = PSTopicS3(conn1, topic_name3, zonegroup, endpoint_args=endpoint_args)
topic_arn3 = topic_conf.set_config()
tenant_topic_conf = PSTopicS3(conn2, tenant_topic_name1, zonegroup, endpoint_args=endpoint_args)
tenant_topic_arn1 = tenant_topic_conf.set_config()
tenant_topic_conf = PSTopicS3(conn2, tenant_topic_name2, zonegroup, endpoint_args=endpoint_args)
tenant_topic_arn2 = tenant_topic_conf.set_config()

# Verify topics list
try:
# Verify no tenant topics
res, status = topic_conf.get_list()
assert_equal(status // 100, 2)
listTopicsResponse = res.get('ListTopicsResponse', {})
listTopicsResult = listTopicsResponse.get('ListTopicsResult', {})
topics = listTopicsResult.get('Topics', {})
member = topics['member'] if topics else [] # version 2
assert_equal(len(member), 3)

# Verify topics for tenant
res, status = tenant_topic_conf.get_list()
assert_equal(status // 100, 2)
listTopicsResponse = res.get('ListTopicsResponse', {})
listTopicsResult = listTopicsResponse.get('ListTopicsResult', {})
topics = listTopicsResult.get('Topics', {})
member = topics['member'] if topics else []
assert_equal(len(member), 2)
finally:
# Cleanup created topics
topic_conf.del_config(topic_arn1)
topic_conf.del_config(topic_arn2)
topic_conf.del_config(topic_arn3)
tenant_topic_conf.del_config(tenant_topic_arn1)
tenant_topic_conf.del_config(tenant_topic_arn2)

@attr('data_path_v2_test')
def test_ps_s3_list_topics_v1():
""" test list topics on v1"""
if get_config_cluster() == 'noname':
return SkipTest('realm is needed')

# Initialize connections and configurations
conn1 = connection()
tenant = 'kaboom1'
conn2 = connect_random_user(tenant)
bucket_name = gen_bucket_name()
topic_name1 = bucket_name + TOPIC_SUFFIX + '1'
topic_name2 = bucket_name + TOPIC_SUFFIX + '2'
topic_name3 = bucket_name + TOPIC_SUFFIX + '3'
tenant_topic_name1 = tenant + "_" + topic_name1
tenant_topic_name2 = tenant + "_" + topic_name2
host = get_ip()
http_port = random.randint(10000, 20000)
endpoint_address = 'http://' + host + ':' + str(http_port)
endpoint_args = 'push-endpoint=' + endpoint_address + '&persistent=true'
zonegroup = get_config_zonegroup()
conf_cluster = get_config_cluster()

# Make sure there are no leftover topics
delete_all_topics(conn1, '', conf_cluster)
delete_all_topics(conn2, tenant, conf_cluster)

# Make sure that we disable v2
zonegroup_modify_feature(enable=False, feature_name=zonegroup_feature_notification_v2)

# Create s3 - v1 topics
topic_conf = PSTopicS3(conn1, topic_name1, zonegroup, endpoint_args=endpoint_args)
topic_arn1 = topic_conf.set_config()
topic_conf = PSTopicS3(conn1, topic_name2, zonegroup, endpoint_args=endpoint_args)
topic_arn2 = topic_conf.set_config()
topic_conf = PSTopicS3(conn1, topic_name3, zonegroup, endpoint_args=endpoint_args)
topic_arn3 = topic_conf.set_config()
tenant_topic_conf = PSTopicS3(conn2, tenant_topic_name1, zonegroup, endpoint_args=endpoint_args)
tenant_topic_arn1 = tenant_topic_conf.set_config()
tenant_topic_conf = PSTopicS3(conn2, tenant_topic_name2, zonegroup, endpoint_args=endpoint_args)
tenant_topic_arn2 = tenant_topic_conf.set_config()

# Verify topics list
try:
# Verify no tenant topics
res, status = topic_conf.get_list()
assert_equal(status // 100, 2)
listTopicsResponse = res.get('ListTopicsResponse', {})
listTopicsResult = listTopicsResponse.get('ListTopicsResult', {})
topics = listTopicsResult.get('Topics', {})
member = topics['member'] if topics else []
assert_equal(len(member), 3)

# Verify tenant topics
res, status = tenant_topic_conf.get_list()
assert_equal(status // 100, 2)
listTopicsResponse = res.get('ListTopicsResponse', {})
listTopicsResult = listTopicsResponse.get('ListTopicsResult', {})
topics = listTopicsResult.get('Topics', {})
member = topics['member'] if topics else []
assert_equal(len(member), 2)
finally:
# Cleanup created topics
topic_conf.del_config(topic_arn1)
topic_conf.del_config(topic_arn2)
topic_conf.del_config(topic_arn3)
tenant_topic_conf.del_config(tenant_topic_arn1)
tenant_topic_conf.del_config(tenant_topic_arn2)


@attr('basic_test')
def test_ps_s3_topic_permissions():
""" test s3 topic set/get/delete permissions """
Expand Down

0 comments on commit 9349823

Please sign in to comment.