Skip to content

Commit

Permalink
[IoTConsensusV2 X Pipe Deletion]: Release deletion resource when dele…
Browse files Browse the repository at this point in the history
…te region & Improve for release when buffer is null (#14227)

* release deletion resource when delete region

* improve
  • Loading branch information
Pengzna authored Nov 28, 2024
1 parent 0bb51c6 commit 4fddd58
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.PriorityBlockingQueue;
Expand Down Expand Up @@ -142,7 +143,8 @@ public void start() {
public boolean isAllDeletionFlushed() {
buffersLock.lock();
try {
return deletionResources.isEmpty() && serializeBuffer.position() == 0;
int pos = Optional.ofNullable(serializeBuffer).map(ByteBuffer::position).orElse(0);
return deletionResources.isEmpty() && pos == 0;
} finally {
buffersLock.unlock();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@
import org.apache.iotdb.db.consensus.SchemaRegionConsensusImpl;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
import org.apache.iotdb.db.pipe.consensus.deletion.DeletionResourceManager;
import org.apache.iotdb.db.protocol.client.ConfigNodeInfo;
import org.apache.iotdb.db.protocol.client.cn.DnToCnInternalServiceAsyncRequestManager;
import org.apache.iotdb.db.protocol.client.cn.DnToCnRequestType;
Expand Down Expand Up @@ -2252,6 +2253,9 @@ public TSStatus deleteRegion(TConsensusGroupId tconsensusGroupId) {
if (consensusGroupId instanceof DataRegionId) {
try {
DataRegionConsensusImpl.getInstance().deleteLocalPeer(consensusGroupId);
Optional.ofNullable(
DeletionResourceManager.getInstance(String.valueOf(tconsensusGroupId.getId())))
.ifPresent(DeletionResourceManager::close);
} catch (ConsensusException e) {
if (!(e instanceof ConsensusGroupNotExistException)) {
return RpcUtils.getStatus(TSStatusCode.DELETE_REGION_ERROR, e.getMessage());
Expand Down

0 comments on commit 4fddd58

Please sign in to comment.