From 483eb98d188080fb2ccb4ae849fcdb45ebd718f9 Mon Sep 17 00:00:00 2001 From: "chunshao.rcs" Date: Fri, 2 Aug 2024 17:08:25 +0800 Subject: [PATCH] fix: sequence overflow when dropping a table using a message queue as WAL (#1550) ## Rationale Fix the issue of sequence overflow when dropping a table using a message queue as WAL. close #1543 ## Detailed Changes Check the maximum value of sequence to prevent overflow. ## Test Plan CI. --- src/wal/src/message_queue_impl/wal.rs | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/src/wal/src/message_queue_impl/wal.rs b/src/wal/src/message_queue_impl/wal.rs index 89876cbf12..293f4a9d6d 100644 --- a/src/wal/src/message_queue_impl/wal.rs +++ b/src/wal/src/message_queue_impl/wal.rs @@ -20,7 +20,7 @@ use std::sync::Arc; use async_trait::async_trait; -use common_types::SequenceNumber; +use common_types::{SequenceNumber, MAX_SEQUENCE_NUMBER}; use generic_error::BoxError; use message_queue::{kafka::kafka_impl::KafkaImpl, ConsumeIterator, MessageQueue}; use runtime::Runtime; @@ -70,8 +70,13 @@ impl WalManager for MessageQueueImpl { location: WalLocation, sequence_num: SequenceNumber, ) -> Result<()> { + let delete_to_sequence_num = if sequence_num < MAX_SEQUENCE_NUMBER { + sequence_num + 1 + } else { + MAX_SEQUENCE_NUMBER + }; self.0 - .mark_delete_to(location, sequence_num + 1) + .mark_delete_to(location, delete_to_sequence_num) .await .box_err() .context(Delete)