Skip to content

Commit

Permalink
Add resourceVersion to list result
Browse files Browse the repository at this point in the history
  • Loading branch information
XeCycle committed Jan 7, 2025
1 parent 9aff99e commit eba6e1c
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 15 deletions.
6 changes: 3 additions & 3 deletions lightkube/core/async_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

from ..config.kubeconfig import SingleConfig, KubeConfig
from ..core import resource as r
from .generic_client import GenericAsyncClient
from .generic_client import GenericAsyncClient, GenericAsyncIterator
from ..core.exceptions import ConditionError, ObjectDeleted
from ..types import OnErrorHandler, PatchType, CascadeType, on_error_raise
from .internal_resources import core_v1
Expand Down Expand Up @@ -234,7 +234,7 @@ def list(
chunk_size: int = None,
labels: LabelSelector = None,
fields: FieldSelector = None,
) -> AsyncIterable[GlobalResource]: ...
) -> GenericAsyncIterator[GlobalResource]: ...

@overload
def list(
Expand All @@ -245,7 +245,7 @@ def list(
chunk_size: int = None,
labels: LabelSelector = None,
fields: FieldSelector = None,
) -> AsyncIterable[NamespacedResource]: ...
) -> GenericAsyncIterator[NamespacedResource]: ...

def list(self, res, *, namespace=None, chunk_size=None, labels=None, fields=None):
"""Return an iterator of objects matching the selection criteria.
Expand Down
6 changes: 3 additions & 3 deletions lightkube/core/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from ..config.kubeconfig import SingleConfig, KubeConfig
from .. import operators
from ..core import resource as r
from .generic_client import GenericSyncClient
from .generic_client import GenericSyncClient, GenericIterator
from ..core.exceptions import ConditionError, ObjectDeleted
from ..types import OnErrorHandler, PatchType, CascadeType, on_error_raise
from .internal_resources import core_v1
Expand Down Expand Up @@ -237,7 +237,7 @@ def list(
chunk_size: int = None,
labels: LabelSelector = None,
fields: FieldSelector = None,
) -> Iterator[GlobalResource]: ...
) -> GenericIterator[GlobalResource]: ...

@overload
def list(
Expand All @@ -248,7 +248,7 @@ def list(
chunk_size: int = None,
labels: LabelSelector = None,
fields: FieldSelector = None,
) -> Iterator[NamespacedResource]: ...
) -> GenericIterator[NamespacedResource]: ...

def list(self, res, *, namespace=None, chunk_size=None, labels=None, fields=None):
"""Return an iterator of objects matching the selection criteria.
Expand Down
90 changes: 81 additions & 9 deletions lightkube/core/generic_client.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,16 @@
import time
from typing import Type, Any, Dict, Union
from typing import (
AsyncIterable,
Type,
Any,
Dict,
Union,
Iterator,
AsyncIterator,
Tuple,
TypeVar,
Iterable,
)
import dataclasses
from dataclasses import dataclass
import json
Expand Down Expand Up @@ -72,6 +83,52 @@ def process_one_line(self, line):
return tp, self._convert(obj, lazy=self._lazy)


class ErrorWhenNotReady:
def __init__(self, message: str) -> None:
self.message = message

def __set_name__(self, owner, name: str) -> None:
self.name = name

def __get__(self, instance, owner=None):
raise Exception(self.name, self.message)


def error_when_not_ready(message: str) -> Any:
"Returns Any to silence type checkers"
return ErrorWhenNotReady(message)


T = TypeVar("T")


class GenericIterator(Iterable[T]):
resourceVersion: str
resourceVersion = error_when_not_ready("only available after iteration started")

def __init__(self, inner_iter: Iterator[Tuple[str, Iterator[T]]]) -> None:
self.inner_iter = inner_iter

def __iter__(self) -> Iterator[T]:
for rv, chunk in self.inner_iter:
self.resourceVersion = rv
yield from chunk


class GenericAsyncIterator(AsyncIterable[T]):
resourceVersion: str
resourceVersion = error_when_not_ready("only available after iteration started")

def __init__(self, inner_iter: AsyncIterator[Tuple[str, Iterator[T]]]) -> None:
self.inner_iter = inner_iter

async def __aiter__(self) -> AsyncIterator[T]:
async for rv, chunk in self.inner_iter:
self.resourceVersion = rv
for item in chunk:
yield item


class GenericClient:
AdapterClient = staticmethod(client_adapter.Client)

Expand Down Expand Up @@ -260,7 +317,15 @@ def handle_response(self, method, resp, br):
br.params["continue"] = data["metadata"]["continue"]
else:
cont = False
return cont, (self.convert_to_resource(res, obj) for obj in data["items"])
try:
rv = data["metadata"]["resourceVersion"]
except KeyError:
rv = None
return (
cont,
rv,
(self.convert_to_resource(res, obj) for obj in data["items"]),
)
else:
if res is not None:
return self.convert_to_resource(res, data)
Expand Down Expand Up @@ -310,13 +375,16 @@ def request(
resp = self.send(req)
return self.handle_response(method, resp, br)

def list(self, br: BasicRequest) -> Any:
def list_chunks(self, br: BasicRequest) -> Iterator[Tuple[str, Iterator]]:
cont = True
while cont:
req = self.build_adapter_request(br)
resp = self.send(req)
cont, chunk = self.handle_response("list", resp, br)
yield from chunk
cont, rv, chunk = self.handle_response("list", resp, br)
yield rv, chunk

def list(self, br: BasicRequest) -> GenericIterator:
return GenericIterator(self.list_chunks(br))


class GenericAsyncClient(GenericClient):
Expand Down Expand Up @@ -367,14 +435,18 @@ async def request(
resp = await self.send(req)
return self.handle_response(method, resp, br)

async def list(self, br: BasicRequest) -> Any:
async def list_chunks(
self, br: BasicRequest
) -> AsyncIterator[Tuple[str, Iterator]]:
cont = True
while cont:
req = self.build_adapter_request(br)
resp = await self.send(req)
cont, chunk = self.handle_response("list", resp, br)
for item in chunk:
yield item
cont, rv, chunk = self.handle_response("list", resp, br)
yield rv, chunk

def list(self, br: BasicRequest) -> GenericAsyncIterator:
return GenericAsyncIterator(self.list_chunks(br))

async def close(self):
await self._client.aclose()

0 comments on commit eba6e1c

Please sign in to comment.