Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Fix collection leak in querynode (#37061) #37079

Merged
merged 1 commit into from
Oct 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions internal/querynodev2/segments/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,8 @@ func (m *collectionManager) Unref(collectionID int64, count uint32) bool {

if collection, ok := m.collections[collectionID]; ok {
if collection.Unref(count) == 0 {
log.Info("release collection due to ref count to 0", zap.Int64("collectionID", collectionID))
log.Info("release collection due to ref count to 0",
zap.Int64("nodeID", paramtable.GetNodeID()), zap.Int64("collectionID", collectionID))
delete(m.collections, collectionID)
DeleteCollection(collection)

Expand Down Expand Up @@ -214,7 +215,8 @@ func (c *Collection) GetLoadType() querypb.LoadType {

func (c *Collection) Ref(count uint32) uint32 {
refCount := c.refCount.Add(count)
log.Debug("collection ref increment",
log.Info("collection ref increment",
zap.Int64("nodeID", paramtable.GetNodeID()),
zap.Int64("collectionID", c.ID()),
zap.Uint32("refCount", refCount),
)
Expand All @@ -223,7 +225,8 @@ func (c *Collection) Ref(count uint32) uint32 {

func (c *Collection) Unref(count uint32) uint32 {
refCount := c.refCount.Sub(count)
log.Debug("collection ref decrement",
log.Info("collection ref decrement",
zap.Int64("nodeID", paramtable.GetNodeID()),
zap.Int64("collectionID", c.ID()),
zap.Uint32("refCount", refCount),
)
Expand Down
4 changes: 2 additions & 2 deletions internal/querynodev2/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,10 +368,10 @@ func (node *QueryNode) UnsubDmChannel(ctx context.Context, req *querypb.UnsubDmC

node.pipelineManager.Remove(req.GetChannelName())
node.manager.Segment.RemoveBy(ctx, segments.WithChannel(req.GetChannelName()), segments.WithType(segments.SegmentTypeGrowing))
node.manager.Segment.RemoveBy(ctx, segments.WithChannel(req.GetChannelName()), segments.WithLevel(datapb.SegmentLevel_L0))
_, sealed := node.manager.Segment.RemoveBy(ctx, segments.WithChannel(req.GetChannelName()), segments.WithLevel(datapb.SegmentLevel_L0))
node.tSafeManager.Remove(ctx, req.GetChannelName())

node.manager.Collection.Unref(req.GetCollectionID(), 1)
node.manager.Collection.Unref(req.GetCollectionID(), uint32(1+sealed))
}
log.Info("unsubscribed channel")

Expand Down
7 changes: 6 additions & 1 deletion tests/integration/suite.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/milvus-io/milvus/internal/util/hookutil"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/etcd"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/paramtable"
)

Expand Down Expand Up @@ -138,9 +139,13 @@ func (s *MiniClusterSuite) TearDownTest() {
if err == nil {
for idx, collectionName := range resp.GetCollectionNames() {
if resp.GetInMemoryPercentages()[idx] == 100 || resp.GetQueryServiceAvailable()[idx] {
s.Cluster.Proxy.ReleaseCollection(context.Background(), &milvuspb.ReleaseCollectionRequest{
status, err := s.Cluster.Proxy.ReleaseCollection(context.Background(), &milvuspb.ReleaseCollectionRequest{
CollectionName: collectionName,
})
err = merr.CheckRPCCall(status, err)
s.NoError(err)
collectionID := resp.GetCollectionIds()[idx]
s.CheckCollectionCacheReleased(collectionID)
}
}
}
Expand Down
32 changes: 32 additions & 0 deletions tests/integration/util_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/metricsinfo"
"github.com/milvus-io/milvus/pkg/util/testutils"
)

Expand Down Expand Up @@ -99,6 +101,36 @@
}
}

// CheckCollectionCacheReleased checks if the collection cache was released from querynodes.
func (s *MiniClusterSuite) CheckCollectionCacheReleased(collectionID int64) {
for _, qn := range s.Cluster.GetAllQueryNodes() {
s.Eventually(func() bool {
state, err := qn.GetComponentStates(context.Background(), &milvuspb.GetComponentStatesRequest{})
s.NoError(err)
if state.GetState().GetStateCode() != commonpb.StateCode_Healthy {
// skip checking stopping/stopped node
return true
}
req, err := metricsinfo.ConstructRequestByMetricType(metricsinfo.SystemInfoMetrics)
s.NoError(err)
resp, err := qn.GetQueryNode().GetMetrics(context.Background(), req)
err = merr.CheckRPCCall(resp.GetStatus(), err)
s.NoError(err)
infos := metricsinfo.QueryNodeInfos{}
err = metricsinfo.UnmarshalComponentInfos(resp.Response, &infos)
s.NoError(err)
for _, id := range infos.QuotaMetrics.Effect.CollectionIDs {
if id == collectionID {
s.T().Logf("collection %d was not released in querynode %d", collectionID, qn.GetQueryNode().GetNodeID())
return false
}

Check warning on line 126 in tests/integration/util_query.go

View check run for this annotation

Codecov / codecov/patch

tests/integration/util_query.go#L124-L126

Added lines #L124 - L126 were not covered by tests
}
s.T().Logf("collection %d has been released from querynode %d", collectionID, qn.GetQueryNode().GetNodeID())
return true
}, 3*time.Minute, 200*time.Millisecond)
}
}

func ConstructSearchRequest(
dbName, collectionName string,
expr string,
Expand Down
Loading