Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pod exec enhancements #328

Merged
merged 4 commits into from
Aug 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
165 changes: 130 additions & 35 deletions examples/pod_exec.py
Original file line number Diff line number Diff line change
@@ -1,52 +1,147 @@
import asyncio

from kubernetes_asyncio import client, config
from aiohttp.http import WSMsgType

from kubernetes_asyncio import client, config, utils
from kubernetes_asyncio.client.api_client import ApiClient
from kubernetes_asyncio.stream import WsApiClient
from kubernetes_asyncio.stream.ws_client import (
ERROR_CHANNEL, STDERR_CHANNEL, STDOUT_CHANNEL,
)

BUSYBOX_POD = "busybox-test"

async def main():
# Configs can be set in Configuration class directly or using helper
# utility. If no argument provided, the config will be loaded from
# default location.
await config.load_kube_config()

v1 = client.CoreV1Api()
async def find_busybox_pod():
async with ApiClient() as api:
v1 = client.CoreV1Api(api)
ret = await v1.list_pod_for_all_namespaces()
for i in ret.items:
if i.metadata.namespace == 'default' and i.metadata.name == BUSYBOX_POD:
print(f"Found busybox pod: {i.metadata.name}")
return i.metadata.name
return None

print("Try to find a pod with busybox (name busybox*) ...")
ret = await v1.list_pod_for_all_namespaces()

for i in ret.items:
if i.metadata.name.startswith("busybox"):
pod = i.metadata.name
namespace = i.metadata.namespace
print("Buxy box", pod, "namespace", namespace)
break
else:
print("Busybox not found !")
return
async def create_busybox_pod():
print(f"Pod {BUSYBOX_POD} does not exist. Creating it...")
manifest = {
'apiVersion': 'v1',
'kind': 'Pod',
'metadata': {
'name': BUSYBOX_POD,
},
'spec': {
'containers': [{
'image': 'busybox',
'name': 'sleep',
"args": [
"/bin/sh",
"-c",
"while true; do date; sleep 5; done"
]
}]
}
}
async with ApiClient() as api:
objects = await utils.create_from_dict(api, manifest, namespace="default")
pod = objects[0]
print(f"Created pod {pod.metadata.name}.")
return pod.metadata.name

v1_ws = client.CoreV1Api(api_client=WsApiClient())

exec_command = [
"/bin/sh",
"-c",
"echo This message goes to stderr >&2; echo This message goes to stdout",
]
async def wait_busybox_pod_ready():
print(f"Waiting pod {BUSYBOX_POD} to be ready.")
async with ApiClient() as api:
v1 = client.CoreV1Api(api)
while True:
ret = await v1.read_namespaced_pod(name=BUSYBOX_POD, namespace="default")
if ret.status.phase != 'Pending':
break
await asyncio.sleep(1)

resp = v1_ws.connect_get_namespaced_pod_exec(
pod,
namespace,
command=exec_command,
stderr=True,
stdin=False,
stdout=True,
tty=False,
)

ret = await resp
async def main():
# Configs can be set in Configuration class directly or using helper
# utility. If no argument provided, the config will be loaded from
# default location.
await config.load_kube_config()

pod = await find_busybox_pod()
if not pod:
pod = await create_busybox_pod()
await wait_busybox_pod_ready()

print("Response: ", ret)
# Execute a command in a pod non-interactively, and display its output
print("-------------")
async with WsApiClient() as ws_api:
v1_ws = client.CoreV1Api(api_client=ws_api)
exec_command = [
"/bin/sh",
"-c",
"echo This message goes to stderr >&2; echo This message goes to stdout",
]
ret = await v1_ws.connect_get_namespaced_pod_exec(
pod,
"default",
command=exec_command,
stderr=True,
stdin=False,
stdout=True,
tty=False,
)
print(f"Response: {ret}")

# Execute a command interactively. If _preload_content=False is passed to
olivier-matz-6wind marked this conversation as resolved.
Show resolved Hide resolved
# connect_get_namespaced_pod_exec(), the returned object is an aiohttp ClientWebSocketResponse
# object, that can be manipulated directly.
print("-------------")
async with WsApiClient() as ws_api:
v1_ws = client.CoreV1Api(api_client=ws_api)
exec_command = ['/bin/sh']
websocket = await v1_ws.connect_get_namespaced_pod_exec(
BUSYBOX_POD,
"default",
command=exec_command,
stderr=True,
stdin=True,
stdout=True,
tty=False,
_preload_content=False,
)
commands = [
"echo 'This message goes to stdout'\n",
"echo 'This message goes to stderr' >&2\n",
"exit 1\n",
]
error_data = ""
closed = False
async with websocket as ws:
while commands and not closed:
command = commands.pop(0)
stdin_channel_prefix = chr(0)
await ws.send_bytes((stdin_channel_prefix + command).encode("utf-8"))
while True:
try:
msg = await ws.receive(timeout=1)
except asyncio.TimeoutError:
break
if msg.type in (WSMsgType.CLOSE, WSMsgType.CLOSING, WSMsgType.CLOSED):
closed = True
break
channel = msg.data[0]
data = msg.data[1:].decode("utf-8")
if not data:
continue
if channel == STDOUT_CHANNEL:
print(f"stdout: {data}")
elif channel == STDERR_CHANNEL:
print(f"stderr: {data}")
elif channel == ERROR_CHANNEL:
error_data += data
if error_data:
returncode = ws_api.parse_error_data(error_data)
print(f"Exit code: {returncode}")

if __name__ == "__main__":
loop = asyncio.get_event_loop()
Expand Down
14 changes: 13 additions & 1 deletion kubernetes_asyncio/stream/ws_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
# License for the specific language governing permissions and limitations
# under the License.

import json

from six.moves.urllib.parse import urlencode, urlparse, urlunparse

from kubernetes_asyncio.client import ApiClient
Expand Down Expand Up @@ -54,6 +56,16 @@
super().__init__(configuration, header_name, header_value, cookie, pool_threads)
self.heartbeat = heartbeat

@classmethod
def parse_error_data(cls, error_data):
"""
Parse data received on ERROR_CHANNEL and return the command exit code.
"""
error_data_json = json.loads(error_data)
olivier-matz-6wind marked this conversation as resolved.
Show resolved Hide resolved
if error_data_json.get("status") == "Success":
return 0
return int(error_data_json["details"]["causes"][0]['message'])

async def request(self, method, url, query_params=None, headers=None,
post_params=None, body=None, _preload_content=True,
_request_timeout=None):
Expand Down Expand Up @@ -96,4 +108,4 @@

else:

return await self.rest_client.pool_manager.ws_connect(url, headers=headers, heartbeat=self.heartbeat)
return self.rest_client.pool_manager.ws_connect(url, headers=headers, heartbeat=self.heartbeat)

Check warning on line 111 in kubernetes_asyncio/stream/ws_client.py

View check run for this annotation

Codecov / codecov/patch

kubernetes_asyncio/stream/ws_client.py#L111

Added line #L111 was not covered by tests
olivier-matz-6wind marked this conversation as resolved.
Show resolved Hide resolved
14 changes: 14 additions & 0 deletions kubernetes_asyncio/stream/ws_client_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,3 +108,17 @@ async def test_exec_ws_with_heartbeat(self):
},
heartbeat=30
)

def test_parse_error_data_success(self):
error_data = '{"metadata":{},"status":"Success"}'
return_code = WsApiClient.parse_error_data(error_data)
self.assertEqual(return_code, 0)

def test_parse_error_data_failure(self):
error_data = (
'{"metadata":{},"status":"Failure",'
'"message":"command terminated with non-zero exit code",'
'"reason":"NonZeroExitCode",'
'"details":{"causes":[{"reason":"ExitCode","message":"1"}]}}')
return_code = WsApiClient.parse_error_data(error_data)
self.assertEqual(return_code, 1)