Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

syncer: refine heartbeat #704

Merged
merged 5 commits into from
Jun 11, 2020
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 9 additions & 6 deletions syncer/heartbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"database/sql"
"fmt"
"reflect"
"strings"
"sync"
"time"

Expand All @@ -33,7 +34,7 @@ import (
// GRANT SELECT,UPDATE,INSERT,CREATE ON `your_database`.`heartbeat` to 'your_replicate_user'@'your_replicate_host';

const (
// when we not need to support MySQL <=5.5, we can replace with `2006-01-02 15:04:05.000000`
// still use "2006-01-02 15:04:05" rather than `2006-01-02 15:04:05.000000` to support parse old `2006-01-02 15:04:05`.
timeFormat = "2006-01-02 15:04:05"
)

Expand Down Expand Up @@ -96,8 +97,8 @@ func GetHeartbeat(cfg *HeartbeatConfig) (*Heartbeat, error) {
heartbeat = &Heartbeat{
lock: make(chan struct{}, 1), // with buffer 1, no recursion supported
cfg: cfg,
schema: filter.DMHeartbeatSchema,
table: filter.DMHeartbeatTable,
schema: strings.ToUpper(filter.DMHeartbeatSchema),
table: strings.ToUpper(filter.DMHeartbeatTable),
slavesTs: make(map[string]float64),
logger: log.With(zap.String("component", "heartbeat")),
}
Expand Down Expand Up @@ -181,7 +182,7 @@ func (h *Heartbeat) RemoveTask(name string) error {

// TryUpdateTaskTs tries to update task's ts
func (h *Heartbeat) TryUpdateTaskTs(taskName, schema, table string, data [][]interface{}) {
if schema != h.schema || table != h.table {
if strings.ToUpper(schema) != h.schema || strings.ToUpper(table) != h.table {
h.logger.Debug("don't need to handle non-heartbeat table", zap.String("schema", schema), zap.String("table", table))
return // not heartbeat table
}
Expand Down Expand Up @@ -292,8 +293,7 @@ func (h *Heartbeat) createTable() error {

// updateTS use `REPLACE` statement to insert or update ts
func (h *Heartbeat) updateTS() error {
// when we not need to support MySQL <=5.5, we can replace with `UTC_TIMESTAMP(6)`
WangXiangUSTC marked this conversation as resolved.
Show resolved Hide resolved
query := fmt.Sprintf("REPLACE INTO `%s`.`%s` (`ts`, `server_id`) VALUES(UTC_TIMESTAMP(), ?)", h.schema, h.table)
query := fmt.Sprintf("REPLACE INTO `%s`.`%s` (`ts`, `server_id`) VALUES(UTC_TIMESTAMP(6), ?)", h.schema, h.table)
_, err := h.master.Exec(query, h.cfg.serverID)
h.logger.Debug("update ts", zap.String("sql", query), zap.Uint32("server ID", h.cfg.serverID))
return terror.WithScope(terror.DBErrorAdapt(err, terror.ErrDBDriverError), terror.ScopeUpstream)
Expand All @@ -308,6 +308,9 @@ func (h *Heartbeat) calculateLag(ctx context.Context) error {
select {
case h.lock <- struct{}{}:
for taskName, ts := range h.slavesTs {
if ts == 0 {
continue // do not update metrics if no valid slave TS exists.
}
lag := masterTS - ts
reportLagFunc(taskName, lag)
}
Expand Down
2 changes: 1 addition & 1 deletion tests/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,6 @@ Several convenient commands are provided:
* `check_port_alive <PORT>` - Wrapper to check a port is alive, at most 20 times.
* `check_port <HOST> <PORT>` - Checks a host:port is alive.
* `wait_process_exit <process_name>` - Wait for one or more processes to exit by given process name.
* `check_metric <PORT> <METRIC_NAME> <RETRY_COUNT> <VALUE PATTERN LIST>...` - check metric value from prometheus.
* `check_metric <PORT> <METRIC_NAME> <RETRY_COUNT> <LOWER BOUND> <UPPER BOUND>` - check metric value from prometheus.
* `truncate_trace_events <PORT>` - truncate trace server events records.

12 changes: 7 additions & 5 deletions tests/_utils/check_metric
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,19 @@ set -eu
port=$1
metric_name=$2
retry_count=$3
lower=$4
upper=$5

shift 3

counter=0
while [ $counter -lt $retry_count ]; do
metric=$(curl -s http://127.0.0.1:$port/metrics | grep $metric_name | awk '{print $2}')
for pattern in "$@"; do
if [ "$metric" == "${pattern}" ]; then
exit 0
fi
done
cmp1=$(awk 'BEGIN{ print "'$lower'"<"'$metric'" }')
cmp2=$(awk 'BEGIN{ print "'$metric'"<"'$upper'" }')
if [ $cmp1 -eq 1 ] && [ $cmp2 -eq 1 ]; then
exit 0
fi
((counter+=1))
echo "wait for valid metric for $counter-th time"
sleep 1
Expand Down
4 changes: 2 additions & 2 deletions tests/all_mode/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,8 @@ function run() {
# use sync_diff_inspector to check data now!
check_sync_diff $WORK_DIR $cur/conf/diff_config.toml

check_metric $WORKER1_PORT 'dm_syncer_replication_lag{task="test"}' 3 0 1
check_metric $WORKER2_PORT 'dm_syncer_replication_lag{task="test"}' 3 0 1
check_metric $WORKER1_PORT 'dm_syncer_replication_lag{task="test"}' 3 0 2
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why need to increase the upper to 2? It seems that use UTC_TIMESTAMP(6) to replace UTC_TIMESTAMP when update ts will decrease the replication lag.

Copy link
Member Author

@csuzhangxc csuzhangxc Jun 11, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I saw lag with 1.x (but < 1.5) in some cases. in the old UTC_TIMESTAMP it ==1, but now it > 1.

check_metric $WORKER2_PORT 'dm_syncer_replication_lag{task="test"}' 3 0 2

export GO_FAILPOINTS=''
}
Expand Down