enhance: [cherry-pick] Refine code for GetRecoveryInfo (#34974)
issue: #34495 

master pr: #34973

Signed-off-by: Cai Zhang <[email protected]>
xiaocai2333 authored Jul 24, 2024
1 parent 597362e commit 12a24c3
Showing 2 changed files with 650 additions and 94 deletions.
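The diff below restructures GetQueryVChanPositions in datacoord: previously the entire channel-segment scan (including FilterInIndexedSegments and the classification switch) ran once for every partition in validPartitions, while the new code scans the segments once and resolves the partition-stats version per segment via s.GetPartitionID(). A minimal sketch of that hoisting pattern, with hypothetical segment/statsVersion stand-ins rather than the real datacoord types:

package main

import "fmt"

// Hypothetical stand-ins for datacoord's *SegmentInfo and the partition-stats
// metadata; the names are illustrative, not the real Milvus API.
type segment struct {
	id          int64
	partitionID int64
}

// statsVersion mimics partitionStatsMeta.GetCurrentPartitionStatsVersion.
func statsVersion(partitionID int64) int64 { return partitionID*10 + 1 }

// before mirrors the old shape: the full channel scan runs once per
// partition, so every segment is visited len(partitions) times.
func before(partitions []int64, segments []segment) (visits int) {
	for _, pid := range partitions {
		_ = statsVersion(pid)
		for range segments {
			visits++
		}
	}
	return visits
}

// after mirrors the new shape: one scan, resolving the stats version from
// each segment's own partition ID, plus one small loop that records the
// per-partition versions for the response.
func after(partitions []int64, segments []segment) (visits int) {
	partStatsVersions := make(map[int64]int64, len(partitions))
	for _, s := range segments {
		_ = statsVersion(s.partitionID)
		visits++
	}
	for _, pid := range partitions {
		partStatsVersions[pid] = statsVersion(pid)
	}
	return visits
}

func main() {
	segs := []segment{{1, 100}, {2, 100}, {3, 101}, {4, 101}}
	parts := []int64{100, 101}
	fmt.Println(before(parts, segs), "segment visits in the old shape") // 8
	fmt.Println(after(parts, segs), "segment visits in the new shape")  // 4
}

The observable classification is preserved, since each segment is still compared against its own partition's stats version; only the redundant rescans (and the repeated per-partition log.Info, now emitted once at the end) disappear.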
197 changes: 103 additions & 94 deletions internal/datacoord/handler.go
@@ -119,117 +119,126 @@ func (h *ServerHandler) GetQueryVChanPositions(channel RWChannel, partitionIDs .
         levelZeroIDs = make(typeutil.UniqueSet)
     )
 
-    for _, partitionID := range validPartitions {
-        segments := h.s.meta.GetRealSegmentsForChannel(channel.GetName())
-        currentPartitionStatsVersion := h.s.meta.partitionStatsMeta.GetCurrentPartitionStatsVersion(channel.GetCollectionID(), partitionID, channel.GetName())
+    segments := h.s.meta.GetRealSegmentsForChannel(channel.GetName())
 
-        segmentInfos := make(map[int64]*SegmentInfo)
-        indexedSegments := FilterInIndexedSegments(h, h.s.meta, segments...)
-        indexed := make(typeutil.UniqueSet)
-        for _, segment := range indexedSegments {
-            indexed.Insert(segment.GetID())
-        }
+    segmentInfos := make(map[int64]*SegmentInfo)
+    indexedSegments := FilterInIndexedSegments(h, h.s.meta, segments...)
+    indexed := make(typeutil.UniqueSet)
+    for _, segment := range indexedSegments {
+        indexed.Insert(segment.GetID())
+    }
 
-        log.Info("GetQueryVChanPositions",
-            zap.Int64("collectionID", channel.GetCollectionID()),
-            zap.String("channel", channel.GetName()),
-            zap.Int("numOfSegments", len(segments)),
-            zap.Int("indexed segment", len(indexedSegments)),
-            zap.Int64("currentPartitionStatsVersion", currentPartitionStatsVersion),
-        )
-        unIndexedIDs := make(typeutil.UniqueSet)
+    unIndexedIDs := make(typeutil.UniqueSet)
 
-        for _, s := range segments {
-            if s.GetStartPosition() == nil && s.GetDmlPosition() == nil {
-                continue
-            }
-            if s.GetIsImporting() {
-                // Skip bulk insert segments.
-                continue
-            }
-            if s.GetLevel() == datapb.SegmentLevel_L2 && s.PartitionStatsVersion != currentPartitionStatsVersion {
-                // in the process of L2 compaction, newly generated segment may be visible before the whole L2 compaction Plan
-                // is finished, we have to skip these fast-finished segment because all segments in one L2 Batch must be
-                // seen atomically, otherwise users will see intermediate result
-                continue
-            }
-            segmentInfos[s.GetID()] = s
-            switch {
-            case s.GetState() == commonpb.SegmentState_Dropped:
-                if s.GetLevel() == datapb.SegmentLevel_L2 && s.GetPartitionStatsVersion() == currentPartitionStatsVersion {
-                    // if segment.partStatsVersion is equal to currentPartitionStatsVersion,
-                    // it must have been indexed, this is guaranteed by clustering compaction process
-                    // this is to ensure that the current valid L2 compaction produce is available to search/query
-                    // to avoid insufficient data
-                    indexedIDs.Insert(s.GetID())
-                    continue
-                }
-                droppedIDs.Insert(s.GetID())
-            case !isFlushState(s.GetState()):
-                growingIDs.Insert(s.GetID())
-            case s.GetLevel() == datapb.SegmentLevel_L0:
-                levelZeroIDs.Insert(s.GetID())
-            case indexed.Contain(s.GetID()):
-                indexedIDs.Insert(s.GetID())
-            case s.GetNumOfRows() < Params.DataCoordCfg.MinSegmentNumRowsToEnableIndex.GetAsInt64(): // treat small flushed segment as indexed
-                indexedIDs.Insert(s.GetID())
-            default:
-                unIndexedIDs.Insert(s.GetID())
-            }
+    for _, s := range segments {
+        if s.GetStartPosition() == nil && s.GetDmlPosition() == nil {
+            continue
+        }
+        if s.GetIsImporting() {
+            // Skip bulk insert segments.
+            continue
+        }
+        currentPartitionStatsVersion := h.s.meta.partitionStatsMeta.GetCurrentPartitionStatsVersion(channel.GetCollectionID(), s.GetPartitionID(), channel.GetName())
+        if s.GetLevel() == datapb.SegmentLevel_L2 && s.GetPartitionStatsVersion() != currentPartitionStatsVersion {
+            // in the process of L2 compaction, newly generated segment may be visible before the whole L2 compaction Plan
+            // is finished, we have to skip these fast-finished segment because all segments in one L2 Batch must be
+            // seen atomically, otherwise users will see intermediate result
+            continue
+        }
+
+        segmentInfos[s.GetID()] = s
+        switch {
+        case s.GetState() == commonpb.SegmentState_Dropped:
+            if s.GetLevel() == datapb.SegmentLevel_L2 && s.GetPartitionStatsVersion() == currentPartitionStatsVersion {
+                // if segment.partStatsVersion is equal to currentPartitionStatsVersion,
+                // it must have been indexed, this is guaranteed by clustering compaction process
+                // this is to ensure that the current valid L2 compaction produce is available to search/query
+                // to avoid insufficient data
+                indexedIDs.Insert(s.GetID())
+                continue
+            }
+            droppedIDs.Insert(s.GetID())
+        case !isFlushState(s.GetState()):
+            growingIDs.Insert(s.GetID())
+        case s.GetLevel() == datapb.SegmentLevel_L0:
+            levelZeroIDs.Insert(s.GetID())
+        case indexed.Contain(s.GetID()):
+            indexedIDs.Insert(s.GetID())
+        case s.GetNumOfRows() < Params.DataCoordCfg.MinSegmentNumRowsToEnableIndex.GetAsInt64(): // treat small flushed segment as indexed
+            indexedIDs.Insert(s.GetID())
+        default:
+            unIndexedIDs.Insert(s.GetID())
         }
+    }
 
-        // ================================================
-        // Segments blood relationship:
-        //          a   b
-        //           \ /
-        //            c   d
-        //             \ /
-        //              e
-        //
-        // GC:        a, b
-        // Indexed:   c, d, e
-        //              ||
-        //              || (Index dropped and creating new index and not finished)
-        //              \/
-        // UnIndexed: c, d, e
-        //
-        // Retrieve unIndexed expected result:
-        // unIndexed: c, d
-        // ================================================
-        isValid := func(ids ...UniqueID) bool {
-            for _, id := range ids {
-                if seg, ok := segmentInfos[id]; !ok || seg == nil {
-                    return false
-                }
-            }
-            return true
-        }
+    // ================================================
+    // Segments blood relationship:
+    //          a   b
+    //           \ /
+    //            c   d
+    //             \ /
+    //              e
+    //
+    // GC:        a, b
+    // Indexed:   c, d, e
+    //              ||
+    //              || (Index dropped and creating new index and not finished)
+    //              \/
+    // UnIndexed: c, d, e
+    //
+    // Retrieve unIndexed expected result:
+    // unIndexed: c, d
+    // ================================================
+    isValid := func(ids ...UniqueID) bool {
+        for _, id := range ids {
+            if seg, ok := segmentInfos[id]; !ok || seg == nil {
+                return false
+            }
+        }
+        return true
+    }
-        retrieveUnIndexed := func() bool {
-            continueRetrieve := false
-            for id := range unIndexedIDs {
-                compactionFrom := segmentInfos[id].GetCompactionFrom()
-                if len(compactionFrom) > 0 && isValid(compactionFrom...) {
-                    for _, fromID := range compactionFrom {
-                        if indexed.Contain(fromID) {
-                            indexedIDs.Insert(fromID)
-                        } else {
-                            unIndexedIDs.Insert(fromID)
-                            continueRetrieve = true
-                        }
-                    }
-                    unIndexedIDs.Remove(id)
-                    droppedIDs.Remove(compactionFrom...)
-                }
-            }
-            return continueRetrieve
-        }
-        for retrieveUnIndexed() {
-        }
-
-        // unindexed is flushed segments as well
-        indexedIDs.Insert(unIndexedIDs.Collect()...)
-        partStatsVersionsMap[partitionID] = currentPartitionStatsVersion
-    }
+    retrieveUnIndexed := func() bool {
+        continueRetrieve := false
+        for id := range unIndexedIDs {
+            compactionFrom := segmentInfos[id].GetCompactionFrom()
+            if len(compactionFrom) > 0 && isValid(compactionFrom...) {
+                for _, fromID := range compactionFrom {
+                    if indexed.Contain(fromID) {
+                        indexedIDs.Insert(fromID)
+                    } else {
+                        unIndexedIDs.Insert(fromID)
+                        continueRetrieve = true
+                    }
+                }
+                unIndexedIDs.Remove(id)
+                droppedIDs.Remove(compactionFrom...)
+            }
+        }
+        return continueRetrieve
+    }
+    for retrieveUnIndexed() {
+    }
+
+    // unindexed is flushed segments as well
+    indexedIDs.Insert(unIndexedIDs.Collect()...)
+
+    for _, partitionID := range validPartitions {
+        currentPartitionStatsVersion := h.s.meta.partitionStatsMeta.GetCurrentPartitionStatsVersion(channel.GetCollectionID(), partitionID, channel.GetName())
+        partStatsVersionsMap[partitionID] = currentPartitionStatsVersion
+    }
+
+    log.Info("GetQueryVChanPositions",
+        zap.Int64("collectionID", channel.GetCollectionID()),
+        zap.String("channel", channel.GetName()),
+        zap.Int("numOfSegments", len(segments)),
+        zap.Int("indexed segment", len(indexedSegments)),
+        zap.Int("result indexed", len(indexedIDs)),
+        zap.Int("result unIndexed", len(unIndexedIDs)),
+        zap.Int("result growing", len(growingIDs)),
+        zap.Any("partition stats", partStatsVersionsMap),
+    )
 
     return &datapb.VchannelInfo{
         CollectionID: channel.GetCollectionID(),
         ChannelName:  channel.GetName(),
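Two invariants in the hunk deserve a closer look. The first is the L2 visibility rule: a level-2 segment is only considered when its PartitionStatsVersion equals the partition's current committed version, so the output of an in-flight clustering compaction stays hidden until the whole batch lands. A small, self-contained predicate in the same spirit (the types here are illustrative, not the real datapb definitions):

package main

import "fmt"

// Illustrative stand-ins for datapb.SegmentLevel and *SegmentInfo; the real
// check lives inside GetQueryVChanPositions.
type level int

const (
	levelL1 level = iota + 1
	levelL2
)

type segment struct {
	id           int64
	lvl          level
	partStatsVer int64
}

// visible applies the rule from the diff: while a clustering (L2) compaction
// is in flight, its freshly written segments carry a partition-stats version
// that differs from the committed one and must stay hidden, so readers never
// observe a half-applied compaction batch.
func visible(s segment, currentPartStatsVer int64) bool {
	if s.lvl == levelL2 && s.partStatsVer != currentPartStatsVer {
		return false // skip fast-finished segments of an uncommitted L2 batch
	}
	return true
}

func main() {
	const committed = int64(7)
	fmt.Println(visible(segment{1, levelL2, 7}, committed)) // true: matches committed stats
	fmt.Println(visible(segment{2, levelL2, 8}, committed)) // false: mid-compaction output
	fmt.Println(visible(segment{3, levelL1, 0}, committed)) // true: the rule only gates L2
}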

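The second is the retrieveUnIndexed fixpoint behind the "blood relationship" diagram: a flushed-but-unindexed segment whose CompactionFrom parents are all still present is replaced by those parents (indexed parents are served directly, unindexed ones are re-examined), repeatedly, until no substitution applies. A toy run of that walk on the a, b → c and c, d → e lineage from the comment, using plain maps instead of typeutil.UniqueSet, where the expected survivors are c and d:

package main

import (
	"fmt"
	"sort"
)

// A toy version of the retrieveUnIndexed walk from the diff, with string IDs
// and map-based sets standing in for UniqueID and typeutil.UniqueSet.
type segInfo struct {
	compactionFrom []string
}

func main() {
	// Lineage from the diff comment: c was compacted from a and b (both
	// GC'd, so absent from segmentInfos), e was compacted from c and d.
	// The index was dropped, so c, d, e all start out unindexed.
	segmentInfos := map[string]*segInfo{
		"c": {compactionFrom: []string{"a", "b"}},
		"d": {},
		"e": {compactionFrom: []string{"c", "d"}},
	}
	indexed := map[string]bool{} // rebuilt index not finished yet
	unIndexed := map[string]bool{"c": true, "d": true, "e": true}
	dropped := map[string]bool{}

	isValid := func(ids []string) bool {
		for _, id := range ids {
			if segmentInfos[id] == nil {
				return false // a parent was GC'd; keep the child instead
			}
		}
		return true
	}
	retrieve := func() bool {
		again := false
		for id := range unIndexed {
			from := segmentInfos[id].compactionFrom
			if len(from) > 0 && isValid(from) {
				for _, f := range from {
					if !indexed[f] {
						unIndexed[f] = true // re-examine the parent next pass
						again = true
					}
				}
				delete(unIndexed, id)
				for _, f := range from {
					delete(dropped, f) // parents are live again, not dropped
				}
			}
		}
		return again
	}
	for retrieve() {
	}

	ids := make([]string, 0, len(unIndexed))
	for id := range unIndexed {
		ids = append(ids, id)
	}
	sort.Strings(ids)
	fmt.Println(ids) // [c d]: e was replaced by its surviving parents
}

Here c keeps its place because its own parents a and b were garbage collected, which is exactly the stopping condition the isValid closure enforces in the real code.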