-
Notifications
You must be signed in to change notification settings - Fork 2.1k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Implement federated metric metadata API (#3686)
* support federated metadata API Signed-off-by: Ben Ye <[email protected]> * update comments Signed-off-by: yeya24 <[email protected]> * use parseInt Signed-off-by: yeya24 <[email protected]> * address Prem's comments Signed-off-by: yeya24 <[email protected]> * update proto comment Signed-off-by: yeya24 <[email protected]> * add changelog Signed-off-by: yeya24 <[email protected]>
- Loading branch information
Showing
24 changed files
with
2,281 additions
and
28 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,108 @@ | ||
// Copyright (c) The Thanos Authors. | ||
// Licensed under the Apache License 2.0. | ||
|
||
package metadata | ||
|
||
import ( | ||
"context" | ||
|
||
"github.com/pkg/errors" | ||
"github.com/prometheus/prometheus/storage" | ||
"github.com/thanos-io/thanos/pkg/metadata/metadatapb" | ||
) | ||
|
||
var _ UnaryClient = &GRPCClient{} | ||
|
||
// UnaryClient is gRPC metadatapb.Metadata client which expands streaming metadata API. Useful for consumers that does not | ||
// support streaming. | ||
type UnaryClient interface { | ||
Metadata(ctx context.Context, req *metadatapb.MetadataRequest) (map[string][]metadatapb.Meta, storage.Warnings, error) | ||
} | ||
|
||
// GRPCClient allows to retrieve metadata from local gRPC streaming server implementation. | ||
// TODO(bwplotka): Switch to native gRPC transparent client->server adapter once available. | ||
type GRPCClient struct { | ||
proxy metadatapb.MetadataServer | ||
} | ||
|
||
func NewGRPCClient(ts metadatapb.MetadataServer) *GRPCClient { | ||
return &GRPCClient{ | ||
proxy: ts, | ||
} | ||
} | ||
|
||
func (rr *GRPCClient) Metadata(ctx context.Context, req *metadatapb.MetadataRequest) (map[string][]metadatapb.Meta, storage.Warnings, error) { | ||
srv := &metadataServer{ctx: ctx, metric: req.Metric, limit: int(req.Limit)} | ||
|
||
if req.Limit >= 0 { | ||
if req.Metric != "" { | ||
srv.metadataMap = make(map[string][]metadatapb.Meta, 1) | ||
} else if req.Limit <= 100 { | ||
srv.metadataMap = make(map[string][]metadatapb.Meta, req.Limit) | ||
} else { | ||
srv.metadataMap = make(map[string][]metadatapb.Meta) | ||
} | ||
} else { | ||
srv.metadataMap = make(map[string][]metadatapb.Meta) | ||
} | ||
|
||
if err := rr.proxy.Metadata(req, srv); err != nil { | ||
return nil, nil, errors.Wrap(err, "proxy Metadata") | ||
} | ||
|
||
return srv.metadataMap, srv.warnings, nil | ||
} | ||
|
||
type metadataServer struct { | ||
// This field just exist to pseudo-implement the unused methods of the interface. | ||
metadatapb.Metadata_MetadataServer | ||
ctx context.Context | ||
|
||
metric string | ||
limit int | ||
|
||
warnings []error | ||
metadataMap map[string][]metadatapb.Meta | ||
} | ||
|
||
func (srv *metadataServer) Send(res *metadatapb.MetadataResponse) error { | ||
if res.GetWarning() != "" { | ||
srv.warnings = append(srv.warnings, errors.New(res.GetWarning())) | ||
return nil | ||
} | ||
|
||
if res.GetMetadata() == nil { | ||
return errors.New("no metadata") | ||
} | ||
|
||
// If limit is set to 0, we don't need to add anything. | ||
if srv.limit == 0 { | ||
return nil | ||
} | ||
|
||
for k, v := range res.GetMetadata().Metadata { | ||
if metadata, ok := srv.metadataMap[k]; !ok { | ||
// If limit is set and it is positive, we limit the size of the map. | ||
if srv.limit < 0 || srv.limit > 0 && len(srv.metadataMap) < srv.limit { | ||
srv.metadataMap[k] = v.Metas | ||
} | ||
} else { | ||
// There shouldn't be many metadata for one single metric. | ||
Outer: | ||
for _, meta := range v.Metas { | ||
for _, m := range metadata { | ||
if meta == m { | ||
continue Outer | ||
} | ||
} | ||
srv.metadataMap[k] = append(srv.metadataMap[k], meta) | ||
} | ||
} | ||
} | ||
|
||
return nil | ||
} | ||
|
||
func (srv *metadataServer) Context() context.Context { | ||
return srv.ctx | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,28 @@ | ||
// Copyright (c) The Thanos Authors. | ||
// Licensed under the Apache License 2.0. | ||
|
||
package metadatapb | ||
|
||
import ( | ||
"unsafe" | ||
) | ||
|
||
func NewMetadataResponse(metadata *MetricMetadata) *MetadataResponse { | ||
return &MetadataResponse{ | ||
Result: &MetadataResponse_Metadata{ | ||
Metadata: metadata, | ||
}, | ||
} | ||
} | ||
|
||
func NewWarningMetadataResponse(warning error) *MetadataResponse { | ||
return &MetadataResponse{ | ||
Result: &MetadataResponse_Warning{ | ||
Warning: warning.Error(), | ||
}, | ||
} | ||
} | ||
|
||
func FromMetadataMap(m map[string][]Meta) *MetricMetadata { | ||
return &MetricMetadata{Metadata: *(*map[string]MetricMetadataEntry)(unsafe.Pointer(&m))} | ||
} |
Oops, something went wrong.