Skip to content

Commit

Permalink
Merge branch 'master' into fix-get-cgroup-cpu2
Browse files Browse the repository at this point in the history
  • Loading branch information
hnes authored Feb 16, 2023
2 parents a746178 + 951dbc6 commit 4cf1374
Show file tree
Hide file tree
Showing 26 changed files with 559 additions and 371 deletions.
2 changes: 1 addition & 1 deletion client/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ require (
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00
github.com/pingcap/kvproto v0.0.0-20230201112839-2b853bed8125
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3
github.com/prometheus/client_golang v1.11.0
github.com/prometheus/client_golang v1.11.1
github.com/stretchr/testify v1.8.1
go.uber.org/goleak v1.1.11
go.uber.org/zap v1.20.0
Expand Down
4 changes: 2 additions & 2 deletions client/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,8 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN
github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw=
github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo=
github.com/prometheus/client_golang v1.7.1/go.mod h1:PY5Wy2awLA44sXw4AOSfFBetzPP4j5+D6mVACh+pe2M=
github.com/prometheus/client_golang v1.11.0 h1:HNkLOAEQMIDv/K+04rukrLx6ch7msSRwf3/SASFAGtQ=
github.com/prometheus/client_golang v1.11.0/go.mod h1:Z6t4BnS23TR94PD6BsDNk8yVqroYurpAkEiz0P2BEV0=
github.com/prometheus/client_golang v1.11.1 h1:+4eQaD7vAZ6DsfsxB15hbE0odUjGI5ARs9yskGu1v4s=
github.com/prometheus/client_golang v1.11.1/go.mod h1:Z6t4BnS23TR94PD6BsDNk8yVqroYurpAkEiz0P2BEV0=
github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo=
github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/prometheus/client_model v0.2.0 h1:uq5h0d+GuxiXLJLNABMgp2qUWDPiLvgCzz2dUR+/W/M=
Expand Down
137 changes: 105 additions & 32 deletions cmd/pd-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,11 @@ import (
grpcprometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/spf13/cobra"
"github.com/tikv/pd/pkg/autoscaling"
bs "github.com/tikv/pd/pkg/basicserver"
"github.com/tikv/pd/pkg/dashboard"
"github.com/tikv/pd/pkg/errs"
tso "github.com/tikv/pd/pkg/mcs/tso/server"
"github.com/tikv/pd/pkg/swaggerserver"
"github.com/tikv/pd/pkg/utils/logutil"
"github.com/tikv/pd/pkg/utils/metricutil"
Expand All @@ -41,43 +42,83 @@ import (
)

func main() {
ctx, cancel, svr := createServerWrapper(os.Args[1:])

sc := make(chan os.Signal, 1)
signal.Notify(sc,
syscall.SIGHUP,
syscall.SIGINT,
syscall.SIGTERM,
syscall.SIGQUIT)

var sig os.Signal
go func() {
sig = <-sc
cancel()
}()
rootCmd := &cobra.Command{
Use: "pd-server",
Short: "Placement Driver server",
Run: createServerWrapper,
}

if err := svr.Run(); err != nil {
log.Fatal("run server failed", errs.ZapError(err))
rootCmd.Flags().BoolP("version", "V", false, "print version information and exit")
rootCmd.Flags().StringP("config", "", "", "config file")
rootCmd.Flags().BoolP("config-check", "", false, "check config file validity and exit")
rootCmd.Flags().StringP("name", "", "", "human-readable name for this pd member")
rootCmd.Flags().StringP("data-dir", "", "", "path to the data directory (default 'default.${name}')")
rootCmd.Flags().StringP("client-urls", "", "http://127.0.0.1:2379", "url for client traffic")
rootCmd.Flags().StringP("advertise-client-urls", "", "", "advertise url for client traffic (default '${client-urls}')")
rootCmd.Flags().StringP("peer-urls", "", "http://127.0.0.1:2379", "url for peer traffic")
rootCmd.Flags().StringP("advertise-peer-urls", "", "", "advertise url for peer traffic (default '${peer-urls}')")
rootCmd.Flags().StringP("initial-cluster", "", "", "initial cluster configuration for bootstrapping, e,g. pd=http://127.0.0.1:2380")
rootCmd.Flags().StringP("join", "", "", "join to an existing cluster (usage: cluster's '${advertise-client-urls}'")
rootCmd.Flags().StringP("metrics-addr", "", "", "prometheus pushgateway address, leaves it empty will disable prometheus push")
rootCmd.Flags().StringP("log-level", "L", "info", "log level: debug, info, warn, error, fatal (default 'info')")
rootCmd.Flags().StringP("log-file", "", "", "log file path")
rootCmd.Flags().StringP("cacert", "", "", "path of file that contains list of trusted TLS CAs")
rootCmd.Flags().StringP("cert", "", "", "path of file that contains X509 certificate in PEM format")
rootCmd.Flags().StringP("key", "", "", "path of file that contains X509 key in PEM format")
rootCmd.Flags().BoolP("force-new-cluster", "", false, "force to create a new one-member cluster")
rootCmd.AddCommand(NewServiceCommand())

rootCmd.SetOutput(os.Stdout)
if err := rootCmd.Execute(); err != nil {
rootCmd.Println(err)
os.Exit(1)
}
}

<-ctx.Done()
log.Info("Got signal to exit", zap.String("signal", sig.String()))
// NewServiceCommand returns the service command.
func NewServiceCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "service <tso>",
Short: "Run a service",
}
cmd.AddCommand(NewTSOServiceCommand())
return cmd
}

svr.Close()
switch sig {
case syscall.SIGTERM:
exit(0)
default:
exit(1)
// NewTSOServiceCommand returns the unsafe remove failed stores command.
func NewTSOServiceCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "tso",
Short: "Run the tso service",
Run: tso.CreateServerWrapper,
}
cmd.Flags().BoolP("version", "V", false, "print version information and exit")
cmd.Flags().StringP("config", "", "", "config file")
cmd.Flags().StringP("backend-endpoints", "", "http://127.0.0.1:2379", "url for etcd client")
cmd.Flags().StringP("listen-addr", "", "", "listen address for tso service")
cmd.Flags().StringP("cacert", "", "", "path of file that contains list of trusted TLS CAs")
cmd.Flags().StringP("cert", "", "", "path of file that contains X509 certificate in PEM format")
cmd.Flags().StringP("key", "", "", "path of file that contains X509 key in PEM format")
return cmd
}

func createServerWrapper(args []string) (context.Context, context.CancelFunc, bs.Server) {
func createServerWrapper(cmd *cobra.Command, args []string) {
schedulers.Register()
cfg := config.NewConfig()
err := cfg.Parse(args)
flagSet := cmd.Flags()
flagSet.Parse(args)
err := cfg.Parse(flagSet)
if err != nil {
cmd.Println(err)
return
}

if cfg.Version {
printVersion, err := flagSet.GetBool("version")
if err != nil {
cmd.Println(err)
return
}
if printVersion {
server.PrintPDInfo()
exit(0)
}
Expand All @@ -92,15 +133,21 @@ func createServerWrapper(args []string) (context.Context, context.CancelFunc, bs
log.Fatal("parse cmd flags error", errs.ZapError(err))
}

if cfg.ConfigCheck {
configCheck, err := flagSet.GetBool("config-check")
if err != nil {
cmd.Println(err)
return
}

if configCheck {
server.PrintConfigCheckMsg(cfg)
exit(0)
}

// New zap logger
err = cfg.SetupLogger()
err = logutil.SetupLogger(cfg.Log, &cfg.Logger, &cfg.LogProps, cfg.Security.RedactInfoLog)
if err == nil {
log.ReplaceGlobals(cfg.GetZapLogger(), cfg.GetZapLogProperties())
log.ReplaceGlobals(cfg.Logger, cfg.LogProps)
} else {
log.Fatal("initialize logger error", errs.ZapError(err))
}
Expand Down Expand Up @@ -132,7 +179,33 @@ func createServerWrapper(args []string) (context.Context, context.CancelFunc, bs
log.Fatal("create server failed", errs.ZapError(err))
}

return ctx, cancel, svr
sc := make(chan os.Signal, 1)
signal.Notify(sc,
syscall.SIGHUP,
syscall.SIGINT,
syscall.SIGTERM,
syscall.SIGQUIT)

var sig os.Signal
go func() {
sig = <-sc
cancel()
}()

if err := svr.Run(); err != nil {
log.Fatal("run server failed", errs.ZapError(err))
}

<-ctx.Done()
log.Info("Got signal to exit", zap.String("signal", sig.String()))

svr.Close()
switch sig {
case syscall.SIGTERM:
exit(0)
default:
exit(1)
}
}

func exit(code int) {
Expand Down
75 changes: 58 additions & 17 deletions pkg/mcs/tso/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,19 @@ import (
"flag"
"net/http"
"os"
"os/signal"
"syscall"

grpcprometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"github.com/pingcap/errors"
"github.com/pingcap/log"
bs "github.com/tikv/pd/pkg/basicserver"
"github.com/spf13/cobra"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/tso"
"github.com/tikv/pd/pkg/utils/logutil"
"github.com/tikv/pd/pkg/utils/metricutil"
"go.etcd.io/etcd/clientv3"
"go.uber.org/zap"
)

// Server is the TSO server, and it implements bs.Server.
Expand Down Expand Up @@ -69,12 +72,24 @@ func (s *Server) GetHTTPClient() *http.Client {
}

// CreateServerWrapper encapsulates the configuration/log/metrics initialization and create the server
func CreateServerWrapper(args []string) (context.Context, context.CancelFunc, bs.Server) {
func CreateServerWrapper(cmd *cobra.Command, args []string) {
cmd.Flags().Parse(args)
cfg := tso.NewConfig()
err := cfg.Parse(os.Args[1:])
flagSet := cmd.Flags()
err := cfg.Parse(flagSet)
if err != nil {
cmd.Println(err)
return
}

if cfg.Version {
printVersionInfo()
printVersion, err := flagSet.GetBool("version")
if err != nil {
cmd.Println(err)
return
}
if printVersion {
// TODO: support printing TSO server info
// server.PrintTSOInfo()
exit(0)
}

Expand All @@ -88,29 +103,55 @@ func CreateServerWrapper(args []string) (context.Context, context.CancelFunc, bs
log.Fatal("parse cmd flags error", errs.ZapError(err))
}

if cfg.ConfigCheck {
printConfigCheckMsg(cfg)
exit(0)
// New zap logger
err = logutil.SetupLogger(cfg.Log, &cfg.Logger, &cfg.LogProps, cfg.Security.RedactInfoLog)
if err == nil {
log.ReplaceGlobals(cfg.Logger, cfg.LogProps)
} else {
log.Fatal("initialize logger error", errs.ZapError(err))
}
// Flushing any buffered log entries
defer log.Sync()

// TODO: Initialize logger
// TODO: support printing TSO server info
// LogTSOInfo()

// TODO: Make it configurable if it has big impact on performance.
grpcprometheus.EnableHandlingTimeHistogram()

metricutil.Push(&cfg.Metric)

// TODO: Create the server
ctx, cancel := context.WithCancel(context.Background())
svr := &Server{}

sc := make(chan os.Signal, 1)
signal.Notify(sc,
syscall.SIGHUP,
syscall.SIGINT,
syscall.SIGTERM,
syscall.SIGQUIT)

var sig os.Signal
go func() {
sig = <-sc
cancel()
}()

if err := svr.Run(); err != nil {
log.Fatal("run server failed", errs.ZapError(err))
}

return nil, nil, nil
}

// TODO: implement it
func printVersionInfo() {
}
<-ctx.Done()
log.Info("Got signal to exit", zap.String("signal", sig.String()))

// TODO: implement it
func printConfigCheckMsg(cfg *tso.Config) {
svr.Close()
switch sig {
case syscall.SIGTERM:
exit(0)
default:
exit(1)
}
}

func exit(code int) {
Expand Down
24 changes: 24 additions & 0 deletions pkg/mock/mockcluster/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (

"github.com/tikv/pd/pkg/utils/typeutil"
"github.com/tikv/pd/server/config"
"github.com/tikv/pd/server/schedule/placement"
)

// SetMaxMergeRegionSize updates the MaxMergeRegionSize configuration.
Expand Down Expand Up @@ -145,3 +146,26 @@ func (mc *Cluster) updateReplicationConfig(f func(*config.ReplicationConfig)) {
f(r)
mc.SetReplicationConfig(r)
}

// SetMaxReplicasWithLabel sets the max replicas for the cluster in two ways.
func (mc *Cluster) SetMaxReplicasWithLabel(enablePlacementRules bool, num int, labels ...string) {
if len(labels) == 0 {
labels = []string{"zone", "rack", "host"}
}
if enablePlacementRules {
rule := &placement.Rule{
GroupID: "pd",
ID: "default",
Index: 1,
StartKey: []byte(""),
EndKey: []byte(""),
Role: placement.Voter,
Count: num,
LocationLabels: labels,
}
mc.SetRule(rule)
} else {
mc.SetMaxReplicas(num)
mc.SetLocationLabels(labels)
}
}
Loading

0 comments on commit 4cf1374

Please sign in to comment.