Skip to content

Commit

Permalink
yaml: heartbet
Browse files Browse the repository at this point in the history
Signed-off-by: Sugu Sougoumarane <[email protected]>
  • Loading branch information
sougou committed Apr 19, 2020
1 parent 1da39de commit 7adcba5
Show file tree
Hide file tree
Showing 6 changed files with 26 additions and 23 deletions.
7 changes: 4 additions & 3 deletions go/vt/vttablet/heartbeat/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,16 +72,17 @@ type Reader struct {
// NewReader returns a new heartbeat reader.
func NewReader(env tabletenv.Env) *Reader {
config := env.Config()
if !config.HeartbeatEnable {
if config.HeartbeatIntervalMilliseconds == 0 {
return &Reader{}
}

heartbeatInterval := time.Duration(config.HeartbeatIntervalMilliseconds) * time.Millisecond
return &Reader{
env: env,
enabled: true,
now: time.Now,
interval: config.HeartbeatInterval,
ticks: timer.NewTimer(config.HeartbeatInterval),
interval: heartbeatInterval,
ticks: timer.NewTimer(heartbeatInterval),
errorLog: logutil.NewThrottledLogger("HeartbeatReporter", 60*time.Second),
pool: connpool.New(env, "HeartbeatReadPool", tabletenv.ConnPoolConfig{
Size: 1,
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vttablet/heartbeat/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func TestReaderReadHeartbeatError(t *testing.T) {

func newReader(db *fakesqldb.DB, nowFunc func() time.Time) *Reader {
config := tabletenv.NewDefaultConfig()
config.HeartbeatEnable = true
config.HeartbeatIntervalMilliseconds = 1000
params, _ := db.ConnParams().MysqlParams()
cp := *params
dbc := dbconfigs.NewTestDBConfigs(cp, cp, "")
Expand Down
7 changes: 4 additions & 3 deletions go/vt/vttablet/heartbeat/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,16 +73,17 @@ type Writer struct {
// NewWriter creates a new Writer.
func NewWriter(env tabletenv.Env, alias topodatapb.TabletAlias) *Writer {
config := env.Config()
if !config.HeartbeatEnable {
if config.HeartbeatIntervalMilliseconds == 0 {
return &Writer{}
}
heartbeatInterval := time.Duration(config.HeartbeatIntervalMilliseconds) * time.Millisecond
return &Writer{
env: env,
enabled: true,
tabletAlias: alias,
now: time.Now,
interval: config.HeartbeatInterval,
ticks: timer.NewTimer(config.HeartbeatInterval),
interval: heartbeatInterval,
ticks: timer.NewTimer(heartbeatInterval),
errorLog: logutil.NewThrottledLogger("HeartbeatWriter", 60*time.Second),
pool: connpool.New(env, "HeartbeatWritePool", tabletenv.ConnPoolConfig{
Size: 1,
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vttablet/heartbeat/writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func TestWriteHeartbeatError(t *testing.T) {

func newTestWriter(db *fakesqldb.DB, nowFunc func() time.Time) *Writer {
config := tabletenv.NewDefaultConfig()
config.HeartbeatEnable = true
config.HeartbeatIntervalMilliseconds = 1000

params, _ := db.ConnParams().MysqlParams()
cp := *params
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vttablet/tabletmanager/heartbeat_reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type Reporter struct {
// RegisterReporter registers the heartbeat reader as a healthcheck reporter so that its
// measurements will be picked up in healthchecks.
func registerHeartbeatReporter(controller tabletserver.Controller) {
if !tabletenv.NewCurrentConfig().HeartbeatEnable {
if tabletenv.NewCurrentConfig().HeartbeatIntervalMilliseconds == 0 {
return
}

Expand Down
29 changes: 15 additions & 14 deletions go/vt/vttablet/tabletserver/tabletenv/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ var (
enableHotRowProtectionDryRun bool
enableConsolidator bool
enableConsolidatorReplicas bool
enableHeartbeat bool
heartbeatInterval time.Duration
)

func init() {
Expand Down Expand Up @@ -126,8 +128,8 @@ func init() {
flag.BoolVar(&currentConfig.TransactionLimitByComponent, "transaction_limit_by_component", defaultConfig.TransactionLimitByComponent, "Include CallerID.component when considering who the user is for the purpose of transaction limit.")
flag.BoolVar(&currentConfig.TransactionLimitBySubcomponent, "transaction_limit_by_subcomponent", defaultConfig.TransactionLimitBySubcomponent, "Include CallerID.subcomponent when considering who the user is for the purpose of transaction limit.")

flag.BoolVar(&currentConfig.HeartbeatEnable, "heartbeat_enable", defaultConfig.HeartbeatEnable, "If true, vttablet records (if master) or checks (if replica) the current time of a replication heartbeat in the table _vt.heartbeat. The result is used to inform the serving state of the vttablet via healthchecks.")
flag.DurationVar(&currentConfig.HeartbeatInterval, "heartbeat_interval", defaultConfig.HeartbeatInterval, "How frequently to read and write replication heartbeat.")
flag.BoolVar(&enableHeartbeat, "heartbeat_enable", false, "If true, vttablet records (if master) or checks (if replica) the current time of a replication heartbeat in the table _vt.heartbeat. The result is used to inform the serving state of the vttablet via healthchecks.")
flag.DurationVar(&heartbeatInterval, "heartbeat_interval", 1*time.Second, "How frequently to read and write replication heartbeat.")

flag.BoolVar(&currentConfig.EnforceStrictTransTables, "enforce_strict_trans_tables", defaultConfig.EnforceStrictTransTables, "If true, vttablet requires MySQL to run with STRICT_TRANS_TABLES or STRICT_ALL_TABLES on. It is recommended to not turn this flag off. Otherwise MySQL may alter your supplied values before saving them to the database.")
flag.BoolVar(&enableConsolidator, "enable-consolidator", true, "This option enables the query consolidator.")
Expand Down Expand Up @@ -160,6 +162,10 @@ func Init() {
currentConfig.Consolidator = Disable
}

if enableHeartbeat {
currentConfig.HeartbeatIntervalMilliseconds = int(heartbeatInterval / time.Millisecond)
}

switch *streamlog.QueryLogFormat {
case streamlog.QueryLogFormatText:
case streamlog.QueryLogFormatJSON:
Expand All @@ -178,12 +184,13 @@ func Init() {

// TabletConfig contains all the configuration for query service
type TabletConfig struct {
OltpReadPool ConnPoolConfig `json:"oltpReadPool,omitempty"`
OlapReadPool ConnPoolConfig `json:"olapReadPool,omitempty"`
TxPool ConnPoolConfig `json:"txPool,omitempty"`
Oltp OltpConfig `json:"oltp,omitempty"`
HotRowProtection HotRowProtectionConfig `json:"hotRowProtection,omitempty"`
Consolidator string `json:"consolidator,omitempty"`
OltpReadPool ConnPoolConfig `json:"oltpReadPool,omitempty"`
OlapReadPool ConnPoolConfig `json:"olapReadPool,omitempty"`
TxPool ConnPoolConfig `json:"txPool,omitempty"`
Oltp OltpConfig `json:"oltp,omitempty"`
HotRowProtection HotRowProtectionConfig `json:"hotRowProtection,omitempty"`
Consolidator string `json:"consolidator,omitempty"`
HeartbeatIntervalMilliseconds int `json:"heartbeatIntervalMilliseconds,omitempty"`

MessagePostponeCap int `json:"-"`
TxShutDownGracePeriod float64 `json:"-"`
Expand All @@ -206,9 +213,6 @@ type TabletConfig struct {

TransactionLimitConfig `json:"-"`

HeartbeatEnable bool `json:"-"`
HeartbeatInterval time.Duration `json:"-"`

EnforceStrictTransTables bool `json:"-"`
EnableQueryPlanFieldCaching bool `json:"-"`
}
Expand Down Expand Up @@ -374,9 +378,6 @@ var defaultConfig = TabletConfig{

TransactionLimitConfig: defaultTransactionLimitConfig(),

HeartbeatEnable: false,
HeartbeatInterval: 1 * time.Second,

EnforceStrictTransTables: true,
EnableQueryPlanFieldCaching: true,
}
Expand Down

0 comments on commit 7adcba5

Please sign in to comment.