diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreDispatcherImpl.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreDispatcherImpl.java index 982909c5ee5..9b1e53564d7 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreDispatcherImpl.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreDispatcherImpl.java @@ -92,8 +92,10 @@ public void dispatchWithSemaphore(FlatFileInterface flatFile) { semaphore.acquire(); this.doScheduleDispatch(flatFile, false) .whenComplete((future, throwable) -> semaphore.release()); - } catch (InterruptedException e) { + } catch (Throwable t) { semaphore.release(); + log.error("MessageStore dispatch error, topic={}, queueId={}", + flatFile.getMessageQueue().getTopic(), flatFile.getMessageQueue().getQueueId(), t); } } @@ -156,8 +158,7 @@ public CompletableFuture doScheduleDispatch(FlatFileInterface flatFile, } if (currentOffset < minOffsetInQueue) { - log.warn("MessageDispatcher#dispatch, current offset is too small, " + - "topic={}, queueId={}, offset={}-{}, current={}", + log.warn("MessageDispatcher#dispatch, current offset is too small, topic={}, queueId={}, offset={}-{}, current={}", topic, queueId, minOffsetInQueue, maxOffsetInQueue, currentOffset); flatFileStore.destroyFile(flatFile.getMessageQueue()); flatFileStore.computeIfAbsent(new MessageQueue(topic, brokerName, queueId)); @@ -165,16 +166,14 @@ public CompletableFuture doScheduleDispatch(FlatFileInterface flatFile, } if (currentOffset > maxOffsetInQueue) { - log.warn("MessageDispatcher#dispatch, current offset is too large, " + - "topic: {}, queueId: {}, offset={}-{}, current={}", + log.warn("MessageDispatcher#dispatch, current offset is too large, topic={}, queueId={}, offset={}-{}, current={}", topic, queueId, minOffsetInQueue, maxOffsetInQueue, currentOffset); return CompletableFuture.completedFuture(false); } long interval = TimeUnit.HOURS.toMillis(storeConfig.getCommitLogRollingInterval()); if (flatFile.rollingFile(interval)) { - log.info("MessageDispatcher#dispatch, rolling file, " + - "topic: {}, queueId: {}, offset={}-{}, current={}", + log.info("MessageDispatcher#dispatch, rolling file, topic={}, queueId={}, offset={}-{}, current={}", topic, queueId, minOffsetInQueue, maxOffsetInQueue, currentOffset); } @@ -189,8 +188,20 @@ public CompletableFuture doScheduleDispatch(FlatFileInterface flatFile, ConsumeQueueInterface consumeQueue = defaultStore.getConsumeQueue(topic, queueId); CqUnit cqUnit = consumeQueue.get(currentOffset); + if (cqUnit == null) { + log.warn("MessageDispatcher#dispatch cq not found, topic={}, queueId={}, offset={}-{}, current={}, remain={}", + topic, queueId, minOffsetInQueue, maxOffsetInQueue, currentOffset, maxOffsetInQueue - currentOffset); + return CompletableFuture.completedFuture(false); + } + SelectMappedBufferResult message = defaultStore.selectOneMessageByOffset(cqUnit.getPos(), cqUnit.getSize()); + if (message == null) { + log.warn("MessageDispatcher#dispatch message not found, topic={}, queueId={}, offset={}-{}, current={}, remain={}", + topic, queueId, minOffsetInQueue, maxOffsetInQueue, currentOffset, maxOffsetInQueue - currentOffset); + return CompletableFuture.completedFuture(false); + } + boolean timeout = MessageFormatUtil.getStoreTimeStamp(message.getByteBuffer()) + storeConfig.getTieredStoreGroupCommitTimeout() < System.currentTimeMillis(); boolean bufferFull = maxOffsetInQueue - currentOffset > storeConfig.getTieredStoreGroupCommitCount(); @@ -198,6 +209,7 @@ public CompletableFuture doScheduleDispatch(FlatFileInterface flatFile, if (!timeout && !bufferFull && !force) { log.debug("MessageDispatcher#dispatch hold, topic={}, queueId={}, offset={}-{}, current={}, remain={}", topic, queueId, minOffsetInQueue, maxOffsetInQueue, currentOffset, maxOffsetInQueue - currentOffset); + message.release(); return CompletableFuture.completedFuture(false); } else { if (MessageFormatUtil.getStoreTimeStamp(message.getByteBuffer()) + @@ -205,11 +217,11 @@ public CompletableFuture doScheduleDispatch(FlatFileInterface flatFile, log.warn("MessageDispatcher#dispatch behind too much, topic={}, queueId={}, offset={}-{}, current={}, remain={}", topic, queueId, minOffsetInQueue, maxOffsetInQueue, currentOffset, maxOffsetInQueue - currentOffset); } else { - log.info("MessageDispatcher#dispatch, topic={}, queueId={}, offset={}-{}, current={}, remain={}", + log.info("MessageDispatcher#dispatch success, topic={}, queueId={}, offset={}-{}, current={}, remain={}", topic, queueId, minOffsetInQueue, maxOffsetInQueue, currentOffset, maxOffsetInQueue - currentOffset); } + message.release(); } - message.release(); long offset = currentOffset; for (; offset < targetOffset; offset++) { @@ -279,7 +291,7 @@ public CompletableFuture commitAsync(FlatFileInterface flatFile) { } flatFile.release(); } - }, MessageStoreExecutor.getInstance().bufferCommitExecutor); + }, storeExecutor.bufferCommitExecutor); } /** @@ -301,8 +313,12 @@ public void constructIndexFile(long topicId, DispatchRequest request) { public void run() { log.info("{} service started", this.getServiceName()); while (!this.isStopped()) { - flatFileStore.deepCopyFlatFileToList().forEach(this::dispatchWithSemaphore); - this.waitForRunning(Duration.ofSeconds(20).toMillis()); + try { + flatFileStore.deepCopyFlatFileToList().forEach(this::dispatchWithSemaphore); + this.waitForRunning(Duration.ofSeconds(20).toMillis()); + } catch (Throwable t) { + log.error("MessageStore dispatch error", t); + } } log.info("{} service shutdown", this.getServiceName()); } diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreFile.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreFile.java index f9604b43e6f..25cd634873d 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreFile.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreFile.java @@ -38,7 +38,6 @@ import org.apache.rocketmq.store.logfile.DefaultMappedFile; import org.apache.rocketmq.store.logfile.MappedFile; import org.apache.rocketmq.tieredstore.MessageStoreConfig; -import org.apache.rocketmq.tieredstore.MessageStoreExecutor; import org.apache.rocketmq.tieredstore.common.AppendResult; import org.apache.rocketmq.tieredstore.provider.FileSegment; import org.apache.rocketmq.tieredstore.provider.PosixFileSegment; @@ -261,56 +260,55 @@ public CompletableFuture> queryAsync( protected CompletableFuture> queryAsyncFromUnsealedFile( String key, int maxCount, long beginTime, long endTime) { - return CompletableFuture.supplyAsync(() -> { - List result = new ArrayList<>(); - try { - fileReadWriteLock.readLock().lock(); - if (!UNSEALED.equals(this.fileStatus.get()) && !SEALED.equals(this.fileStatus.get())) { - return result; - } + List result = new ArrayList<>(); + try { + fileReadWriteLock.readLock().lock(); + if (!UNSEALED.equals(this.fileStatus.get()) && !SEALED.equals(this.fileStatus.get())) { + return CompletableFuture.completedFuture(result); + } - if (mappedFile == null || !mappedFile.hold()) { - return result; - } + if (mappedFile == null || !mappedFile.hold()) { + return CompletableFuture.completedFuture(result); + } - int hashCode = this.hashCode(key); - int slotPosition = this.getSlotPosition(hashCode % this.hashSlotMaxCount); - int slotValue = this.getSlotValue(slotPosition); + int hashCode = this.hashCode(key); + int slotPosition = this.getSlotPosition(hashCode % this.hashSlotMaxCount); + int slotValue = this.getSlotValue(slotPosition); - int left = MAX_QUERY_COUNT; - while (left > 0 && - slotValue > INVALID_INDEX && - slotValue <= this.indexItemCount.get()) { + int left = MAX_QUERY_COUNT; + while (left > 0 && + slotValue > INVALID_INDEX && + slotValue <= this.indexItemCount.get()) { - byte[] bytes = new byte[IndexItem.INDEX_ITEM_SIZE]; - ByteBuffer buffer = this.byteBuffer.duplicate(); - buffer.position(this.getItemPosition(slotValue)); - buffer.get(bytes); - IndexItem indexItem = new IndexItem(bytes); - long storeTimestamp = indexItem.getTimeDiff() + beginTimestamp.get(); - if (hashCode == indexItem.getHashCode() && - beginTime <= storeTimestamp && storeTimestamp <= endTime) { - result.add(indexItem); - if (result.size() > maxCount) { - break; - } + byte[] bytes = new byte[IndexItem.INDEX_ITEM_SIZE]; + ByteBuffer buffer = this.byteBuffer.duplicate(); + buffer.position(this.getItemPosition(slotValue)); + buffer.get(bytes); + IndexItem indexItem = new IndexItem(bytes); + long storeTimestamp = indexItem.getTimeDiff() + beginTimestamp.get(); + if (hashCode == indexItem.getHashCode() && + beginTime <= storeTimestamp && storeTimestamp <= endTime) { + result.add(indexItem); + if (result.size() > maxCount) { + break; } - slotValue = indexItem.getItemIndex(); - left--; } - - log.debug("IndexStoreFile query from unsealed mapped file, timestamp: {}, result size: {}, " + - "key: {}, hashCode: {}, maxCount: {}, timestamp={}-{}", - getTimestamp(), result.size(), key, hashCode, maxCount, beginTime, endTime); - } catch (Exception e) { - log.error("IndexStoreFile query from unsealed mapped file error, timestamp: {}, " + - "key: {}, maxCount: {}, timestamp={}-{}", getTimestamp(), key, maxCount, beginTime, endTime, e); - } finally { - fileReadWriteLock.readLock().unlock(); - mappedFile.release(); + slotValue = indexItem.getItemIndex(); + left--; } - return result; - }, MessageStoreExecutor.getInstance().bufferFetchExecutor); + + log.debug("IndexStoreFile query from unsealed mapped file, timestamp: {}, result size: {}, " + + "key: {}, hashCode: {}, maxCount: {}, timestamp={}-{}", + getTimestamp(), result.size(), key, hashCode, maxCount, beginTime, endTime); + } catch (Exception e) { + log.error("IndexStoreFile query from unsealed mapped file error, timestamp: {}, " + + "key: {}, maxCount: {}, timestamp={}-{}", getTimestamp(), key, maxCount, beginTime, endTime, e); + } finally { + fileReadWriteLock.readLock().unlock(); + mappedFile.release(); + } + + return CompletableFuture.completedFuture(result); } protected CompletableFuture> queryAsyncFromSegmentFile( @@ -465,7 +463,7 @@ public void shutdown() { fileReadWriteLock.writeLock().lock(); this.fileStatus.set(IndexStatusEnum.SHUTDOWN); if (this.fileSegment != null && this.fileSegment instanceof PosixFileSegment) { - ((PosixFileSegment) this.fileSegment).close(); + this.fileSegment.close(); } if (this.mappedFile != null) { this.mappedFile.shutdown(TimeUnit.SECONDS.toMillis(10)); diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/PosixFileSegment.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/PosixFileSegment.java index fb150c928cf..656af2ba1c6 100644 --- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/PosixFileSegment.java +++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/PosixFileSegment.java @@ -30,7 +30,6 @@ import java.util.concurrent.TimeUnit; import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.tieredstore.MessageStoreConfig; -import org.apache.rocketmq.tieredstore.MessageStoreExecutor; import org.apache.rocketmq.tieredstore.common.FileSegmentType; import org.apache.rocketmq.tieredstore.metrics.TieredStoreMetricsManager; import org.apache.rocketmq.tieredstore.stream.FileSegmentInputStream; @@ -230,6 +229,6 @@ public CompletableFuture commit0( return false; } return true; - }, MessageStoreExecutor.getInstance().bufferCommitExecutor); + }); } }