Blocking XGroupCreateMkStream does not interrupt on context cancellation #2276

Open
jgirtakovskis opened this issue Nov 5, 2022 · 18 comments · May be fixed by #2433

Comments

@jgirtakovskis

When XGroupCreateMkStream is called in blocking mode (Block = 0), the call does not get interrupted by cancelling the context.

Expected Behavior

The blocking call is interrupted when the context is cancelled.

Current Behavior

The call continues to block after the context is cancelled.

Possible Solution

Unsure yet

Steps to Reproduce

package main

import (
	"context"
	"fmt"
	"sync"
	"time"

	"github.com/go-redis/redis/v9"
	"github.com/google/uuid"
)

func main() {
	rdb := redis.NewUniversalClient(&redis.UniversalOptions{
		Addrs:    []string{"localhost:6379"},
		Password: "", // no password set
		DB:       0,  // use default DB
	})

	defer rdb.Close()

	ctx, cancelFn := context.WithCancel(context.Background())

	go func() {
		for idx := 0; idx < 5; idx++ {
			fmt.Printf("Waiting %v...\n", idx)
			time.Sleep(time.Second)
		}
		cancelFn()
		fmt.Printf("Cancelled context and now expect blocking XGroupCreateMkStream to be interrupted...\n")
	}()

	name := "blag"
	streamName := name
	groupName := name + "-blah"

	_, err := rdb.XGroupCreateMkStream(ctx, streamName, groupName, "0").Result()
	fmt.Printf("%v\n", err)

	var wg sync.WaitGroup

	wg.Add(1)
	go func() {
		defer wg.Done()
		objs, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
			Group:    groupName,
			Consumer: uuid.NewString(),
			Streams:  []string{streamName, ">"},
			Count:    100,
			Block:    0,
		}).Result()
		fmt.Printf("%v, %v\n", err, objs)
	}()

	wg.Wait()
	fmt.Printf("Done.\n")
}

Context (Environment)

I have two goroutines concurrently performing XREADGROUP and XADD in blocking mode. XADD is triggered by external events and is not guaranteed to add items to the stream at any particular cadence or pattern. Shutting down the reading goroutine is not possible because the blocking call does not get interrupted by context cancellation.
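
For context, the producer side described here might look roughly like the sketch below (it reuses the imports from the reproduction above). It is illustrative only: the produce function, the events channel, and the "payload" field are my names, not part of the original report.

// produce pushes an entry onto the stream each time an external event arrives,
// mirroring the XADD goroutine described above (illustrative sketch).
func produce(ctx context.Context, rdb redis.UniversalClient, stream string, events <-chan string) {
	for {
		select {
		case <-ctx.Done():
			return
		case ev := <-events:
			if err := rdb.XAdd(ctx, &redis.XAddArgs{
				Stream: stream,
				Values: map[string]interface{}{"payload": ev},
			}).Err(); err != nil {
				fmt.Printf("XADD failed: %v\n", err)
			}
		}
	}
}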

Detailed Description

A blocking call should be interrupted when its context is cancelled, and the connection closed.

Possible Implementation

N/A

@berndverst

Dapr maintainer here. I also need this fixed.

@Alger7w

Alger7w commented Dec 7, 2022

I have the same problem. Version: v9.0.0-rc.2

@latolukasz

Same problem here; it's quite a blocker for me :(

@brettmorien

brettmorien commented Feb 9, 2023

We've built a workaround for this using samber/lo, but I agree that this should really be handled at the library level.

❗❗ Turns out this leaks a lot of goroutines and shouldn't be used. ❗❗

var cmd *redis.XStreamSliceCmd

select {
case cmd = <-lo.Async(func() *redis.XStreamSliceCmd {
	return c.Client.XReadGroup(ctx, &redis.XReadGroupArgs{
		Group:    "group",
		Consumer: "consumerID",
		Streams:  []string{"stream", ">"},
		Count:    1,
	})
}):
case <-ctx.Done():
	return ctx.Err()
}

streams, err := cmd.Result()

@armsnyder
Contributor

Note that the workaround above will leak goroutines if you run the code repeatedly, so it's really only viable for handling app shutdown.
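
To make the leak concrete, here is a minimal, self-contained version of the same pattern (readOnce and its parameters are my names, not code from the workaround above). Each call that gives up via ctx leaves one goroutine permanently parked inside XReadGroup:

// readOnce wraps a blocking XReadGroup so the caller can stop waiting when
// ctx is cancelled. The wrapped goroutine, however, stays blocked until the
// server eventually replies, which is where the goroutines leak.
func readOnce(ctx context.Context, rdb redis.UniversalClient, group, consumer, stream string) ([]redis.XStream, error) {
	ch := make(chan *redis.XStreamSliceCmd, 1) // buffered so the late send does not block forever
	go func() {
		ch <- rdb.XReadGroup(context.Background(), &redis.XReadGroupArgs{
			Group:    group,
			Consumer: consumer,
			Streams:  []string{stream, ">"},
			Block:    0, // block indefinitely
		})
	}()
	select {
	case cmd := <-ch:
		return cmd.Result()
	case <-ctx.Done():
		// The goroutine above is still parked in XReadGroup; calling readOnce
		// in a loop therefore accumulates goroutines (visible via
		// runtime.NumGoroutine()), which is why this is only viable at shutdown.
		return nil, ctx.Err()
	}
}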

@armsnyder
Contributor

I was able to contribute test cases in #2432. However, I'm less confident about providing a fix. The code would need to safely dispose of the connection if the context is canceled, for example by removing it from the connection pool. It would also need to avoid interfering with the expected behavior of ContextTimeoutEnabled (#2243).

@monkey92t
Collaborator

This feels complicated: net.Conn is hard to control with a ctx, because net.Conn uses deadlines rather than a context...

var conn net.Conn // go-redis pool.Conn
ctx := context.Background()

processBlockCmd := func() <-chan *redis.XStreamSliceCmd {
	ch := make(chan *redis.XStreamSliceCmd)
	go func() {
		cmd := &redis.XStreamSliceCmd{}
		// write...
		if _, err := conn.Read(nil); err != nil {
			// check conn timeout?
			if err.Error() == "i/o timeout" && errors.Is(ctx.Err(), context.Canceled) {
				cmd.SetErr(err)
			}
		}
		ch <- cmd
		close(ch)
	}()
	return ch
}


ch := processBlockCmd()
select {
case cmd := <-ch:
	return cmd
case <-ctx.Done():
	conn.SetDeadline(time.Now()) // unblock the pending Read so the goroutine can finish
	return <-ch
}

@armsnyder linked a pull request on Feb 10, 2023 that will close this issue
@armsnyder
Contributor

armsnyder commented Feb 10, 2023

@monkey92t Right, deadlines on net.Conn are best used when you know the timeout ahead of time. The code you shared makes sense, and it is very similar to the change I just proposed in #2433 at a library level. I believe this belongs in the library since I would prefer the redis client to manage the connection for me. If users would prefer not to cancel redis commands with a context, then they can pass context.Background() to redis commands.

EDIT: One difference between your code and mine is that you used SetDeadline whereas I closed the connection. Is there a meaningful difference there?
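
For what it's worth, the observable difference seems small: a past deadline makes the pending Read fail with an i/o timeout while the descriptor stays open (and the deadline could later be cleared), whereas Close fails the Read with net.ErrClosed and discards the descriptor entirely. Either way the in-flight reply is lost, so the pooled connection has to be thrown away. A rough sketch of the two variants (unblockOnCancel and useDeadline are illustrative names, not library code):

// unblockOnCancel interrupts a pending Read on conn when ctx is cancelled,
// using either a past deadline or Close. In both cases the reply to the
// in-flight blocking command is unusable and the connection should be discarded.
func unblockOnCancel(ctx context.Context, conn net.Conn, useDeadline bool) {
	go func() {
		<-ctx.Done()
		if useDeadline {
			// The blocked Read returns an i/o timeout error; the descriptor stays
			// open and the deadline could be cleared again with SetDeadline(time.Time{}).
			_ = conn.SetDeadline(time.Now())
		} else {
			// The blocked Read returns net.ErrClosed; the connection is gone for good.
			_ = conn.Close()
		}
	}()
}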

@monkey92t
Collaborator

@armsnyder We still need to think about this more. Spawning a goroutine every time a command is executed would hurt performance. I haven't thought of a good solution yet.

@armsnyder
Contributor

Here's a benchstat comparing master with my PR.

https://gist.github.com/armsnyder/40aca6ea480bf53434d1e41c663e1550

We could optimize by running a goroutine per connection rather than per command. The connection goroutine would handle all I/O, with the command communicating to it over a channel.
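
A very rough sketch of that per-connection idea, under my own assumptions rather than go-redis internals (connReader, readReq, and the raw byte handling are all illustrative; a real implementation would parse RESP frames and integrate with the pool):

// connReader owns all reads on one connection. Commands submit requests over
// a channel instead of spawning a goroutine per command.
type readResult struct {
	data []byte
	err  error
}

type readReq struct {
	reply chan readResult
}

type connReader struct {
	conn net.Conn
	reqs chan readReq
}

// loop is the single long-lived goroutine per connection.
func (r *connReader) loop() {
	buf := make([]byte, 64*1024)
	for req := range r.reqs {
		n, err := r.conn.Read(buf) // simplification: a real reader would parse a full RESP reply
		req.reply <- readResult{data: append([]byte(nil), buf[:n]...), err: err}
	}
}

// read issues one read and waits for either the reply or ctx cancellation.
// On cancellation the blocked Read is woken with a past deadline, and the
// caller must then discard the connection since its reply stream is broken.
func (r *connReader) read(ctx context.Context) ([]byte, error) {
	req := readReq{reply: make(chan readResult, 1)}
	r.reqs <- req
	select {
	case res := <-req.reply:
		return res.data, res.err
	case <-ctx.Done():
		_ = r.conn.SetDeadline(time.Now()) // wake the reader goroutine
		<-req.reply                        // collect the failed read so the reader isn't orphaned
		return nil, ctx.Err()
	}
}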

@brettmorien

Hi folks (@monkey92t)!

I'm checking in to see if this is still on the radar. Not being able to cancel out of a blocking call is a deal breaker for gracefully shutting down our apps. We are currently operating on a July 2022 beta of this library until we can upgrade to a properly working release version.

@monkey92t
Collaborator

> Hi folks (@monkey92t)!
>
> I'm checking in to see if this is still on the radar. Not being able to cancel out of a blocking call is a deal breaker for gracefully shutting down our apps. We are currently operating on a July 2022 beta of this library until we can upgrade to a properly working release version.

Thank you for your attention. I've done related tests and it's a hard choice:

  1. With the chan+goroutine approach (such as @armsnyder's example), we pay a significant cost for every command executed.
  2. On <-ctx.Done() we would have to close the network connection, because we can no longer trust its state, and that can cause a chain reaction ("Constantly Reestablishing Connections to AWS ElastiCache Redis in Cluster Mode (Continued)" #2046).

No matter how we do it, listening to the context causes a lot of side effects, and I haven't found a better solution. A similar approach is used in the net package (*netFD).

I'm trying more solutions and benchmark tests, such as letting users choose whether to pay the cost of listening to ctx, as in #2243.

@berndverst

> Thank you for your attention. I've done related tests and it's a hard choice:
>
>   1. With the chan+goroutine approach (such as @armsnyder's example), we pay a significant cost for every command executed.
>   2. On <-ctx.Done() we would have to close the network connection, because we can no longer trust its state, and that can cause a chain reaction (#2046).

With respect to (1), I am concerned about goroutines being leaked, or about GC overhead. This would be a deal breaker for our use in Dapr (Distributed Application Runtime, github.com/dapr/dapr). We are very performance conscious, as our project runs on a variety of targets, including embedded systems.

I can't speak in favor of (2), but my vote is against (1).

@monkey92t
Collaborator

@berndverst What do you think of #2455?

@berndverst

> @berndverst What do you think of #2455?

Let me loop in one of my co-maintainers. @ItalyPaleAle, any thoughts on #2455 for addressing the issue discussed here?

@brettmorien

Hi folks. Any movement on this issue?

@wk8

wk8 commented Apr 27, 2023

This is a blocker for us; could you please look at #2455? Thank you!

@kkkbird
Contributor

kkkbird commented May 18, 2023

We really need this fix. Any update?
