Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support TLS for components #904

Merged
merged 15 commits into from
Feb 21, 2020
Merged
30 changes: 15 additions & 15 deletions binlogctl/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,20 +65,20 @@ const (

// Config holds the configuration of drainer
type Config struct {
*flag.FlagSet

Command string `toml:"cmd" json:"cmd"`
NodeID string `toml:"node-id" json:"node-id"`
DataDir string `toml:"data-dir" json:"data-dir"`
TimeZone string `toml:"time-zone" json:"time-zone"`
EtcdURLs string `toml:"pd-urls" json:"pd-urls"`
SSLCA string `toml:"ssl-ca" json:"ssl-ca"`
SSLCert string `toml:"ssl-cert" json:"ssl-cert"`
SSLKey string `toml:"ssl-key" json:"ssl-key"`
State string `toml:"state" json:"state"`
ShowOfflineNodes bool `toml:"state" json:"show-offline-nodes"`
Text string `toml:"text" json:"text"`
tls *tls.Config
*flag.FlagSet `toml:"-" json:"-"`

Command string `toml:"cmd" json:"cmd"`
NodeID string `toml:"node-id" json:"node-id"`
DataDir string `toml:"data-dir" json:"data-dir"`
TimeZone string `toml:"time-zone" json:"time-zone"`
EtcdURLs string `toml:"pd-urls" json:"pd-urls"`
SSLCA string `toml:"ssl-ca" json:"ssl-ca"`
SSLCert string `toml:"ssl-cert" json:"ssl-cert"`
SSLKey string `toml:"ssl-key" json:"ssl-key"`
State string `toml:"state" json:"state"`
ShowOfflineNodes bool `toml:"state" json:"show-offline-nodes"`
Text string `toml:"text" json:"text"`
TLS *tls.Config `toml:"-" json:"-"`
printVersion bool
}

Expand Down Expand Up @@ -134,7 +134,7 @@ func (cfg *Config) Parse(args []string) error {
SSLCert: cfg.SSLCert,
SSLKey: cfg.SSLKey,
}
cfg.tls, err = sCfg.ToTLSConfig()
cfg.TLS, err = sCfg.ToTLSConfig()
if err != nil {
return errors.Errorf("tls config error %v", err)
}
Expand Down
44 changes: 44 additions & 0 deletions binlogctl/global.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Copyright 2020 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package binlogctl
suzaku marked this conversation as resolved.
Show resolved Hide resolved

import (
"net/http"

"github.com/pingcap/errors"
"go.etcd.io/etcd/pkg/transport"
)

var dialClient = &http.Client{}

// InitHTTPSClient creates https client with ca file
func InitHTTPSClient(CAPath, CertPath, KeyPath string) error {
tlsInfo := transport.TLSInfo{
CertFile: CertPath,
KeyFile: KeyPath,
TrustedCAFile: CAPath,
}
tlsConfig, err := tlsInfo.ClientConfig()
if err != nil {
return errors.WithStack(err)
}

dialClient = &http.Client{
Transport: &http.Transport{
TLSClientConfig: tlsConfig,
},
}

return nil
}
27 changes: 16 additions & 11 deletions binlogctl/nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package binlogctl

import (
"context"
"crypto/tls"
"fmt"
"net/http"
"time"
Expand All @@ -34,8 +35,8 @@ var (
)

// QueryNodesByKind returns specified nodes, like pumps/drainers
func QueryNodesByKind(urls string, kind string, showOffline bool) error {
registry, err := createRegistryFuc(urls)
func QueryNodesByKind(urls string, kind string, showOffline bool, tlsConfig *tls.Config) error {
registry, err := createRegistryFuc(urls, tlsConfig)
if err != nil {
return errors.Trace(err)
}
Expand All @@ -56,12 +57,12 @@ func QueryNodesByKind(urls string, kind string, showOffline bool) error {
}

// UpdateNodeState update pump or drainer's state.
func UpdateNodeState(urls, kind, nodeID, state string) error {
func UpdateNodeState(urls, kind, nodeID, state string, tlsConfig *tls.Config) error {
/*
node's state can be online, pausing, paused, closing and offline.
if the state is one of them, will update the node's state saved in etcd directly.
*/
registry, err := createRegistryFuc(urls)
registry, err := createRegistryFuc(urls, tlsConfig)
if err != nil {
return errors.Trace(err)
}
Expand All @@ -81,12 +82,12 @@ func UpdateNodeState(urls, kind, nodeID, state string) error {
}

// createRegistry returns an ectd registry
func createRegistry(urls string) (*node.EtcdRegistry, error) {
func createRegistry(urls string, tlsConfig *tls.Config) (*node.EtcdRegistry, error) {
ectdEndpoints, err := flags.ParseHostPortAddr(urls)
if err != nil {
return nil, errors.Trace(err)
}
cli, err := newEtcdClientFromCfgFunc(ectdEndpoints, etcdDialTimeout, node.DefaultRootPath, nil)
cli, err := newEtcdClientFromCfgFunc(ectdEndpoints, etcdDialTimeout, node.DefaultRootPath, tlsConfig)
if err != nil {
return nil, errors.Trace(err)
}
Expand All @@ -95,8 +96,8 @@ func createRegistry(urls string) (*node.EtcdRegistry, error) {
}

// ApplyAction applies action on pump or drainer
func ApplyAction(urls, kind, nodeID string, action string) error {
registry, err := createRegistryFuc(urls)
func ApplyAction(urls, kind, nodeID string, action string, tlsConfig *tls.Config) error {
registry, err := createRegistryFuc(urls, tlsConfig)
if err != nil {
return errors.Trace(err)
}
Expand All @@ -106,14 +107,18 @@ func ApplyAction(urls, kind, nodeID string, action string) error {
return errors.Trace(err)
}

var client http.Client
url := fmt.Sprintf("http://%s/state/%s/%s", n.Addr, n.NodeID, action)
schema := "http"
if tlsConfig != nil {
schema = "https"
}

url := fmt.Sprintf("%s://%s/state/%s/%s", schema, n.Addr, n.NodeID, action)
log.Debug("send put http request", zap.String("url", url))
req, err := http.NewRequest("PUT", url, nil)
if err != nil {
return errors.Trace(err)
}
_, err = client.Do(req)
_, err = dialClient.Do(req)
if err == nil {
log.Info("Apply action on node success", zap.String("action", action), zap.String("NodeID", n.NodeID))
return nil
Expand Down
16 changes: 8 additions & 8 deletions binlogctl/nodes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ type testNodesSuite struct{}
func (s *testNodesSuite) SetUpTest(c *C) {
newEtcdClientFromCfgFunc = newFakeEtcdClientFromCfg
createRegistryFuc = createMockRegistry
_, err := createMockRegistry("127.0.0.1:2379")
_, err := createMockRegistry("127.0.0.1:2379", nil)
c.Assert(err, IsNil)
}

Expand All @@ -63,29 +63,29 @@ func (s *testNodesSuite) TestApplyAction(c *C) {

registerPumpForTest(c, "test", url)

err := ApplyAction("127.0.0.1:2379", "pumps", "test2", PausePump)
err := ApplyAction("127.0.0.1:2379", "pumps", "test2", PausePump, nil)
c.Assert(errors.IsNotFound(err), IsTrue)

// TODO: handle log information and add check
err = ApplyAction("127.0.0.1:2379", "pumps", "test", PausePump)
err = ApplyAction("127.0.0.1:2379", "pumps", "test", PausePump, nil)
c.Assert(err, IsNil)
}

func (s *testNodesSuite) TestQueryNodesByKind(c *C) {
registerPumpForTest(c, "test", "127.0.0.1:8255")

// TODO: handle log information and add check
err := QueryNodesByKind("127.0.0.1:2379", "pumps", false)
err := QueryNodesByKind("127.0.0.1:2379", "pumps", false, nil)
c.Assert(err, IsNil)
}

func (s *testNodesSuite) TestUpdateNodeState(c *C) {
registerPumpForTest(c, "test", "127.0.0.1:8255")

err := UpdateNodeState("127.0.0.1:2379", "pumps", "test2", node.Paused)
err := UpdateNodeState("127.0.0.1:2379", "pumps", "test2", node.Paused, nil)
c.Assert(err, ErrorMatches, ".*not found.*")

err = UpdateNodeState("127.0.0.1:2379", "pumps", "test", node.Paused)
err = UpdateNodeState("127.0.0.1:2379", "pumps", "test", node.Paused, nil)
c.Assert(err, IsNil)

// check node's state is changed to paused
Expand All @@ -104,7 +104,7 @@ func (s *testNodesSuite) TestUpdateNodeState(c *C) {

func (s *testNodesSuite) TestCreateRegistry(c *C) {
urls := "127.0.0.1:2379"
registry, err := createRegistry(urls)
registry, err := createRegistry(urls, nil)
c.Assert(err, IsNil)
c.Assert(registry, NotNil)

Expand All @@ -131,7 +131,7 @@ func (s *testNodesSuite) TestCreateRegistry(c *C) {

}

func createMockRegistry(urls string) (*node.EtcdRegistry, error) {
func createMockRegistry(urls string, _ *tls.Config) (*node.EtcdRegistry, error) {
if fakeRegistry != nil {
return fakeRegistry, nil
}
Expand Down
24 changes: 16 additions & 8 deletions cmd/binlogctl/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (

"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/tidb-binlog/binlogctl"
ctl "github.com/pingcap/tidb-binlog/binlogctl"
"github.com/pingcap/tidb-binlog/pkg/node"
"go.uber.org/zap"
Expand All @@ -41,25 +42,32 @@ func main() {
os.Exit(2)
}

if cfg.SSLCA != "" {
err = binlogctl.InitHTTPSClient(cfg.SSLCA, cfg.SSLCert, cfg.SSLKey)
if err != nil {
log.Fatal("failed to init https client", zap.Error(err))
}
}

switch cfg.Command {
case ctl.GenerateMeta:
err = ctl.GenerateMetaInfo(cfg)
case ctl.QueryPumps:
err = ctl.QueryNodesByKind(cfg.EtcdURLs, node.PumpNode, cfg.ShowOfflineNodes)
err = ctl.QueryNodesByKind(cfg.EtcdURLs, node.PumpNode, cfg.ShowOfflineNodes, cfg.TLS)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should initial all clients (like registry, dial Client) together, the reason is to ensure that TLS are processed correctly and uniformly, here implementation is a weird example:

  • we provide TLS config in interface of github.com/pingcap/tidb-binlog/binlogctl, like QueryNodesByKind
  • we do binlogctl.InitHTTPSClient separately

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

case ctl.QueryDrainers:
err = ctl.QueryNodesByKind(cfg.EtcdURLs, node.DrainerNode, cfg.ShowOfflineNodes)
err = ctl.QueryNodesByKind(cfg.EtcdURLs, node.DrainerNode, cfg.ShowOfflineNodes, cfg.TLS)
case ctl.UpdatePump:
err = ctl.UpdateNodeState(cfg.EtcdURLs, node.PumpNode, cfg.NodeID, cfg.State)
err = ctl.UpdateNodeState(cfg.EtcdURLs, node.PumpNode, cfg.NodeID, cfg.State, cfg.TLS)
case ctl.UpdateDrainer:
err = ctl.UpdateNodeState(cfg.EtcdURLs, node.DrainerNode, cfg.NodeID, cfg.State)
err = ctl.UpdateNodeState(cfg.EtcdURLs, node.DrainerNode, cfg.NodeID, cfg.State, cfg.TLS)
case ctl.PausePump:
err = ctl.ApplyAction(cfg.EtcdURLs, node.PumpNode, cfg.NodeID, pause)
err = ctl.ApplyAction(cfg.EtcdURLs, node.PumpNode, cfg.NodeID, pause, cfg.TLS)
case ctl.PauseDrainer:
err = ctl.ApplyAction(cfg.EtcdURLs, node.DrainerNode, cfg.NodeID, pause)
err = ctl.ApplyAction(cfg.EtcdURLs, node.DrainerNode, cfg.NodeID, pause, cfg.TLS)
case ctl.OfflinePump:
err = ctl.ApplyAction(cfg.EtcdURLs, node.PumpNode, cfg.NodeID, close)
err = ctl.ApplyAction(cfg.EtcdURLs, node.PumpNode, cfg.NodeID, close, cfg.TLS)
case ctl.OfflineDrainer:
err = ctl.ApplyAction(cfg.EtcdURLs, node.DrainerNode, cfg.NodeID, close)
err = ctl.ApplyAction(cfg.EtcdURLs, node.DrainerNode, cfg.NodeID, close, cfg.TLS)
case ctl.Encrypt:
if len(cfg.Text) == 0 {
err = errors.New("need to specify the text to be encrypt")
Expand Down
1 change: 1 addition & 0 deletions cmd/pump/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ func main() {
log.Fatal("Failed to initialize log", zap.Error(err))
}
version.PrintVersionInfo("Pump")
log.Info("start pump...", zap.Reflect("config", cfg))

p, err := pump.NewServer(cfg)
if err != nil {
Expand Down
5 changes: 4 additions & 1 deletion drainer/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package drainer

import (
"crypto/tls"
"fmt"
"net/http"
"strings"
Expand Down Expand Up @@ -49,6 +50,7 @@ type notifyResult struct {
// Collector collects binlog from all pump, and send binlog to syncer.
type Collector struct {
clusterID uint64
tls *tls.Config
interval time.Duration
reg *node.EtcdRegistry
tiStore kv.Storage
Expand Down Expand Up @@ -106,6 +108,7 @@ func NewCollector(cfg *Config, clusterID uint64, s *Syncer, cpt checkpoint.Check

c := &Collector{
clusterID: clusterID,
tls: cfg.tls,
interval: time.Duration(cfg.DetectInterval) * time.Second,
reg: node.NewEtcdRegistry(cli, cfg.EtcdTimeout),
pumps: make(map[string]*Pump),
Expand Down Expand Up @@ -308,7 +311,7 @@ func (c *Collector) handlePumpStatusUpdate(ctx context.Context, n *node.Status)
}

commitTS := c.merger.GetLatestTS()
p := NewPump(n.NodeID, n.Addr, c.clusterID, commitTS, c.errCh)
p := NewPump(n.NodeID, n.Addr, c.tls, c.clusterID, commitTS, c.errCh)
c.pumps[n.NodeID] = p
c.merger.AddSource(MergeSource{
ID: n.NodeID,
Expand Down
15 changes: 13 additions & 2 deletions drainer/pump.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package drainer

import (
"crypto/tls"
"strings"
"sync/atomic"
"time"
Expand All @@ -29,6 +30,7 @@ import (
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/status"
)

Expand All @@ -40,6 +42,7 @@ const (
type Pump struct {
nodeID string
addr string
tlsConfig *tls.Config
clusterID uint64
// the latest binlog ts that pump had handled
latestTS int64
Expand All @@ -56,11 +59,12 @@ type Pump struct {
}

// NewPump returns an instance of Pump
func NewPump(nodeID, addr string, clusterID uint64, startTs int64, errCh chan error) *Pump {
func NewPump(nodeID, addr string, tlsConfig *tls.Config, clusterID uint64, startTs int64, errCh chan error) *Pump {
nodeID = pump.FormatNodeID(nodeID)
return &Pump{
nodeID: nodeID,
addr: addr,
tlsConfig: tlsConfig,
clusterID: clusterID,
latestTS: startTs,
errCh: errCh,
Expand Down Expand Up @@ -204,7 +208,14 @@ func (p *Pump) createPullBinlogsClient(ctx context.Context, last int64) error {
callOpts = append(callOpts, grpc.UseCompressor(compressor))
}

conn, err := grpc.Dial(p.addr, grpc.WithInsecure(), grpc.WithDefaultCallOptions(callOpts...))
dialOpts := []grpc.DialOption{grpc.WithDefaultCallOptions(callOpts...)}
if p.tlsConfig != nil {
dialOpts = append(dialOpts, grpc.WithTransportCredentials(credentials.NewTLS(p.tlsConfig)))
} else {
dialOpts = append(dialOpts, grpc.WithInsecure())
}

conn, err := grpc.Dial(p.addr, dialOpts...)
if err != nil {
p.logger.Error("pump create grpc dial failed", zap.Error(err))
p.pullCli = nil
Expand Down
2 changes: 1 addition & 1 deletion drainer/pump_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func (x *mockPumpPullBinlogsClient) Recv() (*binlog.PullBinlogResp, error) {

func (s *pumpSuite) TestPullBinlog(c *C) {
errChan := make(chan error, 10)
p := NewPump("pump_test", "", 0, 5, errChan)
p := NewPump("pump_test", "", nil, 0, 5, errChan)
p.grpcConn = &grpc.ClientConn{}
binlogBytesChan := make(chan []byte, 10)
p.pullCli = &mockPumpPullBinlogsClient{binlogBytesChan: binlogBytesChan}
Expand Down
Loading