Skip to content

Commit

Permalink
Separate inbound & outbound histograms
Browse files Browse the repository at this point in the history
  • Loading branch information
DaveCTurner committed Nov 25, 2021
1 parent b527e82 commit c775e4e
Show file tree
Hide file tree
Showing 4 changed files with 94 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
"Transport handling time histogram":
- skip:
version: " - 8.0.99"
reason: "handling_time_histogram was added in 8.1"
reason: "handling_time_histograms were added in 8.1"
features: [arbitrary_key]

- do:
Expand All @@ -37,16 +37,31 @@
nodes.stats:
metric: [ transport ]

- length: { nodes.$node_id.transport.handling_time_histogram: 17 }
- length: { nodes.$node_id.transport.inbound_handling_time_histogram: 18 }

- gte: { nodes.$node_id.transport.handling_time_histogram.0.count: 0 }
- is_false: nodes.$node_id.transport.handling_time_histogram.0.ge_millis
- match: { nodes.$node_id.transport.handling_time_histogram.0.lt_millis: 1 }
- gte: { nodes.$node_id.transport.inbound_handling_time_histogram.0.count: 0 }
- is_false: nodes.$node_id.transport.inbound_handling_time_histogram.0.ge_millis
- match: { nodes.$node_id.transport.inbound_handling_time_histogram.0.lt_millis: 1 }

- gte: { nodes.$node_id.transport.handling_time_histogram.1.count: 0 }
- match: { nodes.$node_id.transport.handling_time_histogram.1.ge_millis: 1 }
- match: { nodes.$node_id.transport.handling_time_histogram.1.lt_millis: 2 }
- gte: { nodes.$node_id.transport.inbound_handling_time_histogram.1.count: 0 }
- match: { nodes.$node_id.transport.inbound_handling_time_histogram.1.ge_millis: 1 }
- match: { nodes.$node_id.transport.inbound_handling_time_histogram.1.lt_millis: 2 }

- gte: { nodes.$node_id.transport.handling_time_histogram.16.count: 0 }
- match: { nodes.$node_id.transport.handling_time_histogram.16.ge_millis: 65536 }
- is_false: nodes.$node_id.transport.handling_time_histogram.16.lt_millis
- gte: { nodes.$node_id.transport.inbound_handling_time_histogram.17.count: 0 }
- match: { nodes.$node_id.transport.inbound_handling_time_histogram.17.ge_millis: 65536 }
- is_false: nodes.$node_id.transport.inbound_handling_time_histogram.17.lt_millis


- length: { nodes.$node_id.transport.outbound_handling_time_histogram: 18 }

- gte: { nodes.$node_id.transport.outbound_handling_time_histogram.0.count: 0 }
- is_false: nodes.$node_id.transport.outbound_handling_time_histogram.0.ge_millis
- match: { nodes.$node_id.transport.outbound_handling_time_histogram.0.lt_millis: 1 }

- gte: { nodes.$node_id.transport.outbound_handling_time_histogram.1.count: 0 }
- match: { nodes.$node_id.transport.outbound_handling_time_histogram.1.ge_millis: 1 }
- match: { nodes.$node_id.transport.outbound_handling_time_histogram.1.lt_millis: 2 }

- gte: { nodes.$node_id.transport.outbound_handling_time_histogram.17.count: 0 }
- match: { nodes.$node_id.transport.outbound_handling_time_histogram.17.ge_millis: 65536 }
- is_false: nodes.$node_id.transport.outbound_handling_time_histogram.17.lt_millis
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.elasticsearch.common.io.stream.RecyclerBytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.network.CloseableChannel;
import org.elasticsearch.common.network.HandlingTimeTracker;
import org.elasticsearch.common.network.NetworkAddress;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.network.NetworkUtils;
Expand Down Expand Up @@ -116,6 +117,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements

private final TransportHandshaker handshaker;
private final TransportKeepAlive keepAlive;
private final HandlingTimeTracker outboundHandlingTimeTracker = new HandlingTimeTracker();
private final OutboundHandler outboundHandler;
private final InboundHandler inboundHandler;
private final ResponseHandlers responseHandlers = new ResponseHandlers();
Expand All @@ -141,14 +143,7 @@ public TcpTransport(
String nodeName = Node.NODE_NAME_SETTING.get(settings);

this.recycler = createRecycler(settings, pageCacheRecycler);
this.outboundHandler = new OutboundHandler(
nodeName,
version,
statsTracker,
threadPool,
recycler,
networkService.getHandlingTimeTracker()
);
this.outboundHandler = new OutboundHandler(nodeName, version, statsTracker, threadPool, recycler, outboundHandlingTimeTracker);
this.handshaker = new TransportHandshaker(
version,
threadPool,
Expand Down Expand Up @@ -927,7 +922,8 @@ public final TransportStats getStats() {
bytesRead,
messagesSent,
bytesWritten,
networkService.getHandlingTimeTracker().getHistogram()
networkService.getHandlingTimeTracker().getHistogram(),
outboundHandlingTimeTracker.getHistogram()
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ public class TransportStats implements Writeable, ToXContentFragment {
private final long rxSize;
private final long txCount;
private final long txSize;
private final long[] handlingTimeBucketFrequencies;
private final long[] inboundHandlingTimeBucketFrequencies;
private final long[] outboundHandlingTimeBucketFrequencies;

public TransportStats(
long serverOpen,
Expand All @@ -37,16 +38,18 @@ public TransportStats(
long rxSize,
long txCount,
long txSize,
long[] handlingTimeBucketFrequencies
long[] inboundHandlingTimeBucketFrequencies,
long[] outboundHandlingTimeBucketFrequencies
) {
this.serverOpen = serverOpen;
this.totalOutboundConnections = totalOutboundConnections;
this.rxCount = rxCount;
this.rxSize = rxSize;
this.txCount = txCount;
this.txSize = txSize;
this.handlingTimeBucketFrequencies = handlingTimeBucketFrequencies;
assert assertHistogramConsistent();
this.inboundHandlingTimeBucketFrequencies = inboundHandlingTimeBucketFrequencies;
this.outboundHandlingTimeBucketFrequencies = outboundHandlingTimeBucketFrequencies;
assert assertHistogramsConsistent();
}

public TransportStats(StreamInput in) throws IOException {
Expand All @@ -57,14 +60,19 @@ public TransportStats(StreamInput in) throws IOException {
txCount = in.readVLong();
txSize = in.readVLong();
if (in.getVersion().onOrAfter(Version.V_8_1_0) && in.readBoolean()) {
handlingTimeBucketFrequencies = new long[HandlingTimeTracker.BUCKET_COUNT];
for (int i = 0; i < handlingTimeBucketFrequencies.length; i++) {
handlingTimeBucketFrequencies[i] = in.readVLong();
inboundHandlingTimeBucketFrequencies = new long[HandlingTimeTracker.BUCKET_COUNT];
for (int i = 0; i < inboundHandlingTimeBucketFrequencies.length; i++) {
inboundHandlingTimeBucketFrequencies[i] = in.readVLong();
}
outboundHandlingTimeBucketFrequencies = new long[HandlingTimeTracker.BUCKET_COUNT];
for (int i = 0; i < inboundHandlingTimeBucketFrequencies.length; i++) {
outboundHandlingTimeBucketFrequencies[i] = in.readVLong();
}
} else {
handlingTimeBucketFrequencies = new long[0];
inboundHandlingTimeBucketFrequencies = new long[0];
outboundHandlingTimeBucketFrequencies = new long[0];
}
assert assertHistogramConsistent();
assert assertHistogramsConsistent();
}

@Override
Expand All @@ -76,8 +84,12 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(txCount);
out.writeVLong(txSize);
if (out.getVersion().onOrAfter(Version.V_8_1_0)) {
out.writeBoolean(handlingTimeBucketFrequencies.length > 0);
for (long handlingTimeBucketFrequency : handlingTimeBucketFrequencies) {
assert (inboundHandlingTimeBucketFrequencies.length > 0) == (outboundHandlingTimeBucketFrequencies.length > 0);
out.writeBoolean(inboundHandlingTimeBucketFrequencies.length > 0);
for (long handlingTimeBucketFrequency : inboundHandlingTimeBucketFrequencies) {
out.writeVLong(handlingTimeBucketFrequency);
}
for (long handlingTimeBucketFrequency : outboundHandlingTimeBucketFrequencies) {
out.writeVLong(handlingTimeBucketFrequency);
}
}
Expand Down Expand Up @@ -123,16 +135,21 @@ public ByteSizeValue getTxSize() {
return txSize();
}

public long[] getHandlingTimeBucketFrequencies() {
return Arrays.copyOf(handlingTimeBucketFrequencies, handlingTimeBucketFrequencies.length);
public long[] getInboundHandlingTimeBucketFrequencies() {
return Arrays.copyOf(inboundHandlingTimeBucketFrequencies, inboundHandlingTimeBucketFrequencies.length);
}

private boolean assertHistogramConsistent() {
if (handlingTimeBucketFrequencies.length == 0) {
public long[] getOutboundHandlingTimeBucketFrequencies() {
return Arrays.copyOf(outboundHandlingTimeBucketFrequencies, outboundHandlingTimeBucketFrequencies.length);
}

private boolean assertHistogramsConsistent() {
assert inboundHandlingTimeBucketFrequencies.length == outboundHandlingTimeBucketFrequencies.length;
if (inboundHandlingTimeBucketFrequencies.length == 0) {
// Stats came from before v8.1
assert Version.CURRENT.major == Version.V_8_0_0.major;
} else {
assert handlingTimeBucketFrequencies.length == HandlingTimeTracker.BUCKET_COUNT;
assert inboundHandlingTimeBucketFrequencies.length == HandlingTimeTracker.BUCKET_COUNT;
}
return true;
}
Expand All @@ -146,22 +163,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.humanReadableField(Fields.RX_SIZE_IN_BYTES, Fields.RX_SIZE, new ByteSizeValue(rxSize));
builder.field(Fields.TX_COUNT, txCount);
builder.humanReadableField(Fields.TX_SIZE_IN_BYTES, Fields.TX_SIZE, new ByteSizeValue(txSize));
if (handlingTimeBucketFrequencies.length > 0) {
final int[] handlingTimeBucketBounds = HandlingTimeTracker.getBucketUpperBounds();
assert handlingTimeBucketFrequencies.length == handlingTimeBucketBounds.length + 1;
builder.startArray(Fields.HANDLING_TIME_HISTOGRAM);
for (int i = 0; i < handlingTimeBucketFrequencies.length; i++) {
builder.startObject();
if (i > 0 && i <= handlingTimeBucketBounds.length) {
builder.field("ge_millis", handlingTimeBucketBounds[i - 1]);
}
if (i < handlingTimeBucketBounds.length) {
builder.field("lt_millis", handlingTimeBucketBounds[i]);
}
builder.field("count", handlingTimeBucketFrequencies[i]);
builder.endObject();
}
builder.endArray();
if (inboundHandlingTimeBucketFrequencies.length > 0) {
histogramToXContent(builder, inboundHandlingTimeBucketFrequencies, Fields.INBOUND_HANDLING_TIME_HISTOGRAM);
histogramToXContent(builder, outboundHandlingTimeBucketFrequencies, Fields.OUTBOUND_HANDLING_TIME_HISTOGRAM);
} else {
// Stats came from before v8.1
assert Version.CURRENT.major == Version.V_8_0_0.major;
Expand All @@ -170,6 +174,24 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
return builder;
}

private void histogramToXContent(XContentBuilder builder, long[] bucketFrequencies, String fieldName) throws IOException {
final int[] bucketBounds = HandlingTimeTracker.getBucketUpperBounds();
assert bucketFrequencies.length == bucketBounds.length + 1;
builder.startArray(fieldName);
for (int i = 0; i < bucketFrequencies.length; i++) {
builder.startObject();
if (i > 0 && i <= bucketBounds.length) {
builder.field("ge_millis", bucketBounds[i - 1]);
}
if (i < bucketBounds.length) {
builder.field("lt_millis", bucketBounds[i]);
}
builder.field("count", bucketFrequencies[i]);
builder.endObject();
}
builder.endArray();
}

static final class Fields {
static final String TRANSPORT = "transport";
static final String SERVER_OPEN = "server_open";
Expand All @@ -180,6 +202,7 @@ static final class Fields {
static final String TX_COUNT = "tx_count";
static final String TX_SIZE = "tx_size";
static final String TX_SIZE_IN_BYTES = "tx_size_in_bytes";
static final String HANDLING_TIME_HISTOGRAM = "handling_time_histogram";
static final String INBOUND_HANDLING_TIME_HISTOGRAM = "inbound_handling_time_histogram";
static final String OUTBOUND_HANDLING_TIME_HISTOGRAM = "outbound_handling_time_histogram";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -241,8 +241,12 @@ public void testSerialization() throws IOException {
assertEquals(nodeStats.getTransport().getTxCount(), deserializedNodeStats.getTransport().getTxCount());
assertEquals(nodeStats.getTransport().getTxSize(), deserializedNodeStats.getTransport().getTxSize());
assertArrayEquals(
nodeStats.getTransport().getHandlingTimeBucketFrequencies(),
deserializedNodeStats.getTransport().getHandlingTimeBucketFrequencies()
nodeStats.getTransport().getInboundHandlingTimeBucketFrequencies(),
deserializedNodeStats.getTransport().getInboundHandlingTimeBucketFrequencies()
);
assertArrayEquals(
nodeStats.getTransport().getOutboundHandlingTimeBucketFrequencies(),
deserializedNodeStats.getTransport().getOutboundHandlingTimeBucketFrequencies()
);
}
if (nodeStats.getHttp() == null) {
Expand Down Expand Up @@ -679,6 +683,7 @@ public static NodeStats createNodeStats() {
randomNonNegativeLong(),
randomNonNegativeLong(),
randomNonNegativeLong(),
IntStream.range(0, HandlingTimeTracker.BUCKET_COUNT).mapToLong(i -> randomNonNegativeLong()).toArray(),
IntStream.range(0, HandlingTimeTracker.BUCKET_COUNT).mapToLong(i -> randomNonNegativeLong()).toArray()
)
: null;
Expand Down

0 comments on commit c775e4e

Please sign in to comment.