Skip to content

Commit

Permalink
[bug] logtail: fix data race, a new worker send all requests
Browse files Browse the repository at this point in the history
  • Loading branch information
volgariver6 committed Dec 4, 2024
1 parent f7ad01c commit 86e9c4a
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 34 deletions.
1 change: 1 addition & 0 deletions pkg/common/moerr/cause.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,7 @@ var (
CauseInitEnginePack = NewInternalError(context.Background(), "InitEnginePack")
//pkg/vm/message
CauseReceiveMessage = NewInternalError(context.Background(), "ReceiveMessage")
CauseLogTailRequest = NewInternalError(context.Background(), "LogTailRequest")
)

func AttachCause(ctx context.Context, err error) error {
Expand Down
1 change: 1 addition & 0 deletions pkg/vm/engine/disttae/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ func New(
}

e.pClient.LogtailRPCClientFactory = DefaultNewRpcStreamToTnLogTailService
e.pClient.ctx = ctx
return e
}

Expand Down
31 changes: 13 additions & 18 deletions pkg/vm/engine/disttae/logtail_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,7 @@ const (

// push client related constants.
// maxSubscribeRequestPerSecond : max number of subscribe request we allowed per second.
// defaultRequestDeadline : default deadline for every request (subscribe and unsubscribe).
maxSubscribeRequestPerSecond = 10000
defaultRequestDeadline = 2 * time.Minute

// subscribe related constants.
// periodToCheckTableSubscribeSucceed : check table subscribe status period after push client send a subscribe request.
Expand Down Expand Up @@ -117,6 +115,7 @@ var (
// 1. if we want to lock both subscriber and subscribed, we should lock subscriber first.
// -----------------------------------------------------------------------------------------------------
type PushClient struct {
ctx context.Context
serviceID string
// Responsible for sending subscription / unsubscription requests to the service
// and receiving the log tail from service.
Expand Down Expand Up @@ -307,7 +306,12 @@ func (c *PushClient) init(
}
c.initialized = true

return c.subscriber.init(e.GetService(), serviceAddr, c.LogtailRPCClientFactory)
return c.subscriber.init(
c.ctx,
e.GetService(),
serviceAddr,
c.LogtailRPCClientFactory,
)
}

func (c *PushClient) SetReconnectHandler(handler func()) {
Expand Down Expand Up @@ -1436,6 +1440,7 @@ func DefaultNewRpcStreamToTnLogTailService(
}

func (s *logTailSubscriber) init(
ctx context.Context,
sid string,
serviceAddr string,
rpcStreamFactory func(string, string, morpc.RPCClient) (morpc.RPCClient, morpc.Stream, error)) (err error) {
Expand Down Expand Up @@ -1465,7 +1470,11 @@ func (s *logTailSubscriber) init(
s.rpcStream = rpcStream

// new the log tail client.
s.logTailClient, err = service.NewLogtailClient(s.rpcStream, service.WithClientRequestPerSecond(maxSubscribeRequestPerSecond))
s.logTailClient, err = service.NewLogtailClient(
ctx,
s.rpcStream,
service.WithClientRequestPerSecond(maxSubscribeRequestPerSecond),
)
if err != nil {
return err
}
Expand Down Expand Up @@ -1510,27 +1519,13 @@ func (s *logTailSubscriber) waitReady(ctx context.Context) error {
// can't call this method directly.
func (s *logTailSubscriber) subscribeTable(
ctx context.Context, tblId api.TableID) error {
// set a default deadline for ctx if it doesn't have.
if _, ok := ctx.Deadline(); !ok {
newCtx, cancel := context.WithTimeoutCause(ctx, defaultRequestDeadline, moerr.CauseSubscribeTable)
_ = cancel
err := s.logTailClient.Subscribe(newCtx, tblId)
return moerr.AttachCause(ctx, err)
}
err := s.logTailClient.Subscribe(ctx, tblId)
return moerr.AttachCause(ctx, err)
}

// can't call this method directly.
func (s *logTailSubscriber) unSubscribeTable(
ctx context.Context, tblId api.TableID) error {
// set a default deadline for ctx if it doesn't have.
if _, ok := ctx.Deadline(); !ok {
newCtx, cancel := context.WithTimeoutCause(ctx, defaultRequestDeadline, moerr.CauseUnSubscribeTable)
_ = cancel
err := s.logTailClient.Unsubscribe(newCtx, tblId)
return moerr.AttachCause(ctx, err)
}
err := s.logTailClient.Unsubscribe(ctx, tblId)
return moerr.AttachCause(ctx, err)
}
Expand Down
80 changes: 64 additions & 16 deletions pkg/vm/engine/tae/logtail/service/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package service
import (
"context"
"sync"
"time"

"go.uber.org/ratelimit"
"go.uber.org/zap"
Expand All @@ -29,6 +30,12 @@ import (
v2 "github.com/matrixorigin/matrixone/pkg/util/metric/v2"
)

const (
defaultRequestChanSize = 512
// defaultRequestDeadline : default deadline for every request (subscribe and unsubscribe).
defaultRequestDeadline = 2 * time.Minute
)

type ClientOption func(*LogtailClient)

func WithClientRequestPerSecond(rps int) ClientOption {
Expand All @@ -39,6 +46,13 @@ func WithClientRequestPerSecond(rps int) ClientOption {

// LogtailClient encapsulates morpc stream.
type LogtailClient struct {
ctx context.Context
cancel context.CancelFunc

// requestC is a chan, which receives all sub/unsub request.
// There is another worker send the items in the chan to stream.
requestC chan *LogtailRequest

stream morpc.Stream
recvChan chan morpc.Message
broken chan struct{} // mark morpc stream as broken when necessary
Expand All @@ -52,10 +66,14 @@ type LogtailClient struct {
}

// NewLogtailClient constructs LogtailClient.
func NewLogtailClient(stream morpc.Stream, opts ...ClientOption) (*LogtailClient, error) {
func NewLogtailClient(ctx context.Context, stream morpc.Stream, opts ...ClientOption) (*LogtailClient, error) {
ctx, cancel := context.WithCancel(ctx)
client := &LogtailClient{
stream: stream,
broken: make(chan struct{}),
ctx: ctx,
cancel: cancel,
requestC: make(chan *LogtailRequest, defaultRequestChanSize),
stream: stream,
broken: make(chan struct{}),
}

recvChan, err := stream.Receive()
Expand All @@ -71,6 +89,12 @@ func NewLogtailClient(stream morpc.Stream, opts ...ClientOption) (*LogtailClient
}
client.limiter = ratelimit.New(client.options.rps)

go func() {
if wErr := client.sendWorker(); wErr != nil {
logutil.Infof("logtail client send worker returned: %v", wErr)
}
}()

return client, nil
}

Expand All @@ -80,6 +104,9 @@ func (c *LogtailClient) Close() error {
if err != nil {
logutil.Error("logtail client: fail to close morpc stream", zap.Error(err))
}
if c.cancel != nil {
c.cancel()
}
return err
}

Expand All @@ -100,13 +127,7 @@ func (c *LogtailClient) Subscribe(
Table: &table,
},
}
request.SetID(c.stream.ID())

err := c.stream.Send(ctx, request)
if err != nil {
logutil.Error("logtail client: fail to subscribe via morpc stream", zap.Error(err))
}
return err
return c.sendRequest(request)
}

// Unsubscribe cancel subscription for table.
Expand All @@ -126,12 +147,7 @@ func (c *LogtailClient) Unsubscribe(
Table: &table,
},
}
request.SetID(c.stream.ID())
err := c.stream.Send(ctx, request)
if err != nil {
logutil.Error("logtail client: fail to unsubscribe via morpc stream", zap.Error(err))
}
return err
return c.sendRequest(request)
}

// Receive fetches logtail response.
Expand Down Expand Up @@ -198,3 +214,35 @@ func (c *LogtailClient) streamBroken() bool {
}
return false
}

func (c *LogtailClient) sendRequest(request *LogtailRequest) error {
select {
case <-c.ctx.Done():
return c.ctx.Err()

case c.requestC <- request:
return nil
}
}

func (c *LogtailClient) sendWorker() error {
sendFn := func(request *LogtailRequest) error {
request.SetID(c.stream.ID())
ctx, cancel := context.WithTimeoutCause(c.ctx, defaultRequestDeadline, moerr.CauseLogTailRequest)
defer cancel()
return c.stream.Send(ctx, request)
}

for {
select {
case <-c.ctx.Done():
return c.ctx.Err()

case request := <-c.requestC:
if err := sendFn(request); err != nil {
logutil.Error("logtail client: fail to send sub/unsub request via morpc stream", zap.Error(err))
return err
}
}
}
}

0 comments on commit 86e9c4a

Please sign in to comment.