Skip to content

Commit

Permalink
Set current partition stats version to 0 by default when not present
Browse files Browse the repository at this point in the history
Signed-off-by: Cai Zhang <[email protected]>
  • Loading branch information
xiaocai2333 committed Oct 30, 2024
1 parent 3a34046 commit 2632721
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 2 deletions.
6 changes: 6 additions & 0 deletions internal/datacoord/compaction_task_clustering.go
Original file line number Diff line number Diff line change
Expand Up @@ -485,6 +485,12 @@ func (t *clusteringCompactionTask) completeTask() error {
return err
}

err = t.meta.GetPartitionStatsMeta().SaveCurrentPartitionStatsVersion(t.GetTaskProto().GetCollectionID(),
t.GetTaskProto().GetPartitionID(), t.GetTaskProto().GetChannel(), t.GetTaskProto().GetPlanID())
if err != nil {
return merr.WrapErrClusteringCompactionMetaError("SaveCurrentPartitionStatsVersion", err)
}

return t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_completed))
}

Expand Down
2 changes: 0 additions & 2 deletions internal/datacoord/partition_stats_meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,6 @@ func (psm *partitionStatsMeta) SavePartitionStatsInfo(info *datapb.PartitionStat
}

psm.partitionStatsInfos[info.GetVChannel()][info.GetPartitionID()].infos[info.GetVersion()] = info
// after v2.5.0, the current version will be updated when updating the partition stats info, so there’s no need to split it into two steps
psm.partitionStatsInfos[info.GetVChannel()][info.GetPartitionID()].currentVersion = info.Version
return nil
}

Expand Down
4 changes: 4 additions & 0 deletions internal/datacoord/partition_stats_meta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ func (s *PartitionStatsMetaSuite) SetupTest() {
catalog := mocks.NewDataCoordCatalog(s.T())
catalog.EXPECT().SavePartitionStatsInfo(mock.Anything, mock.Anything).Return(nil).Maybe()
catalog.EXPECT().ListPartitionStatsInfos(mock.Anything).Return(nil, nil).Maybe()
catalog.EXPECT().SaveCurrentPartitionStatsVersion(mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe()
s.catalog = catalog
}

Expand Down Expand Up @@ -74,6 +75,9 @@ func (s *PartitionStatsMetaSuite) TestGetPartitionStats() {
ps := partitionStatsMeta.GetPartitionStats(1, 2, "ch-1", 100)
s.NotNil(ps)

err = partitionStatsMeta.SaveCurrentPartitionStatsVersion(1, 2, "ch-1", 100)
s.NoError(err)

currentVersion := partitionStatsMeta.GetCurrentPartitionStatsVersion(1, 2, "ch-1")
s.Equal(int64(100), currentVersion)

Expand Down
5 changes: 5 additions & 0 deletions internal/metastore/kv/datacoord/kv_catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"strconv"
"strings"

"github.com/cockroachdb/errors"
"go.uber.org/zap"
"golang.org/x/exp/maps"
"golang.org/x/sync/errgroup"
Expand All @@ -43,6 +44,7 @@ import (
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/util"
"github.com/milvus-io/milvus/pkg/util/etcd"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)

Expand Down Expand Up @@ -930,6 +932,9 @@ func (kc *Catalog) GetCurrentPartitionStatsVersion(ctx context.Context, collID,
key := buildCurrentPartitionStatsVersionPath(collID, partID, vChannel)
valueStr, err := kc.MetaKv.Load(key)
if err != nil {
if errors.Is(err, merr.ErrIoKeyNotFound) {
return 0, nil
}
return 0, err
}

Expand Down

0 comments on commit 2632721

Please sign in to comment.