diff --git a/syncer/heartbeat.go b/syncer/heartbeat.go index 36f6715497..0e59f3ffb6 100644 --- a/syncer/heartbeat.go +++ b/syncer/heartbeat.go @@ -18,6 +18,7 @@ import ( "database/sql" "fmt" "reflect" + "strings" "sync" "time" @@ -33,7 +34,7 @@ import ( // GRANT SELECT,UPDATE,INSERT,CREATE ON `your_database`.`heartbeat` to 'your_replicate_user'@'your_replicate_host'; const ( - // when we not need to support MySQL <=5.5, we can replace with `2006-01-02 15:04:05.000000` + // still use "2006-01-02 15:04:05" rather than `2006-01-02 15:04:05.000000` to support parse old `2006-01-02 15:04:05`. timeFormat = "2006-01-02 15:04:05" ) @@ -96,8 +97,8 @@ func GetHeartbeat(cfg *HeartbeatConfig) (*Heartbeat, error) { heartbeat = &Heartbeat{ lock: make(chan struct{}, 1), // with buffer 1, no recursion supported cfg: cfg, - schema: filter.DMHeartbeatSchema, - table: filter.DMHeartbeatTable, + schema: strings.ToUpper(filter.DMHeartbeatSchema), + table: strings.ToUpper(filter.DMHeartbeatTable), slavesTs: make(map[string]float64), logger: log.With(zap.String("component", "heartbeat")), } @@ -181,7 +182,7 @@ func (h *Heartbeat) RemoveTask(name string) error { // TryUpdateTaskTs tries to update task's ts func (h *Heartbeat) TryUpdateTaskTs(taskName, schema, table string, data [][]interface{}) { - if schema != h.schema || table != h.table { + if strings.ToUpper(schema) != h.schema || strings.ToUpper(table) != h.table { h.logger.Debug("don't need to handle non-heartbeat table", zap.String("schema", schema), zap.String("table", table)) return // not heartbeat table } @@ -292,8 +293,7 @@ func (h *Heartbeat) createTable() error { // updateTS use `REPLACE` statement to insert or update ts func (h *Heartbeat) updateTS() error { - // when we not need to support MySQL <=5.5, we can replace with `UTC_TIMESTAMP(6)` - query := fmt.Sprintf("REPLACE INTO `%s`.`%s` (`ts`, `server_id`) VALUES(UTC_TIMESTAMP(), ?)", h.schema, h.table) + query := fmt.Sprintf("REPLACE INTO `%s`.`%s` (`ts`, `server_id`) 
VALUES(UTC_TIMESTAMP(6), ?)", h.schema, h.table) _, err := h.master.Exec(query, h.cfg.serverID) h.logger.Debug("update ts", zap.String("sql", query), zap.Uint32("server ID", h.cfg.serverID)) return terror.WithScope(terror.DBErrorAdapt(err, terror.ErrDBDriverError), terror.ScopeUpstream) @@ -308,6 +308,9 @@ func (h *Heartbeat) calculateLag(ctx context.Context) error { select { case h.lock <- struct{}{}: for taskName, ts := range h.slavesTs { + if ts == 0 { + continue // do not update metrics if no valid slave TS exists. + } lag := masterTS - ts reportLagFunc(taskName, lag) } diff --git a/tests/README.md b/tests/README.md index b3bdfbdd06..03778146f7 100644 --- a/tests/README.md +++ b/tests/README.md @@ -91,6 +91,6 @@ Several convenient commands are provided: * `check_port_alive ` - Wrapper to check a port is alive, at most 20 times. * `check_port ` - Checks a host:port is alive. * `wait_process_exit ` - Wait for one or more processes to exit by given process name. -* `check_metric ...` - check metric value from prometheus. +* `check_metric <port> <metric_name> <retry_count> <lower_bound> <upper_bound>` - Checks that a metric value from prometheus lies strictly between the lower and upper bound, retrying up to `retry_count` times. * `truncate_trace_events ` - truncate trace server events records. 
diff --git a/tests/_utils/check_metric b/tests/_utils/check_metric index f41f5e6dee..279ac2c1c7 100755 --- a/tests/_utils/check_metric +++ b/tests/_utils/check_metric @@ -9,17 +9,19 @@ set -eu port=$1 metric_name=$2 retry_count=$3 +lower=$4 +upper=$5 shift 3 counter=0 while [ $counter -lt $retry_count ]; do metric=$(curl -s http://127.0.0.1:$port/metrics | grep $metric_name | awk '{print $2}') - for pattern in "$@"; do - if [ "$metric" == "${pattern}" ]; then - exit 0 - fi - done + # pass values via -v and add 0 to force a numeric comparison; quoted awk string literals compare lexically ("10" < "2"). An empty metric never matches. + in_range=$(awk -v lo="$lower" -v v="$metric" -v hi="$upper" 'BEGIN{ print (v != "" && lo+0 < v+0 && v+0 < hi+0) }') + if [ "$in_range" -eq 1 ]; then + exit 0 + fi ((counter+=1)) echo "wait for valid metric for $counter-th time" sleep 1 diff --git a/tests/all_mode/run.sh b/tests/all_mode/run.sh index aa434d3ba6..f7d1a2cc6b 100755 --- a/tests/all_mode/run.sh +++ b/tests/all_mode/run.sh @@ -101,8 +101,8 @@ function run() { # use sync_diff_inspector to check data now! check_sync_diff $WORK_DIR $cur/conf/diff_config.toml - check_metric $WORKER1_PORT 'dm_syncer_replication_lag{task="test"}' 3 0 1 - check_metric $WORKER2_PORT 'dm_syncer_replication_lag{task="test"}' 3 0 1 + check_metric $WORKER1_PORT 'dm_syncer_replication_lag{task="test"}' 3 0 2 + check_metric $WORKER2_PORT 'dm_syncer_replication_lag{task="test"}' 3 0 2 export GO_FAILPOINTS='' }