
Unhandled i/o timeout error leads to infinite busy loop in startResponseRouter() function #61

Open
orzel7 opened this issue Nov 6, 2021 · 9 comments · May be fixed by #62

Comments

orzel7 commented Nov 6, 2021

Expected Behavior

No busy loop; network errors are handled properly.

Actual Behavior

When an i/o timeout error occurs, the startResponseRouter() function enters a busy loop here:

res, err := l.receiver.Receive(context.Background())

// You'll see this when the link is shutting down (either
// service-initiated via 'detach' or a user-initiated shutdown)
if isClosedError(err) {
    l.broadcastError(err)
    break
}

// I don't believe this should happen. The JS version of this same code
// ignores errors as well since responses should always be correlated
// to actual send requests. So this is just here for completeness.
if res == nil {
    continue
}
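
When Receive returns a non-"closed" error such as an i/o timeout, res is nil and the loop hits the continue immediately, so it retries with no delay and spins at full CPU. As a rough sketch only (a standalone toy, not the library's code or its eventual fix), pausing with a capped backoff before retrying would avoid the spin:

package main

import (
	"errors"
	"fmt"
	"net"
	"time"
)

// receiveOnce stands in for l.receiver.Receive; here it always fails with a
// timeout-style error to simulate the unplugged-network case.
func receiveOnce() (interface{}, error) {
	return nil, &net.OpError{Op: "read", Net: "tcp", Err: errors.New("i/o timeout")}
}

func main() {
	backoff := 100 * time.Millisecond
	for attempt := 0; attempt < 5; attempt++ {
		res, err := receiveOnce()
		if err != nil {
			// In the real code a closed link breaks out of the loop here.
			// For any other (possibly transient) error, sleep before retrying
			// instead of spinning on continue.
			fmt.Println("receive failed, backing off:", err)
			time.Sleep(backoff)
			if backoff < 5*time.Second {
				backoff *= 2
			}
			continue
		}
		backoff = 100 * time.Millisecond
		fmt.Println("got response:", res)
	}
}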

Environment

  • OS: Ubuntu 20.04
  • Go version: 1.17.1
  • Version of Library: 3.2.1

orzel7 commented Nov 11, 2021

Example of how to reproduce the problem:

package main

import (
	"context"
	"fmt"
	"log"
	"time"

	servicebus "github.com/Azure/azure-service-bus-go"
)

const (
	connStr = "Endpoint=sb://........connection string.........."
)

func main() {

	ctx := context.Background()

	sb, err := servicebus.NewNamespace(servicebus.NamespaceWithConnectionString(connStr))
	if err != nil {
		log.Fatal(err)
	}

	q, err := sb.NewQueue("test1.queue2")
	if err != nil {
		log.Fatal(err)
	}

	rcv3, err := q.NewReceiver(ctx)
	if err != nil {
		log.Fatal(err)
	}

	handler := func(ctx context.Context, m *servicebus.Message) error {

		fmt.Printf("Received message: %s[%v]\n", m.Data, m.SystemProperties)

		err := q.RenewLocks(ctx, m)
		if err != nil {
			fmt.Println("RenewLocks failed with error: ", err)
			return err
		}

		time.Sleep(5 * time.Minute)

		err = m.Complete(ctx)
		if err != nil {
			fmt.Println("Complete failed with error:", err)
		}

		fmt.Println("DONE")

		return err
	}

	for {
		err := rcv3.ReceiveOne(ctx, servicebus.HandlerFunc(handler))
		if err != nil {
			fmt.Println("ReceiveOne failed:", err)
			err = rcv3.Recover(ctx)
			if err != nil {
				fmt.Println("OOPS, Recover failed:", err)
				break
			}
		}
	}
}

1. Run this program.
2. Send a message to the queue.
3. Disable the internet connection (unplug the ethernet cable or turn off WiFi).
4. Wait about 1 minute (in my case, the message lock duration is 30 sec).
5. Wait about 5 minutes.

You will see logs like this:

Received message: test10[&{2021-11-11 16:35:06.043 +0000 UTC 0xc0000a0550 2021-11-11 10:11:21.387 +0000 UTC map[x-opt-enqueued-time:2021-11-11 10:11:21.387 +0000 UTC x-opt-locked-until:2021-11-11 16:35:06.043 +0000 UTC x-opt-message-state:0 x-opt-sequence-number:9]}]
Complete failed with error: read tcp 192.168.1.4:42608->23.97.226.21:5671: i/o timeout
DONE
ReceiveOne failed: read tcp 192.168.1.4:42608->23.97.226.21:5671: i/o timeout
Received message: test10[&{2021-11-11 16:40:07.592 +0000 UTC 0xc00016a230 2021-11-11 10:11:21.387 +0000 UTC map[x-opt-enqueued-time:2021-11-11 10:11:21.387 +0000 UTC x-opt-locked-until:2021-11-11 16:40:07.592 +0000 UTC x-opt-message-state:0 x-opt-sequence-number:9]}]
RenewLocks failed with error: amqp: link detached

Additionally, you will notice high CPU usage by the "main" process:
[Screenshot from 2021-11-11 17-58-11]

OK, let's kill the process and analyze the call stack: kill -11 720652

goroutine 54 [runnable]:
errors.Is(0x7c52e0, 0x99f120, 0x7c4fc0, 0xc000098700, 0x0)
	/home/emironov/sdk/go1.16.5/src/errors/wrap.go:49 +0xe5
github.com/Azure/azure-amqp-common-go/v3/rpc.isClosedError(0x7c53a0, 0xc0000b80a0, 0xc0000a0000)
	/home/emironov/projects/amqp-go-test/vendor/github.com/Azure/azure-amqp-common-go/v3/rpc/rpc.go:502 +0xe6
github.com/Azure/azure-amqp-common-go/v3/rpc.(*Link).startResponseRouter(0xc0005a2080)
	/home/emironov/projects/amqp-go-test/vendor/github.com/Azure/azure-amqp-common-go/v3/rpc/rpc.go:227 +0x7c
created by github.com/Azure/azure-amqp-common-go/v3/rpc.(*Link).RPC.func1
	/home/emironov/projects/amqp-go-test/vendor/github.com/Azure/azure-amqp-common-go/v3/rpc/rpc.go:258 +0x3e


orzel7 commented Nov 11, 2021

As you can see, recovering doesn't help; the management link won't recover.

@jhendrixMSFT (Member)

I think there are two issues here.

  • When the error is not nil and is not a "close error", we assume it's a transient error and retry. However, we don't pause between attempts, which leads to excessive CPU consumption.
  • Recovering the client doesn't cause the RPC "response router" to recover.

Does that sound right?

@jhendrixMSFT (Member)

Indeed, the RPC client lives on the Queue itself, not the Receiver, so recovering it would have no effect.
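
A user-side workaround therefore has to rebuild the Queue client itself. The sketch below is hypothetical: it only uses NewQueue and NewReceiver as already shown in the repro, plus Queue.Close, which is assumed to behave as in the package's own examples.

// Hypothetical drop-in helper for the repro program above; assumes the same
// servicebus import. It drops the old Queue client -- which is what owns the
// management/RPC link -- and creates a fresh one.
func recreateQueue(ctx context.Context, ns *servicebus.Namespace, old *servicebus.Queue, name string) (*servicebus.Queue, error) {
	_ = old.Close(ctx) // best effort; the underlying connection may already be dead
	return ns.NewQueue(name)
}

The repro's receive loop would then call recreateQueue followed by q.NewReceiver when Recover keeps failing, instead of breaking out.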


orzel7 commented Nov 15, 2021

"I think there are two issues here."

Yes, exactly, I've found two issues.

  • "When the error is not nil and is not a 'close error', we assume it's a transient error and retry. However, we don't pause between attempts, which leads to excessive CPU consumption."
    Yes, but I don't understand what a transient error is in this context. In the current implementation, any error leads to closing the Detach channel. Once closed, the channel stays closed forever, so there is no way to auto-recover without re-creating the link (see the sketch below).
  • "Recovering the client doesn't cause the RPC 'response router' to recover."
    Yes, exactly.

There is a third problem here: the RPC link doesn't refresh the auth token, but that's another story.
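
To illustrate the "stays closed forever" point with plain Go (a standalone toy, not library code): once a channel has been closed it can never be re-opened, so any select that treats it as a shutdown signal fires on every subsequent iteration.

package main

import "fmt"

func main() {
	detach := make(chan struct{})
	close(detach) // what the link does today on any error

	for i := 0; i < 3; i++ {
		select {
		case <-detach:
			// Fires on every iteration from now on; the channel cannot be
			// "re-opened", only replaced by creating a new link with a new one.
			fmt.Println("detach already signaled")
		default:
			fmt.Println("link healthy")
		}
	}
}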


orzel7 commented Nov 15, 2021

Please consider my changes not as a final solution, but as code illustrating the problems found.

@sruthiramani

Hi,
I'm seeing this error now and then while using the library.

amqp: link closed

Is this related to the i/o timeout issue reported here?

@jhendrixMSFT (Member)

@sruthiramani without more details it's hard to say. Can you please open a new issue and provide relevant info like version and which API call(s) you see returning this error?

@sruthiramani

"@sruthiramani without more details it's hard to say. Can you please open a new issue and provide relevant info like version and which API call(s) you see returning this error?"

I've opened a new issue with the details: #65. Thanks!
