Skip to content

Commit

Permalink
Merge pull request #67 from terassyi/refactor-block-allocation
Browse files Browse the repository at this point in the history
Refactor address block allocation logic
  • Loading branch information
terassyi authored Apr 9, 2024
2 parents 690f1f4 + bdb399e commit 1c3959d
Show file tree
Hide file tree
Showing 38 changed files with 1,079 additions and 1,172 deletions.
11 changes: 6 additions & 5 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,6 @@ ARG RUST_VERSION=1.76.0

FROM --platform=$BUILDPLATFORM rust:${RUST_VERSION} as builder

WORKDIR /home
COPY ./sartd /home/sartd
COPY ./sart /home/sart
COPY ./sartcni /home/sartcni
COPY ./proto /home/proto

RUN apt update -y && \
apt install -y protobuf-compiler libprotobuf-dev clang llvm mold gcc-multilib
Expand All @@ -30,6 +25,12 @@ RUN case "$TARGETPLATFORM" in \

RUN rustup target add $(cat /rust_target.txt)

WORKDIR /home
COPY ./sartd /home/sartd
COPY ./sart /home/sart
COPY ./sartcni /home/sartcni
COPY ./proto /home/proto

RUN cd sartd; cargo build --release --target $(cat /rust_target.txt) && \
cp /home/sartd/target/$(cat /rust_target.txt)/release/sartd /usr/local/bin/sartd && \
cargo build --release --bin cni-installer --target $(cat /rust_target.txt) && \
Expand Down
2 changes: 1 addition & 1 deletion docs/design.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ Sart is Kubernetes network load-balancer and CNI plugin for Kubernetes using BGP
This project is inspired by [Metallb](https://github.com/metallb/metallb) and [Coil](https://github.com/cybozu-go/coil).

> [!WARNING]
> CNI feature is not implemented yet.
> This project is under experimental.
## Programs

Expand Down
12 changes: 6 additions & 6 deletions manifests/lb/sample/lb.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ spec:
spec:
containers:
- name: app
image: nginx:latest
image: ghcr.io/terassyi/test-server:0.1.2
ports:
- containerPort: 80
---
Expand All @@ -43,7 +43,7 @@ spec:
spec:
containers:
- name: app
image: nginx:latest
image: ghcr.io/terassyi/test-server:0.1.2
ports:
- containerPort: 80
---
Expand All @@ -64,7 +64,7 @@ spec:
spec:
containers:
- name: app
image: nginx:latest
image: ghcr.io/terassyi/test-server:0.1.2
ports:
- containerPort: 80
---
Expand All @@ -83,7 +83,7 @@ spec:
ports:
- name: http
port: 80
targetPort: 80
targetPort: 8080
---
# LoadBalancer Service
apiVersion: v1
Expand All @@ -101,7 +101,7 @@ spec:
ports:
- name: http
port: 80
targetPort: 80
targetPort: 8080
---
# LoadBalancer Service
apiVersion: v1
Expand All @@ -120,7 +120,7 @@ spec:
ports:
- name: http
port: 80
targetPort: 80
targetPort: 8080
---
apiVersion: v1
kind: Pod
Expand Down
91 changes: 90 additions & 1 deletion sartd/src/ipam/src/manager.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
use std::{
collections::HashMap,
collections::{BTreeMap, HashMap},
net::IpAddr,
sync::{Arc, Mutex},
vec,
};

use ipnet::IpNet;

use crate::bitset::BitSet;

use super::{
allocator::{Allocator, AllocatorMethod},
error::Error,
Expand Down Expand Up @@ -158,3 +160,90 @@ impl AllocationInfo {
self.blocks.get(block)
}
}

#[derive(Debug, Default, Clone)]
pub struct BlockAllocator {
pub inner: Arc<Mutex<BlockAllocatorInner>>,
}

impl BlockAllocator {
pub fn new(pool_map: HashMap<String, Pool>) -> BlockAllocator {
BlockAllocator {
inner: Arc::new(Mutex::new(BlockAllocatorInner { pools: pool_map })),
}
}
}

#[derive(Debug, Default)]
pub struct BlockAllocatorInner {
pools: HashMap<String, Pool>,
}

impl BlockAllocatorInner {
pub fn insert(&mut self, name: &str, pool: Pool) {
self.pools.insert(name.to_string(), pool);
}

pub fn get(&self, name: &str) -> Option<&Pool> {
self.pools.get(name)
}

pub fn get_mut(&mut self, name: &str) -> Option<&mut Pool> {
self.pools.get_mut(name)
}

pub fn release(&mut self, name: &str) {
self.pools.remove(name);
}
}

#[derive(Debug)]
pub struct Pool {
pub cidr: IpNet,
pub size: u32,
pub block_size: u32,
pub allocator: BitSet,
pub blocks: BTreeMap<String, u32>,
pub counter: u32,
}

impl Pool {
pub fn new(cidr: &IpNet, block_size: u32) -> Pool {
let prefix_len = cidr.prefix_len();
let bitset_size = 2u128.pow(block_size - (prefix_len as u32));
Pool {
cidr: *cidr,
size: prefix_len as u32,
block_size,
allocator: BitSet::new(bitset_size),
blocks: BTreeMap::new(),
counter: 0,
}
}

pub fn allocate(&mut self, name: &str) -> Result<u128, Error> {
let index = self.allocator.set_next().map_err(Error::BitSet)?;
self.blocks.insert(name.to_string(), index as u32);
self.counter += 1;
Ok(index)
}

// for recovering allocations
pub fn allocate_with(&mut self, index: u128, name: &str) -> Result<u128, Error> {
let index = self.allocator.set(index, true).map_err(Error::BitSet)?;
self.blocks.insert(name.to_string(), index as u32);
Ok(index)
}

pub fn release(&mut self, index: u32) -> Result<(), Error> {
self.allocator
.set(index as u128, false)
.map_err(Error::BitSet)?;
let mut name = String::new();
if let Some((n, _)) = self.blocks.iter_mut().find(|(_, i)| index == **i) {
name = n.to_string();
}
self.blocks.remove(&name);
Ok(())
}
}
4 changes: 2 additions & 2 deletions sartd/src/kubernetes/src/agent/cni/netlink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ use std::{collections::HashMap, net::IpAddr, str::FromStr};

use futures::TryStreamExt;

use ipnet::{IpAdd, IpNet};
use ipnet::IpNet;
use netlink_packet_route::{
link::{self, LinkAttribute},
link::LinkAttribute,
route::{RouteAddress, RouteAttribute, RouteProtocol, RouteScope},
rule::RuleAction,
};
Expand Down
8 changes: 8 additions & 0 deletions sartd/src/kubernetes/src/agent/cni/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,14 @@ impl CNIServerInner {
);
// let created_br = self.receiver.blocking_recv();
let created_br = self.receiver.recv().await.ok_or(Error::ReceiveNotify)?;
tracing::info!(
name = pod_info.name,
namespace = pod_info.namespace,
container_id = pod_info.container_id,
cmd = "ADD",
"Got the signal of address block creation"
);

block_request_api
.delete(&br.name_any(), &DeleteParams::default())
.await
Expand Down
22 changes: 4 additions & 18 deletions sartd/src/kubernetes/src/agent/reconciler/address_block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::{collections::BTreeMap, str::FromStr, sync::Arc, time::Duration};
use futures::StreamExt;
use ipnet::IpNet;
use kube::{
api::{DeleteParams, ListParams, PostParams},
api::{DeleteParams, ListParams, PatchParams, PostParams},
core::ObjectMeta,
runtime::{
controller::Action,
Expand All @@ -20,7 +20,7 @@ use crate::{
agent::{error::Error, reconciler::node_bgp::ENV_HOSTNAME},
context::{error_policy, ContextWith, Ctx, State},
crd::{
address_block::{AddressBlock, ADDRESS_BLOCK_FINALIZER, ADDRESS_BLOCK_NODE_LABEL},
address_block::{AddressBlock, ADDRESS_BLOCK_FINALIZER_AGENT, ADDRESS_BLOCK_NODE_LABEL},
address_pool::{AddressType, ADDRESS_POOL_ANNOTATION},
bgp_advertisement::{
AdvertiseStatus, BGPAdvertisement, BGPAdvertisementSpec, BGPAdvertisementStatus,
Expand Down Expand Up @@ -48,7 +48,7 @@ pub async fn reconciler(
let address_blocks = Api::<AddressBlock>::all(ctx.client().clone());
finalizer(
&address_blocks,
ADDRESS_BLOCK_FINALIZER,
ADDRESS_BLOCK_FINALIZER_AGENT,
ab,
|event| async {
match event {
Expand All @@ -63,7 +63,7 @@ pub async fn reconciler(

#[tracing::instrument(skip_all)]
async fn reconcile(
api: &Api<AddressBlock>,
_api: &Api<AddressBlock>,
ab: &AddressBlock,
ctx: Arc<ContextWith<Arc<PodAllocator>>>,
) -> Result<Action, Error> {
Expand Down Expand Up @@ -111,12 +111,6 @@ async fn reconcile(
Some(_block) => {
tracing::info!(name = ab.name_any(), "Address block already exists");

// GC empty block
// if block.allocator.is_empty() {
// tracing::info!(name = ab.name_any(), "Block is empty");
// need_gc = true;
// }

match ab.spec.auto_assign {
true => {
// Check if already set
Expand Down Expand Up @@ -179,14 +173,6 @@ async fn reconcile(
}
}

// if need_gc {
// tracing::info!(name = ab.name_any(), "Delete empty AddressBlock");
// api.delete(&ab.name_any(), &DeleteParams::default())
// .await
// .map_err(Error::Kube)?;
// return Ok(Action::await_change());
// }

if create_adv {
let adv = BGPAdvertisement {
metadata: ObjectMeta {
Expand Down
20 changes: 8 additions & 12 deletions sartd/src/kubernetes/src/agent/reconciler/bgp_peer.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use std::{net::IpAddr, sync::Arc, time::Duration};
use std::{sync::Arc, time::Duration};

use futures::StreamExt;
use k8s_openapi::api::discovery::v1::EndpointSlice;
use kube::{
api::{ListParams, PostParams},
api::{ListParams, Patch, PatchParams, PostParams},
runtime::{
controller::Action,
finalizer::{finalizer, Event},
Expand Down Expand Up @@ -75,6 +75,8 @@ async fn reconcile(api: &Api<BGPPeer>, bp: &BGPPeer, ctx: Arc<Context>) -> Resul
let mut established = false;
let mut reset = false;

let ssapply = PatchParams::apply("agent-bgppeer");

// create or update peer and its status
match speaker_client
.get_neighbor(sartd_proto::sart::GetNeighborRequest {
Expand Down Expand Up @@ -182,12 +184,9 @@ async fn reconcile(api: &Api<BGPPeer>, bp: &BGPPeer, ctx: Arc<Context>) -> Resul
addr = bp.spec.addr,
"Update BGPPeer status"
);
let patch = Patch::Merge(&new_bp);
if let Err(e) = api
.replace_status(
&bp.name_any(),
&PostParams::default(),
serde_json::to_vec(&new_bp).map_err(Error::Serialization)?,
)
.patch_status(&bp.name_any(), &ssapply, &patch)
.await
.map_err(Error::Kube)
{
Expand Down Expand Up @@ -309,12 +308,9 @@ async fn reconcile(api: &Api<BGPPeer>, bp: &BGPPeer, ctx: Arc<Context>) -> Resul
&get_namespace::<BGPAdvertisement>(ba)
.map_err(Error::KubeLibrary)?,
);
let patch = Patch::Merge(&ba);
ns_ba_api
.replace_status(
&ba.name_any(),
&PostParams::default(),
serde_json::to_vec(&ba).map_err(Error::Serialization)?,
)
.patch_status(&ba.name_any(), &ssapply, &patch)
.await
.map_err(Error::Kube)?;
}
Expand Down
7 changes: 5 additions & 2 deletions sartd/src/kubernetes/src/agent/reconciler/bgp_peer_watcher.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use k8s_openapi::api::discovery::v1::EndpointSlice;
use kube::{
api::{ListParams, PostParams},
api::{ListParams, Patch, PatchParams, PostParams},
Api, Client, ResourceExt,
};
use sartd_proto::sart::{
Expand Down Expand Up @@ -167,8 +167,11 @@ impl BgpExporterApi for BGPPeerStateWatcher {
Err(e) => return Err(Status::internal(e.to_string())),
};
let ns_eps_api = Api::<EndpointSlice>::namespaced(client.clone(), &ns);
let eps_name = eps.name_any();
let ssapply = PatchParams::apply("agent-peerwatcher");
let patch = Patch::Merge(eps);
if let Err(e) = ns_eps_api
.replace(&eps.name_any(), &PostParams::default(), eps)
.patch(&eps_name, &ssapply, &patch)
.await
.map_err(Error::Kube)
{
Expand Down
5 changes: 1 addition & 4 deletions sartd/src/kubernetes/src/agent/server.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use std::net::IpAddr;
use std::sync::Arc;
use std::time::Duration;

use actix_web::{
get, middleware, web::Data, App, HttpRequest, HttpResponse, HttpServer, Responder,
Expand All @@ -14,9 +13,7 @@ use sartd_ipam::manager::AllocatorSet;
use sartd_trace::init::{prepare_tracing, TraceConfig};
use tokio::sync::mpsc::unbounded_channel;

use crate::agent::cni::server::{CNIServer, CNI_ROUTE_TABLE_ID};
use crate::agent::cni::{self, gc};
use crate::agent::reconciler::address_block::PodAllocator;
use crate::agent::{cni::{self, server::{CNIServer, CNI_ROUTE_TABLE_ID}}, reconciler::address_block::PodAllocator};
use crate::config::Mode;
use crate::context::State;
use crate::crd::address_block::AddressBlock;
Expand Down
9 changes: 9 additions & 0 deletions sartd/src/kubernetes/src/controller/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,9 @@ pub enum Error {
#[error("BlockRequest not performed")]
BlockRequestNotPerformed,

#[error("BlockRequest not completed")]
BlockRequestNotCompleted,

#[error("Invalid address type")]
InvalidAddressType,

Expand All @@ -122,6 +125,12 @@ pub enum Error {

#[error("Invalid pool")]
InvalidPool,

#[error("AddressBlock index is not set")]
AddressBlockIndexNotSet,

#[error("Address pool is not empty")]
AddressPoolNotEmpty,
}

#[derive(Debug, Error)]
Expand Down
Loading

0 comments on commit 1c3959d

Please sign in to comment.