Commit

Moved bad lock from Queue to QueuePool so that locking happens during nextFree segment reclamation
andsel committed Jan 15, 2023
1 parent 9f8e559 commit 8043765
Showing 2 changed files with 62 additions and 58 deletions.
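
In short: the ReentrantLock previously held inside Queue only serialized writers of that one queue, while the state it has to protect (the shared recycledSegments set and the segment allocator) lives in QueuePool and is touched by every queue. Moving the lock into QueuePool makes the add in consumedTailSegment() and the check-then-poll in nextFreeSegment() mutually exclusive across all queues. A minimal sketch of the resulting pattern, using hypothetical simplified types (SegmentPoolSketch, plain integer segment ids) rather than the real QueuePool/Segment/SegmentRef classes:

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical stand-in for QueuePool: one pool-wide lock guards both the
// recycle path and the allocation path, so a segment can never be handed
// out twice and the empty-check cannot race with a concurrent poll.
class SegmentPoolSketch {

    private final Deque<Integer> recycledSegments = new ArrayDeque<>();
    private final ReentrantLock segmentsAllocationLock = new ReentrantLock();
    private int nextFreshSegmentId = 0;

    // Counterpart of consumedTailSegment(): a fully consumed tail segment
    // goes back into the shared recycle set, under the pool-wide lock.
    void recycleSegment(int segmentId) {
        segmentsAllocationLock.lock();
        try {
            recycledSegments.addLast(segmentId);
        } finally {
            segmentsAllocationLock.unlock();
        }
    }

    // Counterpart of nextFreeSegment(): reuse a recycled segment if one is
    // available, otherwise allocate a fresh one; the whole decision is atomic.
    int nextFreeSegment() {
        segmentsAllocationLock.lock();
        try {
            Integer recycled = recycledSegments.pollFirst();
            return (recycled != null) ? recycled : nextFreshSegmentId++;
        } finally {
            segmentsAllocationLock.unlock();
        }
    }
}

The real nextFreeSegment() keeps an explicit isEmpty()/pollFirst() pair, which is exactly why the surrounding lock matters: without it, another queue could drain the recycled set between the check and the poll.
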
79 changes: 33 additions & 46 deletions broker/src/main/java/io/moquette/broker/unsafequeues/Queue.java
@@ -8,7 +8,6 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.locks.ReentrantLock;

/**
* Not thread-safe, disk-persisted queue.
@@ -28,7 +27,7 @@ public class Queue {

private final QueuePool queuePool;
private final PagedFilesAllocator.AllocationListener allocationListener;
private final ReentrantLock lock = new ReentrantLock();
// private final ReentrantLock lock = new ReentrantLock();

Queue(String name, Segment headSegment, VirtualPointer currentHeadPtr,
Segment tailSegment, VirtualPointer currentTailPtr,
@@ -57,8 +56,8 @@ public void enqueue(ByteBuffer payload) throws QueueException {

LOG.debug("Head segment doesn't have enough space");
// the payload can't be fully contained in the current head segment and needs to be split
// with another segment. To request the next segment, it's needed to be done in global lock.
lock.lock();
// with another segment.


final int dataSize = payload.remaining();
final ByteBuffer rawData = (ByteBuffer) ByteBuffer.allocate(LENGTH_HEADER_SIZE + dataSize)
@@ -83,27 +82,25 @@ }
}

Segment newSegment = null;
try {
// till the payload is not completely stored,
// save the remaining part into a new segment.
while (rawData.hasRemaining()) {
newSegment = queuePool.nextFreeSegment();
//notify segment creation for queue in queue pool
allocationListener.segmentedCreated(name, newSegment);

int copySize = (int) Math.min(rawData.remaining(), Segment.SIZE);
ByteBuffer slice = rawData.slice();
slice.limit(copySize);

currentHeadPtr = currentHeadPtr.moveForward(copySize);
writeDataNoHeader(newSegment, newSegment.begin, slice);
headSegment = newSegment;

// shift forward the consumption point
rawData.position(rawData.position() + copySize);
}
} finally {
lock.unlock();

// till the payload is not completely stored,
// save the remaining part into a new segment.
while (rawData.hasRemaining()) {
// Requesting the next segment must happen under the pool's global lock.
newSegment = queuePool.nextFreeSegment();
//notify segment creation for queue in queue pool
allocationListener.segmentedCreated(name, newSegment);

int copySize = (int) Math.min(rawData.remaining(), Segment.SIZE);
ByteBuffer slice = rawData.slice();
slice.limit(copySize);

currentHeadPtr = currentHeadPtr.moveForward(copySize);
writeDataNoHeader(newSegment, newSegment.begin, slice);
headSegment = newSegment;

// shift forward the consumption point
rawData.position(rawData.position() + copySize);
}
}

Expand Down Expand Up @@ -193,30 +190,20 @@ public Optional<ByteBuffer> dequeue() throws QueueException {
return Optional.of(readData(tailSegment, dataStart, payloadLength));
} else {
// payload is split across currentSegment and next ones
lock.lock();
try {
VirtualPointer dataStart = existingTail.moveForward(LENGTH_HEADER_SIZE);

LOG.debug("Loading payload size {}", payloadLength);
return Optional.of(loadPayloadFromSegments(payloadLength, tailSegment, dataStart));
} finally {
lock.unlock();
}
VirtualPointer dataStart = existingTail.moveForward(LENGTH_HEADER_SIZE);

LOG.debug("Loading payload size {}", payloadLength);
return Optional.of(loadPayloadFromSegments(payloadLength, tailSegment, dataStart));
}
} else {
// header is split across 2 segments
lock.lock();
try {
// the currentSegment is still the tailSegment
// read the length header that's crossing 2 segments
final CrossSegmentHeaderResult result = decodeCrossHeader(tailSegment, currentTailPtr);

// load all payload parts from the segments
LOG.debug("Loading payload size {}", result.payloadLength);
return Optional.of(loadPayloadFromSegments(result.payloadLength, result.segment, result.pointer));
} finally {
lock.unlock();
}
// the currentSegment is still the tailSegment
// read the length header that's crossing 2 segments
final CrossSegmentHeaderResult result = decodeCrossHeader(tailSegment, currentTailPtr);

// load all payload parts from the segments
LOG.debug("Loading payload size {}", result.payloadLength);
return Optional.of(loadPayloadFromSegments(result.payloadLength, result.segment, result.pointer));
}
}

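On the Queue side the visible effect is that enqueue() no longer wraps the split-payload loop in lock()/try/finally; it simply calls queuePool.nextFreeSegment() and relies on the pool to serialize allocation. A caller-side sketch under that assumption (SegmentSource and the placeholder write are simplified stand-ins, not the actual Moquette signatures):

import java.nio.ByteBuffer;

// Simplified stand-ins: the real code writes into memory-mapped Segment
// objects and tracks a VirtualPointer; here a segment is just an int id.
class SplitWriteSketch {

    interface SegmentSource {
        int nextFreeSegment();   // assumed internally thread-safe after this commit
    }

    // Writes the remainder of rawData across as many segments as needed,
    // without taking any lock of its own.
    static void writeAcrossSegments(ByteBuffer rawData, SegmentSource pool, int segmentSize) {
        while (rawData.hasRemaining()) {
            int segmentId = pool.nextFreeSegment();          // pool serializes this call
            int copySize = Math.min(rawData.remaining(), segmentSize);
            ByteBuffer slice = rawData.slice();
            slice.limit(copySize);                           // cap the slice to one segment
            // ... write slice into the segment identified by segmentId ...
            rawData.position(rawData.position() + copySize); // advance past the copied bytes
        }
    }
}
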
41 changes: 29 additions & 12 deletions broker/src/main/java/io/moquette/broker/unsafequeues/QueuePool.java
@@ -22,6 +22,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;

import static io.moquette.broker.unsafequeues.PagedFilesAllocator.PAGE_SIZE;
@@ -97,6 +98,7 @@ public String toString() {
private final ConcurrentMap<QueueName, LinkedList<SegmentRef>> queueSegments = new ConcurrentHashMap<>();
private final ConcurrentMap<QueueName, Queue> queues = new ConcurrentHashMap<>();
private final ConcurrentSkipListSet<SegmentRef> recycledSegments = new ConcurrentSkipListSet<>();
private final ReentrantLock segmentsAllocationLock = new ReentrantLock();

private QueuePool(SegmentAllocator allocator, Path dataPath, int segmentSize) {
this.allocator = allocator;
@@ -249,7 +251,12 @@ private void loadRecycledSegments(Properties checkpointProps) throws QueueExcept

final List<SegmentRef> recreatedSegments = recreateSegmentHoles(usedSegments);

recycledSegments.addAll(recreatedSegments);
segmentsAllocationLock.lock();
try {
recycledSegments.addAll(recreatedSegments);
} finally {
segmentsAllocationLock.unlock();
}
}

/**
@@ -428,7 +435,7 @@ Segment openNextTailSegment(String name) throws QueueException {

final Path pageFile = dataPath.resolve(String.format("%d.page", pollSegment.pageId));
if (!Files.exists(pageFile)) {
throw new QueueException("Can't find file for page file" + pageFile);
throw new QueueException("Can't find file for page file" + pageFile);
}

final MappedByteBuffer tailPage;
@@ -451,19 +458,29 @@ void consumedTailSegment(String name) {
final LinkedList<SegmentRef> segmentRefs = queueSegments.get(queueName);
final SegmentRef segmentRef = segmentRefs.pollLast();
LOG.debug("Consumed tail segment {} from queue {}", segmentRef, queueName);
recycledSegments.add(segmentRef);
segmentsAllocationLock.lock();
try {
recycledSegments.add(segmentRef);
} finally {
segmentsAllocationLock.unlock();
}
}

Segment nextFreeSegment() throws QueueException {
if (recycledSegments.isEmpty()) {
LOG.debug("no recycled segments available, request the creation of new one");
return allocator.nextFreeSegment();
}
final SegmentRef recycledSegment = recycledSegments.pollFirst();
if (recycledSegment == null) {
throw new QueueException("Invalid state, expected available recycled segment");
segmentsAllocationLock.lock();
try {
if (recycledSegments.isEmpty()) {
LOG.debug("no recycled segments available, request the creation of new one");
return allocator.nextFreeSegment();
}
final SegmentRef recycledSegment = recycledSegments.pollFirst();
if (recycledSegment == null) {
throw new QueueException("Invalid state, expected available recycled segment");
}
LOG.debug("Reusing recycled segment from page: {} at page offset: {}", recycledSegment.pageId, recycledSegment.offset);
return allocator.reopenSegment(recycledSegment.pageId, recycledSegment.offset);
} finally {
segmentsAllocationLock.unlock();
}
LOG.debug("Reusing recycled segment from page: {} at page offset: {}", recycledSegment.pageId, recycledSegment.offset);
return allocator.reopenSegment(recycledSegment.pageId, recycledSegment.offset);
}
}
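
With both mutation points of recycledSegments now under segmentsAllocationLock, the invariant is that a segment handed out by nextFreeSegment() cannot be handed out again while it is still in use. The smoke test below is hypothetical and not part of the repository; it repeats the tiny locked pool from the sketch above so it stays self-contained, and fails fast if the same integer segment id is ever outstanding twice:

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;

public class LockedPoolSmokeTest {

    private static final ReentrantLock segmentsAllocationLock = new ReentrantLock();
    private static final Deque<Integer> recycledSegments = new ArrayDeque<>();
    private static int nextFreshSegmentId = 0;

    private static int nextFreeSegment() {
        segmentsAllocationLock.lock();
        try {
            Integer recycled = recycledSegments.pollFirst();
            return (recycled != null) ? recycled : nextFreshSegmentId++;
        } finally {
            segmentsAllocationLock.unlock();
        }
    }

    private static void recycleSegment(int segmentId) {
        segmentsAllocationLock.lock();
        try {
            recycledSegments.addLast(segmentId);
        } finally {
            segmentsAllocationLock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Set<Integer> outstanding = ConcurrentHashMap.newKeySet();
        Thread[] workers = new Thread[4];
        for (int t = 0; t < workers.length; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < 10_000; i++) {
                    int id = nextFreeSegment();
                    if (!outstanding.add(id)) {
                        throw new IllegalStateException("segment " + id + " handed out twice");
                    }
                    if (i % 3 == 0) {           // occasionally hand the segment back
                        outstanding.remove(id); // remove before recycling, so reuse is legal
                        recycleSegment(id);
                    }
                }
            });
            workers[t].start();
        }
        for (Thread w : workers) {
            w.join();
        }
        System.out.println("distinct segments still outstanding: " + outstanding.size());
    }
}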
