Skip to content

Commit

Permalink
Changes in BaseOperation to retry createOrReplace on server failure
Browse files Browse the repository at this point in the history
+ Moved createOrReplace logic to CreateOrReplaceHelper class
+ Added tests
  • Loading branch information
rohanKanojia committed Oct 27, 2020
1 parent 625b4db commit 57838a0
Show file tree
Hide file tree
Showing 11 changed files with 530 additions and 88 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
package io.fabric8.kubernetes.client.dsl.base;

import io.fabric8.kubernetes.api.model.ObjectReference;
import io.fabric8.kubernetes.client.utils.KubernetesResourceUtil;
import io.fabric8.kubernetes.client.utils.CreateOrReplaceHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -71,7 +71,6 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
Expand All @@ -97,6 +96,7 @@ public class BaseOperation<T extends HasMetadata, L extends KubernetesResourceLi
private static final String INVOLVED_OBJECT_RESOURCE_VERSION = "involvedObject.resourceVersion";
private static final String INVOLVED_OBJECT_API_VERSION = "involvedObject.apiVersion";
private static final String INVOLVED_OBJECT_FIELD_PATH = "involvedObject.fieldPath";
public static final int CREATE_OR_REPLACE_RETRIES = 3;

private final Boolean cascading;
private final T item;
Expand Down Expand Up @@ -405,37 +405,23 @@ public final T createOrReplace(T... items) {

return withName(itemToCreateOrReplace.getMetadata().getName()).createOrReplace(itemToCreateOrReplace);
}

final CompletableFuture<T> future = new CompletableFuture<>();
while (!future.isDone()) {
try {
// Create
KubernetesResourceUtil.setResourceVersion(itemToCreateOrReplace, null);
future.complete(create(itemToCreateOrReplace));
} catch (KubernetesClientException exception) {
final T itemFromServer;
if (exception.getCode() == HttpURLConnection.HTTP_INTERNAL_ERROR) {
itemFromServer = fromServer().get();
if (itemFromServer == null) {
try {
Thread.sleep(200);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
continue;
}
} else if (exception.getCode() != HttpURLConnection.HTTP_CONFLICT) {
throw exception;
} else {
itemFromServer = fromServer().get();
T finalItemToCreateOrReplace = itemToCreateOrReplace;
CreateOrReplaceHelper<T> createOrReplaceHelper = new CreateOrReplaceHelper<>(
this::create,
this::replace,
m -> {
try {
return waitUntilCondition(Objects::nonNull, 1, TimeUnit.SECONDS);
} catch (InterruptedException interruptedException) {
interruptedException.printStackTrace();
}
return null;
},
this::delete,
m -> fromServer().get()
);

// Conflict; Do Replace
KubernetesResourceUtil.setResourceVersion(itemToCreateOrReplace, KubernetesResourceUtil.getResourceVersion(itemFromServer));
future.complete(replace(itemToCreateOrReplace));
}
}
return future.join();
return createOrReplaceHelper.createOrReplace(finalItemToCreateOrReplace, false);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@

import io.fabric8.kubernetes.api.model.DeletionPropagation;
import io.fabric8.kubernetes.api.model.ListOptions;
import io.fabric8.kubernetes.client.utils.KubernetesResourceUtil;
import io.fabric8.kubernetes.client.utils.CreateOrReplaceHelper;
import io.fabric8.kubernetes.client.utils.Utils;

import java.net.HttpURLConnection;
import java.util.Objects;
import java.util.function.Predicate;

import java.io.ByteArrayInputStream;
Expand Down Expand Up @@ -135,29 +135,22 @@ public HasMetadata createOrReplace() {
HasMetadata meta = acceptVisitors(asHasMetadata(item), visitors);
ResourceHandler<HasMetadata, HasMetadataVisitiableBuilder> h = handlerOf(meta);
String namespaceToUse = meta.getMetadata().getNamespace();

String resourceVersion = KubernetesResourceUtil.getResourceVersion(meta);
try {
// Create
KubernetesResourceUtil.setResourceVersion(meta, null);
return h.create(client, config, namespaceToUse, meta);
} catch (KubernetesClientException exception) {
if (exception.getCode() != HttpURLConnection.HTTP_CONFLICT) {
throw exception;
}

// Conflict; check deleteExisting flag otherwise replace
if (Boolean.TRUE.equals(deletingExisting)) {
Boolean deleted = h.delete(client, config, namespaceToUse, propagationPolicy, meta);
if (Boolean.FALSE.equals(deleted)) {
throw new KubernetesClientException("Failed to delete existing item:" + meta);
CreateOrReplaceHelper<HasMetadata> createOrReplaceHelper = new CreateOrReplaceHelper<>(
m -> h.create(client, config, namespaceToUse, m),
m -> h.replace(client, config, namespaceToUse, m),
m -> {
try {
return h.waitUntilCondition(client, config, namespaceToUse, m, Objects::nonNull, 1, TimeUnit.SECONDS);
} catch (InterruptedException interruptedException) {
interruptedException.printStackTrace();
}
return h.create(client, config, namespaceToUse, meta);
} else {
KubernetesResourceUtil.setResourceVersion(meta, resourceVersion);
return h.replace(client, config, namespaceToUse, meta);
}
}
return null;
},
m -> h.delete(client, config, namespaceToUse, propagationPolicy, m),
m -> h.reload(client, config, namespaceToUse, m)
);

return createOrReplaceHelper.createOrReplace(meta, deletingExisting);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
import io.fabric8.kubernetes.client.dsl.base.OperationSupport;
import io.fabric8.kubernetes.client.handlers.KubernetesListHandler;
import io.fabric8.kubernetes.client.internal.readiness.Readiness;
import io.fabric8.kubernetes.client.utils.KubernetesResourceUtil;
import io.fabric8.kubernetes.client.utils.CreateOrReplaceHelper;
import io.fabric8.kubernetes.client.utils.Serialization;
import io.fabric8.kubernetes.client.utils.Utils;

Expand All @@ -45,13 +45,13 @@
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CountDownLatch;
Expand Down Expand Up @@ -285,29 +285,25 @@ public List<HasMetadata> createOrReplace() {
List<HasMetadata> result = new ArrayList<>();
for (HasMetadata meta : acceptVisitors(asHasMetadata(item, true), visitors)) {
ResourceHandler<HasMetadata, HasMetadataVisitiableBuilder> h = handlerOf(meta);
String namespaceToUse = meta.getMetadata().getNamespace();

String resourceVersion = KubernetesResourceUtil.getResourceVersion(meta);
try {
// Create
KubernetesResourceUtil.setResourceVersion(meta, null);
result.add(h.create(client, config, namespaceToUse, meta));
} catch (KubernetesClientException exception) {
if (exception.getCode() != HttpURLConnection.HTTP_CONFLICT) {
throw exception;
}

// Conflict; check deleteExisting flag otherwise replace
if (Boolean.TRUE.equals(deletingExisting)) {
Boolean deleted = h.delete(client, config, namespaceToUse, propagationPolicy, meta);
if (Boolean.FALSE.equals(deleted)) {
throw new KubernetesClientException("Failed to delete existing item:" + meta);
String namespaceToUse = meta.getMetadata().getNamespace();
CreateOrReplaceHelper<HasMetadata> createOrReplaceHelper = new CreateOrReplaceHelper<>(
m -> h.create(client, config, namespaceToUse, m),
m -> h.replace(client, config, namespaceToUse, m),
m -> {
try {
return h.waitUntilCondition(client, config, namespaceToUse, m, Objects::nonNull, 1, TimeUnit.SECONDS);
} catch (InterruptedException interruptedException) {
interruptedException.printStackTrace();
}
result.add(h.create(client, config, namespaceToUse, meta));
} else {
KubernetesResourceUtil.setResourceVersion(meta, resourceVersion);
result.add(h.replace(client, config, namespaceToUse, meta));
}
return null;
},
m -> h.delete(client, config, namespaceToUse, propagationPolicy, m),
m -> h.reload(client, config, namespaceToUse, m)
);

HasMetadata createdItem = createOrReplaceHelper.createOrReplace(meta, deletingExisting);
if (createdItem != null) {
result.add(createdItem);
}
}
return result;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/**
* Copyright (C) 2015 Red Hat, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.fabric8.kubernetes.client.utils;

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.client.KubernetesClientException;

import java.net.HttpURLConnection;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.function.UnaryOperator;

public class CreateOrReplaceHelper<T extends HasMetadata> {
public static final int CREATE_OR_REPLACE_RETRIES = 3;
private final UnaryOperator<T> createTask;
private final UnaryOperator<T> replaceTask;
private final UnaryOperator<T> waitTask;
private final Function<T, Boolean> deleteTask;
private final UnaryOperator<T> reloadTask;

public CreateOrReplaceHelper(UnaryOperator<T> createTask, UnaryOperator<T> replaceTask, UnaryOperator<T> waitTask, Function<T, Boolean> deleteTask, UnaryOperator<T> reloadTask) {
this.createTask = createTask;
this.replaceTask = replaceTask;
this.waitTask = waitTask;
this.deleteTask = deleteTask;
this.reloadTask = reloadTask;
}

public T createOrReplace(T item, boolean deletingExisting) {
String resourceVersion = KubernetesResourceUtil.getResourceVersion(item);
final CompletableFuture<T> future = new CompletableFuture<>();
int nTries = 0;
while (!future.isDone() && nTries < CREATE_OR_REPLACE_RETRIES) {
try {
// Create
KubernetesResourceUtil.setResourceVersion(item, null);
return createTask.apply(item);
} catch (KubernetesClientException exception) {
if (Utils.isHttpStatusCodeFromErrorEncounteredByServer(exception.getCode())) {
T itemFromServer = reloadTask.apply(item);
if (itemFromServer == null) {
waitTask.apply(item);
nTries++;
continue;
}
} else if (exception.getCode() != HttpURLConnection.HTTP_CONFLICT) {
throw exception;
}

future.complete(checkDeletingExistingOrReplace(item, deletingExisting, resourceVersion));
}
}
return future.join();
}

private T handleDeletingExistingAndCreate(T item) {
Boolean deleted = deleteTask.apply(item);
if (Boolean.FALSE.equals(deleted)) {
throw new KubernetesClientException("Failed to delete existing item:" + item.getMetadata().getName());
}
return createTask.apply(item);
}

private T checkDeletingExistingOrReplace(T item, boolean deletingExisting, String resourceVersion) {
if (Boolean.TRUE.equals(deletingExisting)) {
return handleDeletingExistingAndCreate(item);
} else {
KubernetesResourceUtil.setResourceVersion(item, resourceVersion);
return replaceTask.apply(item);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.lang.annotation.Annotation;
import java.net.HttpURLConnection;
import java.net.URISyntaxException;
import java.net.URL;
import java.net.URLEncoder;
Expand Down Expand Up @@ -472,4 +473,11 @@ public static List<String> getCommandPlatformPrefix() {
private static String getOperatingSystemFromSystemProperty() {
return System.getProperty(OS_NAME);
}

public static boolean isHttpStatusCodeFromErrorEncounteredByServer(int code) {
return code == HttpURLConnection.HTTP_INTERNAL_ERROR ||
code == HttpURLConnection.HTTP_BAD_GATEWAY ||
code == HttpURLConnection.HTTP_UNAVAILABLE ||
code == HttpURLConnection.HTTP_VERSION;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,16 +38,13 @@
import io.fabric8.kubernetes.client.utils.URLUtils;
import org.junit.jupiter.api.Test;

import io.fabric8.kubernetes.client.Watch;
import io.fabric8.kubernetes.client.Watcher;
import io.fabric8.kubernetes.client.dsl.FilterWatchListDeletable;
import io.fabric8.kubernetes.client.dsl.internal.PodOperationContext;
import io.fabric8.kubernetes.client.dsl.internal.core.v1.PodOperationsImpl;

public class BaseOperationTest {
class BaseOperationTest {

@Test
public void testSimpleFieldQueryParamConcatenation() {
void testSimpleFieldQueryParamConcatenation() {
Map<String, String> fieldsMap = new HashMap<>();
fieldsMap.put("yesKey1", "yesValue1");
fieldsMap.put("yesKey2", "yesValue2");
Expand All @@ -68,7 +65,7 @@ public void testSimpleFieldQueryParamConcatenation() {
}

@Test
public void testSkippingFieldNotMatchingNullValues() {
void testSkippingFieldNotMatchingNullValues() {
final PodOperationsImpl operation = new PodOperationsImpl(new PodOperationContext());
operation
.withField("key1", "value1")
Expand All @@ -83,22 +80,22 @@ public void testSkippingFieldNotMatchingNullValues() {
}

@Test
public void testDefaultGracePeriod() {
void testDefaultGracePeriod() {
final BaseOperation operation = new BaseOperation(new OperationContext());

assertThat(operation.getGracePeriodSeconds(), is(-1L));
}

@Test
public void testChainingGracePeriodAndPropagationPolicy() {
void testChainingGracePeriodAndPropagationPolicy() {
final BaseOperation operation = new BaseOperation(new OperationContext());
EditReplacePatchDeletable<?, ?, ?, Boolean> operationWithPropagationPolicy = operation.withPropagationPolicy(DeletionPropagation.FOREGROUND);
assertThat(operationWithPropagationPolicy, is(notNullValue()));
assertNotNull(operationWithPropagationPolicy.withGracePeriod(10));
}

@Test
public void testListOptions() throws MalformedURLException {
void testListOptions() throws MalformedURLException {
// Given
URL url = new URL("https://172.17.0.2:8443/api/v1/namespaces/default/pods");
final BaseOperation<Pod, PodList, DoneablePod, Resource<Pod, DoneablePod>> operation = new BaseOperation<>(new OperationContext());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
import org.junit.jupiter.api.Test;

import java.io.File;
import java.net.HttpURLConnection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -371,4 +372,13 @@ void testGetCommandPlatformPrefix() {
assertEquals("-c", commandPrefix.get(1));
}
}

@Test
void testIsHttpStatusCodeFromErrorEncounteredByServer() {
assertTrue(Utils.isHttpStatusCodeFromErrorEncounteredByServer(HttpURLConnection.HTTP_INTERNAL_ERROR));
assertTrue(Utils.isHttpStatusCodeFromErrorEncounteredByServer(HttpURLConnection.HTTP_BAD_GATEWAY));
assertTrue(Utils.isHttpStatusCodeFromErrorEncounteredByServer(HttpURLConnection.HTTP_UNAVAILABLE));
assertTrue(Utils.isHttpStatusCodeFromErrorEncounteredByServer(HttpURLConnection.HTTP_VERSION));
assertFalse(Utils.isHttpStatusCodeFromErrorEncounteredByServer(HttpURLConnection.HTTP_NOT_FOUND));
}
}
Loading

0 comments on commit 57838a0

Please sign in to comment.