diff --git a/pkg/controller/orchestrator/orchestrator_reconcile.go b/pkg/controller/orchestrator/orchestrator_reconcile.go index 9a4e2c0b0..4e4ca8b47 100644 --- a/pkg/controller/orchestrator/orchestrator_reconcile.go +++ b/pkg/controller/orchestrator/orchestrator_reconcile.go @@ -68,6 +68,10 @@ func (ou *orcUpdater) Sync(ctx context.Context) (syncer.SyncResult, error) { if instances, err = ou.orcClient.Cluster(ou.cluster.GetClusterAlias()); err != nil { log.V(-1).Info("can't get instances from orchestrator", "alias", ou.cluster.GetClusterAlias(), "error", err.Error()) + if !orc.IsNotFound(err) { + log.Error(err, "orchestrator is not reachable", "cluster_alias", ou.cluster.GetClusterAlias()) + return syncer.SyncResult{}, err + } } if len(instances) != 0 { diff --git a/pkg/controller/orchestrator/orchestrator_reconcile_test.go b/pkg/controller/orchestrator/orchestrator_reconcile_test.go index 8ea7d712d..9473d0e16 100644 --- a/pkg/controller/orchestrator/orchestrator_reconcile_test.go +++ b/pkg/controller/orchestrator/orchestrator_reconcile_test.go @@ -79,6 +79,33 @@ var _ = Describe("Orchestrator reconciler", func() { }) }) + When("orchestrator is not available", func() { + BeforeEach(func() { + // register nodes into orchestrator + cluster.Status.ReadyNodes = 1 + _, err := orcSyncer.Sync(context.TODO()) + Expect(err).To(Succeed()) + + // second reconcile event to update cluster status + _, err = orcSyncer.Sync(context.TODO()) + Expect(err).To(Succeed()) + + // check that sync was successful + Expect(cluster.GetNodeStatusFor(cluster.GetPodHostname(0))).To( + haveNodeCondWithStatus(api.NodeConditionMaster, core.ConditionTrue)) + + // make orchestrator fake client unreachable + orcClient.MakeOrcUnreachable() + }) + + It("should not reconcile and keep the last known state", func() { + _, err := orcSyncer.Sync(context.TODO()) + Expect(err).ToNot(Succeed()) + + Expect(cluster.GetNodeStatusFor(cluster.GetPodHostname(0))).To(haveNodeCondWithStatus(api.NodeConditionMaster, core.ConditionTrue)) + }) + }) + When("cluster is registered in orchestrator", func() { BeforeEach(func() { // AddRecoveries signature: cluster, acked diff --git a/pkg/orchestrator/errors.go b/pkg/orchestrator/errors.go new file mode 100644 index 000000000..9cbdab522 --- /dev/null +++ b/pkg/orchestrator/errors.go @@ -0,0 +1,87 @@ +/* +Copyright 2018 Pressinfra SRL + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package orchestrator + +import ( + "encoding/json" + "fmt" + "io/ioutil" + "net/http" + "strings" +) + +// Error contains orchestrator error details +type Error struct { + HTTPStatus int + Path string + Message string + Details interface{} +} + +func (e Error) Error() string { + return fmt.Sprintf("[orc]: status: %d path: %s msg: %s, details: %v", + e.HTTPStatus, e.Path, e.Message, e.Details) +} + +// NewError returns a specific orchestrator error with extra details +func NewError(resp *http.Response, path string, details interface{}) *Error { + rsp := &Error{ + HTTPStatus: resp.StatusCode, + Path: path, + Details: details, + } + + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + rsp.Message = "<>" + return rsp + } + + if err = json.Unmarshal(body, rsp); err != nil { + log.V(-1).Info("error when unmarhal error data", "body", string(body)) + rsp.Message = fmt.Sprintf("<>", err, body) + return rsp + } + + return rsp +} + +// NewErrorMsg returns an orchestrator error with extra msg +func NewErrorMsg(msg string, path string) *Error { + return &Error{ + HTTPStatus: 0, + Message: msg, + Path: path, + } +} + +// IsNotFound checks if the given error is orchestrator error and it's cluster not found. +func IsNotFound(err error) bool { + if orcErr, ok := err.(*Error); ok { + if strings.Contains(orcErr.Message, "Unable to determine cluster name") { + return true + } + + // When querying for instances orchestrator returns the following error message when the + // replica cannot be reached. + // https://github.com/github/orchestrator/blob/151029a103429fe16123b9842d1a5b4b175bd5d5/go/http/api.go#L184 + if strings.Contains(orcErr.Message, "Cannot read instance") { + return true + } + } + return false +} diff --git a/pkg/orchestrator/fake/client.go b/pkg/orchestrator/fake/client.go index 922c4ca5d..ea2339a01 100644 --- a/pkg/orchestrator/fake/client.go +++ b/pkg/orchestrator/fake/client.go @@ -35,7 +35,8 @@ type OrcFakeClient struct { Discovered []InstanceKey - lock *sync.Mutex + lock *sync.Mutex + reachable bool } var nextID int64 @@ -53,7 +54,8 @@ const ( // New fake orchestrator client func New() *OrcFakeClient { return &OrcFakeClient{ - lock: &sync.Mutex{}, + lock: &sync.Mutex{}, + reachable: true, } } @@ -65,6 +67,11 @@ func (o *OrcFakeClient) Reset() { o.Discovered = []InstanceKey{} } +// MakeOrcUnreachable makes every function return an error +func (o *OrcFakeClient) MakeOrcUnreachable() { + o.reachable = false +} + // AddInstance add a instance to orchestrator client func (o *OrcFakeClient) AddInstance(instance Instance) *Instance { o.lock.Lock() @@ -176,6 +183,10 @@ func (o *OrcFakeClient) getHostClusterAlias(host string) string { // Discover register a host into orchestrator func (o *OrcFakeClient) Discover(host string, port int) error { + if !o.reachable { + return NewErrorMsg("can't connect to orc", "/") + } + o.Discovered = append(o.Discovered, InstanceKey{ Hostname: host, Port: port, @@ -199,6 +210,9 @@ func (o *OrcFakeClient) Discover(host string, port int) error { // Forget removes a host from orchestrator func (o *OrcFakeClient) Forget(host string, port int) error { + if !o.reachable { + return NewErrorMsg("can't connect to orc", "/") + } // determine cluster name cluster := o.getHostClusterAlias(host) o.RemoveInstance(cluster, host) @@ -210,16 +224,20 @@ func (o *OrcFakeClient) Master(clusterHint string) (*Instance, error) { o.lock.Lock() defer o.lock.Unlock() + if !o.reachable { + return nil, NewErrorMsg("can't connect to orc", "/") + } + insts, ok := o.Clusters[clusterHint] if !ok { - return nil, fmt.Errorf("not found") + return nil, NewErrorMsg("Unable to determine cluster name", "/master") } for _, inst := range insts { if !inst.ReadOnly { return inst, nil } } - return nil, fmt.Errorf("[faker] master not found") + return nil, NewErrorMsg("Unable to determine master", "/master") } // Cluster returns the list of instances from a cluster @@ -227,9 +245,13 @@ func (o *OrcFakeClient) Cluster(cluster string) ([]Instance, error) { o.lock.Lock() defer o.lock.Unlock() + if !o.reachable { + return nil, NewErrorMsg("can't connect to orc", "/") + } + instsPointers, ok := o.Clusters[cluster] if !ok { - return nil, fmt.Errorf("not found") + return nil, NewErrorMsg("Unable to determine cluster name", "/cluster") } insts := []Instance{} @@ -245,9 +267,13 @@ func (o *OrcFakeClient) AuditRecovery(cluster string) ([]TopologyRecovery, error o.lock.Lock() defer o.lock.Unlock() + if !o.reachable { + return nil, NewErrorMsg("can't connect to orc", "/") + } + recoveries, ok := o.Recoveries[cluster] if !ok { - return nil, fmt.Errorf("not found") + return nil, NewErrorMsg("Unable to determine cluster name", "/audit-recovery") } return recoveries, nil @@ -258,6 +284,10 @@ func (o *OrcFakeClient) AckRecovery(id int64, comment string) error { o.lock.Lock() defer o.lock.Unlock() + if !o.reachable { + return NewErrorMsg("can't connect to orc", "/") + } + o.AckRec = append(o.AckRec, id) return nil } @@ -267,6 +297,10 @@ func (o *OrcFakeClient) SetHostWritable(key InstanceKey) error { o.lock.Lock() defer o.lock.Unlock() + if !o.reachable { + return NewErrorMsg("can't connect to orc", "/") + } + for _, instances := range o.Clusters { for _, instance := range instances { if instance.Key.Hostname == key.Hostname { @@ -283,6 +317,10 @@ func (o *OrcFakeClient) SetHostReadOnly(key InstanceKey) error { o.lock.Lock() defer o.lock.Unlock() + if !o.reachable { + return NewErrorMsg("can't connect to orc", "/") + } + for _, instances := range o.Clusters { for _, instance := range instances { if instance.Key.Hostname == key.Hostname { diff --git a/pkg/orchestrator/util.go b/pkg/orchestrator/util.go index c26fb2558..c58cc84d2 100644 --- a/pkg/orchestrator/util.go +++ b/pkg/orchestrator/util.go @@ -29,71 +29,27 @@ import ( var log = logf.Log.WithName("orchestrator.client") -type orcError struct { - HTTPStatus int - Path string - Message string - Details interface{} -} - -func (e orcError) Error() string { - return fmt.Sprintf("[orc]: status: %d path: %s msg: %s, details: %v", - e.HTTPStatus, e.Path, e.Message, e.Details) -} - -// NewOrcError returns a specific orchestrator error with extra details -func NewOrcError(resp *http.Response, path string, details interface{}) error { - rsp := orcError{ - HTTPStatus: resp.StatusCode, - Path: path, - Details: details, - } - - body, err := ioutil.ReadAll(resp.Body) - if err != nil { - rsp.Message = "Can't read body" - return rsp - } - - if err = json.Unmarshal(body, &rsp); err != nil { - log.V(1).Info("error when unmarhal error data", "body", string(body)) - rsp.Message = fmt.Sprintf("can't get more details, in error: error: %s, body: %s", err, body) - return rsp - } - - return rsp -} - -// NewOrcErrorMsg returns an orchestrator error with extra msg -func NewOrcErrorMsg(msg string, path string) error { - return orcError{ - HTTPStatus: 0, - Message: msg, - Path: path, - } -} - -func (o *orchestrator) makeGetRequest(path string, out interface{}) error { +func (o *orchestrator) makeGetRequest(path string, out interface{}) *Error { uri := fmt.Sprintf("%s/%s", o.connectURI, path) log.V(2).Info("orchestrator request info", "uri", uri, "outobj", out) req, err := http.NewRequest("GET", uri, nil) if err != nil { - return NewOrcErrorMsg(fmt.Sprintf("can't create request: %s", err.Error()), path) + return NewErrorMsg(fmt.Sprintf("can't create request: %s", err.Error()), path) } client := &http.Client{} resp, err := client.Do(req) if err != nil { - return NewOrcErrorMsg(err.Error(), path) + return NewErrorMsg(err.Error(), path) } if resp.StatusCode >= 500 { - return NewOrcError(resp, path, nil) + return NewError(resp, path, nil) } if err := unmarshalJSON(resp.Body, out); err != nil { - return NewOrcError(resp, path, err) + return NewError(resp, path, err) } return nil