enhance: Make dynamic load/release partition follow targets #38059

Merged
22 changes: 2 additions & 20 deletions internal/querycoordv2/job/job_load.go
@@ -50,7 +50,6 @@ type LoadCollectionJob struct {
dist *meta.DistributionManager
meta *meta.Meta
broker meta.Broker
cluster session.Cluster
targetMgr meta.TargetManagerInterface
targetObserver *observers.TargetObserver
collectionObserver *observers.CollectionObserver
@@ -63,7 +62,6 @@ func NewLoadCollectionJob(
dist *meta.DistributionManager,
meta *meta.Meta,
broker meta.Broker,
cluster session.Cluster,
targetMgr meta.TargetManagerInterface,
targetObserver *observers.TargetObserver,
collectionObserver *observers.CollectionObserver,
Expand All @@ -72,11 +70,10 @@ func NewLoadCollectionJob(
return &LoadCollectionJob{
BaseJob: NewBaseJob(ctx, req.Base.GetMsgID(), req.GetCollectionID()),
req: req,
undo: NewUndoList(ctx, meta, cluster, targetMgr, targetObserver),
undo: NewUndoList(ctx, meta, targetMgr, targetObserver),
dist: dist,
meta: meta,
broker: broker,
cluster: cluster,
targetMgr: targetMgr,
targetObserver: targetObserver,
collectionObserver: collectionObserver,
@@ -193,12 +190,6 @@ func (job *LoadCollectionJob) Execute() error {
job.undo.IsReplicaCreated = true
}

// 3. loadPartitions on QueryNodes
err = loadPartitions(job.ctx, job.meta, job.cluster, job.broker, true, req.GetCollectionID(), lackPartitionIDs...)
if err != nil {
return err
}

// 4. put collection/partitions meta
partitions := lo.Map(lackPartitionIDs, func(partID int64, _ int) *meta.Partition {
return &meta.Partition{
Expand Down Expand Up @@ -264,7 +255,6 @@ type LoadPartitionJob struct {
dist *meta.DistributionManager
meta *meta.Meta
broker meta.Broker
cluster session.Cluster
targetMgr meta.TargetManagerInterface
targetObserver *observers.TargetObserver
collectionObserver *observers.CollectionObserver
@@ -277,7 +267,6 @@ func NewLoadPartitionJob(
dist *meta.DistributionManager,
meta *meta.Meta,
broker meta.Broker,
cluster session.Cluster,
targetMgr meta.TargetManagerInterface,
targetObserver *observers.TargetObserver,
collectionObserver *observers.CollectionObserver,
Expand All @@ -286,11 +275,10 @@ func NewLoadPartitionJob(
return &LoadPartitionJob{
BaseJob: NewBaseJob(ctx, req.Base.GetMsgID(), req.GetCollectionID()),
req: req,
undo: NewUndoList(ctx, meta, cluster, targetMgr, targetObserver),
undo: NewUndoList(ctx, meta, targetMgr, targetObserver),
dist: dist,
meta: meta,
broker: broker,
cluster: cluster,
targetMgr: targetMgr,
targetObserver: targetObserver,
collectionObserver: collectionObserver,
@@ -399,12 +387,6 @@ func (job *LoadPartitionJob) Execute() error {
job.undo.IsReplicaCreated = true
}

// 3. loadPartitions on QueryNodes
err = loadPartitions(job.ctx, job.meta, job.cluster, job.broker, true, req.GetCollectionID(), lackPartitionIDs...)
if err != nil {
return err
}

// 4. put collection/partitions meta
partitions := lo.Map(lackPartitionIDs, func(partID int64, _ int) *meta.Partition {
return &meta.Partition{
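With the `loadPartitions` RPC gone from both `LoadCollectionJob` and `LoadPartitionJob`, the jobs above only create replicas, persist collection/partition meta, and update targets; QueryNodes then converge on the new target instead of being called directly. A minimal sketch of that meta-first, target-driven shape — `MetaStore`, `TargetManager`, and `Partition` here are illustrative stand-ins, not the real querycoordv2 types:

```go
package loadsketch

import (
	"context"
	"fmt"
)

// Partition, MetaStore, and TargetManager are illustrative stand-ins for the
// real querycoordv2 types; only the shape of the flow is taken from the diff.
type Partition struct {
	CollectionID int64
	PartitionID  int64
}

type MetaStore interface {
	PutPartition(ctx context.Context, p *Partition) error
}

type TargetManager interface {
	// UpdateNextTarget recomputes the next target for the collection so that
	// schedulers and QueryNodes pick up the new partitions on their own.
	UpdateNextTarget(ctx context.Context, collectionID int64) error
}

// loadMissingPartitions persists the lacking partitions and refreshes the
// target, instead of pushing a loadPartitions RPC to every QueryNode.
func loadMissingPartitions(ctx context.Context, store MetaStore, targets TargetManager, collectionID int64, partitionIDs ...int64) error {
	for _, partID := range partitionIDs {
		if err := store.PutPartition(ctx, &Partition{CollectionID: collectionID, PartitionID: partID}); err != nil {
			return fmt.Errorf("put partition %d: %w", partID, err)
		}
	}
	// Once meta is durable, the target update is what makes the partitions
	// visible for loading; no direct call to the nodes is needed.
	return targets.UpdateNextTarget(ctx, collectionID)
}
```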
8 changes: 2 additions & 6 deletions internal/querycoordv2/job/job_release.go
@@ -53,7 +53,6 @@ func NewReleaseCollectionJob(ctx context.Context,
dist *meta.DistributionManager,
meta *meta.Meta,
broker meta.Broker,
cluster session.Cluster,
targetMgr meta.TargetManagerInterface,
targetObserver *observers.TargetObserver,
checkerController *checkers.CheckerController,
Expand All @@ -65,7 +64,6 @@ func NewReleaseCollectionJob(ctx context.Context,
dist: dist,
meta: meta,
broker: broker,
cluster: cluster,
targetMgr: targetMgr,
targetObserver: targetObserver,
checkerController: checkerController,
Expand All @@ -86,7 +84,6 @@ func (job *ReleaseCollectionJob) Execute() error {
toRelease := lo.Map(loadedPartitions, func(partition *meta.Partition, _ int) int64 {
return partition.GetPartitionID()
})
releasePartitions(job.ctx, job.meta, job.cluster, req.GetCollectionID(), toRelease...)

err := job.meta.CollectionManager.RemoveCollection(job.ctx, req.GetCollectionID())
if err != nil {
@@ -139,7 +136,6 @@ func NewReleasePartitionJob(ctx context.Context,
dist *meta.DistributionManager,
meta *meta.Meta,
broker meta.Broker,
cluster session.Cluster,
targetMgr meta.TargetManagerInterface,
targetObserver *observers.TargetObserver,
checkerController *checkers.CheckerController,
Expand All @@ -151,7 +147,6 @@ func NewReleasePartitionJob(ctx context.Context,
dist: dist,
meta: meta,
broker: broker,
cluster: cluster,
targetMgr: targetMgr,
targetObserver: targetObserver,
checkerController: checkerController,
@@ -180,7 +175,6 @@ func (job *ReleasePartitionJob) Execute() error {
log.Warn("releasing partition(s) not loaded")
return nil
}
releasePartitions(job.ctx, job.meta, job.cluster, req.GetCollectionID(), toRelease...)

// If all partitions are released, clear all
if len(toRelease) == len(loadedPartitions) {
@@ -214,6 +208,8 @@ func (job *ReleasePartitionJob) Execute() error {
return errors.Wrap(err, msg)
}
job.targetObserver.ReleasePartition(req.GetCollectionID(), toRelease...)
// wait current target updated, so following queries will act as expected
waitCurrentTargetUpdated(job.ctx, job.targetObserver, job.req.GetCollectionID())
waitCollectionReleased(job.dist, job.checkerController, req.GetCollectionID(), toRelease...)
}
metrics.QueryCoordNumPartitions.WithLabelValues().Sub(float64(len(toRelease)))
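The release path now waits for the current target to be refreshed (the new `waitCurrentTargetUpdated` call above) before waiting for the partitions to actually disappear from the nodes, so queries issued right after the release see a consistent target. A rough sketch of what such a wait helper could look like — the `TargetObserver` interface, the polling interval, and the up-to-date check are assumptions for illustration, not the actual Milvus implementation:

```go
package releasesketch

import (
	"context"
	"fmt"
	"time"
)

// TargetObserver is an illustrative stand-in; it only needs to report whether
// the collection's current target already reflects the latest change.
type TargetObserver interface {
	CurrentTargetUpToDate(collectionID int64) bool
}

// waitCurrentTargetUpdated polls until the observer reports a refreshed
// current target, or the job context is cancelled. It mirrors the ordering in
// ReleasePartitionJob.Execute: drop the partitions from the target, wait for
// the current target, then wait for the release to finish on the nodes.
func waitCurrentTargetUpdated(ctx context.Context, observer TargetObserver, collectionID int64) error {
	ticker := time.NewTicker(200 * time.Millisecond) // interval is a guess
	defer ticker.Stop()
	for {
		if observer.CurrentTargetUpToDate(collectionID) {
			return nil
		}
		select {
		case <-ctx.Done():
			return fmt.Errorf("wait current target for collection %d: %w", collectionID, ctx.Err())
		case <-ticker.C:
		}
	}
}
```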
34 changes: 17 additions & 17 deletions internal/querycoordv2/job/job_sync.go
@@ -25,31 +25,36 @@ import (

"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
"github.com/milvus-io/milvus/internal/querycoordv2/observers"
"github.com/milvus-io/milvus/internal/querycoordv2/session"
"github.com/milvus-io/milvus/pkg/log"
)

type SyncNewCreatedPartitionJob struct {
*BaseJob
req *querypb.SyncNewCreatedPartitionRequest
meta *meta.Meta
cluster session.Cluster
broker meta.Broker
req *querypb.SyncNewCreatedPartitionRequest
meta *meta.Meta
cluster session.Cluster
broker meta.Broker
targetObserver *observers.TargetObserver
targetMgr meta.TargetManagerInterface
}

func NewSyncNewCreatedPartitionJob(
ctx context.Context,
req *querypb.SyncNewCreatedPartitionRequest,
meta *meta.Meta,
cluster session.Cluster,
broker meta.Broker,
targetObserver *observers.TargetObserver,
targetMgr meta.TargetManagerInterface,
) *SyncNewCreatedPartitionJob {
return &SyncNewCreatedPartitionJob{
BaseJob: NewBaseJob(ctx, req.Base.GetMsgID(), req.GetCollectionID()),
req: req,
meta: meta,
cluster: cluster,
broker: broker,
BaseJob: NewBaseJob(ctx, req.Base.GetMsgID(), req.GetCollectionID()),
req: req,
meta: meta,
broker: broker,
targetObserver: targetObserver,
targetMgr: targetMgr,
}
}

Expand All @@ -75,11 +80,6 @@ func (job *SyncNewCreatedPartitionJob) Execute() error {
return nil
}

err := loadPartitions(job.ctx, job.meta, job.cluster, job.broker, false, req.GetCollectionID(), req.GetPartitionID())
if err != nil {
return err
}

partition := &meta.Partition{
PartitionLoadInfo: &querypb.PartitionLoadInfo{
CollectionID: req.GetCollectionID(),
Expand All @@ -89,12 +89,12 @@ func (job *SyncNewCreatedPartitionJob) Execute() error {
LoadPercentage: 100,
CreatedAt: time.Now(),
}
err = job.meta.CollectionManager.PutPartition(job.ctx, partition)
err := job.meta.CollectionManager.PutPartition(job.ctx, partition)
if err != nil {
msg := "failed to store partitions"
log.Warn(msg, zap.Error(err))
return errors.Wrap(err, msg)
}

return nil
return waitCurrentTargetUpdated(job.ctx, job.targetObserver, job.req.GetCollectionID())
}
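After this change `SyncNewCreatedPartitionJob` no longer calls `loadPartitions` on the QueryNodes: it stores the partition as loaded and then returns the result of waiting for the current target. A compact, self-contained sketch of that tail of `Execute` — the `partitionStore` and `targetWaiter` interfaces and the field set are assumptions, kept only to show the control flow:

```go
package syncsketch

import (
	"context"
	"time"
)

// Minimal illustrative types; the real job stores a querypb.PartitionLoadInfo
// through the collection manager.
type partitionMeta struct {
	CollectionID   int64
	PartitionID    int64
	LoadPercentage int32
	CreatedAt      time.Time
}

type partitionStore interface {
	PutPartition(ctx context.Context, p *partitionMeta) error
}

type targetWaiter interface {
	// WaitCurrentTarget blocks until the collection's current target reflects
	// the new partition, or the context is cancelled.
	WaitCurrentTarget(ctx context.Context, collectionID int64) error
}

// syncNewCreatedPartition mirrors the post-change Execute tail: persist the
// partition as fully loaded, then hand off to the target machinery instead of
// issuing a load RPC to the QueryNodes.
func syncNewCreatedPartition(ctx context.Context, store partitionStore, targets targetWaiter, collectionID, partitionID int64) error {
	p := &partitionMeta{
		CollectionID:   collectionID,
		PartitionID:    partitionID,
		LoadPercentage: 100,
		CreatedAt:      time.Now(),
	}
	if err := store.PutPartition(ctx, p); err != nil {
		return err
	}
	return targets.WaitCurrentTarget(ctx, collectionID)
}
```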