Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Object store compatibility tests #1373

Merged
merged 11 commits into from
Aug 28, 2023
14 changes: 7 additions & 7 deletions object.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,13 +150,13 @@ var (

// ObjectStoreConfig is the config for the object store.
type ObjectStoreConfig struct {
Bucket string
Description string
TTL time.Duration
MaxBytes int64
Storage StorageType
Replicas int
Placement *Placement
Bucket string `json:"bucket"`
Description string `json:"description,omitempty"`
TTL time.Duration `json:"max_age,omitempty"`
MaxBytes int64 `json:"max_bytes,omitempty"`
Storage StorageType `json:"storage,omitempty"`
Replicas int `json:"num_replicas,omitempty"`
Placement *Placement `json:"placement,omitempty"`
}

type ObjectStoreStatus interface {
Expand Down
300 changes: 300 additions & 0 deletions test/compat_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,300 @@
//go:build compat
// +build compat

package test

import (
"bytes"
"crypto/sha256"
"encoding/json"
"io"
"net/http"
"os"
"strings"
"testing"
"time"

"github.com/nats-io/nats.go"
)

type objectStepConfig[T any] struct {
Suite string `json:"suite"`
Test string `json:"test"`
Command string `json:"command"`
URL string `json:"url"`
Bucket string `json:"bucket"`
Object string `json:"object"`
Config T `json:"config"`
}

func TestCompatibilityObjectStoreDefaultBucket(t *testing.T) {
t.Parallel()
nc, js := connect(t)
defer nc.Close()

// setup subscription on which tester will be sending requests
sub, err := nc.SubscribeSync("tests.object-store.default-bucket.>")
if err != nil {
t.Fatalf("Error subscribing to test subject: %v", err)
}
defer sub.Unsubscribe()

// 1. Create default bucket
msg, err := sub.NextMsg(1 * time.Hour)
if err != nil {
t.Fatalf("Error getting message: %v", err)
}

_, err = js.CreateObjectStore(&nats.ObjectStoreConfig{
Bucket: "test",
})
if err != nil {
t.Fatalf("Error creating object store: %v", err)
}
// send empty response to indicate client is done
if err := msg.Respond(nil); err != nil {
t.Fatalf("Error responding to message: %v", err)
}
validateTestResult(t, sub)
}

func TestCompatibilityObjectStoreCustomBucket(t *testing.T) {
t.Parallel()
nc, js := connect(t)
defer nc.Close()

// setup subscription on which tester will be sending requests
sub, err := nc.SubscribeSync("tests.object-store.custom-bucket.>")
if err != nil {
t.Fatalf("Error subscribing to test subject: %v", err)
}
defer sub.Unsubscribe()

// 1. Create custom bucket
msg, err := sub.NextMsg(1 * time.Hour)
if err != nil {
t.Fatalf("Error getting message: %v", err)
}
var cfg objectStepConfig[*nats.ObjectStoreConfig]
if err := json.Unmarshal(msg.Data, &cfg); err != nil {
t.Fatalf("Error unmarshalling message: %v", err)
}

_, err = js.CreateObjectStore(cfg.Config)
if err != nil {
t.Fatalf("Error creating object store: %v", err)
}
// send empty response to indicate client is done
if err := msg.Respond(nil); err != nil {
t.Fatalf("Error responding to message: %v", err)
}

validateTestResult(t, sub)
}

func TestCompatibilityObjectStoreGetObject(t *testing.T) {
t.Parallel()
type config struct {
Bucket string `json:"bucket"`
Object string `json:"object"`
}

nc, js := connect(t)
defer nc.Close()

// setup subscription on which tester will be sending requests
sub, err := nc.SubscribeSync("tests.object-store.get-object.>")
if err != nil {
t.Fatalf("Error subscribing to test subject: %v", err)
}
defer sub.Unsubscribe()

msg, err := sub.NextMsg(1 * time.Hour)
if err != nil {
t.Fatalf("Error getting message: %v", err)
}
var cfg config
if err := json.Unmarshal(msg.Data, &cfg); err != nil {
t.Fatalf("Error unmarshalling message: %v", err)
}
// Get object
os, err := js.ObjectStore(cfg.Bucket)
if err != nil {
t.Fatalf("Error getting object store: %v", err)
}
obj, err := os.Get(cfg.Object)
if err != nil {
t.Fatalf("Error creating object store: %v", err)
}
data, err := io.ReadAll(obj)
if err != nil {
t.Fatalf("Error reading object: %v", err)
}

// calculate sha256 of the object
h := sha256.New()
h.Write(data)
sha := h.Sum(nil)

// send response to indicate client is done
if err := msg.Respond(sha); err != nil {
t.Fatalf("Error responding to message: %v", err)
}
validateTestResult(t, sub)
}

func TestCompatibilityObjectStorePutObject(t *testing.T) {
t.Parallel()

nc, js := connect(t)
defer nc.Close()

// setup subscription on which tester will be sending requests
sub, err := nc.SubscribeSync("tests.object-store.put-object.>")
if err != nil {
t.Fatalf("Error subscribing to test subject: %v", err)
}
defer sub.Unsubscribe()

msg, err := sub.NextMsg(1 * time.Hour)
if err != nil {
t.Fatalf("Error getting message: %v", err)
}
// Put object
var putObjectCfg objectStepConfig[*nats.ObjectMeta]
if err := json.Unmarshal(msg.Data, &putObjectCfg); err != nil {
t.Fatalf("Error unmarshalling message: %v", err)
}
os, err := js.ObjectStore(putObjectCfg.Bucket)
if err != nil {
t.Fatalf("Error getting object store: %v", err)
}
client := http.Client{Timeout: 10 * time.Second, Transport: &http.Transport{DisableKeepAlives: true}}
resp, err := client.Get(putObjectCfg.URL)
if err != nil {
t.Fatalf("Error getting content: %v", err)
}
data, err := io.ReadAll(resp.Body)
if err != nil {
t.Fatalf("Error reading content: %v", err)
}
defer resp.Body.Close()
if _, err := os.Put(putObjectCfg.Config, bytes.NewBuffer(data)); err != nil {
t.Fatalf("Error putting object: %v", err)
}
if err := msg.Respond(nil); err != nil {
t.Fatalf("Error responding to message: %v", err)
}
validateTestResult(t, sub)
}

func TestCompatibilityObjectStoreUpdateMetadata(t *testing.T) {
t.Parallel()

nc, js := connect(t)
defer nc.Close()

// setup subscription on which tester will be sending requests
sub, err := nc.SubscribeSync("tests.object-store.update-metadata.>")
if err != nil {
t.Fatalf("Error subscribing to test subject: %v", err)
}
defer sub.Unsubscribe()

msg, err := sub.NextMsg(1 * time.Hour)
if err != nil {
t.Fatalf("Error getting message: %v", err)
}
// Update object metadata
var putObjectCfg objectStepConfig[*nats.ObjectMeta]
if err := json.Unmarshal(msg.Data, &putObjectCfg); err != nil {
t.Fatalf("Error unmarshalling message: %v", err)
}
os, err := js.ObjectStore(putObjectCfg.Bucket)
if err != nil {
t.Fatalf("Error getting object store: %v", err)
}
if err := os.UpdateMeta(putObjectCfg.Object, putObjectCfg.Config); err != nil {
t.Fatalf("Error putting object: %v", err)
}
if err := msg.Respond(nil); err != nil {
t.Fatalf("Error responding to message: %v", err)
}
validateTestResult(t, sub)
}

func TestCompatibilityObjectStoreWatchUpdates(t *testing.T) {
t.Parallel()

type config struct {
Bucket string `json:"bucket"`
Object string `json:"object"`
}

nc, js := connect(t)
defer nc.Close()

// setup subscription on which tester will be sending requests
sub, err := nc.SubscribeSync("tests.object-store.watch-updates.>")
if err != nil {
t.Fatalf("Error subscribing to test subject: %v", err)
}
defer sub.Unsubscribe()

msg, err := sub.NextMsg(1 * time.Hour)
if err != nil {
t.Fatalf("Error getting message: %v", err)
}
// Watch object
var cfg config
if err := json.Unmarshal(msg.Data, &cfg); err != nil {
t.Fatalf("Error unmarshalling message: %v", err)
}
os, err := js.ObjectStore(cfg.Bucket)
if err != nil {
t.Fatalf("Error getting object store: %v", err)
}
watcher, err := os.Watch(nats.UpdatesOnly())
if err != nil {
t.Fatalf("Error getting watcher: %v", err)
}
var info *nats.ObjectInfo
select {
case info = <-watcher.Updates():
case <-time.After(30 * time.Second):
t.Fatalf("Timeout waiting for object update")
}

if err := msg.Respond([]byte(info.Digest)); err != nil {
t.Fatalf("Error responding to message: %v", err)
}
validateTestResult(t, sub)
}

func validateTestResult(t *testing.T, sub *nats.Subscription) {
t.Helper()
stepEnd, err := sub.NextMsg(5 * time.Second)
if err != nil {
t.Fatalf("Error getting message: %v", err)
}
if strings.Contains(string(stepEnd.Subject), "fail") {
t.Fatalf("Test step failed: %v", string(stepEnd.Subject))
}
}

func connect(t *testing.T) (*nats.Conn, nats.JetStreamContext) {
t.Helper()
natsURL := os.Getenv("NATS_URL")
if natsURL == "" {
natsURL = nats.DefaultURL
}
nc, err := nats.Connect(natsURL, nats.Timeout(1*time.Hour))
if err != nil {
t.Fatalf("Error connecting to NATS: %v", err)
}
js, err := nc.JetStream()
if err != nil {
t.Fatalf("Error getting JetStream context: %v", err)
}
return nc, js
}
7 changes: 7 additions & 0 deletions test/configs/docker/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
FROM golang:1.20
WORKDIR /usr/src/nats.go
COPY . /usr/src/nats.go
RUN go mod tidy -modfile go_test.mod
RUN go test -run TestNone -modfile go_test.mod -tags compat ./test/...
ENV NATS_URL=localhost:4222
CMD go test -v -run TestCompatibility -modfile go_test.mod -tags compat ./test/... -count 1 -parallel 10