New jetstream interface, consumer.Info() returns error consumer instance not yet created #1305

Closed
tpihl opened this issue Jun 8, 2023 · 8 comments
Labels
bug Confirmed reproducible bug

Comments

@tpihl

tpihl commented Jun 8, 2023

Defect

There is no way to know when the consumer has actually been created, so there is no reliable point at which info can be fetched from it. Calling Info(ctx) on the returned consumer should wait until the consumer is created and then return the info. Returning the error, as it currently does, may be factually correct but is not useful. An error saying the consumer was deleted would be fine. But OrderedConsumer returning a nil error together with a consumer that cannot be used must be a bug.

Versions of nats.go and the nats-server if one was involved:

nats.go v1.26.0
nats-server v2.9.17

OS/Container environment:

linux

Steps or code to reproduce the issue:

con, err := str.OrderedConsumer(
	ctx,
	jetstream.OrderedConsumerConfig{
		FilterSubjects: []string{
			"a.b.1",
		},
	},
)
if err != nil {
	panic(err)
}

ci, err := con.Info(otherctx)
if err != nil {
	panic(err)
}

The call to con.Info() returned the error:
nats: consumer instance not yet created

Expected result:

Information about the consumer and a nil error

Actual result:

No information about the consumer, and an error saying that the consumer returned by the previous call cannot be used. There is no way to continue except an iterative retry, which makes the code hard to read.

@tpihl tpihl added the bug Confirmed reproducible bug label Jun 8, 2023
@piotrpio
Collaborator

This issue should only be present when using OrderedConsumer, for which the library internally manages the underlying consumers. The error on Info() is returned because we create the ordered consumer lazily, when it is needed - e.g. when you call Consume(). After giving it some thought, this may be a bad approach, and I'll fix it to create the consumer as soon as you call stream.OrderedConsumer().

Aside from this, what info from the ordered consumer do you need? Bear in mind that, as ordered consumers are managed by the library, you certainly can ask for info about the currently existing consumer, but that consumer instance may be removed at any point and re-created with a different name. For some use cases, msg.Metadata() can be used to get the required data when fetching messages (this operation is "free", i.e. does not require any additional API calls).

@tpihl
Author

tpihl commented Jun 14, 2023

@piotrpio thank you!

I think lazy creation is quite okay, as long as the first call that uses the new consumer can block until the creation is done and the answer can be given. I assume that is (at least logically) what happens if an iterator or .Consume is called before the consumer is ready. So maybe that is also an option, and it keeps the fast return on the create request.

Actually the answer to your question is lazy load ;)

The Metadata call cannot tell me how many messages existed (matching the filters) when we started, since it shows a moving target. And if the inflow (for some reason) matches the speed at which we can consume, we will, for a period, not reach 0 pending in the .Metadata() info.

I haven't tested what is most effective. Maybe I need to redesign to create a normal consumer, ask how many messages I already have pending, fetch them in a batch, process them and then start a new ordered consumer. But my gut feeling is that an ordered consumer will be just as fast, with less complexity and fewer resources created and deleted. So getting this "number of messages to consider loaded" before processing felt like the easiest way to do it.

I could grab a number that is probably just as correct from the metadata of the first message, but it messes up the code. At the moment it's reasonably readable.

  1. define the relevant filter(s)
  2. create the consumer
  3. get number of msgs until "loaded"
  4. start consuming, decrease number of msgs to load for each msgs until 0, then signal load
  5. keep consuming...
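
The five steps above can be sketched roughly as follows. This is an illustration in plain Go without the NATS client: `loadTracker` is a hypothetical helper, and the initial count passed to it is assumed to be the NumPending value from the consumer info obtained in step 3.

```go
package main

import "fmt"

// loadTracker (hypothetical helper) counts down from the number of messages
// that were pending at start-up and fires onLoaded exactly once.
// It is not safe for concurrent use.
type loadTracker struct {
	remaining uint64
	loaded    bool
	onLoaded  func()
}

// newLoadTracker signals immediately when nothing was pending, which covers
// the "no messages match the filter" corner case.
func newLoadTracker(initialPending uint64, onLoaded func()) *loadTracker {
	t := &loadTracker{remaining: initialPending, onLoaded: onLoaded}
	if initialPending == 0 {
		t.signal()
	}
	return t
}

func (t *loadTracker) signal() {
	if !t.loaded {
		t.loaded = true
		t.onLoaded()
	}
}

// observe is called once for every processed message.
func (t *loadTracker) observe() {
	if t.loaded {
		return // backlog already handled; live messages need no counting
	}
	t.remaining--
	if t.remaining == 0 {
		t.signal()
	}
}

func main() {
	// Assume 3 messages were pending when the consumer was created.
	tracker := newLoadTracker(3, func() { fmt.Println("loaded") })

	// Stand-in for the consume callback: 5 messages arrive in total.
	for i := 1; i <= 5; i++ {
		fmt.Println("processed message", i)
		tracker.observe()
	}
}
```

With this shape the consume callback stays a single `observe()` call per message, and the first-message special case disappears because the count is known before consuming starts.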

Setting the number of msgs to load after the first msg breaks the flow of my code with some code that is only relevant for the first of maybe many msgs to process.
But there is a trickier corner case that adds even more code: how to handle the situation where there are no messages (relevant to the filter). In that case I must, but only for the first msg, try to figure out that I should signal loaded. And it will delay that signal.
I could do a GetLastBySubject (sorry for the mangled name, I didn't look up the correct name and it may be in the old interface) to find the last msg seqno and, instead of counting down, look for that source-seq and signal loaded when it is processed. And signal if we didn't get a message. But I assumed that this too would use more resources in the end than my attempt to query the consumer when it was ready.
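
The sequence-based alternative described above could look roughly like this. It is a sketch in plain Go under assumptions: `seqTracker` is a hypothetical helper, the target sequence is assumed to come from a last-message lookup done before consuming, and each message's stream sequence is assumed to come from its metadata.

```go
package main

import "fmt"

// seqTracker (hypothetical helper) reports "loaded" once a message with a
// stream sequence at or beyond the target has been processed.
type seqTracker struct {
	target uint64
	loaded bool
}

// newSeqTracker takes the sequence of the last matching message at start-up.
// hasTarget is false when no matching message existed; the tracker then
// starts out loaded and the caller can signal right away.
func newSeqTracker(target uint64, hasTarget bool) *seqTracker {
	return &seqTracker{target: target, loaded: !hasTarget}
}

// observe returns true exactly once: for the message that completes the load.
// It compares with >= rather than == so that a target message that was
// removed from the stream in the meantime cannot stall the signal.
func (s *seqTracker) observe(streamSeq uint64) bool {
	if s.loaded || streamSeq < s.target {
		return false
	}
	s.loaded = true
	return true
}

func main() {
	tracker := newSeqTracker(7, true)
	for _, seq := range []uint64{3, 5, 7, 9} {
		if tracker.observe(seq) {
			fmt.Println("loaded at stream sequence", seq)
		}
	}
}
```

Compared with counting down, this variant needs one extra lookup before consuming, which is the additional cost the comment above is concerned about.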

Hope it makes sense. I understand that it is an extra roundtrip, and if the ambition for the new simpler interface is to be simpler to implement rather than simpler to use, I withdraw my opinion.

@Jarema
Member

Jarema commented Jun 14, 2023

Thanks for the explanation!

We changed the code so that it does not return until the consumer is created.
This is more sensible and solves your issue too.

When it comes to performance, Ordered Consumer is the fastest one.
You should not need to tweak much - it optimises throughput and resource usage under the hood.
I would stick to one consumer, work with it, count messages from the first info and handle your logic that way.

The goal of the new API was to make simple things simple and hard things possible, with a clean and nice API, and without confusing magic under the hood (like the Subscribe method, which did a lot, including creating and deleting consumers). This explicit code sometimes means a line or two more, but without any neck-breaking unexpected behaviors. Your early feedback is invaluable, thank you for that!

Hope that clears things up.

Ah, and yes, Info is pretty resource heavy on the server side, hence @piotrpio's suggestion to use message metadata. But now that we know your reason for wanting it, we can suggest using CachedInfo, which gives you the initial info we got from the server while creating the consumer, so it's free too.

@tpihl
Author

tpihl commented Jun 15, 2023

Tried with CachedInfo(), works fine. Maybe some updates to the doc+

@tpihl tpihl closed this as completed Jun 15, 2023
@tpihl
Author

tpihl commented Jun 27, 2023

This is not working (testing with nats.go ver v1.27.1) @Jarema

@piotrpio
Collaborator

@tpihl could you elaborate? Which part is not working, are you trying with CachedInfo() or Info()?

@tpihl
Author

tpihl commented Jun 29, 2023

Hi @piotrpio

I tried both, CachedInfo() panics and Info() returns the error.

go.mod references

github.com/nats-io/nats.go v1.27.1

Here is the full test, including output:

func Test_VerifyInfoForOrderedConsumer(t *testing.T) {

	nc, err := nats.Connect(RedactedNatsURL,
		nats.UserInfo(RedactedNatsUser, RedactedNatsPassword),
	)
	if err != nil {
		t.Fatal(err)
	}

	js, err := jetstream.New(nc)
	if err != nil {
		t.Fatal(err)
	}

	str, err := js.Stream(context.TODO(), RedactedStreamName)
	if err != nil {
		t.Fatal(err)
	}
	con, err := str.OrderedConsumer(
		context.TODO(),
		jetstream.OrderedConsumerConfig{
			FilterSubjects: []string{
				RedactedFilter,
			},
		},
	)
	if err != nil {
		t.Fatal(err)
	}

	ci, err := con.Info(context.TODO())
	if err != nil {
		t.Error(err) 
	} else {
		t.Log(ci.NumPending)
		return
	}

	ci = con.CachedInfo()  // this is the statement that throws the panic
	t.Log(ci.NumPending)
}

/*	Result after executing go test -v for this test only

=== RUN   Test_VerifyInfoForOrderedConsumer
    x_test.go:217: nats: consumer instance not yet created
--- FAIL: Test_VerifyInfoForOrderedConsumer (0.00s)
panic: runtime error: invalid memory address or nil pointer dereference [recovered]
... (the rest of the output is irrelevant)

*/

@piotrpio
Collaborator

While that was surprising at first, the reason is pretty simple. #1317 was not part of release v1.27.1! So you'll have to be a bit more patient with this; the next release is coming very soon. You can run your test against main just to be sure it's working fine.

3 participants