diff --git a/opendevin/sandbox/sandbox.py b/opendevin/sandbox/sandbox.py index 1a18d7345b19..741a24146727 100644 --- a/opendevin/sandbox/sandbox.py +++ b/opendevin/sandbox/sandbox.py @@ -1,9 +1,10 @@ -import atexit import os -import select import sys +import tty import time import uuid +import select +import atexit from collections import namedtuple from typing import Dict, List, Tuple @@ -11,6 +12,7 @@ import concurrent.futures from opendevin import config +from opendevin.sandbox.utils import Stream InputType = namedtuple("InputType", ["content"]) OutputType = namedtuple("OutputType", ["content"]) @@ -152,8 +154,11 @@ def read_logs(self, id) -> str: def execute(self, cmd: str) -> Tuple[int, str]: # TODO: each execute is not stateful! We need to keep track of the current working directory + def run_command(container, command): + import pdb; pdb.set_trace() return container.exec_run(command,workdir="/workspace") + # Use ThreadPoolExecutor to control command and set timeout with concurrent.futures.ThreadPoolExecutor() as executor: future = executor.submit(run_command, self.container, self.get_exec_cmd(cmd)) @@ -231,6 +236,28 @@ def stop_docker_container(self): except docker.errors.NotFound: pass + def _container_info(self): + """ + Thin wrapper around client.inspect_container(). + """ + return self.docker_client.inspect_container(self.container) + + def _attach_sockets(self): + config = self._container_info()["Config"] + assert config["AttachStdin"], "Container must have stdin attached" + assert config["AttachStdout"], "Container must have stdout attached" + assert config["AttachStderr"], "Container must have stderr attached" + assert config["Tty"], "TTY must be enabled to attach streams" + self.stdin_stream: Stream = Stream(self.docker_client.attach_socket( + self.container, params={"stdin": 1, "stream": 1} + )) + self.stdout_stream: Stream = Stream(self.docker_client.attach_socket( + self.container, params={"stdout": 1, "stream": 1} + )) + self.stderr_stream: Stream = Stream(self.docker_client.attach_socket( + self.container, params={"stderr": 1, "stream": 1} + )) + def restart_docker_container(self): try: self.stop_docker_container() @@ -240,18 +267,35 @@ def restart_docker_container(self): try: # Initialize docker client. Throws an exception if Docker is not reachable. - docker_client = docker.from_env() + # docker_client = docker.from_env() + self.docker_client = docker.APIClient() # start the container - self.container = docker_client.containers.run( + # low-level api + # https://docker-py.readthedocs.io/en/stable/api.html#docker.api.container.ContainerApiMixin.create_container + self.container = self.docker_client.create_container( self.container_image, - command="tail -f /dev/null", - network_mode="host", + command="/bin/bash", + user="devin" if RUN_AS_DEVIN else None, + stdin_open=True, + tty=True, working_dir="/workspace", + # networking_config=docker_client.create_networking_config( name=self.container_name, - detach=True, - volumes={self.workspace_dir: {"bind": "/workspace", "mode": "rw"}}, + # detach=True, + volumes=["/workspace"], + host_config=self.docker_client.create_host_config( + binds={self.workspace_dir: {"bind": "/workspace", "mode": "rw"},}, + network_mode="host", + ), ) + self._attach_sockets() + self.stdin_stream.write("echo 'hello'\n".encode("utf-8")) + # TODO: FIXME: The stdout_stream.read() is blocking indefinitely + print(self.stdout_stream.read().decode("utf-8")) + import pdb; pdb.set_trace() + print(self.stderr_stream.read().decode("utf-8")) + except Exception as e: print(f"Failed to start container: {e}") raise e @@ -266,7 +310,7 @@ def restart_docker_container(self): break time.sleep(1) elapsed += 1 - self.container = docker_client.containers.get(self.container_name) + self.container = self.docker_client.containers.get(self.container_name) if elapsed > self.timeout: break if self.container.status != "running": diff --git a/opendevin/sandbox/utils.py b/opendevin/sandbox/utils.py new file mode 100644 index 000000000000..434a6b17a91a --- /dev/null +++ b/opendevin/sandbox/utils.py @@ -0,0 +1,157 @@ +# Adopot from dockerpty/io.py +# dockerpty: io.py +# +# Copyright 2014 Chris Corbyn +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import fcntl +import errno + + +def set_blocking(fd, blocking=True): + """ + Set the given file-descriptor blocking or non-blocking. + + Returns the original blocking status. + """ + + old_flag = fcntl.fcntl(fd, fcntl.F_GETFL) + + if blocking: + new_flag = old_flag & ~ os.O_NONBLOCK + else: + new_flag = old_flag | os.O_NONBLOCK + + fcntl.fcntl(fd, fcntl.F_SETFL, new_flag) + + return not bool(old_flag & os.O_NONBLOCK) + + +class Stream(object): + """ + Generic Stream class. + + This is a file-like abstraction on top of os.read() and os.write(), which + add consistency to the reading of sockets and files alike. + """ + + """ + Recoverable IO/OS Errors. + """ + ERRNO_RECOVERABLE = [ + errno.EINTR, + errno.EDEADLK, + errno.EWOULDBLOCK, + ] + + def __init__(self, fd): + """ + Initialize the Stream for the file descriptor `fd`. + + The `fd` object must have a `fileno()` method. + """ + self.fd = fd + self.buffer = b'' + self.close_requested = False + self.closed = False + + def fileno(self): + """ + Return the fileno() of the file descriptor. + """ + + return self.fd.fileno() + + def set_blocking(self, value): + if hasattr(self.fd, 'setblocking'): + self.fd.setblocking(value) + return True + else: + return set_blocking(self.fd, value) + + def read(self, n=4096): + """ + Return `n` bytes of data from the Stream, or None at end of stream. + """ + + while True: + try: + if hasattr(self.fd, 'recv'): + return self.fd.recv(n) + return os.read(self.fd.fileno(), n) + except EnvironmentError as e: + if e.errno not in Stream.ERRNO_RECOVERABLE: + raise e + + + def write(self, data): + """ + Write `data` to the Stream. Not all data may be written right away. + Use select to find when the stream is writeable, and call do_write() + to flush the internal buffer. + """ + + if not data: + return None + + self.buffer += data + self.do_write() + + return len(data) + + def do_write(self): + """ + Flushes as much pending data from the internal write buffer as possible. + """ + while True: + try: + written = 0 + + if hasattr(self.fd, 'send'): + written = self.fd.send(self.buffer) + else: + written = os.write(self.fd.fileno(), self.buffer) + + self.buffer = self.buffer[written:] + + # try to close after writes if a close was requested + if self.close_requested and len(self.buffer) == 0: + self.close() + + return written + except EnvironmentError as e: + if e.errno not in Stream.ERRNO_RECOVERABLE: + raise e + + def needs_write(self): + """ + Returns True if the stream has data waiting to be written. + """ + return len(self.buffer) > 0 + + def close(self): + self.close_requested = True + + # We don't close the fd immediately, as there may still be data pending + # to write. + if not self.closed and len(self.buffer) == 0: + self.closed = True + if hasattr(self.fd, 'close'): + self.fd.close() + else: + os.close(self.fd.fileno()) + + def __repr__(self): + return "{cls}({fd})".format(cls=type(self).__name__, fd=self.fd)