Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ref(actix): Migrate ProjectUpstream to relay_system::Service #1727

Merged
merged 18 commits into from
Jan 18, 2023
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 10 additions & 23 deletions relay-server/src/actors/project_cache.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use std::sync::Arc;

use actix::{Actor, Message};
use actix_web::ResponseError;
use tokio::sync::mpsc;
use tokio::time::Instant;
Expand All @@ -11,13 +10,13 @@ use relay_metrics::{self, FlushBuckets, InsertMetrics, MergeBuckets};
use relay_quotas::RateLimits;
use relay_redis::RedisPool;
use relay_statsd::metric;
use relay_system::{compat, Addr, FromMessage, Interface, Sender, Service};
use relay_system::{Addr, FromMessage, Interface, Sender, Service};

use crate::actors::outcome::DiscardReason;
use crate::actors::processor::ProcessEnvelope;
use crate::actors::project::{Project, ProjectSender, ProjectState};
use crate::actors::project_local::{LocalProjectSource, LocalProjectSourceService};
use crate::actors::project_upstream::UpstreamProjectSource;
use crate::actors::project_upstream::{UpstreamProjectSource, UpstreamProjectSourceService};
use crate::envelope::Envelope;
use crate::service::REGISTRY;
use crate::statsd::{RelayCounters, RelayGauges, RelayHistograms, RelayTimers};
Expand Down Expand Up @@ -324,15 +323,15 @@ impl FromMessage<FlushBuckets> for ProjectCache {
struct ProjectSource {
config: Arc<Config>,
local_source: Addr<LocalProjectSource>,
upstream_source: actix::Addr<UpstreamProjectSource>,
upstream_source: Addr<UpstreamProjectSource>,
#[cfg(feature = "processing")]
redis_source: Option<RedisProjectSource>,
}

impl ProjectSource {
pub fn new(config: Arc<Config>, _redis: Option<RedisPool>) -> Self {
let local_source = LocalProjectSourceService::new(config.clone()).start();
let upstream_source = UpstreamProjectSource::new(config.clone()).start();
let upstream_source = UpstreamProjectSourceService::new(config.clone()).start();

#[cfg(feature = "processing")]
let redis_source = _redis.map(|pool| RedisProjectSource::new(config.clone(), pool));
Expand Down Expand Up @@ -387,15 +386,13 @@ impl ProjectSource {
}
};

compat::send(
self.upstream_source,
FetchProjectState {
self.upstream_source
.send(FetchProjectState {
project_key,
no_cache,
},
)
.await
.map_err(|_| ())?
})
.await
.map_err(|_| ())
}
}

Expand Down Expand Up @@ -626,7 +623,7 @@ impl Service for ProjectCacheService {
}
}

#[derive(Clone)]
#[derive(Clone, Debug)]
pub struct FetchProjectState {
/// The public key to fetch the project by.
pub project_key: ProjectKey,
Expand All @@ -635,11 +632,6 @@ pub struct FetchProjectState {
pub no_cache: bool,
}

// TODO: Remove once `UpstreamProjectSource` was moved to tokio
impl Message for FetchProjectState {
type Result = Result<Arc<ProjectState>, ()>;
}

#[derive(Clone, Debug)]
pub struct FetchOptionalProjectState {
project_key: ProjectKey,
Expand All @@ -650,8 +642,3 @@ impl FetchOptionalProjectState {
self.project_key
}
}

// TODO: Remove once `RedisProjectSource` and `LocalProjectSource` were moved to tokio
impl Message for FetchOptionalProjectState {
type Result = Option<Arc<ProjectState>>;
}
Loading