diff --git a/internal/querycoordv2/meta/collection_manager.go b/internal/querycoordv2/meta/collection_manager.go index 7071556460d3a..c038a2ba7be99 100644 --- a/internal/querycoordv2/meta/collection_manager.go +++ b/internal/querycoordv2/meta/collection_manager.go @@ -23,13 +23,16 @@ import ( "sync" "time" + "github.com/cockroachdb/errors" "github.com/samber/lo" "go.opentelemetry.io/otel/trace" "go.uber.org/zap" "google.golang.org/protobuf/proto" + "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/internal/metastore" "github.com/milvus-io/milvus/internal/proto/querypb" + "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/eventlog" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/metrics" @@ -156,6 +159,16 @@ func (m *CollectionManager) Recover(broker Broker) error { continue } + err := m.upgradeLoadFields(collection, broker) + if err != nil { + if errors.Is(err, merr.ErrCollectionNotFound) { + log.Warn("collection not found, skip upgrade logic and wait for release") + } else { + log.Warn("upgrade load field failed", zap.Error(err)) + return err + } + } + m.collections[collection.CollectionID] = &Collection{ CollectionLoadInfo: collection, } @@ -194,6 +207,36 @@ func (m *CollectionManager) Recover(broker Broker) error { return nil } +func (m *CollectionManager) upgradeLoadFields(collection *querypb.CollectionLoadInfo, broker Broker) error { + // only fill load fields when value is nil + if collection.LoadFields != nil { + return nil + } + + // invoke describe collection to get collection schema + resp, err := broker.DescribeCollection(context.Background(), collection.CollectionID) + if err := merr.CheckRPCCall(resp, err); err != nil { + return err + } + + // fill all field id as legacy default behavior + collection.LoadFields = lo.FilterMap(resp.GetSchema().GetFields(), func(fieldSchema *schemapb.FieldSchema, _ int) (int64, bool) { + // load fields list excludes system fields + return fieldSchema.GetFieldID(), !common.IsSystemField(fieldSchema.GetFieldID()) + }) + + // put updated meta back to store + err = m.putCollection(true, &Collection{ + CollectionLoadInfo: collection, + LoadPercentage: 100, + }) + if err != nil { + return err + } + + return nil +} + // upgradeRecover recovers from old version <= 2.2.x for compatibility. func (m *CollectionManager) upgradeRecover(broker Broker) error { // for loaded collection from 2.2, it only save a old version CollectionLoadInfo without LoadType. diff --git a/internal/querycoordv2/meta/collection_manager_test.go b/internal/querycoordv2/meta/collection_manager_test.go index 320075bb977c0..30f0f7b958e6b 100644 --- a/internal/querycoordv2/meta/collection_manager_test.go +++ b/internal/querycoordv2/meta/collection_manager_test.go @@ -26,14 +26,18 @@ import ( "github.com/stretchr/testify/suite" "go.uber.org/zap" + "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" + "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" "github.com/milvus-io/milvus/internal/metastore" "github.com/milvus-io/milvus/internal/metastore/kv/querycoord" "github.com/milvus-io/milvus/internal/proto/querypb" . "github.com/milvus-io/milvus/internal/querycoordv2/params" + "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/kv" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/etcd" + "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/paramtable" ) @@ -494,6 +498,17 @@ func (suite *CollectionManagerSuite) TestUpgradeRecover() { if suite.loadTypes[i] == querypb.LoadType_LoadCollection { suite.broker.EXPECT().GetPartitions(mock.Anything, collection).Return(suite.partitions[collection], nil) } + suite.broker.EXPECT().DescribeCollection(mock.Anything, collection).Return(&milvuspb.DescribeCollectionResponse{ + Status: merr.Success(), + Schema: &schemapb.CollectionSchema{ + Fields: []*schemapb.FieldSchema{ + {FieldID: common.RowIDField}, + {FieldID: common.TimeStampField}, + {FieldID: 100, Name: "pk"}, + {FieldID: 101, Name: "vector"}, + }, + }, + }, nil).Maybe() } // do recovery @@ -508,6 +523,131 @@ func (suite *CollectionManagerSuite) TestUpgradeRecover() { } } +func (suite *CollectionManagerSuite) TestUpgradeLoadFields() { + suite.releaseAll() + mgr := suite.mgr + + // put old version of collections and partitions + for i, collection := range suite.collections { + mgr.PutCollection(&Collection{ + CollectionLoadInfo: &querypb.CollectionLoadInfo{ + CollectionID: collection, + ReplicaNumber: suite.replicaNumber[i], + Status: querypb.LoadStatus_Loaded, + LoadType: suite.loadTypes[i], + LoadFields: nil, // use nil Load fields, mocking old load info + }, + LoadPercentage: 100, + CreatedAt: time.Now(), + }) + for j, partition := range suite.partitions[collection] { + mgr.PutPartition(&Partition{ + PartitionLoadInfo: &querypb.PartitionLoadInfo{ + CollectionID: collection, + PartitionID: partition, + Status: querypb.LoadStatus_Loaded, + }, + LoadPercentage: suite.parLoadPercent[collection][j], + CreatedAt: time.Now(), + }) + } + } + + // set expectations + for _, collection := range suite.collections { + suite.broker.EXPECT().DescribeCollection(mock.Anything, collection).Return(&milvuspb.DescribeCollectionResponse{ + Status: merr.Success(), + Schema: &schemapb.CollectionSchema{ + Fields: []*schemapb.FieldSchema{ + {FieldID: common.RowIDField}, + {FieldID: common.TimeStampField}, + {FieldID: 100, Name: "pk"}, + {FieldID: 101, Name: "vector"}, + }, + }, + }, nil) + } + + // do recovery + suite.clearMemory() + err := mgr.Recover(suite.broker) + suite.NoError(err) + suite.checkLoadResult() + + for _, collection := range suite.collections { + newColl := mgr.GetCollection(collection) + suite.ElementsMatch([]int64{100, 101}, newColl.GetLoadFields()) + } +} + +func (suite *CollectionManagerSuite) TestUpgradeLoadFieldsFail() { + suite.Run("normal_error", func() { + suite.releaseAll() + mgr := suite.mgr + + mgr.PutCollection(&Collection{ + CollectionLoadInfo: &querypb.CollectionLoadInfo{ + CollectionID: 100, + ReplicaNumber: 1, + Status: querypb.LoadStatus_Loaded, + LoadType: querypb.LoadType_LoadCollection, + LoadFields: nil, // use nil Load fields, mocking old load info + }, + LoadPercentage: 100, + CreatedAt: time.Now(), + }) + mgr.PutPartition(&Partition{ + PartitionLoadInfo: &querypb.PartitionLoadInfo{ + CollectionID: 100, + PartitionID: 1000, + Status: querypb.LoadStatus_Loaded, + }, + LoadPercentage: 100, + CreatedAt: time.Now(), + }) + + suite.broker.EXPECT().DescribeCollection(mock.Anything, int64(100)).Return(nil, merr.WrapErrServiceInternal("mocked")).Once() + // do recovery + suite.clearMemory() + err := mgr.Recover(suite.broker) + suite.Error(err) + }) + + suite.Run("normal_error", func() { + suite.releaseAll() + mgr := suite.mgr + + mgr.PutCollection(&Collection{ + CollectionLoadInfo: &querypb.CollectionLoadInfo{ + CollectionID: 100, + ReplicaNumber: 1, + Status: querypb.LoadStatus_Loaded, + LoadType: querypb.LoadType_LoadCollection, + LoadFields: nil, // use nil Load fields, mocking old load info + }, + LoadPercentage: 100, + CreatedAt: time.Now(), + }) + mgr.PutPartition(&Partition{ + PartitionLoadInfo: &querypb.PartitionLoadInfo{ + CollectionID: 100, + PartitionID: 1000, + Status: querypb.LoadStatus_Loaded, + }, + LoadPercentage: 100, + CreatedAt: time.Now(), + }) + + suite.broker.EXPECT().DescribeCollection(mock.Anything, int64(100)).Return(&milvuspb.DescribeCollectionResponse{ + Status: merr.Status(merr.WrapErrCollectionNotFound(100)), + }, nil).Once() + // do recovery + suite.clearMemory() + err := mgr.Recover(suite.broker) + suite.NoError(err) + }) +} + func (suite *CollectionManagerSuite) loadAll() { mgr := suite.mgr @@ -523,6 +663,7 @@ func (suite *CollectionManagerSuite) loadAll() { ReplicaNumber: suite.replicaNumber[i], Status: status, LoadType: suite.loadTypes[i], + LoadFields: []int64{100, 101}, }, LoadPercentage: suite.colLoadPercent[i], CreatedAt: time.Now(), diff --git a/internal/querycoordv2/server_test.go b/internal/querycoordv2/server_test.go index ca3a84b83965e..570482982cbd4 100644 --- a/internal/querycoordv2/server_test.go +++ b/internal/querycoordv2/server_test.go @@ -439,6 +439,7 @@ func (suite *ServerSuite) loadAll() { CollectionID: collection, ReplicaNumber: suite.replicaNumber[collection], ResourceGroups: []string{meta.DefaultResourceGroupName}, + LoadFields: []int64{100, 101}, } resp, err := suite.server.LoadCollection(ctx, req) suite.NoError(err) @@ -449,6 +450,7 @@ func (suite *ServerSuite) loadAll() { PartitionIDs: suite.partitions[collection], ReplicaNumber: suite.replicaNumber[collection], ResourceGroups: []string{meta.DefaultResourceGroupName}, + LoadFields: []int64{100, 101}, } resp, err := suite.server.LoadPartitions(ctx, req) suite.NoError(err)