From b6041f85f43d854f75a699e7d4bb0d227cfab9af Mon Sep 17 00:00:00 2001 From: Xuecheng Zhang Date: Thu, 11 Jun 2020 12:59:53 +0800 Subject: [PATCH] syncer: refine heartbeat (#704) --- syncer/heartbeat.go | 15 +++++++++------ tests/README.md | 2 +- tests/_utils/check_metric | 12 +++++++----- tests/all_mode/run.sh | 4 ++-- 4 files changed, 19 insertions(+), 14 deletions(-) diff --git a/syncer/heartbeat.go b/syncer/heartbeat.go index 36f6715497..0e59f3ffb6 100644 --- a/syncer/heartbeat.go +++ b/syncer/heartbeat.go @@ -18,6 +18,7 @@ import ( "database/sql" "fmt" "reflect" + "strings" "sync" "time" @@ -33,7 +34,7 @@ import ( // GRANT SELECT,UPDATE,INSERT,CREATE ON `your_database`.`heartbeat` to 'your_replicate_user'@'your_replicate_host'; const ( - // when we not need to support MySQL <=5.5, we can replace with `2006-01-02 15:04:05.000000` + // still use "2006-01-02 15:04:05" rather than `2006-01-02 15:04:05.000000` to support parse old `2006-01-02 15:04:05`. timeFormat = "2006-01-02 15:04:05" ) @@ -96,8 +97,8 @@ func GetHeartbeat(cfg *HeartbeatConfig) (*Heartbeat, error) { heartbeat = &Heartbeat{ lock: make(chan struct{}, 1), // with buffer 1, no recursion supported cfg: cfg, - schema: filter.DMHeartbeatSchema, - table: filter.DMHeartbeatTable, + schema: strings.ToUpper(filter.DMHeartbeatSchema), + table: strings.ToUpper(filter.DMHeartbeatTable), slavesTs: make(map[string]float64), logger: log.With(zap.String("component", "heartbeat")), } @@ -181,7 +182,7 @@ func (h *Heartbeat) RemoveTask(name string) error { // TryUpdateTaskTs tries to update task's ts func (h *Heartbeat) TryUpdateTaskTs(taskName, schema, table string, data [][]interface{}) { - if schema != h.schema || table != h.table { + if strings.ToUpper(schema) != h.schema || strings.ToUpper(table) != h.table { h.logger.Debug("don't need to handle non-heartbeat table", zap.String("schema", schema), zap.String("table", table)) return // not heartbeat table } @@ -292,8 +293,7 @@ func (h *Heartbeat) createTable() error { // updateTS use `REPLACE` statement to insert or update ts func (h *Heartbeat) updateTS() error { - // when we not need to support MySQL <=5.5, we can replace with `UTC_TIMESTAMP(6)` - query := fmt.Sprintf("REPLACE INTO `%s`.`%s` (`ts`, `server_id`) VALUES(UTC_TIMESTAMP(), ?)", h.schema, h.table) + query := fmt.Sprintf("REPLACE INTO `%s`.`%s` (`ts`, `server_id`) VALUES(UTC_TIMESTAMP(6), ?)", h.schema, h.table) _, err := h.master.Exec(query, h.cfg.serverID) h.logger.Debug("update ts", zap.String("sql", query), zap.Uint32("server ID", h.cfg.serverID)) return terror.WithScope(terror.DBErrorAdapt(err, terror.ErrDBDriverError), terror.ScopeUpstream) @@ -308,6 +308,9 @@ func (h *Heartbeat) calculateLag(ctx context.Context) error { select { case h.lock <- struct{}{}: for taskName, ts := range h.slavesTs { + if ts == 0 { + continue // do not update metrics if no valid slave TS exists. + } lag := masterTS - ts reportLagFunc(taskName, lag) } diff --git a/tests/README.md b/tests/README.md index d523a1946a..0d26289bcd 100644 --- a/tests/README.md +++ b/tests/README.md @@ -90,6 +90,6 @@ Several convenient commands are provided: * `check_port_alive ` - Wrapper to check a port is alive, at most 20 times. * `check_port ` - Checks a host:port is alive. * `wait_process_exit ` - Wait for one or more processes to exit by given process name. -* `check_metric ...` - check metric value from prometheus. +* `check_metric ` - check metric value from prometheus. * `truncate_trace_events ` - truncate trace server events records. diff --git a/tests/_utils/check_metric b/tests/_utils/check_metric index f41f5e6dee..279ac2c1c7 100755 --- a/tests/_utils/check_metric +++ b/tests/_utils/check_metric @@ -9,17 +9,19 @@ set -eu port=$1 metric_name=$2 retry_count=$3 +lower=$4 +upper=$5 shift 3 counter=0 while [ $counter -lt $retry_count ]; do metric=$(curl -s http://127.0.0.1:$port/metrics | grep $metric_name | awk '{print $2}') - for pattern in "$@"; do - if [ "$metric" == "${pattern}" ]; then - exit 0 - fi - done + cmp1=$(awk 'BEGIN{ print "'$lower'"<"'$metric'" }') + cmp2=$(awk 'BEGIN{ print "'$metric'"<"'$upper'" }') + if [ $cmp1 -eq 1 ] && [ $cmp2 -eq 1 ]; then + exit 0 + fi ((counter+=1)) echo "wait for valid metric for $counter-th time" sleep 1 diff --git a/tests/all_mode/run.sh b/tests/all_mode/run.sh index 5de8f1e732..c2ded3c44c 100755 --- a/tests/all_mode/run.sh +++ b/tests/all_mode/run.sh @@ -142,8 +142,8 @@ function run() { # use sync_diff_inspector to check data now! check_sync_diff $WORK_DIR $cur/conf/diff_config.toml - # check_metric $WORKER1_PORT 'dm_syncer_replication_lag{task="test"}' 3 0 1 - # check_metric $WORKER2_PORT 'dm_syncer_replication_lag{task="test"}' 3 0 1 + # check_metric $WORKER1_PORT 'dm_syncer_replication_lag{task="test"}' 3 0 2 + # check_metric $WORKER2_PORT 'dm_syncer_replication_lag{task="test"}' 3 0 2 export GO_FAILPOINTS='' }