Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

Commit

Permalink
syncer: refine heartbeat (#704)
Browse files Browse the repository at this point in the history
  • Loading branch information
csuzhangxc authored Jun 11, 2020
1 parent 615ad9f commit 9b6c012
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 14 deletions.
15 changes: 9 additions & 6 deletions syncer/heartbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"database/sql"
"fmt"
"reflect"
"strings"
"sync"
"time"

Expand All @@ -33,7 +34,7 @@ import (
// GRANT SELECT,UPDATE,INSERT,CREATE ON `your_database`.`heartbeat` to 'your_replicate_user'@'your_replicate_host';

const (
// when we not need to support MySQL <=5.5, we can replace with `2006-01-02 15:04:05.000000`
// still use "2006-01-02 15:04:05" rather than `2006-01-02 15:04:05.000000` to support parse old `2006-01-02 15:04:05`.
timeFormat = "2006-01-02 15:04:05"
)

Expand Down Expand Up @@ -96,8 +97,8 @@ func GetHeartbeat(cfg *HeartbeatConfig) (*Heartbeat, error) {
heartbeat = &Heartbeat{
lock: make(chan struct{}, 1), // with buffer 1, no recursion supported
cfg: cfg,
schema: filter.DMHeartbeatSchema,
table: filter.DMHeartbeatTable,
schema: strings.ToUpper(filter.DMHeartbeatSchema),
table: strings.ToUpper(filter.DMHeartbeatTable),
slavesTs: make(map[string]float64),
logger: log.With(zap.String("component", "heartbeat")),
}
Expand Down Expand Up @@ -181,7 +182,7 @@ func (h *Heartbeat) RemoveTask(name string) error {

// TryUpdateTaskTs tries to update task's ts
func (h *Heartbeat) TryUpdateTaskTs(taskName, schema, table string, data [][]interface{}) {
if schema != h.schema || table != h.table {
if strings.ToUpper(schema) != h.schema || strings.ToUpper(table) != h.table {
h.logger.Debug("don't need to handle non-heartbeat table", zap.String("schema", schema), zap.String("table", table))
return // not heartbeat table
}
Expand Down Expand Up @@ -292,8 +293,7 @@ func (h *Heartbeat) createTable() error {

// updateTS use `REPLACE` statement to insert or update ts
func (h *Heartbeat) updateTS() error {
// when we not need to support MySQL <=5.5, we can replace with `UTC_TIMESTAMP(6)`
query := fmt.Sprintf("REPLACE INTO `%s`.`%s` (`ts`, `server_id`) VALUES(UTC_TIMESTAMP(), ?)", h.schema, h.table)
query := fmt.Sprintf("REPLACE INTO `%s`.`%s` (`ts`, `server_id`) VALUES(UTC_TIMESTAMP(6), ?)", h.schema, h.table)
_, err := h.master.Exec(query, h.cfg.serverID)
h.logger.Debug("update ts", zap.String("sql", query), zap.Uint32("server ID", h.cfg.serverID))
return terror.WithScope(terror.DBErrorAdapt(err, terror.ErrDBDriverError), terror.ScopeUpstream)
Expand All @@ -308,6 +308,9 @@ func (h *Heartbeat) calculateLag(ctx context.Context) error {
select {
case h.lock <- struct{}{}:
for taskName, ts := range h.slavesTs {
if ts == 0 {
continue // do not update metrics if no valid slave TS exists.
}
lag := masterTS - ts
reportLagFunc(taskName, lag)
}
Expand Down
2 changes: 1 addition & 1 deletion tests/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,6 @@ Several convenient commands are provided:
* `check_port_alive <PORT>` - Wrapper to check a port is alive, at most 20 times.
* `check_port <HOST> <PORT>` - Checks a host:port is alive.
* `wait_process_exit <process_name>` - Wait for one or more processes to exit by given process name.
* `check_metric <PORT> <METRIC_NAME> <RETRY_COUNT> <VALUE PATTERN LIST>...` - check metric value from prometheus.
* `check_metric <PORT> <METRIC_NAME> <RETRY_COUNT> <LOWER BOUND> <UPPER BOUND>` - check metric value from prometheus.
* `truncate_trace_events <PORT>` - truncate trace server events records.

12 changes: 7 additions & 5 deletions tests/_utils/check_metric
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,19 @@ set -eu
port=$1
metric_name=$2
retry_count=$3
lower=$4
upper=$5

shift 3

counter=0
while [ $counter -lt $retry_count ]; do
metric=$(curl -s http://127.0.0.1:$port/metrics | grep $metric_name | awk '{print $2}')
for pattern in "$@"; do
if [ "$metric" == "${pattern}" ]; then
exit 0
fi
done
cmp1=$(awk 'BEGIN{ print "'$lower'"<"'$metric'" }')
cmp2=$(awk 'BEGIN{ print "'$metric'"<"'$upper'" }')
if [ $cmp1 -eq 1 ] && [ $cmp2 -eq 1 ]; then
exit 0
fi
((counter+=1))
echo "wait for valid metric for $counter-th time"
sleep 1
Expand Down
4 changes: 2 additions & 2 deletions tests/all_mode/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,8 @@ function run() {
# use sync_diff_inspector to check data now!
check_sync_diff $WORK_DIR $cur/conf/diff_config.toml

check_metric $WORKER1_PORT 'dm_syncer_replication_lag{task="test"}' 3 0 1
check_metric $WORKER2_PORT 'dm_syncer_replication_lag{task="test"}' 3 0 1
check_metric $WORKER1_PORT 'dm_syncer_replication_lag{task="test"}' 3 0 2
check_metric $WORKER2_PORT 'dm_syncer_replication_lag{task="test"}' 3 0 2

export GO_FAILPOINTS=''
}
Expand Down

0 comments on commit 9b6c012

Please sign in to comment.