Skip to content

Commit

Permalink
Don't send notifications for first contact list of old users
Browse files Browse the repository at this point in the history
  • Loading branch information
dcadenas committed Aug 29, 2024
1 parent 17f6fe0 commit fd3f56d
Show file tree
Hide file tree
Showing 8 changed files with 286 additions and 107 deletions.
18 changes: 9 additions & 9 deletions src/domain/follow_change.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use chrono::{DateTime, Utc};
use nostr_sdk::prelude::*;
use serde::{Deserialize, Serialize};
use serde::{Deserialize, Serialize, Serializer};
use std::fmt;

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, PartialOrd, Ord)]
Expand All @@ -14,8 +14,8 @@ pub enum ChangeType {
#[serde(rename_all = "camelCase")]
pub struct FollowChange {
pub change_type: ChangeType,
#[serde(serialize_with = "serialize_at_as_i64")]
pub at: DateTime<Utc>,
pub previous_at: Option<DateTime<Utc>>,
pub follower: PublicKey,
pub friendly_follower: Option<String>,
pub followee: PublicKey,
Expand All @@ -37,7 +37,6 @@ impl FollowChange {
Self {
change_type: ChangeType::Followed,
at,
previous_at: None,
follower,
friendly_follower: None,
followee,
Expand All @@ -49,19 +48,13 @@ impl FollowChange {
Self {
change_type: ChangeType::Unfollowed,
at,
previous_at: None,
follower,
friendly_follower: None,
followee,
friendly_followee: None,
}
}

pub fn with_last_seen_contact_list_at(mut self, maybe_at: Option<DateTime<Utc>>) -> Self {
self.previous_at = maybe_at;
self
}

pub fn with_friendly_follower(mut self, name: String) -> Self {
self.friendly_follower = Some(name);
self
Expand All @@ -73,6 +66,13 @@ impl FollowChange {
}
}

fn serialize_at_as_i64<S>(at: &DateTime<Utc>, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
serializer.serialize_i64(at.timestamp())
}

impl fmt::Display for FollowChange {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
Expand Down
25 changes: 14 additions & 11 deletions src/follow_change_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use crate::domain::follow_change::FollowChange;
use crate::google_publisher::GooglePublisher;
use crate::google_pubsub_client::GooglePubSubClient;
use crate::refresh_friendly_id::refresh_friendly_id;
use crate::relay_subscriber::GetEventsOf;
use crate::repo::{Repo, RepoTrait};
use crate::worker_pool::{WorkerTask, WorkerTaskItem};
use nostr_sdk::prelude::*;
Expand All @@ -12,17 +13,20 @@ use tokio::time::sleep;
use tokio_util::sync::CancellationToken;
use tracing::debug;
/// Fetches friendly ids and then sends follow change to google pubsub
pub struct FollowChangeHandler {
pub struct FollowChangeHandler<T: GetEventsOf> {
repo: Arc<Repo>,
google_publisher: GooglePublisher,
nostr_client: Client,
nostr_client: Arc<T>,
timeout_secs: u64,
}

impl FollowChangeHandler {
impl<T> FollowChangeHandler<T>
where
T: GetEventsOf,
{
pub async fn new(
repo: Arc<Repo>,
nostr_client: Client,
nostr_client: Arc<T>,
cancellation_token: CancellationToken,
settings: &Settings,
) -> Result<Self> {
Expand All @@ -45,21 +49,20 @@ impl FollowChangeHandler {
}
}

impl WorkerTask<FollowChange> for FollowChangeHandler {
impl<T: GetEventsOf> WorkerTask<FollowChange> for FollowChangeHandler<T> {
async fn call(
&self,
worker_task_item: WorkerTaskItem<FollowChange>,
) -> Result<(), Box<dyn Error>> {
let WorkerTaskItem {
item: mut follow_change,
} = worker_task_item;

// Fetch friendly IDs for the pubkeys or get it from DB if it takes more
// than timeout_secs. Whatever if found through the network is cached.
let (friendly_follower, friendly_followee) = tokio::select!(
result = fetch_friendly_ids(
&self.repo,
&self.nostr_client,
self.nostr_client.clone(),
&follow_change
) => result,
result = get_friendly_ids_from_db(&self.repo, &follow_change, self.timeout_secs) => result
Expand All @@ -81,14 +84,14 @@ impl WorkerTask<FollowChange> for FollowChangeHandler {
}

/// Get pubkey info from Nostr metadata or nip05 servers
async fn fetch_friendly_ids(
async fn fetch_friendly_ids<T: GetEventsOf>(
repo: &Arc<Repo>,
nostr_client: &Client,
nostr_client: Arc<T>,
follow_change: &FollowChange,
) -> (String, String) {
let (friendly_follower, friendly_followee) = tokio::join!(
refresh_friendly_id(repo, nostr_client, &follow_change.follower),
refresh_friendly_id(repo, nostr_client, &follow_change.followee),
refresh_friendly_id(repo, &nostr_client, &follow_change.follower),
refresh_friendly_id(repo, &nostr_client, &follow_change.followee),
);

(friendly_follower, friendly_followee)
Expand Down
107 changes: 94 additions & 13 deletions src/follows_differ.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use crate::refresh_friendly_id::{fetch_account_info, AccountInfo};
use crate::relay_subscriber::GetEventsOf;
use crate::repo::RepoTrait;
use crate::{
domain::{follow::Follow, follow_change::FollowChange},
Expand All @@ -17,21 +19,29 @@ struct FollowsDiff {
exists_in_latest_contact_list: bool,
}

pub struct FollowsDiffer<T>
pub struct FollowsDiffer<T, U>
where
T: RepoTrait + Sync + Send,
U: GetEventsOf + Sync + Send,
{
repo: Arc<T>,
nostr_client: Arc<U>,
follow_change_sender: Sender<FollowChange>,
}

impl<T> FollowsDiffer<T>
impl<T, U> FollowsDiffer<T, U>
where
T: RepoTrait + Sync + Send,
U: GetEventsOf + Sync + Send,
{
pub fn new(repo: Arc<T>, follow_change_sender: Sender<FollowChange>) -> Self {
pub fn new(
repo: Arc<T>,
nostr_client: Arc<U>,
follow_change_sender: Sender<FollowChange>,
) -> Self {
Self {
repo,
nostr_client,
follow_change_sender,
}
}
Expand Down Expand Up @@ -81,6 +91,14 @@ where
let mut unfollowed_counter = 0;
let mut unchanged = 0;

let send_notifications = should_send_notifications(
&self.nostr_client,
maybe_last_seen_contact_list_at,
follower,
event_created_at,
)
.await?;

for (followee, diff) in follows_diff {
match diff.stored_follow {
Some(mut stored_follow) => {
Expand All @@ -90,10 +108,12 @@ where
unchanged += 1;
} else {
self.repo.delete_follow(&followee, follower).await?;
let follow_change =
FollowChange::new_unfollowed(event_created_at, *follower, followee)
.with_last_seen_contact_list_at(maybe_last_seen_contact_list_at);
self.send_follow_change(follow_change)?;

if send_notifications {
let follow_change =
FollowChange::new_unfollowed(event_created_at, *follower, followee);
self.send_follow_change(follow_change)?;
}
unfollowed_counter += 1;
}
}
Expand All @@ -106,10 +126,12 @@ where
created_at: event_created_at,
};
self.repo.upsert_follow(&follow).await?;
let follow_change =
FollowChange::new_followed(event_created_at, *follower, followee)
.with_last_seen_contact_list_at(maybe_last_seen_contact_list_at);
self.send_follow_change(follow_change)?;

if send_notifications {
let follow_change =
FollowChange::new_followed(event_created_at, *follower, followee);
self.send_follow_change(follow_change)?;
}
followed_counter += 1;
} else {
debug!("Skipping self-follow for {}", followee);
Expand All @@ -128,10 +150,12 @@ where
}

const ONE_DAY_DURATION: Duration = Duration::from_secs(60 * 60 * 24);
const ONE_WEEK_DURATION: Duration = Duration::from_secs(60 * 60 * 24 * 7);

impl<T> WorkerTask<Box<Event>> for FollowsDiffer<T>
impl<T, U> WorkerTask<Box<Event>> for FollowsDiffer<T, U>
where
T: RepoTrait + Sync + Send,
U: GetEventsOf + Sync + Send,
{
async fn call(&self, worker_task_item: WorkerTaskItem<Box<Event>>) -> Result<()> {
let WorkerTaskItem { item: event } = worker_task_item;
Expand Down Expand Up @@ -192,6 +216,39 @@ where
}
}

/// An heuristic to decide if we should send notifications for a contact list
async fn should_send_notifications<T: GetEventsOf>(
nostr_client: &Arc<T>,
maybe_last_seen_contact_list: Option<DateTime<Utc>>,
follower: &PublicKey,
event_created_at: DateTime<Utc>,
) -> Result<bool> {
if maybe_last_seen_contact_list.is_none() {
// This is the first time we see a contact list from this follower.
let AccountInfo { created_at, .. } = fetch_account_info(nostr_client, follower).await;
let Some(follower_created_at) = created_at else {
return Ok(true);
};

if (event_created_at - follower_created_at).to_std()? > ONE_WEEK_DURATION {
// If there's a big gap from the time of creation of the follower to
// the current contact list, then we assume that most of the follows
// are from long ago and we skip creating follow changes for this
// one to avoid noise, only used to update the DB and have the
// initial contact list for upcoming diffs.

info!(
"Skipping notifications for first seen list of an old account: {}",
follower
);

return Ok(false);
}
}

Ok(true)
}

fn log_line(
follower: PublicKey,
followed_counter: usize,
Expand Down Expand Up @@ -251,6 +308,7 @@ mod tests {
use crate::domain::follow::Follow;
use crate::repo::RepoError;
use chrono::{Duration, Utc};
use nostr_sdk::PublicKey;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::UNIX_EPOCH;
Expand All @@ -259,10 +317,29 @@ mod tests {
use tokio::time::sleep;

#[derive(Default)]

struct MockNostrClient;
impl GetEventsOf for MockNostrClient {
async fn get_events_of(
&self,
_filter: Vec<Filter>,
_timeout: Option<core::time::Duration>,
) -> Result<Vec<Event>, nostr_sdk::client::Error> {
Ok(vec![])
}
}
struct MockRepo {
follows: Arc<Mutex<HashMap<PublicKey, (Vec<Follow>, Option<DateTime<Utc>>)>>>,
}

impl Default for MockRepo {
fn default() -> Self {
Self {
follows: Arc::new(Mutex::new(HashMap::new())),
}
}
}

impl RepoTrait for MockRepo {
async fn maybe_update_last_contact_list_at(
&self,
Expand Down Expand Up @@ -805,7 +882,11 @@ mod tests {
) -> Result<Vec<FollowChange>> {
let (follow_change_sender, _) = channel(100);
let repo = Arc::new(MockRepo::default());
let follows_differ = FollowsDiffer::new(repo.clone(), follow_change_sender.clone());
let follows_differ = FollowsDiffer::new(
repo.clone(),
Arc::new(MockNostrClient),
follow_change_sender.clone(),
);

let mut follow_change_receiver = follow_change_sender.subscribe();
let follow_changes: Arc<Mutex<Vec<FollowChange>>> = Arc::new(Mutex::new(Vec::new()));
Expand Down
11 changes: 6 additions & 5 deletions src/google_pubsub_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,15 +93,11 @@ impl PublishEvents for GooglePubSubClient {

let pubsub_messages = pubsub_messages?;

info!(
"{} Google PubSub messages after filtering",
pubsub_messages.len()
);

if pubsub_messages.is_empty() {
return Ok(());
}

let len = pubsub_messages.len();
let request = PublishRequest {
topic: self.google_full_topic.clone(),
messages: pubsub_messages,
Expand All @@ -113,6 +109,11 @@ impl PublishEvents for GooglePubSubClient {
.await
.map_err(GooglePublisherError::PublishError)?;

info!(
"Published {} messages to Google PubSub {}",
len, self.google_full_topic
);

Ok(())
}
}
10 changes: 7 additions & 3 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,14 @@ async fn main() -> Result<()> {
let repo = Arc::new(Repo::new(graph));

info!("Initializing workers for follower list diff calculation");
let shared_nostr_client = Arc::new(create_client());
let (follow_change_sender, _) =
broadcast::channel::<FollowChange>(settings.follow_change_channel_size);
let follows_differ_worker = FollowsDiffer::new(repo.clone(), follow_change_sender.clone());
let follows_differ_worker = FollowsDiffer::new(
repo.clone(),
shared_nostr_client.clone(),
follow_change_sender.clone(),
);
let cancellation_token = CancellationToken::new();
let (event_sender, event_receiver) =
broadcast::channel::<Box<Event>>(settings.event_channel_size);
Expand All @@ -70,7 +75,6 @@ async fn main() -> Result<()> {
)?;

info!("Starting follower change processing task");
let shared_nostr_client = create_client();
let follow_change_handler = FollowChangeHandler::new(
repo.clone(),
shared_nostr_client.clone(),
Expand All @@ -94,7 +98,7 @@ async fn main() -> Result<()> {
.kind(Kind::ContactList)];

let nostr_sub = start_nostr_subscription(
shared_nostr_client,
shared_nostr_client.clone(),
[settings.relay].into(),
filters,
event_sender,
Expand Down
Loading

0 comments on commit fd3f56d

Please sign in to comment.