Skip to content

Commit

Permalink
Changes in BaseOperation to retry createOrReplace on server failure
Browse files Browse the repository at this point in the history
+ Moved createOrReplace logic to CreateOrReplaceHelper class
+ Added tests
  • Loading branch information
rohanKanojia committed Nov 10, 2020
1 parent b6d9ba7 commit 2aaa79b
Show file tree
Hide file tree
Showing 14 changed files with 608 additions and 108 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
package io.fabric8.kubernetes.client.dsl.base;

import io.fabric8.kubernetes.api.model.ObjectReference;
import io.fabric8.kubernetes.client.utils.KubernetesResourceUtil;
import io.fabric8.kubernetes.client.utils.CreateOrReplaceHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -71,7 +71,6 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
Expand Down Expand Up @@ -405,37 +404,22 @@ public final T createOrReplace(T... items) {

return withName(itemToCreateOrReplace.getMetadata().getName()).createOrReplace(itemToCreateOrReplace);
}

final CompletableFuture<T> future = new CompletableFuture<>();
while (!future.isDone()) {
try {
// Create
KubernetesResourceUtil.setResourceVersion(itemToCreateOrReplace, null);
future.complete(create(itemToCreateOrReplace));
} catch (KubernetesClientException exception) {
final T itemFromServer;
if (exception.getCode() == HttpURLConnection.HTTP_INTERNAL_ERROR) {
itemFromServer = fromServer().get();
if (itemFromServer == null) {
try {
Thread.sleep(200);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
continue;
}
} else if (exception.getCode() != HttpURLConnection.HTTP_CONFLICT) {
throw exception;
} else {
itemFromServer = fromServer().get();
T finalItemToCreateOrReplace = itemToCreateOrReplace;
CreateOrReplaceHelper<T> createOrReplaceHelper = new CreateOrReplaceHelper<>(
this::create,
this::replace,
m -> {
try {
return waitUntilCondition(Objects::nonNull, 1, TimeUnit.SECONDS);
} catch (InterruptedException interruptedException) {
interruptedException.printStackTrace();
}
return null;
},
m -> fromServer().get()
);

// Conflict; Do Replace
KubernetesResourceUtil.setResourceVersion(itemToCreateOrReplace, KubernetesResourceUtil.getResourceVersion(itemFromServer));
future.complete(replace(itemToCreateOrReplace));
}
}
return future.join();
return createOrReplaceHelper.createOrReplace(finalItemToCreateOrReplace);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,8 @@

import io.fabric8.kubernetes.api.model.DeletionPropagation;
import io.fabric8.kubernetes.api.model.ListOptions;
import io.fabric8.kubernetes.client.utils.KubernetesResourceUtil;
import io.fabric8.kubernetes.client.utils.Utils;

import java.net.HttpURLConnection;
import java.util.function.Predicate;

import java.io.ByteArrayInputStream;
Expand Down Expand Up @@ -57,6 +55,9 @@
import io.fabric8.kubernetes.client.internal.readiness.Readiness;
import okhttp3.OkHttpClient;

import static io.fabric8.kubernetes.client.utils.CreateOrReplaceHelper.createOrReplaceItem;
import static io.fabric8.kubernetes.client.utils.DeleteAndCreateHelper.deleteAndCreateItem;

public class NamespaceVisitFromServerGetWatchDeleteRecreateWaitApplicableImpl extends OperationSupport implements
NamespaceVisitFromServerGetWatchDeleteRecreateWaitApplicable<HasMetadata, Boolean>,
Waitable<HasMetadata, HasMetadata>,
Expand Down Expand Up @@ -135,29 +136,10 @@ public HasMetadata createOrReplace() {
HasMetadata meta = acceptVisitors(asHasMetadata(item), visitors);
ResourceHandler<HasMetadata, HasMetadataVisitiableBuilder> h = handlerOf(meta);
String namespaceToUse = meta.getMetadata().getNamespace();

String resourceVersion = KubernetesResourceUtil.getResourceVersion(meta);
try {
// Create
KubernetesResourceUtil.setResourceVersion(meta, null);
return h.create(client, config, namespaceToUse, meta);
} catch (KubernetesClientException exception) {
if (exception.getCode() != HttpURLConnection.HTTP_CONFLICT) {
throw exception;
}

// Conflict; check deleteExisting flag otherwise replace
if (Boolean.TRUE.equals(deletingExisting)) {
Boolean deleted = h.delete(client, config, namespaceToUse, propagationPolicy, meta);
if (Boolean.FALSE.equals(deleted)) {
throw new KubernetesClientException("Failed to delete existing item:" + meta);
}
return h.create(client, config, namespaceToUse, meta);
} else {
KubernetesResourceUtil.setResourceVersion(meta, resourceVersion);
return h.replace(client, config, namespaceToUse, meta);
}
if (Boolean.TRUE.equals(deletingExisting)) {
return deleteAndCreateItem(client, config, meta, h, namespaceToUse, propagationPolicy);
}
return createOrReplaceItem(client, config, meta, h, namespaceToUse);
}

@Override
Expand Down Expand Up @@ -327,5 +309,4 @@ private static <T> ResourceHandler handlerOf(T item) {
throw new IllegalArgumentException("Could not find a registered handler for item: [" + item + "].");
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import io.fabric8.kubernetes.client.dsl.base.OperationSupport;
import io.fabric8.kubernetes.client.handlers.KubernetesListHandler;
import io.fabric8.kubernetes.client.internal.readiness.Readiness;
import io.fabric8.kubernetes.client.utils.KubernetesResourceUtil;
import io.fabric8.kubernetes.client.utils.Serialization;
import io.fabric8.kubernetes.client.utils.Utils;

Expand All @@ -45,7 +44,6 @@
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
Expand All @@ -62,6 +60,9 @@
import java.util.concurrent.TimeoutException;
import java.util.function.Predicate;

import static io.fabric8.kubernetes.client.utils.CreateOrReplaceHelper.createOrReplaceItem;
import static io.fabric8.kubernetes.client.utils.DeleteAndCreateHelper.deleteAndCreateItem;

public class NamespaceVisitFromServerGetWatchDeleteRecreateWaitApplicableListImpl extends OperationSupport implements ParameterNamespaceListVisitFromServerGetDeleteRecreateWaitApplicable<HasMetadata, Boolean>,
Waitable<List<HasMetadata>, HasMetadata>, Readiable {

Expand Down Expand Up @@ -285,29 +286,11 @@ public List<HasMetadata> createOrReplace() {
List<HasMetadata> result = new ArrayList<>();
for (HasMetadata meta : acceptVisitors(asHasMetadata(item, true), visitors)) {
ResourceHandler<HasMetadata, HasMetadataVisitiableBuilder> h = handlerOf(meta);
String namespaceToUse = meta.getMetadata().getNamespace();
String namespaceToUse = meta.getMetadata().getNamespace();

String resourceVersion = KubernetesResourceUtil.getResourceVersion(meta);
try {
// Create
KubernetesResourceUtil.setResourceVersion(meta, null);
result.add(h.create(client, config, namespaceToUse, meta));
} catch (KubernetesClientException exception) {
if (exception.getCode() != HttpURLConnection.HTTP_CONFLICT) {
throw exception;
}

// Conflict; check deleteExisting flag otherwise replace
if (Boolean.TRUE.equals(deletingExisting)) {
Boolean deleted = h.delete(client, config, namespaceToUse, propagationPolicy, meta);
if (Boolean.FALSE.equals(deleted)) {
throw new KubernetesClientException("Failed to delete existing item:" + meta);
}
result.add(h.create(client, config, namespaceToUse, meta));
} else {
KubernetesResourceUtil.setResourceVersion(meta, resourceVersion);
result.add(h.replace(client, config, namespaceToUse, meta));
}
HasMetadata createdItem = createOrReplaceOrDeleteExisting(meta, h, namespaceToUse);
if (createdItem != null) {
result.add(createdItem);
}
}
return result;
Expand Down Expand Up @@ -455,4 +438,12 @@ private static <T> ResourceHandler handlerOf(T item) {
throw new IllegalArgumentException("Could not find a registered handler for item: [" + item + "].");
}
}

private HasMetadata createOrReplaceOrDeleteExisting(HasMetadata meta, ResourceHandler<HasMetadata, HasMetadataVisitiableBuilder> h, String namespaceToUse) {
if (Boolean.TRUE.equals(deletingExisting)) {
return deleteAndCreateItem(client, config, meta, h, namespaceToUse, propagationPolicy);
}
return createOrReplaceItem(client, config, meta, h, namespaceToUse);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/**
* Copyright (C) 2015 Red Hat, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.fabric8.kubernetes.client.utils;

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.client.Config;
import io.fabric8.kubernetes.client.HasMetadataVisitiableBuilder;
import io.fabric8.kubernetes.client.KubernetesClientException;
import io.fabric8.kubernetes.client.ResourceHandler;
import okhttp3.OkHttpClient;

import java.net.HttpURLConnection;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.UnaryOperator;

public class CreateOrReplaceHelper<T extends HasMetadata> {
public static final int CREATE_OR_REPLACE_RETRIES = 3;
private final UnaryOperator<T> createTask;
private final UnaryOperator<T> replaceTask;
private final UnaryOperator<T> waitTask;
private final UnaryOperator<T> reloadTask;

public CreateOrReplaceHelper(UnaryOperator<T> createTask, UnaryOperator<T> replaceTask, UnaryOperator<T> waitTask, UnaryOperator<T> reloadTask) {
this.createTask = createTask;
this.replaceTask = replaceTask;
this.waitTask = waitTask;
this.reloadTask = reloadTask;
}

public T createOrReplace(T item) {
String resourceVersion = KubernetesResourceUtil.getResourceVersion(item);
final CompletableFuture<T> future = new CompletableFuture<>();
int nTries = 0;
while (!future.isDone() && nTries < CREATE_OR_REPLACE_RETRIES) {
try {
// Create
KubernetesResourceUtil.setResourceVersion(item, null);
return createTask.apply(item);
} catch (KubernetesClientException exception) {
if (shouldRetry(exception.getCode())) {
T itemFromServer = reloadTask.apply(item);
if (itemFromServer == null) {
waitTask.apply(item);
nTries++;
continue;
}
} else if (exception.getCode() != HttpURLConnection.HTTP_CONFLICT) {
throw exception;
}

future.complete(replace(item, resourceVersion));
}
}
return future.join();
}

public static HasMetadata createOrReplaceItem(OkHttpClient client, Config config, HasMetadata meta, ResourceHandler<HasMetadata, HasMetadataVisitiableBuilder> h, String namespaceToUse) {
CreateOrReplaceHelper<HasMetadata> createOrReplaceHelper = new CreateOrReplaceHelper<>(
m -> h.create(client, config, namespaceToUse, m),
m -> h.replace(client, config, namespaceToUse, m),
m -> {
try {
return h.waitUntilCondition(client, config, namespaceToUse, m, Objects::nonNull, 1, TimeUnit.SECONDS);
} catch (InterruptedException interruptedException) {
interruptedException.printStackTrace();
}
return null;
},
m -> h.reload(client, config, namespaceToUse, m)
);

return createOrReplaceHelper.createOrReplace(meta);
}

private T replace(T item, String resourceVersion) {
KubernetesResourceUtil.setResourceVersion(item, resourceVersion);
return replaceTask.apply(item);
}

private boolean shouldRetry(int responseCode) {
return responseCode > 499;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/**
* Copyright (C) 2015 Red Hat, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.fabric8.kubernetes.client.utils;

import io.fabric8.kubernetes.api.model.DeletionPropagation;
import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.client.Config;
import io.fabric8.kubernetes.client.HasMetadataVisitiableBuilder;
import io.fabric8.kubernetes.client.KubernetesClientException;
import io.fabric8.kubernetes.client.ResourceHandler;
import okhttp3.OkHttpClient;

import java.util.function.Function;
import java.util.function.UnaryOperator;

public class DeleteAndCreateHelper<T extends HasMetadata> {
private final UnaryOperator<T> createTask;
private final Function<T, Boolean> deleteTask;

public DeleteAndCreateHelper(UnaryOperator<T> createTask, Function<T, Boolean> deleteTask) {
this.createTask = createTask;
this.deleteTask = deleteTask;
}

public T deleteAndCreate(T item) {
Boolean deleted = deleteTask.apply(item);
if (Boolean.FALSE.equals(deleted)) {
throw new KubernetesClientException("Failed to delete existing item:" + item.getMetadata().getName());
}
return createTask.apply(item);
}

public static HasMetadata deleteAndCreateItem(OkHttpClient client, Config config, HasMetadata meta, ResourceHandler<HasMetadata, HasMetadataVisitiableBuilder> h, String namespaceToUse, DeletionPropagation propagationPolicy) {
DeleteAndCreateHelper<HasMetadata> deleteAndCreateHelper = new DeleteAndCreateHelper<>(
m -> h.create(client, config, namespaceToUse, m),
m -> h.delete(client, config, namespaceToUse, propagationPolicy, m)
);

return deleteAndCreateHelper.deleteAndCreate(meta);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -472,4 +472,5 @@ public static List<String> getCommandPlatformPrefix() {
private static String getOperatingSystemFromSystemProperty() {
return System.getProperty(OS_NAME);
}

}
Loading

0 comments on commit 2aaa79b

Please sign in to comment.