-
Notifications
You must be signed in to change notification settings - Fork 24
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Async reading in FileSystemDataVault #8126
Changes from all commits
7879ca8
2a78c11
0f11677
8dbd347
28ddb66
f8f12c0
1966dc5
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,15 +1,16 @@ | ||
package com.scalableminds.webknossos.datastore.datavault | ||
|
||
import com.scalableminds.util.tools.Fox | ||
import net.liftweb.common.Box.tryo | ||
import com.scalableminds.util.tools.Fox.{bool2Fox, box2Fox} | ||
import com.scalableminds.util.tools.Fox.bool2Fox | ||
import com.scalableminds.webknossos.datastore.storage.DataVaultService | ||
import net.liftweb.common.{Box, Full} | ||
import org.apache.commons.lang3.builder.HashCodeBuilder | ||
|
||
import java.nio.ByteBuffer | ||
import java.nio.file.{Files, Path, Paths} | ||
import java.nio.channels.{AsynchronousFileChannel, CompletionHandler} | ||
import java.nio.file.{Files, Path, Paths, StandardOpenOption} | ||
import java.util.stream.Collectors | ||
import scala.concurrent.ExecutionContext | ||
import scala.concurrent.{ExecutionContext, Promise} | ||
import scala.jdk.CollectionConverters._ | ||
|
||
class FileSystemDataVault extends DataVault { | ||
|
@@ -24,31 +25,55 @@ class FileSystemDataVault extends DataVault { | |
private def readBytesLocal(localPath: Path, range: RangeSpecifier)(implicit ec: ExecutionContext): Fox[Array[Byte]] = | ||
if (Files.exists(localPath)) { | ||
range match { | ||
case Complete() => tryo(Files.readAllBytes(localPath)).toFox | ||
case Complete() => | ||
readAsync(localPath, 0, Math.toIntExact(Files.size(localPath))) | ||
|
||
case StartEnd(r) => | ||
tryo { | ||
val channel = Files.newByteChannel(localPath) | ||
val buf = ByteBuffer.allocateDirect(r.length) | ||
channel.position(r.start) | ||
channel.read(buf) | ||
buf.rewind() | ||
val arr = new Array[Byte](r.length) | ||
buf.get(arr) | ||
arr | ||
}.toFox | ||
readAsync(localPath, r.start, r.length) | ||
|
||
case SuffixLength(length) => | ||
tryo { | ||
val channel = Files.newByteChannel(localPath) | ||
val buf = ByteBuffer.allocateDirect(length) | ||
channel.position(channel.size() - length) | ||
channel.read(buf) | ||
buf.rewind() | ||
val arr = new Array[Byte](length) | ||
buf.get(arr) | ||
arr | ||
}.toFox | ||
val fileSize = Files.size(localPath) | ||
readAsync(localPath, fileSize - length, length) | ||
} | ||
} else Fox.empty | ||
} else { | ||
Fox.empty | ||
} | ||
|
||
private def readAsync(path: Path, position: Long, length: Int)(implicit ec: ExecutionContext): Fox[Array[Byte]] = { | ||
val promise = Promise[Box[Array[Byte]]]() | ||
val buffer = ByteBuffer.allocateDirect(length) | ||
var channel: AsynchronousFileChannel = null | ||
Comment on lines
+44
to
+45
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🛠️ Refactor suggestion Consider reusing DirectByteBuffer for performance Creating a direct ByteBuffer for each read operation is expensive. Consider implementing a buffer pool or using a ThreadLocal buffer for better performance. Would you like me to provide an implementation of a buffer pool? |
||
|
||
try { | ||
channel = AsynchronousFileChannel.open(path, StandardOpenOption.READ) | ||
|
||
channel.read( | ||
buffer, | ||
position, | ||
buffer, | ||
new CompletionHandler[Integer, ByteBuffer] { | ||
override def completed(result: Integer, buffer: ByteBuffer): Unit = { | ||
buffer.rewind() | ||
val arr = new Array[Byte](length) | ||
buffer.get(arr) | ||
promise.success(Full(arr)) | ||
channel.close() | ||
} | ||
|
||
override def failed(exc: Throwable, buffer: ByteBuffer): Unit = { | ||
promise.failure(exc) | ||
channel.close() | ||
} | ||
} | ||
) | ||
} catch { | ||
case e: Throwable => | ||
promise.failure(e) | ||
if (channel != null && channel.isOpen) channel.close() | ||
} | ||
|
||
promise.future | ||
} | ||
|
||
override def listDirectory(path: VaultPath, maxItems: Int)(implicit ec: ExecutionContext): Fox[List[VaultPath]] = | ||
vaultPathToLocalPath(path).map( | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add bounds checking for SuffixLength case
The calculation
fileSize - length
could result in a negative position iflength
is greater thanfileSize
. Add validation to prevent this scenario.Apply this diff:
📝 Committable suggestion