diff --git a/kubernetes_asyncio/watch/watch.py b/kubernetes_asyncio/watch/watch.py index 6b4431065..75ef5bf95 100644 --- a/kubernetes_asyncio/watch/watch.py +++ b/kubernetes_asyncio/watch/watch.py @@ -15,6 +15,7 @@ import json import pydoc from functools import partial +from types import SimpleNamespace from kubernetes_asyncio import client @@ -28,12 +29,6 @@ TYPE_LIST_SUFFIX = "List" -class SimpleNamespace: - - def __init__(self, **kwargs): - self.__dict__.update(kwargs) - - def _find_return_type(func): for line in pydoc.getdoc(func).splitlines(): if line.startswith(PYDOC_RETURN_LABEL): @@ -41,13 +36,6 @@ def _find_return_type(func): return "" -async def iter_resp_lines(resp): - line = await resp.content.readline() - if isinstance(line, bytes): - line = line.decode('utf8') - return line - - class Stream(object): def __init__(self, func, *args, **kwargs): @@ -60,7 +48,6 @@ def __init__(self, return_type=None): self._raw_return_type = return_type self._stop = False self._api_client = client.ApiClient() - self.resource_version = 0 def stop(self): self._stop = True @@ -69,18 +56,36 @@ def get_return_type(self, func): if self._raw_return_type: return self._raw_return_type return_type = _find_return_type(func) + if return_type.endswith(TYPE_LIST_SUFFIX): return return_type[:-len(TYPE_LIST_SUFFIX)] return return_type - def unmarshal_event(self, data, return_type): + def unmarshal_event(self, data: str, response_type): + """Return the K8s response `data` in JSON format. + + """ js = json.loads(data) + + # Make a copy of the original object and save it under the + # `raw_object` key because we will replace the data under `object` with + # a Python native type shortly. js['raw_object'] = js['object'] - if return_type: - obj = SimpleNamespace(data=json.dumps(js['raw_object'])) - js['object'] = self._api_client.deserialize(obj, return_type) - if hasattr(js['object'], 'metadata'): - self.resource_version = js['object'].metadata.resource_version + + # Something went wrong. A typical example would be that the user + # supplied a resource version that was too old. In that case K8s would + # not send a conventional ADDED/DELETED/... event but an error. Turn + # this error into a Python exception to save the user the hassle. + if js['type'].lower() == 'error': + return js + + # If possible, compile the JSON response into a Python native response + # type, eg `V1Namespace` or `V1Pod`,`ExtensionsV1beta1Deployment`, ... + if response_type is not None: + js['object'] = self._api_client.deserialize( + response=SimpleNamespace(data=json.dumps(js['raw_object'])), + response_type=response_type + ) return js def __aiter__(self): @@ -90,15 +95,26 @@ async def __anext__(self): return await self.next() async def next(self): + # Set the response object to the user supplied function (eg + # `list_namespaced_pods`) if this is the first iteration. if self.resp is None: self.resp = await self.func() + # Abort at the current iteration if the user has called `stop` on this + # stream instance. if self._stop: raise StopAsyncIteration - ret = await iter_resp_lines(self.resp) - ret = self.unmarshal_event(ret, self.return_type) - return ret + # Fetch the next K8s response. + line = await self.resp.content.readline() + line = line.decode('utf8') + + # Stop the iterator if K8s sends an empty response. This happens when + # eg the supplied timeout has expired. + if line == '': + raise StopAsyncIteration + + return self.unmarshal_event(line, self.return_type) def stream(self, func, *args, **kwargs): """Watch an API resource and stream the result back via a generator. @@ -129,7 +145,6 @@ def stream(self, func, *args, **kwargs): self.return_type = self.get_return_type(func) kwargs['watch'] = True kwargs['_preload_content'] = False - timeouts = ('timeout_seconds' in kwargs) self.func = partial(func, *args, **kwargs) self.resp = None diff --git a/kubernetes_asyncio/watch/watch_test.py b/kubernetes_asyncio/watch/watch_test.py index 3b78b4897..b0a53acbb 100644 --- a/kubernetes_asyncio/watch/watch_test.py +++ b/kubernetes_asyncio/watch/watch_test.py @@ -12,7 +12,9 @@ # See the License for the specific language governing permissions and # limitations under the License. -from asynctest import CoroutineMock, Mock, TestCase, patch +import json + +from asynctest import CoroutineMock, Mock, TestCase import kubernetes_asyncio from kubernetes_asyncio.watch import Watch @@ -23,30 +25,68 @@ class WatchTest(TestCase): async def test_watch_with_decode(self): fake_resp = CoroutineMock() fake_resp.content.readline = CoroutineMock() - fake_resp.content.readline.side_effect = [ - '{"type": "ADDED", "object": {"metadata": {"name": "test1"},"spec": {}, "status": {}}}', - '{"type": "ADDED", "object": {"metadata": {"name": "test2"},"spec": {}, "status": {}}}', - '{"type": "ADDED", "object": {"metadata": {"name": "test3"},"spec": {}, "status": {}}}', - 'should_not_happened'] + side_effects = [ + { + "type": "ADDED", + "object": { + "metadata": {"name": "test{}".format(uid)}, + "spec": {}, "status": {} + } + } + for uid in range(3) + ] + side_effects = [json.dumps(_).encode('utf8') for _ in side_effects] + side_effects.extend([AssertionError('Should not have been called')]) + fake_resp.content.readline.side_effect = side_effects fake_api = Mock() fake_api.get_namespaces = CoroutineMock(return_value=fake_resp) fake_api.get_namespaces.__doc__ = ':return: V1NamespaceList' watch = kubernetes_asyncio.watch.Watch() - count = 1 - async for e in watch.stream(fake_api.get_namespaces): + count = 0 + async for e in watch.stream(fake_api.get_namespaces, resource_version='123'): self.assertEqual("ADDED", e['type']) # make sure decoder worked and we got a model with the right name self.assertEqual("test%d" % count, e['object'].metadata.name) + + # Stop the watch. This must not return the next event which would + # be an AssertionError exception. count += 1 - # make sure we can stop the watch and the last event with won't be - # returned - if count == 4: + if count == len(side_effects) - 1: watch.stop() fake_api.get_namespaces.assert_called_once_with( - _preload_content=False, watch=True) + _preload_content=False, watch=True, resource_version='123') + + async def test_watch_k8s_empty_response(self): + """Stop the iterator when the response is empty. + + This typically happens when the user supplied timeout expires. + + """ + # Mock the readline return value to first return a valid response + # followed by an empty response. + fake_resp = CoroutineMock() + fake_resp.content.readline = CoroutineMock() + side_effects = [ + {"type": "ADDED", "object": {"metadata": {"name": "test0"}, "spec": {}, "status": {}}}, + {"type": "ADDED", "object": {"metadata": {"name": "test1"}, "spec": {}, "status": {}}}, + ] + side_effects = [json.dumps(_).encode('utf8') for _ in side_effects] + fake_resp.content.readline.side_effect = side_effects + [b''] + + # Fake the K8s resource object to watch. + fake_api = Mock() + fake_api.get_namespaces = CoroutineMock(return_value=fake_resp) + fake_api.get_namespaces.__doc__ = ':return: V1NamespaceList' + + # Iteration must cease after all valid responses were received. + watch = kubernetes_asyncio.watch.Watch() + cnt = 0 + async for _ in watch.stream(fake_api.get_namespaces): + cnt += 1 + self.assertEqual(cnt, len(side_effects)) def test_unmarshal_with_float_object(self): w = Watch() @@ -64,6 +104,29 @@ def test_unmarshal_with_no_return_type(self): self.assertEqual(["test1"], event['object']) self.assertEqual(["test1"], event['raw_object']) + async def test_unmarshall_k8s_error_response(self): + """Never parse messages of type ERROR. + + This test uses an actually recorded error, in this case for an outdated + resource version. + + """ + # An actual error response sent by K8s during testing. + k8s_err = { + 'type': 'ERROR', + 'object': { + 'kind': 'Status', 'apiVersion': 'v1', 'metadata': {}, + 'status': 'Failure', + 'message': 'too old resource version: 1 (8146471)', + 'reason': 'Gone', 'code': 410 + } + } + + ret = Watch().unmarshal_event(json.dumps(k8s_err), None) + self.assertEqual(ret['type'], k8s_err['type']) + self.assertEqual(ret['object'], k8s_err['object']) + self.assertEqual(ret['object'], k8s_err['object']) + async def test_watch_with_exception(self): fake_resp = CoroutineMock() fake_resp.content.readline = CoroutineMock()