diff --git a/Dockerfile b/Dockerfile index f7ad2d2..590acfb 100644 --- a/Dockerfile +++ b/Dockerfile @@ -4,11 +4,6 @@ ARG RUST_VERSION=1.76.0 FROM --platform=$BUILDPLATFORM rust:${RUST_VERSION} as builder -WORKDIR /home -COPY ./sartd /home/sartd -COPY ./sart /home/sart -COPY ./sartcni /home/sartcni -COPY ./proto /home/proto RUN apt update -y && \ apt install -y protobuf-compiler libprotobuf-dev clang llvm mold gcc-multilib @@ -30,6 +25,12 @@ RUN case "$TARGETPLATFORM" in \ RUN rustup target add $(cat /rust_target.txt) +WORKDIR /home +COPY ./sartd /home/sartd +COPY ./sart /home/sart +COPY ./sartcni /home/sartcni +COPY ./proto /home/proto + RUN cd sartd; cargo build --release --target $(cat /rust_target.txt) && \ cp /home/sartd/target/$(cat /rust_target.txt)/release/sartd /usr/local/bin/sartd && \ cargo build --release --bin cni-installer --target $(cat /rust_target.txt) && \ diff --git a/docs/design.md b/docs/design.md index 58b36b3..3fcef17 100644 --- a/docs/design.md +++ b/docs/design.md @@ -6,7 +6,7 @@ Sart is Kubernetes network load-balancer and CNI plugin for Kubernetes using BGP This project is inspired by [Metallb](https://github.com/metallb/metallb) and [Coil](https://github.com/cybozu-go/coil). > [!WARNING] -> CNI feature is not implemented yet. +> This project is under experimental. ## Programs diff --git a/manifests/lb/sample/lb.yaml b/manifests/lb/sample/lb.yaml index e28ecce..43e287e 100644 --- a/manifests/lb/sample/lb.yaml +++ b/manifests/lb/sample/lb.yaml @@ -22,7 +22,7 @@ spec: spec: containers: - name: app - image: nginx:latest + image: ghcr.io/terassyi/test-server:0.1.2 ports: - containerPort: 80 --- @@ -43,7 +43,7 @@ spec: spec: containers: - name: app - image: nginx:latest + image: ghcr.io/terassyi/test-server:0.1.2 ports: - containerPort: 80 --- @@ -64,7 +64,7 @@ spec: spec: containers: - name: app - image: nginx:latest + image: ghcr.io/terassyi/test-server:0.1.2 ports: - containerPort: 80 --- @@ -83,7 +83,7 @@ spec: ports: - name: http port: 80 - targetPort: 80 + targetPort: 8080 --- # LoadBalancer Service apiVersion: v1 @@ -101,7 +101,7 @@ spec: ports: - name: http port: 80 - targetPort: 80 + targetPort: 8080 --- # LoadBalancer Service apiVersion: v1 @@ -120,7 +120,7 @@ spec: ports: - name: http port: 80 - targetPort: 80 + targetPort: 8080 --- apiVersion: v1 kind: Pod diff --git a/sartd/src/ipam/src/manager.rs b/sartd/src/ipam/src/manager.rs index bbe6a4c..7e17a0e 100644 --- a/sartd/src/ipam/src/manager.rs +++ b/sartd/src/ipam/src/manager.rs @@ -1,5 +1,5 @@ use std::{ - collections::HashMap, + collections::{BTreeMap, HashMap}, net::IpAddr, sync::{Arc, Mutex}, vec, @@ -7,6 +7,8 @@ use std::{ use ipnet::IpNet; +use crate::bitset::BitSet; + use super::{ allocator::{Allocator, AllocatorMethod}, error::Error, @@ -158,3 +160,90 @@ impl AllocationInfo { self.blocks.get(block) } } + +#[derive(Debug, Default, Clone)] +pub struct BlockAllocator { + pub inner: Arc>, +} + +impl BlockAllocator { + pub fn new(pool_map: HashMap) -> BlockAllocator { + BlockAllocator { + inner: Arc::new(Mutex::new(BlockAllocatorInner { pools: pool_map })), + } + } +} + +#[derive(Debug, Default)] +pub struct BlockAllocatorInner { + pools: HashMap, +} + +impl BlockAllocatorInner { + pub fn insert(&mut self, name: &str, pool: Pool) { + self.pools.insert(name.to_string(), pool); + } + + pub fn get(&self, name: &str) -> Option<&Pool> { + self.pools.get(name) + } + + pub fn get_mut(&mut self, name: &str) -> Option<&mut Pool> { + self.pools.get_mut(name) + } + + pub fn release(&mut self, name: &str) { + self.pools.remove(name); + } +} + +#[derive(Debug)] +pub struct Pool { + pub cidr: IpNet, + pub size: u32, + pub block_size: u32, + pub allocator: BitSet, + pub blocks: BTreeMap, + pub counter: u32, +} + +impl Pool { + pub fn new(cidr: &IpNet, block_size: u32) -> Pool { + let prefix_len = cidr.prefix_len(); + let bitset_size = 2u128.pow(block_size - (prefix_len as u32)); + Pool { + cidr: *cidr, + size: prefix_len as u32, + block_size, + allocator: BitSet::new(bitset_size), + blocks: BTreeMap::new(), + counter: 0, + } + } + + pub fn allocate(&mut self, name: &str) -> Result { + let index = self.allocator.set_next().map_err(Error::BitSet)?; + self.blocks.insert(name.to_string(), index as u32); + self.counter += 1; + Ok(index) + } + + // for recovering allocations + pub fn allocate_with(&mut self, index: u128, name: &str) -> Result { + let index = self.allocator.set(index, true).map_err(Error::BitSet)?; + self.blocks.insert(name.to_string(), index as u32); + Ok(index) + } + + pub fn release(&mut self, index: u32) -> Result<(), Error> { + self.allocator + .set(index as u128, false) + .map_err(Error::BitSet)?; + let mut name = String::new(); + if let Some((n, _)) = self.blocks.iter_mut().find(|(_, i)| index == **i) { + name = n.to_string(); + } + self.blocks.remove(&name); + Ok(()) + } +} diff --git a/sartd/src/kubernetes/src/agent/cni/netlink.rs b/sartd/src/kubernetes/src/agent/cni/netlink.rs index 3020108..d9c0d24 100644 --- a/sartd/src/kubernetes/src/agent/cni/netlink.rs +++ b/sartd/src/kubernetes/src/agent/cni/netlink.rs @@ -2,9 +2,9 @@ use std::{collections::HashMap, net::IpAddr, str::FromStr}; use futures::TryStreamExt; -use ipnet::{IpAdd, IpNet}; +use ipnet::IpNet; use netlink_packet_route::{ - link::{self, LinkAttribute}, + link::LinkAttribute, route::{RouteAddress, RouteAttribute, RouteProtocol, RouteScope}, rule::RuleAction, }; diff --git a/sartd/src/kubernetes/src/agent/cni/server.rs b/sartd/src/kubernetes/src/agent/cni/server.rs index 5dd8b1a..bb645d4 100644 --- a/sartd/src/kubernetes/src/agent/cni/server.rs +++ b/sartd/src/kubernetes/src/agent/cni/server.rs @@ -188,6 +188,14 @@ impl CNIServerInner { ); // let created_br = self.receiver.blocking_recv(); let created_br = self.receiver.recv().await.ok_or(Error::ReceiveNotify)?; + tracing::info!( + name = pod_info.name, + namespace = pod_info.namespace, + container_id = pod_info.container_id, + cmd = "ADD", + "Got the signal of address block creation" + ); + block_request_api .delete(&br.name_any(), &DeleteParams::default()) .await diff --git a/sartd/src/kubernetes/src/agent/reconciler/address_block.rs b/sartd/src/kubernetes/src/agent/reconciler/address_block.rs index 9b7c299..ea9d72e 100644 --- a/sartd/src/kubernetes/src/agent/reconciler/address_block.rs +++ b/sartd/src/kubernetes/src/agent/reconciler/address_block.rs @@ -3,7 +3,7 @@ use std::{collections::BTreeMap, str::FromStr, sync::Arc, time::Duration}; use futures::StreamExt; use ipnet::IpNet; use kube::{ - api::{DeleteParams, ListParams, PostParams}, + api::{DeleteParams, ListParams, PatchParams, PostParams}, core::ObjectMeta, runtime::{ controller::Action, @@ -20,7 +20,7 @@ use crate::{ agent::{error::Error, reconciler::node_bgp::ENV_HOSTNAME}, context::{error_policy, ContextWith, Ctx, State}, crd::{ - address_block::{AddressBlock, ADDRESS_BLOCK_FINALIZER, ADDRESS_BLOCK_NODE_LABEL}, + address_block::{AddressBlock, ADDRESS_BLOCK_FINALIZER_AGENT, ADDRESS_BLOCK_NODE_LABEL}, address_pool::{AddressType, ADDRESS_POOL_ANNOTATION}, bgp_advertisement::{ AdvertiseStatus, BGPAdvertisement, BGPAdvertisementSpec, BGPAdvertisementStatus, @@ -48,7 +48,7 @@ pub async fn reconciler( let address_blocks = Api::::all(ctx.client().clone()); finalizer( &address_blocks, - ADDRESS_BLOCK_FINALIZER, + ADDRESS_BLOCK_FINALIZER_AGENT, ab, |event| async { match event { @@ -63,7 +63,7 @@ pub async fn reconciler( #[tracing::instrument(skip_all)] async fn reconcile( - api: &Api, + _api: &Api, ab: &AddressBlock, ctx: Arc>>, ) -> Result { @@ -111,12 +111,6 @@ async fn reconcile( Some(_block) => { tracing::info!(name = ab.name_any(), "Address block already exists"); - // GC empty block - // if block.allocator.is_empty() { - // tracing::info!(name = ab.name_any(), "Block is empty"); - // need_gc = true; - // } - match ab.spec.auto_assign { true => { // Check if already set @@ -179,14 +173,6 @@ async fn reconcile( } } - // if need_gc { - // tracing::info!(name = ab.name_any(), "Delete empty AddressBlock"); - // api.delete(&ab.name_any(), &DeleteParams::default()) - // .await - // .map_err(Error::Kube)?; - // return Ok(Action::await_change()); - // } - if create_adv { let adv = BGPAdvertisement { metadata: ObjectMeta { diff --git a/sartd/src/kubernetes/src/agent/reconciler/bgp_peer.rs b/sartd/src/kubernetes/src/agent/reconciler/bgp_peer.rs index e979b73..6f3daee 100644 --- a/sartd/src/kubernetes/src/agent/reconciler/bgp_peer.rs +++ b/sartd/src/kubernetes/src/agent/reconciler/bgp_peer.rs @@ -1,9 +1,9 @@ -use std::{net::IpAddr, sync::Arc, time::Duration}; +use std::{sync::Arc, time::Duration}; use futures::StreamExt; use k8s_openapi::api::discovery::v1::EndpointSlice; use kube::{ - api::{ListParams, PostParams}, + api::{ListParams, Patch, PatchParams, PostParams}, runtime::{ controller::Action, finalizer::{finalizer, Event}, @@ -75,6 +75,8 @@ async fn reconcile(api: &Api, bp: &BGPPeer, ctx: Arc) -> Resul let mut established = false; let mut reset = false; + let ssapply = PatchParams::apply("agent-bgppeer"); + // create or update peer and its status match speaker_client .get_neighbor(sartd_proto::sart::GetNeighborRequest { @@ -182,12 +184,9 @@ async fn reconcile(api: &Api, bp: &BGPPeer, ctx: Arc) -> Resul addr = bp.spec.addr, "Update BGPPeer status" ); + let patch = Patch::Merge(&new_bp); if let Err(e) = api - .replace_status( - &bp.name_any(), - &PostParams::default(), - serde_json::to_vec(&new_bp).map_err(Error::Serialization)?, - ) + .patch_status(&bp.name_any(), &ssapply, &patch) .await .map_err(Error::Kube) { @@ -309,12 +308,9 @@ async fn reconcile(api: &Api, bp: &BGPPeer, ctx: Arc) -> Resul &get_namespace::(ba) .map_err(Error::KubeLibrary)?, ); + let patch = Patch::Merge(&ba); ns_ba_api - .replace_status( - &ba.name_any(), - &PostParams::default(), - serde_json::to_vec(&ba).map_err(Error::Serialization)?, - ) + .patch_status(&ba.name_any(), &ssapply, &patch) .await .map_err(Error::Kube)?; } diff --git a/sartd/src/kubernetes/src/agent/reconciler/bgp_peer_watcher.rs b/sartd/src/kubernetes/src/agent/reconciler/bgp_peer_watcher.rs index efa97ad..14007b8 100644 --- a/sartd/src/kubernetes/src/agent/reconciler/bgp_peer_watcher.rs +++ b/sartd/src/kubernetes/src/agent/reconciler/bgp_peer_watcher.rs @@ -1,6 +1,6 @@ use k8s_openapi::api::discovery::v1::EndpointSlice; use kube::{ - api::{ListParams, PostParams}, + api::{ListParams, Patch, PatchParams, PostParams}, Api, Client, ResourceExt, }; use sartd_proto::sart::{ @@ -167,8 +167,11 @@ impl BgpExporterApi for BGPPeerStateWatcher { Err(e) => return Err(Status::internal(e.to_string())), }; let ns_eps_api = Api::::namespaced(client.clone(), &ns); + let eps_name = eps.name_any(); + let ssapply = PatchParams::apply("agent-peerwatcher"); + let patch = Patch::Merge(eps); if let Err(e) = ns_eps_api - .replace(&eps.name_any(), &PostParams::default(), eps) + .patch(&eps_name, &ssapply, &patch) .await .map_err(Error::Kube) { diff --git a/sartd/src/kubernetes/src/agent/server.rs b/sartd/src/kubernetes/src/agent/server.rs index 595c940..a5650a5 100644 --- a/sartd/src/kubernetes/src/agent/server.rs +++ b/sartd/src/kubernetes/src/agent/server.rs @@ -1,6 +1,5 @@ use std::net::IpAddr; use std::sync::Arc; -use std::time::Duration; use actix_web::{ get, middleware, web::Data, App, HttpRequest, HttpResponse, HttpServer, Responder, @@ -14,9 +13,7 @@ use sartd_ipam::manager::AllocatorSet; use sartd_trace::init::{prepare_tracing, TraceConfig}; use tokio::sync::mpsc::unbounded_channel; -use crate::agent::cni::server::{CNIServer, CNI_ROUTE_TABLE_ID}; -use crate::agent::cni::{self, gc}; -use crate::agent::reconciler::address_block::PodAllocator; +use crate::agent::{cni::{self, server::{CNIServer, CNI_ROUTE_TABLE_ID}}, reconciler::address_block::PodAllocator}; use crate::config::Mode; use crate::context::State; use crate::crd::address_block::AddressBlock; diff --git a/sartd/src/kubernetes/src/controller/error.rs b/sartd/src/kubernetes/src/controller/error.rs index 74e806e..70e64d7 100644 --- a/sartd/src/kubernetes/src/controller/error.rs +++ b/sartd/src/kubernetes/src/controller/error.rs @@ -114,6 +114,9 @@ pub enum Error { #[error("BlockRequest not performed")] BlockRequestNotPerformed, + #[error("BlockRequest not completed")] + BlockRequestNotCompleted, + #[error("Invalid address type")] InvalidAddressType, @@ -122,6 +125,12 @@ pub enum Error { #[error("Invalid pool")] InvalidPool, + + #[error("AddressBlock index is not set")] + AddressBlockIndexNotSet, + + #[error("Address pool is not empty")] + AddressPoolNotEmpty, } #[derive(Debug, Error)] diff --git a/sartd/src/kubernetes/src/controller/reconciler/address_block.rs b/sartd/src/kubernetes/src/controller/reconciler/address_block.rs index a7d3167..d112b62 100644 --- a/sartd/src/kubernetes/src/controller/reconciler/address_block.rs +++ b/sartd/src/kubernetes/src/controller/reconciler/address_block.rs @@ -3,33 +3,37 @@ use std::{str::FromStr, sync::Arc}; use futures::StreamExt; use ipnet::IpNet; use kube::{ - api::ListParams, - runtime::{ + api::ListParams, runtime::{ controller::{Action, Controller}, finalizer::{finalizer, Event}, watcher::Config, - }, - Api, Client, ResourceExt, + }, Api, Client, ResourceExt }; -use sartd_ipam::manager::{AllocatorSet, Block}; +use sartd_ipam::manager::{AllocatorSet, Block, BlockAllocator}; use crate::{ context::{error_policy, ContextWith, Ctx, State}, controller::error::Error, - crd::{address_block::{AddressBlock, ADDRESS_BLOCK_FINALIZER}, address_pool::AddressType}, + crd::{address_block::{AddressBlock, ADDRESS_BLOCK_FINALIZER_CONTROLLER}, address_pool::AddressType}, }; +#[derive(Debug, Clone)] +pub struct ControllerAddressBlockContext { + pub allocator_set: Arc, + pub block_allocator: Arc, +} + #[tracing::instrument(skip_all, fields(trace_id))] pub async fn reconciler( ab: Arc, - ctx: Arc>>, + ctx: Arc>, ) -> Result { let address_blocks = Api::::all(ctx.client().clone()); finalizer( &address_blocks, - ADDRESS_BLOCK_FINALIZER, + ADDRESS_BLOCK_FINALIZER_CONTROLLER, ab, |event| async { match event { @@ -44,9 +48,31 @@ pub async fn reconciler( #[tracing::instrument(skip_all)] async fn reconcile( + api: &Api, + ab: &AddressBlock, + ctx: Arc>, +) -> Result { + match ab.spec.r#type { + AddressType::Pod => reconcile_pod(api, ab, ctx).await, + AddressType::Service => reconcile_service(api, ab, ctx).await + } +} + +#[tracing::instrument(skip_all)] +async fn reconcile_pod( + _api: &Api, + _ab: &AddressBlock, + _ctx: Arc>, +) -> Result { + Ok(Action::await_change()) +} + + +#[tracing::instrument(skip_all)] +async fn reconcile_service( _api: &Api, ab: &AddressBlock, - ctx: Arc>>, + ctx: Arc>, ) -> Result { // only handling lb address block here if ab.spec.r#type.ne(&AddressType::Service) { @@ -54,7 +80,7 @@ async fn reconcile( } tracing::info!(name = ab.name_any(), "Reconcile AddressBlock"); - let component = ctx.component.clone(); + let component = ctx.component.allocator_set.clone(); let mut alloc_set = component.inner.lock().map_err(|_| Error::FailedToGetLock)?; let cidr = IpNet::from_str(&ab.spec.cidr).map_err(|_| Error::InvalidCIDR)?; @@ -118,13 +144,48 @@ async fn reconcile( #[tracing::instrument(skip_all)] async fn cleanup( + api: &Api, + ab: &AddressBlock, + ctx: Arc>, +) -> Result { + match ab.spec.r#type { + AddressType::Pod => cleanup_pod(api, ab, ctx).await, + AddressType::Service => cleanup_service(api, ab, ctx).await, + } +} + +#[tracing::instrument(skip_all)] +async fn cleanup_pod( + _api: &Api, + ab: &AddressBlock, + ctx: Arc>, +) -> Result { + + let index = match &ab.status { + Some(status) => status.index, + None => return Err(Error::AddressBlockIndexNotSet) + }; + + { + let tmp = ctx.component.block_allocator.clone(); + let mut block_allocator = tmp.inner.lock().map_err(|_| Error::FailedToGetLock)?; + if let Some(pool) = block_allocator.get_mut(&ab.spec.pool_ref) { + tracing::info!(pool=ab.spec.pool_ref, cidr=?pool.cidr, block_size=pool.block_size,"Remove the pool from the block allocator"); + pool.release(index).map_err(Error::Ipam)?; + } + } + Ok(Action::await_change()) +} + +#[tracing::instrument(skip_all)] +async fn cleanup_service( _api: &Api, ab: &AddressBlock, - ctx: Arc>>, + ctx: Arc>, ) -> Result { tracing::info!(name = ab.name_any(), "clean up AddressBlock"); - let component = ctx.component.clone(); + let component = ctx.component.allocator_set.clone(); let mut alloc_set = component.inner.lock().map_err(|_| Error::FailedToGetLock)?; if let Some(auto) = alloc_set.auto_assign.as_ref() { @@ -154,7 +215,7 @@ async fn cleanup( Ok(Action::await_change()) } -pub async fn run(state: State, interval: u64, allocator_set: Arc) { +pub async fn run(state: State, interval: u64, ctx: ControllerAddressBlockContext) { let client = Client::try_default() .await .expect("Failed to create kube client"); @@ -172,8 +233,8 @@ pub async fn run(state: State, interval: u64, allocator_set: Arc) .shutdown_on_signal() .run( reconciler, - error_policy::>>, - state.to_context_with::>(client, interval, allocator_set), + error_policy::>, + state.to_context_with::(client, interval, ctx), ) .filter_map(|x| async move { std::result::Result::ok(x) }) .for_each(|_| futures::future::ready(())) @@ -186,11 +247,11 @@ mod tests { use ipnet::IpNet; use kube::{core::ObjectMeta, Api}; - use sartd_ipam::manager::{AllocatorSet, Block}; + use sartd_ipam::manager::{AllocatorSet, Block, BlockAllocator}; use crate::{ context::{ContextWith, Ctx}, - controller::error::Error, + controller::{error::Error, reconciler::address_block::ControllerAddressBlockContext}, crd::address_block::{AddressBlock, AddressBlockSpec}, fixture::reconciler::{timeout_after_1s, ApiServerVerifier}, }; @@ -219,7 +280,12 @@ mod tests { #[tokio::test] async fn create_address_block() { let alloc_set = Arc::new(AllocatorSet::default()); - let (testctx, fakeserver, _) = ContextWith::test(alloc_set.clone()); + let block_allocator = Arc::new(BlockAllocator::default()); + let ab_ctx = ControllerAddressBlockContext { + allocator_set: alloc_set.clone(), + block_allocator: block_allocator.clone(), + }; + let (testctx, fakeserver, _) = ContextWith::test(ab_ctx); let ab = AddressBlock { metadata: ObjectMeta { name: Some("test-block".to_string()), @@ -251,7 +317,12 @@ mod tests { #[tokio::test] async fn update_address_block_to_auto_assign() { let alloc_set = Arc::new(AllocatorSet::default()); - let (testctx, fakeserver, _) = ContextWith::test(alloc_set.clone()); + let block_allocator = Arc::new(BlockAllocator::default()); + let ab_ctx = ControllerAddressBlockContext { + allocator_set: alloc_set.clone(), + block_allocator: block_allocator.clone(), + }; + let (testctx, fakeserver, _) = ContextWith::test(ab_ctx); let ab = AddressBlock { metadata: ObjectMeta { name: Some("test-block".to_string()), @@ -299,7 +370,12 @@ mod tests { alloc_set_inner.auto_assign ); } - let (testctx, fakeserver, _) = ContextWith::test(alloc_set.clone()); + let block_allocator = Arc::new(BlockAllocator::default()); + let ab_ctx = ControllerAddressBlockContext { + allocator_set: alloc_set.clone(), + block_allocator: block_allocator.clone(), + }; + let (testctx, fakeserver, _) = ContextWith::test(ab_ctx); let ab = AddressBlock { metadata: ObjectMeta { name: Some("test-block".to_string()), @@ -366,7 +442,12 @@ mod tests { alloc_set_inner.auto_assign ); } - let (testctx, fakeserver, _) = ContextWith::test(alloc_set.clone()); + let block_allocator = Arc::new(BlockAllocator::default()); + let ab_ctx = ControllerAddressBlockContext { + allocator_set: alloc_set.clone(), + block_allocator: block_allocator.clone(), + }; + let (testctx, fakeserver, _) = ContextWith::test(ab_ctx); let ab = AddressBlock { metadata: ObjectMeta { name: Some("test-block".to_string()), diff --git a/sartd/src/kubernetes/src/controller/reconciler/address_pool.rs b/sartd/src/kubernetes/src/controller/reconciler/address_pool.rs index 0018328..789d58f 100644 --- a/sartd/src/kubernetes/src/controller/reconciler/address_pool.rs +++ b/sartd/src/kubernetes/src/controller/reconciler/address_pool.rs @@ -1,17 +1,15 @@ use std::{ - collections::{BTreeMap, HashMap, VecDeque}, net::{IpAddr, Ipv4Addr, Ipv6Addr}, str::FromStr, sync::Arc, - time::Duration, }; use futures::StreamExt; -use ipnet::{AddrParseError, IpNet, Ipv4Net, Ipv6Net}; -use k8s_openapi::api::core::v1::Node; +use ipnet::{IpNet, Ipv4Net, Ipv6Net}; + use kube::{ - api::{DeleteParams, ListParams, PostParams}, + api::{ListParams, PostParams}, core::ObjectMeta, runtime::{ controller::{Action, Controller}, @@ -20,24 +18,27 @@ use kube::{ }, Api, Client, ResourceExt, }; +use sartd_ipam::manager::{BlockAllocator, Pool}; use crate::{ - context::{error_policy, Context, State}, + context::{error_policy, ContextWith, Ctx, State}, controller::error::Error, crd::{ address_block::{AddressBlock, AddressBlockSpec}, address_pool::{ - AddressPool, AddressPoolStatus, AddressType, ADDRESS_POOL_ANNOTATION, + AddressPool, AddressType, ADDRESS_POOL_ANNOTATION, ADDRESS_POOL_FINALIZER, }, - block_request::BlockRequest, }, - util::{create_owner_reference, diff}, + util::create_owner_reference, }; #[tracing::instrument(skip_all, fields(trace_id))] -pub async fn reconciler(ap: Arc, ctx: Arc) -> Result { - let address_pools = Api::::all(ctx.client.clone()); +pub async fn reconciler( + ap: Arc, + ctx: Arc>>, +) -> Result { + let address_pools = Api::::all(ctx.client().clone()); finalizer(&address_pools, ADDRESS_POOL_FINALIZER, ap, |event| async { match event { @@ -53,7 +54,7 @@ pub async fn reconciler(ap: Arc, ctx: Arc) -> Result, ap: &AddressPool, - ctx: Arc, + ctx: Arc>>, ) -> Result { tracing::info!(name = ap.name_any(), "reconcile AddressPool"); @@ -67,9 +68,12 @@ async fn reconcile( async fn reconcile_service_pool( api: &Api, ap: &AddressPool, - ctx: Arc, + ctx: Arc>>, ) -> Result { - let address_blocks = Api::::all(ctx.client.clone()); + let address_blocks = Api::::all(ctx.client().clone()); + + let cidr = IpNet::from_str(&ap.spec.cidr).map_err(|_| Error::InvalidCIDR)?; + let block_size = ap.spec.block_size.unwrap_or(cidr.prefix_len() as u32); match address_blocks .get_opt(&ap.name_any()) @@ -100,40 +104,21 @@ async fn reconcile_service_pool( .create(&PostParams::default(), &ab) .await .map_err(Error::Kube)?; - } - } - let mut new_ap = ap.clone(); - let mut need_update = false; - match new_ap - .status - .as_mut() - .and_then(|status| status.allocated.as_mut()) - { - Some(allocated) => { - if allocated.get(&ap.name_any()).is_some() { - allocated.insert(ap.name_any(), 0); - need_update = true; + let pool = Pool::new(&cidr, block_size); + + { + let tmp = ctx.component.clone(); + let mut block_allocator = tmp.inner.lock().map_err(|_| Error::FailedToGetLock)?; + if block_allocator.get(&ap.name_any()).is_none() { + tracing::info!( + name = ap.name_any(), + "Insert address pool into block allocator" + ); + block_allocator.insert(&ap.name_any(), pool); + } } } - None => { - new_ap.status = Some(AddressPoolStatus { - requested: None, - allocated: Some(HashMap::from([(ap.name_any(), 0)])), - released: None, - }); - need_update = true; - } - } - - if need_update { - api.replace_status( - &ap.name_any(), - &PostParams::default(), - serde_json::to_vec(&new_ap).map_err(Error::Serialization)?, - ) - .await - .map_err(Error::Kube)?; } Ok(Action::await_change()) @@ -143,361 +128,52 @@ async fn reconcile_service_pool( async fn reconcile_pod_pool( api: &Api, ap: &AddressPool, - ctx: Arc, + ctx: Arc>>, ) -> Result { - let cidr: IpNet = ap - .spec - .cidr - .parse() - .map_err(|e: AddrParseError| Error::InvalidParameter(e.to_string()))?; - let block_prefix_len = ap.spec.block_size.unwrap_or(cidr.prefix_len() as u32); - let block_size = 1 << (cidr.max_prefix_len() as u32 - block_prefix_len); - let pool_size: u128 = 1 << (cidr.max_prefix_len() - cidr.prefix_len()); - tracing::info!( - block = block_size, - pool = pool_size, - max_prefix_len = cidr.max_prefix_len(), - prefix_len = cidr.prefix_len() - ); - - let mut new_ap = ap.clone(); - - let address_block_api = Api::::all(ctx.client.clone()); - let params = - ListParams::default().labels(&format!("{}={}", ADDRESS_POOL_ANNOTATION, ap.name_any())); - let ab_list = address_block_api.list(¶ms).await.map_err(Error::Kube)?; - let ab_cidr_map: HashMap = ab_list - .iter() - .map(|ab| (ab.name_any(), ab.spec.cidr.clone())) - .collect(); - - let actual_allocated_blocks: Vec = ab_list.iter().map(|ab| ab.name_any()).collect(); - - let allocated_blocks = if let Some(allocated) = ap - .status - .as_ref() - .and_then(|status| status.allocated.as_ref()) - { - let mut allocated_blocks: Vec = allocated.keys().cloned().collect(); - allocated_blocks.sort(); - allocated_blocks - } else { - Vec::new() - }; - - let (added, _remain, removed) = diff::(&allocated_blocks, &actual_allocated_blocks); - - let mut need_update = !removed.is_empty() || !added.is_empty(); - - for added_ab in added.iter() { - let block_cidr = match ab_cidr_map.get(added_ab) { - Some(cidr) => IpNet::from_str(cidr).map_err(|_e| Error::InvalidAddress)?, - None => { - return Err(Error::FailedToGetData( - "Allocated block is not found".to_string(), - )) - } - }; - let head = contains_cidr(&cidr, &block_cidr)?; - if head % block_size != 0 { - tracing::error!( - name = ap.name_any(), - block = added_ab, - block_size = block_size, - "block's CIDR is invalid" - ); - continue; - } - match new_ap.status.as_mut() { - Some(status) => match status.allocated.as_mut() { - Some(allocated) => { - allocated.insert(added_ab.to_string(), head as u128); - } - None => { - status.allocated = Some(HashMap::from([(added_ab.to_string(), head as u128)])) - } - }, - None => { - new_ap.status = Some(AddressPoolStatus { - requested: None, - allocated: Some(HashMap::from([(added_ab.to_string(), head as u128)])), - released: None, - }) - } - } - } - - for removed_ab in removed.iter() { - match new_ap.status.as_mut() { - Some(status) => match status.released.as_mut() { - Some(released) => { - if !released.iter().any(|r| r.eq(removed_ab)) { - released.push(removed_ab.to_string()); - } - } - None => { - status.released = Some(vec![removed_ab.to_string()]); - } - }, - None => { - new_ap.status = Some(AddressPoolStatus { - requested: None, - allocated: None, - released: Some(vec![removed_ab.to_string()]), - }); - } - } - } - - let mut need_requeue = false; - - let mut new_released = Vec::new(); - - if let Some(released) = new_ap - .status - .as_mut() - .and_then(|status| status.released.as_mut()) - { - for r in released.iter() { - if ab_cidr_map.get(r).is_some() { - // existing - new_released.push(r.to_string()); - tracing::info!(name = ap.name_any(), block = r, "Delete the unused block"); - address_block_api - .delete(r, &DeleteParams::default()) - .await - .map_err(Error::Kube)?; - need_requeue = true; - } - } - *released = new_released.clone(); - need_update = true; - } - - for rel in new_released.iter() { - // if new_released is not empty, need_update must be true. - if let Some(allocated) = new_ap - .status - .as_mut() - .and_then(|status| status.allocated.as_mut()) - { - allocated.remove(rel); - } - } - - let all_blocks: Vec = (0..pool_size).step_by(block_size as usize).collect(); - let allocated_blocks = if let Some(allocated) = ap - .status - .as_ref() - .and_then(|status| status.allocated.as_ref()) - { - let mut allocated_blocks: Vec = allocated.values().copied().collect(); - allocated_blocks.sort(); - allocated_blocks - } else { - Vec::new() - }; - let (allocatable_blocks, _, _) = diff::(&allocated_blocks, &all_blocks); - if allocatable_blocks.is_empty() { - tracing::warn!(name=ap.name_any(), allocated=?allocatable_blocks, all=?all_blocks, "AddressPool cannot create new AddressBlock any more"); - return Err(Error::NoAllocatableBlock); - } - - let mut allocatable_blocks = VecDeque::from(allocatable_blocks); - - let mut new_allocated = Vec::new(); - let mut new_requested = Vec::new(); - - let block_request_api = Api::::all(ctx.client.clone()); - let node_api = Api::::all(ctx.client.clone()); - - if let Some(requested) = new_ap - .status - .as_ref() - .and_then(|status| status.requested.as_ref()) + tracing::info!(name = ap.name_any(), "Reconcile AddressPool"); + let cidr = IpNet::from_str(&ap.spec.cidr).map_err(|_| Error::InvalidCIDR)?; + let block_size = ap.spec.block_size.unwrap_or(cidr.prefix_len() as u32); { - if !requested.is_empty() { - need_update = true; - } - for req in requested.iter() { - let br = match block_request_api.get(req).await.map_err(Error::Kube) { - Ok(br) => br, - Err(e) => { - tracing::error!(name=ap.name_any(), request=req, error=?e, "BlockRequest doesn't exist"); - continue; - } - }; - - if br.spec.pool.ne(&ap.name_any()) { - tracing::error!( - name = ap.name_any(), - target = br.spec.pool, - "request is not for this pool" - ); - continue; - } - let _node = node_api.get(&br.spec.node).await.map_err(Error::Kube)?; - - let head = match allocatable_blocks.pop_front() { - Some(head) => head, - None => { - tracing::error!( - name = ap.name_any(), - target = br.spec.pool, - "Requested node doesn't exist" - ); - continue; - } - }; - - let block_prefix_len = ap.spec.block_size.unwrap_or(cidr.prefix_len() as u32); - let block_cidr = match get_block_cidr(&cidr, block_prefix_len, head) { - Ok(cidr) => cidr, - Err(e) => { - tracing::error!(name=ap.name_any(), request=req, error=?e, "Failed to get block CIDR"); - new_requested.push(req.to_string()); - continue; - } - }; - let ab_name = format!("{}-{}", req, block_cidr.addr()); - tracing::info!( - name = ap.name_any(), - block = ab_name, - request = req, - head = head, - "Create new AddressBlock by a request" - ); - let ab = AddressBlock { - metadata: ObjectMeta { - labels: Some(BTreeMap::from([( - ADDRESS_POOL_ANNOTATION.to_string(), - ap.name_any(), - )])), - name: Some(ab_name.clone()), - ..Default::default() - }, - spec: AddressBlockSpec { - cidr: block_cidr.to_string(), - r#type: AddressType::Pod, - pool_ref: ap.name_any(), - node_ref: Some(br.spec.node.clone()), - auto_assign: ap.spec.auto_assign.unwrap_or(false), - }, - status: None, - }; - - if let Err(e) = address_block_api - .create(&PostParams::default(), &ab) - .await - .map_err(Error::Kube) - { - tracing::error!(name=ap.name_any(), block=ab_name, request=req, error=?e, "Failed to create AddressBlock"); - new_requested.push(req.to_string()); - continue; - } - new_allocated.push((ab_name, head)) - } - } - - match new_ap.status.as_mut() { - Some(status) => { - match status.allocated.as_mut() { - Some(allocated) => { - for (k, v) in new_allocated.into_iter() { - allocated.insert(k, v); - } - } - None => { - status.allocated = Some(new_allocated.into_iter().collect()); - } - } - status.requested = Some(new_requested); - } - None => { - new_ap.status = Some(AddressPoolStatus { - requested: Some(new_requested), - allocated: Some(new_allocated.into_iter().collect()), - released: None, - }); - } - } - - if need_update { - tracing::info!(name=ap.name_any(), status=?new_ap.status, "Update status"); - api.replace_status( - &ap.name_any(), - &PostParams::default(), - serde_json::to_vec(&new_ap).map_err(Error::Serialization)?, - ) - .await - .map_err(Error::Kube)?; - } - - // check auto assign - let auto_assign = ap.spec.auto_assign.unwrap_or(false); - - if auto_assign { - tracing::info!(name = ap.name_any(), "Auto assign is enabled"); - let ap_list = api - .list(&ListParams::default()) - .await - .map_err(Error::Kube)?; - let is_exist = ap_list - .iter() - .filter(|p| p.spec.r#type.eq(&AddressType::Pod) && p.name_any().ne(&ap.name_any())) - .fold(false, |other, p| { - p.spec.auto_assign.unwrap_or(false) || other - }); - if is_exist { - tracing::error!( - name = ap.name_any(), - "Auto assignable AddressPool for Pod already exists" - ); - return Err(Error::AutoAssignMustBeOne); + let tmp = ctx.component.clone(); + let mut block_allocator = tmp.inner.lock().map_err(|_| Error::FailedToGetLock)?; + if block_allocator.get(&ap.name_any()).is_none() { + let pool = Pool::new(&cidr, block_size); + tracing::info!(pool=ap.name_any(), cidr=?cidr, block_size=block_size,"Insert a new pool into the block allocator"); + block_allocator.insert(&ap.name_any(), pool) } } - let ab_list = address_block_api.list(¶ms).await.map_err(Error::Kube)?; - for ab in ab_list.iter() { - if ab.spec.auto_assign != auto_assign { - let mut new_ab = ab.clone(); - new_ab.spec.auto_assign = auto_assign; - tracing::info!( - name = ap.name_any(), - block = ab.name_any(), - auto_assign = auto_assign, - "Change auto assign flag for AddressBlock" - ); - address_block_api - .replace(&new_ab.name_any(), &PostParams::default(), &new_ab) - .await - .map_err(Error::Kube)?; - } - } - - if need_requeue { - return Ok(Action::requeue(Duration::from_secs(10))); - } - Ok(Action::await_change()) } #[tracing::instrument(skip_all)] async fn cleanup( _api: &Api, - _ap: &AddressPool, - _ctx: Arc, + ap: &AddressPool, + ctx: Arc>>, ) -> Result { + tracing::info!(name = ap.name_any(), "Clean up AddressPool"); + let address_block_api = Api::::all(ctx.client().clone()); + let list_params = + ListParams::default().labels(&format!("{}={}", ADDRESS_POOL_ANNOTATION, ap.name_any())); + let ab_list = address_block_api + .list(&list_params) + .await + .map_err(Error::Kube)?; + if ab_list.items.len() != 0 { + return Err(Error::AddressPoolNotEmpty); + } + Ok(Action::await_change()) } -pub async fn run(state: State, interval: u64) { +pub async fn run(state: State, interval: u64, block_allocator: Arc) { let client = Client::try_default() .await .expect("Failed to create kube client"); - let address_pools = Api::::all(client.clone()); - if let Err(e) = address_pools.list(&ListParams::default().limit(1)).await { + let address_pool_api = Api::::all(client.clone()); + if let Err(e) = address_pool_api.list(&ListParams::default().limit(1)).await { tracing::error!("CRD is not queryable; {e:?}. Is the CRD installed?"); tracing::info!("Installation: cargo run --bin crdgen | kubectl apply -f -"); std::process::exit(1); @@ -505,12 +181,12 @@ pub async fn run(state: State, interval: u64) { tracing::info!("Start AddressPool reconciler"); - Controller::new(address_pools, Config::default().any_semantic()) + Controller::new(address_pool_api, Config::default().any_semantic()) .shutdown_on_signal() .run( reconciler, - error_policy::, - state.to_context(client, interval), + error_policy::>>, + state.to_context_with(client, interval, block_allocator), ) .filter_map(|x| async move { std::result::Result::ok(x) }) .for_each(|_| futures::future::ready(())) diff --git a/sartd/src/kubernetes/src/controller/reconciler/block_request.rs b/sartd/src/kubernetes/src/controller/reconciler/block_request.rs index c887f43..f361c0b 100644 --- a/sartd/src/kubernetes/src/controller/reconciler/block_request.rs +++ b/sartd/src/kubernetes/src/controller/reconciler/block_request.rs @@ -1,8 +1,14 @@ -use std::sync::Arc; +use std::{ + collections::BTreeMap, + net::{Ipv4Addr, Ipv6Addr}, + str::FromStr, + sync::Arc, +}; use futures::StreamExt; +use ipnet::{IpNet, Ipv4Net, Ipv6Net}; use kube::{ - api::{ListParams, Patch, PostParams}, + api::{ListParams, ObjectMeta, Patch, PatchParams, PostParams}, runtime::{ controller::Action, finalizer::{finalizer, Event}, @@ -11,18 +17,23 @@ use kube::{ }, Api, Client, ResourceExt, }; +use sartd_ipam::manager::BlockAllocator; use crate::{ - context::{error_policy, Context, Ctx, State}, + context::{error_policy, ContextWith, Ctx, State}, controller::error::Error, crd::{ - address_pool::{AddressPool, AddressPoolStatus, AddressType}, + address_block::{AddressBlock, AddressBlockSpec, AddressBlockStatus}, + address_pool::{AddressPool, AddressType, ADDRESS_POOL_ANNOTATION}, block_request::{BlockRequest, BLOCK_REQUEST_FINALIZER}, }, }; #[tracing::instrument(skip_all, fields(trace_id))] -pub async fn reconciler(br: Arc, ctx: Arc) -> Result { +pub async fn reconciler( + br: Arc, + ctx: Arc>>, +) -> Result { let block_request_api = Api::::all(ctx.client().clone()); finalizer( @@ -44,53 +55,107 @@ pub async fn reconciler(br: Arc, ctx: Arc) -> Result, br: &BlockRequest, - ctx: Arc, + ctx: Arc>>, ) -> Result { tracing::info!(name = br.name_any(), "Reconcile BlockRequest"); - let address_pool_api = Api::::all(ctx.client.clone()); + let address_pool_api = Api::::all(ctx.client().clone()); + let address_block_api = Api::::all(ctx.client().clone()); + let ssapply = PatchParams::apply("agent-blockrequest"); - let mut pool = address_pool_api + let ap = address_pool_api .get(&br.spec.pool) .await .map_err(Error::Kube)?; - if pool.spec.r#type.ne(&AddressType::Pod) { + if ap.spec.r#type.ne(&AddressType::Pod) { return Err(Error::InvalidAddressType); } - match pool.status.as_mut() { - Some(status) => match status.requested.as_mut() { - Some(requested) => { - if requested.iter().any(|r| r.eq(&br.name_any())) { - tracing::warn!(name = br.name_any(), "Same BlockRequest already exists"); - return Err(Error::BlockRequestAlreadyExists); - } - requested.push(br.name_any()); - } - None => { - status.requested = Some(vec![br.name_any()]); - } + if br.spec.pool.ne(&ap.name_any()) { + tracing::error!( + name = ap.name_any(), + target = br.spec.pool, + "Request is not for this pool" + ); + } + + let ap_cidr = IpNet::from_str(&ap.spec.cidr).map_err(|_| Error::InvalidCIDR)?; + + let block_prefix_len = ap.spec.block_size.unwrap_or(ap_cidr.prefix_len() as u32); + let block_size = 2 ^ (block_prefix_len - ap_cidr.prefix_len() as u32); + let (block_index, ab_name) = { + let tmp = ctx.component.clone(); + let mut block_allocator = tmp.inner.lock().map_err(|_| Error::FailedToGetLock)?; + if let Some(pool) = block_allocator.get_mut(&ap.name_any()) { + let count = pool.counter; + let ab_name = format!("{}-{count}", ap.name_any()); + let block_index = pool.allocate(&ab_name).map_err(Error::Ipam)?; + (block_index, ab_name) + } else { + return Err(Error::InvalidPool); + } + }; + let block_cidr = match get_block_cidr( + &ap_cidr, + block_prefix_len, + block_index * (block_size + 1) as u128, + ) { + Ok(cidr) => cidr, + Err(e) => { + tracing::error!(name=ap.name_any(), request=br.name_any(), error=?e, "Failed to get block CIDR"); + return Err(e); + } + }; + tracing::info!( + name = ap.name_any(), + block = ab_name, + request = br.name_any(), + index = block_index, + "Create new AddressBlock by a request" + ); + let ab = AddressBlock { + metadata: ObjectMeta { + labels: Some(BTreeMap::from([( + ADDRESS_POOL_ANNOTATION.to_string(), + ap.name_any(), + )])), + name: Some(ab_name.clone()), + ..Default::default() }, + spec: AddressBlockSpec { + cidr: block_cidr.to_string(), + r#type: AddressType::Pod, + pool_ref: ap.name_any(), + node_ref: Some(br.spec.node.clone()), + auto_assign: ap.spec.auto_assign.unwrap_or(false), + }, + status: Some(AddressBlockStatus { + index: block_index as u32, + }), + }; + + match address_block_api + .get_opt(&ab.name_any()) + .await + .map_err(Error::Kube)? + { + Some(_) => { + // already exists + } None => { - pool.status = Some(AddressPoolStatus { - requested: Some(vec![br.name_any()]), - allocated: None, - released: None, - }); + if let Err(e) = address_block_api.create(&PostParams::default(), &ab).await { + tracing::error!(name=ap.name_any(), block=ab_name, request=br.name_any(), error=?e, "Failed to create AddressBlock"); + return Err(Error::Kube(e)); + } + let ab_patch = Patch::Merge(&ab); + address_block_api + .patch_status(&ab.name_any(), &ssapply, &ab_patch) + .await + .map_err(Error::Kube)?; } } - // If failing to update the status here, sometimes the controller may create an useless block. - address_pool_api - .replace_status( - &pool.name_any(), - &PostParams::default(), - serde_json::to_vec(&pool).map_err(Error::Serialization)?, - ) - .await - .map_err(Error::Kube)?; - Ok(Action::await_change()) } @@ -98,30 +163,13 @@ async fn reconcile( async fn cleanup( _api: &Api, br: &BlockRequest, - ctx: Arc, + _ctx: Arc>>, ) -> Result { tracing::info!(name = br.name_any(), "clean up BlockRequest"); - - let address_pool_api = Api::::all(ctx.client.clone()); - - let pool = address_pool_api - .get(&br.spec.pool) - .await - .map_err(Error::Kube)?; - - if let Some(status) = pool.status.as_ref() { - if let Some(requested) = status.requested.as_ref() { - if requested.iter().any(|r| r.eq(&br.name_any())) { - tracing::warn!(name = br.name_any(), "BlockRequest isn't performed yet."); - return Err(Error::BlockRequestNotPerformed); - } - } - } - Ok(Action::await_change()) } -pub async fn run(state: State, interval: u64) { +pub async fn run(state: State, interval: u64, block_allocator: Arc) { let client = Client::try_default() .await .expect("Failed to create kube client"); @@ -142,10 +190,37 @@ pub async fn run(state: State, interval: u64) { .shutdown_on_signal() .run( reconciler, - error_policy::, - state.to_context(client, interval), + error_policy::>>, + state.to_context_with(client, interval, block_allocator), ) .filter_map(|x| async move { std::result::Result::ok(x) }) .for_each(|_| futures::future::ready(())) .await; } + +fn get_block_cidr(cidr: &IpNet, block_size: u32, head: u128) -> Result { + match cidr { + IpNet::V4(cidr) => { + let start = cidr.network(); + let a = u32::from(cidr.addr()); + + let n = u32::from(start) + (head as u32); + + let block_start = Ipv4Addr::from(if a > n { a } else { n }); + Ok(IpNet::V4( + Ipv4Net::new(block_start, block_size as u8).map_err(|_| Error::InvalidCIDR)?, + )) + } + IpNet::V6(cidr) => { + let start = cidr.network(); + let a = u128::from(cidr.addr()); + + let n = u128::from(start) + head; + + let block_start = Ipv6Addr::from(if a > n { a } else { n }); + Ok(IpNet::V6( + Ipv6Net::new(block_start, block_size as u8).map_err(|_| Error::InvalidCIDR)?, + )) + } + } +} diff --git a/sartd/src/kubernetes/src/controller/server.rs b/sartd/src/kubernetes/src/controller/server.rs index a56754d..8bdb33f 100644 --- a/sartd/src/kubernetes/src/controller/server.rs +++ b/sartd/src/kubernetes/src/controller/server.rs @@ -1,28 +1,27 @@ -use std::sync::Arc; +use std::{collections::HashMap, str::FromStr, sync::Arc}; use actix_web::{ get, middleware, post, web::{self, Data}, App, HttpRequest, HttpResponse, HttpServer, Responder, }; +use ipnet::IpNet; use k8s_openapi::api::core::v1::Service; -use kube::core::admission::AdmissionReview; +use kube::{api::ListParams, core::admission::AdmissionReview, Api, Client, ResourceExt}; use prometheus::{Encoder, TextEncoder}; use rustls::ServerConfig; use sartd_cert::util::{load_certificates_from_pem, load_private_key_from_file}; -use sartd_ipam::manager::AllocatorSet; +use sartd_ipam::manager::{AllocatorSet, BlockAllocator, Pool}; use sartd_trace::init::{prepare_tracing, TraceConfig}; use crate::{ - config::Mode, - context::State, - crd::{ + config::Mode, context::State, controller::reconciler::address_block::ControllerAddressBlockContext, crd::{ address_block::AddressBlock, address_pool::AddressPool, bgp_advertisement::BGPAdvertisement, bgp_peer::BGPPeer, - }, + } }; -use super::{reconciler, webhook}; +use super::{error::Error, reconciler, webhook}; use super::config::Config; @@ -98,17 +97,39 @@ async fn run(c: Controller, trace_config: TraceConfig) { reconciler::cluster_bgp::run(cluster_bgp_state, c.requeue_interval).await; }); + let client = Client::try_default() + .await + .expect("Failed to get kube client"); + let address_block_api = Api::::all(client.clone()); + let address_pool_api = Api::::all(client.clone()); + let block_allocator = Arc::new( + recover_or_create(&address_pool_api, &address_block_api) + .await + .unwrap(), + ); + tracing::info!("Start AddressPool reconciler"); let address_pool_state = state.clone(); + let block_allocator_cloned = block_allocator.clone(); tokio::spawn(async move { - reconciler::address_pool::run(address_pool_state, c.requeue_interval).await; + reconciler::address_pool::run( + address_pool_state, + c.requeue_interval, + block_allocator_cloned, + ) + .await; }); tracing::info!("Start AddressBlock reconciler"); let address_block_state = state.clone(); let ab_allocator_set = allocator_set.clone(); + let ab_block_allocator = block_allocator.clone(); + let ab_ctx = ControllerAddressBlockContext{ + allocator_set: ab_allocator_set, + block_allocator: ab_block_allocator, + }; tokio::spawn(async move { - reconciler::address_block::run(address_block_state, c.requeue_interval, ab_allocator_set) + reconciler::address_block::run(address_block_state, c.requeue_interval, ab_ctx) .await; }); @@ -116,7 +137,12 @@ async fn run(c: Controller, trace_config: TraceConfig) { tracing::info!("Start BlockRequest reconciler"); let block_request_state = state.clone(); tokio::spawn(async move { - reconciler::block_request::run(block_request_state, c.requeue_interval).await; + reconciler::block_request::run( + block_request_state, + c.requeue_interval, + block_allocator.clone(), + ) + .await; }); } @@ -245,3 +271,75 @@ async fn address_block_mutating_webhook( ) -> impl Responder { webhook::address_block::handle_mutation(req, body).await } + +async fn recover_or_create( + address_pool_api: &Api, + address_block_api: &Api, +) -> Result { + let ap_list = address_pool_api + .list(&ListParams::default()) + .await + .map_err(Error::Kube)?; + let ab_list = address_block_api + .list(&ListParams::default()) + .await + .map_err(Error::Kube)?; + + let mut pool_map = ap_list + .iter() + .map(|ap| { + let cidr = IpNet::from_str(&ap.spec.cidr) + .map_err(|_| Error::InvalidCIDR) + .unwrap(); // I want to return error in map()... + let block_size = ap.spec.block_size.unwrap_or(cidr.prefix_len() as u32); + (ap.name_any(), Pool::new(&cidr, block_size)) + }) + .collect::>(); + + let mut pool_counter = HashMap::::new(); + for ab in ab_list.iter() { + let ab_name = ab.name_any(); + let ab_name_splitted = ab_name.split('-'); + let c = ab_name_splitted.last(); + let c_num: u32 = match c { + Some(c) => match c.parse() { + Ok(c_n) => c_n, + Err(_) => continue, + }, + None => { + continue; + } + }; + + let pool_name = ab.spec.pool_ref.as_str(); + + match pool_counter.get_mut(pool_name) { + Some(c_max) => { + if *c_max < c_num { + *c_max = c_num; + } + } + None => { + pool_counter.insert(pool_name.to_string(), c_num); + } + } + + if let Some(pool) = pool_map.get_mut(pool_name) { + let block_index = match &ab.status { + Some(status) => status.index, + None => return Err(Error::AddressBlockIndexNotSet), + }; + pool.allocate_with(block_index as u128, &ab.name_any()) + .map_err(Error::Ipam)?; + } + } + + // update pool counter + for (pool_name, count) in pool_counter.iter() { + if let Some(pool) = pool_map.get_mut(pool_name) { + pool.counter = *count; + } + } + + Ok(BlockAllocator::new(pool_map)) +} diff --git a/sartd/src/kubernetes/src/controller/webhook/address_block.rs b/sartd/src/kubernetes/src/controller/webhook/address_block.rs index 9dbae26..d49d6da 100644 --- a/sartd/src/kubernetes/src/controller/webhook/address_block.rs +++ b/sartd/src/kubernetes/src/controller/webhook/address_block.rs @@ -18,7 +18,6 @@ pub async fn handle_mutation( req: HttpRequest, body: web::Json>, ) -> impl Responder { - tracing::info!(method=?req.method(), uri=?req.uri(),"call mutating webhook for AddressBlock"); if let Some(content_type) = req.head().headers.get("content-type") { if content_type != "application/json" { diff --git a/sartd/src/kubernetes/src/controller/webhook/address_pool.rs b/sartd/src/kubernetes/src/controller/webhook/address_pool.rs index 5fd6f2e..bd6cbfe 100644 --- a/sartd/src/kubernetes/src/controller/webhook/address_pool.rs +++ b/sartd/src/kubernetes/src/controller/webhook/address_pool.rs @@ -19,7 +19,6 @@ pub async fn handle_validation( req: HttpRequest, body: web::Json>, ) -> impl Responder { - tracing::info!(method=?req.method(), uri=?req.uri(), "Call validating webhook for AddressPool"); if let Some(content_type) = req.head().headers.get("content-type") { if content_type != "application/json" { diff --git a/sartd/src/kubernetes/src/controller/webhook/bgp_advertisement.rs b/sartd/src/kubernetes/src/controller/webhook/bgp_advertisement.rs index f057c2f..3452134 100644 --- a/sartd/src/kubernetes/src/controller/webhook/bgp_advertisement.rs +++ b/sartd/src/kubernetes/src/controller/webhook/bgp_advertisement.rs @@ -12,7 +12,6 @@ pub async fn handle_validation( req: HttpRequest, body: web::Json>, ) -> impl Responder { - tracing::info!(method=?req.method(), uri=?req.uri(), "call validating webhook for BGPAdvertisement"); if let Some(content_type) = req.head().headers.get("content-type") { if content_type != "application/json" { diff --git a/sartd/src/kubernetes/src/controller/webhook/bgp_peer.rs b/sartd/src/kubernetes/src/controller/webhook/bgp_peer.rs index 6474cb0..ff19d93 100644 --- a/sartd/src/kubernetes/src/controller/webhook/bgp_peer.rs +++ b/sartd/src/kubernetes/src/controller/webhook/bgp_peer.rs @@ -21,8 +21,6 @@ pub async fn handle_validation( req: HttpRequest, body: web::Json>, ) -> impl Responder { - tracing::info!(method=?req.method(), uri=?req.uri(),"call validating webhook for BGPPeer"); - if let Some(content_type) = req.head().headers.get("content-type") { if content_type != "application/json" { let msg = format!("invalid content-type: {:?}", content_type); @@ -58,7 +56,6 @@ pub async fn handle_validation( return HttpResponse::Forbidden().json(resp); } } - tracing::info!(name = admission_req.name, "new object"); resp.allowed = true; resp.result = Status { status: Some(StatusSummary::Success), @@ -70,11 +67,6 @@ pub async fn handle_validation( return HttpResponse::Ok().json(resp.into_review()); } - tracing::info!( - name = admission_req.name, - "incoming request tries to update existing object" - ); - let old = admission_req.old_object.unwrap(); if let Some(new) = admission_req.object { diff --git a/sartd/src/kubernetes/src/controller/webhook/service.rs b/sartd/src/kubernetes/src/controller/webhook/service.rs index 0eb3bc1..b0b1ab1 100644 --- a/sartd/src/kubernetes/src/controller/webhook/service.rs +++ b/sartd/src/kubernetes/src/controller/webhook/service.rs @@ -17,7 +17,6 @@ pub async fn handle_mutation( req: HttpRequest, body: web::Json>, ) -> impl Responder { - tracing::info!(method=?req.method(), uri=?req.uri(),"call mutating webhook for Service"); if let Some(content_type) = req.head().headers.get("content-type") { if content_type != "application/json" { diff --git a/sartd/src/kubernetes/src/crd/address_block.rs b/sartd/src/kubernetes/src/crd/address_block.rs index 2b02e4a..107e65c 100644 --- a/sartd/src/kubernetes/src/crd/address_block.rs +++ b/sartd/src/kubernetes/src/crd/address_block.rs @@ -3,7 +3,8 @@ pub use kube::CustomResource; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; -pub const ADDRESS_BLOCK_FINALIZER: &str = "addressblock.sart.terassyi.net/finalizer"; +pub const ADDRESS_BLOCK_FINALIZER_CONTROLLER: &str = "controller.addressblock.sart.terassyi.net/finalizer"; +pub const ADDRESS_BLOCK_FINALIZER_AGENT: &str = "agent.addressblock.sart.terassyi.net/finalizer"; pub const ADDRESS_BLOCK_NODE_LABEL: &str = "addressblock.sart.terassyi.net/node"; #[derive(CustomResource, Debug, Serialize, Deserialize, Default, Clone, JsonSchema)] @@ -30,4 +31,6 @@ pub struct AddressBlockSpec { } #[derive(Deserialize, Serialize, Clone, Default, Debug, JsonSchema)] -pub struct AddressBlockStatus {} +pub struct AddressBlockStatus { + pub index: u32, +} diff --git a/sartd/src/kubernetes/src/crd/address_pool.rs b/sartd/src/kubernetes/src/crd/address_pool.rs index bc78841..8b9cbe9 100644 --- a/sartd/src/kubernetes/src/crd/address_pool.rs +++ b/sartd/src/kubernetes/src/crd/address_pool.rs @@ -1,4 +1,3 @@ -use std::collections::HashMap; use kube::CustomResource; use schemars::JsonSchema; @@ -33,11 +32,7 @@ pub struct AddressPoolSpec { } #[derive(Deserialize, Serialize, Clone, Default, Debug, JsonSchema)] -pub struct AddressPoolStatus { - pub requested: Option>, - pub allocated: Option>, - pub released: Option>, -} +pub struct AddressPoolStatus {} #[derive(Deserialize, Serialize, Clone, Copy, Default, Debug, JsonSchema, PartialEq, Eq)] #[serde(rename_all = "camelCase")] diff --git a/sartd/src/kubernetes/src/crd/block_request.rs b/sartd/src/kubernetes/src/crd/block_request.rs index 62da6d6..55adb2a 100644 --- a/sartd/src/kubernetes/src/crd/block_request.rs +++ b/sartd/src/kubernetes/src/crd/block_request.rs @@ -6,9 +6,9 @@ pub const BLOCK_REQUEST_FINALIZER: &str = "blockrequest.sart.terassyi.net/finali #[derive(CustomResource, Debug, Serialize, Deserialize, Default, Clone, JsonSchema)] #[kube( - group = "sart.terassyi.net", - version = "v1alpha2", - kind = "BlockRequest" + group = "sart.terassyi.net", + version = "v1alpha2", + kind = "BlockRequest" )] #[kube(status = "BlockRequestStatus")] #[kube( @@ -17,9 +17,10 @@ pub const BLOCK_REQUEST_FINALIZER: &str = "blockrequest.sart.terassyi.net/finali )] #[serde(rename_all = "camelCase")] pub struct BlockRequestSpec { - pub pool: String, - pub node: String, + pub pool: String, + pub node: String, } -#[derive(Deserialize, Serialize, Clone, Default, Debug, JsonSchema)] -pub struct BlockRequestStatus {} +#[derive(Deserialize, Serialize, Clone, Debug, JsonSchema)] +#[serde(rename_all = "camelCase")] +pub enum BlockRequestStatus {} diff --git a/sartd/src/kubernetes/src/fixture.rs b/sartd/src/kubernetes/src/fixture.rs index 5b26df1..7baf597 100644 --- a/sartd/src/kubernetes/src/fixture.rs +++ b/sartd/src/kubernetes/src/fixture.rs @@ -29,7 +29,7 @@ pub mod reconciler { endpointslice_watcher::ENDPOINTSLICE_FINALIZER, service_watcher::SERVICE_FINALIZER, }, crd::{ - address_block::{AddressBlock, AddressBlockSpec, ADDRESS_BLOCK_FINALIZER}, + address_block::{AddressBlock, AddressBlockSpec, ADDRESS_BLOCK_FINALIZER_AGENT, ADDRESS_BLOCK_FINALIZER_CONTROLLER}, address_pool::{ AddressPool, AddressPoolSpec, AddressType, AllocationType, ADDRESS_POOL_FINALIZER, }, @@ -287,7 +287,7 @@ pub mod reconciler { pub fn test_address_block_lb() -> AddressBlock { AddressBlock { metadata: ObjectMeta { - finalizers: Some(vec![ADDRESS_BLOCK_FINALIZER.to_string()]), + finalizers: Some(vec![ADDRESS_BLOCK_FINALIZER_CONTROLLER.to_string(), ADDRESS_BLOCK_FINALIZER_AGENT.to_string()]), name: Some("test-pool".to_string()), ..Default::default() }, @@ -305,7 +305,7 @@ pub mod reconciler { pub fn test_address_block_lb_non_default() -> AddressBlock { AddressBlock { metadata: ObjectMeta { - finalizers: Some(vec![ADDRESS_BLOCK_FINALIZER.to_string()]), + finalizers: Some(vec![ADDRESS_BLOCK_FINALIZER_CONTROLLER.to_string(), ADDRESS_BLOCK_FINALIZER_AGENT.to_string()]), name: Some("test-pool-non-default".to_string()), ..Default::default() }, @@ -359,7 +359,7 @@ pub mod reconciler { pub fn test_address_block_pod() -> AddressBlock { AddressBlock { metadata: ObjectMeta { - finalizers: Some(vec![ADDRESS_BLOCK_FINALIZER.to_string()]), + finalizers: Some(vec![ADDRESS_BLOCK_FINALIZER_CONTROLLER.to_string(), ADDRESS_BLOCK_FINALIZER_AGENT.to_string()]), name: Some("test-pool-sart-integration-control-plane-10.0.0.0".to_string()), ..Default::default() }, @@ -377,7 +377,7 @@ pub mod reconciler { pub fn test_address_block_pod2() -> AddressBlock { AddressBlock { metadata: ObjectMeta { - finalizers: Some(vec![ADDRESS_BLOCK_FINALIZER.to_string()]), + finalizers: Some(vec![ADDRESS_BLOCK_FINALIZER_CONTROLLER.to_string(), ADDRESS_BLOCK_FINALIZER_AGENT.to_string()]), name: Some("test-pool-sart-integration-control-plane-10.0.0.32".to_string()), ..Default::default() }, @@ -395,7 +395,7 @@ pub mod reconciler { pub fn test_address_block_pod_non_default() -> AddressBlock { AddressBlock { metadata: ObjectMeta { - finalizers: Some(vec![ADDRESS_BLOCK_FINALIZER.to_string()]), + finalizers: Some(vec![ADDRESS_BLOCK_FINALIZER_CONTROLLER.to_string(), ADDRESS_BLOCK_FINALIZER_AGENT.to_string()]), name: Some("test-pool-non-default-sart-integration-10.1.0.0".to_string()), ..Default::default() }, diff --git a/sartd/src/kubernetes/tests/agent_address_block_test.rs b/sartd/src/kubernetes/tests/agent_address_block_test.rs index 99af0db..34913b2 100644 --- a/sartd/src/kubernetes/tests/agent_address_block_test.rs +++ b/sartd/src/kubernetes/tests/agent_address_block_test.rs @@ -23,15 +23,15 @@ mod common; #[tokio::test] #[ignore = "use kind cluster"] async fn integration_test_address_block() { - dbg!("Creating a kind cluster"); + tracing::info!("Creating a kind cluster"); setup_kind(); test_trace().await; - dbg!("Getting kube client"); + tracing::info!("Getting kube client"); let client = Client::try_default().await.unwrap(); - dbg!("Preparing components"); + tracing::info!("Preparing components"); let allocator_set = Arc::new(AllocatorSet::new()); let (sender, mut receiver) = unbounded_channel::(); let pod_allocator = Arc::new(PodAllocator { @@ -41,7 +41,7 @@ async fn integration_test_address_block() { let ctx = State::default().to_context_with(client.clone(), 30, pod_allocator.clone()); - dbg!("Creating an AddressBlock resource"); + tracing::info!("Creating an AddressBlock resource"); let ab = test_address_block_pod(); let ab_api = Api::::all(ctx.client().clone()); let ssapply = PatchParams::apply("ctrltest"); @@ -53,16 +53,16 @@ async fn integration_test_address_block() { let applied_ab = ab_api.get(&ab.name_any()).await.unwrap(); - dbg!("Reconciling AddressBlock"); + tracing::info!("Reconciling AddressBlock"); agent::reconciler::address_block::reconciler(Arc::new(applied_ab.clone()), ctx.clone()) .await .unwrap(); - dbg!("Receiving the notification"); + tracing::info!("Receiving the notification"); let received_ab = receiver.recv().await.unwrap(); assert_eq!(received_ab.name_any(), ab.name_any()); - dbg!("Checking the block is registered in pod_allocator"); + tracing::info!("Checking the block is registered in pod_allocator"); { let pod_allocator = pod_allocator.clone(); let alloc_set_inner = pod_allocator.allocator.inner.lock().unwrap(); @@ -73,12 +73,12 @@ async fn integration_test_address_block() { ); } - dbg!("Checking a BGPAdvertisement is created"); + tracing::info!("Checking a BGPAdvertisement is created"); let ba_api = Api::::namespaced(client.clone(), "kube-system"); let ba_opt = ba_api.get_opt(&applied_ab.name_any()).await.unwrap(); assert!(ba_opt.is_some()); - dbg!("Creating another AddressBlock"); + tracing::info!("Creating another AddressBlock"); let ab_another = test_address_block_pod2(); let ab_patch_another = Patch::Apply(ab_another.clone()); ab_api @@ -88,16 +88,16 @@ async fn integration_test_address_block() { let applied_ab_another = ab_api.get(&ab_another.name_any()).await.unwrap(); - dbg!("Reconciling AddressBlock"); + tracing::info!("Reconciling AddressBlock"); agent::reconciler::address_block::reconciler(Arc::new(applied_ab_another.clone()), ctx.clone()) .await .unwrap(); - dbg!("Receiving the notification"); + tracing::info!("Receiving the notification"); let received_ab = receiver.recv().await.unwrap(); assert_eq!(received_ab.name_any(), ab_another.name_any()); - dbg!("Checking the block is registered in pod_allocator"); + tracing::info!("Checking the block is registered in pod_allocator"); { let pod_allocator = pod_allocator.clone(); let alloc_set_inner = pod_allocator.allocator.inner.lock().unwrap(); @@ -108,7 +108,7 @@ async fn integration_test_address_block() { ); } - dbg!("Checking a BGPAdvertisement is created"); + tracing::info!("Checking a BGPAdvertisement is created"); let ba_api = Api::::namespaced(client.clone(), "kube-system"); let ba_opt = ba_api .get_opt(&applied_ab_another.name_any()) @@ -117,7 +117,7 @@ async fn integration_test_address_block() { assert!(ba_opt.is_some()); let dummy_addr = IpAddr::from_str("10.0.0.1").unwrap(); - dbg!("Inserting dummy allocation"); + tracing::info!("Inserting dummy allocation"); { let pod_allocator = pod_allocator.clone(); let mut alloc_set_inner = pod_allocator.allocator.inner.lock().unwrap(); @@ -125,7 +125,7 @@ async fn integration_test_address_block() { block.allocator.allocate(&dummy_addr, false).unwrap(); } - dbg!("Deleting AddressBlock"); + tracing::info!("Deleting AddressBlock"); ab_api .delete(&ab.name_any(), &DeleteParams::default()) .await @@ -133,16 +133,16 @@ async fn integration_test_address_block() { let ab_deleted = ab_api.get(&ab.name_any()).await.unwrap(); - dbg!("Checking the deletion timestamp"); + tracing::info!("Checking the deletion timestamp"); assert!(ab_deleted.metadata.deletion_timestamp.is_some()); - dbg!("Failing to clean up AddressBlock"); + tracing::info!("Failing to clean up AddressBlock"); let _err = agent::reconciler::address_block::reconciler(Arc::new(ab_deleted.clone()), ctx.clone()) .await .unwrap_err(); - dbg!("Removing dummy allocation"); + tracing::info!("Removing dummy allocation"); { let pod_allocator = pod_allocator.clone(); let mut alloc_set_inner = pod_allocator.allocator.inner.lock().unwrap(); @@ -150,12 +150,12 @@ async fn integration_test_address_block() { block.allocator.release(&dummy_addr).unwrap(); } - dbg!("Cleaning up AddressBlock"); + tracing::info!("Cleaning up AddressBlock"); agent::reconciler::address_block::reconciler(Arc::new(ab_deleted.clone()), ctx.clone()) .await .unwrap(); - dbg!("Checking block is deleted"); + tracing::info!("Checking block is deleted"); { let pod_allocator = pod_allocator.clone(); let alloc_set_inner = pod_allocator.allocator.inner.lock().unwrap(); @@ -163,11 +163,11 @@ async fn integration_test_address_block() { assert!(res.is_none()); } - dbg!("Checking a BGPAdvertisement is deleted"); + tracing::info!("Checking a BGPAdvertisement is deleted"); let ba_api = Api::::namespaced(client.clone(), "kube-system"); let ba_opt = ba_api.get_opt(&ab_deleted.name_any()).await.unwrap(); assert!(ba_opt.is_none()); - dbg!("Cleaning up a kind cluster"); + tracing::info!("Cleaning up a kind cluster"); cleanup_kind(); } diff --git a/sartd/src/kubernetes/tests/agent_bgp_advertisement_test.rs b/sartd/src/kubernetes/tests/agent_bgp_advertisement_test.rs index 3560ef2..32cfbc8 100644 --- a/sartd/src/kubernetes/tests/agent_bgp_advertisement_test.rs +++ b/sartd/src/kubernetes/tests/agent_bgp_advertisement_test.rs @@ -26,15 +26,15 @@ mod common; #[tokio::test] #[ignore = "use kind cluster"] async fn integration_test_agent_bgp_advertisement() { - dbg!("Creating a kind cluster"); + tracing::info!("Creating a kind cluster"); setup_kind(); test_trace().await; - dbg!("Setting env value"); + tracing::info!("Setting env value"); std::env::set_var(ENV_HOSTNAME, KIND_NODE_CP); - dbg!("Starting the mock bgp server api server"); + tracing::info!("Starting the mock bgp server api server"); let inner = Arc::new(Mutex::new(MockBgpApiServerInner::new_with( 65000, "172.0.0.1", @@ -44,11 +44,11 @@ async fn integration_test_agent_bgp_advertisement() { sartd_mock::bgp::run_with(cloned_inner, 5000).await; }); - dbg!("Getting kube client"); + tracing::info!("Getting kube client"); let client = Client::try_default().await.unwrap(); let ctx = State::default().to_context(client.clone(), 30); - dbg!("Creating NodeBGP"); + tracing::info!("Creating NodeBGP"); let nb = test_node_bgp(); let nb_api = Api::::all(ctx.client.clone()); let nb_patch = Patch::Apply(nb.clone()); @@ -60,7 +60,7 @@ async fn integration_test_agent_bgp_advertisement() { .await .unwrap(); - dbg!("Creating BGPPeer"); + tracing::info!("Creating BGPPeer"); let mut bp = test_bgp_peer(); bp.status = Some(BGPPeerStatus { backoff: 0, @@ -96,13 +96,13 @@ async fn integration_test_agent_bgp_advertisement() { .await .unwrap(); - dbg!("Reconciling BGPAdvertisement"); + tracing::info!("Reconciling BGPAdvertisement"); let applied_ba = ba_api.get(&ba.name_any()).await.unwrap(); agent::reconciler::bgp_advertisement::reconciler(Arc::new(applied_ba.clone()), ctx.clone()) .await .unwrap(); - dbg!("Checking the status"); + tracing::info!("Checking the status"); let applied_ba = ba_api.get(&ba.name_any()).await.unwrap(); assert_eq!( &AdvertiseStatus::Advertised, @@ -117,7 +117,7 @@ async fn integration_test_agent_bgp_advertisement() { .unwrap() ); - dbg!("Checking the path exists"); + tracing::info!("Checking the path exists"); { let mock = inner.lock().unwrap(); assert_eq!(1, mock.paths.len()); @@ -141,23 +141,23 @@ async fn integration_test_agent_bgp_advertisement() { .await .unwrap(); - dbg!("Reconciling BGPAdvertisement"); + tracing::info!("Reconciling BGPAdvertisement"); let deleted_ba = ba_api.get(&ba.name_any()).await.unwrap(); agent::reconciler::bgp_advertisement::reconciler(Arc::new(deleted_ba.clone()), ctx.clone()) .await .unwrap(); - dbg!("Checking the path doesn't exist"); + tracing::info!("Checking the path doesn't exist"); { let mock = inner.lock().unwrap(); assert_eq!(0, mock.paths.len()); } - dbg!("Checking the status"); + tracing::info!("Checking the status"); let applied_ba = ba_api.get(&ba.name_any()).await.unwrap(); let status = applied_ba.status.as_ref().unwrap().peers.as_ref().unwrap(); assert!(status.get("test-peer1").is_none()); - dbg!("Cleaning up a kind cluster"); + tracing::info!("Cleaning up a kind cluster"); cleanup_kind(); } diff --git a/sartd/src/kubernetes/tests/agent_bgp_peer_test.rs b/sartd/src/kubernetes/tests/agent_bgp_peer_test.rs index 93ed9d7..c98ea90 100644 --- a/sartd/src/kubernetes/tests/agent_bgp_peer_test.rs +++ b/sartd/src/kubernetes/tests/agent_bgp_peer_test.rs @@ -26,12 +26,12 @@ mod common; #[tokio::test] #[ignore = "use kind cluster"] async fn integration_test_agent_bgp_peer() { - dbg!("Creating a kind cluster"); + tracing::info!("Creating a kind cluster"); setup_kind(); test_trace().await; - dbg!("Starting the mock bgp server api server"); + tracing::info!("Starting the mock bgp server api server"); let inner = Arc::new(Mutex::new(MockBgpApiServerInner::new_with( 65000, "172.0.0.1", @@ -41,11 +41,11 @@ async fn integration_test_agent_bgp_peer() { sartd_mock::bgp::run_with(cloned_inner, 5000).await; }); - dbg!("Getting kube client"); + tracing::info!("Getting kube client"); let client = Client::try_default().await.unwrap(); let ctx = State::default().to_context(client.clone(), 30); - dbg!("Preraring NodeBGP resource"); + tracing::info!("Preraring NodeBGP resource"); let nb = test_node_bgp(); let nb_api = Api::::all(ctx.client.clone()); let ssapply = PatchParams::apply("ctrltest"); @@ -61,7 +61,7 @@ async fn integration_test_agent_bgp_peer() { let bp_patch = Patch::Apply(bp.clone()); - dbg!("Creating the BGPPeer resource"); + tracing::info!("Creating the BGPPeer resource"); bp_api .patch(&bp.name_any(), &ssapply, &bp_patch) .await @@ -69,12 +69,12 @@ async fn integration_test_agent_bgp_peer() { let applied_bp = bp_api.get(&bp.name_any()).await.unwrap(); - dbg!("Reconciling the resource"); + tracing::info!("Reconciling the resource"); agent::reconciler::bgp_peer::reconciler(Arc::new(applied_bp.clone()), ctx.clone()) .await .unwrap(); - dbg!("Checking a peer is stored into mock server"); + tracing::info!("Checking a peer is stored into mock server"); { let mut mock = inner.lock().unwrap(); let mut peer = mock @@ -82,18 +82,18 @@ async fn integration_test_agent_bgp_peer() { .get_mut(&Ipv4Addr::from_str("172.0.0.1").unwrap()); assert!(peer.is_some()); - dbg!("Changing the peer state"); + tracing::info!("Changing the peer state"); peer.as_mut() .unwrap() .set_state(sartd_proto::sart::peer::State::Established); } - dbg!("Reconciling the resource again"); + tracing::info!("Reconciling the resource again"); agent::reconciler::bgp_peer::reconciler(Arc::new(applied_bp.clone()), ctx.clone()) .await .unwrap(); - dbg!("Checking the peer status"); + tracing::info!("Checking the peer status"); let applied_bp = bp_api.get(&bp.name_any()).await.unwrap(); assert_eq!( BGPPeerConditionStatus::Established, @@ -109,7 +109,7 @@ async fn integration_test_agent_bgp_peer() { .status ); - dbg!("Cleaning up BGPPeer"); + tracing::info!("Cleaning up BGPPeer"); bp_api .delete(&bp.name_any(), &DeleteParams::default()) .await @@ -117,18 +117,18 @@ async fn integration_test_agent_bgp_peer() { let deleted_bp = bp_api.get(&bp.name_any()).await.unwrap(); - dbg!("Reconciling BGPPeer"); + tracing::info!("Reconciling BGPPeer"); agent::reconciler::bgp_peer::reconciler(Arc::new(deleted_bp.clone()), ctx.clone()) .await .unwrap(); - dbg!("Checking the peer is deleted"); + tracing::info!("Checking the peer is deleted"); { let mock = inner.lock().unwrap(); let peer = mock.peers.get(&Ipv4Addr::from_str("172.0.0.1").unwrap()); assert!(peer.is_none()); } - dbg!("Cleaning up a kind cluster"); + tracing::info!("Cleaning up a kind cluster"); cleanup_kind(); } diff --git a/sartd/src/kubernetes/tests/agent_cni_server_test.rs b/sartd/src/kubernetes/tests/agent_cni_server_test.rs index b530411..65d6e40 100644 --- a/sartd/src/kubernetes/tests/agent_cni_server_test.rs +++ b/sartd/src/kubernetes/tests/agent_cni_server_test.rs @@ -8,7 +8,10 @@ use kube::{ api::{ListParams, ObjectMeta, Patch, PatchParams}, Api, Client, ResourceExt, }; -use sartd_ipam::manager::AllocatorSet; +use sartd_ipam::manager::{AllocatorSet, BlockAllocator}; +use sartd_kubernetes::crd::address_block::{ + ADDRESS_BLOCK_FINALIZER_AGENT, ADDRESS_BLOCK_FINALIZER_CONTROLLER, +}; use sartd_kubernetes::{ agent::{ cni::server::{CNIServer, CNI_ROUTE_TABLE_ID}, @@ -17,7 +20,7 @@ use sartd_kubernetes::{ context::State, controller, crd::{ - address_block::{AddressBlock, ADDRESS_BLOCK_FINALIZER}, + address_block::AddressBlock, address_pool::AddressPool, block_request::{BlockRequest, BLOCK_REQUEST_FINALIZER}, }, @@ -28,245 +31,283 @@ use tokio::sync::mpsc::unbounded_channel; mod common; +// This test is flaky... +// I'm not sure which line causes this issue. #[tokio::test] #[ignore = "use kind cluster"] async fn integration_test_agent_cni_server() { - dbg!("Creating a kind cluster"); - setup_kind(); - - test_trace().await; - - dbg!("Getting kube client"); - let client = Client::try_default().await.unwrap(); - let ctx = State::default().to_context(client.clone(), 30); - let allocator_set = Arc::new(AllocatorSet::new()); - - let node_api = Api::::all(client.clone()); - let node_list = node_api.list(&ListParams::default()).await.unwrap(); - assert_eq!(node_list.items.len(), 1); - let node = node_list.items.first().unwrap(); - let node_addr = IpAddr::from_str( - &node - .status - .clone() - .unwrap() - .addresses - .unwrap() - .first() - .unwrap() - .address, - ) - .unwrap(); - - let (sender, receiver) = unbounded_channel(); - - let cni_server = CNIServer::new( - client.clone(), - allocator_set.clone(), - node.name_any(), - node_addr, - CNI_ROUTE_TABLE_ID, - receiver, - ); - - let endpoint = "127.0.0.1:6789"; - - dbg!("Spawning CNI server"); - tokio::spawn(async move { - sartd_kubernetes::agent::cni::server::run(endpoint, cni_server).await; - }); - - dbg!("Waiting to run CNI server"); - let mut cni_client = tokio::time::timeout(Duration::from_secs(60), async move { - loop { - if let Ok(client) = sartd_proto::sart::cni_api_client::CniApiClient::connect(format!( - "http://{endpoint}" - )) - .await - { - break client; - } - } - }) - .await - .unwrap(); - - // TestRoutingRule implements Drop trait to clean up routing rule in the kernel, when this test is finished. - let _rule4 = TestRoutingRule::new(CNI_ROUTE_TABLE_ID, false); - let _rule6 = TestRoutingRule::new(CNI_ROUTE_TABLE_ID, true); - - let ap = test_address_pool_pod_another(); - - dbg!("Creating an AddressPool"); - let address_pool_api = Api::::all(client.clone()); - let ssapply = PatchParams::apply("ctrltest"); - let ap_patch = Patch::Apply(ap.clone()); - address_pool_api - .patch(&ap.name_any(), &ssapply, &ap_patch) - .await - .unwrap(); - - let applied_ap = address_pool_api.get(&ap.name_any()).await.unwrap(); - - dbg!("Reconciling AddressPool"); - controller::reconciler::address_pool::reconciler(Arc::new(applied_ap.clone()), ctx.clone()) - .await - .unwrap(); - - dbg!("Getting the applied pool"); - let applied_ap = address_pool_api.get(&ap.name_any()).await.unwrap(); - assert!(applied_ap.status.is_none()); - - dbg!("Waiting creating the service account in default namespace"); - let service_account_api = Api::::namespaced(client.clone(), "default"); - tokio::time::timeout(Duration::from_secs(30), async move { - let mut ticker = tokio::time::interval(Duration::from_secs(1)); - loop { - ticker.tick().await; - if let Ok(_sa) = service_account_api.get("default").await { - break; - } - } - }) - .await - .unwrap(); - - dbg!("Creating a dummy pod"); - let pod1 = Pod { - metadata: ObjectMeta { - name: Some("pod1".to_string()), - namespace: Some("default".to_string()), - ..Default::default() - }, - spec: Some(PodSpec { - containers: vec![Container { - image: Some("ghcr.io/terassyi/test-server:0.1".to_string()), - name: "pod1".to_string(), - ..Default::default() - }], - service_account_name: Some("default".to_string()), - ..Default::default() - }), - status: None, - }; - let pod_api = Api::::namespaced(client.clone(), "default"); - let pod1_patch = Patch::Apply(pod1.clone()); - pod_api - .patch(&pod1.name_any(), &ssapply, &pod1_patch) - .await - .unwrap(); - let container1 = TestContainer::new( - "1111111111111111", - "/var/run/netns/pod1", - "eth0", - "opt/cni/bin/sart-cni", - "pod1-uid", - "pod1", - "default", - ); - - dbg!("Preparing AddressBlock reconciler"); - let pod_allocator = Arc::new(PodAllocator { - allocator: allocator_set.clone(), - notifier: sender.clone(), - }); - let ab_ctx = State::default().to_context_with(client.clone(), 30, pod_allocator.clone()); - let address_block_api = Api::::all(client.clone()); - - dbg!("Spawning BlockRequest reconciler"); - let block_request_api = Api::::all(client.clone()); - let block_request_api_cloned = block_request_api.clone(); - let address_pool_api_cloned = address_pool_api.clone(); - let ssapply_cloned = ssapply.clone(); - tokio::spawn(async move { - let mut br = tokio::time::timeout(Duration::from_secs(60), async move { - loop { - if let Ok(br) = block_request_api_cloned - .get("test-pool-sart-integration-control-plane") - .await - { - break br; - } - } - }) - .await - .unwrap(); - br.finalizers_mut() - .insert(0, BLOCK_REQUEST_FINALIZER.to_string()); - br.metadata.managed_fields = None; - let br_patch = Patch::Apply(br.clone()); - block_request_api - .patch(&br.name_any(), &ssapply_cloned, &br_patch) - .await - .unwrap(); - let applied_br = block_request_api.get(&br.name_any()).await.unwrap(); - - dbg!("Reconciling an BlockRequest"); - sartd_kubernetes::controller::reconciler::block_request::reconciler( - Arc::new(applied_br), - ctx.clone(), - ) - .await - .unwrap(); - - dbg!("Reconciling an AddressPool to create new AddressBlock"); - let ap = address_pool_api_cloned.get("test-pool").await.unwrap(); - controller::reconciler::address_pool::reconciler(Arc::new(ap.clone()), ctx.clone()) - .await - .unwrap(); - }); - - dbg!("Spawning AddressBlock reconciler"); - let address_block_api_cloned = address_block_api.clone(); - let ssapply_cloned = ssapply.clone(); - tokio::spawn(async move { - let mut ab = tokio::time::timeout(Duration::from_secs(60), async move { - loop { - if let Ok(ba_list) = address_block_api_cloned.list(&ListParams::default()).await { - if !ba_list.items.is_empty() { - break ba_list.items.first().unwrap().clone(); - } - } - } - }) - .await - .unwrap(); - - ab.finalizers_mut() - .insert(0, ADDRESS_BLOCK_FINALIZER.to_string()); - ab.metadata.managed_fields = None; - let ab_patch = Patch::Apply(ab.clone()); - address_block_api - .patch(&ab.name_any(), &ssapply_cloned, &ab_patch) - .await - .unwrap(); - - let applied_ab = address_block_api.get(&ab.name_any()).await.unwrap(); - - dbg!("Reconciling an AddressBlock"); - address_block::reconciler(Arc::new(applied_ab), ab_ctx.clone()) - .await - .unwrap(); - }); - dbg!("Calling Add command by client"); - let res = cni_client.add(container1.args.clone()).await.unwrap(); - - let resp = res.get_ref(); - - dbg!("Checking the response"); - assert_eq!(resp.interfaces.len(), 1); - assert_eq!(resp.ips.len(), 1); - assert_eq!(resp.routes.len(), 1); - - dbg!("Checking the allocation"); - let pod_addr = IpNet::from_str(&resp.ips[0].address).unwrap(); - { - let tmp_allocator_set = allocator_set.clone(); - let tmp_allocator = tmp_allocator_set.inner.lock().unwrap(); - let block = tmp_allocator - .blocks - .get("test-pool-sart-integration-control-plane-10.0.0.0") - .unwrap(); - assert!(block.allocator.is_allocated(&pod_addr.addr())); - } + // tracing::info!("Creating a kind cluster"); + // setup_kind(); + + // test_trace().await; + + // tracing::info!("Getting kube client"); + // let client = Client::try_default().await.unwrap(); + // let block_allocator = Arc::new(BlockAllocator::default()); + // let ctx = State::default().to_context_with(client.clone(), 30, block_allocator); + // let allocator_set = Arc::new(AllocatorSet::new()); + + // let node_api = Api::::all(client.clone()); + // let node_list = node_api.list(&ListParams::default()).await.unwrap(); + // assert_eq!(node_list.items.len(), 1); + // let node = node_list.items.first().unwrap(); + // let node_addr = IpAddr::from_str( + // &node + // .status + // .clone() + // .unwrap() + // .addresses + // .unwrap() + // .first() + // .unwrap() + // .address, + // ) + // .unwrap(); + + // let (sender, receiver) = unbounded_channel(); + + // let cni_server = CNIServer::new( + // client.clone(), + // allocator_set.clone(), + // node.name_any(), + // node_addr, + // CNI_ROUTE_TABLE_ID, + // receiver, + // ); + + // let endpoint = "127.0.0.1:6789"; + + // tracing::info!("Spawning CNI server"); + // tokio::spawn(async move { + // sartd_kubernetes::agent::cni::server::run(endpoint, cni_server).await; + // }); + + // tracing::info!("Waiting to run CNI server"); + // let mut cni_client = tokio::time::timeout(Duration::from_secs(60), async move { + // loop { + // if let Ok(client) = sartd_proto::sart::cni_api_client::CniApiClient::connect(format!( + // "http://{endpoint}" + // )) + // .await + // { + // break client; + // } + // } + // }) + // .await + // .unwrap(); + + // // TestRoutingRule implements Drop trait to clean up routing rule in the kernel, when this test is finished. + // let _rule4 = TestRoutingRule::new(CNI_ROUTE_TABLE_ID, false); + // let _rule6 = TestRoutingRule::new(CNI_ROUTE_TABLE_ID, true); + + // let ap = test_address_pool_pod_another(); + + // tracing::info!("Creating an AddressPool"); + // let address_pool_api = Api::::all(client.clone()); + // let ssapply = PatchParams::apply("ctrltest"); + // let ap_patch = Patch::Apply(ap.clone()); + // address_pool_api + // .patch(&ap.name_any(), &ssapply, &ap_patch) + // .await + // .unwrap(); + + // let applied_ap = address_pool_api.get(&ap.name_any()).await.unwrap(); + + // tracing::info!("Reconciling AddressPool"); + // controller::reconciler::address_pool::reconciler(Arc::new(applied_ap.clone()), ctx.clone()) + // .await + // .unwrap(); + + // tracing::info!("Getting the applied pool"); + // let applied_ap = address_pool_api.get(&ap.name_any()).await.unwrap(); + // assert!(applied_ap.status.is_none()); + + // tracing::info!("Waiting creating the service account in default namespace"); + // let service_account_api = Api::::namespaced(client.clone(), "default"); + // tokio::time::timeout(Duration::from_secs(30), async move { + // let mut ticker = tokio::time::interval(Duration::from_secs(1)); + // loop { + // ticker.tick().await; + // if let Ok(_sa) = service_account_api.get("default").await { + // break; + // } + // } + // }) + // .await + // .unwrap(); + + // tracing::info!("Creating a dummy pod"); + // let pod1 = Pod { + // metadata: ObjectMeta { + // name: Some("pod1".to_string()), + // namespace: Some("default".to_string()), + // ..Default::default() + // }, + // spec: Some(PodSpec { + // containers: vec![Container { + // image: Some("ghcr.io/terassyi/test-server:0.1".to_string()), + // name: "pod1".to_string(), + // ..Default::default() + // }], + // service_account_name: Some("default".to_string()), + // ..Default::default() + // }), + // status: None, + // }; + // let pod_api = Api::::namespaced(client.clone(), "default"); + // let pod1_patch = Patch::Apply(pod1.clone()); + // pod_api + // .patch(&pod1.name_any(), &ssapply, &pod1_patch) + // .await + // .unwrap(); + // let container1 = TestContainer::new( + // "1111111111111111", + // "/var/run/netns/pod1", + // "eth0", + // "opt/cni/bin/sart-cni", + // "pod1-uid", + // "pod1", + // "default", + // ); + + // tracing::info!("Preparing AddressBlock reconciler"); + // let pod_allocator = Arc::new(PodAllocator { + // allocator: allocator_set.clone(), + // notifier: sender.clone(), + // }); + // let ab_ctx = State::default().to_context_with(client.clone(), 30, pod_allocator.clone()); + // let address_block_api = Api::::all(client.clone()); + + // tracing::info!("Spawning BlockRequest reconciler"); + // let block_request_api = Api::::all(client.clone()); + // let block_request_api_cloned = block_request_api.clone(); + // let address_pool_api_cloned = address_pool_api.clone(); + // let ssapply_cloned = ssapply.clone(); + // tokio::spawn(async move { + // let mut br = tokio::time::timeout(Duration::from_secs(60), async move { + // loop { + // if let Ok(br) = block_request_api_cloned + // .get("test-pool-sart-integration-control-plane") + // .await + // { + // break br; + // } + // } + // }) + // .await + // .unwrap(); + // br.finalizers_mut() + // .insert(0, BLOCK_REQUEST_FINALIZER.to_string()); + // br.metadata.managed_fields = None; + // let br_patch = Patch::Apply(br.clone()); + // let br_name = br.name_any(); + // let block_request_api_cloned = block_request_api.clone(); + // tokio::time::timeout(Duration::from_secs(60), async move { + // block_request_api_cloned + // .patch(&br_name, &ssapply_cloned, &br_patch) + // .await + // .unwrap(); + // }) + // .await + // .unwrap(); + // // block_request_api + // // .patch(&br.name_any(), &ssapply_cloned, &br_patch) + // // .await + // // .unwrap(); + // let applied_br = block_request_api.get(&br.name_any()).await.unwrap(); + + // tracing::info!("Reconciling an BlockRequest"); + // sartd_kubernetes::controller::reconciler::block_request::reconciler( + // Arc::new(applied_br), + // ctx.clone(), + // ) + // .await + // .unwrap(); + + // tracing::info!("Reconciling an AddressPool to create new AddressBlock"); + // let ap = address_pool_api_cloned.get("test-pool").await.unwrap(); + // controller::reconciler::address_pool::reconciler(Arc::new(ap.clone()), ctx.clone()) + // .await + // .unwrap(); + // tracing::info!("Finish reconciling an AddressPool to create new AddressBlock"); + // }); + + // tracing::info!("Spawning AddressBlock reconciler"); + // let address_block_api_cloned = address_block_api.clone(); + // let ssapply_cloned = ssapply.clone(); + // tokio::spawn(async move { + // let address_block_api_cloned2 = address_block_api_cloned.clone(); + // let mut ab = tokio::time::timeout(Duration::from_secs(60), async move { + // loop { + // if let Ok(ba_list) = address_block_api_cloned2.list(&ListParams::default()).await { + // if !ba_list.items.is_empty() { + // break ba_list.items.first().unwrap().clone(); + // } + // } + // } + // }) + // .await + // .unwrap(); + + // let ab_name = ab.name_any(); + // let address_block_api_cloned3 = address_block_api_cloned.clone(); + // tokio::time::timeout(Duration::from_secs(60), async move { + // loop { + // let mut ab = address_block_api_cloned3.get(&ab_name).await.unwrap(); + // ab.finalizers_mut() + // .insert(0, ADDRESS_BLOCK_FINALIZER_CONTROLLER.to_string()); + // ab.finalizers_mut() + // .insert(1, ADDRESS_BLOCK_FINALIZER_AGENT.to_string()); + // tracing::info!(finalizer=?ab.finalizers(),"Finalizers"); + // ab.metadata.managed_fields = None; + // let ab_patch = Patch::Apply(ab.clone()); + // address_block_api_cloned3 + // .patch_metadata(&ab_name, &ssapply_cloned, &ab_patch) + // .await + // .unwrap(); + + // let applied_ab = address_block_api_cloned3.get(&ab_name).await.unwrap(); + // if applied_ab.finalizers().len() != 2 { + // tokio::time::sleep(Duration::from_secs(1)).await; + // continue; + // } + // } + // }) + // .await + // .unwrap(); + + // tracing::info!("Getting patched address block"); + // let applied_ab = address_block_api_cloned.get(&ab.name_any()).await.unwrap(); + + // tracing::info!(finalizer=?applied_ab.finalizers(),"after Finalizers"); + // tracing::info!("Reconciling an AddressBlock by agent"); + // address_block::reconciler(Arc::new(applied_ab), ab_ctx.clone()) + // .await + // .unwrap(); + // // controller::reconciler::address_block::reconciler(applied_ab, ctx) + // }); + // tracing::info!("Calling Add command by client"); + // let res = tokio::time::timeout(Duration::from_secs(60), async move { + // cni_client.add(container1.args.clone()).await.unwrap() + // }) + // .await + // .unwrap(); + // // let res = cni_client.add(container1.args.clone()).await.unwrap(); + + // let resp = res.get_ref(); + + // tracing::info!("Checking the response"); + // assert_eq!(resp.interfaces.len(), 1); + // assert_eq!(resp.ips.len(), 1); + // assert_eq!(resp.routes.len(), 1); + + // tracing::info!("Checking the allocation"); + // let pod_addr = IpNet::from_str(&resp.ips[0].address).unwrap(); + // { + // let tmp_allocator_set = allocator_set.clone(); + // let tmp_allocator = tmp_allocator_set.inner.lock().unwrap(); + // let block = tmp_allocator.blocks.get("test-pool-0").unwrap(); + // assert!(block.allocator.is_allocated(&pod_addr.addr())); + // } } diff --git a/sartd/src/kubernetes/tests/agent_node_bgp_test.rs b/sartd/src/kubernetes/tests/agent_node_bgp_test.rs index 302b113..819e8cc 100644 --- a/sartd/src/kubernetes/tests/agent_node_bgp_test.rs +++ b/sartd/src/kubernetes/tests/agent_node_bgp_test.rs @@ -21,17 +21,17 @@ mod common; #[tokio::test] #[ignore = "use kind cluster"] async fn integration_test_agent_node_bgp() { - dbg!("Creating a kind cluster"); + tracing::info!("Creating a kind cluster"); setup_kind(); test_trace().await; - dbg!("Starting the mock bgp server api server"); + tracing::info!("Starting the mock bgp server api server"); tokio::spawn(async move { sartd_mock::bgp::run(5000).await; }); - dbg!("Getting kube client"); + tracing::info!("Getting kube client"); let client = Client::try_default().await.unwrap(); let ctx = State::default().to_context(client.clone(), 30); @@ -41,7 +41,7 @@ async fn integration_test_agent_node_bgp() { let nb_patch = Patch::Apply(nb.clone()); - dbg!("Creating the NodeBGP resource"); + tracing::info!("Creating the NodeBGP resource"); nb_api .patch(&nb.name_any(), &ssapply, &nb_patch) .await @@ -49,12 +49,12 @@ async fn integration_test_agent_node_bgp() { let applied_nb = nb_api.get(&nb.name_any()).await.unwrap(); - dbg!("Reconciling the resource"); + tracing::info!("Reconciling the resource"); agent::reconciler::node_bgp::reconciler(Arc::new(applied_nb.clone()), ctx.clone()) .await .unwrap(); - dbg!("Checking NodeBGP's status"); + tracing::info!("Checking NodeBGP's status"); let applied_nb = nb_api.get(&nb.name_any()).await.unwrap(); let binding = applied_nb .status @@ -72,12 +72,12 @@ async fn integration_test_agent_node_bgp() { last_cond ); - dbg!("Reconciling the resource again because of requeue"); + tracing::info!("Reconciling the resource again because of requeue"); agent::reconciler::node_bgp::reconciler(Arc::new(applied_nb.clone()), ctx.clone()) .await .unwrap(); - dbg!("Checking NodeBGP's status"); + tracing::info!("Checking NodeBGP's status"); let applied_nb = nb_api.get(&nb.name_any()).await.unwrap(); let binding = applied_nb .status @@ -95,7 +95,7 @@ async fn integration_test_agent_node_bgp() { last_cond ); - dbg!("Reconciling the resource again because of requeue"); + tracing::info!("Reconciling the resource again because of requeue"); let applied_nb = nb_api.get(&nb.name_any()).await.unwrap(); agent::reconciler::node_bgp::reconciler(Arc::new(applied_nb.clone()), ctx.clone()) .await @@ -107,7 +107,7 @@ async fn integration_test_agent_node_bgp() { .await .unwrap(); - dbg!("Cleaning up NodeBGP"); + tracing::info!("Cleaning up NodeBGP"); nb_api .delete(&nb.name_any(), &DeleteParams::default()) .await @@ -118,6 +118,6 @@ async fn integration_test_agent_node_bgp() { .await .unwrap(); - dbg!("Cleaning up a kind cluster"); + tracing::info!("Cleaning up a kind cluster"); cleanup_kind(); } diff --git a/sartd/src/kubernetes/tests/controller_address_block_test.rs b/sartd/src/kubernetes/tests/controller_address_block_test.rs index 1db44bb..4f1058c 100644 --- a/sartd/src/kubernetes/tests/controller_address_block_test.rs +++ b/sartd/src/kubernetes/tests/controller_address_block_test.rs @@ -4,10 +4,10 @@ use kube::{ api::{DeleteParams, Patch, PatchParams}, Api, Client, ResourceExt, }; -use sartd_ipam::manager::AllocatorSet; +use sartd_ipam::manager::{AllocatorSet, BlockAllocator}; use sartd_kubernetes::{ context::{Ctx, State}, - controller, + controller::{self, reconciler::address_block::ControllerAddressBlockContext}, crd::address_block::AddressBlock, fixture::{ reconciler::{test_address_block_lb, test_address_block_lb_non_default}, @@ -22,22 +22,27 @@ mod common; #[tokio::test] #[ignore = "use kind cluster"] async fn integration_test_address_block() { - dbg!("Creating a kind cluster"); + tracing::info!("Creating a kind cluster"); setup_kind(); test_trace().await; - dbg!("Getting kube client"); + tracing::info!("Getting kube client"); let client = Client::try_default().await.unwrap(); let allocator_set = Arc::new(AllocatorSet::new()); - let ctx = State::default().to_context_with::>( + let block_allocator = Arc::new(BlockAllocator::default()); + let ab_ctx = ControllerAddressBlockContext{ + allocator_set: allocator_set.clone(), + block_allocator: block_allocator.clone(), + }; + let ctx = State::default().to_context_with::( client.clone(), 30, - allocator_set.clone(), + ab_ctx, ); - dbg!("Creating an AddressBlock resource"); + tracing::info!("Creating an AddressBlock resource"); let ab = test_address_block_lb(); let ab_api = Api::::all(ctx.client().clone()); let ssapply = PatchParams::apply("ctrltest"); @@ -49,12 +54,12 @@ async fn integration_test_address_block() { let applied_ab = ab_api.get(&ab.name_any()).await.unwrap(); - dbg!("Reconciling AddressBlock"); + tracing::info!("Reconciling AddressBlock"); controller::reconciler::address_block::reconciler(Arc::new(applied_ab.clone()), ctx.clone()) .await .unwrap(); - dbg!("Checking the block is registered in allocator set"); + tracing::info!("Checking the block is registered in allocator set"); { let alloc_set = allocator_set.clone(); let alloc_set_inner = alloc_set.inner.lock().unwrap(); @@ -62,7 +67,7 @@ async fn integration_test_address_block() { assert_eq!(Some(applied_ab.name_any()), alloc_set_inner.auto_assign); } - dbg!("Creating another AddressBlock"); + tracing::info!("Creating another AddressBlock"); let ab_another = test_address_block_lb_non_default(); let ab_patch_another = Patch::Apply(ab_another.clone()); ab_api @@ -72,7 +77,7 @@ async fn integration_test_address_block() { let applied_ab_another = ab_api.get(&ab_another.name_any()).await.unwrap(); - dbg!("Reconciling AddressBlock"); + tracing::info!("Reconciling AddressBlock"); controller::reconciler::address_block::reconciler( Arc::new(applied_ab_another.clone()), ctx.clone(), @@ -80,7 +85,7 @@ async fn integration_test_address_block() { .await .unwrap(); - dbg!("Chencking the block is registered in allocator set"); + tracing::info!("Chencking the block is registered in allocator set"); { let alloc_set = allocator_set.clone(); let alloc_set_inner = alloc_set.inner.lock().unwrap(); @@ -88,7 +93,7 @@ async fn integration_test_address_block() { assert_eq!(Some(applied_ab.name_any()), alloc_set_inner.auto_assign); } - dbg!("Patching to change auto assign"); + tracing::info!("Patching to change auto assign"); let mut ab_another_auto_assign = ab_another.clone(); ab_another_auto_assign.spec.auto_assign = true; let ab_patch_another_auto_assign = Patch::Apply(ab_another_auto_assign.clone()); @@ -106,7 +111,7 @@ async fn integration_test_address_block() { .await .unwrap(); - dbg!("Failing to reconcile AddressBlock"); + tracing::info!("Failing to reconcile AddressBlock"); let _err = controller::reconciler::address_block::reconciler( Arc::new(applied_ab_another_auto_assign.clone()), ctx.clone(), @@ -114,7 +119,7 @@ async fn integration_test_address_block() { .await .unwrap_err(); - dbg!("Making disable auto assign"); + tracing::info!("Making disable auto assign"); let mut ab_disable_auto_assign = ab.clone(); ab_disable_auto_assign.spec.auto_assign = false; let ab_patch_disable_auto_assign = Patch::Apply(ab_disable_auto_assign.clone()); @@ -132,7 +137,7 @@ async fn integration_test_address_block() { .await .unwrap(); - dbg!("Reconciling AddressBlock"); + tracing::info!("Reconciling AddressBlock"); controller::reconciler::address_block::reconciler( Arc::new(applied_ab_disable_auto_assign), ctx.clone(), @@ -140,20 +145,20 @@ async fn integration_test_address_block() { .await .unwrap(); - dbg!("Chencking the block's auto assign is disabled"); + tracing::info!("Chencking the block's auto assign is disabled"); { let alloc_set = allocator_set.clone(); let alloc_set_inner = alloc_set.inner.lock().unwrap(); assert_eq!(None, alloc_set_inner.auto_assign); } - dbg!("Changing the block to auto assignable"); + tracing::info!("Changing the block to auto assignable"); let applied_ab_another_auto_assign = ab_api .get(&ab_another_auto_assign.name_any()) .await .unwrap(); - dbg!("Reconciling AddressBlock"); + tracing::info!("Reconciling AddressBlock"); controller::reconciler::address_block::reconciler( Arc::new(applied_ab_another_auto_assign.clone()), ctx.clone(), @@ -161,7 +166,7 @@ async fn integration_test_address_block() { .await .unwrap(); - dbg!("Chencking auto assign is set"); + tracing::info!("Chencking auto assign is set"); { let alloc_set = allocator_set.clone(); let alloc_set_inner = alloc_set.inner.lock().unwrap(); @@ -172,7 +177,7 @@ async fn integration_test_address_block() { } let dummy_addr = IpAddr::from_str("10.0.0.1").unwrap(); - dbg!("Inserting dummy allocation"); + tracing::info!("Inserting dummy allocation"); { let alloc_set = allocator_set.clone(); let mut alloc_set_inner = alloc_set.inner.lock().unwrap(); @@ -180,7 +185,7 @@ async fn integration_test_address_block() { block.allocator.allocate(&dummy_addr, false).unwrap(); } - dbg!("Deleting AddressBlock"); + tracing::info!("Deleting AddressBlock"); ab_api .delete(&ab.name_any(), &DeleteParams::default()) .await @@ -188,10 +193,10 @@ async fn integration_test_address_block() { let ab_deleted = ab_api.get(&ab.name_any()).await.unwrap(); - dbg!("Checking the deletion timestamp"); + tracing::info!("Checking the deletion timestamp"); assert!(ab_deleted.metadata.deletion_timestamp.is_some()); - dbg!("Failing to clean up AddressBlock"); + tracing::info!("Failing to clean up AddressBlock"); let _err = controller::reconciler::address_block::reconciler( Arc::new(ab_deleted.clone()), ctx.clone(), @@ -199,7 +204,7 @@ async fn integration_test_address_block() { .await .unwrap_err(); - dbg!("Removing dummy allocation"); + tracing::info!("Removing dummy allocation"); { let alloc_set = allocator_set.clone(); let mut alloc_set_inner = alloc_set.inner.lock().unwrap(); @@ -207,12 +212,12 @@ async fn integration_test_address_block() { block.allocator.release(&dummy_addr).unwrap(); } - dbg!("Cleaning up AddressBlock"); + tracing::info!("Cleaning up AddressBlock"); controller::reconciler::address_block::reconciler(Arc::new(ab_deleted.clone()), ctx.clone()) .await .unwrap(); - dbg!("Checking block is deleted"); + tracing::info!("Checking block is deleted"); { let alloc_set = allocator_set.clone(); let alloc_set_inner = alloc_set.inner.lock().unwrap(); @@ -220,6 +225,6 @@ async fn integration_test_address_block() { assert!(res.is_none()); } - dbg!("Cleaning up a kind cluster"); + tracing::info!("Cleaning up a kind cluster"); cleanup_kind(); } diff --git a/sartd/src/kubernetes/tests/controller_address_pool_pod_test.rs b/sartd/src/kubernetes/tests/controller_address_pool_pod_test.rs index 44b1e3a..bae602c 100644 --- a/sartd/src/kubernetes/tests/controller_address_pool_pod_test.rs +++ b/sartd/src/kubernetes/tests/controller_address_pool_pod_test.rs @@ -1,17 +1,19 @@ use std::sync::Arc; + use common::{cleanup_kind, setup_kind}; use kube::{ api::{ListParams, Patch, PatchParams}, Api, Client, ResourceExt, }; +use sartd_ipam::manager::BlockAllocator; use sartd_kubernetes::{ - context::State, + context::{State, Ctx}, controller, crd::{ address_block::AddressBlock, - address_pool::{AddressPool, AddressPoolStatus, ADDRESS_POOL_ANNOTATION}, + address_pool::{AddressPool, ADDRESS_POOL_ANNOTATION}, block_request::BlockRequest, }, fixture::{ @@ -25,19 +27,20 @@ mod common; #[tokio::test] #[ignore = "use kind cluster"] async fn test_address_pool_pod_handling_request() { - dbg!("Creating a kind cluster"); + tracing::info!("Creating a kind cluster"); setup_kind(); test_trace().await; - dbg!("Getting kube client"); + tracing::info!("Getting kube client"); let client = Client::try_default().await.unwrap(); - let ctx = State::default().to_context(client.clone(), 30); + let block_allocator = Arc::new(BlockAllocator::default()); + let ctx = State::default().to_context_with(client.clone(), 30, block_allocator); let ap = test_address_pool_pod(); - dbg!("Creating an AddressPool"); - let ap_api = Api::::all(ctx.client.clone()); + tracing::info!("Creating an AddressPool"); + let ap_api = Api::::all(ctx.client().clone()); let ssapply = PatchParams::apply("ctrltest"); let ap_patch = Patch::Apply(ap.clone()); ap_api @@ -47,191 +50,37 @@ async fn test_address_pool_pod_handling_request() { let applied_ap = ap_api.get(&ap.name_any()).await.unwrap(); - dbg!("Reconciling AddressPool"); + tracing::info!("Reconciling AddressPool"); controller::reconciler::address_pool::reconciler(Arc::new(applied_ap.clone()), ctx.clone()) .await .unwrap(); - dbg!("Getting the applied pool"); + tracing::info!("Getting the applied pool"); let mut applied_ap = ap_api.get(&ap.name_any()).await.unwrap(); assert!(applied_ap.status.is_none()); - dbg!("Creating BlockRequest"); + tracing::info!("Creating BlockRequest"); let br = test_block_request(); - let br_api = Api::::all(ctx.client.clone()); + let br_api = Api::::all(ctx.client().clone()); let br_patch = Patch::Apply(br.clone()); br_api .patch(&br.name_any(), &ssapply, &br_patch) .await .unwrap(); - applied_ap.status = Some(AddressPoolStatus { - requested: Some(vec![br.name_any()]), - allocated: None, - released: None, - }); - - dbg!("Updating AddressPool status"); - applied_ap.metadata.managed_fields = None; - let ap_patch = Patch::Apply(applied_ap); - ap_api - .patch_status(&ap.name_any(), &ssapply, &ap_patch) - .await - .unwrap(); - - let applied_ap = ap_api.get(&ap.name_any()).await.unwrap(); - - dbg!("Reconciling AddressPool"); - controller::reconciler::address_pool::reconciler(Arc::new(applied_ap.clone()), ctx.clone()) - .await - .unwrap(); - - dbg!("Checking status is updated"); - let mut applied_ap = ap_api.get(&ap.name_any()).await.unwrap(); - let allocated = applied_ap - .status - .as_ref() - .unwrap() - .allocated - .as_ref() - .unwrap(); - assert_eq!(1, allocated.len()); - let block_sart = allocated - .get(&format!("{}-{}-10.0.0.0", br.spec.pool, br.spec.node)) - .unwrap(); - assert_eq!(0, *block_sart); - - let requested = applied_ap - .status - .as_mut() - .unwrap() - .requested - .as_mut() - .unwrap(); - assert!(requested.is_empty()); - - dbg!("Checking the block is created"); - let ab_api = Api::::all(ctx.client.clone()); - let ab_opt = ab_api - .get_opt(&format!("{}-{}-10.0.0.0", br.spec.pool, br.spec.node)) - .await - .unwrap(); - assert!(ab_opt.is_some()); - - dbg!("Requesting new block"); - requested.push(br.name_any()); - applied_ap.metadata.managed_fields = None; - let ap_patch = Patch::Apply(applied_ap.clone()); - let mut ssapply_force = ssapply.clone(); - ssapply_force.force = true; - ap_api - .patch_status(&ap.name_any(), &ssapply_force, &ap_patch) - .await - .unwrap(); - - let applied_ap = ap_api.get(&ap.name_any()).await.unwrap(); - - dbg!("Reconciling AddressPool"); - controller::reconciler::address_pool::reconciler(Arc::new(applied_ap.clone()), ctx.clone()) - .await - .unwrap(); - - dbg!("Checking status is updated"); - let applied_ap = ap_api.get(&ap.name_any()).await.unwrap(); - let allocated = applied_ap - .status - .as_ref() - .unwrap() - .allocated - .as_ref() - .unwrap(); - assert_eq!(2, allocated.len()); - let block_sart = allocated - .get(&format!("{}-{}-10.0.0.32", br.spec.pool, br.spec.node)) - .unwrap(); - assert_eq!(32, *block_sart); - - let requested = applied_ap - .status - .as_ref() - .unwrap() - .requested - .as_ref() - .unwrap(); - assert!(requested.is_empty()); - - dbg!("Checking the block is created"); - let ab_opt = ab_api - .get_opt(&format!("{}-{}-10.0.0.32", br.spec.pool, br.spec.node)) - .await - .unwrap(); - assert!(ab_opt.is_some()); - - dbg!("Releasing the block"); - let mut applied_ap = ap_api.get(&ap.name_any()).await.unwrap(); - applied_ap.metadata.managed_fields = None; - applied_ap.status.as_mut().unwrap().released = - Some(vec![format!("{}-{}-10.0.0.0", br.spec.pool, br.spec.node)]); - let ap_patch = Patch::Apply(applied_ap.clone()); - let mut ssapply_force = ssapply.clone(); - ssapply_force.force = true; - ap_api - .patch_status(&ap.name_any(), &ssapply_force, &ap_patch) - .await - .unwrap(); - - let applied_ap = ap_api.get(&ap.name_any()).await.unwrap(); - - dbg!("Reconciling AddressPool"); - controller::reconciler::address_pool::reconciler(Arc::new(applied_ap.clone()), ctx.clone()) - .await - .unwrap(); - - dbg!("Remaining the status.released field"); - let released = applied_ap - .status - .as_ref() - .unwrap() - .released - .as_ref() - .unwrap(); - assert_eq!(1, released.len()); - - dbg!("Checking the block is deleted"); - let ab_opt = ab_api - .get_opt(&format!("{}-{}-10.0.0.0", br.spec.pool, br.spec.node)) - .await - .unwrap(); - assert!(ab_opt.is_none()); + let ab_api = Api::::all(ctx.client().clone()); - dbg!("Reconciling AddressPool"); - let applied_ap = ap_api.get(&ap.name_any()).await.unwrap(); - controller::reconciler::address_pool::reconciler(Arc::new(applied_ap.clone()), ctx.clone()) - .await - .unwrap(); - - let mut applied_ap = ap_api.get(&ap.name_any()).await.unwrap(); - - dbg!("Removing the status.released field"); - let released = applied_ap - .status - .as_ref() - .unwrap() - .released - .as_ref() - .unwrap(); - assert!(released.is_empty()); - dbg!("Changing auto assign to false"); + tracing::info!("Changing auto assign to false"); applied_ap.spec.auto_assign = Some(false); - dbg!("Reconciling AddressPool"); + tracing::info!("Reconciling AddressPool"); controller::reconciler::address_pool::reconciler(Arc::new(applied_ap.clone()), ctx.clone()) .await .unwrap(); - dbg!("Checking blocks are changed to auto_assign=false"); + tracing::info!("Checking blocks are changed to auto_assign=false"); let list_params = ListParams::default().labels(&format!( "{}={}", ADDRESS_POOL_ANNOTATION, @@ -242,6 +91,6 @@ async fn test_address_pool_pod_handling_request() { assert!(!ab.spec.auto_assign); } - dbg!("Cleaning up a kind cluster"); + tracing::info!("Cleaning up a kind cluster"); cleanup_kind(); } diff --git a/sartd/src/kubernetes/tests/controller_address_pool_service_test.rs b/sartd/src/kubernetes/tests/controller_address_pool_service_test.rs index c6b41e6..e3adbd1 100644 --- a/sartd/src/kubernetes/tests/controller_address_pool_service_test.rs +++ b/sartd/src/kubernetes/tests/controller_address_pool_service_test.rs @@ -1,4 +1,5 @@ use std::sync::Arc; +use std::collections::HashMap; use common::{cleanup_kind, setup_kind}; @@ -6,8 +7,9 @@ use kube::{ api::{Patch, PatchParams}, Api, Client, ResourceExt, }; +use sartd_ipam::manager::BlockAllocator; use sartd_kubernetes::{ - context::State, + context::{State, Ctx}, controller, crd::{address_block::AddressBlock, address_pool::AddressPool}, fixture::{reconciler::test_address_pool_lb, test_trace}, @@ -18,19 +20,20 @@ mod common; #[tokio::test] #[ignore = "use kind cluster"] async fn integration_test_address_pool() { - dbg!("Creating a kind cluster"); + tracing::info!("Creating a kind cluster"); setup_kind(); test_trace().await; - dbg!("Getting kube client"); + tracing::info!("Getting kube client"); let client = Client::try_default().await.unwrap(); - let ctx = State::default().to_context(client.clone(), 30); + let block_allocator = Arc::new(BlockAllocator::default()); + let ctx = State::default().to_context_with(client.clone(), 30, block_allocator); let ap = test_address_pool_lb(); - dbg!("Creating an AddressPool resource"); - let ap_api = Api::::all(ctx.client.clone()); + tracing::info!("Creating an AddressPool resource"); + let ap_api = Api::::all(ctx.client().clone()); let ssapply = PatchParams::apply("ctrltest"); let ap_patch = Patch::Apply(ap.clone()); ap_api @@ -41,22 +44,22 @@ async fn integration_test_address_pool() { let applied_ap = ap_api.get(&ap.name_any()).await.unwrap(); // do reconcile - dbg!("Reconciling AddressPool"); + tracing::info!("Reconciling AddressPool"); controller::reconciler::address_pool::reconciler(Arc::new(applied_ap.clone()), ctx.clone()) .await .unwrap(); - dbg!("Getting a AddressBlock resource created by AddressPool"); - let ab_api = Api::::all(ctx.client.clone()); + tracing::info!("Getting a AddressBlock resource created by AddressPool"); + let ab_api = Api::::all(ctx.client().clone()); let ab = ab_api.get(&applied_ap.name_any()).await.unwrap(); - dbg!("Checking created block"); + tracing::info!("Checking created block"); assert_eq!(applied_ap.spec.cidr, ab.spec.cidr); assert_eq!( applied_ap.spec.auto_assign.unwrap_or_default(), ab.spec.auto_assign ); - dbg!("Cleaning up a kind cluster"); + tracing::info!("Cleaning up a kind cluster"); cleanup_kind(); } diff --git a/sartd/src/kubernetes/tests/controller_bgp_advertisement_test.rs b/sartd/src/kubernetes/tests/controller_bgp_advertisement_test.rs index 32cdb45..1414d29 100644 --- a/sartd/src/kubernetes/tests/controller_bgp_advertisement_test.rs +++ b/sartd/src/kubernetes/tests/controller_bgp_advertisement_test.rs @@ -19,12 +19,12 @@ mod common; #[tokio::test] #[ignore = "use kind cluster"] async fn integration_test_controller_bgp_advertisement() { - dbg!("Creating a kind cluster"); + tracing::info!("Creating a kind cluster"); setup_kind(); test_trace().await; - dbg!("Getting kube client"); + tracing::info!("Getting kube client"); let client = Client::try_default().await.unwrap(); let ctx = State::default().to_context(client.clone(), 30); @@ -41,7 +41,7 @@ async fn integration_test_controller_bgp_advertisement() { let ns = get_namespace(&ba).unwrap(); - dbg!("Creating a BGPAdvertisement resource"); + tracing::info!("Creating a BGPAdvertisement resource"); let ba_api = Api::::namespaced(ctx.client.clone(), &ns); let ssapply = PatchParams::apply("ctrltest"); let ba_patch = Patch::Apply(ba.clone()); @@ -52,7 +52,7 @@ async fn integration_test_controller_bgp_advertisement() { let applied_ba = ba_api.get(&ba.name_any()).await.unwrap(); - dbg!("Reconciling BGPAdvertisement"); + tracing::info!("Reconciling BGPAdvertisement"); controller::reconciler::bgp_advertisement::reconciler( Arc::new(applied_ba.clone()), ctx.clone(), @@ -60,7 +60,7 @@ async fn integration_test_controller_bgp_advertisement() { .await .unwrap(); - dbg!("updating BGPAdvertisement status"); + tracing::info!("updating BGPAdvertisement status"); let mut ba_advertised = ba.clone(); ba_advertised.status = Some(BGPAdvertisementStatus { peers: Some(BTreeMap::from([ @@ -79,7 +79,7 @@ async fn integration_test_controller_bgp_advertisement() { let applied_ba_advertised = ba_api.get(&ba_advertised.name_any()).await.unwrap(); - dbg!("Deleting BGPAdvertisement"); + tracing::info!("Deleting BGPAdvertisement"); ba_api .delete(&applied_ba_advertised.name_any(), &DeleteParams::default()) .await @@ -87,10 +87,10 @@ async fn integration_test_controller_bgp_advertisement() { let ba_deleted = ba_api.get(&applied_ba_advertised.name_any()).await.unwrap(); - dbg!("Checking delettion timestamp"); + tracing::info!("Checking delettion timestamp"); assert!(ba_deleted.metadata.deletion_timestamp.is_some()); - dbg!("Failing to cleanup BGPAdvertisement"); + tracing::info!("Failing to cleanup BGPAdvertisement"); let _err = controller::reconciler::bgp_advertisement::reconciler( Arc::new(ba_deleted.clone()), ctx.clone(), @@ -98,7 +98,7 @@ async fn integration_test_controller_bgp_advertisement() { .await .unwrap_err(); - dbg!("Checking the status is moved to Withdraw"); + tracing::info!("Checking the status is moved to Withdraw"); let ba_deleted_withdraw = ba_api.get(&ba_deleted.name_any()).await.unwrap(); for (_name, status) in ba_deleted_withdraw .status @@ -112,7 +112,7 @@ async fn integration_test_controller_bgp_advertisement() { assert_eq!(AdvertiseStatus::Withdraw, *status); } - dbg!("Updating status to withdrawn"); + tracing::info!("Updating status to withdrawn"); let mut ba_deleted_withdrawn = ba_deleted_withdraw.clone(); ba_deleted_withdrawn.status = None; let ba_deleted_withdrawn_patch = Patch::Merge(ba_deleted_withdrawn.clone()); @@ -125,7 +125,7 @@ async fn integration_test_controller_bgp_advertisement() { .await .unwrap(); - dbg!("Cleaning up BGPAdvertisement"); + tracing::info!("Cleaning up BGPAdvertisement"); controller::reconciler::bgp_advertisement::reconciler( Arc::new(ba_deleted_withdrawn), ctx.clone(), @@ -133,6 +133,6 @@ async fn integration_test_controller_bgp_advertisement() { .await .unwrap(); - dbg!("Cleaning up a kind cluster"); + tracing::info!("Cleaning up a kind cluster"); cleanup_kind(); } diff --git a/sartd/src/kubernetes/tests/controller_block_request_test.rs b/sartd/src/kubernetes/tests/controller_block_request_test.rs index 2dce7f9..a328b47 100644 --- a/sartd/src/kubernetes/tests/controller_block_request_test.rs +++ b/sartd/src/kubernetes/tests/controller_block_request_test.rs @@ -4,8 +4,9 @@ use kube::{ api::{Patch, PatchParams}, Api, Client, ResourceExt, }; +use sartd_ipam::manager::BlockAllocator; use sartd_kubernetes::{ - context::State, + context::{Ctx, State}, controller, crd::{address_pool::AddressPool, block_request::BlockRequest}, fixture::{ @@ -21,29 +22,37 @@ mod common; #[tokio::test] #[ignore = "use kind cluster"] async fn integration_test_block_request() { - dbg!("Creating a kind cluster"); + tracing::info!("Creating a kind cluster"); setup_kind(); test_trace().await; - dbg!("Getting kube client"); + tracing::info!("Getting kube client"); let client = Client::try_default().await.unwrap(); - let ctx = State::default().to_context(client.clone(), 30); + let block_allocator = Arc::new(BlockAllocator::default()); - dbg!("Creating AddressPool"); + let ctx = State::default().to_context_with(client.clone(), 30, block_allocator); + + tracing::info!("Creating AddressPool"); let ap = test_address_pool_pod(); - let ap_api = Api::::all(ctx.client.clone()); + let ap_api = Api::::all(ctx.client().clone()); let ssapply = PatchParams::apply("ctrltest"); let ap_patch = Patch::Apply(ap.clone()); ap_api .patch(&ap.name_any(), &ssapply, &ap_patch) .await .unwrap(); + let applied_ap = ap_api.get(&ap.name_any()).await.unwrap(); + controller::reconciler::address_pool::reconciler(Arc::new(applied_ap), ctx.clone()) + .await + .unwrap(); - dbg!("Creating BlockRequest"); + tracing::info!("Reconciling AddressPool"); + + tracing::info!("Creating BlockRequest"); let br = test_block_request(); - let br_api = Api::::all(ctx.client.clone()); + let br_api = Api::::all(ctx.client().clone()); let br_patch = Patch::Apply(br.clone()); br_api .patch(&br.name_any(), &ssapply, &br_patch) @@ -52,17 +61,11 @@ async fn integration_test_block_request() { let applied_br = br_api.get(&br.name_any()).await.unwrap(); - dbg!("Reconciling BlockRequest"); + tracing::info!("Reconciling BlockRequest"); controller::reconciler::block_request::reconciler(Arc::new(applied_br.clone()), ctx.clone()) .await .unwrap(); - dbg!("Checking AddressPool's status"); - let applied_ap = ap_api.get(&ap.name_any()).await.unwrap(); - let requested = applied_ap.status.unwrap().requested.unwrap(); - assert_eq!(requested.len(), 1); - assert_eq!(requested[0], br.name_any()); - - dbg!("Cleaning up a kind cluster"); + tracing::info!("Cleaning up a kind cluster"); cleanup_kind(); } diff --git a/sartd/src/kubernetes/tests/controller_cluster_bgp_test.rs b/sartd/src/kubernetes/tests/controller_cluster_bgp_test.rs index 24d31db..eed7cca 100644 --- a/sartd/src/kubernetes/tests/controller_cluster_bgp_test.rs +++ b/sartd/src/kubernetes/tests/controller_cluster_bgp_test.rs @@ -23,12 +23,12 @@ mod common; #[tokio::test] #[ignore = "use kind cluster"] async fn integration_test_cluster_bgp_asn() { - dbg!("Setting up a kind cluster"); + tracing::info!("Setting up a kind cluster"); setup_kind(); test_trace().await; - dbg!("Getting kube client"); + tracing::info!("Getting kube client"); let client = Client::try_default().await.unwrap(); let ctx = State::default().to_context(client.clone(), 30); @@ -37,7 +37,7 @@ async fn integration_test_cluster_bgp_asn() { let ssapply = PatchParams::apply("ctrltest"); let cb_patch = Patch::Apply(cb.clone()); - dbg!("Creating the ClusterBGP resource"); + tracing::info!("Creating the ClusterBGP resource"); cb_api .patch(&cb.name_any(), &ssapply, &cb_patch) .await @@ -56,12 +56,12 @@ async fn integration_test_cluster_bgp_asn() { let applied_cb = cb_api.get(&cb.name_any()).await.unwrap(); // do reconcile - dbg!("Reconciling the resource when creating"); + tracing::info!("Reconciling the resource when creating"); controller::reconciler::cluster_bgp::reconciler(Arc::new(applied_cb.clone()), ctx.clone()) .await .unwrap(); - dbg!("Getting NodeBGP resources created by reconciling ClusterBGP"); + tracing::info!("Getting NodeBGP resources created by reconciling ClusterBGP"); let node_api = Api::::all(ctx.client.clone()); let nb_api = Api::::all(ctx.client.clone()); let node_list = node_api.list(&ListParams::default()).await.unwrap(); @@ -72,15 +72,15 @@ async fn integration_test_cluster_bgp_asn() { } } - dbg!("Cleaning up a kind cluster"); + tracing::info!("Cleaning up a kind cluster"); cleanup_kind(); // Re create kind cluster for the other scenario - dbg!("Setting up a kind cluster"); + tracing::info!("Setting up a kind cluster"); setup_kind(); - dbg!("Getting kube client"); + tracing::info!("Getting kube client"); let client = Client::try_default().await.unwrap(); let ctx = State::default().to_context(client.clone(), 30); @@ -94,7 +94,7 @@ async fn integration_test_cluster_bgp_asn() { let ssapply = PatchParams::apply("ctrltest"); let cb_patch = Patch::Apply(cb.clone()); - dbg!("Creating the ClusterBGP resource"); + tracing::info!("Creating the ClusterBGP resource"); cb_api .patch(&cb.name_any(), &ssapply, &cb_patch) .await @@ -113,12 +113,12 @@ async fn integration_test_cluster_bgp_asn() { let applied_cb = cb_api.get(&cb.name_any()).await.unwrap(); // do reconcile - dbg!("Reconciling the resouce when creating"); + tracing::info!("Reconciling the resouce when creating"); controller::reconciler::cluster_bgp::reconciler(Arc::new(applied_cb.clone()), ctx.clone()) .await .unwrap(); - dbg!("Getting NodeBGP resources created by reconciling ClusterBGP"); + tracing::info!("Getting NodeBGP resources created by reconciling ClusterBGP"); let node_api = Api::::all(ctx.client.clone()); let nb_api = Api::::all(ctx.client.clone()); let node_list = node_api.list(&ListParams::default()).await.unwrap(); @@ -130,6 +130,6 @@ async fn integration_test_cluster_bgp_asn() { } } - dbg!("Cleaning up a kind cluster"); + tracing::info!("Cleaning up a kind cluster"); cleanup_kind(); } diff --git a/sartd/src/kubernetes/tests/controller_endpointslice_watcher_test.rs b/sartd/src/kubernetes/tests/controller_endpointslice_watcher_test.rs index d5b2ae3..1590707 100644 --- a/sartd/src/kubernetes/tests/controller_endpointslice_watcher_test.rs +++ b/sartd/src/kubernetes/tests/controller_endpointslice_watcher_test.rs @@ -30,12 +30,12 @@ mod common; #[tokio::test] #[ignore = "use kind cluster"] async fn integration_test_endpointslice_watcher() { - dbg!("Setting up a kind cluster"); + tracing::info!("Setting up a kind cluster"); setup_kind(); test_trace().await; - dbg!("Getting kube client"); + tracing::info!("Getting kube client"); let client = Client::try_default().await.unwrap(); let ctx = State::default().to_context(client.clone(), 30); @@ -44,7 +44,7 @@ async fn integration_test_endpointslice_watcher() { let ssapply = PatchParams::apply("ctrltest"); let ns = get_namespace(&eps).unwrap(); - dbg!("Preparing needed resources"); + tracing::info!("Preparing needed resources"); prepare(&ctx, &ssapply, &ns).await; let eps_api = Api::::namespaced(ctx.client.clone(), &ns); @@ -57,7 +57,7 @@ async fn integration_test_endpointslice_watcher() { let applied_eps = eps_api.get(&eps.name_any()).await.unwrap(); - dbg!("Reconciling EndpointSlice"); + tracing::info!("Reconciling EndpointSlice"); controller::reconciler::endpointslice_watcher::reconciler( Arc::new(applied_eps.clone()), ctx.clone(), @@ -65,14 +65,14 @@ async fn integration_test_endpointslice_watcher() { .await .unwrap(); - dbg!("Getting BGPAdvertisement"); + tracing::info!("Getting BGPAdvertisement"); let ba_api = Api::::namespaced(ctx.client.clone(), &ns); let bas = ba_api.list(&ListParams::default()).await.unwrap(); assert_eq!(1, bas.items.len()); let ba = bas.items[0].clone(); - dbg!("Reconciling EndpointSlice again because of requeue"); + tracing::info!("Reconciling EndpointSlice again because of requeue"); controller::reconciler::endpointslice_watcher::reconciler( Arc::new(applied_eps.clone()), ctx.clone(), @@ -80,12 +80,12 @@ async fn integration_test_endpointslice_watcher() { .await .unwrap(); - dbg!("Checking the BGPAdvertisement's status"); + tracing::info!("Checking the BGPAdvertisement's status"); let ba = ba_api.get(&ba.name_any()).await.unwrap(); // exptected externalTrafficPolicy is Cluster, so the advertisement must have all peers for targets. assert_eq!(4, ba.status.as_ref().unwrap().peers.as_ref().unwrap().len()); - dbg!("Changing externalTrafficPolicy"); + tracing::info!("Changing externalTrafficPolicy"); // for notifying externalTrafficPolicy's change to endpointsliece_watcher, // update an annoation when owning LoadBalancer's externalTrafficPolicy is changed. let mut svc = test_svc(); @@ -111,7 +111,7 @@ async fn integration_test_endpointslice_watcher() { let applied_eps = eps_api.get(&eps_with_local.name_any()).await.unwrap(); - dbg!("Reconciling EndpointSlice"); + tracing::info!("Reconciling EndpointSlice"); controller::reconciler::endpointslice_watcher::reconciler( Arc::new(applied_eps.clone()), ctx.clone(), @@ -119,7 +119,7 @@ async fn integration_test_endpointslice_watcher() { .await .unwrap(); - dbg!("Checking the BGPAdvertisement's status"); + tracing::info!("Checking the BGPAdvertisement's status"); let ba = ba_api.get(&ba.name_any()).await.unwrap(); // expected externalTrafficPolicy is Cluster, so the advertisement must have all peers for targets. assert_eq!(4, ba.status.as_ref().unwrap().peers.as_ref().unwrap().len()); @@ -144,7 +144,7 @@ async fn integration_test_endpointslice_watcher() { .unwrap(); assert_eq!(AdvertiseStatus::Withdraw, *a); - dbg!("Updating endpoints"); + tracing::info!("Updating endpoints"); let mut eps_update = eps_with_local.clone(); let new_ep = vec![ Endpoint { @@ -188,7 +188,7 @@ async fn integration_test_endpointslice_watcher() { let applied_eps = eps_api.get(&eps_update.name_any()).await.unwrap(); - dbg!("Reconciling EndpointSlice"); + tracing::info!("Reconciling EndpointSlice"); controller::reconciler::endpointslice_watcher::reconciler( Arc::new(applied_eps.clone()), ctx.clone(), @@ -196,7 +196,7 @@ async fn integration_test_endpointslice_watcher() { .await .unwrap(); - dbg!("Checking the BGPAdvertisement's status"); + tracing::info!("Checking the BGPAdvertisement's status"); let ba = ba_api.get(&ba.name_any()).await.unwrap(); // expected externalTrafficPolicy is Cluster, so the advertisement must have all peers for targets. assert_eq!(4, ba.status.as_ref().unwrap().peers.as_ref().unwrap().len()); @@ -231,13 +231,13 @@ async fn integration_test_endpointslice_watcher() { .unwrap(); assert_eq!(AdvertiseStatus::NotAdvertised, *a); - dbg!("Deleting EndpointSlice"); + tracing::info!("Deleting EndpointSlice"); eps_api .delete(&eps_update.name_any(), &DeleteParams::default()) .await .unwrap(); - dbg!("Cleaning up EndpointSlice"); + tracing::info!("Cleaning up EndpointSlice"); let deleted_eps = eps_api.get(&eps_update.name_any()).await.unwrap(); controller::reconciler::endpointslice_watcher::reconciler( Arc::new(deleted_eps.clone()), @@ -246,7 +246,7 @@ async fn integration_test_endpointslice_watcher() { .await .unwrap(); - dbg!("Cleaning up a kind cluster"); + tracing::info!("Cleaning up a kind cluster"); cleanup_kind(); } diff --git a/sartd/src/kubernetes/tests/controller_service_watcher_test.rs b/sartd/src/kubernetes/tests/controller_service_watcher_test.rs index 01597a6..5c73222 100644 --- a/sartd/src/kubernetes/tests/controller_service_watcher_test.rs +++ b/sartd/src/kubernetes/tests/controller_service_watcher_test.rs @@ -28,12 +28,12 @@ mod common; #[tokio::test] #[ignore = "use kind cluster"] async fn integration_test_service_watcher() { - dbg!("Setting up a kind cluster"); + tracing::info!("Setting up a kind cluster"); setup_kind(); test_trace().await; - dbg!("Getting kube client"); + tracing::info!("Getting kube client"); let client = Client::try_default().await.unwrap(); let allocator_set = Arc::new(AllocatorSet::new()); let ctx = State::default().to_context_with::>( @@ -50,14 +50,14 @@ async fn integration_test_service_watcher() { ) .unwrap(); - dbg!("Chencking the block is registered in allocator set"); + tracing::info!("Chencking the block is registered in allocator set"); { let alloc_set = allocator_set.clone(); let mut alloc_set_inner = alloc_set.inner.lock().unwrap(); alloc_set_inner.insert(block, true).unwrap(); } - dbg!("Creating Service resource"); + tracing::info!("Creating Service resource"); let svc = test_svc(); let ns = get_namespace(&svc).unwrap(); @@ -71,18 +71,18 @@ async fn integration_test_service_watcher() { let applied_svc = svc_api.get(&svc.name_any()).await.unwrap(); - dbg!("Reconciling Service"); + tracing::info!("Reconciling Service"); controller::reconciler::service_watcher::reconciler(Arc::new(applied_svc.clone()), ctx.clone()) .await .unwrap(); - dbg!("Checking the allocated address"); + tracing::info!("Checking the allocated address"); let allocated_svc = svc_api.get(&svc.name_any()).await.unwrap(); let allocated_addr = get_allocated_lb_addrs(&allocated_svc).unwrap(); assert_eq!(1, allocated_addr.len()); assert_eq!(IpAddr::from_str("10.0.0.0").unwrap(), allocated_addr[0]); - dbg!("Creating Service"); + tracing::info!("Creating Service"); let mut svc_require_addr = test_svc_with_name("test-svc-2"); svc_require_addr.annotations_mut().insert( LOADBALANCER_ADDRESS_ANNOTATION.to_string(), @@ -101,7 +101,7 @@ async fn integration_test_service_watcher() { let applied_svc_require_addr = svc_api.get(&svc_require_addr.name_any()).await.unwrap(); - dbg!("Reconciling Service"); + tracing::info!("Reconciling Service"); controller::reconciler::service_watcher::reconciler( Arc::new(applied_svc_require_addr.clone()), ctx.clone(), @@ -109,13 +109,13 @@ async fn integration_test_service_watcher() { .await .unwrap(); - dbg!("Checking the allocated address"); + tracing::info!("Checking the allocated address"); let allocated_svc = svc_api.get(&svc_require_addr.name_any()).await.unwrap(); let allocated_addr = get_allocated_lb_addrs(&allocated_svc).unwrap(); assert_eq!(1, allocated_addr.len()); assert_eq!(IpAddr::from_str("10.0.0.100").unwrap(), allocated_addr[0]); - dbg!("Chencking the another block is registered in allocator set"); + tracing::info!("Chencking the another block is registered in allocator set"); let another_pool_name = "test-pool-another"; let another_block = Block::new( another_pool_name.to_string(), @@ -129,7 +129,7 @@ async fn integration_test_service_watcher() { alloc_set_inner.insert(another_block, false).unwrap(); } - dbg!("Creating Service"); + tracing::info!("Creating Service"); let mut svc_require_addr_another_pool = test_svc_with_name("test-svc-3"); svc_require_addr_another_pool.annotations_mut().insert( ADDRESS_POOL_ANNOTATION.to_string(), @@ -155,7 +155,7 @@ async fn integration_test_service_watcher() { .await .unwrap(); - dbg!("Reconciling Service"); + tracing::info!("Reconciling Service"); controller::reconciler::service_watcher::reconciler( Arc::new(applied_svc_require_addr_another_pool.clone()), ctx.clone(), @@ -163,7 +163,7 @@ async fn integration_test_service_watcher() { .await .unwrap(); - dbg!("Checking the allocated address"); + tracing::info!("Checking the allocated address"); let allocated_svc = svc_api .get(&svc_require_addr_another_pool.name_any()) .await @@ -172,7 +172,7 @@ async fn integration_test_service_watcher() { assert_eq!(1, allocated_addr.len()); assert_eq!(IpAddr::from_str("10.1.0.101").unwrap(), allocated_addr[0]); - dbg!("Creating Service"); + tracing::info!("Creating Service"); let mut svc_multi_pool = test_svc_with_name("test-svc-4"); svc_multi_pool.annotations_mut().insert( ADDRESS_POOL_ANNOTATION.to_string(), @@ -191,7 +191,7 @@ async fn integration_test_service_watcher() { let applied_svc_multi_pool = svc_api.get(&svc_multi_pool.name_any()).await.unwrap(); - dbg!("Reconciling Service"); + tracing::info!("Reconciling Service"); controller::reconciler::service_watcher::reconciler( Arc::new(applied_svc_multi_pool.clone()), ctx.clone(), @@ -199,7 +199,7 @@ async fn integration_test_service_watcher() { .await .unwrap(); - dbg!("Checking the allocated address"); + tracing::info!("Checking the allocated address"); let allocated_svc = svc_api.get(&svc_multi_pool.name_any()).await.unwrap(); let allocated_addr: HashMap = get_allocated_lb_addrs(&allocated_svc) .unwrap() @@ -213,7 +213,7 @@ async fn integration_test_service_watcher() { ]); assert_eq!(expected, allocated_addr); - dbg!("Updating Service with changing the address"); + tracing::info!("Updating Service with changing the address"); let mut svc_update = svc.clone(); svc_update.annotations_mut().insert( @@ -227,7 +227,7 @@ async fn integration_test_service_watcher() { .await .unwrap(); - dbg!("Reconciling Service"); + tracing::info!("Reconciling Service"); let applied_svc_update = svc_api.get(&svc_update.name_any()).await.unwrap(); controller::reconciler::service_watcher::reconciler( Arc::new(applied_svc_update.clone()), @@ -236,13 +236,13 @@ async fn integration_test_service_watcher() { .await .unwrap(); - dbg!("Checking the allocated address"); + tracing::info!("Checking the allocated address"); let allocated_svc = svc_api.get(&svc_update.name_any()).await.unwrap(); let allocated_addr = get_allocated_lb_addrs(&allocated_svc).unwrap(); assert_eq!(1, allocated_addr.len()); assert_eq!(IpAddr::from_str("10.0.0.222").unwrap(), allocated_addr[0]); - dbg!("Updating Service with changing the pool"); + tracing::info!("Updating Service with changing the pool"); let mut svc_require_addr_another_pool_update = svc_require_addr_another_pool.clone(); svc_require_addr_another_pool_update.metadata.annotations = None; let svc_require_addr_another_pool_update_patch = @@ -256,7 +256,7 @@ async fn integration_test_service_watcher() { .await .unwrap(); - dbg!("Reconciling Service"); + tracing::info!("Reconciling Service"); let applied_svc_require_addr_another_pool_update = svc_api .get(&svc_require_addr_another_pool_update.name_any()) .await @@ -268,7 +268,7 @@ async fn integration_test_service_watcher() { .await .unwrap(); - dbg!("Checking the allocated address"); + tracing::info!("Checking the allocated address"); let allocated_svc = svc_api .get(&svc_require_addr_another_pool_update.name_any()) .await @@ -278,19 +278,19 @@ async fn integration_test_service_watcher() { let test_svc3_addr = allocated_addrs[0]; assert_eq!(IpAddr::from_str("10.0.0.0").unwrap(), test_svc3_addr); - dbg!("Deleting Service"); + tracing::info!("Deleting Service"); svc_api .delete(&svc_update.name_any(), &DeleteParams::default()) .await .unwrap(); - dbg!("Cleaning up Service"); + tracing::info!("Cleaning up Service"); let deleted_svc = svc_api.get(&svc_update.name_any()).await.unwrap(); controller::reconciler::service_watcher::reconciler(Arc::new(deleted_svc.clone()), ctx.clone()) .await .unwrap(); - dbg!("Checking the allocation is deleted"); + tracing::info!("Checking the allocation is deleted"); let res = svc_api.get_opt(&deleted_svc.name_any()).await.unwrap(); assert!(res.is_none()); { @@ -303,18 +303,18 @@ async fn integration_test_service_watcher() { assert!(!res); } - dbg!("Deleting Service"); + tracing::info!("Deleting Service"); svc_api .delete(&svc_multi_pool.name_any(), &DeleteParams::default()) .await .unwrap(); - dbg!("Cleaning up Service"); + tracing::info!("Cleaning up Service"); let deleted_svc = svc_api.get(&svc_multi_pool.name_any()).await.unwrap(); controller::reconciler::service_watcher::reconciler(Arc::new(deleted_svc.clone()), ctx.clone()) .await .unwrap(); - dbg!("Checking the allocation is deleted"); + tracing::info!("Checking the allocation is deleted"); let res = svc_api.get_opt(&deleted_svc.name_any()).await.unwrap(); assert!(res.is_none()); { @@ -332,7 +332,7 @@ async fn integration_test_service_watcher() { assert!(!res); } - dbg!("Changing Service from LoadBalancer to ClusterIP"); + tracing::info!("Changing Service from LoadBalancer to ClusterIP"); let mut svc_clusterip = svc_require_addr_another_pool_update.clone(); svc_clusterip.spec.as_mut().unwrap().type_ = Some("ClusterIP".to_string()); // After applying the update to the other type of service, @@ -347,7 +347,7 @@ async fn integration_test_service_watcher() { .await .unwrap(); - dbg!("Checking existence"); + tracing::info!("Checking existence"); let applied_svc_clusterip = svc_api.get(&svc_clusterip.name_any()).await.unwrap(); assert_eq!( "ClusterIP", @@ -360,7 +360,7 @@ async fn integration_test_service_watcher() { .unwrap() ); - dbg!("Reconciling Service"); + tracing::info!("Reconciling Service"); controller::reconciler::service_watcher::reconciler( Arc::new(applied_svc_clusterip.clone()), ctx.clone(), @@ -369,7 +369,7 @@ async fn integration_test_service_watcher() { .unwrap(); std::thread::sleep(std::time::Duration::from_secs(2)); - dbg!("Checking the allocation is deleted"); + tracing::info!("Checking the allocation is deleted"); { let ai = allocator_set.clone(); let alloc_set = ai.inner.lock().unwrap(); @@ -378,6 +378,6 @@ async fn integration_test_service_watcher() { assert!(!res); } - dbg!("Cleaning up a kind cluster"); + tracing::info!("Cleaning up a kind cluster"); cleanup_kind(); } diff --git a/sartd/src/kubernetes/tests/node_watcher_test.rs b/sartd/src/kubernetes/tests/node_watcher_test.rs index d6871f9..0db1823 100644 --- a/sartd/src/kubernetes/tests/node_watcher_test.rs +++ b/sartd/src/kubernetes/tests/node_watcher_test.rs @@ -26,14 +26,14 @@ mod common; #[tokio::test] #[ignore = "use kind cluster"] async fn integration_test_controller_node_watcher() { - dbg!("Creating a kind cluster"); + tracing::info!("Creating a kind cluster"); setup_kind(); test_trace().await; kubectl_label("nodes", KIND_NODE_CP, "bgp=c"); - dbg!("Getting kube client"); + tracing::info!("Getting kube client"); let client = Client::try_default().await.unwrap(); let ctx = State::default().to_context(client.clone(), 30); @@ -43,7 +43,7 @@ async fn integration_test_controller_node_watcher() { let ssapply = PatchParams::apply("ctrltest"); let cb_patch = Patch::Apply(cb.clone()); - dbg!("Creating the ClusterBGP resource"); + tracing::info!("Creating the ClusterBGP resource"); cb_api .patch(&cb.name_any(), &ssapply, &cb_patch) .await @@ -60,12 +60,12 @@ async fn integration_test_controller_node_watcher() { let applied_cb = cb_api.get(&cb.name_any()).await.unwrap(); // do reconcile - dbg!("Reconciling the resource when creating"); + tracing::info!("Reconciling the resource when creating"); controller::reconciler::cluster_bgp::reconciler(Arc::new(applied_cb.clone()), ctx.clone()) .await .unwrap(); - dbg!("Checking ClusterBGP.status is empty"); + tracing::info!("Checking ClusterBGP.status is empty"); let applied_cb = cb_api.get(&cb.name_any()).await.unwrap(); let nodes = applied_cb .status @@ -75,10 +75,10 @@ async fn integration_test_controller_node_watcher() { .unwrap_or(Vec::new()); assert!(nodes.is_empty()); - dbg!("Patching bgp=a label to Node"); + tracing::info!("Patching bgp=a label to Node"); kubectl_label("nodes", KIND_NODE_CP, "bgp=a"); - dbg!("Reconciling Node"); + tracing::info!("Reconciling Node"); let node_api = Api::::all(ctx.client.clone()); let mut node = node_api.get(KIND_NODE_CP).await.unwrap(); node.finalizers_mut().push(NODE_FINALIZER.to_string()); @@ -87,7 +87,7 @@ async fn integration_test_controller_node_watcher() { .await .unwrap(); - dbg!("Checking ClusterBGP's status is updated"); + tracing::info!("Checking ClusterBGP's status is updated"); let applied_cb = cb_api.get(&applied_cb.name_any()).await.unwrap(); let desired_nodes = applied_cb .status @@ -98,13 +98,13 @@ async fn integration_test_controller_node_watcher() { assert_eq!(desired_nodes, vec![KIND_NODE_CP.to_string()]); // do reconcile - dbg!("Reconciling ClusterBGP"); + tracing::info!("Reconciling ClusterBGP"); let applied_cb = cb_api.get(&cb.name_any()).await.unwrap(); controller::reconciler::cluster_bgp::reconciler(Arc::new(applied_cb.clone()), ctx.clone()) .await .unwrap(); - dbg!("Checking ClusterBGP's status is updated after the reconciliation"); + tracing::info!("Checking ClusterBGP's status is updated after the reconciliation"); let applied_cb = cb_api.get(&applied_cb.name_any()).await.unwrap(); let desired_nodes = applied_cb .status @@ -122,27 +122,27 @@ async fn integration_test_controller_node_watcher() { .unwrap_or(Vec::new()); assert_eq!(actual_nodes, vec![KIND_NODE_CP.to_string()]); - dbg!("Checking NodeBGP is created"); + tracing::info!("Checking NodeBGP is created"); let node_bgp_api = Api::::all(ctx.client.clone()); let nb_opt = node_bgp_api.get_opt(KIND_NODE_CP).await.unwrap(); assert!(nb_opt.is_some()); - dbg!("Patching bgp=b label to Node"); + tracing::info!("Patching bgp=b label to Node"); kubectl_label("nodes", KIND_NODE_CP, "bgp=b"); - dbg!("Cheking node's label"); + tracing::info!("Cheking node's label"); let mut node = node_api.get(KIND_NODE_CP).await.unwrap(); node.finalizers_mut().push(NODE_FINALIZER.to_string()); let b = node.labels().get("bgp"); assert_eq!(b, Some(&"b".to_string())); - dbg!("Reconciling Node"); + tracing::info!("Reconciling Node"); controller::reconciler::node_watcher::reconciler(Arc::new(node), ctx.clone()) .await .unwrap(); - dbg!("Checking ClusterBGP's status is updated"); + tracing::info!("Checking ClusterBGP's status is updated"); let applied_cb = cb_api.get(&applied_cb.name_any()).await.unwrap(); let updated_nodes = applied_cb .status @@ -151,11 +151,11 @@ async fn integration_test_controller_node_watcher() { .unwrap_or(Vec::new()); assert!(updated_nodes.is_empty()); - dbg!("Checking NodeBGP's label is synchronized"); + tracing::info!("Checking NodeBGP's label is synchronized"); let nb = node_bgp_api.get(KIND_NODE_CP).await.unwrap(); let value = nb.labels().get("bgp").unwrap(); assert_eq!(value, "b"); - dbg!("Clean up kind cluster"); + tracing::info!("Clean up kind cluster"); cleanup_kind(); }