Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #8955] Fix message buffer not release and dispatch thread exit in tiered storage #8965

Merged
merged 1 commit into from
Nov 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,10 @@ public void dispatchWithSemaphore(FlatFileInterface flatFile) {
semaphore.acquire();
this.doScheduleDispatch(flatFile, false)
.whenComplete((future, throwable) -> semaphore.release());
} catch (InterruptedException e) {
} catch (Throwable t) {
semaphore.release();
log.error("MessageStore dispatch error, topic={}, queueId={}",
flatFile.getMessageQueue().getTopic(), flatFile.getMessageQueue().getQueueId(), t);
}
}

Expand Down Expand Up @@ -156,25 +158,22 @@ public CompletableFuture<Boolean> doScheduleDispatch(FlatFileInterface flatFile,
}

if (currentOffset < minOffsetInQueue) {
log.warn("MessageDispatcher#dispatch, current offset is too small, " +
"topic={}, queueId={}, offset={}-{}, current={}",
log.warn("MessageDispatcher#dispatch, current offset is too small, topic={}, queueId={}, offset={}-{}, current={}",
topic, queueId, minOffsetInQueue, maxOffsetInQueue, currentOffset);
flatFileStore.destroyFile(flatFile.getMessageQueue());
flatFileStore.computeIfAbsent(new MessageQueue(topic, brokerName, queueId));
return CompletableFuture.completedFuture(true);
}

if (currentOffset > maxOffsetInQueue) {
log.warn("MessageDispatcher#dispatch, current offset is too large, " +
"topic: {}, queueId: {}, offset={}-{}, current={}",
log.warn("MessageDispatcher#dispatch, current offset is too large, topic={}, queueId={}, offset={}-{}, current={}",
topic, queueId, minOffsetInQueue, maxOffsetInQueue, currentOffset);
return CompletableFuture.completedFuture(false);
}

long interval = TimeUnit.HOURS.toMillis(storeConfig.getCommitLogRollingInterval());
if (flatFile.rollingFile(interval)) {
log.info("MessageDispatcher#dispatch, rolling file, " +
"topic: {}, queueId: {}, offset={}-{}, current={}",
log.info("MessageDispatcher#dispatch, rolling file, topic={}, queueId={}, offset={}-{}, current={}",
topic, queueId, minOffsetInQueue, maxOffsetInQueue, currentOffset);
}

Expand All @@ -189,27 +188,40 @@ public CompletableFuture<Boolean> doScheduleDispatch(FlatFileInterface flatFile,

ConsumeQueueInterface consumeQueue = defaultStore.getConsumeQueue(topic, queueId);
CqUnit cqUnit = consumeQueue.get(currentOffset);
if (cqUnit == null) {
log.warn("MessageDispatcher#dispatch cq not found, topic={}, queueId={}, offset={}-{}, current={}, remain={}",
topic, queueId, minOffsetInQueue, maxOffsetInQueue, currentOffset, maxOffsetInQueue - currentOffset);
return CompletableFuture.completedFuture(false);
}

SelectMappedBufferResult message =
defaultStore.selectOneMessageByOffset(cqUnit.getPos(), cqUnit.getSize());
if (message == null) {
log.warn("MessageDispatcher#dispatch message not found, topic={}, queueId={}, offset={}-{}, current={}, remain={}",
topic, queueId, minOffsetInQueue, maxOffsetInQueue, currentOffset, maxOffsetInQueue - currentOffset);
return CompletableFuture.completedFuture(false);
}

boolean timeout = MessageFormatUtil.getStoreTimeStamp(message.getByteBuffer()) +
storeConfig.getTieredStoreGroupCommitTimeout() < System.currentTimeMillis();
boolean bufferFull = maxOffsetInQueue - currentOffset > storeConfig.getTieredStoreGroupCommitCount();

if (!timeout && !bufferFull && !force) {
log.debug("MessageDispatcher#dispatch hold, topic={}, queueId={}, offset={}-{}, current={}, remain={}",
topic, queueId, minOffsetInQueue, maxOffsetInQueue, currentOffset, maxOffsetInQueue - currentOffset);
message.release();
return CompletableFuture.completedFuture(false);
} else {
if (MessageFormatUtil.getStoreTimeStamp(message.getByteBuffer()) +
TimeUnit.MINUTES.toMillis(5) < System.currentTimeMillis()) {
log.warn("MessageDispatcher#dispatch behind too much, topic={}, queueId={}, offset={}-{}, current={}, remain={}",
topic, queueId, minOffsetInQueue, maxOffsetInQueue, currentOffset, maxOffsetInQueue - currentOffset);
} else {
log.info("MessageDispatcher#dispatch, topic={}, queueId={}, offset={}-{}, current={}, remain={}",
log.info("MessageDispatcher#dispatch success, topic={}, queueId={}, offset={}-{}, current={}, remain={}",
topic, queueId, minOffsetInQueue, maxOffsetInQueue, currentOffset, maxOffsetInQueue - currentOffset);
}
message.release();
}
message.release();

long offset = currentOffset;
for (; offset < targetOffset; offset++) {
Expand Down Expand Up @@ -279,7 +291,7 @@ public CompletableFuture<Void> commitAsync(FlatFileInterface flatFile) {
}
flatFile.release();
}
}, MessageStoreExecutor.getInstance().bufferCommitExecutor);
}, storeExecutor.bufferCommitExecutor);
}

/**
Expand All @@ -301,8 +313,12 @@ public void constructIndexFile(long topicId, DispatchRequest request) {
public void run() {
log.info("{} service started", this.getServiceName());
while (!this.isStopped()) {
flatFileStore.deepCopyFlatFileToList().forEach(this::dispatchWithSemaphore);
this.waitForRunning(Duration.ofSeconds(20).toMillis());
try {
flatFileStore.deepCopyFlatFileToList().forEach(this::dispatchWithSemaphore);
this.waitForRunning(Duration.ofSeconds(20).toMillis());
} catch (Throwable t) {
log.error("MessageStore dispatch error", t);
}
}
log.info("{} service shutdown", this.getServiceName());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import org.apache.rocketmq.store.logfile.DefaultMappedFile;
import org.apache.rocketmq.store.logfile.MappedFile;
import org.apache.rocketmq.tieredstore.MessageStoreConfig;
import org.apache.rocketmq.tieredstore.MessageStoreExecutor;
import org.apache.rocketmq.tieredstore.common.AppendResult;
import org.apache.rocketmq.tieredstore.provider.FileSegment;
import org.apache.rocketmq.tieredstore.provider.PosixFileSegment;
Expand Down Expand Up @@ -261,56 +260,55 @@ public CompletableFuture<List<IndexItem>> queryAsync(
protected CompletableFuture<List<IndexItem>> queryAsyncFromUnsealedFile(
String key, int maxCount, long beginTime, long endTime) {

return CompletableFuture.supplyAsync(() -> {
List<IndexItem> result = new ArrayList<>();
try {
fileReadWriteLock.readLock().lock();
if (!UNSEALED.equals(this.fileStatus.get()) && !SEALED.equals(this.fileStatus.get())) {
return result;
}
List<IndexItem> result = new ArrayList<>();
try {
fileReadWriteLock.readLock().lock();
if (!UNSEALED.equals(this.fileStatus.get()) && !SEALED.equals(this.fileStatus.get())) {
return CompletableFuture.completedFuture(result);
}

if (mappedFile == null || !mappedFile.hold()) {
return result;
}
if (mappedFile == null || !mappedFile.hold()) {
return CompletableFuture.completedFuture(result);
}

int hashCode = this.hashCode(key);
int slotPosition = this.getSlotPosition(hashCode % this.hashSlotMaxCount);
int slotValue = this.getSlotValue(slotPosition);
int hashCode = this.hashCode(key);
int slotPosition = this.getSlotPosition(hashCode % this.hashSlotMaxCount);
int slotValue = this.getSlotValue(slotPosition);

int left = MAX_QUERY_COUNT;
while (left > 0 &&
slotValue > INVALID_INDEX &&
slotValue <= this.indexItemCount.get()) {
int left = MAX_QUERY_COUNT;
while (left > 0 &&
slotValue > INVALID_INDEX &&
slotValue <= this.indexItemCount.get()) {

byte[] bytes = new byte[IndexItem.INDEX_ITEM_SIZE];
ByteBuffer buffer = this.byteBuffer.duplicate();
buffer.position(this.getItemPosition(slotValue));
buffer.get(bytes);
IndexItem indexItem = new IndexItem(bytes);
long storeTimestamp = indexItem.getTimeDiff() + beginTimestamp.get();
if (hashCode == indexItem.getHashCode() &&
beginTime <= storeTimestamp && storeTimestamp <= endTime) {
result.add(indexItem);
if (result.size() > maxCount) {
break;
}
byte[] bytes = new byte[IndexItem.INDEX_ITEM_SIZE];
ByteBuffer buffer = this.byteBuffer.duplicate();
buffer.position(this.getItemPosition(slotValue));
buffer.get(bytes);
IndexItem indexItem = new IndexItem(bytes);
long storeTimestamp = indexItem.getTimeDiff() + beginTimestamp.get();
if (hashCode == indexItem.getHashCode() &&
beginTime <= storeTimestamp && storeTimestamp <= endTime) {
result.add(indexItem);
if (result.size() > maxCount) {
break;
}
slotValue = indexItem.getItemIndex();
left--;
}

log.debug("IndexStoreFile query from unsealed mapped file, timestamp: {}, result size: {}, " +
"key: {}, hashCode: {}, maxCount: {}, timestamp={}-{}",
getTimestamp(), result.size(), key, hashCode, maxCount, beginTime, endTime);
} catch (Exception e) {
log.error("IndexStoreFile query from unsealed mapped file error, timestamp: {}, " +
"key: {}, maxCount: {}, timestamp={}-{}", getTimestamp(), key, maxCount, beginTime, endTime, e);
} finally {
fileReadWriteLock.readLock().unlock();
mappedFile.release();
slotValue = indexItem.getItemIndex();
left--;
}
return result;
}, MessageStoreExecutor.getInstance().bufferFetchExecutor);

log.debug("IndexStoreFile query from unsealed mapped file, timestamp: {}, result size: {}, " +
"key: {}, hashCode: {}, maxCount: {}, timestamp={}-{}",
getTimestamp(), result.size(), key, hashCode, maxCount, beginTime, endTime);
} catch (Exception e) {
log.error("IndexStoreFile query from unsealed mapped file error, timestamp: {}, " +
"key: {}, maxCount: {}, timestamp={}-{}", getTimestamp(), key, maxCount, beginTime, endTime, e);
} finally {
fileReadWriteLock.readLock().unlock();
mappedFile.release();
}

return CompletableFuture.completedFuture(result);
}

protected CompletableFuture<List<IndexItem>> queryAsyncFromSegmentFile(
Expand Down Expand Up @@ -465,7 +463,7 @@ public void shutdown() {
fileReadWriteLock.writeLock().lock();
this.fileStatus.set(IndexStatusEnum.SHUTDOWN);
if (this.fileSegment != null && this.fileSegment instanceof PosixFileSegment) {
((PosixFileSegment) this.fileSegment).close();
this.fileSegment.close();
}
if (this.mappedFile != null) {
this.mappedFile.shutdown(TimeUnit.SECONDS.toMillis(10));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.tieredstore.MessageStoreConfig;
import org.apache.rocketmq.tieredstore.MessageStoreExecutor;
import org.apache.rocketmq.tieredstore.common.FileSegmentType;
import org.apache.rocketmq.tieredstore.metrics.TieredStoreMetricsManager;
import org.apache.rocketmq.tieredstore.stream.FileSegmentInputStream;
Expand Down Expand Up @@ -230,6 +229,6 @@ public CompletableFuture<Boolean> commit0(
return false;
}
return true;
}, MessageStoreExecutor.getInstance().bufferCommitExecutor);
});
}
}
Loading