Skip to content

Commit

Permalink
io: calculate IO bandwidth per IO block
Browse files Browse the repository at this point in the history
  • Loading branch information
kofemann committed Apr 6, 2023
1 parent ef389a6 commit 3a91ab5
Show file tree
Hide file tree
Showing 4 changed files with 131 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,8 @@ private static class MigrationRequest {
*/
private final Instant btime = Instant.now();

private final IoStats ioStat = new IoStats();

public MigrationRequest(NearlineRequest request, FileChannel fileChannel) {
this.request = request;
this.fileChannel = fileChannel;
Expand Down Expand Up @@ -272,13 +274,15 @@ protected OpenResponse doOnOpen(ChannelHandlerContext ctx,
@Override
protected Object doOnRead(ChannelHandlerContext ctx, ReadRequest msg)
throws XrootdException {
FileChannel fileChannel = getOpenFile(msg.getFileHandle()).fileChannel();
var request = getOpenFile(msg.getFileHandle());
FileChannel fileChannel = request.fileChannel();
if (msg.bytesToRead() == 0) {
return withOk(msg);
}

try {
return new ZeroCopyReadResponse(msg, fileChannel);
var ioStat = request.ioStat.newRequest();
return new ZeroCopyReadResponse(msg, fileChannel, ioStat);
} catch (IOException e) {
throw new XrootdException(kXR_IOError, e.getMessage());
}
Expand All @@ -295,9 +299,12 @@ protected Object doOnRead(ChannelHandlerContext ctx, ReadRequest msg)
protected OkResponse<WriteRequest> doOnWrite(ChannelHandlerContext ctx, WriteRequest msg)
throws XrootdException {
try {
FileChannel fileChannel = getOpenFile(msg.getFileHandle()).fileChannel();
var request = getOpenFile(msg.getFileHandle());
FileChannel fileChannel = request.fileChannel();
fileChannel.position(msg.getWriteOffset());
var ioRequest = request.ioStat.newRequest();
msg.getData(fileChannel);
ioRequest.done(msg.getDataLength());
return withOk(msg);
} catch (IOException e) {
throw new XrootdException(kXR_IOError, e.getMessage());
Expand Down Expand Up @@ -340,12 +347,11 @@ protected OkResponse<CloseRequest> doOnClose(ChannelHandlerContext ctx, CloseReq
long size = file.length();
long duration = Duration.between(migrationRequest.getCreationTime(), Instant.now())
.toMillis();
double bandwidth = (double) size / duration;

LOGGER.info("Closing file {}. Transferred {} in {}, {}", file,
LOGGER.info("Closing file {}. Transferred {} in {}, disk performance {}", file,
Strings.humanReadableSize(size),
TimeUtils.describeDuration(duration, TimeUnit.MILLISECONDS),
Strings.describeBandwidth(bandwidth * 1000)
Strings.describeBandwidth(migrationRequest.ioStat.getMean())
);

if (r instanceof StageRequest) {
Expand Down
83 changes: 83 additions & 0 deletions src/main/java/org/dcache/nearline/cta/xrootd/IoStats.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package org.dcache.nearline.cta.xrootd;

// rip off RequestExecutionTimeGaugeImpl#Statistics

import java.util.concurrent.TimeUnit;

/**
* Encapsulates an online algorithm for maintaining various statistics about samples.
* <p>
* See https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance for an explanation.
*/
public class IoStats {

private double mean;
private double m2;
private double min = Double.NaN;
private double max = Double.NaN;
private long n;

public synchronized void update(double x) {
this.min = this.n == 0L ? x : Math.min(x, this.min);
this.max = this.n == 0L ? x : Math.max(x, this.max);
++this.n;
double nextMean = this.mean + (x - this.mean) / (double) this.n;
double nextM2 = this.m2 + (x - this.mean) * (x - nextMean);
this.mean = nextMean;
this.m2 = nextM2;
}

public synchronized double getMean() {
return this.n > 0L ? this.mean : Double.NaN;
}

public synchronized double getSampleVariance() {
return this.n > 1L ? this.m2 / (double) (this.n - 1L) : Double.NaN;
}

public synchronized double getPopulationVariance() {
return this.n > 0L ? this.m2 / (double) this.n : Double.NaN;
}

public synchronized double getSampleStandardDeviation() {
return Math.sqrt(this.getSampleVariance());
}

public synchronized double getPopulationStandardDeviation() {
return Math.sqrt(this.getPopulationVariance());
}

public synchronized double getStandardError() {
return this.getSampleStandardDeviation() / Math.sqrt((double) this.n);
}

public synchronized long getSampleSize() {
return this.n;
}

public synchronized double getMin() {
return this.min;
}

public synchronized double getMax() {
return this.max;
}

public IoRequest newRequest() {
return new IoRequest();
}

public class IoRequest {

long t0 = System.nanoTime();

private IoRequest() {
}

public void done(long size) {
long delta = System.nanoTime() - t0;
double bandwith = ((double) size / delta ) * TimeUnit.SECONDS.toNanos(1);
update(bandwith);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import java.io.IOException;
import java.nio.channels.FileChannel;
import org.dcache.nearline.cta.xrootd.IoStats.IoRequest;
import org.dcache.xrootd.protocol.messages.ReadRequest;
import org.dcache.xrootd.protocol.messages.XrootdResponse;

Expand All @@ -36,10 +37,13 @@ public class ZeroCopyReadResponse implements XrootdResponse<ReadRequest> {
private final FileChannel file;
private final int count;

public ZeroCopyReadResponse(ReadRequest request, FileChannel file) throws IOException {
private final IoRequest ioRequest;

public ZeroCopyReadResponse(ReadRequest request, FileChannel file, IoStats.IoRequest ioStat) throws IOException {
this.request = checkNotNull(request);
this.file = checkNotNull(file);
this.count = (int) Math.min(request.bytesToRead(), file.size() - request.getReadOffset());
this.ioRequest = ioStat;
}

@Override
Expand Down Expand Up @@ -73,6 +77,7 @@ public void writeTo(ChannelHandlerContext ctx, final ChannelPromise promise) {
(ChannelFutureListener) future -> {
if (future.isSuccess()) {
promise.trySuccess();
ioRequest.done(count);
} else {
promise.tryFailure(future.cause());
}
Expand Down
30 changes: 30 additions & 0 deletions src/test/java/org/dcache/nearline/cta/xrootd/IoStatsTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package org.dcache.nearline.cta.xrootd;

import static org.junit.Assert.*;

import java.util.concurrent.TimeUnit;
import org.dcache.util.ByteUnit;
import org.dcache.util.ByteUnits;
import org.junit.Test;

public class IoStatsTest {


private IoStats ioStats = new IoStats();


@Test
public void testStats() throws InterruptedException {
var stats = ioStats.newRequest();

var speedInMB = 300;
var sleepTime = 5; // 5 sec

TimeUnit.SECONDS.sleep(sleepTime);
stats.done(ByteUnit.MB.toBytes(speedInMB) * sleepTime);

// expected 300 MB/s
assertEquals(speedInMB, ByteUnit.BYTES.toMB(ioStats.getMean()), 1.0);
}

}

0 comments on commit 3a91ab5

Please sign in to comment.