diff --git a/python/pyarrow/error.pxd b/python/pyarrow/error.pxd index 97ba0ef2e9fcb..1fb6fad396a8b 100644 --- a/python/pyarrow/error.pxd +++ b/python/pyarrow/error.pxd @@ -18,5 +18,5 @@ from pyarrow.includes.libarrow cimport CStatus from pyarrow.includes.pyarrow cimport * -cdef check_cstatus(const CStatus& status) -cdef check_status(const Status& status) +cdef int check_cstatus(const CStatus& status) nogil except -1 +cdef int check_status(const Status& status) nogil except -1 diff --git a/python/pyarrow/error.pyx b/python/pyarrow/error.pyx index 5a6a038a92e43..244019321a7fd 100644 --- a/python/pyarrow/error.pyx +++ b/python/pyarrow/error.pyx @@ -22,16 +22,18 @@ from pyarrow.compat import frombytes class ArrowException(Exception): pass -cdef check_cstatus(const CStatus& status): +cdef int check_cstatus(const CStatus& status) nogil except -1: if status.ok(): - return + return 0 cdef c_string c_message = status.ToString() - raise ArrowException(frombytes(c_message)) + with gil: + raise ArrowException(frombytes(c_message)) -cdef check_status(const Status& status): +cdef int check_status(const Status& status) nogil except -1: if status.ok(): - return + return 0 cdef c_string c_message = status.ToString() - raise ArrowException(frombytes(c_message)) + with gil: + raise ArrowException(frombytes(c_message)) diff --git a/python/pyarrow/io.pyx b/python/pyarrow/io.pyx index f2951c2b72bfb..d73337273753d 100644 --- a/python/pyarrow/io.pyx +++ b/python/pyarrow/io.pyx @@ -74,7 +74,8 @@ cdef class HDFSClient: def close(self): self._ensure_client() - check_cstatus(self.client.get().Disconnect()) + with nogil: + check_cstatus(self.client.get().Disconnect()) self.is_open = False cdef _ensure_client(self): @@ -102,11 +103,15 @@ cdef class HDFSClient: ------- client : HDFSClient """ - cdef HDFSClient out = HDFSClient() + cdef: + c_string c_host = tobytes(host) + c_string c_user = tobytes(user) + int c_port = port + HDFSClient out = HDFSClient() - check_cstatus( - CHDFSClient.Connect(tobytes(host), port, tobytes(user), - &out.client)) + with nogil: + check_cstatus( + CHDFSClient.Connect(c_host, c_port, c_user, &out.client)) out.is_open = True return out @@ -119,7 +124,10 @@ cdef class HDFSClient: self._ensure_client() cdef c_string c_path = tobytes(path) - return self.client.get().Exists(c_path) + cdef c_bool result + with nogil: + result = self.client.get().Exists(c_path) + return result def ls(self, path, bint full_info=True): """ @@ -144,8 +152,9 @@ cdef class HDFSClient: self._ensure_client() - check_cstatus(self.client.get() - .ListDirectory(c_path, &sp_listing)) + with nogil: + check_cstatus(self.client.get() + .ListDirectory(c_path, &sp_listing)) listing = sp_listing.get() @@ -184,10 +193,11 @@ cdef class HDFSClient: self._ensure_client() cdef c_string c_path = tobytes(path) - check_cstatus(self.client.get() - .CreateDirectory(c_path)) + with nogil: + check_cstatus(self.client.get() + .CreateDirectory(c_path)) - def delete(self, path, recursive=False): + def delete(self, path, bint recursive=False): """ Delete the indicated file or directory @@ -200,8 +210,9 @@ cdef class HDFSClient: self._ensure_client() cdef c_string c_path = tobytes(path) - check_cstatus(self.client.get() - .Delete(c_path, recursive)) + with nogil: + check_cstatus(self.client.get() + .Delete(c_path, recursive)) def open(self, path, mode='rb', buffer_size=None, replication=None, default_block_size=None): @@ -221,33 +232,33 @@ cdef class HDFSClient: cdef c_bool append = False # 0 in libhdfs means "use the default" - buffer_size = buffer_size or 0 + cdef int32_t c_buffer_size = buffer_size or 0 + cdef int16_t c_replication = replication or 0 + cdef int64_t c_default_block_size = default_block_size or 0 if mode in ('wb', 'ab'): if mode == 'ab': append = True - replication = replication or 0 - default_block_size = default_block_size or 0 - - check_cstatus( - self.client.get() - .OpenWriteable(c_path, append, - buffer_size, replication, - default_block_size, - &out.wr_file)) + with nogil: + check_cstatus( + self.client.get() + .OpenWriteable(c_path, append, c_buffer_size, + c_replication, c_default_block_size, + &out.wr_file)) out.is_readonly = False else: - check_cstatus(self.client.get() - .OpenReadable(c_path, &out.rd_file)) + with nogil: + check_cstatus(self.client.get() + .OpenReadable(c_path, &out.rd_file)) out.is_readonly = True - if buffer_size == 0: - buffer_size = 2 ** 16 + if c_buffer_size == 0: + c_buffer_size = 2 ** 16 out.mode = mode - out.buffer_size = buffer_size + out.buffer_size = c_buffer_size out.parent = self out.is_open = True @@ -297,7 +308,7 @@ cdef class HDFSClient: writer_thread.join() - def download(self, path, stream, buffer_size=2**16): + def download(self, path, stream, buffer_size=None): f = self.open(path, 'rb', buffer_size=buffer_size) f.download(stream) @@ -322,10 +333,11 @@ cdef class HDFSFile: self.close() def close(self): - if self.is_readonly: - check_cstatus(self.rd_file.get().Close()) - else: - check_cstatus(self.wr_file.get().Close()) + with nogil: + if self.is_readonly: + check_cstatus(self.rd_file.get().Close()) + else: + check_cstatus(self.wr_file.get().Close()) self.is_open = False cdef _assert_readable(self): @@ -338,15 +350,17 @@ cdef class HDFSFile: def tell(self): cdef int64_t position - if self.is_readonly: - check_cstatus(self.rd_file.get().Tell(&position)) - else: - check_cstatus(self.wr_file.get().Tell(&position)) + with nogil: + if self.is_readonly: + check_cstatus(self.rd_file.get().Tell(&position)) + else: + check_cstatus(self.wr_file.get().Tell(&position)) return position - def seek(self, position): + def seek(self, int64_t position): self._assert_readable() - check_cstatus(self.rd_file.get().Seek(position)) + with nogil: + check_cstatus(self.rd_file.get().Seek(position)) def read(self, int nbytes): """ @@ -369,16 +383,17 @@ cdef class HDFSFile: cdef int rpc_chunksize = min(self.buffer_size, nbytes) try: - while total_bytes < nbytes: - check_cstatus(self.rd_file.get() - .Read(rpc_chunksize, &bytes_read, - buf + total_bytes)) + with nogil: + while total_bytes < nbytes: + check_cstatus(self.rd_file.get() + .Read(rpc_chunksize, &bytes_read, + buf + total_bytes)) - total_bytes += bytes_read + total_bytes += bytes_read - # EOF - if bytes_read == 0: - break + # EOF + if bytes_read == 0: + break result = cp.PyBytes_FromStringAndSize(buf, total_bytes) finally: @@ -440,9 +455,13 @@ cdef class HDFSFile: cdef int64_t total_bytes = 0 try: + i = 0 while True: - check_cstatus(self.rd_file.get() - .Read(self.buffer_size, &bytes_read, buf)) + with nogil: + check_cstatus(self.rd_file.get() + .Read(self.buffer_size, &bytes_read, buf)) + + i += 1 total_bytes += bytes_read @@ -473,4 +492,6 @@ cdef class HDFSFile: data = tobytes(data) cdef const uint8_t* buf = cp.PyBytes_AS_STRING(data) - check_cstatus(self.wr_file.get().Write(buf, len(data))) + cdef int32_t bufsize = len(data) + with nogil: + check_cstatus(self.wr_file.get().Write(buf, bufsize))