Skip to content

Commit

Permalink
watch improvement (context manager, close method) #60 (#61)
Browse files Browse the repository at this point in the history
It solves #60
  • Loading branch information
hubo1016 authored and tomplus committed Feb 22, 2019
1 parent d57e9e9 commit f83dc57
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 24 deletions.
3 changes: 3 additions & 0 deletions examples/example2.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ async def main():
w.stop()

print("Ended.")
# An explicit close is necessary to stop the stream
# or use async context manager like in example4.py
w.close()


if __name__ == '__main__':
Expand Down
14 changes: 8 additions & 6 deletions examples/example4.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,18 @@

async def watch_namespaces():
v1 = client.CoreV1Api()
async for event in watch.Watch().stream(v1.list_namespace):
etype, obj = event['type'], event['object']
print("{} namespace {}".format(etype, obj.metadata.name))
async with watch.Watch().stream(v1.list_namespace) as stream:
async for event in stream:
etype, obj = event['type'], event['object']
print("{} namespace {}".format(etype, obj.metadata.name))


async def watch_pods():
v1 = client.CoreV1Api()
async for event in watch.Watch().stream(v1.list_pod_for_all_namespaces):
evt, obj = event['type'], event['object']
print("{} pod {} in NS {}".format(evt, obj.metadata.name, obj.metadata.namespace))
async with watch.Watch().stream(v1.list_pod_for_all_namespaces) as stream:
async for event in stream:
evt, obj = event['type'], event['object']
print("{} pod {} in NS {}".format(evt, obj.metadata.name, obj.metadata.namespace))


def main():
Expand Down
20 changes: 18 additions & 2 deletions kubernetes_asyncio/watch/watch.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ def __init__(self, return_type=None):
self._stop = False
self._api_client = client.ApiClient()
self.resource_version = 0
self.resp = None

def stop(self):
self._stop = True
Expand Down Expand Up @@ -107,7 +108,11 @@ def __aiter__(self):
return self

async def __anext__(self):
return await self.next()
try:
return await self.next()
except:
self.close()
raise

async def next(self):

Expand Down Expand Up @@ -169,12 +174,23 @@ def stream(self, func, *args, **kwargs):
if should_stop:
watch.stop()
"""
self.close()
self._stop = False
self.return_type = self.get_return_type(func)
kwargs['watch'] = True
kwargs['_preload_content'] = False

self.func = partial(func, *args, **kwargs)
self.resp = None

return self

async def __aenter__(self):
return self

async def __aexit__(self, exc_type, exc_value, traceback):
self.close()

def close(self):
if self.resp is not None:
self.resp.release()
self.resp = None
37 changes: 21 additions & 16 deletions kubernetes_asyncio/watch/watch_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ class WatchTest(TestCase):
async def test_watch_with_decode(self):
fake_resp = CoroutineMock()
fake_resp.content.readline = CoroutineMock()
fake_resp.release = Mock()
side_effects = [
{
"type": "ADDED",
Expand All @@ -47,22 +48,24 @@ async def test_watch_with_decode(self):

watch = kubernetes_asyncio.watch.Watch()
count = 0
async for e in watch.stream(fake_api.get_namespaces, resource_version='123'):
self.assertEqual("ADDED", e['type'])
# make sure decoder worked and we got a model with the right name
self.assertEqual("test%d" % count, e['object'].metadata.name)
# make sure decoder worked and updated Watch.resource_version
self.assertEqual(e['object'].metadata.resource_version, str(count))
self.assertEqual(watch.resource_version, str(count))

# Stop the watch. This must not return the next event which would
# be an AssertionError exception.
count += 1
if count == len(side_effects) - 1:
watch.stop()
async with watch:
async for e in watch.stream(fake_api.get_namespaces, resource_version='123'):
self.assertEqual("ADDED", e['type'])
# make sure decoder worked and we got a model with the right name
self.assertEqual("test%d" % count, e['object'].metadata.name)
# make sure decoder worked and updated Watch.resource_version
self.assertEqual(e['object'].metadata.resource_version, str(count))
self.assertEqual(watch.resource_version, str(count))

# Stop the watch. This must not return the next event which would
# be an AssertionError exception.
count += 1
if count == len(side_effects) - 1:
watch.stop()

fake_api.get_namespaces.assert_called_once_with(
_preload_content=False, watch=True, resource_version='123')
fake_resp.release.assert_called_once_with()

async def test_watch_k8s_empty_response(self):
"""Stop the iterator when the response is empty.
Expand Down Expand Up @@ -161,6 +164,7 @@ async def test_watch_with_exception(self):
async def test_watch_timeout(self):
fake_resp = CoroutineMock()
fake_resp.content.readline = CoroutineMock()
fake_resp.release = Mock()

mock_event = {"type": "ADDED",
"object": {"metadata": {"name": "test1555",
Expand All @@ -177,13 +181,14 @@ async def test_watch_timeout(self):
fake_api.get_namespaces.__doc__ = ':return: V1NamespaceList'

watch = kubernetes_asyncio.watch.Watch()
async for e in watch.stream(fake_api.get_namespaces): # noqa
pass
async with watch.stream(fake_api.get_namespaces) as stream:
async for e in stream: # noqa
pass

fake_api.get_namespaces.assert_has_calls(
[call(_preload_content=False, watch=True),
call(_preload_content=False, watch=True, resource_version='1555')])

fake_resp.release.assert_called_once_with()

if __name__ == '__main__':
import asynctest
Expand Down

0 comments on commit f83dc57

Please sign in to comment.