Skip to content

Commit

Permalink
Merge pull request #145 from arangodb/feature/sync-reconciliation
Browse files Browse the repository at this point in the history
Adding syncmaster&worker reconciliation support.
  • Loading branch information
ewoutp authored Jun 5, 2018
2 parents 11e1768 + f7e916c commit 4ab7a63
Show file tree
Hide file tree
Showing 28 changed files with 697 additions and 174 deletions.
11 changes: 0 additions & 11 deletions docs/Manual/Deployment/Kubernetes/DeploymentResource.md
Original file line number Diff line number Diff line change
Expand Up @@ -205,17 +205,6 @@ replication in the cluster. When enabled, the cluster will contain
a number of `syncmaster` & `syncworker` servers.
The default value is `false`.

### `spec.sync.image: string`

This setting specifies the docker image to use for all ArangoSync servers.
When not specified, the `spec.image` value is used.

### `spec.sync.imagePullPolicy: string`

This setting specifies the pull policy for the docker image to use for all ArangoSync servers.
For possible values, see `spec.imagePullPolicy`.
When not specified, the `spec.imagePullPolicy` value is used.

### `spec.sync.externalAccess.type: string`

This setting specifies the type of `Service` that will be created to provide
Expand Down
2 changes: 1 addition & 1 deletion pkg/apis/deployment/v1alpha/deployment_spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func (s *DeploymentSpec) SetDefaults(deploymentName string) {
s.RocksDB.SetDefaults()
s.Authentication.SetDefaults(deploymentName + "-jwt")
s.TLS.SetDefaults(deploymentName + "-ca")
s.Sync.SetDefaults(s.GetImage(), s.GetImagePullPolicy(), deploymentName+"-sync-jwt", deploymentName+"-sync-client-auth-ca", deploymentName+"-sync-ca")
s.Sync.SetDefaults(deploymentName+"-sync-jwt", deploymentName+"-sync-client-auth-ca", deploymentName+"-sync-ca", deploymentName+"-sync-mt")
s.Single.SetDefaults(ServerGroupSingle, s.GetMode().HasSingleServers(), s.GetMode())
s.Agents.SetDefaults(ServerGroupAgents, s.GetMode().HasAgents(), s.GetMode())
s.DBServers.SetDefaults(ServerGroupDBServers, s.GetMode().HasDBServers(), s.GetMode())
Expand Down
8 changes: 6 additions & 2 deletions pkg/apis/deployment/v1alpha/monitoring_spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,12 @@ func (s MonitoringSpec) Validate() error {
}

// SetDefaults fills in missing defaults
func (s *MonitoringSpec) SetDefaults() {
// Nothing needed
func (s *MonitoringSpec) SetDefaults(defaultTokenSecretName string) {
if s.GetTokenSecretName() == "" {
// Note that we don't check for nil here, since even a specified, but empty
// string should result in the default value.
s.TokenSecretName = util.NewString(defaultTokenSecretName)
}
}

// SetDefaultsFrom fills unspecified fields with a value from given source spec.
Expand Down
7 changes: 6 additions & 1 deletion pkg/apis/deployment/v1alpha/monitoring_spec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,15 @@ func TestMonitoringSpecValidate(t *testing.T) {

func TestMonitoringSpecSetDefaults(t *testing.T) {
def := func(spec MonitoringSpec) MonitoringSpec {
spec.SetDefaults()
spec.SetDefaults("")
return spec
}
def2 := func(spec MonitoringSpec) MonitoringSpec {
spec.SetDefaults("def2")
return spec
}

assert.Equal(t, "", def(MonitoringSpec{}).GetTokenSecretName())
assert.Equal(t, "def2", def2(MonitoringSpec{}).GetTokenSecretName())
assert.Equal(t, "foo", def(MonitoringSpec{TokenSecretName: util.NewString("foo")}).GetTokenSecretName())
}
2 changes: 2 additions & 0 deletions pkg/apis/deployment/v1alpha/server_group_spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ func (s *ServerGroupSpec) SetDefaults(group ServerGroup, used bool, mode Deploym
default:
s.Count = util.NewInt(3)
}
} else if s.GetCount() > 0 && !used {
s.Count = util.NewInt(0)
}
if _, found := s.Resources.Requests[v1.ResourceStorage]; !found {
switch group {
Expand Down
34 changes: 3 additions & 31 deletions pkg/apis/deployment/v1alpha/sync_spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,13 @@ package v1alpha

import (
"github.com/pkg/errors"
"k8s.io/api/core/v1"

"github.com/arangodb/kube-arangodb/pkg/util"
)

// SyncSpec holds dc2dc replication specific configuration settings
type SyncSpec struct {
Enabled *bool `json:"enabled,omitempty"`
Image *string `json:"image,omitempty"`
ImagePullPolicy *v1.PullPolicy `json:"imagePullPolicy,omitempty"`
Enabled *bool `json:"enabled,omitempty"`

ExternalAccess SyncExternalAccessSpec `json:"externalAccess"`
Authentication SyncAuthenticationSpec `json:"auth"`
Expand All @@ -46,24 +43,11 @@ func (s SyncSpec) IsEnabled() bool {
return util.BoolOrDefault(s.Enabled)
}

// GetImage returns the value of image.
func (s SyncSpec) GetImage() string {
return util.StringOrDefault(s.Image)
}

// GetImagePullPolicy returns the value of imagePullPolicy.
func (s SyncSpec) GetImagePullPolicy() v1.PullPolicy {
return util.PullPolicyOrDefault(s.ImagePullPolicy)
}

// Validate the given spec
func (s SyncSpec) Validate(mode DeploymentMode) error {
if s.IsEnabled() && !mode.SupportsSync() {
return maskAny(errors.Wrapf(ValidationError, "Cannot enable sync with mode: '%s'", mode))
}
if s.GetImage() == "" {
return maskAny(errors.Wrapf(ValidationError, "image must be set"))
}
if s.IsEnabled() {
if err := s.ExternalAccess.Validate(); err != nil {
return maskAny(err)
Expand All @@ -82,30 +66,18 @@ func (s SyncSpec) Validate(mode DeploymentMode) error {
}

// SetDefaults fills in missing defaults
func (s *SyncSpec) SetDefaults(defaultImage string, defaulPullPolicy v1.PullPolicy, defaultJWTSecretName, defaultClientAuthCASecretName, defaultTLSCASecretName string) {
if s.GetImage() == "" {
s.Image = util.NewString(defaultImage)
}
if s.GetImagePullPolicy() == "" {
s.ImagePullPolicy = util.NewPullPolicy(defaulPullPolicy)
}
func (s *SyncSpec) SetDefaults(defaultJWTSecretName, defaultClientAuthCASecretName, defaultTLSCASecretName, defaultMonitoringSecretName string) {
s.ExternalAccess.SetDefaults()
s.Authentication.SetDefaults(defaultJWTSecretName, defaultClientAuthCASecretName)
s.TLS.SetDefaults(defaultTLSCASecretName)
s.Monitoring.SetDefaults()
s.Monitoring.SetDefaults(defaultMonitoringSecretName)
}

// SetDefaultsFrom fills unspecified fields with a value from given source spec.
func (s *SyncSpec) SetDefaultsFrom(source SyncSpec) {
if s.Enabled == nil {
s.Enabled = util.NewBoolOrNil(source.Enabled)
}
if s.Image == nil {
s.Image = util.NewStringOrNil(source.Image)
}
if s.ImagePullPolicy == nil {
s.ImagePullPolicy = util.NewPullPolicyOrNil(source.ImagePullPolicy)
}
s.ExternalAccess.SetDefaultsFrom(source.ExternalAccess)
s.Authentication.SetDefaultsFrom(source.Authentication)
s.TLS.SetDefaultsFrom(source.TLS)
Expand Down
35 changes: 8 additions & 27 deletions pkg/apis/deployment/v1alpha/sync_spec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,40 +27,33 @@ import (

"github.com/arangodb/kube-arangodb/pkg/util"
"github.com/stretchr/testify/assert"
"k8s.io/api/core/v1"
)

func TestSyncSpecValidate(t *testing.T) {
// Valid
auth := SyncAuthenticationSpec{JWTSecretName: util.NewString("foo"), ClientCASecretName: util.NewString("foo-client")}
tls := TLSSpec{CASecretName: util.NewString("None")}
assert.Nil(t, SyncSpec{Image: util.NewString("foo"), Authentication: auth}.Validate(DeploymentModeSingle))
assert.Nil(t, SyncSpec{Image: util.NewString("foo"), Authentication: auth}.Validate(DeploymentModeActiveFailover))
assert.Nil(t, SyncSpec{Image: util.NewString("foo"), Authentication: auth}.Validate(DeploymentModeCluster))
assert.Nil(t, SyncSpec{Image: util.NewString("foo"), Authentication: auth, TLS: tls, Enabled: util.NewBool(true)}.Validate(DeploymentModeCluster))
assert.Nil(t, SyncSpec{Authentication: auth}.Validate(DeploymentModeSingle))
assert.Nil(t, SyncSpec{Authentication: auth}.Validate(DeploymentModeActiveFailover))
assert.Nil(t, SyncSpec{Authentication: auth}.Validate(DeploymentModeCluster))
assert.Nil(t, SyncSpec{Authentication: auth, TLS: tls, Enabled: util.NewBool(true)}.Validate(DeploymentModeCluster))

// Not valid
assert.Error(t, SyncSpec{Image: util.NewString(""), Authentication: auth}.Validate(DeploymentModeSingle))
assert.Error(t, SyncSpec{Image: util.NewString(""), Authentication: auth}.Validate(DeploymentModeActiveFailover))
assert.Error(t, SyncSpec{Image: util.NewString(""), Authentication: auth}.Validate(DeploymentModeCluster))
assert.Error(t, SyncSpec{Image: util.NewString("foo"), Authentication: auth, TLS: tls, Enabled: util.NewBool(true)}.Validate(DeploymentModeSingle))
assert.Error(t, SyncSpec{Image: util.NewString("foo"), Authentication: auth, TLS: tls, Enabled: util.NewBool(true)}.Validate(DeploymentModeActiveFailover))
assert.Error(t, SyncSpec{Authentication: auth, TLS: tls, Enabled: util.NewBool(true)}.Validate(DeploymentModeSingle))
assert.Error(t, SyncSpec{Authentication: auth, TLS: tls, Enabled: util.NewBool(true)}.Validate(DeploymentModeActiveFailover))
}

func TestSyncSpecSetDefaults(t *testing.T) {
def := func(spec SyncSpec) SyncSpec {
spec.SetDefaults("test-image", v1.PullAlways, "test-jwt", "test-client-auth-ca", "test-tls-ca")
spec.SetDefaults("test-jwt", "test-client-auth-ca", "test-tls-ca", "test-mon")
return spec
}

assert.False(t, def(SyncSpec{}).IsEnabled())
assert.False(t, def(SyncSpec{Enabled: util.NewBool(false)}).IsEnabled())
assert.True(t, def(SyncSpec{Enabled: util.NewBool(true)}).IsEnabled())
assert.Equal(t, "test-image", def(SyncSpec{}).GetImage())
assert.Equal(t, "foo", def(SyncSpec{Image: util.NewString("foo")}).GetImage())
assert.Equal(t, v1.PullAlways, def(SyncSpec{}).GetImagePullPolicy())
assert.Equal(t, v1.PullNever, def(SyncSpec{ImagePullPolicy: util.NewPullPolicy(v1.PullNever)}).GetImagePullPolicy())
assert.Equal(t, "test-jwt", def(SyncSpec{}).Authentication.GetJWTSecretName())
assert.Equal(t, "test-mon", def(SyncSpec{}).Monitoring.GetTokenSecretName())
assert.Equal(t, "foo", def(SyncSpec{Authentication: SyncAuthenticationSpec{JWTSecretName: util.NewString("foo")}}).Authentication.GetJWTSecretName())
}

Expand All @@ -84,18 +77,6 @@ func TestSyncSpecResetImmutableFields(t *testing.T) {
SyncSpec{Enabled: util.NewBool(false)},
nil,
},
{
SyncSpec{Image: util.NewString("foo")},
SyncSpec{Image: util.NewString("foo2")},
SyncSpec{Image: util.NewString("foo2")},
nil,
},
{
SyncSpec{ImagePullPolicy: util.NewPullPolicy(v1.PullAlways)},
SyncSpec{ImagePullPolicy: util.NewPullPolicy(v1.PullNever)},
SyncSpec{ImagePullPolicy: util.NewPullPolicy(v1.PullNever)},
nil,
},
{
SyncSpec{Authentication: SyncAuthenticationSpec{JWTSecretName: util.NewString("None"), ClientCASecretName: util.NewString("some")}},
SyncSpec{Authentication: SyncAuthenticationSpec{JWTSecretName: util.NewString("None"), ClientCASecretName: util.NewString("some")}},
Expand Down
18 changes: 0 additions & 18 deletions pkg/apis/deployment/v1alpha/zz_generated.deepcopy.go
Original file line number Diff line number Diff line change
Expand Up @@ -678,24 +678,6 @@ func (in *SyncSpec) DeepCopyInto(out *SyncSpec) {
**out = **in
}
}
if in.Image != nil {
in, out := &in.Image, &out.Image
if *in == nil {
*out = nil
} else {
*out = new(string)
**out = **in
}
}
if in.ImagePullPolicy != nil {
in, out := &in.ImagePullPolicy, &out.ImagePullPolicy
if *in == nil {
*out = nil
} else {
*out = new(core_v1.PullPolicy)
**out = **in
}
}
in.ExternalAccess.DeepCopyInto(&out.ExternalAccess)
in.Authentication.DeepCopyInto(&out.Authentication)
in.TLS.DeepCopyInto(&out.TLS)
Expand Down
34 changes: 34 additions & 0 deletions pkg/deployment/context_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ package deployment
import (
"context"

"github.com/arangodb/arangosync/client"
"github.com/arangodb/arangosync/tasks"
driver "github.com/arangodb/go-driver"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -119,6 +121,38 @@ func (d *Deployment) GetAgencyClients(ctx context.Context, predicate func(id str
return result, nil
}

// GetSyncServerClient returns a cached client for a specific arangosync server.
func (d *Deployment) GetSyncServerClient(ctx context.Context, group api.ServerGroup, id string) (client.API, error) {
// Fetch monitoring token
log := d.deps.Log
kubecli := d.deps.KubeCli
ns := d.apiObject.GetNamespace()
secretName := d.apiObject.Spec.Sync.Monitoring.GetTokenSecretName()
monitoringToken, err := k8sutil.GetTokenSecret(kubecli.CoreV1(), secretName, ns)
if err != nil {
log.Debug().Err(err).Str("secret-name", secretName).Msg("Failed to get sync monitoring secret")
return nil, maskAny(err)
}

// Fetch server DNS name
dnsName := k8sutil.CreatePodDNSName(d.apiObject, group.AsRole(), id)

// Build client
source := client.Endpoint{dnsName}
tlsAuth := tasks.TLSAuthentication{
TLSClientAuthentication: tasks.TLSClientAuthentication{
ClientToken: monitoringToken,
},
}
auth := client.NewAuthentication(tlsAuth, "")
insecureSkipVerify := true
c, err := d.syncClientCache.GetClient(d.deps.Log, source, auth, insecureSkipVerify)
if err != nil {
return nil, maskAny(err)
}
return c, nil
}

// CreateMember adds a new member to the given group.
// If ID is non-empty, it will be used, otherwise a new ID is created.
func (d *Deployment) CreateMember(group api.ServerGroup, id string) error {
Expand Down
3 changes: 3 additions & 0 deletions pkg/deployment/deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"sync/atomic"
"time"

"github.com/arangodb/arangosync/client"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"k8s.io/api/core/v1"
Expand Down Expand Up @@ -101,6 +102,7 @@ type Deployment struct {
resilience *resilience.Resilience
resources *resources.Resources
chaosMonkey *chaos.Monkey
syncClientCache client.ClientCache
}

// New creates a new Deployment from the given API object.
Expand Down Expand Up @@ -280,6 +282,7 @@ func (d *Deployment) handleArangoDeploymentUpdatedEvent() error {
}
newAPIObject := current.DeepCopy()
newAPIObject.Spec.SetDefaultsFrom(specBefore)
newAPIObject.Spec.SetDefaults(d.apiObject.GetName())
newAPIObject.Status = d.status
resetFields := specBefore.ResetImmutableFields(&newAPIObject.Spec)
if len(resetFields) > 0 {
Expand Down
12 changes: 12 additions & 0 deletions pkg/deployment/reconcile/action_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"context"
"fmt"

"github.com/arangodb/arangosync/client"
driver "github.com/arangodb/go-driver"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
Expand All @@ -45,6 +46,8 @@ type ActionContext interface {
GetServerClient(ctx context.Context, group api.ServerGroup, id string) (driver.Client, error)
// GetAgencyClients returns a client connection for every agency member.
GetAgencyClients(ctx context.Context) ([]driver.Connection, error)
// GetSyncServerClient returns a cached client for a specific arangosync server.
GetSyncServerClient(ctx context.Context, group api.ServerGroup, id string) (client.API, error)
// GetMemberStatusByID returns the current member status
// for the member with given id.
// Returns member status, true when found, or false
Expand Down Expand Up @@ -115,6 +118,15 @@ func (ac *actionContext) GetAgencyClients(ctx context.Context) ([]driver.Connect
return c, nil
}

// GetSyncServerClient returns a cached client for a specific arangosync server.
func (ac *actionContext) GetSyncServerClient(ctx context.Context, group api.ServerGroup, id string) (client.API, error) {
c, err := ac.context.GetSyncServerClient(ctx, group, id)
if err != nil {
return nil, maskAny(err)
}
return c, nil
}

// GetMemberStatusByID returns the current member status
// for the member with given id.
// Returns member status, true when found, or false
Expand Down
11 changes: 10 additions & 1 deletion pkg/deployment/reconcile/action_wait_for_member_up.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,15 @@ func (a *actionWaitForMemberUp) checkProgressCluster(ctx context.Context) (bool,
// checkProgressArangoSync checks the progress of the action in the case
// of a sync master / worker.
func (a *actionWaitForMemberUp) checkProgressArangoSync(ctx context.Context) (bool, error) {
// TODO
log := a.log
c, err := a.actionCtx.GetSyncServerClient(ctx, a.action.Group, a.action.ID)
if err != nil {
log.Debug().Err(err).Msg("Failed to create arangosync client")
return false, maskAny(err)
}
if err := c.Health(ctx); err != nil {
log.Debug().Err(err).Msg("Health not ok yet")
return false, maskAny(err)
}
return true, nil
}
3 changes: 3 additions & 0 deletions pkg/deployment/reconcile/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ package reconcile
import (
"context"

"github.com/arangodb/arangosync/client"
driver "github.com/arangodb/go-driver"
"k8s.io/api/core/v1"

Expand All @@ -51,6 +52,8 @@ type Context interface {
// GetAgencyClients returns a client connection for every agency member.
// If the given predicate is not nil, only agents are included where the given predicate returns true.
GetAgencyClients(ctx context.Context, predicate func(id string) bool) ([]driver.Connection, error)
// GetSyncServerClient returns a cached client for a specific arangosync server.
GetSyncServerClient(ctx context.Context, group api.ServerGroup, id string) (client.API, error)
// CreateMember adds a new member to the given group.
// If ID is non-empty, it will be used, otherwise a new ID is created.
CreateMember(group api.ServerGroup, id string) error
Expand Down
5 changes: 4 additions & 1 deletion pkg/deployment/reconcile/plan_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,9 +128,12 @@ func createPlan(log zerolog.Logger, apiObject metav1.Object,
// Only scale singles
plan = append(plan, createScalePlan(log, status.Members.Single, api.ServerGroupSingle, spec.Single.GetCount())...)
case api.DeploymentModeCluster:
// Scale dbservers, coordinators, syncmasters & syncworkers
// Scale dbservers, coordinators
plan = append(plan, createScalePlan(log, status.Members.DBServers, api.ServerGroupDBServers, spec.DBServers.GetCount())...)
plan = append(plan, createScalePlan(log, status.Members.Coordinators, api.ServerGroupCoordinators, spec.Coordinators.GetCount())...)
}
if spec.GetMode().SupportsSync() {
// Scale syncmasters & syncworkers
plan = append(plan, createScalePlan(log, status.Members.SyncMasters, api.ServerGroupSyncMasters, spec.SyncMasters.GetCount())...)
plan = append(plan, createScalePlan(log, status.Members.SyncWorkers, api.ServerGroupSyncWorkers, spec.SyncWorkers.GetCount())...)
}
Expand Down
Loading

0 comments on commit 4ab7a63

Please sign in to comment.