Skip to content

Commit

Permalink
fix: add retries + delay on k8s gets to avoid api instability
Browse files Browse the repository at this point in the history
  • Loading branch information
mjnagel committed Dec 5, 2024
1 parent 7e1441d commit 4465b80
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 4 deletions.
27 changes: 23 additions & 4 deletions src/pepr/operator/controllers/network/generators/kubeAPI.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import { K8s, kind, R } from "pepr";
import { UDSConfig } from "../../../../config";
import { Component, setupLogger } from "../../../../logger";
import { RemoteGenerated } from "../../../crd";
import { retryWithDelay } from "../../utils";
import { anywhere } from "./anywhere";

// configure subproject logger
Expand All @@ -25,7 +26,7 @@ let apiServerPeers: V1NetworkPolicyPeer[];
* Otherwise, it fetches the EndpointSlice and updates the CIDR dynamically.
*/
export async function initAPIServerCIDR() {
const svc = await K8s(kind.Service).InNamespace("default").Get("kubernetes");
const svc = await retryWithDelay(fetchKubernetesService, log);

// If static CIDR is defined, pass it directly
if (UDSConfig.kubeApiCidr) {
Expand All @@ -34,7 +35,7 @@ export async function initAPIServerCIDR() {
);
await updateAPIServerCIDR(svc, UDSConfig.kubeApiCidr); // Pass static CIDR
} else {
const slice = await K8s(kind.EndpointSlice).InNamespace("default").Get("kubernetes");
const slice = await retryWithDelay(fetchKubernetesEndpointSlice, log);
await updateAPIServerCIDR(svc, slice);
}
}
Expand Down Expand Up @@ -65,7 +66,7 @@ export async function updateAPIServerCIDRFromEndpointSlice(slice: kind.EndpointS
log.debug(
"Processing watch for endpointslices, getting k8s service for updating API server CIDR",
);
const svc = await K8s(kind.Service).InNamespace("default").Get("kubernetes");
const svc = await retryWithDelay(fetchKubernetesService, log);
await updateAPIServerCIDR(svc, slice);
} catch (err) {
const msg = "Failed to update network policies from endpoint slice watch";
Expand All @@ -89,7 +90,7 @@ export async function updateAPIServerCIDRFromService(svc: kind.Service) {
log.debug(
"Processing watch for api service, getting endpoint slices for updating API server CIDR",
);
const slice = await K8s(kind.EndpointSlice).InNamespace("default").Get("kubernetes");
const slice = await retryWithDelay(fetchKubernetesEndpointSlice, log);
await updateAPIServerCIDR(svc, slice);
}
} catch (err) {
Expand Down Expand Up @@ -180,3 +181,21 @@ export async function updateKubeAPINetworkPolicies(newPeers: V1NetworkPolicyPeer
}
}
}

/**
* Fetches the Kubernetes Service object for the API server.
*
* @returns {Promise<kind.Service>} - The Service object.
*/
async function fetchKubernetesService(): Promise<kind.Service> {
return K8s(kind.Service).InNamespace("default").Get("kubernetes");
}

/**
* Fetches the Kubernetes EndpointSlice object for the API server.
*
* @returns {Promise<kind.EndpointSlice>} - The EndpointSlice object.
*/
async function fetchKubernetesEndpointSlice(): Promise<kind.EndpointSlice> {
return K8s(kind.EndpointSlice).InNamespace("default").Get("kubernetes");
}
36 changes: 36 additions & 0 deletions src/pepr/operator/controllers/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,3 +73,39 @@ export async function purgeOrphans<T extends GenericClass>(
}
}
}

/**
* Lightweight retry helper with a delay between attempts.
*
* @param {() => Promise<T>} fn - The async function to retry.
* @param {Logger} log - Logger instance for logging debug messages.
* @param {number} retries - Number of retry attempts.
* @param {number} delayMs - Delay in milliseconds between attempts.
* @returns {Promise<T>} - The result of the function if successful.
* @throws {Error} - Throws an error after exhausting retries.
*/
export async function retryWithDelay<T>(
fn: () => Promise<T>,
log: Logger,
retries = 5,
delayMs = 2000,
): Promise<T> {
let attempt = 0;
while (attempt < retries) {
try {
return await fn();
} catch (err) {
attempt++;
if (attempt >= retries) {
throw err; // Exceeded retries, rethrow the error.
}
log.warn(`Attempt ${attempt} of ${fn.name} failed, retrying in ${delayMs}ms...`, {
error: (err as Error).message,
});
await new Promise(resolve => setTimeout(resolve, delayMs));
}
}

// This line should never be reached, but TypeScript wants it for safety.
throw new Error("Retry loop exited unexpectedly without returning.");
}

0 comments on commit 4465b80

Please sign in to comment.