-
Notifications
You must be signed in to change notification settings - Fork 725
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[DNM]add user kv api #989
[DNM]add user kv api #989
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -53,6 +53,12 @@ type Client interface { | |
// The store may expire later. Caller is responsible for caching and taking care | ||
// of store change. | ||
GetStore(ctx context.Context, storeID uint64) (*metapb.Store, error) | ||
// GetUserKV get the user kv from pd. | ||
GetUserKV(ctx context.Context, key string) (string, error) | ||
// PutUserKV put the user kv to pd. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. s/put/puts There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ok |
||
PutUserKV(ctx context.Context, key string, value string) error | ||
// DeleteUserKV delete the user kv from pd. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. s/delete/deletes There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ok |
||
DeleteUserKV(ctx context.Context, key string) error | ||
// Close closes the client. | ||
Close() | ||
} | ||
|
@@ -622,6 +628,79 @@ func (c *client) GetStore(ctx context.Context, storeID uint64) (*metapb.Store, e | |
return store, nil | ||
} | ||
|
||
func (c *client) GetUserKV(ctx context.Context, key string) (string, error) { | ||
if span := opentracing.SpanFromContext(ctx); span != nil { | ||
span = opentracing.StartSpan("pdclient.GetUserKV", opentracing.ChildOf(span.Context())) | ||
defer span.Finish() | ||
} | ||
start := time.Now() | ||
defer func() { cmdDuration.WithLabelValues("get_user_key").Observe(time.Since(start).Seconds()) }() | ||
|
||
ctx, cancel := context.WithTimeout(ctx, pdTimeout) | ||
resp, err := c.leaderClient().GetUserKV(ctx, &pdpb.GetUserKVRequest{ | ||
Header: c.requestHeader(), | ||
Key: key, | ||
}) | ||
requestDuration.WithLabelValues("get_user_key").Observe(time.Since(start).Seconds()) | ||
cancel() | ||
|
||
if err != nil { | ||
cmdFailedDuration.WithLabelValues("get_user_key").Observe(time.Since(start).Seconds()) | ||
c.scheduleCheckLeader() | ||
return "", errors.Trace(err) | ||
} | ||
return resp.GetValue(), nil | ||
} | ||
|
||
func (c *client) PutUserKV(ctx context.Context, key string, value string) error { | ||
if span := opentracing.SpanFromContext(ctx); span != nil { | ||
span = opentracing.StartSpan("pdclient.PutUserKV", opentracing.ChildOf(span.Context())) | ||
defer span.Finish() | ||
} | ||
start := time.Now() | ||
defer func() { cmdDuration.WithLabelValues("put_user_key").Observe(time.Since(start).Seconds()) }() | ||
|
||
ctx, cancel := context.WithTimeout(ctx, pdTimeout) | ||
_, err := c.leaderClient().PutUserKV(ctx, &pdpb.PutUserKVRequest{ | ||
Header: c.requestHeader(), | ||
Key: key, | ||
Value: value, | ||
}) | ||
requestDuration.WithLabelValues("put_user_key").Observe(time.Since(start).Seconds()) | ||
cancel() | ||
|
||
if err != nil { | ||
cmdFailedDuration.WithLabelValues("put_user_key").Observe(time.Since(start).Seconds()) | ||
c.scheduleCheckLeader() | ||
return errors.Trace(err) | ||
} | ||
return nil | ||
} | ||
|
||
func (c *client) DeleteUserKV(ctx context.Context, key string) error { | ||
if span := opentracing.SpanFromContext(ctx); span != nil { | ||
span = opentracing.StartSpan("pdclient.DeleteUserKV", opentracing.ChildOf(span.Context())) | ||
defer span.Finish() | ||
} | ||
start := time.Now() | ||
defer func() { cmdDuration.WithLabelValues("delete_user_kv").Observe(time.Since(start).Seconds()) }() | ||
|
||
ctx, cancel := context.WithTimeout(ctx, pdTimeout) | ||
_, err := c.leaderClient().DeleteUserKV(ctx, &pdpb.DeleteUserKVRequest{ | ||
Header: c.requestHeader(), | ||
Key: key, | ||
}) | ||
requestDuration.WithLabelValues("delete_user_kv").Observe(time.Since(start).Seconds()) | ||
cancel() | ||
|
||
if err != nil { | ||
cmdFailedDuration.WithLabelValues("delete_user_kv").Observe(time.Since(start).Seconds()) | ||
c.scheduleCheckLeader() | ||
return errors.Trace(err) | ||
} | ||
return nil | ||
} | ||
|
||
func (c *client) requestHeader() *pdpb.RequestHeader { | ||
return &pdpb.RequestHeader{ | ||
ClusterId: c.clusterID, | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -555,3 +555,56 @@ func (s *Server) notBootstrappedHeader() *pdpb.ResponseHeader { | |
Message: "cluster is not bootstrapped", | ||
}) | ||
} | ||
|
||
// GetUserKV get key from pd under dir 'user'. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. s/get/gets |
||
func (s *Server) GetUserKV(ctx context.Context, request *pdpb.GetUserKVRequest) (*pdpb.GetUserKVResponse, error) { | ||
if err := s.validateRequest(request.GetHeader()); err != nil { | ||
return nil, errors.Trace(err) | ||
} | ||
|
||
value, err := s.kv.GetUserKV(request.GetKey()) | ||
if err != nil { | ||
return nil, errors.Trace(err) | ||
} | ||
|
||
return &pdpb.GetUserKVResponse{ | ||
Header: s.header(), | ||
Key: request.GetKey(), | ||
Value: value, | ||
}, nil | ||
} | ||
|
||
// PutUserKV put key to pd under dir 'user'. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. s/put/puts There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ok |
||
func (s *Server) PutUserKV(ctx context.Context, request *pdpb.PutUserKVRequest) (*pdpb.PutUserKVResponse, error) { | ||
if err := s.validateRequest(request.GetHeader()); err != nil { | ||
return nil, errors.Trace(err) | ||
} | ||
|
||
err := s.kv.PutUserKV(request.GetKey(), request.GetValue()) | ||
if err != nil { | ||
return nil, errors.Trace(err) | ||
} | ||
|
||
return &pdpb.PutUserKVResponse{ | ||
Header: s.header(), | ||
Key: request.GetKey(), | ||
Value: request.GetValue(), | ||
}, nil | ||
} | ||
|
||
// DeleteUserKV delete key from pd under dir 'user'. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. s/delete/deletes There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ok |
||
func (s *Server) DeleteUserKV(ctx context.Context, request *pdpb.DeleteUserKVRequest) (*pdpb.DeleteUserKVResponse, error) { | ||
if err := s.validateRequest(request.GetHeader()); err != nil { | ||
return nil, errors.Trace(err) | ||
} | ||
|
||
err := s.kv.DeleteUserKV(request.GetKey()) | ||
if err != nil { | ||
return nil, errors.Trace(err) | ||
} | ||
|
||
return &pdpb.DeleteUserKVResponse{ | ||
Header: s.header(), | ||
Key: request.GetKey(), | ||
}, nil | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
s/get/gets
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok