Skip to content

Commit

Permalink
Merge branch 'master' into fix_4778_openapi_stuck
Browse files Browse the repository at this point in the history
  • Loading branch information
CharlesCheung96 authored Mar 7, 2022
2 parents 095421b + 7db0343 commit d2a4eda
Show file tree
Hide file tree
Showing 134 changed files with 2,791 additions and 1,071 deletions.
7 changes: 4 additions & 3 deletions cdc/api/owner.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,14 @@ import (
"github.com/gin-gonic/gin"
"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/tikv/client-go/v2/oracle"
"go.etcd.io/etcd/client/v3/concurrency"
"go.uber.org/zap"

"github.com/pingcap/tiflow/cdc/capture"
"github.com/pingcap/tiflow/cdc/model"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/logutil"
"github.com/tikv/client-go/v2/oracle"
"go.etcd.io/etcd/clientv3/concurrency"
"go.uber.org/zap"
)

const (
Expand Down
2 changes: 1 addition & 1 deletion cdc/api/owner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (

"github.com/gin-gonic/gin"
"github.com/stretchr/testify/require"
"go.etcd.io/etcd/clientv3/concurrency"
"go.etcd.io/etcd/client/v3/concurrency"
)

func TestHTTPStatus(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion cdc/api/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
"github.com/pingcap/tiflow/cdc/capture"
"github.com/pingcap/tiflow/pkg/etcd"
"github.com/pingcap/tiflow/pkg/version"
"go.etcd.io/etcd/clientv3"
clientv3 "go.etcd.io/etcd/client/v3"
)

// status of cdc server
Expand Down
13 changes: 7 additions & 6 deletions cdc/capture/capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,13 @@ import (
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
tidbkv "github.com/pingcap/tidb/kv"
"github.com/tikv/client-go/v2/tikv"
pd "github.com/tikv/pd/client"
"go.etcd.io/etcd/client/v3/concurrency"
"go.etcd.io/etcd/server/v3/mvcc"
"go.uber.org/zap"
"golang.org/x/time/rate"

"github.com/pingcap/tiflow/cdc/kv"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/owner"
Expand All @@ -39,12 +46,6 @@ import (
"github.com/pingcap/tiflow/pkg/p2p"
"github.com/pingcap/tiflow/pkg/pdtime"
"github.com/pingcap/tiflow/pkg/version"
"github.com/tikv/client-go/v2/tikv"
pd "github.com/tikv/pd/client"
"go.etcd.io/etcd/clientv3/concurrency"
"go.etcd.io/etcd/mvcc"
"go.uber.org/zap"
"golang.org/x/time/rate"
)

// Capture represents a Capture server. It monitors the changefeed information in etcd and schedules Task on it.
Expand Down
25 changes: 18 additions & 7 deletions cdc/model/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,10 @@ func DispatchTableTopic(changefeedID ChangeFeedID) p2p.Topic {

// DispatchTableMessage is the message body for dispatching a table.
type DispatchTableMessage struct {
OwnerRev int64 `json:"owner-rev"`
ID TableID `json:"id"`
IsDelete bool `json:"is-delete"`
OwnerRev int64 `json:"owner-rev"`
Epoch ProcessorEpoch `json:"epoch"`
ID TableID `json:"id"`
IsDelete bool `json:"is-delete"`
}

// DispatchTableResponseTopic returns a message topic for the result of
Expand All @@ -44,7 +45,8 @@ func DispatchTableResponseTopic(changefeedID ChangeFeedID) p2p.Topic {

// DispatchTableResponseMessage is the message body for the result of dispatching a table.
type DispatchTableResponseMessage struct {
ID TableID `json:"id"`
ID TableID `json:"id"`
Epoch ProcessorEpoch `json:"epoch"`
}

// AnnounceTopic returns a message topic for announcing an ownership change.
Expand All @@ -64,14 +66,23 @@ func SyncTopic(changefeedID ChangeFeedID) p2p.Topic {
return fmt.Sprintf("send-status-resp/%s", changefeedID)
}

// ProcessorEpoch designates a continuous period of the processor working normally.
// It is declared as an alias of string (not a distinct type), so plain string
// values can be used wherever a ProcessorEpoch is expected.
type ProcessorEpoch = string

// SyncMessage is the message body for syncing the current states of a processor.
// MsgPack serialization has been implemented to minimize the size of the message.
type SyncMessage struct {
// Sends the processor's version for compatibility check
ProcessorVersion string
Running []TableID
Adding []TableID
Removing []TableID

// Epoch is reset to a unique value whenever the processor encounters
// an internal error or any other event that forces it to re-sync
// its states with the Owner.
Epoch ProcessorEpoch

Running []TableID
Adding []TableID
Removing []TableID
}

// Marshal serializes the message into MsgPack format.
Expand Down
8 changes: 5 additions & 3 deletions cdc/model/protocol_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,21 +65,23 @@ func makeVeryLargeSyncMessage() *SyncMessage {
func TestMarshalDispatchTableMessage(t *testing.T) {
msg := &DispatchTableMessage{
OwnerRev: 1,
Epoch: "test-epoch",
ID: TableID(1),
IsDelete: true,
}
bytes, err := json.Marshal(msg)
require.NoError(t, err)
require.Equal(t, `{"owner-rev":1,"id":1,"is-delete":true}`, string(bytes))
require.Equal(t, `{"owner-rev":1,"epoch":"test-epoch","id":1,"is-delete":true}`, string(bytes))
}

func TestMarshalDispatchTableResponseMessage(t *testing.T) {
msg := &DispatchTableResponseMessage{
ID: TableID(1),
ID: TableID(1),
Epoch: "test-epoch",
}
bytes, err := json.Marshal(msg)
require.NoError(t, err)
require.Equal(t, `{"id":1}`, string(bytes))
require.Equal(t, `{"id":1,"epoch":"test-epoch"}`, string(bytes))
}

func TestMarshalAnnounceMessage(t *testing.T) {
Expand Down
29 changes: 27 additions & 2 deletions cdc/owner/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,14 +127,27 @@ func (s *schedulerV2) DispatchTable(
tableID model.TableID,
captureID model.CaptureID,
isDelete bool,
epoch model.ProcessorEpoch,
) (done bool, err error) {
topic := model.DispatchTableTopic(changeFeedID)
message := &model.DispatchTableMessage{
OwnerRev: ctx.GlobalVars().OwnerRevision,
ID: tableID,
IsDelete: isDelete,
Epoch: epoch,
}

defer func() {
if err != nil {
return
}
log.Info("schedulerV2: DispatchTable",
zap.Any("message", message),
zap.Any("successful", done),
zap.String("changefeedID", changeFeedID),
zap.String("captureID", captureID))
}()

ok, err := s.trySendMessage(ctx, captureID, topic, message)
if err != nil {
return false, errors.Trace(err)
Expand All @@ -155,13 +168,24 @@ func (s *schedulerV2) Announce(
ctx context.Context,
changeFeedID model.ChangeFeedID,
captureID model.CaptureID,
) (bool, error) {
) (done bool, err error) {
topic := model.AnnounceTopic(changeFeedID)
message := &model.AnnounceMessage{
OwnerRev: ctx.GlobalVars().OwnerRevision,
OwnerVersion: version.ReleaseSemver(),
}

defer func() {
if err != nil {
return
}
log.Info("schedulerV2: Announce",
zap.Any("message", message),
zap.Any("successful", done),
zap.String("changefeedID", changeFeedID),
zap.String("captureID", captureID))
}()

ok, err := s.trySendMessage(ctx, captureID, topic, message)
if err != nil {
return false, errors.Trace(err)
Expand Down Expand Up @@ -239,7 +263,7 @@ func (s *schedulerV2) registerPeerMessageHandlers(ctx context.Context) (ret erro
func(sender string, messageI interface{}) error {
message := messageI.(*model.DispatchTableResponseMessage)
s.stats.RecordDispatchResponse()
s.OnAgentFinishedTableOperation(sender, message.ID)
s.OnAgentFinishedTableOperation(sender, message.ID, message.Epoch)
return nil
})
if err != nil {
Expand All @@ -256,6 +280,7 @@ func (s *schedulerV2) registerPeerMessageHandlers(ctx context.Context) (ret erro
s.stats.RecordSync()
s.OnAgentSyncTaskStatuses(
sender,
message.Epoch,
message.Running,
message.Adding,
message.Removing)
Expand Down
104 changes: 81 additions & 23 deletions cdc/processor/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,15 @@ import (
stdContext "context"
"time"

"go.uber.org/zap/zapcore"

"github.com/benbjohnson/clock"
"github.com/pingcap/errors"
"github.com/pingcap/log"
"go.etcd.io/etcd/client/v3/concurrency"
"go.uber.org/zap"
"golang.org/x/time/rate"

"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/scheduler"
"github.com/pingcap/tiflow/pkg/config"
Expand All @@ -28,9 +34,6 @@ import (
"github.com/pingcap/tiflow/pkg/etcd"
"github.com/pingcap/tiflow/pkg/p2p"
"github.com/pingcap/tiflow/pkg/version"
"go.etcd.io/etcd/clientv3/concurrency"
"go.uber.org/zap"
"golang.org/x/time/rate"
)

const (
Expand Down Expand Up @@ -161,31 +164,70 @@ func (a *agentImpl) Tick(ctx context.Context) error {
func (a *agentImpl) FinishTableOperation(
ctx context.Context,
tableID model.TableID,
) (bool, error) {
done, err := a.trySendMessage(
epoch model.ProcessorEpoch,
) (done bool, err error) {
message := &model.DispatchTableResponseMessage{ID: tableID, Epoch: epoch}
defer func() {
if err != nil {
return
}
log.Info("SchedulerAgent: FinishTableOperation", zap.Any("message", message),
zap.Bool("successful", done),
zap.String("changefeedID", a.changeFeed),
zap.String("ownerID", a.ownerCaptureID))
}()

done, err = a.trySendMessage(
ctx, a.ownerCaptureID,
model.DispatchTableResponseTopic(a.changeFeed),
&model.DispatchTableResponseMessage{ID: tableID})
message)
if err != nil {
return false, errors.Trace(err)
}
return done, nil
}

func (a *agentImpl) SyncTaskStatuses(
ctx context.Context,
running, adding, removing []model.TableID,
) (bool, error) {
done, err := a.trySendMessage(
ctx context.Context, epoch model.ProcessorEpoch, adding, removing, running []model.TableID,
) (done bool, err error) {
if !a.Barrier(ctx) {
// The Sync message needs to be strongly ordered w.r.t. other messages.
return false, nil
}

message := &model.SyncMessage{
ProcessorVersion: version.ReleaseSemver(),
Epoch: epoch,
Running: running,
Adding: adding,
Removing: removing,
}

defer func() {
if err != nil {
return
}
if log.GetLevel() == zapcore.DebugLevel {
// The message can be REALLY large, so we do not print it
// unless the log level is debug.
log.Debug("SchedulerAgent: SyncTaskStatuses",
zap.Any("message", message),
zap.Bool("successful", done),
zap.String("changefeedID", a.changeFeed),
zap.String("ownerID", a.ownerCaptureID))
return
}
log.Info("SchedulerAgent: SyncTaskStatuses",
zap.Bool("successful", done),
zap.String("changefeedID", a.changeFeed),
zap.String("ownerID", a.ownerCaptureID))
}()

done, err = a.trySendMessage(
ctx,
a.ownerCaptureID,
model.SyncTopic(a.changeFeed),
&model.SyncMessage{
ProcessorVersion: version.ReleaseSemver(),
Running: running,
Adding: adding,
Removing: removing,
})
message)
if err != nil {
return false, errors.Trace(err)
}
Expand All @@ -196,15 +238,30 @@ func (a *agentImpl) SendCheckpoint(
ctx context.Context,
checkpointTs model.Ts,
resolvedTs model.Ts,
) (bool, error) {
done, err := a.trySendMessage(
) (done bool, err error) {
message := &model.CheckpointMessage{
CheckpointTs: checkpointTs,
ResolvedTs: resolvedTs,
}

defer func() {
if err != nil {
return
}
// This message is logged very frequently, so it is emitted only
// at the debug level.
log.Debug("SchedulerAgent: SendCheckpoint",
zap.Any("message", message),
zap.Bool("successful", done),
zap.String("changefeedID", a.changeFeed),
zap.String("ownerID", a.ownerCaptureID))
}()

done, err = a.trySendMessage(
ctx,
a.ownerCaptureID,
model.CheckpointTopic(a.changeFeed),
&model.CheckpointMessage{
CheckpointTs: checkpointTs,
ResolvedTs: resolvedTs,
})
message)
if err != nil {
return false, errors.Trace(err)
}
Expand Down Expand Up @@ -339,7 +396,8 @@ func (a *agentImpl) registerPeerMessageHandlers() (ret error) {
ownerCapture,
message.OwnerRev,
message.ID,
message.IsDelete)
message.IsDelete,
message.Epoch)
return nil
})
if err != nil {
Expand Down
Loading

0 comments on commit d2a4eda

Please sign in to comment.