Skip to content

Commit

Permalink
Merge pull request #22 from olitheolix/oli-k8s-errors
Browse files Browse the repository at this point in the history
Stop the iterator for empty responses and do not process ERROR responses
  • Loading branch information
tomplus authored Jun 21, 2018
2 parents fb68f98 + ce3f4f1 commit 176f41d
Show file tree
Hide file tree
Showing 2 changed files with 114 additions and 36 deletions.
63 changes: 39 additions & 24 deletions kubernetes_asyncio/watch/watch.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import json
import pydoc
from functools import partial
from types import SimpleNamespace

from kubernetes_asyncio import client

Expand All @@ -28,26 +29,13 @@
TYPE_LIST_SUFFIX = "List"


class SimpleNamespace:

def __init__(self, **kwargs):
self.__dict__.update(kwargs)


def _find_return_type(func):
for line in pydoc.getdoc(func).splitlines():
if line.startswith(PYDOC_RETURN_LABEL):
return line[len(PYDOC_RETURN_LABEL):].strip()
return ""


async def iter_resp_lines(resp):
line = await resp.content.readline()
if isinstance(line, bytes):
line = line.decode('utf8')
return line


class Stream(object):

def __init__(self, func, *args, **kwargs):
Expand All @@ -60,7 +48,6 @@ def __init__(self, return_type=None):
self._raw_return_type = return_type
self._stop = False
self._api_client = client.ApiClient()
self.resource_version = 0

def stop(self):
self._stop = True
Expand All @@ -69,18 +56,36 @@ def get_return_type(self, func):
if self._raw_return_type:
return self._raw_return_type
return_type = _find_return_type(func)

if return_type.endswith(TYPE_LIST_SUFFIX):
return return_type[:-len(TYPE_LIST_SUFFIX)]
return return_type

def unmarshal_event(self, data, return_type):
def unmarshal_event(self, data: str, response_type):
"""Return the K8s response `data` in JSON format.
"""
js = json.loads(data)

# Make a copy of the original object and save it under the
# `raw_object` key because we will replace the data under `object` with
# a Python native type shortly.
js['raw_object'] = js['object']
if return_type:
obj = SimpleNamespace(data=json.dumps(js['raw_object']))
js['object'] = self._api_client.deserialize(obj, return_type)
if hasattr(js['object'], 'metadata'):
self.resource_version = js['object'].metadata.resource_version

# Something went wrong. A typical example would be that the user
# supplied a resource version that was too old. In that case K8s would
# not send a conventional ADDED/DELETED/... event but an error. Turn
# this error into a Python exception to save the user the hassle.
if js['type'].lower() == 'error':
return js

# If possible, compile the JSON response into a Python native response
# type, eg `V1Namespace` or `V1Pod`,`ExtensionsV1beta1Deployment`, ...
if response_type is not None:
js['object'] = self._api_client.deserialize(
response=SimpleNamespace(data=json.dumps(js['raw_object'])),
response_type=response_type
)
return js

def __aiter__(self):
Expand All @@ -90,15 +95,26 @@ async def __anext__(self):
return await self.next()

async def next(self):
# Set the response object to the user supplied function (eg
# `list_namespaced_pods`) if this is the first iteration.
if self.resp is None:
self.resp = await self.func()

# Abort at the current iteration if the user has called `stop` on this
# stream instance.
if self._stop:
raise StopAsyncIteration

ret = await iter_resp_lines(self.resp)
ret = self.unmarshal_event(ret, self.return_type)
return ret
# Fetch the next K8s response.
line = await self.resp.content.readline()
line = line.decode('utf8')

# Stop the iterator if K8s sends an empty response. This happens when
# eg the supplied timeout has expired.
if line == '':
raise StopAsyncIteration

return self.unmarshal_event(line, self.return_type)

def stream(self, func, *args, **kwargs):
"""Watch an API resource and stream the result back via a generator.
Expand Down Expand Up @@ -129,7 +145,6 @@ def stream(self, func, *args, **kwargs):
self.return_type = self.get_return_type(func)
kwargs['watch'] = True
kwargs['_preload_content'] = False
timeouts = ('timeout_seconds' in kwargs)

self.func = partial(func, *args, **kwargs)
self.resp = None
Expand Down
87 changes: 75 additions & 12 deletions kubernetes_asyncio/watch/watch_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from asynctest import CoroutineMock, Mock, TestCase, patch
import json

from asynctest import CoroutineMock, Mock, TestCase

import kubernetes_asyncio
from kubernetes_asyncio.watch import Watch
Expand All @@ -23,30 +25,68 @@ class WatchTest(TestCase):
async def test_watch_with_decode(self):
fake_resp = CoroutineMock()
fake_resp.content.readline = CoroutineMock()
fake_resp.content.readline.side_effect = [
'{"type": "ADDED", "object": {"metadata": {"name": "test1"},"spec": {}, "status": {}}}',
'{"type": "ADDED", "object": {"metadata": {"name": "test2"},"spec": {}, "status": {}}}',
'{"type": "ADDED", "object": {"metadata": {"name": "test3"},"spec": {}, "status": {}}}',
'should_not_happened']
side_effects = [
{
"type": "ADDED",
"object": {
"metadata": {"name": "test{}".format(uid)},
"spec": {}, "status": {}
}
}
for uid in range(3)
]
side_effects = [json.dumps(_).encode('utf8') for _ in side_effects]
side_effects.extend([AssertionError('Should not have been called')])
fake_resp.content.readline.side_effect = side_effects

fake_api = Mock()
fake_api.get_namespaces = CoroutineMock(return_value=fake_resp)
fake_api.get_namespaces.__doc__ = ':return: V1NamespaceList'

watch = kubernetes_asyncio.watch.Watch()
count = 1
async for e in watch.stream(fake_api.get_namespaces):
count = 0
async for e in watch.stream(fake_api.get_namespaces, resource_version='123'):
self.assertEqual("ADDED", e['type'])
# make sure decoder worked and we got a model with the right name
self.assertEqual("test%d" % count, e['object'].metadata.name)

# Stop the watch. This must not return the next event which would
# be an AssertionError exception.
count += 1
# make sure we can stop the watch and the last event with won't be
# returned
if count == 4:
if count == len(side_effects) - 1:
watch.stop()

fake_api.get_namespaces.assert_called_once_with(
_preload_content=False, watch=True)
_preload_content=False, watch=True, resource_version='123')

async def test_watch_k8s_empty_response(self):
"""Stop the iterator when the response is empty.
This typically happens when the user supplied timeout expires.
"""
# Mock the readline return value to first return a valid response
# followed by an empty response.
fake_resp = CoroutineMock()
fake_resp.content.readline = CoroutineMock()
side_effects = [
{"type": "ADDED", "object": {"metadata": {"name": "test0"}, "spec": {}, "status": {}}},
{"type": "ADDED", "object": {"metadata": {"name": "test1"}, "spec": {}, "status": {}}},
]
side_effects = [json.dumps(_).encode('utf8') for _ in side_effects]
fake_resp.content.readline.side_effect = side_effects + [b'']

# Fake the K8s resource object to watch.
fake_api = Mock()
fake_api.get_namespaces = CoroutineMock(return_value=fake_resp)
fake_api.get_namespaces.__doc__ = ':return: V1NamespaceList'

# Iteration must cease after all valid responses were received.
watch = kubernetes_asyncio.watch.Watch()
cnt = 0
async for _ in watch.stream(fake_api.get_namespaces):
cnt += 1
self.assertEqual(cnt, len(side_effects))

def test_unmarshal_with_float_object(self):
w = Watch()
Expand All @@ -64,6 +104,29 @@ def test_unmarshal_with_no_return_type(self):
self.assertEqual(["test1"], event['object'])
self.assertEqual(["test1"], event['raw_object'])

async def test_unmarshall_k8s_error_response(self):
"""Never parse messages of type ERROR.
This test uses an actually recorded error, in this case for an outdated
resource version.
"""
# An actual error response sent by K8s during testing.
k8s_err = {
'type': 'ERROR',
'object': {
'kind': 'Status', 'apiVersion': 'v1', 'metadata': {},
'status': 'Failure',
'message': 'too old resource version: 1 (8146471)',
'reason': 'Gone', 'code': 410
}
}

ret = Watch().unmarshal_event(json.dumps(k8s_err), None)
self.assertEqual(ret['type'], k8s_err['type'])
self.assertEqual(ret['object'], k8s_err['object'])
self.assertEqual(ret['object'], k8s_err['object'])

async def test_watch_with_exception(self):
fake_resp = CoroutineMock()
fake_resp.content.readline = CoroutineMock()
Expand Down

0 comments on commit 176f41d

Please sign in to comment.