Skip to content

Commit

Permalink
add general query log (#110)
Browse files Browse the repository at this point in the history
  • Loading branch information
chicliz authored Sep 18, 2020
1 parent 8279947 commit 6d41aa0
Show file tree
Hide file tree
Showing 9 changed files with 120 additions and 29 deletions.
19 changes: 14 additions & 5 deletions log/xlog/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type XFileLog struct {
level int

skip int
runtime bool
file *os.File
errFile *os.File
hostname string
Expand Down Expand Up @@ -79,6 +80,14 @@ func (p *XFileLog) Init(config map[string]string) (err error) {
if len(service) > 0 {
p.service = service
}

runtime, ok := config["runtime"]
if !ok || runtime == "true" || runtime == "TRUE" {
p.runtime = true
} else {
p.runtime = false
}

skip, _ := config["skip"]
if len(skip) > 0 {
skipNum, err := strconv.Atoi(skip)
Expand Down Expand Up @@ -264,7 +273,7 @@ func (p *XFileLog) Warnx(logID, format string, a ...interface{}) error {
func (p *XFileLog) warnx(logID, format string, a ...interface{}) error {
logText := formatValue(format, a...)
fun, filename, lineno := getRuntimeInfo(p.skip)
logText = formatLineInfo(fun, filepath.Base(filename), logText, lineno)
logText = formatLineInfo(p.runtime, fun, filepath.Base(filename), logText, lineno)
//logText = fmt.Sprintf("[%s:%s:%d] %s", fun, filepath.Base(filename), lineno, logText)

return p.write(WarnLevel, &logText, logID)
Expand All @@ -291,7 +300,7 @@ func (p *XFileLog) Fatalx(logID, format string, a ...interface{}) error {
func (p *XFileLog) fatalx(logID, format string, a ...interface{}) error {
logText := formatValue(format, a...)
fun, filename, lineno := getRuntimeInfo(p.skip)
logText = formatLineInfo(fun, filepath.Base(filename), logText, lineno)
logText = formatLineInfo(p.runtime, fun, filepath.Base(filename), logText, lineno)
//logText = fmt.Sprintf("[%s:%s:%d] %s", fun, filepath.Base(filename), lineno, logText)

return p.write(FatalLevel, &logText, logID)
Expand All @@ -316,7 +325,7 @@ func (p *XFileLog) Noticex(logID, format string, a ...interface{}) error {
func (p *XFileLog) noticex(logID, format string, a ...interface{}) error {
logText := formatValue(format, a...)
fun, filename, lineno := getRuntimeInfo(p.skip)
logText = formatLineInfo(fun, filepath.Base(filename), logText, lineno)
logText = formatLineInfo(p.runtime, fun, filepath.Base(filename), logText, lineno)

return p.write(NoticeLevel, &logText, logID)
}
Expand All @@ -338,7 +347,7 @@ func (p *XFileLog) tracex(logID, format string, a ...interface{}) error {

logText := formatValue(format, a...)
fun, filename, lineno := getRuntimeInfo(p.skip)
logText = formatLineInfo(fun, filepath.Base(filename), logText, lineno)
logText = formatLineInfo(p.runtime, fun, filepath.Base(filename), logText, lineno)
//logText = fmt.Sprintf("[%s:%s:%d] %s", fun, filepath.Base(filename), lineno, logText)

return p.write(TraceLevel, &logText, logID)
Expand All @@ -356,7 +365,7 @@ func (p *XFileLog) debugx(logID, format string, a ...interface{}) error {

logText := formatValue(format, a...)
fun, filename, lineno := getRuntimeInfo(p.skip)
logText = formatLineInfo(fun, filepath.Base(filename), logText, lineno)
logText = formatLineInfo(p.runtime, fun, filepath.Base(filename), logText, lineno)

return p.write(DebugLevel, &logText, logID)
}
Expand Down
19 changes: 10 additions & 9 deletions log/xlog/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@ func getRuntimeInfo(skip int) (function, filename string, lineno int) {
if ok {
function = runtime.FuncForPC(pc).Name()
}

return
}

Expand Down Expand Up @@ -106,17 +105,19 @@ func formatValue(format string, a ...interface{}) (result string) {
return
}

func formatLineInfo(functionName, filename, logText string, lineno int) string {
func formatLineInfo(runtime bool, functionName, filename, logText string, lineno int) string {
var buffer bytes.Buffer
buffer.WriteString("[")
buffer.WriteString(functionName)
buffer.WriteString(":")
if runtime {
buffer.WriteString("[")
buffer.WriteString(functionName)
buffer.WriteString(":")

buffer.WriteString(filename)
buffer.WriteString(":")
buffer.WriteString(filename)
buffer.WriteString(":")

buffer.WriteString(strconv.FormatInt(int64(lineno), 10))
buffer.WriteString("] ")
buffer.WriteString(strconv.FormatInt(int64(lineno), 10))
buffer.WriteString("] ")
}
buffer.WriteString(logText)

return buffer.String()
Expand Down
1 change: 1 addition & 0 deletions models/namespace.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (

// Namespace means namespace model stored in etcd
type Namespace struct {
OpenGeneralLog bool `json:"open_general_log"`
IsEncrypt bool `json:"is_encrypt"` // true: 加密存储 false: 非加密存储,目前加密Slice、User中的用户名、密码
Name string `json:"name"`
Online bool `json:"online"`
Expand Down
41 changes: 38 additions & 3 deletions proxy/server/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"strconv"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/XiaoMi/Gaea/backend"
Expand All @@ -36,15 +37,18 @@ import (
const (
// master comments
masterComment = "/*master*/"
// general query log variable
gaeaGeneralLogVariable = "gaea_general_log"
)

// SessionExecutor is bound to a session, so requests are serializable
type SessionExecutor struct {
manager *Manager

namespace string
user string
db string
namespace string
user string
db string
clientAddr string

status uint16
lastInsertID uint64
Expand Down Expand Up @@ -192,6 +196,15 @@ func (se *SessionExecutor) setStringSessionVariable(name string, valueStr string
return se.sessionVariables.Set(name, valueStr)
}

func (se *SessionExecutor) setGeneralLogVariable(valueStr string) error {
v, err := strconv.Atoi(valueStr)
if err != nil {
return errors.ErrInvalidArgument
}
atomic.StoreUint32(&ProcessGeneralLog, uint32(v))
return nil
}

// GetLastInsertID return last_inert_id
func (se *SessionExecutor) GetLastInsertID() uint64 {
return se.lastInsertID
Expand Down Expand Up @@ -595,7 +608,29 @@ func createShowDatabaseResult(dbs []string) *mysql.Result {
}

plan.GenerateSelectResultRowData(result)
return result
}

func createShowGeneralLogResult() *mysql.Result {
r := new(mysql.Resultset)

field := &mysql.Field{}
field.Name = hack.Slice(gaeaGeneralLogVariable)
r.Fields = append(r.Fields, field)

var value string
if OpenProcessGeneralQueryLog() {
value = "ON"
} else {
value = "OFF"
}
r.Values = append(r.Values, []interface{}{value})
result := &mysql.Result{
AffectedRows: 1,
Resultset: r,
}

plan.GenerateSelectResultRowData(result)
return result
}

Expand Down
19 changes: 14 additions & 5 deletions proxy/server/executor_handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,11 @@ func (se *SessionExecutor) handleQuery(sql string) (r *mysql.Result, err error)
}

startTime := time.Now()

stmtType := parser.Preview(sql)
reqCtx.Set(util.StmtType, stmtType)

r, err = se.doQuery(reqCtx, sql)
se.manager.RecordSessionSQLMetrics(reqCtx, se.namespace, sql, startTime, err)
se.manager.RecordSessionSQLMetrics(reqCtx, se, sql, startTime, err)
return r, err
}

Expand Down Expand Up @@ -172,8 +171,12 @@ func (se *SessionExecutor) handleShow(reqCtx *util.RequestContext, sql string, s
switch stmt.Tp {
case ast.ShowDatabases:
dbs := se.GetNamespace().GetAllowedDBs()
r := createShowDatabaseResult(dbs)
return r, nil
return createShowDatabaseResult(dbs), nil
case ast.ShowVariables:
if strings.Contains(sql, gaeaGeneralLogVariable) {
return createShowGeneralLogResult(), nil
}
fallthrough
default:
r, err := se.ExecuteSQL(reqCtx, backend.DefaultSlice, se.db, sql)
if err != nil {
Expand Down Expand Up @@ -281,10 +284,16 @@ func (se *SessionExecutor) handleSetVariable(v *ast.VariableAssignment) error {
return nil
case "sql_select_limit":
return nil

// unsupported
case "transaction":
return fmt.Errorf("does not support set transaction in gaea")
case gaeaGeneralLogVariable:
value := getVariableExprResult(v.Value)
onOffValue, err := getOnOffVariable(value)
if err != nil {
return mysql.NewDefaultError(mysql.ErrWrongValueForVar, name, value)
}
return se.setGeneralLogVariable(onOffValue)
default:
return nil
}
Expand Down
33 changes: 28 additions & 5 deletions proxy/server/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (

"github.com/XiaoMi/Gaea/core/errors"
"github.com/XiaoMi/Gaea/log"
"github.com/XiaoMi/Gaea/log/xlog"
"github.com/XiaoMi/Gaea/models"
"github.com/XiaoMi/Gaea/mysql"
"github.com/XiaoMi/Gaea/parser"
Expand Down Expand Up @@ -291,7 +292,8 @@ func (m *Manager) ConfigFingerprint() string {
}

// RecordSessionSQLMetrics record session SQL metrics, like response time, error
func (m *Manager) RecordSessionSQLMetrics(reqCtx *util.RequestContext, namespace string, sql string, startTime time.Time, err error) {
func (m *Manager) RecordSessionSQLMetrics(reqCtx *util.RequestContext, se *SessionExecutor, sql string, startTime time.Time, err error) {
namespace := se.namespace
ns := m.GetNamespace(namespace)
if ns == nil {
log.Warn("record session SQL metrics error, namespace: %s, sql: %s, err: %s", namespace, sql, "namespace not found")
Expand Down Expand Up @@ -327,6 +329,12 @@ func (m *Manager) RecordSessionSQLMetrics(reqCtx *util.RequestContext, namespace
ns.SetErrorSQLFingerprint(md5, fingerprint)
m.statistics.recordSessionErrorSQLFingerprint(namespace, operation, md5)
}

if OpenProcessGeneralQueryLog() && ns.openGeneralLog {
m.statistics.generalLogger.Notice("client: %s, namespace: %s, db: %s, user: %s, cmd: %s, sql: %s, cost: %d ms, succ: %t",
se.clientAddr, namespace, se.db, se.user, operation,
strings.ReplaceAll(sql, "\n", " "), duration, err == nil)
}
}

// RecordBackendSQLMetrics record backend SQL metrics, like response time, error
Expand Down Expand Up @@ -615,8 +623,9 @@ type StatisticManager struct {
manager *Manager
clusterName string

statsType string // 监控后端类型
handlers map[string]http.Handler
statsType string // 监控后端类型
handlers map[string]http.Handler
generalLogger log.Logger

sqlTimings *stats.MultiTimings // SQL耗时统计
sqlFingerprintSlowCounts *stats.CountersWithMultiLabels // 慢SQL指纹数量统计
Expand Down Expand Up @@ -648,10 +657,14 @@ func CreateStatisticManager(cfg *models.Proxy, manager *Manager) (*StatisticMana
mgr := NewStatisticManager()
mgr.manager = manager
mgr.clusterName = cfg.Cluster
if err := mgr.Init(cfg); err != nil {

var err error
if err = mgr.Init(cfg); err != nil {
return nil, err
}
if mgr.generalLogger, err = initGeneralLogger(cfg); err != nil {
return nil, err
}

return mgr, nil
}

Expand All @@ -660,6 +673,16 @@ type proxyStatsConfig struct {
StatsEnabled bool
}

func initGeneralLogger(cfg *models.Proxy) (log.Logger, error) {
c := make(map[string]string, 5)
c["path"] = cfg.LogPath
c["filename"] = cfg.LogFileName + "_sql"
c["level"] = cfg.LogLevel
c["service"] = cfg.Service
c["runtime"] = "false"
return xlog.CreateLogManager(cfg.LogOutput, c)
}

func parseProxyStatsConfig(cfg *models.Proxy) (*proxyStatsConfig, error) {
enabled, err := strconv.ParseBool(cfg.StatsEnabled)
if err != nil {
Expand Down
2 changes: 2 additions & 0 deletions proxy/server/namespace.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ type Namespace struct {
userProperties map[string]*UserProperty // key: user name ,value: user's properties
defaultCharset string
defaultCollationID mysql.CollationID
openGeneralLog bool

slowSQLCache *cache.LRUCache
errorSQLCache *cache.LRUCache
Expand All @@ -85,6 +86,7 @@ func NewNamespace(namespaceConfig *models.Namespace) (*Namespace, error) {
name: namespaceConfig.Name,
sqls: make(map[string]string, 16),
userProperties: make(map[string]*UserProperty, 2),
openGeneralLog: namespaceConfig.OpenGeneralLog,
slowSQLCache: cache.NewLRUCache(defaultSQLCacheCapacity),
errorSQLCache: cache.NewLRUCache(defaultSQLCacheCapacity),
backendSlowSQLCache: cache.NewLRUCache(defaultSQLCacheCapacity),
Expand Down
3 changes: 1 addition & 2 deletions proxy/server/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,8 @@ func newSession(s *Server, co net.Conn) *Session {
cc.c.SetConnectionID(atomic.AddUint32(&baseConnID, 1))

cc.executor = newSessionExecutor(s.manager)

cc.executor.clientAddr = co.RemoteAddr().String()
cc.closed.Store(false)

return cc
}

Expand Down
12 changes: 12 additions & 0 deletions proxy/server/var.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package server

import "sync/atomic"

// Process global variables.
var (
ProcessGeneralLog uint32
)

func OpenProcessGeneralQueryLog() bool {
return atomic.LoadUint32(&ProcessGeneralLog) == 1
}

0 comments on commit 6d41aa0

Please sign in to comment.