Skip to content

Commit

Permalink
fix disorder in reconnecting logtail 2.0 (#21175)
Browse files Browse the repository at this point in the history
-Resolve inconsistencies in pausing and resuming logtail consumers.

Approved by: @LeftHandCold, @heni02, @XuPeng-SH, @volgariver6, @sukki37
  • Loading branch information
aptend authored Jan 10, 2025
1 parent d364247 commit 20523e0
Show file tree
Hide file tree
Showing 5 changed files with 122 additions and 16 deletions.
15 changes: 14 additions & 1 deletion pkg/objectio/injects.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,10 @@ const (
FJ_LogRanges = "fj/log/ranges"
FJ_LogPartitionState = "fj/log/partitionstate"

FJ_CNRecvErr = "fj/cn/recv/err"
FJ_CNRecvErr = "fj/cn/recv/err"
FJ_CNSubSysErr = "fj/cn/recv/subsyserr"
FJ_CNReplayCacheErr = "fj/cn/recv/rcacheerr"

FJ_LogReader = "fj/log/reader"
FJ_LogWorkspace = "fj/log/workspace"
)
Expand Down Expand Up @@ -240,6 +243,16 @@ func CNRecvErrInjected() (bool, int) {
return injected, int(p)
}

func CNSubSysErrInjected() (bool, int) {
p, _, injected := fault.TriggerFault(FJ_CNSubSysErr)
return injected, int(p)
}

func CNReplayCacheErrInjected() (bool, int) {
p, _, injected := fault.TriggerFault(FJ_CNReplayCacheErr)
return injected, int(p)
}

func RangesLogInjected(dbName, tableName string) (bool, int) {
_, sarg, injected := fault.TriggerFault(FJ_LogRanges)
if !injected {
Expand Down
30 changes: 24 additions & 6 deletions pkg/vm/engine/disttae/logtail_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -531,6 +531,9 @@ func (c *PushClient) subscribeTable(
}

func (c *PushClient) subSysTables(ctx context.Context) error {
if enabled, p := objectio.CNSubSysErrInjected(); enabled && rand.Intn(100000) < p {
return moerr.NewInternalError(ctx, "FIND_TABLE sub sys error injected")
}
// push subscription to Table `mo_database`, `mo_table`, `mo_column` of mo_catalog.
databaseId := uint64(catalog.MO_CATALOG_ID)
tableIds := []uint64{catalog.MO_DATABASE_ID, catalog.MO_TABLES_ID, catalog.MO_COLUMNS_ID}
Expand All @@ -555,6 +558,10 @@ func (c *PushClient) pause(s bool) {
if c.mu.paused {
return
}
// Note
// If subSysTables fails to send a successful request, receiveLogtails will receive nothing until the context is done. In this case, we attempt to stop the receiveLogtails goroutine immediately.
// The break signal left in the channel will interrupt the normal receiving process, but this is not an issue because reconnecting will create a new channel.
c.subscriber.logTailClient.BreakoutReceive()
select {
case c.pauseC <- s:
c.mu.paused = true
Expand Down Expand Up @@ -733,6 +740,9 @@ func (c *PushClient) waitTimestamp() {
}

func (c *PushClient) replayCatalogCache(ctx context.Context, e *Engine) (err error) {
if enabled, p := objectio.CNReplayCacheErrInjected(); enabled && rand.Intn(100000) < p {
return moerr.NewInternalError(ctx, "FIND_TABLE replay catalog cache error injected")
}
// replay mo_catalog cache
var op client.TxnOperator
var result executor.Result
Expand Down Expand Up @@ -873,6 +883,7 @@ func (c *PushClient) connect(ctx context.Context, e *Engine) {

e.setPushClientStatus(false)

// the consumer goroutine is supposed to be stopped.
c.stopConsumers()

logutil.Infof("%s %s: clean finished, start to reconnect to tn log tail service", logTag, c.serviceID)
Expand Down Expand Up @@ -912,17 +923,23 @@ func (c *PushClient) connect(ctx context.Context, e *Engine) {
c.dcaReset()
err = c.subSysTables(ctx)
if err != nil {
c.pause(false)
// send on closed channel error:
// receive logtail error -> pause -> reconnect -------------------------> stop
// |-> forced subscribe table timeout -> continue ----> resume
// Any errors related to the logtail consumer should not be retried within the inner connect loop; they should be handled by the outer caller.
// So we break the loop here.

c.pause(true)
logutil.Errorf("%s subscribe system tables failed, err %v", logTag, err)
continue
break
}

c.waitTimestamp()

if err := c.replayCatalogCache(ctx, e); err != nil {
c.pause(false)
c.pause(true)
logutil.Errorf("%s replay catalog cache failed, err %v", logTag, err)
continue
break
}

e.setPushClientStatus(true)
Expand Down Expand Up @@ -1259,7 +1276,8 @@ func (c *PushClient) isNotSubscribing(ctx context.Context, dbId, tblId uint64) (
}
//table is unsubscribed
if !c.subscriber.ready() {
return true, Unsubscribed, moerr.NewInternalError(ctx, "log tail subscriber is not ready")
// let wait the subscriber ready.
return false, Unsubscribed, nil //moerr.NewInternalError(ctx, "log tail subscriber is not ready")
}
c.subscribed.m[tblId] = SubTableStatus{
DBID: dbId,
Expand Down Expand Up @@ -1287,7 +1305,7 @@ func (c *PushClient) isNotUnsubscribing(ctx context.Context, dbId, tblId uint64)
}
//table is unsubscribed
if !c.subscriber.ready() {
return true, Unsubscribed, moerr.NewInternalError(ctx, "log tail subscriber is not ready")
return false, Unsubscribed, nil //moerr.NewInternalError(ctx, "log tail subscriber is not ready")
}
c.subscribed.m[tblId] = SubTableStatus{
DBID: dbId,
Expand Down
27 changes: 18 additions & 9 deletions pkg/vm/engine/tae/logtail/service/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,11 @@ type LogtailClient struct {
// There is another worker send the items in the chan to stream.
requestC chan *LogtailRequest

stream morpc.Stream
recvChan chan morpc.Message
broken chan struct{} // mark morpc stream as broken when necessary
once sync.Once
stream morpc.Stream
recvChan chan morpc.Message
breakChan chan struct{}
broken chan struct{} // mark morpc stream as broken when necessary
once sync.Once

options struct {
rps int
Expand All @@ -69,11 +70,12 @@ type LogtailClient struct {
func NewLogtailClient(ctx context.Context, stream morpc.Stream, opts ...ClientOption) (*LogtailClient, error) {
ctx, cancel := context.WithCancel(ctx)
client := &LogtailClient{
ctx: ctx,
cancel: cancel,
requestC: make(chan *LogtailRequest, defaultRequestChanSize),
stream: stream,
broken: make(chan struct{}),
ctx: ctx,
cancel: cancel,
requestC: make(chan *LogtailRequest, defaultRequestChanSize),
stream: stream,
broken: make(chan struct{}),
breakChan: make(chan struct{}, 10),
}

recvChan, err := stream.Receive()
Expand Down Expand Up @@ -150,6 +152,10 @@ func (c *LogtailClient) Unsubscribe(
return c.sendRequest(request)
}

func (c *LogtailClient) BreakoutReceive() {
c.breakChan <- struct{}{}
}

// Receive fetches logtail response.
//
// 1. response for error: *LogtailResponse.GetError() != nil
Expand All @@ -162,6 +168,9 @@ func (c *LogtailClient) Receive(ctx context.Context) (*LogtailResponse, error) {
case <-ctx.Done():
return nil, ctx.Err()

case <-c.breakChan:
return nil, moerr.NewInternalErrorNoCtx("logtail client: reconnect breakout")

case <-c.broken:
return nil, moerr.NewStreamClosedNoCtx()

Expand Down
43 changes: 43 additions & 0 deletions test/distributed/cases/dml/checkpoint/connectlogtail.result
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
select enable_fault_injection();
enable_fault_injection()
true
select add_fault_point('fj/cn/recv/err', ':::', 'echo', 1000, '');
add_fault_point(fj/cn/recv/err, :::, echo, 1000, )
true
select add_fault_point('fj/cn/recv/subsyserr', ':::', 'echo', 30000, '');
add_fault_point(fj/cn/recv/subsyserr, :::, echo, 30000, )
true
select sleep(1);
sleep(1)
0
select remove_fault_point('fj/cn/recv/subsyserr');
remove_fault_point(fj/cn/recv/subsyserr)
true
select add_fault_point('fj/cn/recv/rcacheerr', ':::', 'echo', 70000, '');
add_fault_point(fj/cn/recv/rcacheerr, :::, echo, 70000, )
true
select sleep(1);
sleep(1)
0
select remove_fault_point('fj/cn/recv/rcacheerr');
remove_fault_point(fj/cn/recv/rcacheerr)
true
select disable_fault_injection();
disable_fault_injection()
true
select sleep(1);
sleep(1)
0
create database logtail_reconnect;
show databases;
Database
connectlogtail
information_schema
logtail_reconnect
mo_catalog
mo_debug
mo_task
mysql
system
system_metrics
drop database logtail_reconnect;
23 changes: 23 additions & 0 deletions test/distributed/cases/dml/checkpoint/connectlogtail.test
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@

select enable_fault_injection();
select add_fault_point('fj/cn/recv/err', ':::', 'echo', 1000, '');
select add_fault_point('fj/cn/recv/subsyserr', ':::', 'echo', 30000, '');

select sleep(1);

select remove_fault_point('fj/cn/recv/subsyserr');

select add_fault_point('fj/cn/recv/rcacheerr', ':::', 'echo', 70000, '');

select sleep(1);

select remove_fault_point('fj/cn/recv/rcacheerr');
select disable_fault_injection();

select sleep(1);

create database logtail_reconnect;
show databases;
drop database logtail_reconnect;


0 comments on commit 20523e0

Please sign in to comment.