Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[bug] logtail: fix data race, a new worker send all requests #20565

Merged
merged 5 commits into from
Dec 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/common/moerr/cause.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,7 @@ var (
CauseInitEnginePack = NewInternalError(context.Background(), "InitEnginePack")
//pkg/vm/message
CauseReceiveMessage = NewInternalError(context.Background(), "ReceiveMessage")
CauseLogTailRequest = NewInternalError(context.Background(), "LogTailRequest")
)

func AttachCause(ctx context.Context, err error) error {
Expand Down
1 change: 1 addition & 0 deletions pkg/vm/engine/disttae/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ func New(
}

e.pClient.LogtailRPCClientFactory = DefaultNewRpcStreamToTnLogTailService
e.pClient.ctx = ctx

initMoTableStatsConfig(ctx, e)

Expand Down
31 changes: 13 additions & 18 deletions pkg/vm/engine/disttae/logtail_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,7 @@ const (

// push client related constants.
// maxSubscribeRequestPerSecond : max number of subscribe requests allowed per second.
// defaultRequestDeadline : default deadline for every request (subscribe and unsubscribe).
maxSubscribeRequestPerSecond = 10000
defaultRequestDeadline = 2 * time.Minute

// subscribe related constants.
// periodToCheckTableSubscribeSucceed : period for checking the table subscribe status after the push client sends a subscribe request.
Expand Down Expand Up @@ -117,6 +115,7 @@ var (
// 1. if we want to lock both subscriber and subscribed, we should lock subscriber first.
// -----------------------------------------------------------------------------------------------------
type PushClient struct {
ctx context.Context
serviceID string
// Responsible for sending subscription / unsubscription requests to the service
// and receiving the log tail from service.
Expand Down Expand Up @@ -307,7 +306,12 @@ func (c *PushClient) init(
}
c.initialized = true

return c.subscriber.init(e.GetService(), serviceAddr, c.LogtailRPCClientFactory)
return c.subscriber.init(
c.ctx,
e.GetService(),
serviceAddr,
c.LogtailRPCClientFactory,
)
}

func (c *PushClient) SetReconnectHandler(handler func()) {
Expand Down Expand Up @@ -1436,6 +1440,7 @@ func DefaultNewRpcStreamToTnLogTailService(
}

func (s *logTailSubscriber) init(
ctx context.Context,
sid string,
serviceAddr string,
rpcStreamFactory func(string, string, morpc.RPCClient) (morpc.RPCClient, morpc.Stream, error)) (err error) {
Expand Down Expand Up @@ -1465,7 +1470,11 @@ func (s *logTailSubscriber) init(
s.rpcStream = rpcStream

// new the log tail client.
s.logTailClient, err = service.NewLogtailClient(s.rpcStream, service.WithClientRequestPerSecond(maxSubscribeRequestPerSecond))
s.logTailClient, err = service.NewLogtailClient(
ctx,
s.rpcStream,
service.WithClientRequestPerSecond(maxSubscribeRequestPerSecond),
)
if err != nil {
return err
}
Expand Down Expand Up @@ -1510,27 +1519,13 @@ func (s *logTailSubscriber) waitReady(ctx context.Context) error {
// can't call this method directly.
func (s *logTailSubscriber) subscribeTable(
ctx context.Context, tblId api.TableID) error {
// set a default deadline for ctx if it doesn't have.
if _, ok := ctx.Deadline(); !ok {
newCtx, cancel := context.WithTimeoutCause(ctx, defaultRequestDeadline, moerr.CauseSubscribeTable)
_ = cancel
err := s.logTailClient.Subscribe(newCtx, tblId)
return moerr.AttachCause(ctx, err)
}
err := s.logTailClient.Subscribe(ctx, tblId)
return moerr.AttachCause(ctx, err)
}

// can't call this method directly.
func (s *logTailSubscriber) unSubscribeTable(
ctx context.Context, tblId api.TableID) error {
// set a default deadline for ctx if it doesn't have.
if _, ok := ctx.Deadline(); !ok {
newCtx, cancel := context.WithTimeoutCause(ctx, defaultRequestDeadline, moerr.CauseUnSubscribeTable)
_ = cancel
err := s.logTailClient.Unsubscribe(newCtx, tblId)
return moerr.AttachCause(ctx, err)
}
err := s.logTailClient.Unsubscribe(ctx, tblId)
return moerr.AttachCause(ctx, err)
}
Expand Down
79 changes: 63 additions & 16 deletions pkg/vm/engine/tae/logtail/service/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package service
import (
"context"
"sync"
"time"

"go.uber.org/ratelimit"
"go.uber.org/zap"
Expand All @@ -29,6 +30,12 @@ import (
v2 "github.com/matrixorigin/matrixone/pkg/util/metric/v2"
)

const (
defaultRequestChanSize = 512
// defaultRequestDeadline : default deadline for every request (subscribe and unsubscribe).
defaultRequestDeadline = 2 * time.Minute
)

type ClientOption func(*LogtailClient)

func WithClientRequestPerSecond(rps int) ClientOption {
Expand All @@ -39,6 +46,13 @@ func WithClientRequestPerSecond(rps int) ClientOption {

// LogtailClient encapsulates morpc stream.
type LogtailClient struct {
ctx context.Context
cancel context.CancelFunc

// requestC is a channel that receives all sub/unsub requests.
// A separate worker sends the items in the channel to the stream.
requestC chan *LogtailRequest

stream morpc.Stream
recvChan chan morpc.Message
broken chan struct{} // mark morpc stream as broken when necessary
Expand All @@ -52,10 +66,14 @@ type LogtailClient struct {
}

// NewLogtailClient constructs LogtailClient.
func NewLogtailClient(stream morpc.Stream, opts ...ClientOption) (*LogtailClient, error) {
func NewLogtailClient(ctx context.Context, stream morpc.Stream, opts ...ClientOption) (*LogtailClient, error) {
ctx, cancel := context.WithCancel(ctx)
client := &LogtailClient{
stream: stream,
broken: make(chan struct{}),
ctx: ctx,
cancel: cancel,
requestC: make(chan *LogtailRequest, defaultRequestChanSize),
stream: stream,
broken: make(chan struct{}),
}

recvChan, err := stream.Receive()
Expand All @@ -71,6 +89,12 @@ func NewLogtailClient(stream morpc.Stream, opts ...ClientOption) (*LogtailClient
}
client.limiter = ratelimit.New(client.options.rps)

go func() {
if wErr := client.sendWorker(); wErr != nil {
logutil.Infof("logtail client send worker returned: %v", wErr)
}
}()

return client, nil
}

Expand All @@ -80,6 +104,9 @@ func (c *LogtailClient) Close() error {
if err != nil {
logutil.Error("logtail client: fail to close morpc stream", zap.Error(err))
}
if c.cancel != nil {
c.cancel()
}
return err
}

Expand All @@ -100,13 +127,7 @@ func (c *LogtailClient) Subscribe(
Table: &table,
},
}
request.SetID(c.stream.ID())

err := c.stream.Send(ctx, request)
if err != nil {
logutil.Error("logtail client: fail to subscribe via morpc stream", zap.Error(err))
}
return err
return c.sendRequest(request)
}

// Unsubscribe cancel subscription for table.
Expand All @@ -126,12 +147,7 @@ func (c *LogtailClient) Unsubscribe(
Table: &table,
},
}
request.SetID(c.stream.ID())
err := c.stream.Send(ctx, request)
if err != nil {
logutil.Error("logtail client: fail to unsubscribe via morpc stream", zap.Error(err))
}
return err
return c.sendRequest(request)
}

// Receive fetches logtail response.
Expand Down Expand Up @@ -198,3 +214,34 @@ func (c *LogtailClient) streamBroken() bool {
}
return false
}

// sendRequest enqueues a sub/unsub request on requestC so that the send
// worker can deliver it over the morpc stream.
//
// A nil return only means the request was queued; it does not mean the
// request has been sent. If the client context is done, the context's error
// is returned instead. When requestC is full, the call blocks until a slot
// frees up or the client context is done.
func (c *LogtailClient) sendRequest(request *LogtailRequest) error {
	// Check cancellation first: when several select cases are ready, Go picks
	// one at random, so without this check a request could be queued (and nil
	// returned) on a client whose context is already cancelled, even though
	// the send worker stops on that same context.
	if err := c.ctx.Err(); err != nil {
		return err
	}

	select {
	case <-c.ctx.Done():
		return c.ctx.Err()

	case c.requestC <- request:
		return nil
	}
}

// sendWorker receives requests from requestC and sends them, one at a time,
// over the morpc stream. Because this is the only place that calls
// stream.Send for sub/unsub requests, sends are serialized.
//
// Each send is bounded by defaultRequestDeadline. A failed send is logged and
// the worker moves on to the next request. The worker only returns when the
// client context is done, in which case it returns the context's error.
func (c *LogtailClient) sendWorker() error {
	send := func(request *LogtailRequest) error {
		request.SetID(c.stream.ID())

		ctx, cancel := context.WithTimeoutCause(c.ctx, defaultRequestDeadline, moerr.CauseLogTailRequest)
		defer cancel()

		if err := c.stream.Send(ctx, request); err != nil {
			// Attach the context cause so a timed-out request is reported
			// with CauseLogTailRequest rather than only a generic deadline
			// error; the cause is otherwise never read.
			return moerr.AttachCause(ctx, err)
		}
		return nil
	}

	for {
		select {
		case <-c.ctx.Done():
			return c.ctx.Err()

		case request := <-c.requestC:
			if err := send(request); err != nil {
				logutil.Error("logtail client: fail to send sub/unsub request via morpc stream", zap.Error(err))
			}
		}
	}
}
2 changes: 1 addition & 1 deletion pkg/vm/engine/tae/logtail/service/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func TestService(t *testing.T) {
rpcStream, err := rpcClient.NewStream(address, false)
require.NoError(t, err)

logtailClient, err := NewLogtailClient(rpcStream, WithClientRequestPerSecond(100))
logtailClient, err := NewLogtailClient(context.TODO(), rpcStream, WithClientRequestPerSecond(100))
require.NoError(t, err)
defer func() {
err := logtailClient.Close()
Expand Down
Loading