Skip to content

Commit

Permalink
Split compat tests into multiple go tests, add missing tests
Browse files Browse the repository at this point in the history
Signed-off-by: Piotr Piotrowski <[email protected]>
  • Loading branch information
piotrpio committed Aug 22, 2023
1 parent 11c1640 commit 3bf15e1
Showing 1 changed file with 217 additions and 33 deletions.
250 changes: 217 additions & 33 deletions test/compat_test.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
//go:build compat
// +build compat

package test

import (
"bytes"
"crypto/sha256"
"encoding/json"
"fmt"
"io"
"net/http"
"os"
"strings"
"testing"
"time"

Expand All @@ -18,74 +21,148 @@ type objectStepConfig[T any] struct {
Command string `json:"command"`
URL string `json:"url"`
Bucket string `json:"bucket"`
Object string `json:"object"`
Config T `json:"config"`
}

func TestObjectStoreCompatibility(t *testing.T) {
nc, err := nats.Connect("demo.nats.io")
if err != nil {
t.Fatalf("Error connecting to NATS: %v", err)
}
func TestCompatibilityObjectStoreDefaultBucket(t *testing.T) {
t.Parallel()
nc, js := connect(t)
defer nc.Close()

// setup subscription on which tester will be sending requests
sub, err := nc.SubscribeSync("tests.object_store.>")
sub, err := nc.SubscribeSync("tests.object-store.default-bucket.>")
if err != nil {
t.Fatalf("Error subscribing to test subject: %v", err)
}
defer sub.Unsubscribe()

// 1. Create default bucket
init, err := sub.NextMsg(5 * time.Second)
msg, err := sub.NextMsg(1 * time.Hour)
if err != nil {
t.Fatalf("Error getting message: %v", err)
}

js, err := nc.JetStream()
if err != nil {
t.Fatalf("Error getting JetStream context: %v", err)
}
os, err := js.CreateObjectStore(&nats.ObjectStoreConfig{
_, err = js.CreateObjectStore(&nats.ObjectStoreConfig{
Bucket: "test",
})
if err != nil {
t.Fatalf("Error creating object store: %v", err)
}
// send empty response to indicate client is done
if err := init.Respond(nil); err != nil {
if err := msg.Respond(nil); err != nil {
t.Fatalf("Error responding to message: %v", err)
}
validateStepResult(t, sub)
validateTestResult(t, sub)
}

func TestCompatibilityObjectStoreCustomBucket(t *testing.T) {
t.Parallel()
nc, js := connect(t)
defer nc.Close()

// 2. Create bucket with custom config
custom, err := sub.NextMsg(5 * time.Second)
// setup subscription on which tester will be sending requests
sub, err := nc.SubscribeSync("tests.object-store.custom-bucket.>")
if err != nil {
t.Fatalf("Error subscribing to test subject: %v", err)
}
defer sub.Unsubscribe()

// 1. Create custom bucket
msg, err := sub.NextMsg(1 * time.Hour)
if err != nil {
t.Fatalf("Error getting message: %v", err)
}
var cfg objectStepConfig[*nats.ObjectStoreConfig]
if err := json.Unmarshal(custom.Data, &cfg); err != nil {
if err := json.Unmarshal(msg.Data, &cfg); err != nil {
t.Fatalf("Error unmarshalling message: %v", err)
}

os, err = js.CreateObjectStore(cfg.Config)
_, err = js.CreateObjectStore(cfg.Config)
if err != nil {
t.Fatalf("Error creating object store: %v", err)
}
if err := custom.Respond(nil); err != nil {
// send empty response to indicate client is done
if err := msg.Respond(nil); err != nil {
t.Fatalf("Error responding to message: %v", err)
}
validateStepResult(t, sub)
validateTestResult(t, sub)
}

func TestCompatibilityObjectStoreGetObject(t *testing.T) {
t.Parallel()
type config struct {
Bucket string `json:"bucket"`
Object string `json:"object"`
}

nc, js := connect(t)
defer nc.Close()

// 3. Put object
objReq, err := sub.NextMsg(5 * time.Second)
// setup subscription on which tester will be sending requests
sub, err := nc.SubscribeSync("tests.object-store.get-object.>")
if err != nil {
t.Fatalf("Error subscribing to test subject: %v", err)
}
defer sub.Unsubscribe()

msg, err := sub.NextMsg(1 * time.Hour)
if err != nil {
t.Fatalf("Error getting message: %v", err)
}
var cfg config
if err := json.Unmarshal(msg.Data, &cfg); err != nil {
t.Fatalf("Error unmarshalling message: %v", err)
}
// Get object
os, err := js.ObjectStore(cfg.Bucket)
if err != nil {
t.Fatalf("Error getting object store: %v", err)
}
obj, err := os.Get(cfg.Object)
if err != nil {
t.Fatalf("Error creating object store: %v", err)
}
data, err := io.ReadAll(obj)
if err != nil {
t.Fatalf("Error reading object: %v", err)
}

// calculate sha256 of the object
h := sha256.New()
h.Write(data)
sha := h.Sum(nil)

// send response to indicate client is done
if err := msg.Respond(sha); err != nil {
t.Fatalf("Error responding to message: %v", err)
}
validateTestResult(t, sub)
}

func TestCompatibilityObjectStorePutObject(t *testing.T) {
t.Parallel()

nc, js := connect(t)
defer nc.Close()

// setup subscription on which tester will be sending requests
sub, err := nc.SubscribeSync("tests.object-store.put-object.>")
if err != nil {
t.Fatalf("Error subscribing to test subject: %v", err)
}
defer sub.Unsubscribe()

msg, err := sub.NextMsg(1 * time.Hour)
if err != nil {
t.Fatalf("Error getting message: %v", err)
}
// Put object
var putObjectCfg objectStepConfig[*nats.ObjectMeta]
if err := json.Unmarshal(objReq.Data, &putObjectCfg); err != nil {
if err := json.Unmarshal(msg.Data, &putObjectCfg); err != nil {
t.Fatalf("Error unmarshalling message: %v", err)
}
os, err = js.ObjectStore(putObjectCfg.Bucket)
os, err := js.ObjectStore(putObjectCfg.Bucket)
if err != nil {
t.Fatalf("Error getting object store: %v", err)
}
Expand All @@ -94,23 +171,130 @@ func TestObjectStoreCompatibility(t *testing.T) {
if err != nil {
t.Fatalf("Error getting content: %v", err)
}
data, err := io.ReadAll(resp.Body)
if err != nil {
t.Fatalf("Error reading content: %v", err)
}
defer resp.Body.Close()
if _, err := os.Put(putObjectCfg.Config, resp.Body); err != nil {
if _, err := os.Put(putObjectCfg.Config, bytes.NewBuffer(data)); err != nil {
t.Fatalf("Error putting object: %v", err)
}
if err := objReq.Respond(nil); err != nil {
if err := msg.Respond(nil); err != nil {
t.Fatalf("Error responding to message: %v", err)
}
validateStepResult(t, sub)
validateTestResult(t, sub)
}

func validateStepResult(t *testing.T, sub *nats.Subscription) {
func TestCompatibilityObjectStoreUpdateMetadata(t *testing.T) {
t.Parallel()

nc, js := connect(t)
defer nc.Close()

// setup subscription on which tester will be sending requests
sub, err := nc.SubscribeSync("tests.object-store.update-metadata.>")
if err != nil {
t.Fatalf("Error subscribing to test subject: %v", err)
}
defer sub.Unsubscribe()

msg, err := sub.NextMsg(1 * time.Hour)
if err != nil {
t.Fatalf("Error getting message: %v", err)
}
// Update object metadata
var putObjectCfg objectStepConfig[*nats.ObjectMeta]
if err := json.Unmarshal(msg.Data, &putObjectCfg); err != nil {
t.Fatalf("Error unmarshalling message: %v", err)
}
os, err := js.ObjectStore(putObjectCfg.Bucket)
if err != nil {
t.Fatalf("Error getting object store: %v", err)
}
if err := os.UpdateMeta(putObjectCfg.Object, putObjectCfg.Config); err != nil {
t.Fatalf("Error putting object: %v", err)
}
if err := msg.Respond(nil); err != nil {
t.Fatalf("Error responding to message: %v", err)
}
validateTestResult(t, sub)
}

func TestCompatibilityObjectStoreWatch(t *testing.T) {
t.Skip("Skipping test until watch behavior is sorted out in compatibility-tests")
t.Parallel()

type config struct {
Bucket string `json:"bucket"`
Object string `json:"object"`
}

nc, js := connect(t)
defer nc.Close()

// setup subscription on which tester will be sending requests
sub, err := nc.SubscribeSync("tests.object-store.watch.>")
if err != nil {
t.Fatalf("Error subscribing to test subject: %v", err)
}
defer sub.Unsubscribe()

msg, err := sub.NextMsg(1 * time.Hour)
if err != nil {
t.Fatalf("Error getting message: %v", err)
}
// Watch object
var cfg config
if err := json.Unmarshal(msg.Data, &cfg); err != nil {
t.Fatalf("Error unmarshalling message: %v", err)
}
os, err := js.ObjectStore(cfg.Bucket)
if err != nil {
t.Fatalf("Error getting object store: %v", err)
}
time.Sleep(5 * time.Second)
watcher, err := os.Watch()
if err != nil {
t.Fatalf("Error getting watcher: %v", err)
}
var info *nats.ObjectInfo
select {
case info = <-watcher.Updates():
fmt.Println(info.Digest)
case <-time.After(30 * time.Second):
t.Fatalf("Timeout waiting for object update")
}

if err := msg.Respond([]byte(info.Digest)); err != nil {
t.Fatalf("Error responding to message: %v", err)
}
validateTestResult(t, sub)
}

func validateTestResult(t *testing.T, sub *nats.Subscription) {
t.Helper()
stepEnd, err := sub.NextMsg(5 * time.Second)
if err != nil {
t.Fatalf("Error getting message: %v", err)
}
if len(stepEnd.Header["STATUS"]) > 0 {
t.Fatalf("Test step failed: %v", stepEnd.Header["STATUS"][0])
if strings.Contains(string(stepEnd.Subject), "fail") {
t.Fatalf("Test step failed: %v", string(stepEnd.Subject))
}
}

func connect(t *testing.T) (*nats.Conn, nats.JetStreamContext) {
t.Helper()
natsURL := os.Getenv("NATS_URL")
if natsURL == "" {
natsURL = nats.DefaultURL
}
nc, err := nats.Connect(natsURL, nats.Timeout(1*time.Hour))
if err != nil {
t.Fatalf("Error connecting to NATS: %v", err)
}
js, err := nc.JetStream()
if err != nil {
t.Fatalf("Error getting JetStream context: %v", err)
}
return nc, js
}

0 comments on commit 3bf15e1

Please sign in to comment.