Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

syncer: refine heartbeat (#704) #729

Merged
merged 1 commit into from
Jun 11, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 9 additions & 6 deletions syncer/heartbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"database/sql"
"fmt"
"reflect"
"strings"
"sync"
"time"

Expand All @@ -33,7 +34,7 @@ import (
// GRANT SELECT,UPDATE,INSERT,CREATE ON `your_database`.`heartbeat` to 'your_replicate_user'@'your_replicate_host';

const (
// when we not need to support MySQL <=5.5, we can replace with `2006-01-02 15:04:05.000000`
// keep using "2006-01-02 15:04:05" rather than `2006-01-02 15:04:05.000000` so that old values in the `2006-01-02 15:04:05` format can still be parsed.
timeFormat = "2006-01-02 15:04:05"
)

Expand Down Expand Up @@ -96,8 +97,8 @@ func GetHeartbeat(cfg *HeartbeatConfig) (*Heartbeat, error) {
heartbeat = &Heartbeat{
lock: make(chan struct{}, 1), // with buffer 1, no recursion supported
cfg: cfg,
schema: filter.DMHeartbeatSchema,
table: filter.DMHeartbeatTable,
schema: strings.ToUpper(filter.DMHeartbeatSchema),
table: strings.ToUpper(filter.DMHeartbeatTable),
slavesTs: make(map[string]float64),
logger: log.With(zap.String("component", "heartbeat")),
}
Expand Down Expand Up @@ -181,7 +182,7 @@ func (h *Heartbeat) RemoveTask(name string) error {

// TryUpdateTaskTs tries to update task's ts
func (h *Heartbeat) TryUpdateTaskTs(taskName, schema, table string, data [][]interface{}) {
if schema != h.schema || table != h.table {
if strings.ToUpper(schema) != h.schema || strings.ToUpper(table) != h.table {
h.logger.Debug("don't need to handle non-heartbeat table", zap.String("schema", schema), zap.String("table", table))
return // not heartbeat table
}
Expand Down Expand Up @@ -292,8 +293,7 @@ func (h *Heartbeat) createTable() error {

// updateTS use `REPLACE` statement to insert or update ts
func (h *Heartbeat) updateTS() error {
// when we not need to support MySQL <=5.5, we can replace with `UTC_TIMESTAMP(6)`
query := fmt.Sprintf("REPLACE INTO `%s`.`%s` (`ts`, `server_id`) VALUES(UTC_TIMESTAMP(), ?)", h.schema, h.table)
query := fmt.Sprintf("REPLACE INTO `%s`.`%s` (`ts`, `server_id`) VALUES(UTC_TIMESTAMP(6), ?)", h.schema, h.table)
_, err := h.master.Exec(query, h.cfg.serverID)
h.logger.Debug("update ts", zap.String("sql", query), zap.Uint32("server ID", h.cfg.serverID))
return terror.WithScope(terror.DBErrorAdapt(err, terror.ErrDBDriverError), terror.ScopeUpstream)
Expand All @@ -308,6 +308,9 @@ func (h *Heartbeat) calculateLag(ctx context.Context) error {
select {
case h.lock <- struct{}{}:
for taskName, ts := range h.slavesTs {
if ts == 0 {
continue // do not update metrics if no valid slave TS exists.
}
lag := masterTS - ts
reportLagFunc(taskName, lag)
}
Expand Down
2 changes: 1 addition & 1 deletion tests/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,6 @@ Several convenient commands are provided:
* `check_port_alive <PORT>` - Wrapper to check a port is alive, at most 20 times.
* `check_port <HOST> <PORT>` - Checks a host:port is alive.
* `wait_process_exit <process_name>` - Wait for one or more processes to exit by given process name.
* `check_metric <PORT> <METRIC_NAME> <RETRY_COUNT> <VALUE PATTERN LIST>...` - check metric value from prometheus.
* `check_metric <PORT> <METRIC_NAME> <RETRY_COUNT> <LOWER BOUND> <UPPER BOUND>` - Checks that a Prometheus metric value lies strictly between the lower and upper bounds, retrying up to `RETRY_COUNT` times.
* `truncate_trace_events <PORT>` - truncate trace server events records.

12 changes: 7 additions & 5 deletions tests/_utils/check_metric
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,19 @@ set -eu
port=$1
metric_name=$2
retry_count=$3
lower=$4
upper=$5

shift 3

counter=0
while [ $counter -lt $retry_count ]; do
metric=$(curl -s http://127.0.0.1:$port/metrics | grep $metric_name | awk '{print $2}')
for pattern in "$@"; do
if [ "$metric" == "${pattern}" ]; then
exit 0
fi
done
cmp1=$(awk 'BEGIN{ print "'$lower'"<"'$metric'" }')
cmp2=$(awk 'BEGIN{ print "'$metric'"<"'$upper'" }')
if [ $cmp1 -eq 1 ] && [ $cmp2 -eq 1 ]; then
exit 0
fi
((counter+=1))
echo "wait for valid metric for $counter-th time"
sleep 1
Expand Down
4 changes: 2 additions & 2 deletions tests/all_mode/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,8 @@ function run() {
# use sync_diff_inspector to check data now!
check_sync_diff $WORK_DIR $cur/conf/diff_config.toml

# check_metric $WORKER1_PORT 'dm_syncer_replication_lag{task="test"}' 3 0 1
# check_metric $WORKER2_PORT 'dm_syncer_replication_lag{task="test"}' 3 0 1
# check_metric $WORKER1_PORT 'dm_syncer_replication_lag{task="test"}' 3 0 2
# check_metric $WORKER2_PORT 'dm_syncer_replication_lag{task="test"}' 3 0 2

export GO_FAILPOINTS=''
}
Expand Down