diff --git a/core/client/fs/src/main/java/alluxio/client/block/stream/BlockInStream.java b/core/client/fs/src/main/java/alluxio/client/block/stream/BlockInStream.java index a132cade34c3..343902c87ccd 100644 --- a/core/client/fs/src/main/java/alluxio/client/block/stream/BlockInStream.java +++ b/core/client/fs/src/main/java/alluxio/client/block/stream/BlockInStream.java @@ -265,11 +265,6 @@ public int read(byte[] b, int off, int len) throws IOException { } private int readInternal(byte[] b, int off, int len) throws IOException { - if (mPos >= mLength) - { - return -1; - } - readChunk(); if (mCurrentChunk == null) { mEOF = true; @@ -279,16 +274,11 @@ private int readInternal(byte[] b, int off, int len) throws IOException { Preconditions .checkState(mPos >= mLength, PreconditionMessage.BLOCK_LENGTH_INCONSISTENT.toString(), mId, mLength, mPos); - // Should this be -1? (ruizh) return -1; } int toRead = Math.min(len, mCurrentChunk.readableBytes()); mCurrentChunk.readBytes(b, off, toRead); mPos += toRead; - if (mPos >= mLength) - { - closeDataReader(); - } return toRead; } diff --git a/core/client/fs/src/main/java/alluxio/client/block/stream/LocalFileDataReader.java b/core/client/fs/src/main/java/alluxio/client/block/stream/LocalFileDataReader.java index beffebfe79d4..9283fd17f18e 100644 --- a/core/client/fs/src/main/java/alluxio/client/block/stream/LocalFileDataReader.java +++ b/core/client/fs/src/main/java/alluxio/client/block/stream/LocalFileDataReader.java @@ -148,10 +148,8 @@ public Factory(FileSystemContext context, WorkerNetAddress address, long blockId .toString()); mStream.send(request, mDataTimeoutMs); OpenLocalBlockResponse response = mStream.receive(mDataTimeoutMs); - mStream.close(); Preconditions.checkState(response.hasPath()); mPath = response.getPath(); - mBlockWorker.close(); } catch (Exception e) { mBlockWorker.close(); throw e; diff --git a/core/client/fs/src/main/java/alluxio/client/file/AlluxioFileInStream.java b/core/client/fs/src/main/java/alluxio/client/file/AlluxioFileInStream.java index 5e38dd2de56c..f95541b87bc9 100644 --- a/core/client/fs/src/main/java/alluxio/client/file/AlluxioFileInStream.java +++ b/core/client/fs/src/main/java/alluxio/client/file/AlluxioFileInStream.java @@ -390,7 +390,7 @@ private void closeBlockInStream(BlockInStream stream) throws IOException { if (blockSource == BlockInStream.BlockInStreamSource.LOCAL) { return; } -// triggerAsyncCaching(stream); + triggerAsyncCaching(stream); } } diff --git a/core/server/worker/src/main/java/alluxio/worker/block/UnderFileSystemBlockStore.java b/core/server/worker/src/main/java/alluxio/worker/block/UnderFileSystemBlockStore.java index 2530bc1e721c..7d68d4b3c249 100644 --- a/core/server/worker/src/main/java/alluxio/worker/block/UnderFileSystemBlockStore.java +++ b/core/server/worker/src/main/java/alluxio/worker/block/UnderFileSystemBlockStore.java @@ -147,8 +147,8 @@ public void closeReaderOrWriter(long sessionId, long blockId) throws IOException try (LockResource lr = new LockResource(mLock)) { blockInfo = mBlocks.get(new Key(sessionId, blockId)); if (blockInfo == null) { -// LOG.warn("Key (block ID: {}, session ID {}) is not found when cleaning up the UFS block.", -// blockId, sessionId); + LOG.warn("Key (block ID: {}, session ID {}) is not found when cleaning up the UFS block.", + blockId, sessionId); return; } } @@ -166,8 +166,8 @@ public void releaseAccess(long sessionId, long blockId) { try (LockResource lr = new LockResource(mLock)) { Key key = new Key(sessionId, blockId); if (!mBlocks.containsKey(key)) { -// LOG.warn("Key (block ID: {}, session ID {}) is not found when releasing the UFS block.", -// blockId, sessionId); + LOG.warn("Key (block ID: {}, session ID {}) is not found when releasing the UFS block.", + blockId, sessionId); } mBlocks.remove(key); Set blockIds = mSessionIdToBlockIds.get(sessionId); diff --git a/core/server/worker/src/main/java/alluxio/worker/grpc/BlockReadHandler.java b/core/server/worker/src/main/java/alluxio/worker/grpc/BlockReadHandler.java index 077d148f7bfd..8ea7c9bbff43 100644 --- a/core/server/worker/src/main/java/alluxio/worker/grpc/BlockReadHandler.java +++ b/core/server/worker/src/main/java/alluxio/worker/grpc/BlockReadHandler.java @@ -160,9 +160,6 @@ private void openBlock(BlockReadRequestContext context) context.setBlockReader(reader); mWorker.accessBlock(request.getSessionId(), request.getId()); ((FileChannel) reader.getChannel()).position(request.getStart()); - // Close block after reading - mWorker.unlockBlock(context.getRequest().getSessionId(), - context.getRequest().getId()); return; } catch (Exception e) { mWorker.unlockBlock(lockId); diff --git a/core/server/worker/src/main/java/alluxio/worker/grpc/ShortCircuitBlockReadHandler.java b/core/server/worker/src/main/java/alluxio/worker/grpc/ShortCircuitBlockReadHandler.java index 92bfb18e2677..8a5d6623af98 100644 --- a/core/server/worker/src/main/java/alluxio/worker/grpc/ShortCircuitBlockReadHandler.java +++ b/core/server/worker/src/main/java/alluxio/worker/grpc/ShortCircuitBlockReadHandler.java @@ -98,8 +98,6 @@ public OpenLocalBlockResponse call() throws Exception { } OpenLocalBlockResponse response = OpenLocalBlockResponse.newBuilder() .setPath(mWorker.readBlock(mSessionId, mRequest.getBlockId(), mLockId)).build(); - mWorker.unlockBlock(mLockId); - mLockId = BlockLockManager.INVALID_LOCK_ID; return response; } @@ -145,7 +143,7 @@ public OpenLocalBlockResponse call() throws Exception { mWorker.unlockBlock(mLockId); mLockId = BlockLockManager.INVALID_LOCK_ID; } else if (mRequest != null) { -// LOG.warn("Close a closed block {}.", mRequest.getBlockId()); + LOG.warn("Close a closed block {}.", mRequest.getBlockId()); } return null; } diff --git a/integration/fuse/src/main/java/alluxio/fuse/AlluxioJniFuseFileSystem.java b/integration/fuse/src/main/java/alluxio/fuse/AlluxioJniFuseFileSystem.java index eabc2092b8ce..c2d114af65da 100644 --- a/integration/fuse/src/main/java/alluxio/fuse/AlluxioJniFuseFileSystem.java +++ b/integration/fuse/src/main/java/alluxio/fuse/AlluxioJniFuseFileSystem.java @@ -339,7 +339,6 @@ private int readInternal(String path, ByteBuffer buf, long size, long offset, Fu long fd = fi.fh.get(); // FileInStream is not thread safe try (LockResource r1 = new LockResource(getFileLock(fd).writeLock())) { - // LOG.warn("ReadInternal acquired lock for fileID={}", fd); FileInStream is = mOpenFileEntries.get(fd); if (is == null) { LOG.error("Cannot find fd {} for {}", fd, path); diff --git a/integration/fuse/src/main/native/libjnifuse/jnifuse_impls.cc b/integration/fuse/src/main/native/libjnifuse/jnifuse_impls.cc index cf0378de63d1..6f144d417753 100644 --- a/integration/fuse/src/main/native/libjnifuse/jnifuse_impls.cc +++ b/integration/fuse/src/main/native/libjnifuse/jnifuse_impls.cc @@ -64,7 +64,6 @@ int flush_wrapper(const char *path, struct fuse_file_info *fi) { } int release_wrapper(const char *path, struct fuse_file_info *fi) { - LOGD("release: %s", path); return jnifuse::JniFuseFileSystem::getInstance()->releaseOper->call(path, fi); }