Skip to content

Commit

Permalink
This is an automated cherry-pick of #10904
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <[email protected]>
  • Loading branch information
CharlesCheung96 authored and ti-chi-bot committed Apr 27, 2024
1 parent 986ebf7 commit de21c33
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 7 deletions.
7 changes: 1 addition & 6 deletions cdc/redo/reader/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ import (
const (
emitBatch = mysql.DefaultMaxTxnRow
defaultReaderChanSize = mysql.DefaultWorkerCount * emitBatch
maxTotalMemoryUsage = 90.0
maxTotalMemoryUsage = 80.0
maxWaitDuration = time.Minute * 2
)

Expand Down Expand Up @@ -221,11 +221,6 @@ func (l *LogReader) runReader(egCtx context.Context, cfg *readerConfig) error {
case l.rowCh <- row:
}
}
err := util.WaitMemoryAvailable(maxTotalMemoryUsage, maxWaitDuration)
if err != nil {
return errors.Trace(err)
}

case redo.RedoDDLLogFileType:
ddl := model.LogToDDL(item.data.RedoDDL)
if ddl != nil && ddl.CommitTs > cfg.startTs && ddl.CommitTs <= cfg.endTs {
Expand Down
38 changes: 37 additions & 1 deletion pkg/cmd/redo/apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,27 @@
package redo

import (
"net/http"
_ "net/http/pprof" // init pprof
"net/url"
"runtime/debug"
"time"

"github.com/pingcap/log"
"github.com/pingcap/tiflow/pkg/applier"
cmdcontext "github.com/pingcap/tiflow/pkg/cmd/context"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/util"
"github.com/spf13/cobra"
"go.uber.org/zap"
)

// applyRedoOptions defines flags for the `redo apply` command.
type applyRedoOptions struct {
options
sinkURI string
sinkURI string
enableProfiling bool
memoryLimitInGiBytes int64
}

// newapplyRedoOptions creates new applyRedoOptions for the `redo apply` command.
Expand All @@ -39,6 +48,8 @@ func (o *applyRedoOptions) addFlags(cmd *cobra.Command) {
cmd.Flags().StringVar(&o.sinkURI, "sink-uri", "", "target database sink-uri")
// the possible error returned from MarkFlagRequired is `no such flag`
cmd.MarkFlagRequired("sink-uri") //nolint:errcheck
cmd.Flags().BoolVar(&o.enableProfiling, "enable-profiling", true, "enable pprof profiling")
cmd.Flags().Int64Var(&o.memoryLimitInGiBytes, "memory-limit", 10, "memory limit in GiB")
}

//nolint:unparam
Expand All @@ -55,13 +66,38 @@ func (o *applyRedoOptions) complete(cmd *cobra.Command) error {
sinkURI.RawQuery = rawQuery.Encode()
o.sinkURI = sinkURI.String()
}

totalMemory, err := util.GetMemoryLimit()
if err == nil {
totalMemoryInBytes := int64(float64(totalMemory) * 0.8)
memoryLimitInBytes := o.memoryLimitInGiBytes * 1024 * 1024 * 1024
if totalMemoryInBytes != 0 && memoryLimitInBytes > totalMemoryInBytes {
memoryLimitInBytes = totalMemoryInBytes
}
debug.SetMemoryLimit(memoryLimitInBytes)
log.Info("set memory limit", zap.Int64("memoryLimit", memoryLimitInBytes))
}

return nil
}

// run runs the `redo apply` command.
func (o *applyRedoOptions) run(cmd *cobra.Command) error {
ctx := cmdcontext.GetDefaultContext()

if o.enableProfiling {
go func() {
server := &http.Server{
Addr: ":6060",
ReadHeaderTimeout: 5 * time.Second,
}
log.Info("Start http pprof server", zap.String("addr", server.Addr))
if err := server.ListenAndServe(); err != nil {
log.Fatal("http pprof", zap.Error(err))
}
}()
}

cfg := &applier.RedoApplierConfig{
Storage: o.storage,
SinkURI: o.sinkURI,
Expand Down
37 changes: 37 additions & 0 deletions pkg/util/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package util

import (
"context"
"math"

"github.com/KimMachineGun/automemlimit/memlimit"
Expand All @@ -39,3 +40,39 @@ func GetMemoryLimit() (uint64, error) {
}
return totalMemory, nil
}
<<<<<<< HEAD

Check failure on line 43 in pkg/util/memory.go

View workflow job for this annotation

GitHub Actions / Arm Build (ARM64)

syntax error: non-declaration statement outside function body

Check failure on line 43 in pkg/util/memory.go

View workflow job for this annotation

GitHub Actions / Mac OS Build

syntax error: non-declaration statement outside function body
=======

// CheckMemoryUsage checks if the memory usage is less than the limit.
func CheckMemoryUsage(limit float64) (bool, error) {
stat, err := mem.VirtualMemory()
if err != nil {
return false, err
}

log.Info("check memory usage", zap.Any("memory", stat))
return stat.UsedPercent < limit, nil
}

// WaitMemoryAvailable waits until the memory usage is less than the limit.
func WaitMemoryAvailable(ctx context.Context, limit float64, timeout time.Duration) error {
ticker := time.NewTicker(time.Second * 5)
timeoutTimer := time.NewTimer(timeout)
for {
select {
case <-ctx.Done():
return errors.WrapError(errors.ErrWaitFreeMemoryTimeout, ctx.Err())
case <-ticker.C:
hasFreeMemory, err := CheckMemoryUsage(limit)
if err != nil {
return err
}
if hasFreeMemory {
return nil
}
case <-timeoutTimer.C:
return errors.ErrWaitFreeMemoryTimeout.GenWithStackByArgs()
}
}
}
>>>>>>> 35703b2fdd (redo(ticdc): enable pprof and set memory limit for redo applier (#10904))

Check failure on line 78 in pkg/util/memory.go

View workflow job for this annotation

GitHub Actions / Arm Build (ARM64)

syntax error: non-declaration statement outside function body

Check failure on line 78 in pkg/util/memory.go

View workflow job for this annotation

GitHub Actions / Arm Build (ARM64)

invalid character U+0023 '#'

Check failure on line 78 in pkg/util/memory.go

View workflow job for this annotation

GitHub Actions / Mac OS Build

syntax error: non-declaration statement outside function body

Check failure on line 78 in pkg/util/memory.go

View workflow job for this annotation

GitHub Actions / Mac OS Build

invalid character U+0023 '#'

0 comments on commit de21c33

Please sign in to comment.