This repository has been archived by the owner on Sep 4, 2020. It is now read-only.

Commit

init health-scope controller
wonderflow committed Oct 9, 2019
1 parent 3336313 commit 80cd339
Showing 6 changed files with 226 additions and 2 deletions.
21 changes: 21 additions & 0 deletions health_scope/Cargo.toml
@@ -0,0 +1,21 @@
[package]
name = "health_scope"
version = "0.1.0"
authors = ["天元 <[email protected]>"]
edition = "2018"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
kube = { version = "0.15.0", features = ["openapi"] }
k8s-openapi = { version = "0.5.0", features = ["v1_15"] }
serde = "1.0"
serde_derive = "1.0"
serde_json = "1.0"
failure = "0.1.5"
spectral = "0.6"
reqwest = "0.9"
log = "0.4"
env_logger = "0.6.1"
hyper = "0.12"
clap = "~2.33"
12 changes: 12 additions & 0 deletions health_scope/README.md
@@ -0,0 +1,12 @@
# Health Scope Controller

The Health Scope Controller periodically checks HealthScope custom resources (CRs) and the health of all components they reference.

## What does the health scope controller do?

1. Periodically check the health of every component and update the status of the HealthScope CR.
2. Serve an HTTP endpoint that reports aggregated health information (see the sketch below).
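
A minimal smoke-test sketch against a running controller, using the `reqwest` crate already listed in `Cargo.toml`. It assumes the health server from `main.rs` is listening on its default `0.0.0.0:8080`; this commit only ever answers `OK` on `/health`, so the aggregated output format is still to come:

```rust
// Hypothetical client-side check; not part of this commit.
fn main() -> Result<(), reqwest::Error> {
    let body = reqwest::get("http://localhost:8080/health")?.text()?;
    assert_eq!(body, "OK"); // the only body the server produces so far
    println!("health scope controller is up: {}", body);
    Ok(())
}
```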

## How to install?

1.
17 changes: 17 additions & 0 deletions health_scope/crd.yaml
@@ -0,0 +1,17 @@
apiVersion: apiextensions.k8s.io/v1beta1
kind: CustomResourceDefinition
metadata:
  name: healthscopes.core.hydra.io
spec:
  group: core.hydra.io
  versions:
    - name: v1alpha1
      served: true
      storage: true
  scope: Namespaced
  names:
    plural: healthscopes
    singular: healthscope
    kind: HealthScope
    shortNames:
      - health
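
For reference, a minimal instance of the CRD above. Only `apiVersion`, `kind`, and `metadata` are shown, since the CRD declares no schema for `spec`; the name is an arbitrary example:

```yaml
apiVersion: core.hydra.io/v1alpha1
kind: HealthScope
metadata:
  name: example-health-scope
```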
175 changes: 175 additions & 0 deletions health_scope/src/main.rs
@@ -0,0 +1,175 @@
use clap::{App, Arg};
use env_logger;
use failure::{format_err, Error};
use hyper::rt::Future;
use hyper::service::service_fn_ok;
use hyper::{Body, Method, Response, Server, StatusCode};
use kube::api::{Informer, ListParams, Object, ObjectList, RawApi, Reflector, WatchEvent};
use kube::{client::APIClient, config::incluster_config, config::load_kube_config};
use log::{debug, error, info};

use k8s_openapi::apiextensions_apiserver::pkg::apis::apiextensions::v1beta1::{
    CustomResourceDefinitionSpec as CrdSpec, CustomResourceDefinitionStatus as CrdStatus,
};
use scylla::instigator::{Instigator, COMPONENT_CRD, CONFIG_CRD, CONFIG_GROUP, CONFIG_VERSION};
use scylla::schematic::{component::Component, configuration::OperationalConfiguration, Status};

const DEFAULT_NAMESPACE: &str = "default";

fn kubeconfig() -> kube::Result<kube::config::Configuration> {
    // If env var is set, use in cluster config
    if std::env::var("KUBERNETES_PORT").is_ok() {
        return incluster_config();
    }
    load_kube_config()
}

type KubeComponent = Object<Component, Status>;
type KubeOpsConfig = Object<OperationalConfiguration, Status>;

fn main() -> Result<(), Error> {
    let flags = App::new("scylla")
        .version(env!("CARGO_PKG_VERSION"))
        .arg(
            Arg::with_name("metrics-addr")
                .short("m")
                .long("metrics-addr")
                .default_value(":8080")
                .help("The address the metric endpoint binds to."),
        )
        .get_matches();
    let metrics_addr = "0.0.0.0".to_owned() + flags.value_of("metrics-addr").unwrap();

    env_logger::init();
    info!("starting server");

    let top_ns = std::env::var("KUBERNETES_NAMESPACE").unwrap_or_else(|_| DEFAULT_NAMESPACE.into());
    let top_cfg = kubeconfig().expect("Load default kubeconfig");

    // There is probably a better way to do this than to create two clones, but there is a potential
    // thread safety issue here.
    let cfg_watch = top_cfg.clone();
    let client = APIClient::new(top_cfg);

    precheck_crds(&client)?;

    let component_resource = RawApi::customResource(COMPONENT_CRD)
        .within(top_ns.as_str())
        .group(CONFIG_GROUP)
        .version(CONFIG_VERSION);

    let component_cache: Reflector<KubeComponent> =
        Reflector::raw(client.clone(), component_resource.clone()).timeout(10);
    let reflector = component_cache.clone();
    if let Err(err) = component_cache.init() {
        error!("Component init error: {}", err);
    }

    // Watch for configuration objects to be added, and react to those.
    let configuration_watch = std::thread::spawn(move || {
        let ns = top_ns.clone();
        let client = APIClient::new(cfg_watch);
        let resource = RawApi::customResource(CONFIG_CRD)
            .within(ns.as_str())
            .group(CONFIG_GROUP)
            .version(CONFIG_VERSION);
        // List all existing objects once at startup; ideally the informer itself would do this.
        let req = resource.list(&ListParams::default()).unwrap();
        if let Ok(cfgs) = client.request::<ObjectList<KubeOpsConfig>>(req) {
            for cfg in cfgs.items {
                let event = WatchEvent::Added(cfg);
                if let Err(res) = handle_event(&client, event, ns.clone()) {
                    // Log the error and continue. In the future, should probably
                    // re-queue data in some cases.
                    error!("Error processing event: {:?}", res)
                };
            }
        }

        // This listens for new items, and then processes them as they come in.
        let informer: Informer<KubeOpsConfig> =
            Informer::raw(client.clone(), resource.clone()).init()?;
        loop {
            informer.poll()?;
            debug!("loop");

            // Clear out the event queue
            while let Some(event) = informer.pop() {
                if let Err(res) = handle_event(&client, event, ns.clone()) {
                    // Log the error and continue. In the future, should probably
                    // re-queue data in some cases.
                    error!("Error processing event: {:?}", res)
                };
                info!("Handled event");
            }
        }
    });

    // Cache all of the components.
    let component_watch = std::thread::spawn(move || loop {
        if let Err(res) = reflector.poll() {
            error!("Component polling error: {}", res);
        };
    });
    info!("Watcher is running");

    std::thread::spawn(move || {
        let addr = metrics_addr.parse().unwrap();
        info!("Health server is running on {}", addr);
        hyper::rt::run(
            Server::bind(&addr)
                .serve(|| {
                    service_fn_ok(|_req| match (_req.method(), _req.uri().path()) {
                        (&Method::GET, "/health") => {
                            debug!("health check");
                            Response::new(Body::from("OK"))
                        }
                        _ => Response::builder()
                            .status(StatusCode::NOT_FOUND)
                            .body(Body::from(""))
                            .unwrap(),
                    })
                })
                .map_err(|e| eprintln!("health server error: {}", e)),
        );
    })
    .join()
    .unwrap();

    component_watch.join().expect("component watcher crashed");
    configuration_watch.join().unwrap()
}

/// This takes an event off the stream and delegates it to the instigator, calling the correct verb.
fn handle_event(
    cli: &APIClient,
    event: WatchEvent<KubeOpsConfig>,
    namespace: String,
) -> Result<(), Error> {
    let inst = Instigator::new(cli.clone(), namespace);
    match event {
        WatchEvent::Added(o) => inst.add(o),
        WatchEvent::Modified(o) => inst.modify(o),
        WatchEvent::Deleted(o) => inst.delete(o),
        WatchEvent::Error(e) => Err(format_err!("APIError: {:?}", e)),
    }
}

type CrdObj = Object<CrdSpec, CrdStatus>;
fn precheck_crds(client: &APIClient) -> Result<(), failure::Error> {
    let crds = vec![
        "operationalconfigurations",
        "traits",
        "componentschematics",
        "scopes",
    ];
    for crd in crds.iter() {
        let req = RawApi::v1beta1CustomResourceDefinition()
            .get(format!("{}.core.hydra.io", crd).as_str())?;
        if let Err(e) = client.request::<CrdObj>(req) {
            error!("Error prechecking CRDs: {}", e);
            return Err(failure::format_err!("Missing CRD {}", crd));
        }
    }
    Ok(())
}
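
Note that despite the commit message, this `main.rs` still watches `OperationalConfiguration` objects rather than health scopes. A sketch of how the same `RawApi` machinery could address the `healthscopes` CRD from `crd.yaml`; `HealthScopeSpec` and `health_scope_resource` are hypothetical, since this commit defines no Rust type for the CR:

```rust
use kube::api::{Object, RawApi};
use scylla::schematic::Status;
use serde_derive::{Deserialize, Serialize};

// Hypothetical spec type; the CRD declares no schema yet, so it is empty here.
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct HealthScopeSpec {}

pub type KubeHealthScope = Object<HealthScopeSpec, Status>;

/// Builds a RawApi handle for the healthscopes CRD; the group, version, and
/// plural name all come from crd.yaml above.
pub fn health_scope_resource(ns: &str) -> RawApi {
    RawApi::customResource("healthscopes")
        .within(ns)
        .group("core.hydra.io")
        .version("v1alpha1")
}
```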
1 change: 0 additions & 1 deletion src/instigator.rs
@@ -13,7 +13,6 @@ use crate::{
        configuration::{ApplicationConfiguration, ComponentConfiguration, ScopeBinding},
        parameter::{resolve_parameters, resolve_values, ParameterValue},
        scopes::{self, Health, HydraScope, Network},
        traits::{Autoscaler, Empty, HydraTrait, Ingress, ManualScaler, TraitBinding},
        variable::{get_variable_values, resolve_variables},
        OAMStatus, Status,
    },
2 changes: 1 addition & 1 deletion src/schematic/configuration.rs
@@ -43,7 +43,7 @@ pub struct ScopeBinding {
    pub parameter_values: Option<Vec<ParameterValue>>,
}

-#[derive(Serialize, Deserialize, Clone, Debug)]
+#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct ScopeRef {
    pub name: String,
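
Why derive `PartialEq` here? Presumably so scope references can be compared directly during reconciliation. A minimal sketch of what the derive enables (the helper is hypothetical, and `ScopeRef` may have more fields than the truncated diff shows):

```rust
// Hypothetical helper, not part of this commit.
fn same_scope(a: &ScopeRef, b: &ScopeRef) -> bool {
    a == b // compiles now that ScopeRef derives PartialEq
}
```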
