diff --git a/log/xlog/file.go b/log/xlog/file.go index d159314d..8afc8b3e 100644 --- a/log/xlog/file.go +++ b/log/xlog/file.go @@ -30,6 +30,7 @@ type XFileLog struct { level int skip int + runtime bool file *os.File errFile *os.File hostname string @@ -79,6 +80,14 @@ func (p *XFileLog) Init(config map[string]string) (err error) { if len(service) > 0 { p.service = service } + + runtime, ok := config["runtime"] + if !ok || runtime == "true" || runtime == "TRUE" { + p.runtime = true + } else { + p.runtime = false + } + skip, _ := config["skip"] if len(skip) > 0 { skipNum, err := strconv.Atoi(skip) @@ -264,7 +273,7 @@ func (p *XFileLog) Warnx(logID, format string, a ...interface{}) error { func (p *XFileLog) warnx(logID, format string, a ...interface{}) error { logText := formatValue(format, a...) fun, filename, lineno := getRuntimeInfo(p.skip) - logText = formatLineInfo(fun, filepath.Base(filename), logText, lineno) + logText = formatLineInfo(p.runtime, fun, filepath.Base(filename), logText, lineno) //logText = fmt.Sprintf("[%s:%s:%d] %s", fun, filepath.Base(filename), lineno, logText) return p.write(WarnLevel, &logText, logID) @@ -291,7 +300,7 @@ func (p *XFileLog) Fatalx(logID, format string, a ...interface{}) error { func (p *XFileLog) fatalx(logID, format string, a ...interface{}) error { logText := formatValue(format, a...) fun, filename, lineno := getRuntimeInfo(p.skip) - logText = formatLineInfo(fun, filepath.Base(filename), logText, lineno) + logText = formatLineInfo(p.runtime, fun, filepath.Base(filename), logText, lineno) //logText = fmt.Sprintf("[%s:%s:%d] %s", fun, filepath.Base(filename), lineno, logText) return p.write(FatalLevel, &logText, logID) @@ -316,7 +325,7 @@ func (p *XFileLog) Noticex(logID, format string, a ...interface{}) error { func (p *XFileLog) noticex(logID, format string, a ...interface{}) error { logText := formatValue(format, a...) fun, filename, lineno := getRuntimeInfo(p.skip) - logText = formatLineInfo(fun, filepath.Base(filename), logText, lineno) + logText = formatLineInfo(p.runtime, fun, filepath.Base(filename), logText, lineno) return p.write(NoticeLevel, &logText, logID) } @@ -338,7 +347,7 @@ func (p *XFileLog) tracex(logID, format string, a ...interface{}) error { logText := formatValue(format, a...) fun, filename, lineno := getRuntimeInfo(p.skip) - logText = formatLineInfo(fun, filepath.Base(filename), logText, lineno) + logText = formatLineInfo(p.runtime, fun, filepath.Base(filename), logText, lineno) //logText = fmt.Sprintf("[%s:%s:%d] %s", fun, filepath.Base(filename), lineno, logText) return p.write(TraceLevel, &logText, logID) @@ -356,7 +365,7 @@ func (p *XFileLog) debugx(logID, format string, a ...interface{}) error { logText := formatValue(format, a...) fun, filename, lineno := getRuntimeInfo(p.skip) - logText = formatLineInfo(fun, filepath.Base(filename), logText, lineno) + logText = formatLineInfo(p.runtime, fun, filepath.Base(filename), logText, lineno) return p.write(DebugLevel, &logText, logID) } diff --git a/log/xlog/util.go b/log/xlog/util.go index ace75692..9b82b972 100644 --- a/log/xlog/util.go +++ b/log/xlog/util.go @@ -78,7 +78,6 @@ func getRuntimeInfo(skip int) (function, filename string, lineno int) { if ok { function = runtime.FuncForPC(pc).Name() } - return } @@ -106,17 +105,19 @@ func formatValue(format string, a ...interface{}) (result string) { return } -func formatLineInfo(functionName, filename, logText string, lineno int) string { +func formatLineInfo(runtime bool, functionName, filename, logText string, lineno int) string { var buffer bytes.Buffer - buffer.WriteString("[") - buffer.WriteString(functionName) - buffer.WriteString(":") + if runtime { + buffer.WriteString("[") + buffer.WriteString(functionName) + buffer.WriteString(":") - buffer.WriteString(filename) - buffer.WriteString(":") + buffer.WriteString(filename) + buffer.WriteString(":") - buffer.WriteString(strconv.FormatInt(int64(lineno), 10)) - buffer.WriteString("] ") + buffer.WriteString(strconv.FormatInt(int64(lineno), 10)) + buffer.WriteString("] ") + } buffer.WriteString(logText) return buffer.String() diff --git a/models/namespace.go b/models/namespace.go index 87d4ef02..6a242c0e 100644 --- a/models/namespace.go +++ b/models/namespace.go @@ -28,6 +28,7 @@ import ( // Namespace means namespace model stored in etcd type Namespace struct { + OpenGeneralLog bool `json:"open_general_log"` IsEncrypt bool `json:"is_encrypt"` // true: 加密存储 false: 非加密存储,目前加密Slice、User中的用户名、密码 Name string `json:"name"` Online bool `json:"online"` diff --git a/proxy/server/executor.go b/proxy/server/executor.go index db5de1dd..ff91ecde 100644 --- a/proxy/server/executor.go +++ b/proxy/server/executor.go @@ -19,6 +19,7 @@ import ( "strconv" "strings" "sync" + "sync/atomic" "time" "github.com/XiaoMi/Gaea/backend" @@ -36,15 +37,18 @@ import ( const ( // master comments masterComment = "/*master*/" + // general query log variable + gaeaGeneralLogVariable = "gaea_general_log" ) // SessionExecutor is bound to a session, so requests are serializable type SessionExecutor struct { manager *Manager - namespace string - user string - db string + namespace string + user string + db string + clientAddr string status uint16 lastInsertID uint64 @@ -192,6 +196,15 @@ func (se *SessionExecutor) setStringSessionVariable(name string, valueStr string return se.sessionVariables.Set(name, valueStr) } +func (se *SessionExecutor) setGeneralLogVariable(valueStr string) error { + v, err := strconv.Atoi(valueStr) + if err != nil { + return errors.ErrInvalidArgument + } + atomic.StoreUint32(&ProcessGeneralLog, uint32(v)) + return nil +} + // GetLastInsertID return last_inert_id func (se *SessionExecutor) GetLastInsertID() uint64 { return se.lastInsertID @@ -595,7 +608,29 @@ func createShowDatabaseResult(dbs []string) *mysql.Result { } plan.GenerateSelectResultRowData(result) + return result +} +func createShowGeneralLogResult() *mysql.Result { + r := new(mysql.Resultset) + + field := &mysql.Field{} + field.Name = hack.Slice(gaeaGeneralLogVariable) + r.Fields = append(r.Fields, field) + + var value string + if OpenProcessGeneralQueryLog() { + value = "ON" + } else { + value = "OFF" + } + r.Values = append(r.Values, []interface{}{value}) + result := &mysql.Result{ + AffectedRows: 1, + Resultset: r, + } + + plan.GenerateSelectResultRowData(result) return result } diff --git a/proxy/server/executor_handle.go b/proxy/server/executor_handle.go index 11818a0f..9f0480c0 100644 --- a/proxy/server/executor_handle.go +++ b/proxy/server/executor_handle.go @@ -71,12 +71,11 @@ func (se *SessionExecutor) handleQuery(sql string) (r *mysql.Result, err error) } startTime := time.Now() - stmtType := parser.Preview(sql) reqCtx.Set(util.StmtType, stmtType) r, err = se.doQuery(reqCtx, sql) - se.manager.RecordSessionSQLMetrics(reqCtx, se.namespace, sql, startTime, err) + se.manager.RecordSessionSQLMetrics(reqCtx, se, sql, startTime, err) return r, err } @@ -172,8 +171,12 @@ func (se *SessionExecutor) handleShow(reqCtx *util.RequestContext, sql string, s switch stmt.Tp { case ast.ShowDatabases: dbs := se.GetNamespace().GetAllowedDBs() - r := createShowDatabaseResult(dbs) - return r, nil + return createShowDatabaseResult(dbs), nil + case ast.ShowVariables: + if strings.Contains(sql, gaeaGeneralLogVariable) { + return createShowGeneralLogResult(), nil + } + fallthrough default: r, err := se.ExecuteSQL(reqCtx, backend.DefaultSlice, se.db, sql) if err != nil { @@ -281,10 +284,16 @@ func (se *SessionExecutor) handleSetVariable(v *ast.VariableAssignment) error { return nil case "sql_select_limit": return nil - // unsupported case "transaction": return fmt.Errorf("does not support set transaction in gaea") + case gaeaGeneralLogVariable: + value := getVariableExprResult(v.Value) + onOffValue, err := getOnOffVariable(value) + if err != nil { + return mysql.NewDefaultError(mysql.ErrWrongValueForVar, name, value) + } + return se.setGeneralLogVariable(onOffValue) default: return nil } diff --git a/proxy/server/manager.go b/proxy/server/manager.go index 96e38f87..5956c2f5 100644 --- a/proxy/server/manager.go +++ b/proxy/server/manager.go @@ -27,6 +27,7 @@ import ( "github.com/XiaoMi/Gaea/core/errors" "github.com/XiaoMi/Gaea/log" + "github.com/XiaoMi/Gaea/log/xlog" "github.com/XiaoMi/Gaea/models" "github.com/XiaoMi/Gaea/mysql" "github.com/XiaoMi/Gaea/parser" @@ -291,7 +292,8 @@ func (m *Manager) ConfigFingerprint() string { } // RecordSessionSQLMetrics record session SQL metrics, like response time, error -func (m *Manager) RecordSessionSQLMetrics(reqCtx *util.RequestContext, namespace string, sql string, startTime time.Time, err error) { +func (m *Manager) RecordSessionSQLMetrics(reqCtx *util.RequestContext, se *SessionExecutor, sql string, startTime time.Time, err error) { + namespace := se.namespace ns := m.GetNamespace(namespace) if ns == nil { log.Warn("record session SQL metrics error, namespace: %s, sql: %s, err: %s", namespace, sql, "namespace not found") @@ -327,6 +329,12 @@ func (m *Manager) RecordSessionSQLMetrics(reqCtx *util.RequestContext, namespace ns.SetErrorSQLFingerprint(md5, fingerprint) m.statistics.recordSessionErrorSQLFingerprint(namespace, operation, md5) } + + if OpenProcessGeneralQueryLog() && ns.openGeneralLog { + m.statistics.generalLogger.Notice("client: %s, namespace: %s, db: %s, user: %s, cmd: %s, sql: %s, cost: %d ms, succ: %t", + se.clientAddr, namespace, se.db, se.user, operation, + strings.ReplaceAll(sql, "\n", " "), duration, err == nil) + } } // RecordBackendSQLMetrics record backend SQL metrics, like response time, error @@ -615,8 +623,9 @@ type StatisticManager struct { manager *Manager clusterName string - statsType string // 监控后端类型 - handlers map[string]http.Handler + statsType string // 监控后端类型 + handlers map[string]http.Handler + generalLogger log.Logger sqlTimings *stats.MultiTimings // SQL耗时统计 sqlFingerprintSlowCounts *stats.CountersWithMultiLabels // 慢SQL指纹数量统计 @@ -648,10 +657,14 @@ func CreateStatisticManager(cfg *models.Proxy, manager *Manager) (*StatisticMana mgr := NewStatisticManager() mgr.manager = manager mgr.clusterName = cfg.Cluster - if err := mgr.Init(cfg); err != nil { + + var err error + if err = mgr.Init(cfg); err != nil { + return nil, err + } + if mgr.generalLogger, err = initGeneralLogger(cfg); err != nil { return nil, err } - return mgr, nil } @@ -660,6 +673,16 @@ type proxyStatsConfig struct { StatsEnabled bool } +func initGeneralLogger(cfg *models.Proxy) (log.Logger, error) { + c := make(map[string]string, 5) + c["path"] = cfg.LogPath + c["filename"] = cfg.LogFileName + "_sql" + c["level"] = cfg.LogLevel + c["service"] = cfg.Service + c["runtime"] = "false" + return xlog.CreateLogManager(cfg.LogOutput, c) +} + func parseProxyStatsConfig(cfg *models.Proxy) (*proxyStatsConfig, error) { enabled, err := strconv.ParseBool(cfg.StatsEnabled) if err != nil { diff --git a/proxy/server/namespace.go b/proxy/server/namespace.go index e536101f..7c7749a1 100644 --- a/proxy/server/namespace.go +++ b/proxy/server/namespace.go @@ -65,6 +65,7 @@ type Namespace struct { userProperties map[string]*UserProperty // key: user name ,value: user's properties defaultCharset string defaultCollationID mysql.CollationID + openGeneralLog bool slowSQLCache *cache.LRUCache errorSQLCache *cache.LRUCache @@ -85,6 +86,7 @@ func NewNamespace(namespaceConfig *models.Namespace) (*Namespace, error) { name: namespaceConfig.Name, sqls: make(map[string]string, 16), userProperties: make(map[string]*UserProperty, 2), + openGeneralLog: namespaceConfig.OpenGeneralLog, slowSQLCache: cache.NewLRUCache(defaultSQLCacheCapacity), errorSQLCache: cache.NewLRUCache(defaultSQLCacheCapacity), backendSlowSQLCache: cache.NewLRUCache(defaultSQLCacheCapacity), diff --git a/proxy/server/session.go b/proxy/server/session.go index 6f810d23..adf17bb1 100644 --- a/proxy/server/session.go +++ b/proxy/server/session.go @@ -70,9 +70,8 @@ func newSession(s *Server, co net.Conn) *Session { cc.c.SetConnectionID(atomic.AddUint32(&baseConnID, 1)) cc.executor = newSessionExecutor(s.manager) - + cc.executor.clientAddr = co.RemoteAddr().String() cc.closed.Store(false) - return cc } diff --git a/proxy/server/var.go b/proxy/server/var.go new file mode 100644 index 00000000..2f6a09db --- /dev/null +++ b/proxy/server/var.go @@ -0,0 +1,12 @@ +package server + +import "sync/atomic" + +// Process global variables. +var ( + ProcessGeneralLog uint32 +) + +func OpenProcessGeneralQueryLog() bool { + return atomic.LoadUint32(&ProcessGeneralLog) == 1 +}