From 3a91ab5975d04517b56f16d47ecabc912c8a17c5 Mon Sep 17 00:00:00 2001 From: Tigran Mkrtchyan Date: Thu, 6 Apr 2023 17:08:43 +0200 Subject: [PATCH] io: calculate IO bandwidth per IO block --- .../cta/xrootd/DataServerHandler.java | 18 ++-- .../dcache/nearline/cta/xrootd/IoStats.java | 83 +++++++++++++++++++ .../cta/xrootd/ZeroCopyReadResponse.java | 7 +- .../nearline/cta/xrootd/IoStatsTest.java | 30 +++++++ 4 files changed, 131 insertions(+), 7 deletions(-) create mode 100644 src/main/java/org/dcache/nearline/cta/xrootd/IoStats.java create mode 100644 src/test/java/org/dcache/nearline/cta/xrootd/IoStatsTest.java diff --git a/src/main/java/org/dcache/nearline/cta/xrootd/DataServerHandler.java b/src/main/java/org/dcache/nearline/cta/xrootd/DataServerHandler.java index 206ad02..33c66e8 100644 --- a/src/main/java/org/dcache/nearline/cta/xrootd/DataServerHandler.java +++ b/src/main/java/org/dcache/nearline/cta/xrootd/DataServerHandler.java @@ -111,6 +111,8 @@ private static class MigrationRequest { */ private final Instant btime = Instant.now(); + private final IoStats ioStat = new IoStats(); + public MigrationRequest(NearlineRequest request, FileChannel fileChannel) { this.request = request; this.fileChannel = fileChannel; @@ -272,13 +274,15 @@ protected OpenResponse doOnOpen(ChannelHandlerContext ctx, @Override protected Object doOnRead(ChannelHandlerContext ctx, ReadRequest msg) throws XrootdException { - FileChannel fileChannel = getOpenFile(msg.getFileHandle()).fileChannel(); + var request = getOpenFile(msg.getFileHandle()); + FileChannel fileChannel = request.fileChannel(); if (msg.bytesToRead() == 0) { return withOk(msg); } try { - return new ZeroCopyReadResponse(msg, fileChannel); + var ioStat = request.ioStat.newRequest(); + return new ZeroCopyReadResponse(msg, fileChannel, ioStat); } catch (IOException e) { throw new XrootdException(kXR_IOError, e.getMessage()); } @@ -295,9 +299,12 @@ protected Object doOnRead(ChannelHandlerContext ctx, ReadRequest msg) protected OkResponse doOnWrite(ChannelHandlerContext ctx, WriteRequest msg) throws XrootdException { try { - FileChannel fileChannel = getOpenFile(msg.getFileHandle()).fileChannel(); + var request = getOpenFile(msg.getFileHandle()); + FileChannel fileChannel = request.fileChannel(); fileChannel.position(msg.getWriteOffset()); + var ioRequest = request.ioStat.newRequest(); msg.getData(fileChannel); + ioRequest.done(msg.getDataLength()); return withOk(msg); } catch (IOException e) { throw new XrootdException(kXR_IOError, e.getMessage()); @@ -340,12 +347,11 @@ protected OkResponse doOnClose(ChannelHandlerContext ctx, CloseReq long size = file.length(); long duration = Duration.between(migrationRequest.getCreationTime(), Instant.now()) .toMillis(); - double bandwidth = (double) size / duration; - LOGGER.info("Closing file {}. Transferred {} in {}, {}", file, + LOGGER.info("Closing file {}. Transferred {} in {}, disk performance {}", file, Strings.humanReadableSize(size), TimeUtils.describeDuration(duration, TimeUnit.MILLISECONDS), - Strings.describeBandwidth(bandwidth * 1000) + Strings.describeBandwidth(migrationRequest.ioStat.getMean()) ); if (r instanceof StageRequest) { diff --git a/src/main/java/org/dcache/nearline/cta/xrootd/IoStats.java b/src/main/java/org/dcache/nearline/cta/xrootd/IoStats.java new file mode 100644 index 0000000..de08aba --- /dev/null +++ b/src/main/java/org/dcache/nearline/cta/xrootd/IoStats.java @@ -0,0 +1,83 @@ +package org.dcache.nearline.cta.xrootd; + +// rip off RequestExecutionTimeGaugeImpl#Statistics + +import java.util.concurrent.TimeUnit; + +/** + * Encapsulates an online algorithm for maintaining various statistics about samples. + *

+ * See https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance for an explanation. + */ +public class IoStats { + + private double mean; + private double m2; + private double min = Double.NaN; + private double max = Double.NaN; + private long n; + + public synchronized void update(double x) { + this.min = this.n == 0L ? x : Math.min(x, this.min); + this.max = this.n == 0L ? x : Math.max(x, this.max); + ++this.n; + double nextMean = this.mean + (x - this.mean) / (double) this.n; + double nextM2 = this.m2 + (x - this.mean) * (x - nextMean); + this.mean = nextMean; + this.m2 = nextM2; + } + + public synchronized double getMean() { + return this.n > 0L ? this.mean : Double.NaN; + } + + public synchronized double getSampleVariance() { + return this.n > 1L ? this.m2 / (double) (this.n - 1L) : Double.NaN; + } + + public synchronized double getPopulationVariance() { + return this.n > 0L ? this.m2 / (double) this.n : Double.NaN; + } + + public synchronized double getSampleStandardDeviation() { + return Math.sqrt(this.getSampleVariance()); + } + + public synchronized double getPopulationStandardDeviation() { + return Math.sqrt(this.getPopulationVariance()); + } + + public synchronized double getStandardError() { + return this.getSampleStandardDeviation() / Math.sqrt((double) this.n); + } + + public synchronized long getSampleSize() { + return this.n; + } + + public synchronized double getMin() { + return this.min; + } + + public synchronized double getMax() { + return this.max; + } + + public IoRequest newRequest() { + return new IoRequest(); + } + + public class IoRequest { + + long t0 = System.nanoTime(); + + private IoRequest() { + } + + public void done(long size) { + long delta = System.nanoTime() - t0; + double bandwith = ((double) size / delta ) * TimeUnit.SECONDS.toNanos(1); + update(bandwith); + } + } +} \ No newline at end of file diff --git a/src/main/java/org/dcache/nearline/cta/xrootd/ZeroCopyReadResponse.java b/src/main/java/org/dcache/nearline/cta/xrootd/ZeroCopyReadResponse.java index 6f35b92..d83ee24 100644 --- a/src/main/java/org/dcache/nearline/cta/xrootd/ZeroCopyReadResponse.java +++ b/src/main/java/org/dcache/nearline/cta/xrootd/ZeroCopyReadResponse.java @@ -24,6 +24,7 @@ import java.io.IOException; import java.nio.channels.FileChannel; +import org.dcache.nearline.cta.xrootd.IoStats.IoRequest; import org.dcache.xrootd.protocol.messages.ReadRequest; import org.dcache.xrootd.protocol.messages.XrootdResponse; @@ -36,10 +37,13 @@ public class ZeroCopyReadResponse implements XrootdResponse { private final FileChannel file; private final int count; - public ZeroCopyReadResponse(ReadRequest request, FileChannel file) throws IOException { + private final IoRequest ioRequest; + + public ZeroCopyReadResponse(ReadRequest request, FileChannel file, IoStats.IoRequest ioStat) throws IOException { this.request = checkNotNull(request); this.file = checkNotNull(file); this.count = (int) Math.min(request.bytesToRead(), file.size() - request.getReadOffset()); + this.ioRequest = ioStat; } @Override @@ -73,6 +77,7 @@ public void writeTo(ChannelHandlerContext ctx, final ChannelPromise promise) { (ChannelFutureListener) future -> { if (future.isSuccess()) { promise.trySuccess(); + ioRequest.done(count); } else { promise.tryFailure(future.cause()); } diff --git a/src/test/java/org/dcache/nearline/cta/xrootd/IoStatsTest.java b/src/test/java/org/dcache/nearline/cta/xrootd/IoStatsTest.java new file mode 100644 index 0000000..71f0d6b --- /dev/null +++ b/src/test/java/org/dcache/nearline/cta/xrootd/IoStatsTest.java @@ -0,0 +1,30 @@ +package org.dcache.nearline.cta.xrootd; + +import static org.junit.Assert.*; + +import java.util.concurrent.TimeUnit; +import org.dcache.util.ByteUnit; +import org.dcache.util.ByteUnits; +import org.junit.Test; + +public class IoStatsTest { + + + private IoStats ioStats = new IoStats(); + + + @Test + public void testStats() throws InterruptedException { + var stats = ioStats.newRequest(); + + var speedInMB = 300; + var sleepTime = 5; // 5 sec + + TimeUnit.SECONDS.sleep(sleepTime); + stats.done(ByteUnit.MB.toBytes(speedInMB) * sleepTime); + + // expected 300 MB/s + assertEquals(speedInMB, ByteUnit.BYTES.toMB(ioStats.getMean()), 1.0); + } + +} \ No newline at end of file