diff --git a/docker/transport/npipesocket.py b/docker/transport/npipesocket.py index 766372aef..9cbe40cc7 100644 --- a/docker/transport/npipesocket.py +++ b/docker/transport/npipesocket.py @@ -4,6 +4,9 @@ import win32file import win32pipe +import pywintypes +import win32event +import win32api cERROR_PIPE_BUSY = 0xe7 cSECURITY_SQOS_PRESENT = 0x100000 @@ -54,7 +57,9 @@ def connect(self, address, retry_count=0): 0, None, win32file.OPEN_EXISTING, - cSECURITY_ANONYMOUS | cSECURITY_SQOS_PRESENT, + (cSECURITY_ANONYMOUS + | cSECURITY_SQOS_PRESENT + | win32file.FILE_FLAG_OVERLAPPED), 0 ) except win32pipe.error as e: @@ -131,22 +136,37 @@ def recv_into(self, buf, nbytes=0): if not isinstance(buf, memoryview): readbuf = memoryview(buf) - err, data = win32file.ReadFile( - self._handle, - readbuf[:nbytes] if nbytes else readbuf - ) - return len(data) - - def _recv_into_py2(self, buf, nbytes): - err, data = win32file.ReadFile(self._handle, nbytes or len(buf)) - n = len(data) - buf[:n] = data - return n + event = win32event.CreateEvent(None, True, True, None) + try: + overlapped = pywintypes.OVERLAPPED() + overlapped.hEvent = event + err, data = win32file.ReadFile( + self._handle, + readbuf[:nbytes] if nbytes else readbuf, + overlapped + ) + wait_result = win32event.WaitForSingleObject(event, self._timeout) + if wait_result == win32event.WAIT_TIMEOUT: + win32file.CancelIo(self._handle) + raise TimeoutError + return win32file.GetOverlappedResult(self._handle, overlapped, 0) + finally: + win32api.CloseHandle(event) @check_closed def send(self, string, flags=0): - err, nbytes = win32file.WriteFile(self._handle, string) - return nbytes + event = win32event.CreateEvent(None, True, True, None) + try: + overlapped = pywintypes.OVERLAPPED() + overlapped.hEvent = event + win32file.WriteFile(self._handle, string, overlapped) + wait_result = win32event.WaitForSingleObject(event, self._timeout) + if wait_result == win32event.WAIT_TIMEOUT: + win32file.CancelIo(self._handle) + raise TimeoutError + return win32file.GetOverlappedResult(self._handle, overlapped, 0) + finally: + win32api.CloseHandle(event) @check_closed def sendall(self, string, flags=0): @@ -165,15 +185,12 @@ def setblocking(self, flag): def settimeout(self, value): if value is None: # Blocking mode - self._timeout = win32pipe.NMPWAIT_WAIT_FOREVER + self._timeout = win32event.INFINITE elif not isinstance(value, (float, int)) or value < 0: raise ValueError('Timeout value out of range') - elif value == 0: - # Non-blocking mode - self._timeout = win32pipe.NMPWAIT_NO_WAIT else: # Timeout mode - Value converted to milliseconds - self._timeout = value * 1000 + self._timeout = int(value * 1000) def gettimeout(self): return self._timeout