diff --git a/be/src/io/fs/hdfs_file_reader.cpp b/be/src/io/fs/hdfs_file_reader.cpp index 383958802f2552..6c4f456e3779d1 100644 --- a/be/src/io/fs/hdfs_file_reader.cpp +++ b/be/src/io/fs/hdfs_file_reader.cpp @@ -133,13 +133,21 @@ Status HdfsFileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_r return Status::OK(); } - tSize r = hdfsPread(_handle->fs(), _handle->file(), offset, to, bytes_req); - if (r == -1) { - return Status::InternalError( - "Read hdfs file failed. (BE: {}) namenode:{}, path:{}, err: {}", - BackendOptions::get_localhost(), _name_node, _path.string(), hdfs_error()); + size_t has_read = 0; + while (has_read < bytes_req) { + tSize loop_read = hdfsPread(_handle->fs(), _handle->file(), offset + has_read, + to + has_read, bytes_req - has_read); + if (loop_read < 0) { + return Status::InternalError( + "Read hdfs file failed. (BE: {}) namenode:{}, path:{}, err: {}", + BackendOptions::get_localhost(), _name_node, _path.string(), hdfs_error()); + } + if (loop_read == 0) { + break; + } + has_read += loop_read; } - *bytes_read = bytes_req; + *bytes_read = has_read; return Status::OK(); } diff --git a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java index d86df86fe0b952..d25947e33bf270 100644 --- a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java +++ b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java @@ -1204,27 +1204,32 @@ public ByteBuffer pread(TBrokerFD fd, long offset, long length) { // Avoid using the ByteBuffer based read for Hadoop because some FSDataInputStream // implementations are not ByteBufferReadable, // See https://issues.apache.org/jira/browse/HADOOP-14603 - byte[] buf; - if (length > readBufferSize) { - buf = new byte[readBufferSize]; - } else { - buf = new byte[(int) length]; - } - try { - int readLength = readBytesFully(fsDataInputStream, buf); - if (readLength < 0) { - throw new BrokerException(TBrokerOperationStatusCode.END_OF_FILE, - "end of file reached"); - } - if (logger.isDebugEnabled()) { - logger.debug("read buffer from input stream, buffer size:" + buf.length + ", read length:" + readLength); + int hasRead = 0; + byte[] buf = new byte[(int)length]; + while (hasRead < length) { + int bufSize = Math.min((int) length - hasRead, readBufferSize); + try { + int readLength = fsDataInputStream.read(buf, hasRead, bufSize); + if (readLength < 0) { + throw new BrokerException(TBrokerOperationStatusCode.END_OF_FILE, + "end of file reached"); + } + if (logger.isDebugEnabled()) { + logger.debug("read buffer from input stream, buffer size:" + buf.length + ", read length:" + + readLength); + } + hasRead += readLength; + } catch (IOException e) { + logger.error("errors while read data from stream", e); + throw new BrokerException(TBrokerOperationStatusCode.TARGET_STORAGE_SERVICE_ERROR, + e, "errors while read data from stream"); } - return ByteBuffer.wrap(buf, 0, readLength); - } catch (IOException e) { - logger.error("errors while read data from stream", e); + } + if (hasRead != length) { throw new BrokerException(TBrokerOperationStatusCode.TARGET_STORAGE_SERVICE_ERROR, - e, "errors while write data to output stream"); + String.format("errors while read data from stream: hasRead(%d) != length(%d)", hasRead, length)); } + return ByteBuffer.wrap(buf, 0, hasRead); } } @@ -1325,19 +1330,6 @@ private static TBrokerFD parseUUIDToFD(UUID uuid) { return new TBrokerFD(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits()); } - private int readBytesFully(FSDataInputStream is, byte[] dest) throws IOException { - int readLength = 0; - while (readLength < dest.length) { - int availableReadLength = dest.length - readLength; - int n = is.read(dest, readLength, availableReadLength); - if (n <= 0) { - break; - } - readLength += n; - } - return readLength; - } - /** * In view of the different expiration mechanisms of different authentication modes, * there are two ways to determine whether BrokerFileSystem has expired: