Skip to content

Commit

Permalink
Fixed stats buffer rotation (#17)
Browse files Browse the repository at this point in the history
  • Loading branch information
merlimat authored Sep 16, 2016
1 parent 47a213c commit b2cbf4e
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
*/
package com.yahoo.pulsar.broker.admin;

import java.io.IOException;
import java.io.OutputStream;
import java.util.Collection;
import java.util.Map;
Expand All @@ -33,23 +32,21 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
import com.yahoo.pulsar.common.naming.NamespaceName;
import com.yahoo.pulsar.common.policies.data.loadbalancer.LoadReport;
import com.yahoo.pulsar.common.stats.AllocatorStats;
import com.yahoo.pulsar.broker.loadbalance.ResourceUnit;
import com.yahoo.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl;
import com.yahoo.pulsar.broker.stats.AllocatorStatsGenerator;
import com.yahoo.pulsar.broker.stats.BookieClientStatsGenerator;
import com.yahoo.pulsar.broker.stats.MBeanStatsGenerator;
import com.yahoo.pulsar.broker.stats.Metrics;
import com.yahoo.pulsar.broker.web.RestException;
import com.yahoo.pulsar.common.naming.NamespaceName;
import com.yahoo.pulsar.common.policies.data.loadbalancer.LoadReport;
import com.yahoo.pulsar.common.stats.AllocatorStats;

import io.netty.buffer.ByteBuf;
import io.netty.util.ReferenceCountUtil;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;

@Path("/broker-stats")
@Api(value = "/broker-stats", description = "Stats for broker", tags = "broker-stats")
Expand Down Expand Up @@ -99,19 +96,13 @@ public Collection<Metrics> getMBeans() throws Exception {
public StreamingOutput getDestinations2() throws Exception {
// Ensure super user access only
validateSuperUserAccess();
return new StreamingOutput() {
public void write(OutputStream output) throws IOException, WebApplicationException {
ByteBuf statsBuf = null;
try {
statsBuf = pulsar().getBrokerService().getDimensionMetrics();
output.write(statsBuf.array(), statsBuf.arrayOffset(), statsBuf.readableBytes());
} catch (Exception e) {
throw new WebApplicationException(e);
} finally {
ReferenceCountUtil.release(statsBuf);
}
return output -> pulsar().getBrokerService().getDimensionMetrics(statsBuf -> {
try {
output.write(statsBuf.array(), statsBuf.arrayOffset(), statsBuf.readableBytes());
} catch (Exception e) {
throw new WebApplicationException(e);
}
};
});
}

@GET
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

import org.apache.bookkeeper.client.BookKeeper.DigestType;
import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenLedgerCallback;
Expand Down Expand Up @@ -591,8 +592,8 @@ public void updateRates() {
}
}

public ByteBuf getDimensionMetrics() {
return pulsarStats.getDimensionMetrics();
public void getDimensionMetrics(Consumer<ByteBuf> consumer) {
pulsarStats.getDimensionMetrics(consumer);
}

public List<Metrics> getDestinationMetrics() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import java.io.Closeable;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -51,6 +53,8 @@ public class PulsarStats implements Closeable {
private List<Metrics> metricsCollection;
private final BrokerOperabilityMetrics brokerOperabilityMetrics;

private final ReentrantReadWriteLock bufferLock = new ReentrantReadWriteLock();

public PulsarStats(PulsarService pulsar) {
this.topicStatsBuf = PooledByteBufAllocator.DEFAULT.heapBuffer(16 * 1024);
this.tempTopicStatsBuf = PooledByteBufAllocator.DEFAULT.heapBuffer(16 * 1024);
Expand Down Expand Up @@ -148,21 +152,23 @@ public synchronized void updateStats(
metricsCollection = tempMetricsCollection;
tempMetricsCollection = tempRefMetrics;

ByteBuf tmp = topicStatsBuf;
topicStatsBuf = tempTopicStatsBuf;
tempTopicStatsBuf = tmp;
bufferLock.writeLock().lock();
try {
ByteBuf tmp = topicStatsBuf;
topicStatsBuf = tempTopicStatsBuf;
tempTopicStatsBuf = tmp;
tempTopicStatsBuf.clear();
} finally {
bufferLock.writeLock().unlock();
}
}

public ByteBuf getDimensionMetrics() {
while (true) {
ByteBuf topicStatsBuf = this.topicStatsBuf;
try {
topicStatsBuf.retain();
return topicStatsBuf;
} catch (Exception e) {
// Re-fetch the buffer, since it have been swapped and release
continue;
}
public void getDimensionMetrics(Consumer<ByteBuf> consumer) {
bufferLock.readLock().lock();
try {
consumer.accept(topicStatsBuf);
} finally {
bufferLock.readLock().unlock();
}
}

Expand Down

0 comments on commit b2cbf4e

Please sign in to comment.