Skip to content

Commit

Permalink
enhance: add the related data size for the read apis (#31816)
Browse files Browse the repository at this point in the history
issue: #30436
origin pr: #30438
related pr: #31772

---------

Signed-off-by: SimFG <[email protected]>
  • Loading branch information
SimFG authored Apr 10, 2024
1 parent c9faa6d commit 90bed1c
Show file tree
Hide file tree
Showing 17 changed files with 220 additions and 122 deletions.
2 changes: 1 addition & 1 deletion internal/datacoord/metrics_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func (s *Server) getCollectionMetrics(ctx context.Context) *metricsinfo.DataCoor
ret.Collections[collectionID].IndexInfo = append(ret.Collections[collectionID].IndexInfo, &metricsinfo.DataCoordIndexInfo{
NumEntitiesIndexed: info.GetIndexedRows(),
IndexName: info.GetIndexName(),
FieldID: info.GetIndexID(),
FieldID: info.GetFieldID(),
})
}
}
Expand Down
1 change: 1 addition & 0 deletions internal/proto/internal.proto
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ message CostAggregation {
int64 responseTime = 1;
int64 serviceTime = 2;
int64 totalNQ = 3;
int64 totalRelatedDataSize = 4;
}

message RetrieveRequest {
Expand Down
173 changes: 107 additions & 66 deletions internal/proxy/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -2436,23 +2436,31 @@ func (node *Proxy) Insert(ctx context.Context, request *milvuspb.InsertRequest)
metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
metrics.SuccessLabel, request.GetDbName(), request.GetCollectionName()).Inc()
successCnt := it.result.InsertCnt - int64(len(it.result.ErrIndex))
username := GetCurUserFromContextOrDefault(ctx)
nodeID := paramtable.GetStringNodeID()
dbName := request.DbName
collectionName := request.CollectionName

v := Extension.Report(map[string]any{
hookutil.OpTypeKey: hookutil.OpTypeInsert,
hookutil.DatabaseKey: request.DbName,
hookutil.UsernameKey: GetCurUserFromContextOrDefault(ctx),
hookutil.DataSizeKey: proto.Size(request),
hookutil.SuccessCntKey: successCnt,
hookutil.FailCntKey: len(it.result.ErrIndex),
hookutil.OpTypeKey: hookutil.OpTypeInsert,
hookutil.DatabaseKey: dbName,
hookutil.UsernameKey: username,
hookutil.RequestDataSizeKey: proto.Size(request),
hookutil.SuccessCntKey: successCnt,
hookutil.FailCntKey: len(it.result.ErrIndex),
})
SetReportValue(it.result.GetStatus(), v)
if merr.Ok(it.result.GetStatus()) {
metrics.ProxyReportValue.WithLabelValues(nodeID, hookutil.OpTypeInsert, request.DbName, username).Add(float64(v))
}
metrics.ProxyInsertVectors.
WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), request.GetDbName(), request.GetCollectionName()).
WithLabelValues(nodeID, dbName, collectionName).
Add(float64(successCnt))
metrics.ProxyMutationLatency.
WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.InsertLabel, request.GetDbName(), request.GetCollectionName()).
WithLabelValues(nodeID, metrics.InsertLabel, dbName, collectionName).
Observe(float64(tr.ElapseSpan().Milliseconds()))
metrics.ProxyCollectionMutationLatency.
WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.InsertLabel, request.CollectionName).
WithLabelValues(nodeID, metrics.InsertLabel, collectionName).
Observe(float64(tr.ElapseSpan().Milliseconds()))
return it.result, nil
}
Expand Down Expand Up @@ -2511,7 +2519,7 @@ func (node *Proxy) Delete(ctx context.Context, request *milvuspb.DeleteRequest)
log.Debug("Run delete in Proxy")

if err := dr.Run(ctx); err != nil {
log.Error("Failed to enqueue delete task: " + err.Error())
log.Error("Failed to run delete task: " + err.Error())
metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
metrics.FailLabel, request.GetDbName(), request.GetCollectionName()).Inc()

Expand All @@ -2526,21 +2534,28 @@ func (node *Proxy) Delete(ctx context.Context, request *milvuspb.DeleteRequest)
successCnt := dr.result.GetDeleteCnt()
metrics.ProxyDeleteVectors.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10)).Add(float64(successCnt))

username := GetCurUserFromContextOrDefault(ctx)
nodeID := paramtable.GetStringNodeID()
dbName := request.DbName
collectionName := request.CollectionName
v := Extension.Report(map[string]any{
hookutil.OpTypeKey: hookutil.OpTypeDelete,
hookutil.DatabaseKey: request.DbName,
hookutil.UsernameKey: GetCurUserFromContextOrDefault(ctx),
hookutil.DatabaseKey: dbName,
hookutil.UsernameKey: username,
hookutil.SuccessCntKey: successCnt,
hookutil.RelatedCntKey: dr.allQueryCnt.Load(),
})
SetReportValue(dr.result.GetStatus(), v)

metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
metrics.SuccessLabel, request.GetDbName(), request.GetCollectionName()).Inc()
if merr.Ok(dr.result.GetStatus()) {
metrics.ProxyReportValue.WithLabelValues(nodeID, hookutil.OpTypeDelete, dbName, username).Add(float64(v))
}
metrics.ProxyFunctionCall.WithLabelValues(nodeID, method,
metrics.SuccessLabel, dbName, collectionName).Inc()
metrics.ProxyMutationLatency.
WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.DeleteLabel, request.GetDbName(), request.GetCollectionName()).
WithLabelValues(nodeID, metrics.DeleteLabel, dbName, collectionName).
Observe(float64(tr.ElapseSpan().Milliseconds()))
metrics.ProxyCollectionMutationLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.DeleteLabel, request.CollectionName).Observe(float64(tr.ElapseSpan().Milliseconds()))
metrics.ProxyCollectionMutationLatency.WithLabelValues(nodeID, metrics.DeleteLabel, collectionName).Observe(float64(tr.ElapseSpan().Milliseconds()))
return dr.result, nil
}

Expand Down Expand Up @@ -2652,28 +2667,34 @@ func (node *Proxy) Upsert(ctx context.Context, request *milvuspb.UpsertRequest)
// UpsertCnt always equals to the number of entities in the request
it.result.UpsertCnt = int64(request.NumRows)

username := GetCurUserFromContextOrDefault(ctx)
nodeID := paramtable.GetStringNodeID()
dbName := request.DbName
collectionName := request.CollectionName
v := Extension.Report(map[string]any{
hookutil.OpTypeKey: hookutil.OpTypeUpsert,
hookutil.DatabaseKey: request.DbName,
hookutil.UsernameKey: GetCurUserFromContextOrDefault(ctx),
hookutil.DataSizeKey: proto.Size(it.req),
hookutil.SuccessCntKey: it.result.UpsertCnt,
hookutil.FailCntKey: len(it.result.ErrIndex),
hookutil.OpTypeKey: hookutil.OpTypeUpsert,
hookutil.DatabaseKey: request.DbName,
hookutil.UsernameKey: username,
hookutil.RequestDataSizeKey: proto.Size(it.req),
hookutil.SuccessCntKey: it.result.UpsertCnt,
hookutil.FailCntKey: len(it.result.ErrIndex),
})
SetReportValue(it.result.GetStatus(), v)

rateCol.Add(internalpb.RateType_DMLUpsert.String(), float64(it.upsertMsg.DeleteMsg.Size()+it.upsertMsg.DeleteMsg.Size()))

metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
metrics.SuccessLabel, request.GetDbName(), request.GetCollectionName()).Inc()
rateCol.Add(internalpb.RateType_DMLUpsert.String(), float64(it.upsertMsg.InsertMsg.Size()+it.upsertMsg.DeleteMsg.Size()))
if merr.Ok(it.result.GetStatus()) {
metrics.ProxyReportValue.WithLabelValues(nodeID, hookutil.OpTypeUpsert, dbName, username).Add(float64(v))
}
metrics.ProxyFunctionCall.WithLabelValues(nodeID, method,
metrics.SuccessLabel, dbName, collectionName).Inc()
successCnt := it.result.UpsertCnt - int64(len(it.result.ErrIndex))
metrics.ProxyUpsertVectors.
WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), request.GetDbName(), request.GetCollectionName()).
WithLabelValues(nodeID, dbName, collectionName).
Add(float64(successCnt))
metrics.ProxyMutationLatency.
WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.UpsertLabel, request.GetDbName(), request.GetCollectionName()).
WithLabelValues(nodeID, metrics.UpsertLabel, dbName, collectionName).
Observe(float64(tr.ElapseSpan().Milliseconds()))
metrics.ProxyCollectionMutationLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.UpsertLabel, request.CollectionName).Observe(float64(tr.ElapseSpan().Milliseconds()))
metrics.ProxyCollectionMutationLatency.WithLabelValues(nodeID, metrics.UpsertLabel, collectionName).Observe(float64(tr.ElapseSpan().Milliseconds()))

log.Debug("Finish processing upsert request in Proxy")
return it.result, nil
Expand All @@ -2686,7 +2707,8 @@ func (node *Proxy) Search(ctx context.Context, request *milvuspb.SearchRequest)
Status: merr.Success(),
}
err2 := retry.Handle(ctx, func() (bool, error) {
rsp, err = node.search(ctx, request)
rsp, err = node.
search(ctx, request)
if errors.Is(merr.Error(rsp.GetStatus()), merr.ErrInconsistentRequery) {
return true, merr.Error(rsp.GetStatus())
}
Expand Down Expand Up @@ -2835,51 +2857,59 @@ func (node *Proxy) search(ctx context.Context, request *milvuspb.SearchRequest)
}

span := tr.CtxRecord(ctx, "wait search result")
nodeID := paramtable.GetStringNodeID()
dbName := request.DbName
collectionName := request.CollectionName
metrics.ProxyWaitForSearchResultLatency.WithLabelValues(
strconv.FormatInt(paramtable.GetNodeID(), 10),
nodeID,
metrics.SearchLabel,
).Observe(float64(span.Milliseconds()))

tr.CtxRecord(ctx, "wait search result")
log.Debug(rpcDone(method))

metrics.ProxyFunctionCall.WithLabelValues(
strconv.FormatInt(paramtable.GetNodeID(), 10),
nodeID,
method,
metrics.SuccessLabel,
request.GetDbName(),
request.GetCollectionName(),
dbName,
collectionName,
).Inc()

metrics.ProxySearchVectors.
WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), request.GetDbName(), request.GetCollectionName()).
WithLabelValues(nodeID, dbName, collectionName).
Add(float64(qt.result.GetResults().GetNumQueries()))

searchDur := tr.ElapseSpan().Milliseconds()
metrics.ProxySQLatency.WithLabelValues(
strconv.FormatInt(paramtable.GetNodeID(), 10),
nodeID,
metrics.SearchLabel,
request.GetDbName(),
request.GetCollectionName(),
dbName,
collectionName,
).Observe(float64(searchDur))

metrics.ProxyCollectionSQLatency.WithLabelValues(
strconv.FormatInt(paramtable.GetNodeID(), 10),
nodeID,
metrics.SearchLabel,
request.CollectionName,
collectionName,
).Observe(float64(searchDur))

if qt.result != nil {
username := GetCurUserFromContextOrDefault(ctx)
sentSize := proto.Size(qt.result)
v := Extension.Report(map[string]any{
hookutil.OpTypeKey: hookutil.OpTypeSearch,
hookutil.DatabaseKey: request.DbName,
hookutil.UsernameKey: GetCurUserFromContextOrDefault(ctx),
hookutil.DataSizeKey: sentSize,
hookutil.RelatedCntKey: qt.result.GetResults().GetAllSearchCount(),
hookutil.OpTypeKey: hookutil.OpTypeSearch,
hookutil.DatabaseKey: dbName,
hookutil.UsernameKey: username,
hookutil.ResultDataSizeKey: sentSize,
hookutil.RelatedDataSizeKey: qt.relatedDataSize,
hookutil.RelatedCntKey: qt.result.GetResults().GetAllSearchCount(),
})
SetReportValue(qt.result.GetStatus(), v)
metrics.ProxyReadReqSendBytes.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10)).Add(float64(sentSize))
if merr.Ok(qt.result.GetStatus()) {
metrics.ProxyReportValue.WithLabelValues(nodeID, hookutil.OpTypeSearch, dbName, username).Add(float64(v))
}
metrics.ProxyReadReqSendBytes.WithLabelValues(nodeID).Add(float64(sentSize))
rateCol.Add(metricsinfo.ReadResultThroughput, float64(sentSize))
}
return qt.result, nil
Expand Down Expand Up @@ -3016,51 +3046,59 @@ func (node *Proxy) hybridSearch(ctx context.Context, request *milvuspb.HybridSea
}

span := tr.CtxRecord(ctx, "wait hybrid search result")
nodeID := paramtable.GetStringNodeID()
dbName := request.DbName
collectionName := request.CollectionName
metrics.ProxyWaitForSearchResultLatency.WithLabelValues(
strconv.FormatInt(paramtable.GetNodeID(), 10),
nodeID,
metrics.HybridSearchLabel,
).Observe(float64(span.Milliseconds()))

tr.CtxRecord(ctx, "wait hybrid search result")
log.Debug(rpcDone(method))

metrics.ProxyFunctionCall.WithLabelValues(
strconv.FormatInt(paramtable.GetNodeID(), 10),
nodeID,
method,
metrics.SuccessLabel,
request.GetDbName(),
request.GetCollectionName(),
).Inc()

metrics.ProxySearchVectors.
WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), request.GetDbName(), request.GetCollectionName()).
WithLabelValues(nodeID, dbName, collectionName).
Add(float64(len(request.GetRequests())))

searchDur := tr.ElapseSpan().Milliseconds()
metrics.ProxySQLatency.WithLabelValues(
strconv.FormatInt(paramtable.GetNodeID(), 10),
nodeID,
metrics.HybridSearchLabel,
request.GetDbName(),
request.GetCollectionName(),
dbName,
collectionName,
).Observe(float64(searchDur))

metrics.ProxyCollectionSQLatency.WithLabelValues(
strconv.FormatInt(paramtable.GetNodeID(), 10),
nodeID,
metrics.HybridSearchLabel,
request.CollectionName,
collectionName,
).Observe(float64(searchDur))

if qt.result != nil {
sentSize := proto.Size(qt.result)
username := GetCurUserFromContextOrDefault(ctx)
v := Extension.Report(map[string]any{
hookutil.OpTypeKey: hookutil.OpTypeHybridSearch,
hookutil.DatabaseKey: request.DbName,
hookutil.UsernameKey: GetCurUserFromContextOrDefault(ctx),
hookutil.DataSizeKey: sentSize,
hookutil.RelatedCntKey: qt.result.GetResults().GetAllSearchCount(),
hookutil.OpTypeKey: hookutil.OpTypeHybridSearch,
hookutil.DatabaseKey: dbName,
hookutil.UsernameKey: username,
hookutil.ResultDataSizeKey: sentSize,
hookutil.RelatedDataSizeKey: qt.relatedDataSize,
hookutil.RelatedCntKey: qt.result.GetResults().GetAllSearchCount(),
})
SetReportValue(qt.result.GetStatus(), v)
metrics.ProxyReadReqSendBytes.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10)).Add(float64(sentSize))
if merr.Ok(qt.result.GetStatus()) {
metrics.ProxyReportValue.WithLabelValues(nodeID, hookutil.OpTypeHybridSearch, dbName, username).Add(float64(v))
}
metrics.ProxyReadReqSendBytes.WithLabelValues(nodeID).Add(float64(sentSize))
rateCol.Add(metricsinfo.ReadResultThroughput, float64(sentSize))
}
return qt.result, nil
Expand Down Expand Up @@ -3350,15 +3388,18 @@ func (node *Proxy) Query(ctx context.Context, request *milvuspb.QueryRequest) (*
}
res, err := node.query(ctx, qt)
if merr.Ok(res.Status) && err == nil {
username := GetCurUserFromContextOrDefault(ctx)
nodeID := paramtable.GetStringNodeID()
v := Extension.Report(map[string]any{
hookutil.OpTypeKey: hookutil.OpTypeQuery,
hookutil.DatabaseKey: request.DbName,
hookutil.UsernameKey: GetCurUserFromContextOrDefault(ctx),
hookutil.DataSizeKey: proto.Size(res),
hookutil.RelatedCntKey: qt.allQueryCnt,
hookutil.DimensionKey: qt.dimension,
hookutil.OpTypeKey: hookutil.OpTypeQuery,
hookutil.DatabaseKey: request.DbName,
hookutil.UsernameKey: username,
hookutil.ResultDataSizeKey: proto.Size(res),
hookutil.RelatedDataSizeKey: qt.totalRelatedDataSize,
hookutil.RelatedCntKey: qt.allQueryCnt,
})
SetReportValue(res.Status, v)
metrics.ProxyReportValue.WithLabelValues(nodeID, hookutil.OpTypeQuery, request.DbName, username).Add(float64(v))
}
return res, err
}
Expand Down
13 changes: 5 additions & 8 deletions internal/proxy/task_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ type queryTask struct {
collectionName string
queryParams *queryParams
schema *schemaInfo
dimension int64

userOutputFields []string

Expand All @@ -66,8 +65,9 @@ type queryTask struct {
channelsMvcc map[string]Timestamp
fastSkip bool

reQuery bool
allQueryCnt int64
reQuery bool
allQueryCnt int64
totalRelatedDataSize int64
}

type queryParams struct {
Expand Down Expand Up @@ -341,11 +341,6 @@ func (t *queryTask) PreExecute(ctx context.Context) error {
return err
}
t.schema = schema
t.dimension, err = typeutil.GetCollectionDim(t.schema.CollectionSchema)
if err != nil {
log.Warn("get collection dimension failed", zap.Error(err))
return err
}

if t.ids != nil {
pkField := ""
Expand Down Expand Up @@ -481,6 +476,7 @@ func (t *queryTask) PostExecute(ctx context.Context) error {

toReduceResults := make([]*internalpb.RetrieveResults, 0)
t.allQueryCnt = 0
t.totalRelatedDataSize = 0
select {
case <-t.TraceCtx().Done():
log.Warn("proxy", zap.Int64("Query: wait to finish failed, timeout!, msgID:", t.ID()))
Expand All @@ -490,6 +486,7 @@ func (t *queryTask) PostExecute(ctx context.Context) error {
t.resultBuf.Range(func(res *internalpb.RetrieveResults) bool {
toReduceResults = append(toReduceResults, res)
t.allQueryCnt += res.GetAllRetrieveCount()
t.totalRelatedDataSize += res.GetCostAggregation().GetTotalRelatedDataSize()
log.Debug("proxy receives one query result", zap.Int64("sourceID", res.GetBase().GetSourceID()))
return true
})
Expand Down
Loading

0 comments on commit 90bed1c

Please sign in to comment.