Skip to content

Commit

Permalink
sink(cdc): fallback when preparing statements meets error (#8532)
Browse files Browse the repository at this point in the history
ref #8508
  • Loading branch information
hicqu authored Mar 17, 2023
1 parent bbf2d87 commit a59888c
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 82 deletions.
69 changes: 40 additions & 29 deletions cdc/sink/dmlsink/txn/mysql/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"database/sql"
"database/sql/driver"
"fmt"
"math"
"net/url"
"time"

Expand Down Expand Up @@ -51,6 +52,9 @@ const (
networkDriftDuration = 5 * time.Second

defaultDMLMaxRetry uint64 = 8

// To limit memory usage for prepared statements.
prepStmtCacheSize int = 16 * 1024
)

type mysqlBackend struct {
Expand All @@ -63,9 +67,11 @@ type mysqlBackend struct {
events []*dmlsink.TxnCallbackableEvent
rows int

statistics *metrics.Statistics
metricTxnSinkDMLBatchCommit prometheus.Observer
metricTxnSinkDMLBatchCallback prometheus.Observer
statistics *metrics.Statistics
metricTxnSinkDMLBatchCommit prometheus.Observer
metricTxnSinkDMLBatchCallback prometheus.Observer
metricTxnPrepareStatementErrors prometheus.Counter

// implement stmtCache to improve performance, especially when the downstream is TiDB
stmtCache *lru.Cache
// Indicate if the CachePrepStmts should be enabled or not
Expand Down Expand Up @@ -122,29 +128,28 @@ func NewMySQLBackends(

// Inherit the default value of the prepared statement cache from the SinkURI Options
cachePrepStmts := cfg.CachePrepStmts
prepStmtCacheSize := cfg.PrepStmtCacheSize

var stmtCache *lru.Cache
if cachePrepStmts {
// query the size of the prepared statement cache on serverside
maxPreparedStmtCount, err := pmysql.QueryMaxPreparedStmtCount(ctx, db)
if err != nil {
return nil, err
}
if maxPreparedStmtCount == -1 {
// NOTE: seems TiDB doesn't follow MySQL's specification.
maxPreparedStmtCount = math.MaxInt
}
// if maxPreparedStmtCount == 0,
// it means that the prepared statement cache is disabled on serverside.
// if maxPreparedStmtCount/(cfg.WorkerCount+1) == 0, for each single connection,
// it means that the prepared statement cache is disabled on clientsize.
// Because each connection can not hold at lease one prepared statement.
if maxPreparedStmtCount == 0 || maxPreparedStmtCount/(cfg.WorkerCount+1) == 0 {
cachePrepStmts = false
} else if maxPreparedStmtCount/(cfg.WorkerCount+1) < prepStmtCacheSize {
// if maxPreparedStmtCount/(cfg.WorkerCount+1) < prepStmtCacheSize,
// it means that the prepared statement cache is too large on clientsize.
// adjust the size of the prepared statement cache on clientsize.
// to avoid error `Can't create more than max_prepared_stmt_count statements`
prepStmtCacheSize = maxPreparedStmtCount / (cfg.WorkerCount + 1)
}
}

var stmtCache *lru.Cache
if cachePrepStmts {
stmtCache, err = lru.NewWithEvict(prepStmtCacheSize, func(key, value interface{}) {
stmt := value.(*sql.Stmt)
stmt.Close()
Expand All @@ -164,10 +169,11 @@ func NewMySQLBackends(
dmlMaxRetry: defaultDMLMaxRetry,
statistics: statistics,

metricTxnSinkDMLBatchCommit: txn.SinkDMLBatchCommit.WithLabelValues(changefeedID.Namespace, changefeedID.ID),
metricTxnSinkDMLBatchCallback: txn.SinkDMLBatchCallback.WithLabelValues(changefeedID.Namespace, changefeedID.ID),
stmtCache: stmtCache,
cachePrepStmts: cachePrepStmts,
metricTxnSinkDMLBatchCommit: txn.SinkDMLBatchCommit.WithLabelValues(changefeedID.Namespace, changefeedID.ID),
metricTxnSinkDMLBatchCallback: txn.SinkDMLBatchCallback.WithLabelValues(changefeedID.Namespace, changefeedID.ID),
metricTxnPrepareStatementErrors: txn.PrepareStatementErrors.WithLabelValues(changefeedID.Namespace, changefeedID.ID),
stmtCache: stmtCache,
cachePrepStmts: cachePrepStmts,
})
}

Expand Down Expand Up @@ -656,23 +662,28 @@ func (s *mysqlBackend) sequenceExecute(
log.Debug("exec row", zap.Int("workerID", s.workerID),
zap.String("sql", query), zap.Any("args", args))
ctx, cancelFunc := context.WithTimeout(ctx, writeTimeout)
var execError error
if s.cachePrepStmts {
stmt, ok := s.stmtCache.Get(query)
if !ok {
var err error
stmt, err = s.db.Prepare(query)
if err != nil {
cancelFunc()
return errors.Trace(err)
}

var prepStmt *sql.Stmt
if s.cachePrepStmts {
if stmt, ok := s.stmtCache.Get(query); ok {
prepStmt = stmt.(*sql.Stmt)
} else if stmt, err := s.db.Prepare(query); err == nil {
prepStmt = stmt
s.stmtCache.Add(query, stmt)
} else {
// Generally it means the downstream database doesn't allow
// too many preapred statements. So clean some of them.
s.stmtCache.RemoveOldest()
s.metricTxnPrepareStatementErrors.Inc()
}
//nolint:sqlclosecheck
_, execError = tx.Stmt(stmt.(*sql.Stmt)).ExecContext(ctx, args...)
} else {
}

var execError error
if prepStmt == nil {
_, execError = tx.ExecContext(ctx, query, args...)
} else {
//nolint:sqlclosecheck
_, execError = tx.Stmt(prepStmt).ExecContext(ctx, args...)
}
if execError != nil {
err := logDMLTxnErr(
Expand Down
9 changes: 9 additions & 0 deletions cdc/sink/metrics/txn/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,14 @@ var (
Help: "Duration of execuing a batch of callbacks",
Buckets: prometheus.ExponentialBuckets(0.01, 2, 18), // 10ms~1300s
}, []string{"namespace", "changefeed"})

PrepareStatementErrors = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "ticdc",
Subsystem: "sink",
Name: "txn_prepare_statement_errors",
Help: "Prepare statement errors",
}, []string{"namespace", "changefeed"})
)

// InitMetrics registers all metrics in this file.
Expand All @@ -90,4 +98,5 @@ func InitMetrics(registry *prometheus.Registry) {
registry.MustRegister(WorkerHandledRows)
registry.MustRegister(SinkDMLBatchCommit)
registry.MustRegister(SinkDMLBatchCallback)
registry.MustRegister(PrepareStatementErrors)
}
44 changes: 5 additions & 39 deletions pkg/sink/mysql/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,6 @@ const (

// defaultcachePrepStmts is the default value of cachePrepStmts
defaultCachePrepStmts = true
// defaultPrepStmtCacheSize is the default size of prepared statement cache
// default size = (default max_prepared_stmt_count) / (default worker count + 1)
// 16382 / (16 + 1) = 963
defaultPrepStmtCacheSize = 963
// The upper limit of the max size of prepared statement cache
maxPrepStmtCacheSize = 1048576
)

// Config is the configs for MySQL backend.
Expand All @@ -100,12 +94,11 @@ type Config struct {
ForceReplicate bool
EnableOldValue bool

IsTiDB bool // IsTiDB is true if the downstream is TiDB
SourceID uint64
BatchDMLEnable bool
MultiStmtEnable bool
CachePrepStmts bool
PrepStmtCacheSize int
IsTiDB bool // IsTiDB is true if the downstream is TiDB
SourceID uint64
BatchDMLEnable bool
MultiStmtEnable bool
CachePrepStmts bool
}

// NewConfig returns the default mysql backend config.
Expand All @@ -123,7 +116,6 @@ func NewConfig() *Config {
BatchDMLEnable: defaultBatchDMLEnable,
MultiStmtEnable: defaultMultiStmtEnable,
CachePrepStmts: defaultCachePrepStmts,
PrepStmtCacheSize: defaultPrepStmtCacheSize,
}
}

Expand Down Expand Up @@ -185,9 +177,6 @@ func (c *Config) Apply(
if err = getCachePrepStmts(query, &c.CachePrepStmts); err != nil {
return err
}
if err = getPrepStmtCacheSize(query, &c.PrepStmtCacheSize); err != nil {
return err
}
c.EnableOldValue = replicaConfig.EnableOldValue
c.ForceReplicate = replicaConfig.ForceReplicate
c.SourceID = replicaConfig.Sink.TiDBSourceID
Expand Down Expand Up @@ -414,26 +403,3 @@ func getCachePrepStmts(values url.Values, cachePrepStmts *bool) error {
}
return nil
}

func getPrepStmtCacheSize(values url.Values, prepStmtCacheSize *int) error {
s := values.Get("prep-stmt-cache-size")
if len(s) == 0 {
return nil
}

c, err := strconv.Atoi(s)
if err != nil {
return cerror.WrapError(cerror.ErrMySQLInvalidConfig, err)
}
if c <= 0 {
return cerror.WrapError(cerror.ErrMySQLInvalidConfig,
fmt.Errorf("invalid prep-stmt-cache-size %d, which must be greater than 0", c))
}
if c > maxPrepStmtCacheSize {
log.Warn("prep-stmt-cache-size too large",
zap.Int("original", c), zap.Int("override", maxPrepStmtCacheSize))
c = maxPrepStmtCacheSize
}
*prepStmtCacheSize = c
return nil
}
14 changes: 0 additions & 14 deletions pkg/sink/mysql/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,6 @@ func TestApplySinkURIParamsToConfig(t *testing.T) {
expected.tidbTxnMode = "pessimistic"
expected.EnableOldValue = true
expected.CachePrepStmts = true
expected.PrepStmtCacheSize = 1000000
uriStr := "mysql://127.0.0.1:3306/?worker-count=64&max-txn-row=20" +
"&max-multi-update-row=80&max-multi-update-row-size=512" +
"&safe-mode=false" +
Expand Down Expand Up @@ -260,16 +259,6 @@ func TestParseSinkURIOverride(t *testing.T) {
checker: func(sp *Config) {
require.EqualValues(t, sp.tidbTxnMode, defaultTiDBTxnMode)
},
}, {
uri: "mysql://127.0.0.1:3306/?prep-stmt-cache-size=1048576",
checker: func(sp *Config) {
require.EqualValues(t, sp.PrepStmtCacheSize, maxPrepStmtCacheSize)
},
}, {
uri: "mysql://127.0.0.1:3306/",
checker: func(sp *Config) {
require.EqualValues(t, sp.PrepStmtCacheSize, defaultPrepStmtCacheSize)
},
}, {
uri: "mysql://127.0.0.1:3306/?cache-prep-stmts=false",
checker: func(sp *Config) {
Expand Down Expand Up @@ -313,9 +302,6 @@ func TestParseSinkURIBadQueryString(t *testing.T) {
"mysql://127.0.0.1:3306/?write-timeout=badduration",
"mysql://127.0.0.1:3306/?read-timeout=badduration",
"mysql://127.0.0.1:3306/?timeout=badduration",
"mysql://127.0.0.1:3306/?prep-stmt-cache-size=not-number",
"mysql://127.0.0.1:3306/?prep-stmt-cache-size=-1",
"mysql://127.0.0.1:3306/?prep-stmt-cache-size=0",
}
ctx := context.TODO()
var uri *url.URL
Expand Down

0 comments on commit a59888c

Please sign in to comment.