diff --git a/cdc/capture/http_errors.go b/cdc/capture/http_errors.go new file mode 100644 index 00000000000..9fbe07fb4ff --- /dev/null +++ b/cdc/capture/http_errors.go @@ -0,0 +1,51 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package capture + +import ( + "strings" + + "github.com/pingcap/errors" + cerror "github.com/pingcap/ticdc/pkg/errors" +) + +// httpBadRequestError is some errors that will cause a BadRequestError in http handler +var httpBadRequestError = []*errors.Error{ + cerror.ErrAPIInvalidParam, cerror.ErrSinkURIInvalid, cerror.ErrStartTsBeforeGC, + cerror.ErrChangeFeedNotExists, cerror.ErrTargetTsBeforeStartTs, cerror.ErrTableIneligible, + cerror.ErrFilterRuleInvalid, cerror.ErrChangefeedUpdateRefused, cerror.ErrMySQLConnectionError, + cerror.ErrMySQLInvalidConfig, +} + +// IsHTTPBadRequestError check if a error is a http bad request error +func IsHTTPBadRequestError(err error) bool { + if err == nil { + return false + } + for _, e := range httpBadRequestError { + if e.Equal(err) { + return true + } + + rfcCode, ok := cerror.RFCCode(err) + if ok && e.RFCCode() == rfcCode { + return true + } + + if strings.Contains(err.Error(), string(e.RFCCode())) { + return true + } + } + return false +} diff --git a/cdc/capture/http_errors_test.go b/cdc/capture/http_errors_test.go new file mode 100644 index 00000000000..cd9592e7da8 --- /dev/null +++ b/cdc/capture/http_errors_test.go @@ -0,0 +1,33 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package capture + +import ( + "testing" + + "github.com/pingcap/errors" + cerror "github.com/pingcap/ticdc/pkg/errors" + "github.com/stretchr/testify/require" +) + +func TestIsHTTPBadRequestError(t *testing.T) { + err := cerror.ErrAPIInvalidParam.GenWithStack("aa") + require.Equal(t, true, IsHTTPBadRequestError(err)) + err = cerror.ErrAPIInvalidParam.Wrap(errors.New("aa")) + require.Equal(t, true, IsHTTPBadRequestError(err)) + err = cerror.ErrPDEtcdAPIError.GenWithStack("aa") + require.Equal(t, false, IsHTTPBadRequestError(err)) + err = nil + require.Equal(t, false, IsHTTPBadRequestError(err)) +} diff --git a/cdc/capture/http_handler.go b/cdc/capture/http_handler.go index dab346af776..76384d8956a 100644 --- a/cdc/capture/http_handler.go +++ b/cdc/capture/http_handler.go @@ -72,9 +72,21 @@ func (h *HTTPHandler) ListChangefeed(c *gin.Context) { ctx := c.Request.Context() state := c.Query(apiOpVarChangefeedState) // get all changefeed status +<<<<<<< HEAD statuses, err := h.capture.etcdClient.GetAllChangeFeedStatus(ctx) +======= + statuses, err := statusProvider.GetAllChangeFeedStatuses(ctx) if err != nil { - c.IndentedJSON(http.StatusInternalServerError, model.NewHTTPError(err)) + _ = c.Error(err) + return + } + // get all changefeed infos + infos, err := statusProvider.GetAllChangeFeedInfo(ctx) +>>>>>>> 28424edb9 (http_*: add log for http api and refine the err handle logic (#2997)) + if err != nil { + // this call will return a parsedError generated by the error we passed in + // so it is no need to check the parsedError + _ = c.Error(err) return } @@ -128,38 +140,25 @@ func (h *HTTPHandler) GetChangefeed(c *gin.Context) { ctx := c.Request.Context() changefeedID := c.Param(apiOpVarChangefeedID) if err := model.ValidateChangefeedID(changefeedID); err != nil { - c.IndentedJSON(http.StatusBadRequest, - model.NewHTTPError(cerror.ErrAPIInvalidParam.GenWithStack("invalid changefeed_id: %s", changefeedID))) + _ = c.Error(cerror.ErrAPIInvalidParam.GenWithStack("invalid changefeed_id: %s", changefeedID)) return } info, err := h.capture.etcdClient.GetChangeFeedInfo(ctx, changefeedID) if err != nil { - if cerror.ErrChangeFeedNotExists.Equal(err) { - c.IndentedJSON(http.StatusBadRequest, model.NewHTTPError(err)) - return - } - c.IndentedJSON(http.StatusInternalServerError, model.NewHTTPError(err)) + _ = c.Error(err) return } status, _, err := h.capture.etcdClient.GetChangeFeedStatus(ctx, changefeedID) if err != nil { - if cerror.ErrChangeFeedNotExists.Equal(err) { - c.IndentedJSON(http.StatusBadRequest, model.NewHTTPError(err)) - return - } - c.IndentedJSON(http.StatusInternalServerError, model.NewHTTPError(err)) + _ = c.Error(err) return } processorInfos, err := h.capture.etcdClient.GetAllTaskStatus(ctx, changefeedID) if err != nil { - if cerror.ErrChangeFeedNotExists.Equal(err) { - c.IndentedJSON(http.StatusBadRequest, model.NewHTTPError(err)) - return - } - c.IndentedJSON(http.StatusInternalServerError, model.NewHTTPError(err)) + _ = c.Error(err) return } @@ -185,7 +184,7 @@ func (h *HTTPHandler) GetChangefeed(c *gin.Context) { TaskStatus: taskStatus, } - c.JSON(http.StatusOK, changefeedDetail) + c.IndentedJSON(http.StatusOK, changefeedDetail) } // CreateChangefeed creates a changefeed @@ -206,29 +205,25 @@ func (h *HTTPHandler) CreateChangefeed(c *gin.Context) { ctx := c.Request.Context() var changefeedConfig model.ChangefeedConfig if err := c.BindJSON(&changefeedConfig); err != nil { - c.IndentedJSON(http.StatusInternalServerError, model.NewHTTPError(err)) + _ = c.Error(cerror.ErrAPIInvalidParam.Wrap(err)) return } info, err := verifyCreateChangefeedConfig(c, changefeedConfig, h.capture) if err != nil { - if cerror.ErrPDEtcdAPIError.Equal(err) { - c.IndentedJSON(http.StatusInternalServerError, model.NewHTTPError(err)) - return - } - c.IndentedJSON(http.StatusBadRequest, model.NewHTTPError(err)) + _ = c.Error(err) return } infoStr, err := info.Marshal() if err != nil { - c.IndentedJSON(http.StatusInternalServerError, model.NewHTTPError(err)) + _ = c.Error(err) return } err = h.capture.etcdClient.CreateChangefeedInfo(ctx, info, changefeedConfig.ID) if err != nil { - c.IndentedJSON(http.StatusInternalServerError, model.NewHTTPError(err)) + _ = c.Error(err) return } @@ -255,18 +250,13 @@ func (h *HTTPHandler) PauseChangefeed(c *gin.Context) { changefeedID := c.Param(apiOpVarChangefeedID) if err := model.ValidateChangefeedID(changefeedID); err != nil { - c.IndentedJSON(http.StatusBadRequest, - model.NewHTTPError(cerror.ErrAPIInvalidParam.GenWithStack("invalid changefeed_id: %s", changefeedID))) + _ = c.Error(cerror.ErrAPIInvalidParam.GenWithStack("invalid changefeed_id: %s", changefeedID)) return } // check if the changefeed exists && check if the etcdClient work well _, _, err := h.capture.etcdClient.GetChangeFeedStatus(ctx, changefeedID) if err != nil { - if cerror.ErrChangeFeedNotExists.Equal(err) { - c.IndentedJSON(http.StatusBadRequest, model.NewHTTPError(err)) - return - } - c.IndentedJSON(http.StatusInternalServerError, model.NewHTTPError(err)) + _ = c.Error(err) return } @@ -301,18 +291,13 @@ func (h *HTTPHandler) ResumeChangefeed(c *gin.Context) { ctx := c.Request.Context() changefeedID := c.Param(apiOpVarChangefeedID) if err := model.ValidateChangefeedID(changefeedID); err != nil { - c.IndentedJSON(http.StatusBadRequest, - model.NewHTTPError(cerror.ErrAPIInvalidParam.GenWithStack("invalid changefeed_id: %s", changefeedID))) + _ = c.Error(cerror.ErrAPIInvalidParam.GenWithStack("invalid changefeed_id: %s", changefeedID)) return } // check if the changefeed exists && check if the etcdClient work well _, _, err := h.capture.etcdClient.GetChangeFeedStatus(ctx, changefeedID) if err != nil { - if cerror.ErrChangeFeedNotExists.Equal(err) { - c.IndentedJSON(http.StatusBadRequest, model.NewHTTPError(err)) - return - } - c.IndentedJSON(http.StatusInternalServerError, model.NewHTTPError(err)) + _ = c.Error(err) return } @@ -354,21 +339,16 @@ func (h *HTTPHandler) UpdateChangefeed(c *gin.Context) { changefeedID := c.Param(apiOpVarChangefeedID) if err := model.ValidateChangefeedID(changefeedID); err != nil { - c.IndentedJSON(http.StatusBadRequest, - model.NewHTTPError(cerror.ErrAPIInvalidParam.GenWithStack("invalid changefeed_id: %s", changefeedID))) + _ = c.Error(cerror.ErrAPIInvalidParam.GenWithStack("invalid changefeed_id: %s", changefeedID)) return } info, err := h.capture.etcdClient.GetChangeFeedInfo(ctx, changefeedID) if err != nil { - if cerror.ErrChangeFeedNotExists.Equal(err) { - c.IndentedJSON(http.StatusBadRequest, model.NewHTTPError(err)) - return - } - c.IndentedJSON(http.StatusInternalServerError, model.NewHTTPError(err)) + _ = c.Error(err) return } if info.State != model.StateStopped { - c.IndentedJSON(http.StatusBadRequest, model.NewHTTPError(cerror.ErrChangefeedUpdateRefused.GenWithStackByArgs("can only update changefeed config when it is stopped"))) + _ = c.Error(cerror.ErrChangefeedUpdateRefused.GenWithStackByArgs("can only update changefeed config when it is stopped")) return } @@ -376,19 +356,19 @@ func (h *HTTPHandler) UpdateChangefeed(c *gin.Context) { // filter_rules, ignore_txn_start_ts, mounter_worker_num, sink_config var changefeedConfig model.ChangefeedConfig if err = c.BindJSON(&changefeedConfig); err != nil { - c.IndentedJSON(http.StatusInternalServerError, model.NewHTTPError(err)) + _ = c.Error(err) return } newInfo, err := verifyUpdateChangefeedConfig(ctx, changefeedConfig, info) if err != nil { - c.IndentedJSON(http.StatusBadRequest, model.NewHTTPError(err)) + _ = c.Error(err) return } err = h.capture.etcdClient.SaveChangeFeedInfo(ctx, newInfo, changefeedID) if err != nil { - c.IndentedJSON(http.StatusInternalServerError, model.NewHTTPError(err)) + _ = c.Error(err) return } @@ -413,18 +393,13 @@ func (h *HTTPHandler) RemoveChangefeed(c *gin.Context) { ctx := c.Request.Context() changefeedID := c.Param(apiOpVarChangefeedID) if err := model.ValidateChangefeedID(changefeedID); err != nil { - c.IndentedJSON(http.StatusBadRequest, - model.NewHTTPError(cerror.ErrAPIInvalidParam.GenWithStack("invalid changefeed_id: %s", changefeedID))) + _ = c.Error(cerror.ErrAPIInvalidParam.GenWithStack("invalid changefeed_id: %s", changefeedID)) return } // check if the changefeed exists && check if the etcdClient work well _, _, err := h.capture.etcdClient.GetChangeFeedStatus(ctx, changefeedID) if err != nil { - if cerror.ErrChangeFeedNotExists.Equal(err) { - c.IndentedJSON(http.StatusBadRequest, model.NewHTTPError(err)) - return - } - c.IndentedJSON(http.StatusInternalServerError, model.NewHTTPError(err)) + _ = c.Error(err) return } @@ -460,18 +435,13 @@ func (h *HTTPHandler) RebalanceTable(c *gin.Context) { changefeedID := c.Param(apiOpVarChangefeedID) if err := model.ValidateChangefeedID(changefeedID); err != nil { - c.IndentedJSON(http.StatusBadRequest, - model.NewHTTPError(cerror.ErrAPIInvalidParam.GenWithStack("invalid changefeed_id: %s", changefeedID))) + _ = c.Error(cerror.ErrAPIInvalidParam.GenWithStack("invalid changefeed_id: %s", changefeedID)) return } // check if the changefeed exists _, _, err := h.capture.etcdClient.GetChangeFeedStatus(ctx, changefeedID) if err != nil { - if cerror.ErrChangeFeedNotExists.Equal(err) { - c.IndentedJSON(http.StatusBadRequest, model.NewHTTPError(err)) - return - } - c.IndentedJSON(http.StatusInternalServerError, model.NewHTTPError(err)) + _ = c.Error(err) return } @@ -502,18 +472,13 @@ func (h *HTTPHandler) MoveTable(c *gin.Context) { ctx := c.Request.Context() changefeedID := c.Param(apiOpVarChangefeedID) if err := model.ValidateChangefeedID(changefeedID); err != nil { - c.IndentedJSON(http.StatusBadRequest, - model.NewHTTPError(cerror.ErrAPIInvalidParam.GenWithStack("invalid changefeed_id: %s", changefeedID))) + _ = c.Error(cerror.ErrAPIInvalidParam.GenWithStack("invalid changefeed_id: %s", changefeedID)) return } // check if the changefeed exists _, _, err := h.capture.etcdClient.GetChangeFeedStatus(ctx, changefeedID) if err != nil { - if cerror.ErrChangeFeedNotExists.Equal(err) { - c.IndentedJSON(http.StatusBadRequest, model.NewHTTPError(err)) - return - } - c.IndentedJSON(http.StatusInternalServerError, model.NewHTTPError(err)) + _ = c.Error(err) return } @@ -523,12 +488,12 @@ func (h *HTTPHandler) MoveTable(c *gin.Context) { }{} err = c.BindJSON(&data) if err != nil { - c.IndentedJSON(http.StatusBadRequest, model.NewHTTPError(err)) + _ = c.Error(cerror.ErrAPIInvalidParam.Wrap(err)) + return } if err := model.ValidateChangefeedID(data.CaptureID); err != nil { - c.IndentedJSON(http.StatusBadRequest, - model.NewHTTPError(cerror.ErrAPIInvalidParam.GenWithStack("invalid capture_id: %s", data.CaptureID))) + _ = c.Error(cerror.ErrAPIInvalidParam.GenWithStack("invalid capture_id: %s", data.CaptureID)) return } @@ -576,35 +541,41 @@ func (h *HTTPHandler) GetProcessor(c *gin.Context) { changefeedID := c.Param(apiOpVarChangefeedID) if err := model.ValidateChangefeedID(changefeedID); err != nil { - c.IndentedJSON(http.StatusBadRequest, - model.NewHTTPError(cerror.ErrAPIInvalidParam.GenWithStack("invalid changefeed_id: %s", changefeedID))) + _ = c.Error(cerror.ErrAPIInvalidParam.GenWithStack("invalid changefeed_id: %s", changefeedID)) return } captureID := c.Param(apiOpVarCaptureID) if err := model.ValidateChangefeedID(captureID); err != nil { - c.IndentedJSON(http.StatusBadRequest, - model.NewHTTPError(cerror.ErrAPIInvalidParam.GenWithStack("invalid capture_id: %s", changefeedID))) + _ = c.Error(cerror.ErrAPIInvalidParam.GenWithStack("invalid capture_id: %s", captureID)) return } _, status, err := h.capture.etcdClient.GetTaskStatus(ctx, changefeedID, captureID) if err != nil { - if cerror.ErrChangeFeedNotExists.Equal(err) { - c.IndentedJSON(http.StatusBadRequest, model.NewHTTPError(err)) - } - c.IndentedJSON(http.StatusInternalServerError, model.NewHTTPError(err)) + _ = c.Error(err) return } +<<<<<<< HEAD +======= + status, exist := statuses[captureID] + if !exist { + _ = c.Error(cerror.ErrCaptureNotExist.GenWithStackByArgs(captureID)) + } +>>>>>>> 28424edb9 (http_*: add log for http api and refine the err handle logic (#2997)) _, position, err := h.capture.etcdClient.GetTaskPosition(ctx, changefeedID, captureID) if err != nil { - if cerror.ErrChangeFeedNotExists.Equal(err) { - c.IndentedJSON(http.StatusBadRequest, model.NewHTTPError(err)) - } - c.IndentedJSON(http.StatusInternalServerError, model.NewHTTPError(err)) + _ = c.Error(err) return } +<<<<<<< HEAD +======= + position, exist := positions[captureID] + if !exist { + _ = c.Error(cerror.ErrCaptureNotExist.GenWithStackByArgs(captureID)) + } +>>>>>>> 28424edb9 (http_*: add log for http api and refine the err handle logic (#2997)) processorDetail := &model.ProcessorDetail{CheckPointTs: position.CheckPointTs, ResolvedTs: position.ResolvedTs, Error: position.Error} tables := make([]int64, 0) @@ -612,7 +583,7 @@ func (h *HTTPHandler) GetProcessor(c *gin.Context) { tables = append(tables, tableID) } processorDetail.Tables = tables - c.JSON(http.StatusOK, processorDetail) + c.IndentedJSON(http.StatusOK, processorDetail) } // ListProcessor lists all processors in the TiCDC cluster @@ -628,7 +599,7 @@ func (h *HTTPHandler) ListProcessor(c *gin.Context) { ctx := c.Request.Context() infos, err := h.capture.etcdClient.GetProcessors(ctx) if err != nil { - c.IndentedJSON(http.StatusInternalServerError, model.NewHTTPError(err)) + _ = c.Error(err) return } resps := make([]*model.ProcessorCommonInfo, len(infos)) @@ -658,7 +629,7 @@ func (h *HTTPHandler) ListCapture(c *gin.Context) { ownerID, err := h.capture.etcdClient.GetOwnerID(c, kv.CaptureOwnerKey) if err != nil { - c.IndentedJSON(http.StatusInternalServerError, model.NewHTTPError(err)) + _ = c.Error(err) return } @@ -728,14 +699,13 @@ func SetLogLevel(c *gin.Context) { }{} err := c.BindJSON(&data) if err != nil { - c.IndentedJSON(http.StatusBadRequest, model.NewHTTPError(err)) + _ = c.Error(cerror.ErrAPIInvalidParam.GenWithStack("invalid log level: %s", err.Error())) return } err = logutil.SetLogLevel(data.Level) if err != nil { - c.IndentedJSON(http.StatusBadRequest, - model.NewHTTPError(cerror.ErrAPIInvalidParam.GenWithStack("fail to change log level: %s", err))) + _ = c.Error(cerror.ErrAPIInvalidParam.GenWithStack("fail to change log level: %s", data.Level)) return } log.Warn("log level changed", zap.String("level", data.Level)) @@ -747,7 +717,7 @@ func (h *HTTPHandler) forwardToOwner(c *gin.Context) { ctx := c.Request.Context() // every request can only forward to owner one time if len(c.GetHeader(forWardFromCapture)) != 0 { - c.IndentedJSON(http.StatusInternalServerError, model.NewHTTPError(cerror.ErrRequestForwardErr.FastGenByArgs())) + _ = c.Error(cerror.ErrRequestForwardErr.FastGenByArgs()) return } c.Header(forWardFromCapture, h.capture.Info().ID) @@ -764,13 +734,13 @@ func (h *HTTPHandler) forwardToOwner(c *gin.Context) { return nil }, retry.WithBackoffBaseDelay(300), retry.WithMaxTries(getOwnerRetryMaxTime)) if err != nil { - c.IndentedJSON(http.StatusInternalServerError, model.NewHTTPError(err)) + _ = c.Error(err) return } tslConfig, err := config.GetGlobalServerConfig().Security.ToTLSConfigWithVerify() if err != nil { - c.IndentedJSON(http.StatusInternalServerError, model.NewHTTPError(err)) + _ = c.Error(err) return } @@ -792,7 +762,7 @@ func (h *HTTPHandler) forwardToOwner(c *gin.Context) { cli := httputil.NewClient(tslConfig) resp, err := cli.Do(req) if err != nil { - c.IndentedJSON(http.StatusInternalServerError, model.NewHTTPError(err)) + _ = c.Error(err) return } @@ -810,7 +780,7 @@ func (h *HTTPHandler) forwardToOwner(c *gin.Context) { defer resp.Body.Close() _, err = bufio.NewReader(resp.Body).WriteTo(c.Writer) if err != nil { - c.IndentedJSON(http.StatusInternalServerError, model.NewHTTPError(err)) + _ = c.Error(err) return } } diff --git a/cdc/capture/http_validator.go b/cdc/capture/http_validator.go index 8fe917f9e04..2009bbdde5c 100644 --- a/cdc/capture/http_validator.go +++ b/cdc/capture/http_validator.go @@ -139,7 +139,7 @@ func verifyCreateChangefeedConfig(ctx context.Context, changefeedConfig model.Ch tz, err := util.GetTimezone(changefeedConfig.TimeZone) if err != nil { - return nil, errors.Annotate(err, "invalid timezone:"+changefeedConfig.TimeZone) + return nil, cerror.ErrAPIInvalidParam.Wrap(errors.Annotatef(err, "invalid timezone:%s", changefeedConfig.TimeZone)) } ctx = util.PutTimezoneInCtx(ctx, tz) err = sink.Validate(ctx, info.SinkURI, info.Config, info.Opts) diff --git a/cdc/http_router.go b/cdc/http_router.go index a27bb14fba2..c4202882b52 100644 --- a/cdc/http_router.go +++ b/cdc/http_router.go @@ -21,22 +21,32 @@ import ( "time" "github.com/gin-gonic/gin" + "github.com/pingcap/log" "github.com/pingcap/ticdc/cdc/capture" + "github.com/pingcap/ticdc/cdc/model" swaggerFiles "github.com/swaggo/files" ginSwagger "github.com/swaggo/gin-swagger" +<<<<<<< HEAD +======= + "go.uber.org/zap" + +>>>>>>> 28424edb9 (http_*: add log for http api and refine the err handle logic (#2997)) // use for OpenAPI online docs _ "github.com/pingcap/ticdc/docs/api" ) // newRouter create a router for OpenAPI + func newRouter(captureHandler capture.HTTPHandler) *gin.Engine { // discard gin log output gin.DefaultWriter = ioutil.Discard router := gin.New() + router.Use(logMiddleware()) // request will timeout after 10 second router.Use(timeoutMiddleware(time.Second * 10)) + router.Use(errorHandleMiddleware()) // OpenAPI online docs router.GET("/swagger/*any", ginSwagger.WrapHandler(swaggerFiles.Handler)) @@ -95,7 +105,7 @@ func newRouter(captureHandler capture.HTTPHandler) *gin.Engine { } // timeoutMiddleware wraps the request context with a timeout -func timeoutMiddleware(timeout time.Duration) func(c *gin.Context) { +func timeoutMiddleware(timeout time.Duration) gin.HandlerFunc { return func(c *gin.Context) { // wrap the request context with a timeout ctx, cancel := context.WithTimeout(c.Request.Context(), timeout) @@ -118,3 +128,51 @@ func timeoutMiddleware(timeout time.Duration) func(c *gin.Context) { c.Next() } } + +func logMiddleware() gin.HandlerFunc { + return func(c *gin.Context) { + start := time.Now() + path := c.Request.URL.Path + query := c.Request.URL.RawQuery + c.Next() + + cost := time.Since(start) + + err := c.Errors.Last() + var stdErr error + if err != nil { + stdErr = err.Err + } + + log.Info(path, + zap.Int("status", c.Writer.Status()), + zap.String("method", c.Request.Method), + zap.String("path", path), + zap.String("query", query), + zap.String("ip", c.ClientIP()), + zap.String("user-agent", c.Request.UserAgent()), + zap.Error(stdErr), + zap.Duration("cost", cost), + ) + } +} + +func errorHandleMiddleware() gin.HandlerFunc { + return func(c *gin.Context) { + c.Next() + // because we will return immediately after an error occurs in http_handler + // there wil be only one error in c.Errors + lastError := c.Errors.Last() + if lastError != nil { + err := lastError.Err + // put the error into response + if capture.IsHTTPBadRequestError(err) { + c.IndentedJSON(http.StatusBadRequest, model.NewHTTPError(err)) + } else { + c.IndentedJSON(http.StatusInternalServerError, model.NewHTTPError(err)) + } + c.Abort() + return + } + } +} diff --git a/cdc/http_router_test.go b/cdc/http_router_test.go index 4821a72b6d2..77e9e184c17 100644 --- a/cdc/http_router_test.go +++ b/cdc/http_router_test.go @@ -23,9 +23,8 @@ import ( "github.com/stretchr/testify/require" ) -func TestPProfRouter(t *testing.T) { +func TestPProfPath(t *testing.T) { t.Parallel() - router := newRouter(capture.NewHTTPHandler(nil)) apis := []*openAPI{ diff --git a/cdc/http_status.go b/cdc/http_status.go index 4effd1b59f7..ce7b4da9231 100644 --- a/cdc/http_status.go +++ b/cdc/http_status.go @@ -39,6 +39,7 @@ import ( ) func (s *Server) startStatusHTTP() error { + conf := config.GetGlobalServerConfig() router := newRouter(capture.NewHTTPHandler(s.capture)) router.GET("/status", gin.WrapF(s.handleStatus)) @@ -58,7 +59,6 @@ func (s *Server) startStatusHTTP() error { prometheus.DefaultGatherer = registry router.Any("/metrics", gin.WrapH(promhttp.Handler())) - conf := config.GetGlobalServerConfig() err := conf.Security.AddSelfCommonName() if err != nil { log.Error("status server set tls config failed", zap.Error(err)) diff --git a/cdc/sink/mysql.go b/cdc/sink/mysql.go index 7a1946f3824..250a9a16a7a 100644 --- a/cdc/sink/mysql.go +++ b/cdc/sink/mysql.go @@ -331,7 +331,7 @@ func checkTiDBVariable(ctx context.Context, db *sql.DB, variableName, defaultVal err := db.QueryRowContext(ctx, querySQL).Scan(&name, &value) if err != nil && err != sql.ErrNoRows { errMsg := "fail to query session variable " + variableName - return "", errors.Annotate(cerror.WrapError(cerror.ErrMySQLQueryError, err), errMsg) + return "", cerror.ErrMySQLQueryError.Wrap(err).GenWithStack(errMsg) } // session variable works, use given default value if err == nil { @@ -435,13 +435,12 @@ func parseSinkURI(ctx context.Context, sinkURI *url.URL, opts map[string]string) } tlsCfg, err := credential.ToTLSConfig() if err != nil { - return nil, errors.Annotate(err, "fail to open MySQL connection") + return nil, errors.Trace(err) } name := "cdc_mysql_tls" + params.changefeedID err = dmysql.RegisterTLSConfig(name, tlsCfg) if err != nil { - return nil, errors.Annotate( - cerror.WrapError(cerror.ErrMySQLConnectionError, err), "fail to open MySQL connection") + return nil, cerror.ErrMySQLConnectionError.Wrap(err).GenWithStack("fail to open MySQL connection") } params.tls = "?tls=" + name } @@ -509,8 +508,7 @@ var getDBConnImpl = getDBConn func getDBConn(ctx context.Context, dsnStr string) (*sql.DB, error) { db, err := sql.Open("mysql", dsnStr) if err != nil { - return nil, errors.Annotate( - cerror.WrapError(cerror.ErrMySQLConnectionError, err), "Open database connection failed") + return nil, cerror.ErrMySQLConnectionError.Wrap(err).GenWithStack("fail to open MySQL connection") } err = db.PingContext(ctx) if err != nil { @@ -518,8 +516,7 @@ func getDBConn(ctx context.Context, dsnStr string) (*sql.DB, error) { if closeErr := db.Close(); closeErr != nil { log.Warn("close db failed", zap.Error(err)) } - return nil, errors.Annotate( - cerror.WrapError(cerror.ErrMySQLConnectionError, err), "fail to open MySQL connection") + return nil, cerror.ErrMySQLConnectionError.Wrap(err).GenWithStack("fail to open MySQL connection") } return db, nil } @@ -1357,12 +1354,12 @@ func newMySQLSyncpointStore(ctx context.Context, id string, sinkURI *url.URL) (S } tlsCfg, err := credential.ToTLSConfig() if err != nil { - return nil, errors.Annotate(err, "fail to open MySQL connection") + return nil, cerror.ErrMySQLConnectionError.Wrap(err).GenWithStack("fail to open MySQL connection") } name := "cdc_mysql_tls" + "syncpoint" + id err = dmysql.RegisterTLSConfig(name, tlsCfg) if err != nil { - return nil, errors.Annotate(err, "fail to open MySQL connection") + return nil, cerror.ErrMySQLConnectionError.Wrap(err).GenWithStack("fail to open MySQL connection") } tlsParam = "?tls=" + name } @@ -1402,8 +1399,7 @@ func newMySQLSyncpointStore(ctx context.Context, id string, sinkURI *url.URL) (S } testDB, err := sql.Open("mysql", dsn.FormatDSN()) if err != nil { - return nil, errors.Annotate( - cerror.WrapError(cerror.ErrMySQLConnectionError, err), "fail to open MySQL connection when configuring sink") + return nil, cerror.ErrMySQLConnectionError.Wrap(err).GenWithStack("fail to open MySQL connection when configuring sink") } defer testDB.Close() dsnStr, err = configureSinkURI(ctx, dsn, params, testDB) @@ -1412,11 +1408,11 @@ func newMySQLSyncpointStore(ctx context.Context, id string, sinkURI *url.URL) (S } syncDB, err = sql.Open("mysql", dsnStr) if err != nil { - return nil, errors.Annotate(err, "Open database connection failed") + return nil, cerror.ErrMySQLConnectionError.Wrap(err).GenWithStack("fail to open MySQL connection") } err = syncDB.PingContext(ctx) if err != nil { - return nil, errors.Annotate(err, "fail to open MySQL connection") + return nil, cerror.ErrMySQLConnectionError.Wrap(err).GenWithStack("fail to open MySQL connection") } log.Info("Start mysql syncpoint sink") diff --git a/cdc/sink/simple_mysql_tester.go b/cdc/sink/simple_mysql_tester.go index 3b3d7edc885..d68f538802c 100644 --- a/cdc/sink/simple_mysql_tester.go +++ b/cdc/sink/simple_mysql_tester.go @@ -85,7 +85,7 @@ func newSimpleMySQLSink(ctx context.Context, sinkURI *url.URL, config *config.Re db, err = sql.Open("mysql", dsnStr) if err != nil { return nil, errors.Annotate( - cerror.WrapError(cerror.ErrMySQLConnectionError, err), "Open database connection failed") + cerror.WrapError(cerror.ErrMySQLConnectionError, err), "fail to open MySQL connection") } err = db.PingContext(ctx) if err != nil { diff --git a/cdc/sink/sink_test.go b/cdc/sink/sink_test.go index 557239a9861..808b7324577 100644 --- a/cdc/sink/sink_test.go +++ b/cdc/sink/sink_test.go @@ -35,7 +35,7 @@ func TestValidateSink(t *testing.T) { sinkURI := "mysql://root:111@127.0.0.1:3306/" err := Validate(ctx, sinkURI, replicateConfig, opts) require.NotNil(t, err) - require.Regexp(t, "fail to open MySQL connection.*ErrMySQLConnectionError.*", err) + require.Contains(t, err.Error(), "fail to open MySQL connection") // test sink uri right sinkURI = "blackhole://" diff --git a/pkg/config/config_test.go b/pkg/config/config_test.go index abb4286e980..1779d9a624e 100644 --- a/pkg/config/config_test.go +++ b/pkg/config/config_test.go @@ -74,12 +74,18 @@ func (s *replicaConfigSuite) TestOutDated(c *check.C) { c.Assert(conf2, check.DeepEquals, conf) } +<<<<<<< HEAD type serverConfigSuite struct{} var _ = check.Suite(&serverConfigSuite{}) func (s *serverConfigSuite) TestMarshal(c *check.C) { defer testleak.AfterTest(c)() +======= +func TestServerConfigMarshal(t *testing.T) { + t.Parallel() + +>>>>>>> 28424edb9 (http_*: add log for http api and refine the err handle logic (#2997)) rawConfig := `{"addr":"192.155.22.33:8887","advertise-addr":"","log-file":"","log-level":"info","log":{"file":{"max-size":300,"max-days":0,"max-backups":0}},"data-dir":"","gc-ttl":86400,"tz":"System","capture-session-ttl":10,"owner-flush-interval":200000000,"processor-flush-interval":100000000,"sorter":{"num-concurrent-worker":4,"chunk-size-limit":999,"max-memory-percentage":30,"max-memory-consumption":17179869184,"num-workerpool-goroutine":16,"sort-dir":"/tmp/sorter"},"security":{"ca-path":"","cert-path":"","key-path":"","cert-allowed-cn":null},"per-table-memory-quota":10485760,"kv-client":{"worker-concurrent":8,"worker-pool-size":0,"region-scan-limit":40}}` conf := GetDefaultServerConfig()