When I publish data to a Pub/Sub topic, I get a memory leak in the Kubernetes pod. #395
Comments
@fodrh1201 Thanks for the report. Would it be possible to provide a minimal, runnable, self-contained code sample? We are not aware of any publisher memory leaks, thus we would first like to rule out any code outside of the client as a possible cause. BTW, one thing that caught my eye is the mutable default argument (preprocess=[]) in the callback constructors. In the code sample the callback is actually instantiated only once, but it might nevertheless be worth checking whether that's also the case in the actual code.
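(A side note on that pitfall, since it comes up often: a mutable default such as preprocess=[] is created once, when the function is defined, and is then shared by every call that falls back to the default. A minimal sketch of the usual None-sentinel idiom, reusing the names from the reported code purely for illustration:)

class Callback:
    def __init__(self, pod_name, is_work_at_stop=False, preprocess=None):
        # Use None as the sentinel instead of a mutable default; otherwise the
        # same list object would be shared by all instances created with the default.
        self.is_work_at_stop = is_work_at_stop
        self.pod_name = pod_name
        self.preprocess = preprocess if preprocess is not None else []

    def __call__(self, data):
        for fn in self.preprocess:
            data = fn(data)
        return data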
This is the Python code:

import json
from google.cloud import pubsub_v1
import asyncio
import logging
from datetime import datetime

class Callback:
    def __init__(self, pod_name, is_work_at_stop=False, preprocess=[]):
        self.is_work_at_stop = is_work_at_stop
        self.pod_name = pod_name
        self.preprocess = preprocess

    def __call__(self, data):
        for preprocess in self.preprocess:
            data = preprocess(data)
        return data


class PublishToPubsubCallback(Callback):
    def __init__(self, pod_name, project_id, topic_id, preprocess=[]):
        super(PublishToPubsubCallback, self).__init__(pod_name, preprocess=preprocess)
        self.project_id = project_id
        self.topic_id = topic_id
        self.publisher = pubsub_v1.PublisherClient()
        self.topic_path = self.publisher.topic_path(project_id, topic_id)

    # @profile
    def __call__(self, data):
        data = super().__call__(data)
        pub_data = json.dumps(data).encode('utf-8')
        self.publisher.publish(self.topic_path, pub_data,
                               origin=self.pod_name, data_type=data['data_type'],
                               symbol=data['symbol'], client_id=data['client_id'],
                               timestamp=str(data['timestamp']),
                               market_type=data['market_type'])


data = {
    'data_type': 'orderbook',
    'client_id': 'binance',
    'symbol': 'BTC/USDT',
    'timestamp': '1619494646034',
    'market_type': 'spot',
    'asks': [[53482.36, 2e-06], [53493.34, 0.025379], [53493.64, 0.01412]],
    'bids': [[53482.35, 2.882399], [53479.18, 0.000373], [53477.2, 0.406]]
}

callback = PublishToPubsubCallback('test', 'bx-datafeed', 'test')


async def run():
    while True:
        callback(data)
        await asyncio.sleep(0.1)


loop = asyncio.get_event_loop()
loop.run_until_complete(run())

This is the result of the above code. My Dockerfile is below.
I ran a slightly modified version of the example locally, with the main code moved into its own function, and measured memory consumption. It appears that the memory is indeed slowly but surely leaking. I can't yet say why; I will investigate further. A possible workaround is to run the publishing code in a subprocess (for a bounded number of iterations) and create a new subprocess when the previous one finishes. Not ideal, of course, but at least the memory will be released at regular intervals.
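A rough sketch of that workaround (it assumes the PublishToPubsubCallback class and the data dict from the report live in a hypothetical my_publisher module; the iteration count is an arbitrary placeholder):

import multiprocessing
import time

# Hypothetical module holding the PublishToPubsubCallback class and the
# sample `data` dict from the report above.
from my_publisher import PublishToPubsubCallback, data


def publish_batch(iterations):
    # Build the client inside the subprocess so that all of its memory is
    # returned to the OS when the subprocess exits.
    callback = PublishToPubsubCallback('test', 'bx-datafeed', 'test')
    for _ in range(iterations):
        callback(data)
        time.sleep(0.1)


if __name__ == "__main__":
    while True:
        worker = multiprocessing.Process(target=publish_batch, args=(10_000,))
        worker.start()
        worker.join()  # memory held by the worker is released when it exits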
It turns out the issue is reproducible even with a straightforward example that repeatedly publishes a trivial message in a loop.

import time

from google.cloud import pubsub_v1

PROJECT_ID = "set-project-id"
TOPIC_ID = "set-topic-id"


def main():
    publisher = pubsub_v1.PublisherClient()
    topic_path = publisher.topic_path(PROJECT_ID, TOPIC_ID)

    print("Starting the main loop...")
    for i in range(15_000):
        if not i % 1000:
            print(f"{i: 5}: publishing a message")
        publisher.publish(topic_path, b"x")
        time.sleep(0.01)
    print("Main loop complete!")


if __name__ == "__main__":
    main()
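One way to watch the growth from inside the process (a sketch, not the exact measurement used here; tracemalloc only sees Python-level allocations, so comparing its totals against the process RSS helps separate Python allocations from native ones):

import resource
import time
import tracemalloc

from google.cloud import pubsub_v1

PROJECT_ID = "set-project-id"
TOPIC_ID = "set-topic-id"


def main():
    publisher = pubsub_v1.PublisherClient()
    topic_path = publisher.topic_path(PROJECT_ID, TOPIC_ID)

    tracemalloc.start()
    for i in range(15_000):
        if not i % 1000:
            python_bytes, _ = tracemalloc.get_traced_memory()
            # ru_maxrss is in KiB on Linux (bytes on macOS).
            rss = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
            print(f"{i: 5}: Python-tracked={python_bytes / 1024:.0f} KiB, max RSS={rss}")
        publisher.publish(topic_path, b"x")
        time.sleep(0.01)


if __name__ == "__main__":
    main()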
It appears that these allocations are initiated by something outside of Python, maybe gRPC? @lidizheng Do you know if there are currently any confirmed or suspected leaks in gRPC? Do you have any free cycles to take a look in the near future? Is there anything else I can provide on top of the above minimal example? Versions:
@plamut I will take a look today or tomorrow.
@lidizheng This is the memory usage profile from running the minimal example above. The memory usage initially jumps to almost 70 MiB and then gradually increases to almost 79 MiB. I'm attaching the raw profiling data below (renamed so that it can be attached).

Update: the leak is still observable even with the actual publish RPC stubbed out:

diff --git google/pubsub_v1/services/publisher/client.py google/pubsub_v1/services/publisher/client.py
index f74e85a..d560307 100644
--- google/pubsub_v1/services/publisher/client.py
+++ google/pubsub_v1/services/publisher/client.py
@@ -605,7 +605,8 @@ class PublisherClient(metaclass=PublisherClientMeta):
         )
 
         # Send the request.
-        response = rpc(request, retry=retry, timeout=timeout, metadata=metadata,)
+        # response = rpc(request, retry=retry, timeout=timeout, metadata=metadata,)
+        response = pubsub.PublishResponse(message_ids=["123"] * len(request.messages))
 
         # Done; return the response.
         return response

It might be something in the hand-written layer that leaks memory, perhaps some rogue references to all the publish threads created when publishing each batch of messages... quite interesting.
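One crude way to check the "rogue references to publish threads" theory from the outside is to watch thread and tracked-object counts while publishing (a sketch, using the same placeholder project and topic as the minimal example):

import gc
import threading
import time

from google.cloud import pubsub_v1

PROJECT_ID = "set-project-id"
TOPIC_ID = "set-topic-id"


def main():
    publisher = pubsub_v1.PublisherClient()
    topic_path = publisher.topic_path(PROJECT_ID, TOPIC_ID)

    for i in range(15_000):
        if not i % 1000:
            gc.collect()
            # Count the Thread instances the garbage collector still tracks.
            threads_alive = sum(isinstance(o, threading.Thread) for o in gc.get_objects())
            print(f"{i: 5}: active threads={threading.active_count()}, Thread objects={threads_alive}")
        publisher.publish(topic_path, b"x")
        time.sleep(0.01)


if __name__ == "__main__":
    main()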
Found a possible fix! If the client commit thread is marked as a daemon, the leak disappears. Can't say why, though. The patch:

diff --git google/cloud/pubsub_v1/publisher/_batch/thread.py google/cloud/pubsub_v1/publisher/_batch/thread.py
index 36dd3b9..d0d648e 100644
--- google/cloud/pubsub_v1/publisher/_batch/thread.py
+++ google/cloud/pubsub_v1/publisher/_batch/thread.py
@@ -207,9 +207,10 @@ class Batch(base.Batch):
 
     def _start_commit_thread(self):
         """Start a new thread to actually handle the commit."""
-
+        # NOTE: If the thread is *not* a daemon, a memory leak exists for some reason...
+        # https://github.com/googleapis/python-pubsub/issues/395#issuecomment-829910303
         commit_thread = threading.Thread(
-            name="Thread-CommitBatchPublisher", target=self._commit
+            name="Thread-CommitBatchPublisher", target=self._commit, daemon=True
         )
         commit_thread.start()
 
diff --git google/cloud/pubsub_v1/publisher/client.py google/cloud/pubsub_v1/publisher/client.py
index 7e6801d..330fecb 100644
--- google/cloud/pubsub_v1/publisher/client.py
+++ google/cloud/pubsub_v1/publisher/client.py
@@ -375,8 +375,12 @@ class Client(object):
 
     def _start_commit_thread(self):
         """Start a new thread to actually wait and commit the sequencers."""
+        # NOTE: If the thread is *not* a daemon, a memory leak exists for some reason...
+        # https://github.com/googleapis/python-pubsub/issues/395#issuecomment-829910303
         self._commit_thread = threading.Thread(
-            name="Thread-PubSubBatchCommitter", target=self._wait_and_commit_sequencers
+            name="Thread-PubSubBatchCommitter",
+            target=self._wait_and_commit_sequencers,
+            daemon=True,
         )
         self._commit_thread.start()
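Until a release with that change ships, a possible stopgap (a sketch only, not an officially supported workaround) is to apply the same daemon flag at runtime by overriding the two methods touched by the patch above; the module paths and method names are taken directly from the diff:

import threading

from google.cloud.pubsub_v1.publisher import client as publisher_client
from google.cloud.pubsub_v1.publisher._batch import thread as batch_thread


def _start_commit_thread_daemon(self):
    # Same body as the patched Batch._start_commit_thread, with daemon=True.
    commit_thread = threading.Thread(
        name="Thread-CommitBatchPublisher", target=self._commit, daemon=True
    )
    commit_thread.start()


def _start_sequencer_commit_thread_daemon(self):
    # Same body as the patched Client._start_commit_thread, with daemon=True.
    self._commit_thread = threading.Thread(
        name="Thread-PubSubBatchCommitter",
        target=self._wait_and_commit_sequencers,
        daemon=True,
    )
    self._commit_thread.start()


batch_thread.Batch._start_commit_thread = _start_commit_thread_daemon
publisher_client.Client._start_commit_thread = _start_sequencer_commit_thread_daemon

Applying the overrides before constructing any PublisherClient keeps the behavior consistent for the whole process lifetime.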
For additional context, I found a related CPython issue and the discussion around it. That issue has been closed as a duplicate of another issue, which is currently still open and reportedly affects Python 3.8+. I also did a test run with Python 3.6 and the memory usage was more or less constant, so it appears that the linked CPython issue is indeed the likely culprit.
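A Pub/Sub-free sketch that might isolate the interpreter-level behavior: start lots of short-lived non-daemon threads without joining them, the way the commit threads in the patch above are started, and watch the process RSS; if this is the same problem, flipping daemon to True should make the growth disappear:

import resource
import threading
import time


def noop():
    pass


def main():
    for i in range(100_000):
        # Short-lived threads that are started but never joined; change
        # daemon to True to compare against the workaround.
        threading.Thread(target=noop, daemon=False).start()
        if not i % 5_000:
            time.sleep(0.1)  # give outstanding threads a moment to finish
            # ru_maxrss is in KiB on Linux (bytes on macOS).
            rss = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
            print(f"{i: 6}: max RSS = {rss}")


if __name__ == "__main__":
    main()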
@plamut Thank you!! I solved the problem thanks to you.
Each time a publish request is made, the memory usage in the Kubernetes pod increases. If the publishing part is removed, memory does not increase.
The pod is continuously restarted due to OOM.
Environment details
google-cloud-pubsub version: 2.4.1

Code example