Skip to content

Commit

Permalink
remove metrics from IndexerExecutor
Browse files Browse the repository at this point in the history
  • Loading branch information
Giems committed Jul 9, 2024
1 parent bcf5176 commit 5c9e8ed
Show file tree
Hide file tree
Showing 6 changed files with 24 additions and 20 deletions.
2 changes: 1 addition & 1 deletion crates/sui-bridge-indexer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ async fn start_processing_sui_checkpoints(
let mut executor = IndexerExecutor::new(
progress_store,
1, /* workflow types */
ingestion_metrics,
// ingestion_metrics,
);

let indexer_metrics_cloned = indexer_meterics.clone();
Expand Down
14 changes: 8 additions & 6 deletions crates/sui-data-ingestion-core/src/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,12 @@ pub struct IndexerExecutor<P> {
progress_store: ProgressStoreWrapper<P>,
pool_progress_sender: mpsc::Sender<(String, CheckpointSequenceNumber)>,
pool_progress_receiver: mpsc::Receiver<(String, CheckpointSequenceNumber)>,
metrics: DataIngestionMetrics,
// metrics: DataIngestionMetrics,
}

impl<P: ProgressStore> IndexerExecutor<P> {
pub fn new(progress_store: P, number_of_jobs: usize, metrics: DataIngestionMetrics) -> Self {
// pub fn new(progress_store: P, number_of_jobs: usize, metrics: DataIngestionMetrics) -> Self {
pub fn new(progress_store: P, number_of_jobs: usize) -> Self {
let (pool_progress_sender, pool_progress_receiver) =
mpsc::channel(number_of_jobs * MAX_CHECKPOINTS_IN_PROGRESS);
Self {
Expand All @@ -40,7 +41,7 @@ impl<P: ProgressStore> IndexerExecutor<P> {
progress_store: ProgressStoreWrapper::new(progress_store),
pool_progress_sender,
pool_progress_receiver,
metrics,
// metrics,
}
}

Expand Down Expand Up @@ -90,7 +91,7 @@ impl<P: ProgressStore> IndexerExecutor<P> {
gc_sender.send(seq_number).await?;
reader_checkpoint_number = seq_number;
}
self.metrics.data_ingestion_checkpoint.with_label_values(&[&task_name]).set(sequence_number as i64);
// self.metrics.data_ingestion_checkpoint.with_label_values(&[&task_name]).set(sequence_number as i64);
}
Some(checkpoint) = checkpoint_recv.recv() => {
for sender in &self.pool_senders {
Expand All @@ -114,9 +115,10 @@ pub async fn setup_single_workflow<W: Worker + 'static>(
oneshot::Sender<()>,
)> {
let (exit_sender, exit_receiver) = oneshot::channel();
let metrics = DataIngestionMetrics::new(&Registry::new());
// let metrics = DataIngestionMetrics::new(&Registry::new());
let progress_store = ShimProgressStore(initial_checkpoint_number);
let mut executor = IndexerExecutor::new(progress_store, 1, metrics);
// let mut executor = IndexerExecutor::new(progress_store, 1, metrics);
let mut executor = IndexerExecutor::new(progress_store, 1);
let worker_pool = WorkerPool::new(worker, "workflow".to_string(), concurrency);
executor.register(worker_pool).await?;
Ok((
Expand Down
2 changes: 1 addition & 1 deletion crates/sui-data-ingestion-core/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ fn create_executor_bundle() -> ExecutorBundle {
let executor = IndexerExecutor::new(
progress_store,
1,
DataIngestionMetrics::new(&Registry::new()),
// DataIngestionMetrics::new(&Registry::new()),
);
ExecutorBundle {
executor,
Expand Down
15 changes: 8 additions & 7 deletions crates/sui-data-ingestion/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,12 +100,12 @@ async fn main() -> Result<()> {
let _guard = telemetry_subscribers::TelemetryConfig::new()
.with_env()
.init();
let registry_service = mysten_metrics::start_prometheus_server(
format!("{}:{}", config.metrics_host, config.metrics_port).parse()?,
);
let registry: Registry = registry_service.default_registry();
mysten_metrics::init_metrics(&registry);
let metrics = DataIngestionMetrics::new(&registry);
// let registry_service = mysten_metrics::start_prometheus_server(
// format!("{}:{}", config.metrics_host, config.metrics_port).parse()?,
// );
// let registry: Registry = registry_service.default_registry();
// mysten_metrics::init_metrics(&registry);
// let metrics = DataIngestionMetrics::new(&registry);

let progress_store = DynamoDBProgressStore::new(
&config.progress_store.aws_access_key_id,
Expand All @@ -114,7 +114,8 @@ async fn main() -> Result<()> {
config.progress_store.table_name,
)
.await;
let mut executor = IndexerExecutor::new(progress_store, config.tasks.len(), metrics);
// let mut executor = IndexerExecutor::new(progress_store, config.tasks.len(), metrics);
let mut executor = IndexerExecutor::new(progress_store, config.tasks.len());
for task_config in config.tasks {
match task_config.task {
Task::Archival(archival_config) => {
Expand Down
2 changes: 1 addition & 1 deletion crates/sui-indexer/src/indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ impl Indexer {
("object_snapshot".to_string(), object_snapshot_watermark),
]),
1,
DataIngestionMetrics::new(&Registry::new()),
// DataIngestionMetrics::new(&Registry::new()),
);
let worker =
new_handlers::<S, T>(store, metrics, primary_watermark, cancel.clone()).await?;
Expand Down
9 changes: 5 additions & 4 deletions crates/suins-indexer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,10 +132,11 @@ async fn main() -> Result<()> {
let (_exit_sender, exit_receiver) = oneshot::channel();
let progress_store = FileProgressStore::new(PathBuf::from(backfill_progress_file_path));

let registry: Registry = start_basic_prometheus_server();
mysten_metrics::init_metrics(&registry);
let metrics = DataIngestionMetrics::new(&registry);
let mut executor = IndexerExecutor::new(progress_store, 1, metrics);
// let registry: Registry = start_basic_prometheus_server();
// mysten_metrics::init_metrics(&registry);
// let metrics = DataIngestionMetrics::new(&registry);
// let mut executor = IndexerExecutor::new(progress_store, 1, metrics);
let mut executor = IndexerExecutor::new(progress_store, 1);

let indexer_setup =
if let (Some(registry_id), Some(subdomain_wrapper_type), Some(name_record_type)) =
Expand Down

0 comments on commit 5c9e8ed

Please sign in to comment.