Skip to content

Commit

Permalink
Add initial upgrade flow
Browse files Browse the repository at this point in the history
Signed-off-by: killianmuldoon <[email protected]>
  • Loading branch information
killianmuldoon committed Jul 5, 2023
1 parent ae9464d commit bd52d47
Show file tree
Hide file tree
Showing 7 changed files with 278 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,7 @@ const (
// By using this mechanism leadership can be forwarded to another pod with an atomic operation
// (add/update of the annotation to the pod/etcd member we are forwarding leadership to).
EtcdLeaderFromAnnotationName = "etcd.inmemory.infrastructure.cluster.x-k8s.io/leader-from"

// EtcdMemberRemoved is added to etcd pods which have been removed from the etcd cluster.
EtcdMemberRemoved = "etcd.inmemory.infrastructure.cluster.x-k8s.io/member-removed"
)
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,12 @@ import (
"time"

jsonpatch "github.com/evanphx/json-patch/v5"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
Expand Down Expand Up @@ -117,6 +119,15 @@ func (c *cache) List(resourceGroup string, list client.ObjectList, opts ...clien
}
}

// TODO(killianmuldoon): This only matches the nodeName field for pods. No other fieldSelectors are implemented. This should return an error if another fieldselector is used.
if pod, ok := obj.(*corev1.Pod); ok {
if listOpts.FieldSelector != nil && !listOpts.FieldSelector.Empty() {
if !listOpts.FieldSelector.Matches(fields.Set{"spec.nodeName": pod.Spec.NodeName}) {
continue
}
}
}

obj := obj.DeepCopyObject().(client.Object)
switch list.(type) {
case *unstructured.UnstructuredList:
Expand Down Expand Up @@ -272,11 +283,6 @@ func updateTrackerOwnerReferences(tracker *resourceGroupTracker, oldObj, newObj
}

func (c *cache) Patch(resourceGroup string, obj client.Object, patch client.Patch) error {
obj = obj.DeepCopyObject().(client.Object)
if err := c.Get(resourceGroup, client.ObjectKeyFromObject(obj), obj); err != nil {
return err
}

patchData, err := patch.Data(obj)
if err != nil {
return apierrors.NewInternalError(err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -427,6 +427,9 @@ func (r *InMemoryMachineReconciler) reconcileNormalETCD(ctx context.Context, clu
"tier": "control-plane",
},
},
Spec: corev1.PodSpec{
NodeName: inMemoryMachine.Name,
},
Status: corev1.PodStatus{
Phase: corev1.PodRunning,
Conditions: []corev1.PodCondition{
Expand All @@ -443,7 +446,7 @@ func (r *InMemoryMachineReconciler) reconcileNormalETCD(ctx context.Context, clu
}

// Gets info about the current etcd cluster, if any.
info, err := r.inspectEtcd(ctx, cloudClient)
info, err := r.getEtcdInfo(ctx, cloudClient)
if err != nil {
return ctrl.Result{}, err
}
Expand Down Expand Up @@ -527,7 +530,7 @@ type etcdInfo struct {
members sets.Set[string]
}

func (r *InMemoryMachineReconciler) inspectEtcd(ctx context.Context, cloudClient cclient.Client) (etcdInfo, error) {
func (r *InMemoryMachineReconciler) getEtcdInfo(ctx context.Context, cloudClient cclient.Client) (etcdInfo, error) {
etcdPods := &corev1.PodList{}
if err := cloudClient.List(ctx, etcdPods,
client.InNamespace(metav1.NamespaceSystem),
Expand All @@ -547,6 +550,9 @@ func (r *InMemoryMachineReconciler) inspectEtcd(ctx context.Context, cloudClient
}
var leaderFrom time.Time
for _, pod := range etcdPods.Items {
if _, ok := pod.Annotations[cloudv1.EtcdMemberRemoved]; ok {
continue
}
if info.clusterID == "" {
info.clusterID = pod.Annotations[cloudv1.EtcdClusterIDAnnotationName]
} else if pod.Annotations[cloudv1.EtcdClusterIDAnnotationName] != info.clusterID {
Expand Down Expand Up @@ -626,6 +632,9 @@ func (r *InMemoryMachineReconciler) reconcileNormalAPIServer(ctx context.Context
"tier": "control-plane",
},
},
Spec: corev1.PodSpec{
NodeName: inMemoryMachine.Name,
},
Status: corev1.PodStatus{
Phase: corev1.PodRunning,
Conditions: []corev1.PodCondition{
Expand Down Expand Up @@ -712,6 +721,9 @@ func (r *InMemoryMachineReconciler) reconcileNormalScheduler(ctx context.Context
"tier": "control-plane",
},
},
Spec: corev1.PodSpec{
NodeName: inMemoryMachine.Name,
},
Status: corev1.PodStatus{
Phase: corev1.PodRunning,
Conditions: []corev1.PodCondition{
Expand Down Expand Up @@ -757,6 +769,9 @@ func (r *InMemoryMachineReconciler) reconcileNormalControllerManager(ctx context
"tier": "control-plane",
},
},
Spec: corev1.PodSpec{
NodeName: inMemoryMachine.Name,
},
Status: corev1.PodStatus{
Phase: corev1.PodRunning,
Conditions: []corev1.PodCondition{
Expand Down Expand Up @@ -831,9 +846,12 @@ func (r *InMemoryMachineReconciler) reconcileNormalKubeadmObjects(ctx context.Co
Name: "kubeadm-config",
Namespace: metav1.NamespaceSystem,
},
Data: map[string]string{
"ClusterConfiguration": "",
},
}
if err := cloudClient.Create(ctx, cm); err != nil && !apierrors.IsAlreadyExists(err) {
return ctrl.Result{}, errors.Wrapf(err, "failed to create ubeadm-config ConfigMap")
return ctrl.Result{}, errors.Wrapf(err, "failed to create kubeadm-config ConfigMap")
}

return ctrl.Result{}, nil
Expand Down Expand Up @@ -1012,6 +1030,8 @@ func (r *InMemoryMachineReconciler) reconcileDeleteNode(ctx context.Context, clu
Name: inMemoryMachine.Name,
},
}

// TODO(killianmuldoon): check if we can drop this given that the MachineController is already draining pods and deleting nodes.
if err := cloudClient.Delete(ctx, node); err != nil && !apierrors.IsNotFound(err) {
return ctrl.Result{}, errors.Wrapf(err, "failed to delete Node")
}
Expand Down
15 changes: 15 additions & 0 deletions test/infrastructure/inmemory/internal/server/api/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer"
Expand Down Expand Up @@ -252,6 +253,16 @@ func (h *apiServerHandler) apiV1List(req *restful.Request, resp *restful.Respons
listOpts = append(listOpts, client.InNamespace(req.PathParameter("namespace")))
}

// TODO: The only field selector which works is for `spec.nodeName` on pods.
selector, err := fields.ParseSelector(req.QueryParameter("fieldSelector"))
if err != nil {
_ = resp.WriteErrorString(http.StatusInternalServerError, err.Error())
return
}
if selector != nil {
listOpts = append(listOpts, client.MatchingFieldsSelector{Selector: selector})
}

if err := cloudClient.List(ctx, list, listOpts...); err != nil {
_ = resp.WriteErrorString(http.StatusInternalServerError, err.Error())
return
Expand Down Expand Up @@ -412,6 +423,10 @@ func (h *apiServerHandler) apiV1Patch(req *restful.Request, resp *restful.Respon
obj.SetName(req.PathParameter("name"))
obj.SetNamespace(req.PathParameter("namespace"))

if err := cloudClient.Get(ctx, client.ObjectKeyFromObject(obj), obj); err != nil {
_ = resp.WriteErrorString(http.StatusInternalServerError, err.Error())
return
}
if err := cloudClient.Patch(ctx, obj, patch); err != nil {
_ = resp.WriteErrorString(http.StatusInternalServerError, err.Error())
return
Expand Down
86 changes: 81 additions & 5 deletions test/infrastructure/inmemory/internal/server/etcd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,48 @@ func (m *maintenanceServer) Snapshot(_ *pb.SnapshotRequest, _ pb.Maintenance_Sna
return fmt.Errorf("not implemented: Snapshot")
}

func (m *maintenanceServer) MoveLeader(_ context.Context, _ *pb.MoveLeaderRequest) (*pb.MoveLeaderResponse, error) {
return nil, fmt.Errorf("not implemented: MoveLeader")
func (m *maintenanceServer) MoveLeader(ctx context.Context, req *pb.MoveLeaderRequest) (*pb.MoveLeaderResponse, error) {
out := new(pb.MoveLeaderResponse)
resourceGroup, _, err := m.getResourceGroupAndMember(ctx)
if err != nil {
return nil, err
}
etcdPods := &corev1.PodList{}
cloudClient := m.manager.GetResourceGroup(resourceGroup).GetClient()
if err := cloudClient.List(ctx, etcdPods,
client.InNamespace(metav1.NamespaceSystem),
client.MatchingLabels{
"component": "etcd",
"tier": "control-plane"},
); err != nil {
return nil, errors.Wrap(err, "failed to list etcd members")
}

if len(etcdPods.Items) == 0 {
return nil, errors.New("failed to list etcd members: no etcd pods found")
}

for i := range etcdPods.Items {
pod := &etcdPods.Items[i]
for k, v := range pod.GetAnnotations() {
if k == cloudv1.EtcdMemberIDAnnotationName {
target := strconv.FormatInt(int64(req.TargetID), 10)
if v == target {
updatedPod := pod.DeepCopy()
annotations := updatedPod.GetAnnotations()
annotations[cloudv1.EtcdLeaderFromAnnotationName] = time.Now().Format(time.RFC3339)
updatedPod.SetAnnotations(annotations)
err := cloudClient.Patch(ctx, updatedPod, client.MergeFrom(pod))
if err != nil {
return nil, err
}
return out, nil
}
}
}
}
// If we reach this point leadership was not moved.
return nil, errors.Errorf("etcd member with ID %d did not become the leader: expected etcd Pod not found", req.TargetID)
}

func (m *maintenanceServer) Downgrade(_ context.Context, _ *pb.DowngradeRequest) (*pb.DowngradeResponse, error) {
Expand All @@ -130,8 +170,39 @@ func (c *clusterServerServer) MemberAdd(_ context.Context, _ *pb.MemberAddReques
return nil, fmt.Errorf("not implemented: MemberAdd")
}

func (c *clusterServerServer) MemberRemove(_ context.Context, _ *pb.MemberRemoveRequest) (*pb.MemberRemoveResponse, error) {
return nil, fmt.Errorf("not implemented: MemberRemove")
func (c *clusterServerServer) MemberRemove(ctx context.Context, req *pb.MemberRemoveRequest) (*pb.MemberRemoveResponse, error) {
out := new(pb.MemberRemoveResponse)
resourceGroup, _, err := c.getResourceGroupAndMember(ctx)
if err != nil {
return nil, err
}
cloudClient := c.manager.GetResourceGroup(resourceGroup).GetClient()

etcdPods := &corev1.PodList{}

if err := cloudClient.List(ctx, etcdPods,
client.InNamespace(metav1.NamespaceSystem),
client.MatchingLabels{
"component": "etcd",
"tier": "control-plane"},
); err != nil {
return nil, errors.Wrap(err, "failed to list etcd members")
}

for i := range etcdPods.Items {
pod := etcdPods.Items[i]
memberID := pod.Annotations[cloudv1.EtcdMemberIDAnnotationName]
if memberID != fmt.Sprintf("%d", req.ID) {
continue
}
updatedPod := pod.DeepCopy()
updatedPod.Annotations[cloudv1.EtcdMemberRemoved] = ""
if err := cloudClient.Patch(ctx, updatedPod, client.MergeFrom(&pod)); err != nil {
return nil, err
}
return out, nil
}
return nil, errors.Errorf("no etcd member with id %d found", req.ID)
}

func (c *clusterServerServer) MemberUpdate(_ context.Context, _ *pb.MemberUpdateRequest) (*pb.MemberUpdateResponse, error) {
Expand Down Expand Up @@ -197,6 +268,12 @@ func (b *baseServer) inspectEtcd(ctx context.Context, cloudClient cclient.Client
var leaderID int
var leaderFrom time.Time
for _, pod := range etcdPods.Items {
if _, ok := pod.Annotations[cloudv1.EtcdMemberRemoved]; ok {
if pod.Name == fmt.Sprintf("%s%s", "etcd-", etcdMember) {
return nil, nil, errors.New("inspect called on etcd which has been removed")
}
continue
}
if clusterID == 0 {
var err error
clusterID, err = strconv.Atoi(pod.Annotations[cloudv1.EtcdClusterIDAnnotationName])
Expand Down Expand Up @@ -224,7 +301,6 @@ func (b *baseServer) inspectEtcd(ctx context.Context, cloudClient cclient.Client
ClusterId: uint64(clusterID),
MemberId: uint64(memberID),
}

statusResponse.Header = memberList.Header
}
memberList.Members = append(memberList.Members, &pb.Member{
Expand Down
Loading

0 comments on commit bd52d47

Please sign in to comment.