Skip to content

Commit

Permalink
feat(otel): add instruments to containerd-shim-wasm
Browse files Browse the repository at this point in the history
this commit adds tracing instrument macros to functions in the
containerd-shim-wasm crate to capture spans of each function
including its parameters and results.

these spans can in turn be collected using tracing-opentelemetry
and the opentelemetry SDK at the shim binary level and exported to
collectors such as a Jaeger endpoint.

Signed-off-by: jiaxiao zhou <[email protected]>
  • Loading branch information
Mossaka committed May 6, 2024
1 parent 6a76c89 commit 3440fa2
Show file tree
Hide file tree
Showing 16 changed files with 111 additions and 3 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ ttrpc = "0.8.0"
wat = "1.206"
windows-sys = "0.52"
serial_test = "2"
tracing = "0.1"

[profile.release]
panic = "abort"
1 change: 1 addition & 0 deletions crates/containerd-shim-wasm/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ wasmparser = "0.206.0"
tokio-stream = { version = "0.1" }
prost-types = "0.12" # should match version in containerd-shim
sha256 = { workspace = true }
tracing = { workspace = true }

[target.'cfg(unix)'.dependencies]
caps = "0.5"
Expand Down
2 changes: 2 additions & 0 deletions crates/containerd-shim-wasm/src/sandbox/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::sync::mpsc::channel;
use std::sync::Arc;

use containerd_shim::{parse, run, Config};
use tracing::{instrument, Span};
use ttrpc::Server;

use crate::sandbox::manager::Shim;
Expand Down Expand Up @@ -36,6 +37,7 @@ macro_rules! revision {
};
}

#[instrument(skip_all, parent = Span::current(), level= "Info")]
pub fn shim_main<'a, I>(
name: &str,
version: &str,
Expand Down
14 changes: 14 additions & 0 deletions crates/containerd-shim-wasm/src/sandbox/containerd/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use tokio::runtime::Runtime;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use tonic::{Code, Request};
use tracing::{instrument, Span};

use super::lease::LeaseGuard;
use crate::container::Engine;
Expand Down Expand Up @@ -53,6 +54,7 @@ pub(crate) struct WriteContent {
// sync wrapper implementation from https://tokio.rs/tokio/topics/bridging
impl Client {
// wrapper around connection that will establish a connection and create a client
#[instrument(skip_all, parent = Span::current(), level = "Info")]
pub fn connect(
address: impl AsRef<Path> + ToString,
namespace: impl ToString,
Expand All @@ -74,6 +76,7 @@ impl Client {
}

// wrapper around read that will read the entire content file
#[instrument(skip_all, parent = Span::current(), level = "Info")]
fn read_content(&self, digest: impl ToString) -> Result<Vec<u8>> {
self.rt.block_on(async {
let req = ReadContentRequest {
Expand All @@ -95,6 +98,7 @@ impl Client {

// used in tests to clean up content
#[allow(dead_code)]
#[instrument(skip_all, parent = Span::current(), level = "Info")]
fn delete_content(&self, digest: impl ToString) -> Result<()> {
self.rt.block_on(async {
let req = DeleteContentRequest {
Expand All @@ -110,6 +114,7 @@ impl Client {
}

// wrapper around lease that will create a lease and return a guard that will delete the lease when dropped
#[instrument(skip_all, parent = Span::current(), level = "Info")]
fn lease(&self, reference: String) -> Result<LeaseGuard> {
self.rt.block_on(async {
let mut lease_labels = HashMap::new();
Expand Down Expand Up @@ -141,6 +146,7 @@ impl Client {
})
}

#[instrument(skip_all, parent = Span::current(), level = "Info")]
fn save_content(
&self,
data: Vec<u8>,
Expand Down Expand Up @@ -266,6 +272,7 @@ impl Client {
})
}

#[instrument(skip_all, parent = Span::current(), level = "Info")]
fn get_info(&self, content_digest: &str) -> Result<Info> {
self.rt.block_on(async {
let req = InfoRequest {
Expand All @@ -288,6 +295,7 @@ impl Client {
})
}

#[instrument(skip_all, parent = Span::current(), level = "Info")]
fn update_info(&self, info: Info) -> Result<Info> {
self.rt.block_on(async {
let req = UpdateRequest {
Expand All @@ -313,6 +321,7 @@ impl Client {
})
}

#[instrument(skip_all, parent = Span::current(), level = "Info")]
fn get_image(&self, image_name: impl ToString) -> Result<Image> {
self.rt.block_on(async {
let name = image_name.to_string();
Expand All @@ -334,6 +343,7 @@ impl Client {
})
}

#[instrument(skip_all, parent = Span::current(), level = "Info")]
fn extract_image_content_sha(&self, image: &Image) -> Result<String> {
let digest = image
.target
Expand All @@ -349,6 +359,7 @@ impl Client {
Ok(digest)
}

#[instrument(skip_all, parent = Span::current(), level = "Info")]
fn get_container(&self, container_name: impl ToString) -> Result<Container> {
self.rt.block_on(async {
let id = container_name.to_string();
Expand All @@ -370,6 +381,7 @@ impl Client {
})
}

#[instrument(skip_all, parent = Span::current(), level = "Info")]
fn get_image_manifest_and_digest(&self, image_name: &str) -> Result<(ImageManifest, String)> {
let image = self.get_image(image_name)?;
let image_digest = self.extract_image_content_sha(&image)?;
Expand All @@ -380,6 +392,7 @@ impl Client {
// load module will query the containerd store to find an image that has an OS of type 'wasm'
// If found, it continues to parse the manifest and returns the layers that contain the WASM modules
// and possibly other configuration layers.
#[instrument(skip_all, parent = Span::current(), level = "Info")]
pub fn load_modules<T: Engine>(
&self,
containerd_id: impl ToString,
Expand Down Expand Up @@ -510,6 +523,7 @@ impl Client {
Ok((layers, platform))
}

#[instrument(skip_all, parent = Span::current(), level = "Info")]
fn read_wasm_layer(
&self,
original_config: &oci_spec::image::Descriptor,
Expand Down
2 changes: 2 additions & 0 deletions crates/containerd-shim-wasm/src/sandbox/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::path::{Path, PathBuf};
use std::time::Duration;

use chrono::{DateTime, Utc};
use tracing::{instrument, Span};

use super::error::Error;
use super::sync::WaitableCell;
Expand Down Expand Up @@ -136,6 +137,7 @@ pub trait Instance: 'static {

/// Waits for the instance to finish and retunrs its exit code
/// This is a blocking call.
#[instrument(skip_all, parent = Span::current(), level= "Info")]
fn wait(&self) -> (u32, DateTime<Utc>) {
self.wait_timeout(None).unwrap()
}
Expand Down
5 changes: 5 additions & 0 deletions crates/containerd-shim-wasm/src/sandbox/instance_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@ use std::path::{Path, PathBuf};

use anyhow::{bail, Context, Result};
use serde::{Deserialize, Serialize};
use tracing::{instrument, Span};

use super::Error;

/// Return the root path for the instance.
///
/// The root path is the path to the directory containing the container's state.
#[instrument(skip_all, parent = Span::current(), level= "Info")]
pub fn get_instance_root<P: AsRef<Path>>(
root_path: P,
instance_id: &str,
Expand All @@ -25,6 +27,7 @@ pub fn get_instance_root<P: AsRef<Path>>(
/// Checks if the container exists.
///
/// The root path is the path to the directory containing the container's state.
#[instrument(skip_all, parent = Span::current(), level= "Info")]
pub fn instance_exists<P: AsRef<Path>>(root_path: P, container_id: &str) -> Result<bool> {
let instance_root = construct_instance_root(root_path, container_id)?;
Ok(instance_root.exists())
Expand All @@ -35,6 +38,7 @@ struct Options {
root: Option<PathBuf>,
}

#[instrument(skip_all, parent = Span::current(), level= "Info")]
pub fn determine_rootdir(
bundle: impl AsRef<Path>,
namespace: &str,
Expand All @@ -53,6 +57,7 @@ pub fn determine_rootdir(
Ok(path)
}

#[instrument(skip_all, parent = Span::current(), level= "Info")]
fn construct_instance_root<P: AsRef<Path>>(root_path: P, container_id: &str) -> Result<PathBuf> {
let root_path = root_path.as_ref().canonicalize().with_context(|| {
format!(
Expand Down
21 changes: 21 additions & 0 deletions crates/containerd-shim-wasm/src/sandbox/shim/cli.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::env::current_dir;
use std::fmt::Debug;
use std::sync::Arc;

use chrono::Utc;
Expand All @@ -8,6 +9,7 @@ use containerd_shim::util::write_address;
use containerd_shim::{self as shim, api, ExitSignal};
use oci_spec::runtime::Spec;
use shim::Flags;
use tracing::{instrument, Span};

use crate::sandbox::instance::Instance;
use crate::sandbox::shim::events::{RemoteEventSender, ToTimestamp};
Expand All @@ -23,13 +25,28 @@ pub struct Cli<T: Instance + Sync + Send> {
_id: String,
}

// Diagnostic formatting for `Cli`.
//
// Only `namespace`, `containerd_address` and `_id` are printed; the `engine`
// and `exit` fields are left out of the output.
impl<I> Debug for Cli<I>
where
    I: Instance + Sync + Send,
    <I as Instance>::Engine: Default,
{
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // `debug_struct` renders the same text as the previous hand-written
        // `write!` for `{:?}`, and additionally honours the `{:#?}`
        // pretty-printing flag, which the manual format string ignored.
        f.debug_struct("Cli")
            .field("namespace", &self.namespace)
            .field("containerd_address", &self.containerd_address)
            .field("_id", &self._id)
            .finish()
    }
}

impl<I> shim::Shim for Cli<I>
where
I: Instance + Sync + Send,
<I as Instance>::Engine: Default,
{
type T = Local<I>;

#[instrument(skip_all, parent = Span::current(), level= "Info")]
fn new(_runtime_id: &str, args: &Flags, _config: &mut shim::Config) -> Self {
Cli {
engine: Default::default(),
Expand All @@ -40,6 +57,7 @@ where
}
}

#[instrument(skip_all, parent = Span::current(), level= "Info")]
fn start_shim(&mut self, opts: containerd_shim::StartOpts) -> shim::Result<String> {
let dir = current_dir().map_err(|err| ShimError::Other(err.to_string()))?;
let spec = Spec::load(dir.join("config.json")).map_err(|err| {
Expand All @@ -63,10 +81,12 @@ where
Ok(address)
}

// Delegates to `self.exit.wait()` and discards nothing: the call returns `()`.
// NOTE(review): presumably this blocks until the shim's exit signal fires —
// confirm against the type of `self.exit`.
#[instrument(skip_all, parent = Span::current(), level = "Info")]
fn wait(&mut self) {
self.exit.wait();
}

#[instrument(skip_all, parent = Span::current(), level= "Info")]
fn create_task_service(&self, publisher: RemotePublisher) -> Self::T {
let events = RemoteEventSender::new(&self.namespace, publisher);
let exit = self.exit.clone();
Expand All @@ -80,6 +100,7 @@ where
)
}

#[instrument(skip_all, parent = Span::current(), level= "Info")]
fn delete_shim(&mut self) -> shim::Result<api::DeleteResponse> {
Ok(api::DeleteResponse {
exit_status: 137,
Expand Down
10 changes: 10 additions & 0 deletions crates/containerd-shim-wasm/src/sandbox/shim/instance_data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::sync::{Arc, OnceLock, RwLock};
use std::time::Duration;

use chrono::{DateTime, Utc};
use tracing::{instrument, Span};

use crate::sandbox::instance::Nop;
use crate::sandbox::shim::instance_option::InstanceOption;
Expand All @@ -16,6 +17,7 @@ pub(super) struct InstanceData<T: Instance> {
}

impl<T: Instance> InstanceData<T> {
#[instrument(skip_all, parent = Span::current(), level= "Info")]
pub fn new_instance(id: impl AsRef<str>, cfg: InstanceConfig<T::Engine>) -> Result<Self> {
let id = id.as_ref().to_string();
let instance = InstanceOption::Instance(T::new(id, Some(&cfg))?);
Expand All @@ -27,6 +29,7 @@ impl<T: Instance> InstanceData<T> {
})
}

#[instrument(skip_all, parent = Span::current(), level= "Info")]
pub fn new_base(id: impl AsRef<str>, cfg: InstanceConfig<T::Engine>) -> Result<Self> {
let id = id.as_ref().to_string();
let instance = InstanceOption::Nop(Nop::new(id, None)?);
Expand All @@ -38,14 +41,17 @@ impl<T: Instance> InstanceData<T> {
})
}

/// Returns a copy of the value held in `self.pid`, or `None` when
/// `self.pid.get()` has nothing to hand out.
#[instrument(skip_all, parent = Span::current(), level = "Info")]
pub fn pid(&self) -> Option<u32> {
    let stored = self.pid.get();
    stored.copied()
}

/// Borrows the `InstanceConfig` stored in `self.cfg`.
#[instrument(skip_all, parent = Span::current(), level= "Info")]
pub fn config(&self) -> &InstanceConfig<T::Engine> {
&self.cfg
}

#[instrument(skip_all, parent = Span::current(), level= "Info")]
pub fn start(&self) -> Result<u32> {
let mut s = self.state.write().unwrap();
s.start()?;
Expand All @@ -65,13 +71,15 @@ impl<T: Instance> InstanceData<T> {
res
}

/// Applies the `kill` transition to the task state and then forwards
/// `signal` to the underlying instance.
///
/// Returns early with the error if the state transition is rejected.
///
/// # Panics
///
/// Panics if `self.state.write()` returns an error.
#[instrument(skip_all, parent = Span::current(), level = "Info")]
pub fn kill(&self, signal: u32) -> Result<()> {
    // The guard is kept in a named binding so that it lives until the
    // function returns, i.e. across the call into the instance.
    let mut state = self.state.write().unwrap();
    state.kill()?;

    self.instance.kill(signal)
}

#[instrument(skip_all, parent = Span::current(), level= "Info")]
pub fn delete(&self) -> Result<()> {
let mut s = self.state.write().unwrap();
s.delete()?;
Expand All @@ -86,13 +94,15 @@ impl<T: Instance> InstanceData<T> {
res
}

/// Waits on the underlying instance, records `TaskState::Exited`, and
/// hands back whatever the instance's `wait` returned.
///
/// # Panics
///
/// Panics if `self.state.write()` returns an error.
#[instrument(skip_all, parent = Span::current(), level = "Info")]
pub fn wait(&self) -> (u32, DateTime<Utc>) {
    // The state is only touched after the instance's `wait` has returned.
    let outcome = self.instance.wait();
    *self.state.write().unwrap() = TaskState::Exited;
    outcome
}

#[instrument(skip_all, parent = Span::current(), level= "Info")]
pub fn wait_timeout(&self, t: impl Into<Option<Duration>>) -> Option<(u32, DateTime<Utc>)> {
let res = self.instance.wait_timeout(t);
if res.is_some() {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::time::Duration;

use chrono::{DateTime, Utc};
use tracing::{instrument, Span};

use crate::sandbox::instance::Nop;
use crate::sandbox::{Instance, InstanceConfig, Result};
Expand All @@ -13,32 +14,37 @@ pub(super) enum InstanceOption<I: Instance> {
impl<I: Instance> Instance for InstanceOption<I> {
type Engine = ();

// Constructor required by the `Instance` trait. It always panics via
// `unimplemented!()`; both arguments are ignored.
#[instrument(skip_all, parent = Span::current(), level= "Info")]
fn new(_id: String, _cfg: Option<&InstanceConfig<Self::Engine>>) -> Result<Self> {
// this is never called
unimplemented!();
}

#[instrument(skip_all, parent = Span::current(), level= "Info")]
fn start(&self) -> Result<u32> {
match self {
Self::Instance(i) => i.start(),
Self::Nop(i) => i.start(),
}
}

#[instrument(skip_all, parent = Span::current(), level= "Info")]
fn kill(&self, signal: u32) -> Result<()> {
match self {
Self::Instance(i) => i.kill(signal),
Self::Nop(i) => i.kill(signal),
}
}

#[instrument(skip_all, parent = Span::current(), level= "Info")]
fn delete(&self) -> Result<()> {
match self {
Self::Instance(i) => i.delete(),
Self::Nop(i) => i.delete(),
}
}

#[instrument(skip_all, parent = Span::current(), level= "Info")]
fn wait_timeout(&self, t: impl Into<Option<Duration>>) -> Option<(u32, DateTime<Utc>)> {
match self {
Self::Instance(i) => i.wait_timeout(t),
Expand Down
Loading

0 comments on commit 3440fa2

Please sign in to comment.