Skip to content

Commit

Permalink
iteration after the review, added more precise definition of cache be…
Browse files Browse the repository at this point in the history
…ing in-sync
  • Loading branch information
eliax1996 committed Jun 5, 2024
1 parent 7d7fdf6 commit 121bf3b
Show file tree
Hide file tree
Showing 2 changed files with 171 additions and 16 deletions.
46 changes: 33 additions & 13 deletions karapace/kafka_rest_apis/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -451,10 +451,10 @@ def __init__(
self.config = config
self.kafka_timeout = kafka_timeout
self.serializer = serializer
self._cluster_metadata = None
self._cluster_metadata: _ClusterMetadata = self._empty_cluster_metadata_cache()
self._cluster_metadata_complete = False
# birth of all the metadata (when the request was requiring all the metadata available in the cluster)
self._global_metadata_birth: float | None = None
self._global_metadata_birth: float = 0.0 # set to this value will always require a refresh at the first call.
self._cluster_metadata_topic_birth: dict[str, float] = {}
self.metadata_max_age = self.config["admin_metadata_max_age"]
self.admin_client = None
Expand Down Expand Up @@ -634,25 +634,19 @@ async def get_topic_config(self, topic: str) -> dict:
return self.admin_client.get_topic_config(topic)

def is_global_metadata_old(self) -> bool:
return (
self._global_metadata_birth is None or (time.monotonic() - self._global_metadata_birth) > self.metadata_max_age
)
return (time.monotonic() - self._global_metadata_birth) > self.metadata_max_age

def is_metadata_of_topics_old(self, topics: list[str]) -> bool:
# Return from metadata only if all queried topics have cached metadata

if self._cluster_metadata_topic_birth is None:
return True

are_all_topic_queried_at_least_once = all(topic in self._cluster_metadata_topic_birth for topic in topics)

if not are_all_topic_queried_at_least_once:
return True

oldest_requested_topic_udpate_timestamp = min(self._cluster_metadata_topic_birth[topic] for topic in topics)
oldest_requested_topic_update_timestamp = min(self._cluster_metadata_topic_birth[topic] for topic in topics)
return (
are_all_topic_queried_at_least_once
and (time.monotonic() - oldest_requested_topic_udpate_timestamp) > self.metadata_max_age
and (time.monotonic() - oldest_requested_topic_update_timestamp) > self.metadata_max_age
)

def _update_all_metadata(self) -> _ClusterMetadata:
Expand Down Expand Up @@ -685,17 +679,43 @@ def _update_metadata_for_topics(self, topics: list[str]) -> _ClusterMetadata:
if self._cluster_metadata is None:
self._cluster_metadata = self._empty_cluster_metadata_cache()

# we need to refresh if at least 1 broker isn't present in the current metadata
need_refresh = not all(broker in self._cluster_metadata["brokers"] for broker in metadata["brokers"])

for topic in metadata["topics"]:
# or if there is a new topic
need_refresh = (
need_refresh
or (topic not in self._cluster_metadata["topics"])
# or if a topic has new/different data.
# nb: equality its valid since the _ClusterMetadata object its structurally
# composed only of primitives lists and dicts
or (self._cluster_metadata["topics"][topic] != metadata["topics"][topic])
)
self._cluster_metadata_topic_birth[topic] = metadata_birth
self._cluster_metadata["topics"][topic] = metadata["topics"][topic]

self._cluster_metadata_complete = False
if need_refresh:
# we don't need to reason about expiration time since at each request
# for the global metadata it's checked before performing the request,
# so we need to guard only for new missing pieces of info
self._cluster_metadata_complete = False
else:
# for malicious actors we may also cache that a certain topic (that do not exist) it has been queried
# and for a while the reply isn't present. not implementing this now since its an additional complexity
# that may be unrequired. Leaving a comment and a warning there, if its present often in the logs the feature
# may be needed.
log.warning(
"Requested metadata for topics %s but the reply didn't triggered a cache invalidation. "
"Data not present on server side",
topics,
)
return metadata

async def cluster_metadata(self, topics: list[str] | None = None) -> _ClusterMetadata:
async with self.admin_lock:
try:
if topics is None:
if topics is None or len(topics) == 0:
metadata = self._update_all_metadata()
else:
metadata = self._update_metadata_for_topics(topics)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,70 @@ def user_rest_proxy(max_age_metadata: int = 5) -> UserRestProxy:
"partitions": [
{
"partition": 0,
"leader": 10,
"leader": 69,
"replicas": [
{"broker": 69, "leader": True, "in_sync": True},
{"broker": 67, "leader": False, "in_sync": True},
],
}
]
}
},
"brokers": [69, 67],
}

TOPIC_REQUEST_WITH_CHANGED_REPLICA = {
"topics": {
"topic_a": {
"partitions": [
{
"partition": 0,
"leader": 69,
"replicas": [
{"broker": 10, "leader": True, "in_sync": True},
{"broker": 69, "leader": True, "in_sync": True},
{"broker": 68, "leader": False, "in_sync": True},
],
}
]
}
},
"brokers": [10],
"brokers": [69, 68],
}


TOPIC_REQUEST_WITH_NEW_BROKER = {
"topics": {
"topic_a": {
"partitions": [
{
"partition": 0,
"leader": 69,
"replicas": [
{"broker": 69, "leader": True, "in_sync": True},
{"broker": 67, "leader": False, "in_sync": True},
],
}
]
}
},
"brokers": [69, 67, 101300],
}

TOPIC_REQUEST_WITH_NEW_TOPIC = {
"topics": {
"mistery_topic": {
"partitions": [
{
"partition": 0,
"leader": 68,
"replicas": [
{"broker": 68, "leader": True, "in_sync": True},
],
}
]
}
},
"brokers": [68],
}

ALL_TOPIC_REQUEST = {
Expand Down Expand Up @@ -112,6 +167,15 @@ async def test_cache_is_evicted_after_expiration_global_initially() -> None:
mocked_cluster_metadata.assert_called_once_with(None) # "initially the metadata are always old"


async def test_no_topic_means_all_metadata() -> None:
proxy = user_rest_proxy()
with patch(
"karapace.kafka.admin.KafkaAdminClient.cluster_metadata", return_value=EMPTY_REPLY
) as mocked_cluster_metadata:
await proxy.cluster_metadata([])
mocked_cluster_metadata.assert_called_once_with(None)


async def test_cache_is_evicted_after_expiration_global() -> None:
proxy = user_rest_proxy(max_age_metadata=10)
proxy._global_metadata_birth = 0
Expand Down Expand Up @@ -251,3 +315,74 @@ async def test_update_topic_cache_do_not_evict_all_the_global_cache() -> None:
assert (
mocked_cluster_metadata.call_count == 1
), "we should call the server since the previous time of caching for the topic_a was 0"


async def test_update_local_cache_does_not_evict_all_the_global_cache_if_no_new_data() -> None:
proxy = user_rest_proxy(max_age_metadata=10)
proxy._global_metadata_birth = 0
proxy._cluster_metadata_complete = True
proxy._cluster_metadata = ALL_TOPIC_REQUEST
proxy._cluster_metadata_topic_birth = {"topic_a": 0, "topic_b": 200, "__consumer_offsets": 200}

with patch(
"karapace.kafka.admin.KafkaAdminClient.cluster_metadata", return_value=TOPIC_REQUEST
) as mocked_cluster_metadata:
with patch("time.monotonic", return_value=208):
res = await proxy.cluster_metadata(["topic_a"])

assert res == TOPIC_REQUEST

assert proxy._cluster_metadata_topic_birth == {"topic_a": 208, "topic_b": 200, "__consumer_offsets": 200}

expected_metadata = copy.deepcopy(ALL_TOPIC_REQUEST)
expected_metadata["topics"]["topic_a"] = TOPIC_REQUEST["topics"]["topic_a"]
assert proxy._cluster_metadata == expected_metadata
assert (
proxy._cluster_metadata_complete
), "since wasn't containing new brokers and no new topics the metadata its completed"

assert (
mocked_cluster_metadata.call_count == 1
), "we should call the server since the previous time of caching for the topic_a was 0"


async def test_update_local_cache_not_evict_all_the_global_cache_if_changed_replica_data() -> None:
proxy = user_rest_proxy(max_age_metadata=10)
proxy._global_metadata_birth = 0
proxy._cluster_metadata_complete = True
proxy._cluster_metadata = ALL_TOPIC_REQUEST
proxy._cluster_metadata_topic_birth = {"topic_a": 200, "topic_b": 200, "__consumer_offsets": 200}

with patch("karapace.kafka.admin.KafkaAdminClient.cluster_metadata", return_value=TOPIC_REQUEST_WITH_CHANGED_REPLICA):
with patch("time.monotonic", return_value=500):
await proxy.cluster_metadata(["topic_a"])

assert not proxy._cluster_metadata_complete, "new replica data incoming, should update the global metadata next!"


async def test_update_local_cache_not_evict_all_the_global_cache_if_new_topic_data() -> None:
proxy = user_rest_proxy(max_age_metadata=10)
proxy._global_metadata_birth = 0
proxy._cluster_metadata_complete = True
proxy._cluster_metadata = ALL_TOPIC_REQUEST
proxy._cluster_metadata_topic_birth = {"topic_a": 200, "topic_b": 200, "__consumer_offsets": 200}

with patch("karapace.kafka.admin.KafkaAdminClient.cluster_metadata", return_value=TOPIC_REQUEST_WITH_NEW_TOPIC):
with patch("time.monotonic", return_value=200):
await proxy.cluster_metadata(["mistery_topic"])

assert not proxy._cluster_metadata_complete, "new topic data incoming, should update the global metadata next!"


async def test_update_local_cache_not_evict_all_the_global_cache_if_new_broker_data() -> None:
proxy = user_rest_proxy(max_age_metadata=10)
proxy._global_metadata_birth = 0
proxy._cluster_metadata_complete = True
proxy._cluster_metadata = ALL_TOPIC_REQUEST
proxy._cluster_metadata_topic_birth = {"topic_a": 200, "topic_b": 200, "__consumer_offsets": 200}

with patch("karapace.kafka.admin.KafkaAdminClient.cluster_metadata", return_value=TOPIC_REQUEST_WITH_NEW_BROKER):
with patch("time.monotonic", return_value=500):
await proxy.cluster_metadata(["topic_a"])

assert not proxy._cluster_metadata_complete, "new broker data incoming, should update the global metadata next!"

0 comments on commit 121bf3b

Please sign in to comment.