Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

client: add follower option #7465

Merged
merged 4 commits into from
Nov 29, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 17 additions & 5 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@
// client should retry later.
GetRegion(ctx context.Context, key []byte, opts ...GetRegionOption) (*Region, error)
// GetRegionFromMember gets a region from certain members.
GetRegionFromMember(ctx context.Context, key []byte, memberURLs []string) (*Region, error)
GetRegionFromMember(ctx context.Context, key []byte, memberURLs []string, opts ...GetRegionOption) (*Region, error)
// GetPrevRegion gets the previous region and its leader Peer of the region where the key is located.
GetPrevRegion(ctx context.Context, key []byte, opts ...GetRegionOption) (*Region, error)
// GetRegionByID gets a region and its leader Peer from PD by id.
Expand All @@ -100,7 +100,7 @@
// Limit limits the maximum number of regions returned.
// If a region has no leader, corresponding leader will be placed by a peer
// with empty value (PeerID is 0).
ScanRegions(ctx context.Context, key, endKey []byte, limit int) ([]*Region, error)
ScanRegions(ctx context.Context, key, endKey []byte, limit int, opts ...GetRegionOption) ([]*Region, error)
// GetStore gets a store from PD by store id.
// The store may expire later. Caller is responsible for caching and taking care
// of store change.
Expand Down Expand Up @@ -200,7 +200,8 @@

// GetRegionOp represents available options when getting regions.
type GetRegionOp struct {
needBuckets bool
needBuckets bool
allowFollowerHandle bool
}

// GetRegionOption configures GetRegionOp.
Expand All @@ -211,6 +212,11 @@
return func(op *GetRegionOp) { op.needBuckets = true }
}

// WithAllowFollowerHandle means that client can send request to follower and let it handle this request.
func WithAllowFollowerHandle() GetRegionOption {
return func(op *GetRegionOp) { op.allowFollowerHandle = true }

Check warning on line 217 in client/client.go

View check run for this annotation

Codecov / codecov/patch

client/client.go#L217

Added line #L217 was not covered by tests
}

// LeaderHealthCheckInterval might be changed in the unit to shorten the testing time.
var LeaderHealthCheckInterval = time.Second

Expand Down Expand Up @@ -701,6 +707,12 @@
return errors.New("[pd] invalid value type for EnableTSOFollowerProxy option, it should be bool")
}
c.option.setEnableTSOFollowerProxy(enable)
case EnableFollowerHandle:
enable, ok := value.(bool)
if !ok {
return errors.New("[pd] invalid value type for EnableFollowerHandle option, it should be bool")

Check warning on line 713 in client/client.go

View check run for this annotation

Codecov / codecov/patch

client/client.go#L710-L713

Added lines #L710 - L713 were not covered by tests
}
c.option.setEnableFollowerHandle(enable)

Check warning on line 715 in client/client.go

View check run for this annotation

Codecov / codecov/patch

client/client.go#L715

Added line #L715 was not covered by tests
default:
return errors.New("[pd] unsupported client option")
}
Expand Down Expand Up @@ -952,7 +964,7 @@
return code == codes.Unavailable || code == codes.DeadlineExceeded
}

func (c *client) GetRegionFromMember(ctx context.Context, key []byte, memberURLs []string) (*Region, error) {
func (c *client) GetRegionFromMember(ctx context.Context, key []byte, memberURLs []string, opts ...GetRegionOption) (*Region, error) {
if span := opentracing.SpanFromContext(ctx); span != nil {
span = opentracing.StartSpan("pdclient.GetRegionFromMember", opentracing.ChildOf(span.Context()))
defer span.Finish()
Expand Down Expand Up @@ -1056,7 +1068,7 @@
return handleRegionResponse(resp), nil
}

func (c *client) ScanRegions(ctx context.Context, key, endKey []byte, limit int) ([]*Region, error) {
func (c *client) ScanRegions(ctx context.Context, key, endKey []byte, limit int, opts ...GetRegionOption) ([]*Region, error) {
if span := opentracing.SpanFromContext(ctx); span != nil {
span = opentracing.StartSpan("pdclient.ScanRegions", opentracing.ChildOf(span.Context()))
defer span.Finish()
Expand Down
17 changes: 17 additions & 0 deletions client/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
maxInitClusterRetries = 100
defaultMaxTSOBatchWaitInterval time.Duration = 0
defaultEnableTSOFollowerProxy = false
defaultEnableFollowerHandle = false
)

// DynamicOption is used to distinguish the dynamic option type.
Expand All @@ -40,6 +41,8 @@
// EnableTSOFollowerProxy is the TSO Follower Proxy option.
// It is stored as bool.
EnableTSOFollowerProxy
// EnableFollowerHandle is the follower handle option.
EnableFollowerHandle

dynamicOptionCount
)
Expand Down Expand Up @@ -72,6 +75,7 @@

co.dynamicOptions[MaxTSOBatchWaitInterval].Store(defaultMaxTSOBatchWaitInterval)
co.dynamicOptions[EnableTSOFollowerProxy].Store(defaultEnableTSOFollowerProxy)
co.dynamicOptions[EnableFollowerHandle].Store(defaultEnableFollowerHandle)
return co
}

Expand All @@ -88,6 +92,19 @@
return nil
}

// setEnableFollowerHandle set the Follower Handle option.
func (o *option) setEnableFollowerHandle(enable bool) {
old := o.getEnableFollowerHandle()
if enable != old {
o.dynamicOptions[EnableFollowerHandle].Store(enable)

Check warning on line 99 in client/option.go

View check run for this annotation

Codecov / codecov/patch

client/option.go#L97-L99

Added lines #L97 - L99 were not covered by tests
}
}

// getMaxTSOBatchWaitIntervalgets the Follower Handle enable option.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// getMaxTSOBatchWaitIntervalgets the Follower Handle enable option.
// getMaxTSOBatchWaitInterval gets the Follower Handle enable option.

func (o *option) getEnableFollowerHandle() bool {
return o.dynamicOptions[EnableFollowerHandle].Load().(bool)

Check warning on line 105 in client/option.go

View check run for this annotation

Codecov / codecov/patch

client/option.go#L105

Added line #L105 was not covered by tests
}

// getMaxTSOBatchWaitInterval gets the max TSO batch wait interval option.
func (o *option) getMaxTSOBatchWaitInterval() time.Duration {
return o.dynamicOptions[MaxTSOBatchWaitInterval].Load().(time.Duration)
Expand Down
8 changes: 8 additions & 0 deletions client/option_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ func TestDynamicOptionChange(t *testing.T) {
// Check the default value setting.
re.Equal(defaultMaxTSOBatchWaitInterval, o.getMaxTSOBatchWaitInterval())
re.Equal(defaultEnableTSOFollowerProxy, o.getEnableTSOFollowerProxy())
re.Equal(defaultEnableFollowerHandle, o.getEnableFollowerHandle())

// Check the invalid value setting.
re.NotNil(o.setMaxTSOBatchWaitInterval(time.Second))
Expand Down Expand Up @@ -55,4 +56,11 @@ func TestDynamicOptionChange(t *testing.T) {
close(o.enableTSOFollowerProxyCh)
// Setting the same value should not notify the channel.
o.setEnableTSOFollowerProxy(expectBool)

expectBool = true
o.setEnableFollowerHandle(expectBool)
re.Equal(expectBool, o.getEnableFollowerHandle())
expectBool = false
o.setEnableFollowerHandle(expectBool)
re.Equal(expectBool, o.getEnableFollowerHandle())
}
Loading