Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Persisted unsafe queues implementation #691

Merged
merged 14 commits into from
Jan 15, 2023

Conversation

andsel
Copy link
Collaborator

@andsel andsel commented Nov 23, 2022

Release notes

Implements a disk persistent queues system that is not thread safe.

What it does?

Create the implementation of segmented queues. The data is stored in mmapped memory pages, where each page is divided in segments. Each queue own a list of segments. The access to the each segment is intended to be done by the same thread, so no serialization is needed. The only critical section, guarded by a lock, is the interaction to require or release segment.

@hylkevds
Copy link
Collaborator

This work looks really interesting. How far do you think you are till the first integration with Moquette?

@andsel
Copy link
Collaborator Author

andsel commented Nov 23, 2022

I think we are close, just want to change the API of dequeue to return Optional.empty() instead of null to signal the empty queue condition.

@andsel
Copy link
Collaborator Author

andsel commented Nov 23, 2022

Then integrate into Moquette, with feature flag to enable it and create a new release.

@hylkevds
Copy link
Collaborator

I'm looking forward to performance-testing it in combination with the work from #608 (and #648). That should greatly increase the stability of long-running servers.

Copy link
Collaborator

@hylkevds hylkevds left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've not gone though the complete code yet, but added some intermediate questions...

* */
public Optional<ByteBuffer> dequeue() throws QueueException {
if (!currentHeadPtr.isGreaterThan(currentTailPtr)) {
if (currentTailPtr.compareTo(currentHeadPtr) > 0) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For symmetry, could use currentTailPtr.isGreaterThan(currentHeadPtr) instead of a direct compare.

private CrossSegmentHeaderResult decodeCrossHeader(Segment segment, VirtualPointer pointer) throws QueueException {
// read first part
ByteBuffer lengthBuffer = ByteBuffer.allocate(LENGTH_HEADER_SIZE);
final ByteBuffer partialHeader = segment.readAllBytesAfter(pointer);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be significantly more efficient to make the segment read methods accept an existing Buffer to put the data into, instead of having those read methods create new buffers that are then copied into the already existing buffer?


private final QueuePool queuePool;
private final PagedFilesAllocator.AllocationListener allocationListener;
private final ReentrantLock lock = new ReentrantLock();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the queue is explicitly not thread-safe, then what is the function of this lock? It's only used in the (de)queue methods, when reading cross-segment data. The comments call it a global lock, but this seems to be a queue-local lock... I would expect a global lock to be instantiated in the QueuePool. But there it should be enough to shortly lock the QueuePool in the nextFreeSegment() method.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes you are right!

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that to correctly manage the handling of the queue segments the data structures used to track those has to be accessed serially.
The data structures that manage the segment allocation, free and reassignment are:

  • queueSegments
  • recycledSegments
    in QueuePool class.

The access should be serialized only operating phase, when various session threads ask for next free segment.
However we can't leave uncovered by lock in other cases, like queue definition loads, during QueuePool instantiation, during close or during the retrieval of next tail segment of a queue.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe I'm little wrong. The lock should guard only the creation or reuse of a Segment, so only recycledSegments are part of the critical section, not queueSegments.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do I understand the following correctly:

  • Each single Queue is only accessed by a single Thread
  • The QueuePool assigns segments to Queues
  • Once a Segment is assigned to a Queue, only this queue touches the segment
  • A queue can return a segment to the Pool. It is then added to recycledSegments.

I suspect the lock is only needed when:

  • a Queue requests a segment from the pool, since it would be bad if two Queues get the same segment.
  • closing/deleting an entire page (if that ever happens) since it would be bad if a page is deleted with a segment that a Queue just received.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes you understand right

Copy link
Collaborator

@hylkevds hylkevds left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Happy new Year!
Looks good. I only noticed one unused field :)

I guess the next step is to implement the different repositories based on this?


private static final Logger LOG = LoggerFactory.getLogger(QueuePool.class);

static final boolean queueDebug = Boolean.parseBoolean(System.getProperty("moquette.queue.debug", "false"));
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unused field?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, no, it's used in two other classes.

hylkevds
hylkevds previously approved these changes Jan 12, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants