Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ttl: ttl use timer framework to trigger jobs #45469

Merged
merged 2 commits into from
Jul 26, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -3090,7 +3090,7 @@ func (do *Domain) StartTTLJobManager() {
logutil.BgLogger().Info("ttlJobManager exited.")
}()

ttlJobManager := ttlworker.NewJobManager(do.ddl.GetID(), do.sysSessionPool, do.store, do.etcdClient)
ttlJobManager := ttlworker.NewJobManager(do.ddl.GetID(), do.sysSessionPool, do.store, do.etcdClient, do.ddl.OwnerManager().IsOwner)
do.ttlJobManager.Store(ttlJobManager)
ttlJobManager.Start()

Expand Down
7 changes: 7 additions & 0 deletions timer/runtime/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,13 @@ func (rt *TimerGroupRuntime) Start() {
go rt.loop()
}

// Running returns whether the runtime is running
func (rt *TimerGroupRuntime) Running() bool {
lcwangchao marked this conversation as resolved.
Show resolved Hide resolved
rt.mu.Lock()
defer rt.mu.Unlock()
return rt.ctx != nil && rt.cancel != nil
}

func (rt *TimerGroupRuntime) initCtx() {
rt.ctx, rt.cancel = context.WithCancel(context.Background())
}
Expand Down
2 changes: 2 additions & 0 deletions timer/runtime/runtime_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,11 @@ func TestRuntimeStartStop(t *testing.T) {
Build()

runtime.Start()
require.True(t, runtime.Running())
waitDone(timerProcessed, time.Minute)
go func() {
runtime.Stop()
require.False(t, runtime.Running())
cancel()
}()
waitDone(ctx.Done(), time.Minute)
Expand Down
2 changes: 1 addition & 1 deletion timer/runtime/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
)

const (
workerRecvChanCap = 8
workerRecvChanCap = 128
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any reason to enlarge this?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To make the timer trigger more smoothly when we have many timers in one hook (i.e. 1000 TTL tables)

workerRespChanCap = 128
workerEventDefaultRetryInterval = 10 * time.Second
chanBlockInterval = time.Second
Expand Down
4 changes: 2 additions & 2 deletions ttl/client/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,8 @@ type TriggerNewTTLJobTableResult struct {
DBName string `json:"db_name"`
TableName string `json:"table_name"`
PartitionName string `json:"partition_name,omitempty"`
JobID string `json:"job_id"`
ErrorMessage string `json:"error_message"`
JobID string `json:"job_id,omitempty"`
ErrorMessage string `json:"error_message,omitempty"`
}

// TriggerNewTTLJobResponse is the response detail for trigger_ttl_job command
Expand Down
34 changes: 31 additions & 3 deletions ttl/client/notification.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,41 @@ func NewMockNotificationClient() NotificationClient {
}

// Notify implements the NotificationClient
func (c *mockClient) Notify(_ context.Context, typ string, data string) error {
func (c *mockClient) Notify(ctx context.Context, typ string, data string) error {
c.Lock()
defer c.Unlock()

for _, ch := range c.notificationWatchers[typ] {
ch <- clientv3.WatchResponse{}
watchers, ok := c.notificationWatchers[typ]
if !ok {
return nil
}

var unsent []chan clientv3.WatchResponse
loop:
for i, ch := range watchers {
select {
case <-ctx.Done():
return ctx.Err()
case ch <- clientv3.WatchResponse{}:
default:
unsent = make([]chan clientv3.WatchResponse, len(watchers), 0)
copy(unsent, watchers[i:])
break loop
}
}

if len(unsent) > 0 {
go func() {
for _, ch := range unsent {
select {
case <-ctx.Done():
return
case ch <- clientv3.WatchResponse{}:
}
}
}()
}

return nil
}

Expand Down
8 changes: 6 additions & 2 deletions ttl/ttlworker/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ go_library(
"//store/driver/error",
"//timer/api",
"//timer/runtime",
"//timer/tablestore",
"//ttl/cache",
"//ttl/client",
"//ttl/metrics",
Expand All @@ -37,12 +38,13 @@ go_library(
"//util/logutil",
"//util/sqlexec",
"//util/timeutil",
"@com_github_google_uuid//:uuid",
"@com_github_ngaut_pools//:pools",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_tikv_client_go_v2//tikv",
"@com_github_tikv_client_go_v2//tikvrpc",
"@io_etcd_go_etcd_client_v3//:client",
"@org_golang_x_exp//maps",
"@org_golang_x_exp//slices",
"@org_golang_x_time//rate",
"@org_uber_go_multierr//:multierr",
Expand All @@ -67,7 +69,7 @@ go_test(
embed = [":ttlworker"],
flaky = True,
race = "on",
shard_count = 41,
shard_count = 46,
deps = [
"//domain",
"//infoschema",
Expand Down Expand Up @@ -99,6 +101,8 @@ go_test(
"@com_github_stretchr_testify//mock",
"@com_github_stretchr_testify//require",
"@com_github_tikv_client_go_v2//testutils",
"@com_github_tikv_client_go_v2//tikv",
"@com_github_tikv_client_go_v2//tikvrpc",
"@org_golang_x_time//rate",
"@org_uber_go_atomic//:atomic",
"@org_uber_go_zap//:zap",
Expand Down
Loading