Skip to content

Commit

Permalink
Merge branch 'master' into add_big_txn_integration_test
Browse files Browse the repository at this point in the history
  • Loading branch information
CharlesCheung96 authored Jun 21, 2022
2 parents 32b11c1 + 86b704b commit e5d3846
Show file tree
Hide file tree
Showing 115 changed files with 1,973 additions and 723 deletions.
5 changes: 4 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
# TiFlow

[![LICENSE](https://img.shields.io/github/license/pingcap/tiflow.svg)](https://github.com/pingcap/tiflow/blob/master/LICENSE)
![GitHub release (latest SemVer)](https://img.shields.io/github/v/release/pingcap/tiflow)
![GitHub Release Date](https://img.shields.io/github/release-date/pingcap/tiflow)
![GitHub go.mod Go version](https://img.shields.io/github/go-mod/go-version/pingcap/tiflow)
[![Build Status](https://github.com/pingcap/tiflow/actions/workflows/check_and_build.yaml/badge.svg?branch=master)](https://github.com/pingcap/tiflow/actions/workflows/check_and_build.yaml?query=event%3Apush+branch%3Amaster)
[![codecov](https://codecov.io/gh/pingcap/tiflow/branch/master/graph/badge.svg)](https://codecov.io/gh/pingcap/tiflow)
[![Coverage Status](https://coveralls.io/repos/github/pingcap/tiflow/badge.svg)](https://coveralls.io/github/pingcap/tiflow)
[![LICENSE](https://img.shields.io/github/license/pingcap/tiflow.svg)](https://github.com/pingcap/tiflow/blob/master/LICENSE)
[![Go Report Card](https://goreportcard.com/badge/github.com/pingcap/tiflow)](https://goreportcard.com/report/github.com/pingcap/tiflow)

## Introduction
Expand Down
27 changes: 27 additions & 0 deletions cdc/api/interfaces.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package api

import (
"context"

"github.com/pingcap/tiflow/cdc/model"
)

// CaptureInfoProvider provides capture and its onwnership information
type CaptureInfoProvider interface {
Info() (model.CaptureInfo, error)
IsOwner() bool
GetOwnerCaptureInfo(ctx context.Context) (*model.CaptureInfo, error)
}
17 changes: 17 additions & 0 deletions cdc/api/middleware/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,3 +73,20 @@ func ErrorHandleMiddleware() gin.HandlerFunc {
}
}
}

// ForwardToOwnerMiddleware forward an request to owner if current server
// is not owner, or handle it locally.
func ForwardToOwnerMiddleware(p api.CaptureInfoProvider) gin.HandlerFunc {
return func(ctx *gin.Context) {
if !p.IsOwner() {
api.ForwardToOwner(ctx, p)

// Without calling Abort(), Gin will continued to process the next handler,
// execute code which should only be run by the owner, and cause a panic.
// See https://github.com/pingcap/tiflow/issues/5888
ctx.Abort()
return
}
ctx.Next()
}
}
90 changes: 90 additions & 0 deletions cdc/api/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,20 @@
package api

import (
"bufio"
"context"
"encoding/json"
"net/http"
"strings"

"github.com/gin-gonic/gin"
"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/tiflow/cdc/capture"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/config"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/httputil"
"go.uber.org/zap"
)

Expand All @@ -35,6 +39,11 @@ var httpBadRequestError = []*errors.Error{
cerror.ErrMySQLInvalidConfig, cerror.ErrCaptureNotExist,
}

const (
// forwardFromCapture is a header to be set when forwarding requests to owner
forwardFromCapture = "TiCDC-ForwardFromCapture"
)

// IsHTTPBadRequestError check if a error is a http bad request error
func IsHTTPBadRequestError(err error) bool {
if err == nil {
Expand Down Expand Up @@ -139,3 +148,84 @@ func HandleOwnerScheduleTable(
return errors.Trace(err)
}
}

// ForwardToOwner forwards an request to the owner
func ForwardToOwner(c *gin.Context, p CaptureInfoProvider) {
ctx := c.Request.Context()
// every request can only forward to owner one time
if len(c.GetHeader(forwardFromCapture)) != 0 {
_ = c.Error(cerror.ErrRequestForwardErr.FastGenByArgs())
return
}

info, err := p.Info()
if err != nil {
_ = c.Error(err)
return
}

c.Header(forwardFromCapture, info.ID)

var owner *model.CaptureInfo
// get owner
owner, err = p.GetOwnerCaptureInfo(ctx)
if err != nil {
log.Info("get owner failed", zap.Error(err))
_ = c.Error(err)
return
}

security := config.GetGlobalServerConfig().Security

// init a request
req, err := http.NewRequestWithContext(
ctx, c.Request.Method, c.Request.RequestURI, c.Request.Body)
if err != nil {
_ = c.Error(err)
return
}

req.URL.Host = owner.AdvertiseAddr
// we should check tls config instead of security here because
// security will never be nil
if tls, _ := security.ToTLSConfigWithVerify(); tls != nil {
req.URL.Scheme = "https"
} else {
req.URL.Scheme = "http"
}
for k, v := range c.Request.Header {
for _, vv := range v {
req.Header.Add(k, vv)
}
}

// forward to owner
cli, err := httputil.NewClient(security)
if err != nil {
_ = c.Error(err)
return
}
resp, err := cli.Do(req)
if err != nil {
_ = c.Error(err)
return
}

// write header
for k, values := range resp.Header {
for _, v := range values {
c.Header(k, v)
}
}

// write status code
c.Status(resp.StatusCode)

// write response body
defer resp.Body.Close()
_, err = bufio.NewReader(resp.Body).WriteTo(c.Writer)
if err != nil {
_ = c.Error(err)
return
}
}
104 changes: 4 additions & 100 deletions cdc/api/v1/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
package v1

import (
"bufio"
"fmt"
"net/http"
"os"
Expand All @@ -27,9 +26,7 @@ import (
"github.com/pingcap/tiflow/cdc/capture"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/owner"
"github.com/pingcap/tiflow/pkg/config"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/httputil"
"github.com/pingcap/tiflow/pkg/logutil"
"github.com/pingcap/tiflow/pkg/version"
"github.com/tikv/client-go/v2/oracle"
Expand All @@ -43,8 +40,6 @@ const (
apiOpVarChangefeedID = "changefeed_id"
// apiOpVarCaptureID is the key of capture ID in HTTP API
apiOpVarCaptureID = "capture_id"
// forWardFromCapture is a header to be set when a request is forwarded from another capture
forWardFromCapture = "TiCDC-ForwardFromCapture"
)

// OpenAPI provides capture APIs.
Expand Down Expand Up @@ -85,7 +80,7 @@ func RegisterOpenAPIRoutes(router *gin.Engine, api OpenAPI) {

// changefeed API
changefeedGroup := v1.Group("/changefeeds")
changefeedGroup.Use(api.forwardToOwnerMiddleware)
changefeedGroup.Use(middleware.ForwardToOwnerMiddleware(api.capture))
changefeedGroup.GET("", api.ListChangefeed)
changefeedGroup.GET("/:changefeed_id", api.GetChangefeed)
changefeedGroup.POST("", api.CreateChangefeed)
Expand All @@ -98,18 +93,18 @@ func RegisterOpenAPIRoutes(router *gin.Engine, api OpenAPI) {

// owner API
ownerGroup := v1.Group("/owner")
ownerGroup.Use(api.forwardToOwnerMiddleware)
ownerGroup.Use(middleware.ForwardToOwnerMiddleware(api.capture))
ownerGroup.POST("/resign", api.ResignOwner)

// processor API
processorGroup := v1.Group("/processors")
processorGroup.Use(api.forwardToOwnerMiddleware)
processorGroup.Use(middleware.ForwardToOwnerMiddleware(api.capture))
processorGroup.GET("", api.ListProcessor)
processorGroup.GET("/:changefeed_id/:capture_id", api.GetProcessor)

// capture API
captureGroup := v1.Group("/captures")
captureGroup.Use(api.forwardToOwnerMiddleware)
captureGroup.Use(middleware.ForwardToOwnerMiddleware(api.capture))
captureGroup.GET("", api.ListCapture)
}

Expand Down Expand Up @@ -773,94 +768,3 @@ func SetLogLevel(c *gin.Context) {
log.Warn("log level changed", zap.String("level", data.Level))
c.Status(http.StatusOK)
}

// forwardToOwnerMiddleware forward an request to owner if current server
// is not owner, or handle it locally.
func (h *OpenAPI) forwardToOwnerMiddleware(c *gin.Context) {
if !h.capture.IsOwner() {
h.forwardToOwner(c)
return
}
c.Next()
}

// forwardToOwner forward an request to owner
func (h *OpenAPI) forwardToOwner(c *gin.Context) {
ctx := c.Request.Context()
// every request can only forward to owner one time
if len(c.GetHeader(forWardFromCapture)) != 0 {
_ = c.Error(cerror.ErrRequestForwardErr.FastGenByArgs())
return
}

info, err := h.capture.Info()
if err != nil {
_ = c.Error(err)
return
}

c.Header(forWardFromCapture, info.ID)

var owner *model.CaptureInfo
// get owner
owner, err = h.capture.GetOwnerCaptureInfo(ctx)
if err != nil {
log.Info("get owner failed", zap.Error(err))
_ = c.Error(err)
return
}

security := config.GetGlobalServerConfig().Security

// init a request
req, err := http.NewRequestWithContext(
ctx, c.Request.Method, c.Request.RequestURI, c.Request.Body)
if err != nil {
_ = c.Error(err)
return
}

req.URL.Host = owner.AdvertiseAddr
// we should check tls config instead of security here because
// security will never be nil
if tls, _ := security.ToTLSConfigWithVerify(); tls != nil {
req.URL.Scheme = "https"
} else {
req.URL.Scheme = "http"
}
for k, v := range c.Request.Header {
for _, vv := range v {
req.Header.Add(k, vv)
}
}

// forward to owner
cli, err := httputil.NewClient(security)
if err != nil {
_ = c.Error(err)
return
}
resp, err := cli.Do(req)
if err != nil {
_ = c.Error(err)
return
}

// write header
for k, values := range resp.Header {
for _, v := range values {
c.Header(k, v)
}
}

// write status code
c.Status(resp.StatusCode)

// write response body
defer resp.Body.Close()
_, err = bufio.NewReader(resp.Body).WriteTo(c.Writer)
if err != nil {
_ = c.Error(err)
return
}
}
5 changes: 2 additions & 3 deletions cdc/api/validator/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,6 @@ func VerifyCreateChangefeedConfig(
// init ChangefeedInfo
info := &model.ChangeFeedInfo{
SinkURI: changefeedConfig.SinkURI,
Opts: make(map[string]string),
CreateTime: time.Now(),
StartTs: changefeedConfig.StartTS,
TargetTs: changefeedConfig.TargetTS,
Expand All @@ -157,7 +156,7 @@ func VerifyCreateChangefeedConfig(
return nil, cerror.ErrAPIInvalidParam.Wrap(errors.Annotatef(err, "invalid timezone:%s", changefeedConfig.TimeZone))
}
ctx = contextutil.PutTimezoneInCtx(ctx, tz)
if err := sink.Validate(ctx, info.SinkURI, info.Config, info.Opts); err != nil {
if err := sink.Validate(ctx, info.SinkURI, info.Config); err != nil {
return nil, err
}

Expand Down Expand Up @@ -205,7 +204,7 @@ func VerifyUpdateChangefeedConfig(ctx context.Context,
// verify sink_uri
if changefeedConfig.SinkURI != "" {
newInfo.SinkURI = changefeedConfig.SinkURI
if err := sink.Validate(ctx, changefeedConfig.SinkURI, newInfo.Config, newInfo.Opts); err != nil {
if err := sink.Validate(ctx, changefeedConfig.SinkURI, newInfo.Config); err != nil {
return nil, cerror.ErrChangefeedUpdateRefused.GenWithStackByCause(err)
}
}
Expand Down
7 changes: 3 additions & 4 deletions cdc/model/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,10 +113,9 @@ func (s FeedState) IsNeeded(need string) bool {

// ChangeFeedInfo describes the detail of a ChangeFeed
type ChangeFeedInfo struct {
UpstreamID uint64 `json:"upstream-id"`
SinkURI string `json:"sink-uri"`
Opts map[string]string `json:"opts"`
CreateTime time.Time `json:"create-time"`
UpstreamID uint64 `json:"upstream-id"`
SinkURI string `json:"sink-uri"`
CreateTime time.Time `json:"create-time"`
// Start sync at this commit ts if `StartTs` is specify or using the CreateTime of changefeed.
StartTs uint64 `json:"start-ts"`
// The ChangeFeed will exits until sync to timestamp TargetTs
Expand Down
Loading

0 comments on commit e5d3846

Please sign in to comment.