Skip to content

Commit

Permalink
[FIX] ignore Nats-Expected headers when process Inbound Source Msg
Browse files Browse the repository at this point in the history
  • Loading branch information
ramonberrutti committed Mar 28, 2024
1 parent 76c4cc0 commit 6a5f1b7
Show file tree
Hide file tree
Showing 4 changed files with 98 additions and 0 deletions.
25 changes: 25 additions & 0 deletions server/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4131,6 +4131,31 @@ func removeHeaderIfPresent(hdr []byte, key string) []byte {
return hdr
}

func removeHeaderIfPrefixPresent(hdr []byte, prefix string) []byte {
var index int
for {
start := bytes.Index(hdr[index:], []byte(prefix))
if start < 0 {
break
}
index += start
if index < 1 || hdr[index-1] != '\n' {
return hdr
}

end := bytes.Index(hdr[index+len(prefix):], []byte(_CRLF_))
if end < 0 {
return hdr
}

hdr = append(hdr[:index], hdr[index+end+len(prefix)+len(_CRLF_):]...)
if len(hdr) <= len(emptyHdrLine) {
return nil
}
}
return hdr
}

// Generate a new header based on optional original header and key value.
// More used in JetStream layers.
func genHeader(hdr []byte, key, value string) []byte {
Expand Down
18 changes: 18 additions & 0 deletions server/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2944,3 +2944,21 @@ func TestTLSClientHandshakeFirstAndInProcessConnection(t *testing.T) {
t.Fatal("Should have not got an error retrieving TLS connection state")
}
}

func TestRemoveHeaderIfPrefixPresent(t *testing.T) {
hdr := []byte("NATS/1.0\r\n\r\n")

hdr = genHeader(hdr, "a", "1")
hdr = genHeader(hdr, JSExpectedStream, "my-stream")
hdr = genHeader(hdr, JSExpectedLastSeq, "22")
hdr = genHeader(hdr, "b", "2")
hdr = genHeader(hdr, JSExpectedLastSubjSeq, "24")
hdr = genHeader(hdr, JSExpectedLastMsgId, "1")
hdr = genHeader(hdr, "c", "3")

hdr = removeHeaderIfPrefixPresent(hdr, "Nats-Expected-")

if !bytes.Equal(hdr, []byte("NATS/1.0\r\na: 1\r\nb: 2\r\nc: 3\r\n\r\n")) {
t.Fatalf("Expected headers to be stripped, got %q", hdr)
}
}
54 changes: 54 additions & 0 deletions server/jetstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11968,6 +11968,60 @@ func TestJetStreamSourceWorkingQueueWithLimit(t *testing.T) {
}
}

func TestJetStreamStreamSourceFromKV(t *testing.T) {
s := RunBasicJetStreamServer(t)
defer s.Shutdown()

// Client for API reuqests.
nc, js := jsClientConnect(t, s)
defer nc.Close()

// Create a kv store
kv, err := js.CreateKeyValue(&nats.KeyValueConfig{Bucket: "test"})
require_NoError(t, err)

// Create a stream with a source from the kv store
_, err = js.AddStream(&nats.StreamConfig{Name: "test", Retention: nats.InterestPolicy, Sources: []*nats.StreamSource{{Name: "KV_" + kv.Bucket()}}})
require_NoError(t, err)

// Create a interested consumer
_, err = js.AddConsumer("test", &nats.ConsumerConfig{Durable: "durable", AckPolicy: nats.AckExplicitPolicy})
require_NoError(t, err)

ss, err := js.PullSubscribe("", "", nats.Bind("test", "durable"))
require_NoError(t, err)

rev1, err := kv.Create("key", []byte("value1"))
require_NoError(t, err)

m, err := ss.Fetch(1, nats.MaxWait(500*time.Millisecond))
require_NoError(t, err)
require_NoError(t, m[0].Ack())
if string(m[0].Data) != "value1" {
t.Fatalf("Expected value1, got %s", m[0].Data)
}

rev2, err := kv.Update("key", []byte("value2"), rev1)
require_NoError(t, err)

_, err = kv.Update("key", []byte("value3"), rev2)
require_NoError(t, err)

m, err = ss.Fetch(1, nats.MaxWait(500*time.Millisecond))
require_NoError(t, err)
require_NoError(t, m[0].Ack())
if string(m[0].Data) != "value2" {
t.Fatalf("Expected value2, got %s", m[0].Data)
}

m, err = ss.Fetch(1, nats.MaxWait(500*time.Millisecond))
require_NoError(t, err)
require_NoError(t, m[0].Ack())
if string(m[0].Data) != "value3" {
t.Fatalf("Expected value3, got %s", m[0].Data)
}
}

func TestJetStreamInputTransform(t *testing.T) {
s := RunBasicJetStreamServer(t)
defer s.Shutdown()
Expand Down
1 change: 1 addition & 0 deletions server/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -3236,6 +3236,7 @@ func (mset *stream) processInboundSourceMsg(si *sourceInfo, m *inMsg) bool {
// If we are daisy chained here make sure to remove the original one.
if len(hdr) > 0 {
hdr = removeHeaderIfPresent(hdr, JSStreamSource)
hdr = removeHeaderIfPrefixPresent(hdr, "Nats-Expected-")
}
// Hold onto the origin reply which has all the metadata.
hdr = genHeader(hdr, JSStreamSource, si.genSourceHeader(m.rply))
Expand Down

0 comments on commit 6a5f1b7

Please sign in to comment.