Skip to content

Commit

Permalink
fix: clean up events (#2883)
Browse files Browse the repository at this point in the history
  • Loading branch information
tusharmath authored Sep 23, 2024
1 parent 955bf9c commit 9b7beba
Show file tree
Hide file tree
Showing 8 changed files with 113 additions and 57 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ serde = { version = "1.0.200", features = ["derive"] }
derive_more = "0.99.18"
thiserror = "1.0.59"
url = { version = "2.5.0", features = ["serde"] }
convert_case = "0.6.0"

[dependencies]
# dependencies specific to CLI must have optional = true and the dep should be added to default feature.
Expand Down Expand Up @@ -149,7 +150,7 @@ async-graphql = { workspace = true, features = [
"opentelemetry",
] }
dotenvy = "0.15.7"
convert_case = "0.6.0"
convert_case = { workspace = true }
rand = "0.8.5"
tailcall-macros = { path = "tailcall-macros" }
tailcall-tracker = { path = "tailcall-tracker", optional = true }
Expand Down
1 change: 1 addition & 0 deletions tailcall-tracker/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,4 @@ async-trait = "0.1.81"
chrono = "0.4.38"
whoami = "1.5.2"
strum = "0.26.3"
convert_case = { workspace = true }
6 changes: 3 additions & 3 deletions tailcall-tracker/src/collect/ga.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,12 @@ const GA_TRACKER_URL: &str = "https://www.google-analytics.com";

/// Event structure to be sent to GA
#[derive(Debug, Serialize, Deserialize)]
struct GaEvent {
struct Payload {
client_id: String,
events: Vec<Event>,
}

impl GaEvent {
impl Payload {
pub fn new(event: Event) -> Self {
Self { client_id: event.clone().client_id, events: vec![event] }
}
Expand All @@ -35,7 +35,7 @@ impl Tracker {
}
}
fn create_request(&self, event: Event) -> Result<reqwest::Request> {
let event = GaEvent::new(event);
let event = Payload::new(event);
let mut url = reqwest::Url::parse(self.base_url.as_str())?;
url.set_path("/mp/collect");
url.query_pairs_mut()
Expand Down
94 changes: 62 additions & 32 deletions tailcall-tracker/src/collect/posthog.rs
Original file line number Diff line number Diff line change
@@ -1,52 +1,82 @@
use serde::de::Error;
use std::collections::HashMap;

use chrono::NaiveDateTime;
use reqwest::header::{HeaderName, HeaderValue};
use serde::Serialize;
use serde_json::Value;

use super::super::Result;
use super::Collect;
use crate::Event;

pub struct Tracker {
api_secret: &'static str,
client_id_key: &'static str,
}

impl Tracker {
pub fn new(api_secret: &'static str, client_id_key: &'static str) -> Self {
Self { api_secret, client_id_key }
pub fn new(api_secret: &'static str) -> Self {
Self { api_secret }
}
}

#[derive(Debug, Serialize)]
struct Payload {
api_key: String,
event: String,
distinct_id: String,
properties: HashMap<String, serde_json::Value>,
timestamp: Option<NaiveDateTime>,
}

impl Payload {
fn new(api_key: String, input: Event) -> Self {
let mut properties = HashMap::new();
let distinct_id = input.client_id.to_string();
let event = input.event_name.to_string();

if let Ok(Value::Object(map)) = serde_json::to_value(input) {
for (key, value) in map {
properties.insert(key, value);
}
}

Self {
api_key,
event,
distinct_id,
properties,
timestamp: Some(chrono::Utc::now().naive_utc()),
}
}
}

impl Tracker {
fn create_request(&self, event: Event) -> Result<reqwest::Request> {
let url = reqwest::Url::parse("https://us.i.posthog.com/capture/")?;
let mut request = reqwest::Request::new(reqwest::Method::POST, url);
request.headers_mut().insert(
HeaderName::from_static("content-type"),
HeaderValue::from_static("application/json"),
);

let event = Payload::new(self.api_secret.to_string(), event);

let _ = request
.body_mut()
.insert(reqwest::Body::from(serde_json::to_string(&event)?));

Ok(request)
}
}

#[async_trait::async_trait]
impl Collect for Tracker {
// TODO: move http request to a dispatch
async fn collect(&self, event: Event) -> Result<()> {
let api_secret = self.api_secret;
let client_id_key = self.client_id_key;
let handle_posthog = tokio::task::spawn_blocking(move || -> Result<()> {
let client = posthog_rs::client(api_secret);
let json = serde_json::to_value(&event)?;
let mut posthog_event =
posthog_rs::Event::new(event.event_name.clone(), event.client_id);

match json {
serde_json::Value::Object(map) => {
for (mut key, value) in map {
if key == client_id_key {
key = "distinct_id".to_string();
}
posthog_event.insert_prop(key, value)?;
}
}
_ => {
return Err(
serde_json::Error::custom("Failed to serialize event for posthog").into(),
);
}
}
let request = self.create_request(event)?;
let client = reqwest::Client::new();
client.execute(request).await?;

client.capture(posthog_event)?;
Ok(())
})
.await;
handle_posthog??;
Ok(())
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ const DEFAULT_CLIENT_ID: &str = "<anonymous>";

pub struct Tracker {
collectors: Vec<Box<dyn Collect>>,
is_tracking: bool,
can_track: bool,
start_time: DateTime<Utc>,
}

Expand All @@ -37,34 +37,33 @@ impl Default for Tracker {
GA_API_SECRET.to_string(),
GA_MEASUREMENT_ID.to_string(),
));
let posthog_tracker = Box::new(posthog::Tracker::new(POSTHOG_API_SECRET, "client_id"));
let posthog_tracker = Box::new(posthog::Tracker::new(POSTHOG_API_SECRET));
let start_time = Utc::now();
let can_track = can_track();
Self {
collectors: vec![ga_tracker, posthog_tracker],
is_tracking: can_track(),
can_track,
start_time,
}
}
}

impl Tracker {
pub async fn init_ping(&'static self, duration: Duration) {
if self.is_tracking {
let mut interval = tokio::time::interval(duration);
tokio::task::spawn(async move {
loop {
interval.tick().await;
let _ = self.dispatch(EventKind::Ping).await;
}
});
}
let mut interval = tokio::time::interval(duration);
tokio::task::spawn(async move {
loop {
interval.tick().await;
let _ = self.dispatch(EventKind::Ping).await;
}
});
}

pub async fn dispatch(&'static self, event_kind: EventKind) -> Result<()> {
if self.is_tracking {
if self.can_track {
// Create a new event
let event = Event {
event_name: event_kind.to_string().to_string(),
event_name: event_kind.name(),
start_time: self.start_time,
cores: cores(),
client_id: client_id(),
Expand Down
34 changes: 29 additions & 5 deletions tailcall-tracker/src/event.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
use std::ops::Deref;

use chrono::{DateTime, Utc};
use convert_case::{Case, Casing};
use serde::{Deserialize, Serialize};

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Event {
pub event_name: String,
pub event_name: Name,
pub start_time: DateTime<Utc>,
pub cores: usize,
pub client_id: String,
Expand All @@ -16,17 +19,38 @@ pub struct Event {
pub version: String,
}

#[derive(Clone)]
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Name(String);
impl From<String> for Name {
fn from(name: String) -> Self {
Self(name.to_case(Case::Snake))
}
}
impl Deref for Name {
type Target = str;

fn deref(&self) -> &Self::Target {
&self.0
}
}

impl From<Name> for String {
fn from(val: Name) -> Self {
val.0
}
}

#[derive(Debug, Clone)]
pub enum EventKind {
Ping,
Command(String),
}

impl EventKind {
pub fn to_string(&self) -> String {
pub fn name(&self) -> Name {
match self {
Self::Ping => "ping".to_string(),
Self::Command(name) => name.clone(),
Self::Ping => Name::from("ping".to_string()),
Self::Command(name) => Name::from(name.to_string()),
}
}
}
4 changes: 2 additions & 2 deletions tailcall-tracker/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
mod can_track;
mod collect;
mod dispatch;
mod error;
mod event;
mod tracker;
pub use dispatch::Tracker;
use error::Result;
pub use event::{Event, EventKind};
pub use tracker::Tracker;

1 comment on commit 9b7beba

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Running 30s test @ http://localhost:8000/graphql

4 threads and 100 connections

Thread Stats Avg Stdev Max +/- Stdev
Latency 11.57ms 4.53ms 132.40ms 87.90%
Req/Sec 2.19k 269.24 3.02k 82.33%

262136 requests in 30.02s, 1.31GB read

Requests/sec: 8731.05

Transfer/sec: 44.81MB

Please sign in to comment.