fix: kubeapi watch updates, allow configurable cidr (#1075)
## Description

This PR contains two changes, both aimed at fixing lingering issues
with the KubeAPI watch:
1. NetworkPolicy updates triggered by changes to KubeAPI endpoints have
never run as expected. The label we use to select existing KubeAPI
network policies was never applied to those policies in the first
place. Previously we applied a `uds/generated` label but selected on
`uds.dev/generated`, so the two never lined up. Additionally, our apply
would have failed because of the managed fields present on the object.
This has been the main cause of the problems with our auto-update logic.
Pepr watcher restarts fixed the network policies not because of watch
fixes, but because we re-reconcile all packages on startup.
2. While the watch does appear to be stable, this PR also adds a config
option to manually set a CIDR instead of relying on the watch. This
could be useful in clusters (such as EKS) where the control plane IPs
change frequently, since it reduces churn from network policy
modifications.
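
To illustrate the mismatch described in item 1, here is a minimal TypeScript sketch. `matchesLabel` is a hypothetical stand-in for an equality-based label selector, not the Pepr or Kubernetes API, and the label value `"KubeAPI"` is assumed for illustration.

```typescript
// Hypothetical, simplified model of an equality-based label selector.
type Labels = Record<string, string>;

function matchesLabel(labels: Labels, key: string, value: string): boolean {
  return labels[key] === value;
}

// Labels as they would appear on a generated KubeAPI policy (value assumed).
const policyLabels: Labels = { "uds/generated": "KubeAPI" };

// Selecting on a different key never matches, so no policy is ever updated.
const matchedWithOldKey = matchesLabel(policyLabels, "uds.dev/generated", "KubeAPI");

// Selecting on the key that is actually applied finds the policy.
const matchedWithNewKey = matchesLabel(policyLabels, "uds/generated", "KubeAPI");
```

In this sketch `matchedWithOldKey` is `false` and `matchedWithNewKey` is `true`, which is the whole difference between the watch silently doing nothing and the policies being updated.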

## Related Issue

Fixes #821

## Type of change

- [x] Bug fix (non-breaking change which fixes an issue)
- [ ] New feature (non-breaking change which adds functionality)
- [ ] Other (security config, docs update, etc)

## Checklist before merging

- [x] Test, docs, adr added or updated as needed
- [x] [Contributor Guide](https://github.com/defenseunicorns/uds-template-capability/blob/main/CONTRIBUTING.md) followed
mjnagel authored Dec 5, 2024
1 parent 2cb4181 commit 3285908
Showing 9 changed files with 483 additions and 53 deletions.
23 changes: 23 additions & 0 deletions docs/reference/configuration/uds-networking-configuration.md
Original file line number Diff line number Diff line change
@@ -2,6 +2,29 @@
title: Networking Configuration
---

## KubeAPI Egress

The UDS operator is responsible for dynamically updating network policies that use the `remoteGenerated: KubeAPI` custom selector, in response to changes in the Kubernetes API server’s IP address. This ensures that policies remain accurate as cluster configurations evolve. However, in environments where the API server IP(s) frequently change, this behavior can lead to unnecessary overhead or instability.

To address this, the UDS operator provides an option to configure a static CIDR range. This approach eliminates the need for continuous updates by using a predefined range of IP addresses for network policies. To configure a specific CIDR range, set an override to `operator.KUBEAPI_CIDR` in your bundle as a value or variable. For example:

```yaml
packages:
  - name: uds-core
    repository: ghcr.io/defenseunicorns/packages/uds/core
    ref: x.x.x
    overrides:
      uds-operator-config:
        uds-operator-config:
          values:
            - path: operator.KUBEAPI_CIDR
              value: "172.0.0.0/24"
```
This configuration directs the operator to use the specified CIDR range (`172.0.0.0/24` in this case) for KubeAPI network policies instead of dynamically tracking the API server’s IP(s).

When configuring a static CIDR range, it is important to make the range as restrictive as possible to limit the potential for unexpected networking access. An overly broad range could inadvertently allow egress traffic to destinations beyond the intended scope. Additionally, careful alignment with the actual IP addresses used by the Kubernetes API server is essential. A mismatch between the specified CIDR range and the cluster's configuration can result in network policy enforcement issues or disrupted connectivity.
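
One way to sanity-check a candidate range before setting the override is to confirm that it contains the API server addresses you expect and to see how many addresses it admits. The sketch below is an illustration only: `ipv4ToInt`, `parseCidr`, `cidrContains`, and `cidrSize` are hypothetical helpers that are not part of UDS Core, and they handle IPv4 only.

```typescript
// Convert a dotted-quad IPv4 address into an unsigned 32-bit number.
function ipv4ToInt(ip: string): number {
  const parts = ip.split(".");
  if (parts.length !== 4) {
    throw new Error(`Invalid IPv4 address: ${ip}`);
  }
  let value = 0;
  for (const part of parts) {
    const octet = Number(part);
    if (!/^\d{1,3}$/.test(part) || octet > 255) {
      throw new Error(`Invalid IPv4 address: ${ip}`);
    }
    value = value * 256 + octet;
  }
  return value;
}

// Split "a.b.c.d/n" into the first address of the block and the block size.
function parseCidr(cidr: string): { start: number; size: number } {
  const [base, prefixText, ...rest] = cidr.split("/");
  if (
    base === undefined ||
    prefixText === undefined ||
    rest.length > 0 ||
    !/^\d{1,2}$/.test(prefixText) ||
    Number(prefixText) > 32
  ) {
    throw new Error(`Invalid IPv4 CIDR: ${cidr}`);
  }
  const size = Math.pow(2, 32 - Number(prefixText));
  // Round the base address down to the start of its block.
  const start = Math.floor(ipv4ToInt(base) / size) * size;
  return { start, size };
}

// True when the address falls inside the CIDR block.
function cidrContains(cidr: string, ip: string): boolean {
  const { start, size } = parseCidr(cidr);
  const target = ipv4ToInt(ip);
  return target >= start && target < start + size;
}

// Number of addresses the CIDR block admits.
function cidrSize(cidr: string): number {
  return parseCidr(cidr).size;
}

const inRange = cidrContains("172.0.0.0/24", "172.0.0.10"); // true
const outOfRange = cidrContains("172.0.0.0/24", "172.0.1.10"); // false
const addressCount = cidrSize("172.0.0.0/24"); // 256
```

A large `cidrSize` result is a hint that the range may be broader than the API server needs, and a `false` from `cidrContains` for a known API server address suggests the override would break connectivity.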

## Additional Network Allowances

Applications deployed in UDS Core utilize [Network Policies](https://kubernetes.io/docs/concepts/services-networking/network-policies/) with a "Deny by Default" configuration to ensure network traffic is restricted to only what is necessary. Some applications in UDS Core allow for overrides to accommodate environment-specific requirements.
3 changes: 3 additions & 0 deletions src/pepr/config.ts
@@ -31,6 +31,9 @@ export const UDSConfig = {
   // Redis URI for Authservice
   authserviceRedisUri,

+  // Static CIDR range to use for KubeAPI instead of k8s watch
+  kubeApiCidr: process.env.KUBEAPI_CIDR,
+
   // Track if UDS Core identity-authorization layer is deployed
   isIdentityDeployed: false,
 };
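
The operator code in this commit checks `UDSConfig.kubeApiCidr` for truthiness, and `values.yaml` defaults `KUBEAPI_CIDR` to an empty string, so an unset or empty value leaves the watch-based behavior in place. The sketch below illustrates that decision in isolation. `resolveKubeApiMode` and the `Env` type are hypothetical names introduced here, and the environment is passed in as a plain object rather than read from `process.env`.

```typescript
// Hypothetical shape for the environment, passed in explicitly for testability.
type Env = Record<string, string | undefined>;

type KubeApiMode = { mode: "static"; cidr: string } | { mode: "watch" };

// Mirror the truthiness check used in the operator: "" and undefined both
// fall through to the dynamic, watch-based path.
function resolveKubeApiMode(env: Env): KubeApiMode {
  const cidr = env["KUBEAPI_CIDR"];
  return cidr ? { mode: "static", cidr } : { mode: "watch" };
}

const defaultMode = resolveKubeApiMode({ KUBEAPI_CIDR: "" });
const overriddenMode = resolveKubeApiMode({ KUBEAPI_CIDR: "172.0.0.0/24" });
```

Here `defaultMode.mode` is `"watch"` and `overriddenMode.mode` is `"static"`. Note that a truthiness check accepts any non-empty string, so a malformed value would only surface later when the policy is applied.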
5 changes: 5 additions & 0 deletions src/pepr/operator/controllers/network/generate.ts
@@ -93,6 +93,11 @@ export function generate(namespace: string, policy: Allow): kind.NetworkPolicy {
     };
   }

+  // Add the generated policy label (used to track KubeAPI policies)
+  if (policy.remoteGenerated) {
+    generated.metadata!.labels!["uds/generated"] = policy.remoteGenerated;
+  }
+
   // Create the network policy peers
   const peers: V1NetworkPolicyPeer[] = getPeers(policy);

261 changes: 261 additions & 0 deletions src/pepr/operator/controllers/network/generators/kubeAPI.spec.ts
@@ -0,0 +1,261 @@
/**
* Copyright 2024 Defense Unicorns
* SPDX-License-Identifier: AGPL-3.0-or-later OR LicenseRef-Defense-Unicorns-Commercial
*/

import { beforeEach, describe, expect, it, jest } from "@jest/globals";
import { K8s, kind } from "pepr";
import { updateAPIServerCIDR } from "./kubeAPI";

type KubernetesList<T> = {
items: T[];
};

jest.mock("pepr", () => {
const originalModule = jest.requireActual("pepr") as object;
return {
...originalModule,
K8s: jest.fn(),
};
});

describe("updateAPIServerCIDR", () => {
const mockApply = jest.fn();
const mockGet = jest.fn<() => Promise<KubernetesList<kind.NetworkPolicy>>>();

beforeEach(() => {
jest.clearAllMocks();
(K8s as jest.Mock).mockImplementation(() => ({
WithLabel: jest.fn(() => ({
Get: mockGet,
})),
Apply: mockApply,
}));
});

it("handles a static CIDR string", async () => {
const mockService = {
spec: {
clusterIP: "10.0.0.1",
},
} as kind.Service;

const staticCIDR = "192.168.1.0/24";

// Mock the return of `Get` method
mockGet.mockResolvedValue({
items: [
{
metadata: {
name: "mock-netpol",
namespace: "default",
},
spec: {
egress: [
{
to: [{ ipBlock: { cidr: "0.0.0.0/0" } }],
},
],
},
},
],
} as KubernetesList<kind.NetworkPolicy>);

await updateAPIServerCIDR(mockService, staticCIDR);

expect(mockGet).toHaveBeenCalledWith();
expect(mockApply).toHaveBeenCalledWith(
expect.objectContaining({
metadata: {
name: "mock-netpol",
namespace: "default",
},
spec: {
egress: [
{
to: [{ ipBlock: { cidr: staticCIDR } }, { ipBlock: { cidr: "10.0.0.1/32" } }],
},
],
},
}),
{ force: true }, // Include the second argument in the call
);
});

it("handles an EndpointSlice with multiple endpoints", async () => {
const mockService = {
spec: {
clusterIP: "10.0.0.1",
},
} as kind.Service;

const mockSlice = {
endpoints: [{ addresses: ["192.168.1.2"] }, { addresses: ["192.168.1.3"] }],
} as kind.EndpointSlice;

// Mock the return of `Get` method
mockGet.mockResolvedValue({
items: [
{
metadata: {
name: "mock-netpol",
namespace: "default",
},
spec: {
egress: [
{
to: [{ ipBlock: { cidr: "0.0.0.0/0" } }],
},
],
},
},
],
} as KubernetesList<kind.NetworkPolicy>);

await updateAPIServerCIDR(mockService, mockSlice);

expect(mockGet).toHaveBeenCalledWith();
expect(mockApply).toHaveBeenCalledWith(
expect.objectContaining({
metadata: {
name: "mock-netpol",
namespace: "default",
},
spec: {
egress: [
{
to: [
{ ipBlock: { cidr: "192.168.1.2/32" } },
{ ipBlock: { cidr: "192.168.1.3/32" } },
{ ipBlock: { cidr: "10.0.0.1/32" } },
],
},
],
},
}),
{ force: true }, // Include the second argument in the call
);
});

it("handles an empty EndpointSlice", async () => {
const mockService = {
spec: {
clusterIP: "10.0.0.1",
},
} as kind.Service;

const mockSlice = {
endpoints: [{}],
} as kind.EndpointSlice;

// Mock the return of `Get` method
mockGet.mockResolvedValue({
items: [
{
metadata: {
name: "mock-netpol",
namespace: "default",
},
spec: {
egress: [
{
to: [{ ipBlock: { cidr: "0.0.0.0/0" } }],
},
],
},
},
],
} as KubernetesList<kind.NetworkPolicy>);

await updateAPIServerCIDR(mockService, mockSlice);

expect(mockGet).toHaveBeenCalledWith();
expect(mockApply).toHaveBeenCalledWith(
expect.objectContaining({
metadata: {
name: "mock-netpol",
namespace: "default",
},
spec: {
egress: [
{
to: [{ ipBlock: { cidr: "10.0.0.1/32" } }],
},
],
},
}),
{ force: true }, // Include the second argument in the call
);
});

it("handles a Service with missing clusterIP", async () => {
const mockService = {
spec: {},
} as kind.Service;

const mockSlice = {
endpoints: [{ addresses: ["192.168.1.2"] }],
} as kind.EndpointSlice;

// Mock the return of `Get` method
mockGet.mockResolvedValue({
items: [
{
metadata: {
name: "mock-netpol",
namespace: "default",
},
spec: {
egress: [
{
to: [{ ipBlock: { cidr: "0.0.0.0/0" } }],
},
],
},
},
],
} as KubernetesList<kind.NetworkPolicy>);

await updateAPIServerCIDR(mockService, mockSlice);

expect(mockGet).toHaveBeenCalledWith();
expect(mockApply).toHaveBeenCalledWith(
expect.objectContaining({
metadata: {
name: "mock-netpol",
namespace: "default",
},
spec: {
egress: [
{
to: [{ ipBlock: { cidr: "192.168.1.2/32" } }],
},
],
},
}),
{ force: true }, // Include the second argument in the call
);
});

it("handles no matching NetworkPolicies", async () => {
const mockService = {
spec: {
clusterIP: "10.0.0.1",
},
} as kind.Service;

const mockSlice = {
endpoints: [{ addresses: ["192.168.1.2"] }],
} as kind.EndpointSlice;

// Mock the return of `Get` method to return no items
mockGet.mockResolvedValue({
items: [],
} as KubernetesList<kind.NetworkPolicy>);

await updateAPIServerCIDR(mockService, mockSlice);

expect(mockGet).toHaveBeenCalledWith();
expect(mockApply).not.toHaveBeenCalled();
});
});
171 changes: 125 additions & 46 deletions src/pepr/operator/controllers/network/generators/kubeAPI.ts
@@ -6,8 +6,10 @@
import { V1NetworkPolicyPeer } from "@kubernetes/client-node";
import { K8s, kind, R } from "pepr";

import { UDSConfig } from "../../../../config";
import { Component, setupLogger } from "../../../../logger";
import { RemoteGenerated } from "../../../crd";
import { retryWithDelay } from "../../utils";
import { anywhere } from "./anywhere";

// configure subproject logger
@@ -17,19 +19,33 @@ const log = setupLogger(Component.OPERATOR_GENERATORS);
let apiServerPeers: V1NetworkPolicyPeer[];

/**
* Initialize the API server CIDR by getting the EndpointSlice and Service for the API server
* Initialize the API server CIDR.
*
* This function checks if a static CIDR is defined in the configuration.
* If a static CIDR exists, it skips the EndpointSlice lookup and uses the static value.
* Otherwise, it fetches the EndpointSlice and updates the CIDR dynamically.
*/
export async function initAPIServerCIDR() {
const slice = await K8s(kind.EndpointSlice).InNamespace("default").Get("kubernetes");
const svc = await K8s(kind.Service).InNamespace("default").Get("kubernetes");
await updateAPIServerCIDR(slice, svc);
const svc = await retryWithDelay(fetchKubernetesService, log);

// If static CIDR is defined, pass it directly
if (UDSConfig.kubeApiCidr) {
log.info(
`Static CIDR (${UDSConfig.kubeApiCidr}) is defined for KubeAPI, skipping EndpointSlice lookup.`,
);
await updateAPIServerCIDR(svc, UDSConfig.kubeApiCidr); // Pass static CIDR
} else {
const slice = await retryWithDelay(fetchKubernetesEndpointSlice, log);
await updateAPIServerCIDR(svc, slice);
}
}

/**
* Get the API server CIDR
* @returns The API server CIDR
* Get the API server CIDR.
*
* @returns {V1NetworkPolicyPeer[]} The cached API server CIDR if available; otherwise, defaults to `0.0.0.0/0`.
*/
export function kubeAPI() {
export function kubeAPI(): V1NetworkPolicyPeer[] {
// If the API server peers are already cached, return them
if (apiServerPeers) {
return apiServerPeers;
@@ -41,82 +57,145 @@ export function kubeAPI() {
}

/**
* When the kubernetes EndpointSlice is created or updated, update the API server CIDR
* @param slice The EndpointSlice for the API server
* When the Kubernetes EndpointSlice is created or updated, update the API server CIDR.
*
* @param {kind.EndpointSlice} slice - The EndpointSlice object for the API server.
*/
export async function updateAPIServerCIDRFromEndpointSlice(slice: kind.EndpointSlice) {
try {
log.debug(
"Processing watch for endpointslices, getting k8s service for updating API server CIDR",
);
const svc = await K8s(kind.Service).InNamespace("default").Get("kubernetes");
await updateAPIServerCIDR(slice, svc);
const svc = await retryWithDelay(fetchKubernetesService, log);
await updateAPIServerCIDR(svc, slice);
} catch (err) {
const msg = "Failed to update network policies from endpoint slice watch";
log.error({ err }, msg);
}
}

/**
* When the kubernetes Service is created or updated, update the API server CIDR
* @param svc The Service for the API server
* When the Kubernetes Service is created or updated, update the API server CIDR.
*
* If a static CIDR is defined, it skips fetching the EndpointSlice and uses the static value.
*
* @param {kind.Service} svc - The Service object for the API server.
*/
export async function updateAPIServerCIDRFromService(svc: kind.Service) {
try {
log.debug(
"Processing watch for api service, getting endpoint slices for updating API server CIDR",
);
const slice = await K8s(kind.EndpointSlice).InNamespace("default").Get("kubernetes");
await updateAPIServerCIDR(slice, svc);
if (UDSConfig.kubeApiCidr) {
log.debug("Processing watch for api service, using configured API CIDR for endpoints");
await updateAPIServerCIDR(svc, UDSConfig.kubeApiCidr);
} else {
log.debug(
"Processing watch for api service, getting endpoint slices for updating API server CIDR",
);
const slice = await retryWithDelay(fetchKubernetesEndpointSlice, log);
await updateAPIServerCIDR(svc, slice);
}
} catch (err) {
const msg = "Failed to update network policies from api service watch";
const msg = "Failed to update network policies from API service watch";
log.error({ err }, msg);
}
}

/**
* Update the API server CIDR and update the NetworkPolicies
* Update the API server CIDR and apply it to the NetworkPolicies.
*
* @param slice The EndpointSlice for the API server
* @param svc The Service for the API server
* @param {kind.Service} svc - The Service object representing the Kubernetes API server.
* @param {kind.EndpointSlice | string} slice - Either the EndpointSlice for dynamic CIDR generation or a static CIDR string.
*/
export async function updateAPIServerCIDR(slice: kind.EndpointSlice, svc: kind.Service) {
const { endpoints } = slice;
export async function updateAPIServerCIDR(svc: kind.Service, slice: kind.EndpointSlice | string) {
const k8sApiIP = svc.spec?.clusterIP;

// Flatten the endpoints into a list of IPs
const peers = endpoints?.flatMap(e => e.addresses);
let peers: string[] = [];

// Handle static CIDR or dynamic EndpointSlice
if (typeof slice === "string") {
peers.push(slice);
} else {
const { endpoints } = slice;
peers = Array.isArray(endpoints)
? endpoints.flatMap(e => {
if (!Array.isArray(e?.addresses) || e.addresses.length === 0) {
return []; // No addresses, skip this endpoint
}
return e.addresses.map(addr => `${addr}/32`); // Add /32 to each address
})
: [];
}

// Add the clusterIP from the service
if (k8sApiIP) {
peers?.push(k8sApiIP);
peers.push(`${k8sApiIP}/32`);
}

// If the peers are found, cache and process them
if (peers?.length) {
apiServerPeers = peers.flatMap(ip => ({
// Convert peers into NetworkPolicyPeer objects
if (peers.length) {
apiServerPeers = peers.flatMap(cidr => ({
ipBlock: {
cidr: `${ip}/32`,
cidr: cidr,
},
}));

// Get all the KubeAPI NetworkPolicies
const netPols = await K8s(kind.NetworkPolicy)
.WithLabel("uds.dev/generated", RemoteGenerated.KubeAPI)
.Get();

for (const netPol of netPols.items) {
// Get the old peers
const oldPeers = netPol.spec?.egress?.[0].to;
// Update NetworkPolicies
await updateKubeAPINetworkPolicies(apiServerPeers);
} else {
log.warn("No peers found for the API server CIDR update.");
}
}

// Update the NetworkPolicy if the peers have changed
if (!R.equals(oldPeers, apiServerPeers)) {
// Note using the apiServerPeers variable here instead of the oldPeers variable
// in case another EndpointSlice is updated before this one
netPol.spec!.egress![0].to = apiServerPeers;
/**
* Update NetworkPolicies with new API server peers.
*
* @param {V1NetworkPolicyPeer[]} newPeers - The updated list of peers to apply to the NetworkPolicies.
*/
export async function updateKubeAPINetworkPolicies(newPeers: V1NetworkPolicyPeer[]) {
const netPols = await K8s(kind.NetworkPolicy)
.WithLabel("uds/generated", RemoteGenerated.KubeAPI)
.Get();

for (const netPol of netPols.items) {
const oldPeers = netPol.spec?.egress?.[0].to;

if (!R.equals(oldPeers, newPeers)) {
netPol.spec!.egress![0].to = newPeers;
if (netPol.metadata) {
// Remove managed fields to prevent errors on server side apply
netPol.metadata.managedFields = undefined;
}

log.debug(`Updating ${netPol.metadata!.namespace}/${netPol.metadata!.name}`);
await K8s(kind.NetworkPolicy).Apply(netPol);
log.debug(
`Updating KubeAPI NetworkPolicy ${netPol.metadata!.namespace}/${netPol.metadata!.name} with new CIDRs.`,
);
try {
await K8s(kind.NetworkPolicy).Apply(netPol, { force: true });
} catch (err) {
let message = err.data?.message || "Unknown error while applying KubeAPI network policies";
if (UDSConfig.kubeApiCidr) {
message +=
", ensure that the KUBEAPI_CIDR override configured for the operator is correct.";
}
throw new Error(message);
}
}
}
}

/**
* Fetches the Kubernetes Service object for the API server.
*
* @returns {Promise<kind.Service>} - The Service object.
*/
async function fetchKubernetesService(): Promise<kind.Service> {
return K8s(kind.Service).InNamespace("default").Get("kubernetes");
}

/**
* Fetches the Kubernetes EndpointSlice object for the API server.
*
* @returns {Promise<kind.EndpointSlice>} - The EndpointSlice object.
*/
async function fetchKubernetesEndpointSlice(): Promise<kind.EndpointSlice> {
return K8s(kind.EndpointSlice).InNamespace("default").Get("kubernetes");
}
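
The peer-building step of the new `updateAPIServerCIDR` can be read in isolation from the Kubernetes calls. The following is a minimal sketch of that step under simplified assumptions: `buildApiServerPeers` is a hypothetical name, and `EndpointLike`, `EndpointSliceLike`, and `Peer` are pared-down stand-ins for the `kind.EndpointSlice` and `V1NetworkPolicyPeer` types used in the actual code.

```typescript
// Simplified stand-ins for the Kubernetes types used by the operator.
interface EndpointLike {
  addresses?: string[];
}
interface EndpointSliceLike {
  endpoints?: EndpointLike[];
}
interface Peer {
  ipBlock: { cidr: string };
}

// Build the egress peers from either a static CIDR string or an EndpointSlice,
// then append the Service clusterIP as a /32 when it is present.
function buildApiServerPeers(
  clusterIP: string | undefined,
  source: EndpointSliceLike | string,
): Peer[] {
  const cidrs: string[] = [];

  if (typeof source === "string") {
    // A static CIDR is used as-is, without a /32 suffix.
    cidrs.push(source);
  } else {
    for (const endpoint of source.endpoints ?? []) {
      for (const address of endpoint.addresses ?? []) {
        cidrs.push(`${address}/32`);
      }
    }
  }

  if (clusterIP) {
    cidrs.push(`${clusterIP}/32`);
  }

  return cidrs.map(cidr => ({ ipBlock: { cidr } }));
}

const staticPeers = buildApiServerPeers("10.0.0.1", "192.168.1.0/24");
const dynamicPeers = buildApiServerPeers("10.0.0.1", {
  endpoints: [{ addresses: ["192.168.1.2"] }, { addresses: ["192.168.1.3"] }],
});
```

These two calls correspond to the "static CIDR string" and "multiple endpoints" cases in `kubeAPI.spec.ts`. Keeping this step free of I/O is a design choice for the sketch only; the function in the commit also caches the result and updates the matching NetworkPolicies.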
17 changes: 15 additions & 2 deletions src/pepr/operator/controllers/network/policies.ts
@@ -5,8 +5,9 @@

 import { K8s, kind } from "pepr";

+import { UDSConfig } from "../../../config";
 import { Component, setupLogger } from "../../../logger";
-import { Allow, Direction, Gateway, UDSPackage } from "../../crd";
+import { Allow, Direction, Gateway, RemoteGenerated, UDSPackage } from "../../crd";
 import { getOwnerRef, purgeOrphans, sanitizeResourceName } from "../utils";
 import { allowEgressDNS } from "./defaults/allow-egress-dns";
 import { allowEgressIstiod } from "./defaults/allow-egress-istiod";
@@ -148,7 +149,19 @@ export async function networkPolicies(pkg: UDSPackage, namespace: string) {
     policy.metadata.ownerReferences = getOwnerRef(pkg);

     // Apply the NetworkPolicy and force overwrite any existing policy
-    await K8s(kind.NetworkPolicy).Apply(policy, { force: true });
+    try {
+      await K8s(kind.NetworkPolicy).Apply(policy, { force: true });
+    } catch (err) {
+      let message = err.data?.message || "Unknown error while applying network policies";
+      if (
+        UDSConfig.kubeApiCidr &&
+        policy.metadata.labels["uds/generated"] === RemoteGenerated.KubeAPI
+      ) {
+        message +=
+          ", ensure that the KUBEAPI_CIDR override configured for the operator is correct.";
+      }
+      throw new Error(message);
+    }
   }

   await purgeOrphans(generation, namespace, pkgName, kind.NetworkPolicy, log);
42 changes: 42 additions & 0 deletions src/pepr/operator/controllers/utils.ts
@@ -73,3 +73,45 @@ export async function purgeOrphans<T extends GenericClass>(
     }
   }
 }
+
+/**
+ * Lightweight retry helper with a delay between attempts.
+ *
+ * @param {() => Promise<T>} fn - The async function to retry.
+ * @param {Logger} log - Logger instance for logging debug messages.
+ * @param {number} retries - Number of retry attempts.
+ * @param {number} delayMs - Delay in milliseconds between attempts.
+ * @returns {Promise<T>} - The result of the function if successful.
+ * @throws {Error} - Throws an error after exhausting retries.
+ */
+export async function retryWithDelay<T>(
+  fn: () => Promise<T>,
+  log: Logger,
+  retries = 5,
+  delayMs = 2000,
+): Promise<T> {
+  let attempt = 0;
+  while (attempt < retries) {
+    try {
+      return await fn();
+    } catch (err) {
+      attempt++;
+      if (attempt >= retries) {
+        throw err; // Exceeded retries, rethrow the error.
+      }
+      let error = `${JSON.stringify(err)}`;
+      // Error responses from network calls (i.e. K8s().Get() will be this shape)
+      if (err.data?.message) {
+        error = err.data.message;
+        // Other error types have a message
+      } else if (err.message) {
+        error = err.message;
+      }
+      log.warn(`Attempt ${attempt} of ${fn.name} failed, retrying in ${delayMs}ms.`, { error });
+      await new Promise(resolve => setTimeout(resolve, delayMs));
+    }
+  }
+
+  // This line should never be reached, but TypeScript wants it for safety.
+  throw new Error("Retry loop exited unexpectedly without returning.");
+}
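
The message-extraction step inside the retry loop (prefer `err.data.message`, then `err.message`, then the JSON form) can also be written as a standalone function that accepts `unknown`, which is the type TypeScript assigns to catch variables when `useUnknownInCatchVariables` is enabled. This is a sketch of one possible shape, not code from this commit, and `describeError` is a hypothetical name.

```typescript
// Extract a human-readable message from an arbitrary thrown value.
function describeError(err: unknown): string {
  if (typeof err === "object" && err !== null) {
    const candidate = err as { data?: { message?: unknown }; message?: unknown };

    // Error responses from Kubernetes client calls carry the message in data.
    const dataMessage = candidate.data?.message;
    if (typeof dataMessage === "string" && dataMessage.length > 0) {
      return dataMessage;
    }

    // Standard Error objects (and look-alikes) carry a top-level message.
    if (typeof candidate.message === "string" && candidate.message.length > 0) {
      return candidate.message;
    }
  }

  // Fall back to the JSON form. JSON.stringify returns undefined for inputs
  // such as undefined, so String() covers that case.
  const json = JSON.stringify(err) as string | undefined;
  return json ?? String(err);
}

const fromApiError = describeError({ data: { message: "not found" } });
const fromError = describeError(new Error("boom"));
const fromPlainObject = describeError({ code: 1 });
```

With these inputs the results are `"not found"`, `"boom"`, and `'{"code":1}'` respectively. The same helper could serve the `catch` blocks in `kubeAPI.ts` and `policies.ts`, which read `err.data?.message` directly.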
13 changes: 8 additions & 5 deletions src/pepr/operator/index.ts
@@ -34,11 +34,14 @@ const log = setupLogger(Component.OPERATOR);
 void initAPIServerCIDR();

 // Watch for changes to the API server EndpointSlice and update the API server CIDR
-When(a.EndpointSlice)
-  .IsCreatedOrUpdated()
-  .InNamespace("default")
-  .WithName("kubernetes")
-  .Reconcile(updateAPIServerCIDRFromEndpointSlice);
+// Skip if a CIDR is defined in the UDS Config
+if (!UDSConfig.kubeApiCidr) {
+  When(a.EndpointSlice)
+    .IsCreatedOrUpdated()
+    .InNamespace("default")
+    .WithName("kubernetes")
+    .Reconcile(updateAPIServerCIDRFromEndpointSlice);
+}

 // Watch for changes to the API server Service and update the API server CIDR
 When(a.Service)
1 change: 1 addition & 0 deletions src/pepr/uds-operator-config/values.yaml
@@ -7,6 +7,7 @@ operator:
   UDS_ALLOW_ALL_NS_EXEMPTIONS: "###ZARF_VAR_ALLOW_ALL_NS_EXEMPTIONS###"
   UDS_LOG_LEVEL: "###ZARF_VAR_UDS_LOG_LEVEL###"
   AUTHSERVICE_REDIS_URI: "###ZARF_VAR_AUTHSERVICE_REDIS_URI###"
+  KUBEAPI_CIDR: ""
   # Allow Pepr watch to be configurable to react to dropped connections faster
   PEPR_LAST_SEEN_LIMIT_SECONDS: "300"
   # Allow Pepr to re-list resources more frequently to avoid missing resources
