Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#3838
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <[email protected]>
  • Loading branch information
Rustin170506 authored and ti-chi-bot committed Dec 13, 2021
1 parent c4dc545 commit 7f8949d
Show file tree
Hide file tree
Showing 14 changed files with 1,411 additions and 31 deletions.
227 changes: 227 additions & 0 deletions cdc/capture/http_validator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,227 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package capture

import (
"context"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/ticdc/cdc/entry"
"github.com/pingcap/ticdc/cdc/kv"
"github.com/pingcap/ticdc/cdc/model"
"github.com/pingcap/ticdc/cdc/sink"
"github.com/pingcap/ticdc/pkg/config"
cerror "github.com/pingcap/ticdc/pkg/errors"
"github.com/pingcap/ticdc/pkg/filter"
"github.com/pingcap/ticdc/pkg/txnutil/gc"
"github.com/pingcap/ticdc/pkg/util"
"github.com/pingcap/ticdc/pkg/version"
tidbkv "github.com/pingcap/tidb/kv"
"github.com/r3labs/diff"
"github.com/tikv/client-go/v2/oracle"
)

// verifyCreateChangefeedConfig verify ChangefeedConfig for create a changefeed
func verifyCreateChangefeedConfig(ctx context.Context, changefeedConfig model.ChangefeedConfig, capture *Capture) (*model.ChangeFeedInfo, error) {
// verify sinkURI
if changefeedConfig.SinkURI == "" {
return nil, cerror.ErrSinkURIInvalid.GenWithStackByArgs("sink-uri is empty, can't not create a changefeed without sink-uri")
}

// verify changefeedID
if err := model.ValidateChangefeedID(changefeedConfig.ID); err != nil {
return nil, cerror.ErrAPIInvalidParam.GenWithStack("invalid changefeed_id: %s", changefeedConfig.ID)
}
// check if the changefeed exists
cfStatus, err := capture.owner.StatusProvider().GetChangeFeedStatus(ctx, changefeedConfig.ID)
if err != nil && cerror.ErrChangeFeedNotExists.NotEqual(err) {
return nil, err
}
if cfStatus != nil {
return nil, cerror.ErrChangeFeedAlreadyExists.GenWithStackByArgs(changefeedConfig.ID)
}

// verify start-ts
if changefeedConfig.StartTS == 0 {
ts, logical, err := capture.pdClient.GetTS(ctx)
if err != nil {
return nil, cerror.ErrPDEtcdAPIError.GenWithStackByArgs("fail to get ts from pd client")
}
changefeedConfig.StartTS = oracle.ComposeTS(ts, logical)
}

// Ensure the start ts is valid in the next 1 hour.
const ensureTTL = 60 * 60
if err := gc.EnsureChangefeedStartTsSafety(
ctx, capture.pdClient, changefeedConfig.ID, ensureTTL, changefeedConfig.StartTS); err != nil {
if !cerror.ErrStartTsBeforeGC.Equal(err) {
return nil, cerror.ErrPDEtcdAPIError.Wrap(err)
}
return nil, err
}

// verify target-ts
if changefeedConfig.TargetTS > 0 && changefeedConfig.TargetTS <= changefeedConfig.StartTS {
return nil, cerror.ErrTargetTsBeforeStartTs.GenWithStackByArgs(changefeedConfig.TargetTS, changefeedConfig.StartTS)
}

// init replicaConfig
replicaConfig := config.GetDefaultReplicaConfig()
replicaConfig.ForceReplicate = changefeedConfig.ForceReplicate
if changefeedConfig.MounterWorkerNum != 0 {
replicaConfig.Mounter.WorkerNum = changefeedConfig.MounterWorkerNum
}
if changefeedConfig.SinkConfig != nil {
replicaConfig.Sink = changefeedConfig.SinkConfig
}
if len(changefeedConfig.IgnoreTxnStartTs) != 0 {
replicaConfig.Filter.IgnoreTxnStartTs = changefeedConfig.IgnoreTxnStartTs
}
if len(changefeedConfig.FilterRules) != 0 {
replicaConfig.Filter.Rules = changefeedConfig.FilterRules
}

captureInfos, err := capture.owner.StatusProvider().GetCaptures(ctx)
if err != nil {
return nil, err
}
// set sortEngine and EnableOldValue
cdcClusterVer, err := version.GetTiCDCClusterVersion(model.ListVersionsFromCaptureInfos(captureInfos))
if err != nil {
return nil, err
}
sortEngine := model.SortUnified
if !cdcClusterVer.ShouldEnableOldValueByDefault() {
replicaConfig.EnableOldValue = false
log.Warn("The TiCDC cluster is built from unknown branch or less than 5.0.0-rc, the old-value are disabled by default.")
if !cdcClusterVer.ShouldEnableUnifiedSorterByDefault() {
sortEngine = model.SortInMemory
}
}

// init ChangefeedInfo
info := &model.ChangeFeedInfo{
SinkURI: changefeedConfig.SinkURI,
Opts: make(map[string]string),
CreateTime: time.Now(),
StartTs: changefeedConfig.StartTS,
TargetTs: changefeedConfig.TargetTS,
Config: replicaConfig,
Engine: sortEngine,
State: model.StateNormal,
SyncPointEnabled: false,
SyncPointInterval: 10 * time.Minute,
CreatorVersion: version.ReleaseVersion,
}

if !replicaConfig.ForceReplicate && !changefeedConfig.IgnoreIneligibleTable {
ineligibleTables, _, err := verifyTables(replicaConfig, capture.kvStorage, changefeedConfig.StartTS)
if err != nil {
return nil, err
}
if len(ineligibleTables) != 0 {
return nil, cerror.ErrTableIneligible.GenWithStackByArgs(ineligibleTables)
}
}

tz, err := util.GetTimezone(changefeedConfig.TimeZone)
if err != nil {
return nil, cerror.ErrAPIInvalidParam.Wrap(errors.Annotatef(err, "invalid timezone:%s", changefeedConfig.TimeZone))
}
ctx = util.PutTimezoneInCtx(ctx, tz)
if err := sink.Validate(ctx, info.SinkURI, info.Config, info.Opts); err != nil {
return nil, err
}

return info, nil
}

// verifyUpdateChangefeedConfig verify ChangefeedConfig for update a changefeed
func verifyUpdateChangefeedConfig(ctx context.Context, changefeedConfig model.ChangefeedConfig, oldInfo *model.ChangeFeedInfo) (*model.ChangeFeedInfo, error) {
newInfo, err := oldInfo.Clone()
if err != nil {
return nil, cerror.ErrChangefeedUpdateRefused.GenWithStackByArgs(err.Error())
}
// verify target_ts
if changefeedConfig.TargetTS != 0 {
if changefeedConfig.TargetTS <= newInfo.StartTs {
return nil, cerror.ErrChangefeedUpdateRefused.GenWithStack("can not update target-ts:%d less than start-ts:%d", changefeedConfig.TargetTS, newInfo.StartTs)
}
newInfo.TargetTs = changefeedConfig.TargetTS
}

// verify rules
if len(changefeedConfig.FilterRules) != 0 {
newInfo.Config.Filter.Rules = changefeedConfig.FilterRules
_, err = filter.VerifyRules(newInfo.Config)
if err != nil {
return nil, cerror.ErrChangefeedUpdateRefused.GenWithStackByArgs(err.Error())
}
}

if len(changefeedConfig.IgnoreTxnStartTs) != 0 {
newInfo.Config.Filter.IgnoreTxnStartTs = changefeedConfig.IgnoreTxnStartTs
}

if changefeedConfig.MounterWorkerNum != 0 {
newInfo.Config.Mounter.WorkerNum = changefeedConfig.MounterWorkerNum
}

if changefeedConfig.SinkConfig != nil {
newInfo.Config.Sink = changefeedConfig.SinkConfig
}

// verify sink_uri
if changefeedConfig.SinkURI != "" {
newInfo.SinkURI = changefeedConfig.SinkURI
if err := sink.Validate(ctx, changefeedConfig.SinkURI, newInfo.Config, newInfo.Opts); err != nil {
return nil, cerror.ErrChangefeedUpdateRefused.GenWithStackByCause(err)
}
}

if !diff.Changed(oldInfo, newInfo) {
return nil, cerror.ErrChangefeedUpdateRefused.GenWithStackByArgs("changefeed config is the same with the old one, do nothing")
}

return newInfo, nil
}

func verifyTables(replicaConfig *config.ReplicaConfig, storage tidbkv.Storage, startTs uint64) (ineligibleTables, eligibleTables []model.TableName, err error) {
filter, err := filter.NewFilter(replicaConfig)
if err != nil {
return nil, nil, errors.Trace(err)
}
meta, err := kv.GetSnapshotMeta(storage, startTs)
if err != nil {
return nil, nil, errors.Trace(err)
}
snap, err := entry.NewSingleSchemaSnapshotFromMeta(meta, startTs, false /* explicitTables */)
if err != nil {
return nil, nil, errors.Trace(err)
}

for _, tableInfo := range snap.Tables() {
if filter.ShouldIgnoreTable(tableInfo.TableName.Schema, tableInfo.TableName.Table) {
continue
}
if !tableInfo.IsEligible(false /* forceReplicate */) {
ineligibleTables = append(ineligibleTables, tableInfo.TableName)
} else {
eligibleTables = append(eligibleTables, tableInfo.TableName)
}
}
return
}
10 changes: 10 additions & 0 deletions cdc/model/capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,13 @@ func (c *CaptureInfo) Unmarshal(data []byte) error {
return errors.Annotatef(cerror.WrapError(cerror.ErrUnmarshalFailed, err),
"unmarshal data: %v", data)
}

// ListVersionsFromCaptureInfos returns the version list of the CaptureInfo list.
func ListVersionsFromCaptureInfos(captureInfos []*CaptureInfo) []string {
var captureVersions []string
for _, ci := range captureInfos {
captureVersions = append(captureVersions, ci.Version)
}

return captureVersions
}
17 changes: 17 additions & 0 deletions cdc/model/capture_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,20 @@ func (s *captureSuite) TestMarshalUnmarshal(c *check.C) {
c.Assert(err, check.IsNil)
c.Assert(decodedInfo, check.DeepEquals, info)
}

func TestListVersionsFromCaptureInfos(t *testing.T) {
infos := []*CaptureInfo{
{
ID: "9ff52aca-aea6-4022-8ec4-fbee3f2c7891",
AdvertiseAddr: "127.0.0.1:8300",
Version: "dev",
},
{
ID: "9ff52aca-aea6-4022-8ec4-fbee3f2c7891",
AdvertiseAddr: "127.0.0.1:8300",
Version: "",
},
}

require.ElementsMatch(t, []string{"dev", ""}, ListVersionsFromCaptureInfos(infos))
}
64 changes: 59 additions & 5 deletions cdc/model/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,13 @@ import (
"github.com/pingcap/ticdc/pkg/config"
"github.com/pingcap/ticdc/pkg/cyclic/mark"
cerror "github.com/pingcap/ticdc/pkg/errors"
<<<<<<< HEAD
"github.com/pingcap/tidb/store/tikv/oracle"
=======
cerrors "github.com/pingcap/ticdc/pkg/errors"
"github.com/pingcap/ticdc/pkg/version"
"github.com/tikv/client-go/v2/oracle"
>>>>>>> 944295563 (owner(ticdc): Add bootstrap and try to fix the meta information in it (#3838))
"go.uber.org/zap"
)

Expand All @@ -48,7 +54,7 @@ const (
StateError FeedState = "error"
StateFailed FeedState = "failed"
StateStopped FeedState = "stopped"
StateRemoved FeedState = "removed" // deprecated, will be removed in the next version
StateRemoved FeedState = "removed"
StateFinished FeedState = "finished"
)

Expand Down Expand Up @@ -208,10 +214,10 @@ func (info *ChangeFeedInfo) Clone() (*ChangeFeedInfo, error) {
return cloned, err
}

// VerifyAndFix verifies changefeed info and may fillin some fields.
// If a must field is not provided, return an error.
// If some necessary filed is missing but can use a default value, fillin it.
func (info *ChangeFeedInfo) VerifyAndFix() error {
// VerifyAndComplete verifies changefeed info and may fill in some fields.
// If a required field is not provided, return an error.
// If some necessary filed is missing but can use a default value, fill in it.
func (info *ChangeFeedInfo) VerifyAndComplete() error {
defaultConfig := config.GetDefaultReplicaConfig()
if info.Engine == "" {
info.Engine = SortUnified
Expand All @@ -234,6 +240,54 @@ func (info *ChangeFeedInfo) VerifyAndFix() error {
return nil
}

// FixIncompatible fixes incompatible changefeed meta info.
func (info *ChangeFeedInfo) FixIncompatible() {
creatorVersionGate := version.NewCreatorVersionGate(info.CreatorVersion)
if creatorVersionGate.ChangefeedStateFromAdminJob() {
log.Info("Start fixing incompatible changefeed state", zap.Any("changefeed", info))
info.fixState()
log.Info("Fix incompatibility changefeed state completed", zap.Any("changefeed", info))
}
}

// fixState attempts to fix state loss from upgrading the old owner to the new owner.
func (info *ChangeFeedInfo) fixState() {
// Notice: In the old owner we used AdminJobType field to determine if the task was paused or not,
// we need to handle this field in the new owner.
// Otherwise, we will see that the old version of the task is paused and then upgraded,
// and the task is automatically resumed after the upgrade.
state := info.State
// Upgrading from an old owner, we need to deal with cases where the state is normal,
// but actually contains errors and does not match the admin job type.
if state == StateNormal {
switch info.AdminJobType {
// This corresponds to the case of failure or error.
case AdminNone, AdminResume:
if info.Error != nil {
if cerrors.ChangefeedFastFailErrorCode(errors.RFCErrorCode(info.Error.Code)) {
state = StateFailed
} else {
state = StateError
}
}
case AdminStop:
state = StateStopped
case AdminFinish:
state = StateFinished
case AdminRemove:
state = StateRemoved
}
}

if state != info.State {
log.Info("handle old owner inconsistent state",
zap.String("old state", string(info.State)),
zap.String("admin job type", info.AdminJobType.String()),
zap.String("new state", string(state)))
info.State = state
}
}

// CheckErrorHistory checks error history of a changefeed
// if having error record older than GC interval, set needSave to true.
// if error counts reach threshold, set canInit to false.
Expand Down
Loading

0 comments on commit 7f8949d

Please sign in to comment.