Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor: make set system variable log shorter #7029

Merged
merged 3 commits into from
Jul 11, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,10 +280,10 @@ func (a *ExecStmt) buildExecutor(ctx sessionctx.Context) (Executor, error) {
var err error
isPointGet := IsPointGetWithPKOrUniqueKeyByAutoCommit(ctx, a.Plan)
if isPointGet {
log.Debugf("[con:%d][InitTxnWithStartTS] %s", ctx.GetSessionVars().ConnectionID, a.Text)
log.Debugf("con:%d InitTxnWithStartTS %s", ctx.GetSessionVars().ConnectionID, a.Text)
err = ctx.InitTxnWithStartTS(math.MaxUint64)
} else {
log.Debugf("[con:%d][ActivePendingTxn] %s", ctx.GetSessionVars().ConnectionID, a.Text)
log.Debugf("con:%d ActivePendingTxn %s", ctx.GetSessionVars().ConnectionID, a.Text)
err = ctx.ActivePendingTxn()
}
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion executor/compiler.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ func GetInfoSchema(ctx sessionctx.Context) infoschema.InfoSchema {
var is infoschema.InfoSchema
if snap := sessVar.SnapshotInfoschema; snap != nil {
is = snap.(infoschema.InfoSchema)
log.Infof("[con:%d] use snapshot schema %d", sessVar.ConnectionID, is.SchemaMetaVersion())
log.Infof("con:%d use snapshot schema %d", sessVar.ConnectionID, is.SchemaMetaVersion())
} else {
is = sessVar.TxnCtx.InfoSchema.(infoschema.InfoSchema)
}
Expand Down
4 changes: 2 additions & 2 deletions executor/set.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ func (e *SetExecutor) setSysVariable(name string, v *expression.VarAssignment) e
valStr, err = value.ToString()
terror.Log(errors.Trace(err))
}
log.Infof("[con:%d] set system variable %s = %s", sessionVars.ConnectionID, name, valStr)
log.Infof("con:%d %s=%s", sessionVars.ConnectionID, name, valStr)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need to keep this format [con:%d]?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are places where con: is not enclosed by '[]'.
How about we should remove all the [] that enclosing con:?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we more shorter use c: instead of conn:?

and I have a idea: maybe we encapsulation all log method and in ctx idioms, in the method body it use prealloc log header in thread load level(e.g. "c:123, tx:12, tr:33"), then need for every place to to fomat conn mannul, and no need to alloc header part every time?

}

if name == variable.TxnIsolation {
Expand Down Expand Up @@ -259,7 +259,7 @@ func (e *SetExecutor) loadSnapshotInfoSchemaIfNeeded(name string) error {
vars.SnapshotInfoschema = nil
return nil
}
log.Infof("[con:%d] loadSnapshotInfoSchema, SnapshotTS:%d", vars.ConnectionID, vars.SnapshotTS)
log.Infof("con:%d loadSnapshotInfoSchema, SnapshotTS:%d", vars.ConnectionID, vars.SnapshotTS)
dom := domain.GetDomain(e.ctx)
snapInfo, err := dom.GetSnapshotInfoSchema(vars.SnapshotTS)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion executor/simple.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func (e *SimpleExec) executeCommit(s *ast.CommitStmt) {

func (e *SimpleExec) executeRollback(s *ast.RollbackStmt) error {
sessVars := e.ctx.GetSessionVars()
log.Debugf("[con:%d] execute rollback statement", sessVars.ConnectionID)
log.Debugf("con:%d execute rollback statement", sessVars.ConnectionID)
sessVars.SetStatusFlag(mysql.ServerStatusInTrans, false)
if e.ctx.Txn().Valid() {
e.ctx.GetSessionVars().TxnCtx.ClearDelta()
Expand Down
8 changes: 4 additions & 4 deletions server/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -452,7 +452,7 @@ func (cc *clientConn) Run() {
if terror.ErrorNotEqual(err, io.EOF) {
errStack := errors.ErrorStack(err)
if !strings.Contains(errStack, "use of closed network connection") {
log.Errorf("[con:%d] read packet error, close this connection %s",
log.Errorf("con:%d read packet error, close this connection %s",
cc.connectionID, errStack)
}
}
Expand All @@ -472,11 +472,11 @@ func (cc *clientConn) Run() {
cc.addMetrics(data[0], startTime, nil)
return
} else if terror.ErrResultUndetermined.Equal(err) {
log.Errorf("[con:%d] result undetermined error, close this connection %s",
log.Errorf("con:%d result undetermined error, close this connection %s",
cc.connectionID, errors.ErrorStack(err))
return
} else if terror.ErrCritical.Equal(err) {
log.Errorf("[con:%d] critical error, stop the server listener %s",
log.Errorf("con:%d critical error, stop the server listener %s",
cc.connectionID, errors.ErrorStack(err))
metrics.CriticalErrorCounter.Add(1)
select {
Expand All @@ -485,7 +485,7 @@ func (cc *clientConn) Run() {
}
return
}
log.Warnf("[con:%d] dispatch error:\n%s\n%q\n%s",
log.Warnf("con:%d dispatch error:\n%s\n%q\n%s",
cc.connectionID, cc, queryStrForLog(string(data[1:])), errStrForLog(err))
err1 := cc.writeError(err)
terror.Log(errors.Trace(err1))
Expand Down
4 changes: 2 additions & 2 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,9 +307,9 @@ func (s *Server) onConn(c net.Conn) {
terror.Log(errors.Trace(err))
return
}
log.Infof("[con:%d] new connection %s", conn.connectionID, c.RemoteAddr().String())
log.Infof("con:%d new connection %s", conn.connectionID, c.RemoteAddr().String())
defer func() {
log.Infof("[con:%d] close connection", conn.connectionID)
log.Infof("con:%d close connection", conn.connectionID)
}()
s.rwlock.Lock()
s.clients[conn.connectionID] = conn
Expand Down
18 changes: 9 additions & 9 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ func (s *session) doCommitWithRetry(ctx context.Context) error {
// BatchInsert already commit the first batch 1000 rows, then it commit 1000-2000 and retry the statement,
// Finally t1 will have more data than t2, with no errors return to user!
if s.isRetryableError(err) && !s.sessionVars.BatchInsert && commitRetryLimit > 0 {
log.Warnf("[con:%d] retryable error: %v, txn: %v", s.sessionVars.ConnectionID, err, s.txn)
log.Warnf("con:%d retryable error: %v, txn: %v", s.sessionVars.ConnectionID, err, s.txn)
// Transactions will retry 2 ~ commitRetryLimit times.
// We make larger transactions retry less times to prevent cluster resource outage.
txnSizeRate := float64(txnSize) / float64(kv.TxnTotalSizeLimit)
Expand All @@ -359,7 +359,7 @@ func (s *session) doCommitWithRetry(ctx context.Context) error {
}

if err != nil {
log.Warnf("[con:%d] finished txn:%v, %v", s.sessionVars.ConnectionID, s.txn, err)
log.Warnf("con:%d finished txn:%v, %v", s.sessionVars.ConnectionID, s.txn, err)
return errors.Trace(err)
}
mapper := s.GetSessionVars().TxnCtx.TableDeltaMap
Expand Down Expand Up @@ -508,17 +508,17 @@ func (s *session) retry(ctx context.Context, maxCnt uint) error {
}
}
if !s.isRetryableError(err) {
log.Warnf("[con:%d] session:%v, err:%v in retry", connID, s, err)
log.Warnf("con:%d session:%v, err:%v in retry", connID, s, err)
metrics.SessionRetryErrorCounter.WithLabelValues(metrics.LblUnretryable)
return errors.Trace(err)
}
retryCnt++
if retryCnt >= maxCnt {
log.Warnf("[con:%d] Retry reached max count %d", connID, retryCnt)
log.Warnf("con:%d Retry reached max count %d", connID, retryCnt)
metrics.SessionRetryErrorCounter.WithLabelValues(metrics.LblReachMax)
return errors.Trace(err)
}
log.Warnf("[con:%d] retryable error: %v, txn: %v", connID, err, s.txn)
log.Warnf("con:%d retryable error: %v, txn: %v", connID, err, s.txn)
kv.BackOff(retryCnt)
s.txn.changeToInvalid()
s.sessionVars.SetStatusFlag(mysql.ServerStatusInTrans, false)
Expand Down Expand Up @@ -739,7 +739,7 @@ func (s *session) executeStatement(ctx context.Context, connID uint64, stmtNode
recordSet, err := runStmt(ctx, s, stmt)
if err != nil {
if !kv.ErrKeyExists.Equal(err) {
log.Warnf("[con:%d][schema ver:%d] session error:\n%v\n%s",
log.Warnf("con:%d schema_ver:%d session error:\n%v\n%s",
connID, s.sessionVars.TxnCtx.SchemaVersion, errors.ErrorStack(err), s)
}
return nil, errors.Trace(err)
Expand Down Expand Up @@ -815,7 +815,7 @@ func (s *session) execute(ctx context.Context, sql string) (recordSets []ast.Rec
stmtNodes, err := s.ParseSQL(ctx, sql, charsetInfo, collation)
if err != nil {
s.rollbackOnError(ctx)
log.Warnf("[con:%d] parse error:\n%v\n%s", connID, err, sql)
log.Warnf("con:%d parse error:\n%v\n%s", connID, err, sql)
return nil, errors.Trace(err)
}
metrics.SessionExecuteParseDuration.Observe(time.Since(startTS).Seconds())
Expand All @@ -833,7 +833,7 @@ func (s *session) execute(ctx context.Context, sql string) (recordSets []ast.Rec
stmt, err := compiler.Compile(ctx, stmtNode)
if err != nil {
s.rollbackOnError(ctx)
log.Warnf("[con:%d] compile error:\n%v\n%s", connID, err, sql)
log.Warnf("con:%d compile error:\n%v\n%s", connID, err, sql)
return nil, errors.Trace(err)
}
metrics.SessionExecuteCompileDuration.Observe(time.Since(startTS).Seconds())
Expand Down Expand Up @@ -986,7 +986,7 @@ func (s *session) NewTxn() error {
return errors.Trace(err)
}
vars := s.GetSessionVars()
log.Infof("[con:%d][schema ver:%d] NewTxn() inside a transaction auto commit: %d", vars.ConnectionID, vars.TxnCtx.SchemaVersion, txnID)
log.Infof("con:%d schema_ver:%d NewTxn() inside a transaction auto commit: %d", vars.ConnectionID, vars.TxnCtx.SchemaVersion, txnID)
}

txn, err := s.store.Begin()
Expand Down
34 changes: 17 additions & 17 deletions store/tikv/2pc.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ func newTwoPhaseCommitter(txn *tikvTxn, connID uint64) (*twoPhaseCommitter, erro
const logSize = 4 * 1024 * 1024 // 4MB
if len(keys) > logEntryCount || size > logSize {
tableID := tablecodec.DecodeTableID(keys[0])
log.Infof("[BIG_TXN] [con:%d] table id:%d size:%d, keys:%d, puts:%d, dels:%d, locks:%d, startTS:%d",
log.Infof("[BIG_TXN] con:%d table id:%d size:%d, keys:%d, puts:%d, dels:%d, locks:%d, startTS:%d",
connID, tableID, size, len(keys), putCnt, delCnt, lockCnt, txn.startTS)
}

Expand Down Expand Up @@ -229,7 +229,7 @@ func (c *twoPhaseCommitter) doActionOnKeys(bo *Backoffer, action twoPhaseCommitA
twoPhaseCommitGP.Go(func() {
e := c.doActionOnBatches(bo, action, batches)
if e != nil {
log.Debugf("[con:%d] 2PC async doActionOnBatches %s err: %v", c.connID, action, e)
log.Debugf("con:%d 2PC async doActionOnBatches %s err: %v", c.connID, action, e)
metrics.TiKVSecondaryLockCleanupFailureCounter.WithLabelValues("commit").Inc()
}
})
Expand All @@ -256,7 +256,7 @@ func (c *twoPhaseCommitter) doActionOnBatches(bo *Backoffer, action twoPhaseComm
if len(batches) == 1 {
e := singleBatchActionFunc(bo, batches[0])
if e != nil {
log.Debugf("[con:%d] 2PC doActionOnBatches %s failed: %v, tid: %d", c.connID, action, e, c.startTS)
log.Debugf("con:%d 2PC doActionOnBatches %s failed: %v, tid: %d", c.connID, action, e, c.startTS)
}
return errors.Trace(e)
}
Expand Down Expand Up @@ -294,10 +294,10 @@ func (c *twoPhaseCommitter) doActionOnBatches(bo *Backoffer, action twoPhaseComm
var err error
for i := 0; i < len(batches); i++ {
if e := <-ch; e != nil {
log.Debugf("[con:%d] 2PC doActionOnBatches %s failed: %v, tid: %d", c.connID, action, e, c.startTS)
log.Debugf("con:%d 2PC doActionOnBatches %s failed: %v, tid: %d", c.connID, action, e, c.startTS)
// Cancel other requests and return the first error.
if cancel != nil {
log.Debugf("[con:%d] 2PC doActionOnBatches %s to cancel other actions, tid: %d", c.connID, action, c.startTS)
log.Debugf("con:%d 2PC doActionOnBatches %s to cancel other actions, tid: %d", c.connID, action, c.startTS)
cancel()
}
if err == nil {
Expand Down Expand Up @@ -370,7 +370,7 @@ func (c *twoPhaseCommitter) prewriteSingleBatch(bo *Backoffer, batch batchKeys)
if err1 != nil {
return errors.Trace(err1)
}
log.Debugf("[con:%d] 2PC prewrite encounters lock: %v", c.connID, lock)
log.Debugf("con:%d 2PC prewrite encounters lock: %v", c.connID, lock)
locks = append(locks, lock)
}
ok, err := c.store.lockResolver.ResolveLocks(bo, locks)
Expand Down Expand Up @@ -478,7 +478,7 @@ func (c *twoPhaseCommitter) commitSingleBatch(bo *Backoffer, batch batchKeys) er
if keyErr := commitResp.GetError(); keyErr != nil {
c.mu.RLock()
defer c.mu.RUnlock()
err = errors.Errorf("[con:%d] 2PC commit failed: %v", c.connID, keyErr.String())
err = errors.Errorf("con:%d 2PC commit failed: %v", c.connID, keyErr.String())
if c.mu.committed {
// No secondary key could be rolled back after it's primary key is committed.
// There must be a serious bug somewhere.
Expand Down Expand Up @@ -527,7 +527,7 @@ func (c *twoPhaseCommitter) cleanupSingleBatch(bo *Backoffer, batch batchKeys) e
return errors.Trace(err)
}
if keyErr := resp.BatchRollback.GetError(); keyErr != nil {
err = errors.Errorf("[con:%d] 2PC cleanup failed: %s", c.connID, keyErr)
err = errors.Errorf("con:%d 2PC cleanup failed: %s", c.connID, keyErr)
log.Debugf("2PC failed cleanup key: %v, tid: %d", err, c.startTS)
return errors.Trace(err)
}
Expand Down Expand Up @@ -576,9 +576,9 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) error {
err := c.cleanupKeys(NewBackoffer(context.Background(), cleanupMaxBackoff).WithVars(c.txn.vars), c.keys)
if err != nil {
metrics.TiKVSecondaryLockCleanupFailureCounter.WithLabelValues("rollback").Inc()
log.Infof("[con:%d] 2PC cleanup err: %v, tid: %d", c.connID, err, c.startTS)
log.Infof("con:%d 2PC cleanup err: %v, tid: %d", c.connID, err, c.startTS)
} else {
log.Infof("[con:%d] 2PC clean up done, tid: %d", c.connID, c.startTS)
log.Infof("con:%d 2PC clean up done, tid: %d", c.connID, c.startTS)
}
c.cleanWg.Done()
})
Expand Down Expand Up @@ -608,19 +608,19 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) error {
}
}
if err != nil {
log.Debugf("[con:%d] 2PC failed on prewrite: %v, tid: %d", c.connID, err, c.startTS)
log.Debugf("con:%d 2PC failed on prewrite: %v, tid: %d", c.connID, err, c.startTS)
return errors.Trace(err)
}

commitTS, err := c.store.getTimestampWithRetry(NewBackoffer(ctx, tsoMaxBackoff).WithVars(c.txn.vars))
if err != nil {
log.Warnf("[con:%d] 2PC get commitTS failed: %v, tid: %d", c.connID, err, c.startTS)
log.Warnf("con:%d 2PC get commitTS failed: %v, tid: %d", c.connID, err, c.startTS)
return errors.Trace(err)
}

// check commitTS
if commitTS <= c.startTS {
err = errors.Errorf("[con:%d] Invalid transaction tso with start_ts=%v while commit_ts=%v",
err = errors.Errorf("con:%d Invalid transaction tso with start_ts=%v while commit_ts=%v",
c.connID, c.startTS, commitTS)
log.Error(err)
return errors.Trace(err)
Expand All @@ -631,21 +631,21 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) error {
}

if c.store.oracle.IsExpired(c.startTS, maxTxnTimeUse) {
err = errors.Errorf("[con:%d] txn takes too much time, start: %d, commit: %d", c.connID, c.startTS, c.commitTS)
err = errors.Errorf("con:%d txn takes too much time, start: %d, commit: %d", c.connID, c.startTS, c.commitTS)
return errors.Annotate(err, txnRetryableMark)
}

err = c.commitKeys(NewBackoffer(ctx, CommitMaxBackoff).WithVars(c.txn.vars), c.keys)
if err != nil {
if undeterminedErr := c.getUndeterminedErr(); undeterminedErr != nil {
log.Warnf("[con:%d] 2PC commit result undetermined, err: %v, rpcErr: %v, tid: %v", c.connID, err, undeterminedErr, c.startTS)
log.Warnf("con:%d 2PC commit result undetermined, err: %v, rpcErr: %v, tid: %v", c.connID, err, undeterminedErr, c.startTS)
err = errors.Wrap(err, terror.ErrResultUndetermined)
}
if !c.mu.committed {
log.Debugf("[con:%d] 2PC failed on commit: %v, tid: %d", c.connID, err, c.startTS)
log.Debugf("con:%d 2PC failed on commit: %v, tid: %d", c.connID, err, c.startTS)
return errors.Trace(err)
}
log.Debugf("[con:%d] 2PC succeed with error: %v, tid: %d", c.connID, err, c.startTS)
log.Debugf("con:%d 2PC succeed with error: %v, tid: %d", c.connID, err, c.startTS)
}
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion table/column.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ func CastValue(ctx sessionctx.Context, val types.Datum, col *model.ColumnInfo) (
continue
}
err = ErrTruncateWrongValue.FastGen("incorrect utf8 value %x(%s) for column %s", casted.GetBytes(), str, col.Name)
log.Errorf("[con:%d] %v", ctx.GetSessionVars().ConnectionID, err)
log.Errorf("con:%d %v", ctx.GetSessionVars().ConnectionID, err)
// Truncate to valid utf8 string.
casted = types.NewStringDatum(str[:i])
err = sc.HandleTruncate(err)
Expand Down
8 changes: 4 additions & 4 deletions x-server/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,24 +51,24 @@ func (cc *clientConn) Run() {
tp, payload, err := cc.readPacket()
if err != nil {
if terror.ErrorNotEqual(err, io.EOF) {
log.Errorf("[con:%d] read packet error, close this connection %s",
log.Errorf("con:%d read packet error, close this connection %s",
cc.connectionID, errors.ErrorStack(err))
}
return
}
if err = cc.dispatch(tp, payload); err != nil {
if terror.ErrorEqual(err, terror.ErrResultUndetermined) {
log.Errorf("[con:%d] result undetermined error, close this connection %s",
log.Errorf("con:%d result undetermined error, close this connection %s",
cc.connectionID, errors.ErrorStack(err))
} else if terror.ErrorEqual(err, terror.ErrCritical) {
log.Errorf("[con:%d] critical error, stop the server listener %s",
log.Errorf("con:%d critical error, stop the server listener %s",
cc.connectionID, errors.ErrorStack(err))
select {
case cc.server.stopListenerCh <- struct{}{}:
default:
}
}
log.Warnf("[con:%d] dispatch error: %s, %s", cc.connectionID, cc, err)
log.Warnf("con:%d dispatch error: %s, %s", cc.connectionID, cc, err)
cc.writeError(err)
return
}
Expand Down
4 changes: 2 additions & 2 deletions x-server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func (s *Server) shouldStopListener() bool {
func (s *Server) onConn(c net.Conn) {
conn := s.newConn(c)
defer func() {
log.Infof("[con:%d] close x protocol connection", conn.connectionID)
log.Infof("con:%d close x protocol connection", conn.connectionID)
}()
if err := conn.handshake(); err != nil {
// Some keep alive services will send request to TiDB and disconnect immediately.
Expand All @@ -144,7 +144,7 @@ func (s *Server) newConn(conn net.Conn) *clientConn {
collation: mysql.DefaultCollationID,
alloc: arena.NewAllocator(32 * 1024),
}
log.Infof("[con:%d] new x protocol connection %s", cc.connectionID, conn.RemoteAddr().String())
log.Infof("con:%d new x protocol connection %s", cc.connectionID, conn.RemoteAddr().String())
cc.salt = util.RandomBuf(20)
return cc
}