diff --git a/deps/github.com/arangodb/go-driver/Makefile b/deps/github.com/arangodb/go-driver/Makefile index e82cbf987..b8f6f00d2 100644 --- a/deps/github.com/arangodb/go-driver/Makefile +++ b/deps/github.com/arangodb/go-driver/Makefile @@ -58,13 +58,9 @@ else ifeq ("$(TEST_AUTH)", "jwt") ARANGOARGS := --server.jwt-secret=/jwtsecret endif -ifeq ("$(TEST_MODE)", "single") - TEST_NET := container:$(DBCONTAINER) - TEST_ENDPOINTS := http://localhost:8529 -else - TEST_NET := container:$(TESTCONTAINER)-ns - TEST_ENDPOINTS := http://localhost:7001 - TESTS := $(REPOPATH)/test +TEST_NET := container:$(TESTCONTAINER)-ns +TEST_ENDPOINTS := http://localhost:7001 +TESTS := $(REPOPATH)/test ifeq ("$(TEST_AUTH)", "rootpw") CLUSTERENV := JWTSECRET=testing TEST_AUTHENTICATION := basic:root: @@ -77,7 +73,6 @@ ifeq ("$(TEST_SSL)", "auto") CLUSTERENV := SSL=auto $(CLUSTERENV) TEST_ENDPOINTS = https://localhost:7001 endif -endif ifeq ("$(TEST_CONNECTION)", "vst") TESTS := $(REPOPATH)/test @@ -133,13 +128,13 @@ run-tests-http: $(GOBUILDDIR) # Single server tests run-tests-single: run-tests-single-json run-tests-single-vpack run-tests-single-vst-1.0 $(VST11_SINGLE_TESTS) -run-tests-single-json: run-tests-single-json-with-auth run-tests-single-json-no-auth +run-tests-single-json: run-tests-single-json-with-auth run-tests-single-json-no-auth run-tests-single-json-ssl -run-tests-single-vpack: run-tests-single-vpack-with-auth run-tests-single-vpack-no-auth +run-tests-single-vpack: run-tests-single-vpack-with-auth run-tests-single-vpack-no-auth run-tests-single-vpack-ssl -run-tests-single-vst-1.0: run-tests-single-vst-1.0-with-auth run-tests-single-vst-1.0-no-auth +run-tests-single-vst-1.0: run-tests-single-vst-1.0-with-auth run-tests-single-vst-1.0-no-auth run-tests-single-vst-1.0-ssl -run-tests-single-vst-1.1: run-tests-single-vst-1.1-with-auth run-tests-single-vst-1.1-jwt-auth run-tests-single-vst-1.1-no-auth +run-tests-single-vst-1.1: run-tests-single-vst-1.1-with-auth run-tests-single-vst-1.1-jwt-auth run-tests-single-vst-1.1-no-auth run-tests-single-vst-1.1-ssl run-tests-single-vst-1.1-jwt-ssl run-tests-single-json-no-auth: @echo "Single server, HTTP+JSON, no authentication" @@ -177,6 +172,26 @@ run-tests-single-vst-1.1-jwt-auth: @echo "Single server, Velocystream 1.1, JWT authentication" @${MAKE} TEST_MODE="single" TEST_AUTH="jwt" TEST_CONNECTION="vst" TEST_CVERSION="1.1" __run_tests +run-tests-single-json-ssl: + @echo "Single server, HTTP+JSON, with authentication, SSL" + @${MAKE} TEST_MODE="single" TEST_AUTH="rootpw" TEST_SSL="auto" TEST_CONTENT_TYPE="json" __run_tests + +run-tests-single-vpack-ssl: + @echo "Single server, HTTP+Velocypack, with authentication, SSL" + @${MAKE} TEST_MODE="single" TEST_AUTH="rootpw" TEST_SSL="auto" TEST_CONTENT_TYPE="vpack" __run_tests + +run-tests-single-vst-1.0-ssl: + @echo "Single server, Velocystream 1.0, with authentication, SSL" + @${MAKE} TEST_MODE="single" TEST_AUTH="rootpw" TEST_SSL="auto" TEST_CONNECTION="vst" TEST_CVERSION="1.0" __run_tests + +run-tests-single-vst-1.1-ssl: + @echo "Single server, Velocystream 1.1, with authentication, SSL" + @${MAKE} TEST_MODE="single" TEST_AUTH="rootpw" TEST_SSL="auto" TEST_CONNECTION="vst" TEST_CVERSION="1.1" __run_tests + +run-tests-single-vst-1.1-jwt-ssl: + @echo "Single server, Velocystream 1.1, JWT authentication, SSL" + @${MAKE} TEST_MODE="single" TEST_AUTH="jwt" TEST_SSL="auto" TEST_CONNECTION="vst" TEST_CVERSION="1.1" __run_tests + # ResilientSingle server tests run-tests-resilientsingle: run-tests-resilientsingle-json 
run-tests-resilientsingle-vpack run-tests-resilientsingle-vst-1.0 $(VST11_RESILIENTSINGLE_TESTS) @@ -310,25 +325,14 @@ else ifdef JWTSECRET echo "$JWTSECRET" > "${JWTSECRETFILE}" endif -ifeq ("$(TEST_MODE)", "single") - @-docker rm -f -v $(DBCONTAINER) $(TESTCONTAINER) &> /dev/null - docker run -d --name $(DBCONTAINER) \ - $(ARANGOENV) $(ARANGOVOL) \ - $(ARANGODB) --log.level requests=debug --log.use-microtime true $(ARANGOARGS) -else @-docker rm -f -v $(TESTCONTAINER) &> /dev/null @TESTCONTAINER=$(TESTCONTAINER) ARANGODB=$(ARANGODB) STARTER=$(STARTER) STARTERMODE=$(TEST_MODE) TMPDIR=${GOBUILDDIR} $(CLUSTERENV) $(ROOTDIR)/test/cluster.sh start endif -endif __test_cleanup: @docker rm -f -v $(TESTCONTAINER) &> /dev/null ifndef TEST_ENDPOINTS_OVERRIDE -ifeq ("$(TEST_MODE)", "single") - @docker rm -f -v $(DBCONTAINER) &> /dev/null -else @TESTCONTAINER=$(TESTCONTAINER) ARANGODB=$(ARANGODB) STARTER=$(STARTER) STARTERMODE=$(TEST_MODE) $(ROOTDIR)/test/cluster.sh cleanup -endif endif @sleep 3 diff --git a/deps/github.com/arangodb/go-driver/cluster_impl.go b/deps/github.com/arangodb/go-driver/cluster_impl.go index 4f3d3b75f..0ddd307f0 100644 --- a/deps/github.com/arangodb/go-driver/cluster_impl.go +++ b/deps/github.com/arangodb/go-driver/cluster_impl.go @@ -123,6 +123,10 @@ type cleanOutServerRequest struct { Server string `json:"server"` } +type cleanOutServerResponse struct { + JobID string `json:"id"` +} + // CleanOutServer triggers activities to clean out a DBServers. func (c *cluster) CleanOutServer(ctx context.Context, serverID string) error { req, err := c.conn.NewRequest("POST", "_admin/cluster/cleanOutServer") @@ -135,7 +139,7 @@ func (c *cluster) CleanOutServer(ctx context.Context, serverID string) error { if _, err := req.SetBody(input); err != nil { return WithStack(err) } - applyContextSettings(ctx, req) + cs := applyContextSettings(ctx, req) resp, err := c.conn.Do(ctx, req) if err != nil { return WithStack(err) } @@ -143,6 +147,13 @@ func (c *cluster) CleanOutServer(ctx context.Context, serverID string) error { if err := resp.CheckStatus(200, 202); err != nil { return WithStack(err) } + var result cleanOutServerResponse + if err := resp.ParseBody("", &result); err != nil { + return WithStack(err) + } + if cs.JobIDResponse != nil { + *cs.JobIDResponse = result.JobID + } return nil } diff --git a/deps/github.com/arangodb/go-driver/context.go b/deps/github.com/arangodb/go-driver/context.go index a47815dba..170e2d5dd 100644 --- a/deps/github.com/arangodb/go-driver/context.go +++ b/deps/github.com/arangodb/go-driver/context.go @@ -56,6 +56,7 @@ const ( keyFollowLeaderRedirect ContextKey = "arangodb-followLeaderRedirect" keyDBServerID ContextKey = "arangodb-dbserverID" keyBatchID ContextKey = "arangodb-batchID" + keyJobIDResponse ContextKey = "arangodb-jobIDResponse" ) // WithRevision is used to configure a context to make document @@ -213,6 +214,13 @@ func WithBatchID(parent context.Context, id string) context.Context { return context.WithValue(contextOrBackground(parent), keyBatchID, id) } +// WithJobIDResponse is used to configure a context that includes a reference to a JobID +// that is filled on an error-free response. +// This is used in cluster functions.
+func WithJobIDResponse(parent context.Context, jobID *string) context.Context { + return context.WithValue(contextOrBackground(parent), keyJobIDResponse, jobID) +} + type contextSettings struct { Silent bool WaitForSync bool @@ -229,6 +237,7 @@ type contextSettings struct { FollowLeaderRedirect *bool DBServerID string BatchID string + JobIDResponse *string } // applyContextSettings returns the settings configured in the context in the given request. @@ -356,6 +365,12 @@ func applyContextSettings(ctx context.Context, req Request) contextSettings { result.BatchID = id } } + // JobIDResponse + if v := ctx.Value(keyJobIDResponse); v != nil { + if idRef, ok := v.(*string); ok { + result.JobIDResponse = idRef + } + } return result } diff --git a/deps/github.com/arangodb/go-driver/cursor_impl.go b/deps/github.com/arangodb/go-driver/cursor_impl.go index d7ab022bd..805d4baf3 100644 --- a/deps/github.com/arangodb/go-driver/cursor_impl.go +++ b/deps/github.com/arangodb/go-driver/cursor_impl.go @@ -24,7 +24,9 @@ package driver import ( "context" + "encoding/json" "path" + "reflect" "sync" "sync/atomic" "time" @@ -165,12 +167,23 @@ func (c *cursor) ReadDocument(ctx context.Context, result interface{}) (Document } c.resultIndex++ var meta DocumentMeta - if err := c.conn.Unmarshal(*c.Result[index], &meta); err != nil { - // If a cursor returns something other than a document, this will fail. - // Just ignore it. - } - if err := c.conn.Unmarshal(*c.Result[index], result); err != nil { - return DocumentMeta{}, WithStack(err) + resultPtr := c.Result[index] + if resultPtr == nil { + // Got NULL result + rv := reflect.ValueOf(result) + if rv.Kind() != reflect.Ptr || rv.IsNil() { + return DocumentMeta{}, WithStack(&json.InvalidUnmarshalError{Type: reflect.TypeOf(result)}) + } + e := rv.Elem() + e.Set(reflect.Zero(e.Type())) + } else { + if err := c.conn.Unmarshal(*resultPtr, &meta); err != nil { + // If a cursor returns something other than a document, this will fail. + // Just ignore it. 
+ } + if err := c.conn.Unmarshal(*resultPtr, result); err != nil { + return DocumentMeta{}, WithStack(err) + } } return meta, nil } diff --git a/deps/github.com/arangodb/go-driver/test/concurrency_test.go b/deps/github.com/arangodb/go-driver/test/concurrency_test.go index 3a5db3cf4..6af0d4840 100644 --- a/deps/github.com/arangodb/go-driver/test/concurrency_test.go +++ b/deps/github.com/arangodb/go-driver/test/concurrency_test.go @@ -26,6 +26,7 @@ import ( "context" "crypto/rand" "encoding/hex" + "os" "strconv" "sync" "testing" @@ -40,71 +41,81 @@ func TestConcurrentCreateSmallDocuments(t *testing.T) { t.Skip("Skip on short tests") } c := createClientFromEnv(t, true) - db := ensureDatabase(nil, c, "document_test", nil, t) - col := ensureCollection(nil, db, "TestConcurrentCreateSmallDocuments", nil, t) - docChan := make(chan driver.DocumentMeta, 16*1024) + version, err := c.Version(nil) + if err != nil { + t.Fatalf("Version failed: %s", describe(err)) + } + isv33p := version.Version.CompareTo("3.3") >= 0 + if !isv33p && os.Getenv("TEST_CONNECTION") == "vst" { + t.Skip("Skipping VST load test on 3.2") + } else { + db := ensureDatabase(nil, c, "document_test", nil, t) + col := ensureCollection(nil, db, "TestConcurrentCreateSmallDocuments", nil, t) - creator := func(limit, interval int) { - for i := 0; i < limit; i++ { - ctx := context.Background() - doc := UserDoc{ - "Jan", - i * interval, - } - meta, err := col.CreateDocument(ctx, doc) - if err != nil { - t.Fatalf("Failed to create new document: %s", describe(err)) + docChan := make(chan driver.DocumentMeta, 16*1024) + + creator := func(limit, interval int) { + for i := 0; i < limit; i++ { + ctx := context.Background() + doc := UserDoc{ + "Jan", + i * interval, + } + meta, err := col.CreateDocument(ctx, doc) + if err != nil { + t.Fatalf("Failed to create new document: %s", describe(err)) + } + docChan <- meta } - docChan <- meta } - } - reader := func() { - for { - meta, ok := <-docChan - if !ok { - return - } - // Document must exists now - if found, err := col.DocumentExists(nil, meta.Key); err != nil { - t.Fatalf("DocumentExists failed for '%s': %s", meta.Key, describe(err)) - } else if !found { - t.Errorf("DocumentExists returned false for '%s', expected true", meta.Key) - } - // Read document - var readDoc UserDoc - if _, err := col.ReadDocument(nil, meta.Key, &readDoc); err != nil { - t.Fatalf("Failed to read document '%s': %s", meta.Key, describe(err)) + reader := func() { + for { + meta, ok := <-docChan + if !ok { + return + } + // Document must exists now + if found, err := col.DocumentExists(nil, meta.Key); err != nil { + t.Fatalf("DocumentExists failed for '%s': %s", meta.Key, describe(err)) + } else if !found { + t.Errorf("DocumentExists returned false for '%s', expected true", meta.Key) + } + // Read document + var readDoc UserDoc + if _, err := col.ReadDocument(nil, meta.Key, &readDoc); err != nil { + t.Fatalf("Failed to read document '%s': %s", meta.Key, describe(err)) + } } } - } - noCreators := getIntFromEnv("NOCREATORS", 25) - noReaders := getIntFromEnv("NOREADERS", 50) - noDocuments := getIntFromEnv("NODOCUMENTS", 1000) // per creator - - wgCreators := sync.WaitGroup{} - // Run N concurrent creators - for i := 0; i < noCreators; i++ { - wgCreators.Add(1) - go func() { - defer wgCreators.Done() - creator(noDocuments, noCreators) - }() - } - wgReaders := sync.WaitGroup{} - // Run M readers - for i := 0; i < noReaders; i++ { - wgReaders.Add(1) - go func() { - defer wgReaders.Done() - reader() - }() + noCreators := 
getIntFromEnv("NOCREATORS", 25) + noReaders := getIntFromEnv("NOREADERS", 50) + noDocuments := getIntFromEnv("NODOCUMENTS", 1000) // per creator + + wgCreators := sync.WaitGroup{} + // Run N concurrent creators + for i := 0; i < noCreators; i++ { + wgCreators.Add(1) + go func() { + defer wgCreators.Done() + creator(noDocuments, noCreators) + }() + } + wgReaders := sync.WaitGroup{} + // Run M readers + for i := 0; i < noReaders; i++ { + wgReaders.Add(1) + go func() { + defer wgReaders.Done() + reader() + }() + } + wgCreators.Wait() + close(docChan) + wgReaders.Wait() } - wgCreators.Wait() - close(docChan) - wgReaders.Wait() } // TestConcurrentCreateBigDocuments make a lot of concurrent CreateDocument calls. @@ -114,71 +125,81 @@ func TestConcurrentCreateBigDocuments(t *testing.T) { t.Skip("Skip on short tests") } c := createClientFromEnv(t, true) - db := ensureDatabase(nil, c, "document_test", nil, t) - col := ensureCollection(nil, db, "TestConcurrentCreateBigDocuments", nil, t) - - docChan := make(chan driver.DocumentMeta, 16*1024) - - creator := func(limit, interval int) { - data := make([]byte, 1024) - for i := 0; i < limit; i++ { - rand.Read(data) - ctx := context.Background() - doc := UserDoc{ - "Jan" + strconv.Itoa(i) + hex.EncodeToString(data), - i * interval, - } - meta, err := col.CreateDocument(ctx, doc) - if err != nil { - t.Fatalf("Failed to create new document: %s", describe(err)) - } - docChan <- meta - } + + version, err := c.Version(nil) + if err != nil { + t.Fatalf("Version failed: %s", describe(err)) } + isv33p := version.Version.CompareTo("3.3") >= 0 + if !isv33p && os.Getenv("TEST_CONNECTION") == "vst" { + t.Skip("Skipping VST load test on 3.2") + } else { + db := ensureDatabase(nil, c, "document_test", nil, t) + col := ensureCollection(nil, db, "TestConcurrentCreateBigDocuments", nil, t) - reader := func() { - for { - meta, ok := <-docChan - if !ok { - return - } - // Document must exists now - if found, err := col.DocumentExists(nil, meta.Key); err != nil { - t.Fatalf("DocumentExists failed for '%s': %s", meta.Key, describe(err)) - } else if !found { - t.Errorf("DocumentExists returned false for '%s', expected true", meta.Key) + docChan := make(chan driver.DocumentMeta, 16*1024) + + creator := func(limit, interval int) { + data := make([]byte, 1024) + for i := 0; i < limit; i++ { + rand.Read(data) + ctx := context.Background() + doc := UserDoc{ + "Jan" + strconv.Itoa(i) + hex.EncodeToString(data), + i * interval, + } + meta, err := col.CreateDocument(ctx, doc) + if err != nil { + t.Fatalf("Failed to create new document: %s", describe(err)) + } + docChan <- meta } - // Read document - var readDoc UserDoc - if _, err := col.ReadDocument(nil, meta.Key, &readDoc); err != nil { - t.Fatalf("Failed to read document '%s': %s", meta.Key, describe(err)) + } + + reader := func() { + for { + meta, ok := <-docChan + if !ok { + return + } + // Document must exists now + if found, err := col.DocumentExists(nil, meta.Key); err != nil { + t.Fatalf("DocumentExists failed for '%s': %s", meta.Key, describe(err)) + } else if !found { + t.Errorf("DocumentExists returned false for '%s', expected true", meta.Key) + } + // Read document + var readDoc UserDoc + if _, err := col.ReadDocument(nil, meta.Key, &readDoc); err != nil { + t.Fatalf("Failed to read document '%s': %s", meta.Key, describe(err)) + } } } - } - noCreators := getIntFromEnv("NOCREATORS", 25) - noReaders := getIntFromEnv("NOREADERS", 50) - noDocuments := getIntFromEnv("NODOCUMENTS", 100) // per creator - - wgCreators := 
sync.WaitGroup{} - // Run N concurrent creators - for i := 0; i < noCreators; i++ { - wgCreators.Add(1) - go func() { - defer wgCreators.Done() - creator(noDocuments, noCreators) - }() - } - wgReaders := sync.WaitGroup{} - // Run M readers - for i := 0; i < noReaders; i++ { - wgReaders.Add(1) - go func() { - defer wgReaders.Done() - reader() - }() + noCreators := getIntFromEnv("NOCREATORS", 25) + noReaders := getIntFromEnv("NOREADERS", 50) + noDocuments := getIntFromEnv("NODOCUMENTS", 100) // per creator + + wgCreators := sync.WaitGroup{} + // Run N concurrent creators + for i := 0; i < noCreators; i++ { + wgCreators.Add(1) + go func() { + defer wgCreators.Done() + creator(noDocuments, noCreators) + }() + } + wgReaders := sync.WaitGroup{} + // Run M readers + for i := 0; i < noReaders; i++ { + wgReaders.Add(1) + go func() { + defer wgReaders.Done() + reader() + }() + } + wgCreators.Wait() + close(docChan) + wgReaders.Wait() } - wgCreators.Wait() - close(docChan) - wgReaders.Wait() } diff --git a/deps/github.com/arangodb/go-driver/test/cursor_test.go b/deps/github.com/arangodb/go-driver/test/cursor_test.go index a992ecc6e..2c806faeb 100644 --- a/deps/github.com/arangodb/go-driver/test/cursor_test.go +++ b/deps/github.com/arangodb/go-driver/test/cursor_test.go @@ -229,6 +229,27 @@ func TestCreateCursor(t *testing.T) { } } +// TestCreateCursorReturnNull creates a cursor with a `RETURN NULL` query. +func TestCreateCursorReturnNull(t *testing.T) { + ctx := context.Background() + c := createClientFromEnv(t, true) + db := ensureDatabase(ctx, c, "cursor_test", nil, t) + + var result interface{} + query := "return null" + cursor, err := db.Query(ctx, query, nil) + if err != nil { + t.Fatalf("Query(return null) failed: %s", describe(err)) + } + defer cursor.Close() + if _, err := cursor.ReadDocument(ctx, &result); err != nil { + t.Fatalf("ReadDocument failed: %s", describe(err)) + } + if result != nil { + t.Errorf("Expected result to be nil, got %#v", result) + } +} + // Test stream query cursors. The goroutines are technically only // relevant for the MMFiles engine, but don't hurt on rocksdb either func TestCreateStreamCursor(t *testing.T) { diff --git a/deps/github.com/arangodb/go-driver/test/user_auth_test.go b/deps/github.com/arangodb/go-driver/test/user_auth_test.go index 810da6829..cc60a12bb 100644 --- a/deps/github.com/arangodb/go-driver/test/user_auth_test.go +++ b/deps/github.com/arangodb/go-driver/test/user_auth_test.go @@ -257,35 +257,69 @@ func TestGrantUserDefaultDatabase(t *testing.T) { t.Fatalf("SetDatabaseAccess failed: %s", describe(err)) } - // wait for change to propagate (TODO add a check to the coordinators) - time.Sleep(time.Second * 5) - - // Try to create document in collection, should fail because there are no collection grants for this user and/or collection. - if _, err := authCol.CreateDocument(nil, Book{Title: "I cannot write"}); !driver.IsForbidden(err) { - t.Errorf("Expected failure, got %s", describe(err)) - } + // wait for change to propagate + { + deadline := time.Now().Add(time.Minute) + for { + // Try to create document in collection, should fail because there are no collection grants for this user and/or collection. 
+ if _, err := authCol.CreateDocument(nil, Book{Title: "I cannot write"}); err == nil { + if time.Now().Before(deadline) { + t.Logf("Expected failure, got %s, trying again...", describe(err)) + time.Sleep(time.Second * 2) + continue + } + t.Errorf("Expected failure, got %s", describe(err)) + } - // Try to create collection, should fail - if _, err := authDb.CreateCollection(nil, "books_def_ro_db", nil); !driver.IsForbidden(err) { - t.Errorf("Expected failure, got %s", describe(err)) + // Try to create collection, should fail + if _, err := authDb.CreateCollection(nil, "books_def_ro_db", nil); err == nil { + t.Errorf("Expected failure, got %s", describe(err)) + } + break + } } // Grant no access to default database if err := u.SetDatabaseAccess(nil, nil, driver.GrantNone); err != nil { t.Fatalf("SetDatabaseAccess failed: %s", describe(err)) } - // Try to create collection, should fail - if _, err := authDb.CreateCollection(nil, "books_def_none_db", nil); !driver.IsUnauthorized(err) { - t.Errorf("Expected failure, got %s", describe(err)) + + // wait for change to propagate + { + deadline := time.Now().Add(time.Minute) + for { + // Try to create collection, should fail + if _, err := authDb.CreateCollection(nil, "books_def_none_db", nil); err == nil { + if time.Now().Before(deadline) { + t.Logf("Expected failure, got %s, trying again...", describe(err)) + time.Sleep(time.Second * 2) + continue + } + t.Errorf("Expected failure, got %s", describe(err)) + } + break + } } // Remove default database access, should fallback to "no-access" then if err := u.RemoveDatabaseAccess(nil, nil); err != nil { t.Fatalf("RemoveDatabaseAccess failed: %s", describe(err)) } - // Try to create collection, should fail - if _, err := authDb.CreateCollection(nil, "books_def_star_db", nil); !driver.IsUnauthorized(err) { - t.Errorf("Expected failure, got %s", describe(err)) + // wait for change to propagate + { + deadline := time.Now().Add(time.Minute) + for { + // Try to create collection, should fail + if _, err := authDb.CreateCollection(nil, "books_def_star_db", nil); err == nil { + if time.Now().Before(deadline) { + t.Logf("Expected failure, got %s, trying again...", describe(err)) + time.Sleep(time.Second * 2) + continue + } + t.Errorf("Expected failure, got %s", describe(err)) + } + break + } } } diff --git a/deps/github.com/arangodb/go-driver/vst/protocol/chunk.go b/deps/github.com/arangodb/go-driver/vst/protocol/chunk.go index eb1e59bb4..d16559adb 100644 --- a/deps/github.com/arangodb/go-driver/vst/protocol/chunk.go +++ b/deps/github.com/arangodb/go-driver/vst/protocol/chunk.go @@ -87,6 +87,10 @@ func buildChunks(messageID uint64, maxChunkSize uint32, messageParts ...[]byte) func readBytes(dst []byte, r io.Reader) error { offset := 0 remaining := len(dst) + if remaining == 0 { + // Nothing left to read + return nil + } for { n, err := r.Read(dst[offset:]) offset += n diff --git a/deps/github.com/arangodb/go-driver/vst/protocol/connection.go b/deps/github.com/arangodb/go-driver/vst/protocol/connection.go index 986a1e514..d1af33c4a 100644 --- a/deps/github.com/arangodb/go-driver/vst/protocol/connection.go +++ b/deps/github.com/arangodb/go-driver/vst/protocol/connection.go @@ -43,13 +43,14 @@ type Connection struct { msgStore messageStore conn net.Conn writeMutex sync.Mutex - closing bool + closing int32 lastActivity time.Time configured int32 // Set to 1 after the configuration callback has finished without errors. 
} const ( defaultMaxChunkSize = 30000 + maxRecentErrors = 64 ) var ( @@ -59,13 +60,8 @@ var ( // dial opens a new connection to the server on the given address. func dial(version Version, addr string, tlsConfig *tls.Config) (*Connection, error) { - var conn net.Conn - var err error - if tlsConfig != nil { - conn, err = tls.Dial("tcp", addr, tlsConfig) - } else { - conn, err = net.Dial("tcp", addr) - } + // Create TCP connection + conn, err := net.Dial("tcp", addr) if err != nil { return nil, driver.WithStack(err) } @@ -76,6 +72,12 @@ func dial(version Version, addr string, tlsConfig *tls.Config) (*Connection, err tcpConn.SetNoDelay(true) } + // Add TLS if needed + if tlsConfig != nil { + tlsConn := tls.Client(conn, tlsConfig) + conn = tlsConn + } + // Send protocol header switch version { case Version1_0: @@ -112,8 +114,7 @@ func (c *Connection) load() int { // Close the connection to the server func (c *Connection) Close() error { - if !c.closing { - c.closing = true + if atomic.CompareAndSwapInt32(&c.closing, 0, 1) { if err := c.conn.Close(); err != nil { return driver.WithStack(err) } @@ -126,7 +127,7 @@ // IsClosed returns true when the connection is closed, false otherwise. func (c *Connection) IsClosed() bool { - return c.closing + return atomic.LoadInt32(&c.closing) == 1 } // IsConfigured returns true when the configuration callback has finished on this connection, without errors. @@ -208,8 +209,10 @@ func (c *Connection) sendChunk(deadline time.Time, chunk chunk) error { // readChunkLoop reads chunks from the connection until it is closed. func (c *Connection) readChunkLoop() { + recentErrors := 0 + goodChunks := 0 for { - if c.closing { + if c.IsClosed() { // Closing, we're done return } @@ -225,17 +228,27 @@ } c.updateLastActivity() if err != nil { - if !c.closing { + if !c.IsClosed() { // Handle error if err == io.EOF { // Connection closed c.Close() } else { - fmt.Printf("readChunkLoop error: %#v\n", err) + recentErrors++ + fmt.Printf("readChunkLoop error: %#v (goodChunks=%d)\n", err, goodChunks) + if recentErrors > maxRecentErrors { + // When we get too many errors in a row, close this connection + c.Close() + } else { + // Back off a bit, so we allow things to settle. + time.Sleep(time.Millisecond * time.Duration(recentErrors*5)) + } } } } else { // Process chunk + recentErrors = 0 + goodChunks++ go c.processChunk(chunk) } } diff --git a/pkg/apis/deployment/v1alpha/member_status.go b/pkg/apis/deployment/v1alpha/member_status.go index 1bd08e888..a1c18e56a 100644 --- a/pkg/apis/deployment/v1alpha/member_status.go +++ b/pkg/apis/deployment/v1alpha/member_status.go @@ -50,6 +50,8 @@ type MemberStatus struct { // IsInitialized is set after the very first time a pod was created for this member. // After that, DBServers must have a UUID field or fail. IsInitialized bool `json:"initialized"` + // CleanoutJobID holds the ID of the agency job for cleaning out this server + CleanoutJobID string `json:"cleanout-job-id,omitempty"` } // Age returns the duration since the creation timestamp of this member.
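The driver-side plumbing (WithJobIDResponse, cleanOutServerResponse) and the new MemberStatus.CleanoutJobID field above are two halves of one mechanism: the driver surfaces the agency job ID of a clean-out, and the operator persists it for later polling. A minimal sketch of the intended calling pattern, assuming a driver.Cluster obtained elsewhere (startCleanout is an illustrative helper, not a function from this diff; error wrapping and the MemberStatus update are left to the caller):

package example

import (
	"context"

	driver "github.com/arangodb/go-driver"
)

// startCleanout triggers a clean-out of the given DBServer and returns the
// agency job ID that tracks it. On an error-free response, CleanOutServer
// writes the "id" field of the HTTP response into jobID through the
// reference registered on the context.
func startCleanout(ctx context.Context, cluster driver.Cluster, serverID string) (string, error) {
	var jobID string
	ctx = driver.WithJobIDResponse(ctx, &jobID)
	if err := cluster.CleanOutServer(ctx, serverID); err != nil {
		return "", err
	}
	return jobID, nil
}

The returned ID is what the operator stores in MemberStatus.CleanoutJobID, so a later reconciliation loop can ask the agency how the job is doing.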
diff --git a/pkg/deployment/context_impl.go b/pkg/deployment/context_impl.go index 63e171caa..f9efb7f06 100644 --- a/pkg/deployment/context_impl.go +++ b/pkg/deployment/context_impl.go @@ -29,12 +29,14 @@ import ( "github.com/arangodb/arangosync/client" "github.com/arangodb/arangosync/tasks" driver "github.com/arangodb/go-driver" + "github.com/arangodb/go-driver/agency" "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1alpha" "github.com/arangodb/kube-arangodb/pkg/deployment/resources" + "github.com/arangodb/kube-arangodb/pkg/util/arangod" "github.com/arangodb/kube-arangodb/pkg/util/k8sutil" ) @@ -146,6 +148,15 @@ func (d *Deployment) GetAgencyClients(ctx context.Context, predicate func(id str return result, nil } +// GetAgency returns a connection to the entire agency. +func (d *Deployment) GetAgency(ctx context.Context) (agency.Agency, error) { + result, err := arangod.CreateArangodAgencyClient(ctx, d.deps.KubeCli.CoreV1(), d.apiObject) + if err != nil { + return nil, maskAny(err) + } + return result, nil +} + // GetSyncServerClient returns a cached client for a specific arangosync server. func (d *Deployment) GetSyncServerClient(ctx context.Context, group api.ServerGroup, id string) (client.API, error) { // Fetch monitoring token diff --git a/pkg/deployment/reconcile/action.go b/pkg/deployment/reconcile/action.go index b06356a15..d6869b5a3 100644 --- a/pkg/deployment/reconcile/action.go +++ b/pkg/deployment/reconcile/action.go @@ -34,8 +34,8 @@ type Action interface { // the start time needs to be recorded and a ready condition needs to be checked. Start(ctx context.Context) (bool, error) // CheckProgress checks the progress of the action. - // Returns true if the action is completely finished, false otherwise. - CheckProgress(ctx context.Context) (bool, error) + // Returns: ready, abort, error. + CheckProgress(ctx context.Context) (bool, bool, error) // Timeout returns the amount of time after which this action will timeout. Timeout() time.Duration } diff --git a/pkg/deployment/reconcile/action_add_member.go b/pkg/deployment/reconcile/action_add_member.go index c63225a3f..4f4094781 100644 --- a/pkg/deployment/reconcile/action_add_member.go +++ b/pkg/deployment/reconcile/action_add_member.go @@ -61,9 +61,9 @@ func (a *actionAddMember) Start(ctx context.Context) (bool, error) { // CheckProgress checks the progress of the action. // Returns true if the action is completely finished, false otherwise. -func (a *actionAddMember) CheckProgress(ctx context.Context) (bool, error) { +func (a *actionAddMember) CheckProgress(ctx context.Context) (bool, bool, error) { // Nothing todo - return true, nil + return true, false, nil } // Timeout returns the amount of time after which this action will timeout. 
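Since the Action interface change ripples through every action in this package, it helps to see the whole contract in one place. A hypothetical no-op action under the new signature (illustrative only, not a type from the repository):

package example

import (
	"context"
	"time"
)

// noopAction shows the updated Action contract: CheckProgress now returns
// (ready, abort, error), where abort=true asks the plan executor to throw
// away the remaining plan instead of waiting for the action's timeout.
type noopAction struct{}

// Start is unchanged: true means the action finished immediately.
func (a *noopAction) Start(ctx context.Context) (bool, error) {
	return true, nil
}

// CheckProgress reports ready without requesting an abort.
func (a *noopAction) CheckProgress(ctx context.Context) (bool, bool, error) {
	return true, false, nil
}

// Timeout bounds how long the executor waits before dropping the plan.
func (a *noopAction) Timeout() time.Duration {
	return time.Minute
}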
diff --git a/pkg/deployment/reconcile/action_cleanout_member.go b/pkg/deployment/reconcile/action_cleanout_member.go index cb4b86fe7..eedefab15 100644 --- a/pkg/deployment/reconcile/action_cleanout_member.go +++ b/pkg/deployment/reconcile/action_cleanout_member.go @@ -26,8 +26,11 @@ import ( "context" "time" + driver "github.com/arangodb/go-driver" api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1alpha" "github.com/rs/zerolog" + + "github.com/arangodb/kube-arangodb/pkg/util/arangod" ) // NewCleanOutMemberAction creates a new Action that implements the given @@ -67,12 +70,16 @@ func (a *actionCleanoutMember) Start(ctx context.Context) (bool, error) { log.Debug().Err(err).Msg("Failed to access cluster") return false, maskAny(err) } + var jobID string + ctx = driver.WithJobIDResponse(ctx, &jobID) if err := cluster.CleanOutServer(ctx, a.action.MemberID); err != nil { log.Debug().Err(err).Msg("Failed to cleanout member") return false, maskAny(err) } + log.Debug().Str("job-id", jobID).Msg("Cleanout member started") // Update status m.Phase = api.MemberPhaseCleanOut + m.CleanoutJobID = jobID if a.actionCtx.UpdateMember(m); err != nil { return false, maskAny(err) } @@ -80,40 +87,62 @@ func (a *actionCleanoutMember) Start(ctx context.Context) (bool, error) { } // CheckProgress checks the progress of the action. -// Returns true if the action is completely finished, false otherwise. -func (a *actionCleanoutMember) CheckProgress(ctx context.Context) (bool, error) { +// Returns: ready, abort, error. +func (a *actionCleanoutMember) CheckProgress(ctx context.Context) (bool, bool, error) { log := a.log m, ok := a.actionCtx.GetMemberStatusByID(a.action.MemberID) if !ok { // We wanted to remove and it is already gone. All ok - return true, nil + return true, false, nil } c, err := a.actionCtx.GetDatabaseClient(ctx) if err != nil { - log.Debug().Err(err).Msg("Failed to create member client") - return false, maskAny(err) + log.Debug().Err(err).Msg("Failed to create database client") + return false, false, maskAny(err) } cluster, err := c.Cluster(ctx) if err != nil { log.Debug().Err(err).Msg("Failed to access cluster") - return false, maskAny(err) + return false, false, maskAny(err) } cleanedOut, err := cluster.IsCleanedOut(ctx, a.action.MemberID) if err != nil { - return false, maskAny(err) + log.Debug().Err(err).Msg("IsCleanedOut failed") + return false, false, maskAny(err) } if !cleanedOut { - // We're not done yet - return false, nil + // We're not done yet, check job status + log.Debug().Msg("IsCleanedOut returned false") + + c, err := a.actionCtx.GetDatabaseClient(ctx) + if err != nil { + log.Debug().Err(err).Msg("Failed to create database client") + return false, false, maskAny(err) + } + agency, err := a.actionCtx.GetAgency(ctx) + if err != nil { + log.Debug().Err(err).Msg("Failed to create agency client") + return false, false, maskAny(err) + } + jobStatus, err := arangod.CleanoutServerJobStatus(ctx, m.CleanoutJobID, c, agency) + if err != nil { + log.Debug().Err(err).Msg("Failed to fetch cleanout job status") + return false, false, maskAny(err) + } + if jobStatus.IsFailed() { + log.Warn().Str("reason", jobStatus.Reason()).Msg("Cleanout Job failed. 
Aborting plan") + return false, true, nil + } + return false, false, nil } // Cleanout completed if m.Conditions.Update(api.ConditionTypeCleanedOut, true, "CleanedOut", "") { if a.actionCtx.UpdateMember(m); err != nil { - return false, maskAny(err) + return false, false, maskAny(err) } } // Cleanout completed - return true, nil + return true, false, nil } // Timeout returns the amount of time after which this action will timeout. diff --git a/pkg/deployment/reconcile/action_context.go b/pkg/deployment/reconcile/action_context.go index 2aae87e49..c127e24b8 100644 --- a/pkg/deployment/reconcile/action_context.go +++ b/pkg/deployment/reconcile/action_context.go @@ -26,6 +26,8 @@ import ( "context" "fmt" + "github.com/arangodb/go-driver/agency" + "github.com/arangodb/arangosync/client" driver "github.com/arangodb/go-driver" "github.com/rs/zerolog" @@ -46,6 +48,8 @@ type ActionContext interface { GetServerClient(ctx context.Context, group api.ServerGroup, id string) (driver.Client, error) // GetAgencyClients returns a client connection for every agency member. GetAgencyClients(ctx context.Context) ([]driver.Connection, error) + // GetAgency returns a connection to the entire agency. + GetAgency(ctx context.Context) (agency.Agency, error) // GetSyncServerClient returns a cached client for a specific arangosync server. GetSyncServerClient(ctx context.Context, group api.ServerGroup, id string) (client.API, error) // GetMemberStatusByID returns the current member status @@ -118,6 +122,15 @@ func (ac *actionContext) GetAgencyClients(ctx context.Context) ([]driver.Connect return c, nil } +// GetAgency returns a connection to the entire agency. +func (ac *actionContext) GetAgency(ctx context.Context) (agency.Agency, error) { + a, err := ac.context.GetAgency(ctx) + if err != nil { + return nil, maskAny(err) + } + return a, nil +} + // GetSyncServerClient returns a cached client for a specific arangosync server. func (ac *actionContext) GetSyncServerClient(ctx context.Context, group api.ServerGroup, id string) (client.API, error) { c, err := ac.context.GetSyncServerClient(ctx, group, id) diff --git a/pkg/deployment/reconcile/action_remove_member.go b/pkg/deployment/reconcile/action_remove_member.go index 64cda8ccb..4aee6cf4b 100644 --- a/pkg/deployment/reconcile/action_remove_member.go +++ b/pkg/deployment/reconcile/action_remove_member.go @@ -24,6 +24,7 @@ package reconcile import ( "context" + "fmt" "time" "github.com/pkg/errors" @@ -86,14 +87,18 @@ func (a *actionRemoveMember) Start(ctx context.Context) (bool, error) { if err := a.actionCtx.RemoveMemberByID(a.action.MemberID); err != nil { return false, maskAny(err) } + // Check that member has been removed + if _, found := a.actionCtx.GetMemberStatusByID(a.action.MemberID); found { + return false, maskAny(fmt.Errorf("Member %s still exists", a.action.MemberID)) + } return true, nil } // CheckProgress checks the progress of the action. // Returns true if the action is completely finished, false otherwise. -func (a *actionRemoveMember) CheckProgress(ctx context.Context) (bool, error) { +func (a *actionRemoveMember) CheckProgress(ctx context.Context) (bool, bool, error) { // Nothing todo - return true, nil + return true, false, nil } // Timeout returns the amount of time after which this action will timeout. 
diff --git a/pkg/deployment/reconcile/action_renew_tls_certificate.go b/pkg/deployment/reconcile/action_renew_tls_certificate.go index 99e0ffc93..6f1b61bf1 100644 --- a/pkg/deployment/reconcile/action_renew_tls_certificate.go +++ b/pkg/deployment/reconcile/action_renew_tls_certificate.go @@ -67,8 +67,8 @@ func (a *renewTLSCertificateAction) Start(ctx context.Context) (bool, error) { // CheckProgress checks the progress of the action. // Returns true if the action is completely finished, false otherwise. -func (a *renewTLSCertificateAction) CheckProgress(ctx context.Context) (bool, error) { - return true, nil +func (a *renewTLSCertificateAction) CheckProgress(ctx context.Context) (bool, bool, error) { + return true, false, nil } // Timeout returns the amount of time after which this action will timeout. diff --git a/pkg/deployment/reconcile/action_rotate_member.go b/pkg/deployment/reconcile/action_rotate_member.go index c829be15d..8b4c59f86 100644 --- a/pkg/deployment/reconcile/action_rotate_member.go +++ b/pkg/deployment/reconcile/action_rotate_member.go @@ -70,7 +70,7 @@ func (a *actionRotateMember) Start(ctx context.Context) (bool, error) { defer cancel() if err := c.Shutdown(ctx, removeFromCluster); err != nil { // Shutdown failed. Let's check if we're already done - if ready, err := a.CheckProgress(ctx); err == nil && ready { + if ready, _, err := a.CheckProgress(ctx); err == nil && ready { // We're done return true, nil } @@ -92,30 +92,31 @@ func (a *actionRotateMember) Start(ctx context.Context) (bool, error) { } // CheckProgress checks the progress of the action. -// Returns true if the action is completely finished, false otherwise. -func (a *actionRotateMember) CheckProgress(ctx context.Context) (bool, error) { +// Returns: ready, abort, error. +func (a *actionRotateMember) CheckProgress(ctx context.Context) (bool, bool, error) { // Check that pod is removed log := a.log m, found := a.actionCtx.GetMemberStatusByID(a.action.MemberID) if !found { log.Error().Msg("No such member") - return true, nil + return true, false, nil } if !m.Conditions.IsTrue(api.ConditionTypeTerminated) { // Pod is not yet terminated - return false, nil + return false, false, nil } // Pod is terminated, we can now remove it if err := a.actionCtx.DeletePod(m.PodName); err != nil { - return false, maskAny(err) + return false, false, maskAny(err) } // Pod is now gone, update the member status m.Phase = api.MemberPhaseNone m.RecentTerminations = nil // Since we're rotating, we do not care about old terminations. + m.CleanoutJobID = "" if err := a.actionCtx.UpdateMember(m); err != nil { - return false, maskAny(err) + return false, false, maskAny(err) } - return true, nil + return true, false, nil } // Timeout returns the amount of time after which this action will timeout. diff --git a/pkg/deployment/reconcile/action_shutdown_member.go b/pkg/deployment/reconcile/action_shutdown_member.go index 833295d18..69735ce3b 100644 --- a/pkg/deployment/reconcile/action_shutdown_member.go +++ b/pkg/deployment/reconcile/action_shutdown_member.go @@ -75,7 +75,7 @@ func (a *actionShutdownMember) Start(ctx context.Context) (bool, error) { defer cancel() if err := c.Shutdown(ctx, removeFromCluster); err != nil { // Shutdown failed. 
Let's check if we're already done - if ready, err := a.CheckProgress(ctx); err == nil && ready { + if ready, _, err := a.CheckProgress(ctx); err == nil && ready { // We're done return true, nil } @@ -97,19 +97,19 @@ func (a *actionShutdownMember) Start(ctx context.Context) (bool, error) { } // CheckProgress checks the progress of the action. -// Returns true if the action is completely finished, false otherwise. -func (a *actionShutdownMember) CheckProgress(ctx context.Context) (bool, error) { +// Returns: ready, abort, error. +func (a *actionShutdownMember) CheckProgress(ctx context.Context) (bool, bool, error) { m, found := a.actionCtx.GetMemberStatusByID(a.action.MemberID) if !found { // Member not long exists - return true, nil + return true, false, nil } if m.Conditions.IsTrue(api.ConditionTypeTerminated) { // Shutdown completed - return true, nil + return true, false, nil } // Member still not shutdown, retry soon - return false, nil + return false, false, nil } // Timeout returns the amount of time after which this action will timeout. diff --git a/pkg/deployment/reconcile/action_upgrade_member.go b/pkg/deployment/reconcile/action_upgrade_member.go index 793470b0a..3b52030e0 100644 --- a/pkg/deployment/reconcile/action_upgrade_member.go +++ b/pkg/deployment/reconcile/action_upgrade_member.go @@ -75,7 +75,7 @@ func (a *actionUpgradeMember) Start(ctx context.Context) (bool, error) { defer cancel() if err := c.Shutdown(ctx, removeFromCluster); err != nil { // Shutdown failed. Let's check if we're already done - if ready, err := a.CheckProgress(ctx); err == nil && ready { + if ready, _, err := a.CheckProgress(ctx); err == nil && ready { // We're done return true, nil } @@ -98,13 +98,13 @@ func (a *actionUpgradeMember) Start(ctx context.Context) (bool, error) { // CheckProgress checks the progress of the action. // Returns true if the action is completely finished, false otherwise. -func (a *actionUpgradeMember) CheckProgress(ctx context.Context) (bool, error) { +func (a *actionUpgradeMember) CheckProgress(ctx context.Context) (bool, bool, error) { // Check that pod is removed log := a.log m, found := a.actionCtx.GetMemberStatusByID(a.action.MemberID) if !found { log.Error().Msg("No such member") - return true, nil + return true, false, nil } isUpgrading := m.Phase == api.MemberPhaseUpgrading log = log.With(). @@ -112,20 +112,21 @@ func (a *actionUpgradeMember) CheckProgress(ctx context.Context) (bool, error) { Bool("is-upgrading", isUpgrading).Logger() if !m.Conditions.IsTrue(api.ConditionTypeTerminated) { // Pod is not yet terminated - return false, nil + return false, false, nil } // Pod is terminated, we can now remove it log.Debug().Msg("Deleting pod") if err := a.actionCtx.DeletePod(m.PodName); err != nil { - return false, maskAny(err) + return false, false, maskAny(err) } // Pod is now gone, update the member status m.Phase = api.MemberPhaseNone m.RecentTerminations = nil // Since we're upgrading, we do not care about old terminations. + m.CleanoutJobID = "" if err := a.actionCtx.UpdateMember(m); err != nil { - return false, maskAny(err) + return false, false, maskAny(err) } - return isUpgrading, nil + return isUpgrading, false, nil } // Timeout returns the amount of time after which this action will timeout. 
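The rotate, shutdown and upgrade actions above all share one retry idiom: when the Shutdown call fails, consult CheckProgress before reporting an error, because the member may already have terminated. Condensed into a standalone sketch (the function arguments stand in for the driver's Shutdown call and the action's own CheckProgress; the name is illustrative):

package example

import "context"

// shutdownOrDone tolerates a failed shutdown request when the member is
// already gone. It returns (done, error) in the same spirit as Start.
func shutdownOrDone(ctx context.Context,
	shutdown func(context.Context) error,
	checkProgress func(context.Context) (bool, bool, error)) (bool, error) {
	if err := shutdown(ctx); err != nil {
		// Shutdown failed; maybe the member terminated anyway.
		if ready, _, cerr := checkProgress(ctx); cerr == nil && ready {
			return true, nil
		}
		return false, err
	}
	// Shutdown was accepted; progress is checked on later reconciliations.
	return false, nil
}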
diff --git a/pkg/deployment/reconcile/action_wait_for_member_up.go b/pkg/deployment/reconcile/action_wait_for_member_up.go index 9675ae293..b61ab3bae 100644 --- a/pkg/deployment/reconcile/action_wait_for_member_up.go +++ b/pkg/deployment/reconcile/action_wait_for_member_up.go @@ -54,7 +54,7 @@ type actionWaitForMemberUp struct { // Returns true if the action is completely finished, false in case // the start time needs to be recorded and a ready condition needs to be checked. func (a *actionWaitForMemberUp) Start(ctx context.Context) (bool, error) { - ready, err := a.CheckProgress(ctx) + ready, _, err := a.CheckProgress(ctx) if err != nil { return false, maskAny(err) } @@ -63,7 +63,7 @@ func (a *actionWaitForMemberUp) Start(ctx context.Context) (bool, error) { // CheckProgress checks the progress of the action. // Returns true if the action is completely finished, false otherwise. -func (a *actionWaitForMemberUp) CheckProgress(ctx context.Context) (bool, error) { +func (a *actionWaitForMemberUp) CheckProgress(ctx context.Context) (bool, bool, error) { if a.action.Group.IsArangosync() { return a.checkProgressArangoSync(ctx) } @@ -85,85 +85,85 @@ func (a *actionWaitForMemberUp) CheckProgress(ctx context.Context) (bool, error) // checkProgressSingle checks the progress of the action in the case // of a single server. -func (a *actionWaitForMemberUp) checkProgressSingle(ctx context.Context) (bool, error) { +func (a *actionWaitForMemberUp) checkProgressSingle(ctx context.Context) (bool, bool, error) { log := a.log c, err := a.actionCtx.GetDatabaseClient(ctx) if err != nil { log.Debug().Err(err).Msg("Failed to create database client") - return false, maskAny(err) + return false, false, maskAny(err) } if _, err := c.Version(ctx); err != nil { log.Debug().Err(err).Msg("Failed to get version") - return false, maskAny(err) + return false, false, maskAny(err) } - return true, nil + return true, false, nil } // checkProgressAgent checks the progress of the action in the case // of an agent. -func (a *actionWaitForMemberUp) checkProgressAgent(ctx context.Context) (bool, error) { +func (a *actionWaitForMemberUp) checkProgressAgent(ctx context.Context) (bool, bool, error) { log := a.log clients, err := a.actionCtx.GetAgencyClients(ctx) if err != nil { log.Debug().Err(err).Msg("Failed to create agency clients") - return false, maskAny(err) + return false, false, maskAny(err) } if err := agency.AreAgentsHealthy(ctx, clients); err != nil { log.Debug().Err(err).Msg("Not all agents are ready") - return false, nil + return false, false, nil } log.Debug().Msg("Agency is happy") - return true, nil + return true, false, nil } // checkProgressCluster checks the progress of the action in the case // of a cluster deployment (coordinator/dbserver). 
-func (a *actionWaitForMemberUp) checkProgressCluster(ctx context.Context) (bool, error) { +func (a *actionWaitForMemberUp) checkProgressCluster(ctx context.Context) (bool, bool, error) { log := a.log c, err := a.actionCtx.GetDatabaseClient(ctx) if err != nil { log.Debug().Err(err).Msg("Failed to create database client") - return false, maskAny(err) + return false, false, maskAny(err) } cluster, err := c.Cluster(ctx) if err != nil { log.Debug().Err(err).Msg("Failed to access cluster") - return false, maskAny(err) + return false, false, maskAny(err) } h, err := cluster.Health(ctx) if err != nil { log.Debug().Err(err).Msg("Failed to get cluster health") - return false, maskAny(err) + return false, false, maskAny(err) } sh, found := h.Health[driver.ServerID(a.action.MemberID)] if !found { log.Debug().Msg("Member not yet found in cluster health") - return false, nil + return false, false, nil } if sh.Status != driver.ServerStatusGood { log.Debug().Str("status", string(sh.Status)).Msg("Member set status not yet good") - return false, nil + return false, false, nil } - return true, nil + return true, false, nil } // checkProgressArangoSync checks the progress of the action in the case // of a sync master / worker. -func (a *actionWaitForMemberUp) checkProgressArangoSync(ctx context.Context) (bool, error) { +func (a *actionWaitForMemberUp) checkProgressArangoSync(ctx context.Context) (bool, bool, error) { log := a.log c, err := a.actionCtx.GetSyncServerClient(ctx, a.action.Group, a.action.ID) if err != nil { log.Debug().Err(err).Msg("Failed to create arangosync client") - return false, maskAny(err) + return false, false, maskAny(err) } if err := c.Health(ctx); err != nil { log.Debug().Err(err).Msg("Health not ok yet") - return false, maskAny(err) + return false, false, maskAny(err) } - return true, nil + return true, false, nil } // Timeout returns the amount of time after which this action will timeout. diff --git a/pkg/deployment/reconcile/context.go b/pkg/deployment/reconcile/context.go index 96a5c8fd2..66b9e2b23 100644 --- a/pkg/deployment/reconcile/context.go +++ b/pkg/deployment/reconcile/context.go @@ -27,6 +27,7 @@ import ( "github.com/arangodb/arangosync/client" driver "github.com/arangodb/go-driver" + "github.com/arangodb/go-driver/agency" "k8s.io/api/core/v1" api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1alpha" @@ -52,6 +53,8 @@ type Context interface { // GetAgencyClients returns a client connection for every agency member. // If the given predicate is not nil, only agents are included where the given predicate returns true. GetAgencyClients(ctx context.Context, predicate func(id string) bool) ([]driver.Connection, error) + // GetAgency returns a connection to the entire agency. + GetAgency(ctx context.Context) (agency.Agency, error) // GetSyncServerClient returns a cached client for a specific arangosync server. GetSyncServerClient(ctx context.Context, group api.ServerGroup, id string) (client.API, error) // CreateEvent creates a given event. diff --git a/pkg/deployment/reconcile/plan_builder.go b/pkg/deployment/reconcile/plan_builder.go index ba4edd63c..002f96f69 100644 --- a/pkg/deployment/reconcile/plan_builder.go +++ b/pkg/deployment/reconcile/plan_builder.go @@ -358,7 +358,7 @@ func createScalePlan(log zerolog.Logger, members api.MemberStatusList, group api Str("role", group.AsRole()). 
Msg("Creating scale-up plan") } else if len(members) > count { - // Note, we scale down 1 member as a time + // Note, we scale down 1 member at a time if m, err := members.SelectMemberToRemove(); err == nil { if group == api.ServerGroupDBServers { plan = append(plan, @@ -373,6 +373,7 @@ func createScalePlan(log zerolog.Logger, members api.MemberStatusList, group api Int("count", count). Int("actual-count", len(members)). Str("role", group.AsRole()). + Str("member-id", m.ID). Msg("Creating scale-down plan") } } diff --git a/pkg/deployment/reconcile/plan_executor.go b/pkg/deployment/reconcile/plan_executor.go index 2de276127..81ff7570d 100644 --- a/pkg/deployment/reconcile/plan_executor.go +++ b/pkg/deployment/reconcile/plan_executor.go @@ -44,7 +44,10 @@ func (d *Reconciler) ExecutePlan(ctx context.Context) (bool, error) { for { loopStatus, _ := d.context.GetStatus() if len(loopStatus.Plan) == 0 { - // No plan exists, nothing to be done + // No plan exists or all action have finished, nothing to be done + if !firstLoop { + log.Debug().Msg("Reconciliation plan has finished") + } return !firstLoop, nil } firstLoop = false @@ -92,7 +95,7 @@ func (d *Reconciler) ExecutePlan(ctx context.Context) (bool, error) { // Continue with next action } else { // First action of plan has been started, check its progress - ready, err := action.CheckProgress(ctx) + ready, abort, err := action.CheckProgress(ctx) if err != nil { log.Debug().Err(err).Msg("Failed to check action progress") return false, maskAny(err) @@ -109,14 +112,26 @@ func (d *Reconciler) ExecutePlan(ctx context.Context) (bool, error) { } } } - log.Debug().Bool("ready", ready).Msg("Action CheckProgress completed") + log.Debug(). + Bool("abort", abort). + Bool("ready", ready). + Msg("Action CheckProgress completed") if !ready { - // Not ready yet, check timeout - deadline := planAction.CreationTime.Add(action.Timeout()) - if time.Now().After(deadline) { - // Timeout has expired - log.Warn().Msg("Action not finished in time. Removing the entire plan") - d.context.CreateEvent(k8sutil.NewPlanTimeoutEvent(d.context.GetAPIObject(), string(planAction.Type), planAction.MemberID, planAction.Group.AsRole())) + deadlineExpired := false + if abort { + log.Warn().Msg("Action aborted. Removing the entire plan") + d.context.CreateEvent(k8sutil.NewPlanAbortedEvent(d.context.GetAPIObject(), string(planAction.Type), planAction.MemberID, planAction.Group.AsRole())) + } else { + // Not ready yet & no abort, check timeout + deadline := planAction.CreationTime.Add(action.Timeout()) + if time.Now().After(deadline) { + // Timeout has expired + deadlineExpired = true + log.Warn().Msg("Action not finished in time. Removing the entire plan") + d.context.CreateEvent(k8sutil.NewPlanTimeoutEvent(d.context.GetAPIObject(), string(planAction.Type), planAction.MemberID, planAction.Group.AsRole())) + } + } + if abort || deadlineExpired { // Replace plan with empty one and save it. status, lastVersion := d.context.GetStatus() status.Plan = api.Plan{} diff --git a/pkg/util/arangod/cleanout_server.go b/pkg/util/arangod/cleanout_server.go new file mode 100644 index 000000000..b464efbb7 --- /dev/null +++ b/pkg/util/arangod/cleanout_server.go @@ -0,0 +1,99 @@ +// +// DISCLAIMER +// +// Copyright 2018 ArangoDB GmbH, Cologne, Germany +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Copyright holder is ArangoDB GmbH, Cologne, Germany +// +// Author Ewout Prangsma +// + +package arangod + +import ( + "context" + "fmt" + + driver "github.com/arangodb/go-driver" + "github.com/arangodb/go-driver/agency" +) + +// CleanoutJobStatus is a strongly typed status of an agency cleanout-server-job. +type CleanoutJobStatus struct { + state string + reason string +} + +// IsFailed returns true when the job has failed +func (s CleanoutJobStatus) IsFailed() bool { + return s.state == "Failed" } + +// IsFinished returns true when the job is finished +func (s CleanoutJobStatus) IsFinished() bool { + return s.state == "Finished" } + +// Reason returns the reason for the current state. +func (s CleanoutJobStatus) Reason() string { + return s.reason } + +// String returns a string representation of the given state. +func (s CleanoutJobStatus) String() string { + return fmt.Sprintf("state: '%s', reason: '%s'", s.state, s.reason) } + +var ( + agencyJobStateKeyPrefixes = [][]string{ + {"arango", "Target", "ToDo"}, + {"arango", "Target", "Pending"}, + {"arango", "Target", "Finished"}, + {"arango", "Target", "Failed"}, + } +) + +type agencyJob struct { + Reason string `json:"reason,omitempty"` + Server string `json:"server,omitempty"` + JobID string `json:"jobId,omitempty"` + Type string `json:"type,omitempty"` +} + +const ( + agencyJobTypeCleanOutServer = "cleanOutServer" +) + +// CleanoutServerJobStatus checks the status of a cleanout-server job with the given ID. +func CleanoutServerJobStatus(ctx context.Context, jobID string, client driver.Client, agencyClient agency.Agency) (CleanoutJobStatus, error) { + for _, keyPrefix := range agencyJobStateKeyPrefixes { + key := append(keyPrefix, jobID) + var job agencyJob + if err := agencyClient.ReadKey(ctx, key, &job); err == nil { + return CleanoutJobStatus{ + state: keyPrefix[len(keyPrefix)-1], + reason: job.Reason, + }, nil + } else if agency.IsKeyNotFound(err) { + continue + } else { + return CleanoutJobStatus{}, maskAny(err) + } + } + // Job not found in any state + return CleanoutJobStatus{ + reason: "job not found", + }, nil +} diff --git a/pkg/util/arangod/client.go b/pkg/util/arangod/client.go index 25526f69e..7774a63ce 100644 --- a/pkg/util/arangod/client.go +++ b/pkg/util/arangod/client.go @@ -32,6 +32,7 @@ import ( "time" driver "github.com/arangodb/go-driver" + "github.com/arangodb/go-driver/agency" "github.com/arangodb/go-driver/http" "github.com/arangodb/go-driver/jwt" corev1 "k8s.io/client-go/kubernetes/typed/core/v1" @@ -109,6 +110,38 @@ func CreateArangodDatabaseClient(ctx context.Context, cli corev1.CoreV1Interface return c, nil } +// CreateArangodAgencyClient creates a go-driver client for accessing the agents of the given deployment.
+func CreateArangodAgencyClient(ctx context.Context, cli corev1.CoreV1Interface, apiObject *api.ArangoDeployment) (agency.Agency, error) { + var dnsNames []string + for _, m := range apiObject.Status.Members.Agents { + dnsName := k8sutil.CreatePodDNSName(apiObject, api.ServerGroupAgents.AsRole(), m.ID) + dnsNames = append(dnsNames, dnsName) + } + connConfig, err := createArangodHTTPConfigForDNSNames(ctx, cli, apiObject, dnsNames) + if err != nil { + return nil, maskAny(err) + } + agencyConn, err := agency.NewAgencyConnection(connConfig) + if err != nil { + return nil, maskAny(err) + } + auth, err := createArangodClientAuthentication(ctx, cli, apiObject) + if err != nil { + return nil, maskAny(err) + } + if auth != nil { + agencyConn, err = agencyConn.SetAuthentication(auth) + if err != nil { + return nil, maskAny(err) + } + } + a, err := agency.NewAgency(agencyConn) + if err != nil { + return nil, maskAny(err) + } + return a, nil +} + // CreateArangodImageIDClient creates a go-driver client for an ArangoDB instance // running in an Image-ID pod. func CreateArangodImageIDClient(ctx context.Context, deployment k8sutil.APIObject, role, id string) (driver.Client, error) { @@ -123,6 +156,34 @@ func CreateArangodImageIDClient(ctx context.Context, deployment k8sutil.APIObjec // CreateArangodClientForDNSName creates a go-driver client for a given DNS name. func createArangodClientForDNSName(ctx context.Context, cli corev1.CoreV1Interface, apiObject *api.ArangoDeployment, dnsName string) (driver.Client, error) { + connConfig, err := createArangodHTTPConfigForDNSNames(ctx, cli, apiObject, []string{dnsName}) + if err != nil { + return nil, maskAny(err) + } + // TODO deal with TLS with proper CA checking + conn, err := http.NewConnection(connConfig) + if err != nil { + return nil, maskAny(err) + } + + // Create client + config := driver.ClientConfig{ + Connection: conn, + } + auth, err := createArangodClientAuthentication(ctx, cli, apiObject) + if err != nil { + return nil, maskAny(err) + } + config.Authentication = auth + c, err := driver.NewClient(config) + if err != nil { + return nil, maskAny(err) + } + return c, nil +} + +// createArangodHTTPConfigForDNSNames creates a go-driver HTTP connection config for the given DNS names. +func createArangodHTTPConfigForDNSNames(ctx context.Context, cli corev1.CoreV1Interface, apiObject *api.ArangoDeployment, dnsNames []string) (http.ConnectionConfig, error) { scheme := "http" transport := sharedHTTPTransport if apiObject != nil && apiObject.Spec.IsSecure() { @@ -130,20 +191,17 @@ func createArangodClientForDNSName(ctx context.Context, cli corev1.CoreV1Interfa transport = sharedHTTPSTransport } connConfig := http.ConnectionConfig{ - Endpoints: []string{scheme + "://" + net.JoinHostPort(dnsName, strconv.Itoa(k8sutil.ArangoPort))}, Transport: transport, DontFollowRedirect: true, } - // TODO deal with TLS with proper CA checking - conn, err := http.NewConnection(connConfig) - if err != nil { - return nil, maskAny(err) + for _, dnsName := range dnsNames { + connConfig.Endpoints = append(connConfig.Endpoints, scheme+"://"+net.JoinHostPort(dnsName, strconv.Itoa(k8sutil.ArangoPort))) } + return connConfig, nil +} - // Create client - config := driver.ClientConfig{ - Connection: conn, - } +// createArangodClientAuthentication creates a go-driver authentication for the servers in the given deployment.
+func createArangodClientAuthentication(ctx context.Context, cli corev1.CoreV1Interface, apiObject *api.ArangoDeployment) (driver.Authentication, error) { if apiObject != nil && apiObject.Spec.IsAuthenticated() { // Authentication is enabled. // Should we skip using it? @@ -156,7 +214,7 @@ func createArangodClientForDNSName(ctx context.Context, cli corev1.CoreV1Interfa if err != nil { return nil, maskAny(err) } - config.Authentication = driver.RawAuthentication(jwt) + return driver.RawAuthentication(jwt), nil } } else { // Authentication is not enabled. @@ -165,9 +223,5 @@ return nil, maskAny(fmt.Errorf("Authentication is required by context, but not provided in API object")) } } - c, err := driver.NewClient(config) - if err != nil { - return nil, maskAny(err) - } - return c, nil + return nil, nil } diff --git a/pkg/util/k8sutil/events.go b/pkg/util/k8sutil/events.go index 6a03d9028..599257fad 100644 --- a/pkg/util/k8sutil/events.go +++ b/pkg/util/k8sutil/events.go @@ -155,6 +155,16 @@ func NewPlanTimeoutEvent(apiObject APIObject, itemType, memberID, role string) * return event } +// NewPlanAbortedEvent creates an event indicating that an item on a reconciliation plan wants to abort +// the entire plan. +func NewPlanAbortedEvent(apiObject APIObject, itemType, memberID, role string) *v1.Event { + event := newDeploymentEvent(apiObject) + event.Type = v1.EventTypeNormal + event.Reason = "Reconciliation Plan Aborted" + event.Message = fmt.Sprintf("A plan item of type %s for member %s with role %s wants to abort the plan", itemType, memberID, role) + return event +} + // NewErrorEvent creates an even of type error. func NewErrorEvent(reason string, err error, apiObject APIObject) *v1.Event { event := newDeploymentEvent(apiObject)
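To close the loop on the executor change earlier in this diff: an abort request and an expired per-action deadline now funnel into the same "replace the plan with an empty one" branch, and merely emit different events. The decision itself reduces to a small predicate (a sketch with illustrative names, not code from the repository):

package example

import "time"

// shouldAbandonPlan mirrors the ExecutePlan logic: abort wins immediately,
// because waiting out the timeout cannot rescue a plan whose action has
// declared failure (e.g. a failed cleanout job); otherwise the per-action
// deadline decides.
func shouldAbandonPlan(abort bool, created time.Time, timeout time.Duration) bool {
	if abort {
		return true
	}
	return time.Now().After(created.Add(timeout))
}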