diff --git a/ecosystem/indexer-grpc/indexer-grpc-data-service/src/main.rs b/ecosystem/indexer-grpc/indexer-grpc-data-service/src/main.rs index ecfbf3d357bae..e05859fe06e02 100644 --- a/ecosystem/indexer-grpc/indexer-grpc-data-service/src/main.rs +++ b/ecosystem/indexer-grpc/indexer-grpc-data-service/src/main.rs @@ -48,6 +48,8 @@ pub struct IndexerGrpcDataServiceConfig { // The address for TLS and non-TLS gRPC server to listen on. pub data_service_grpc_tls_config: Option, pub data_service_grpc_non_tls_config: Option, + // The size of the response channel that response can be buffered. + pub data_service_response_channel_size: Option, // A list of auth tokens that are allowed to access the service. pub whitelisted_auth_tokens: Vec, // File store config. @@ -91,6 +93,7 @@ impl RunnableConfig for IndexerGrpcDataServiceConfig { let server = RawDataServerWrapper::new( self.redis_read_replica_address.clone(), self.file_store_config.clone(), + self.data_service_response_channel_size, ); let svc = aptos_protos::indexer::v1::raw_data_server::RawDataServer::new(server) .send_compressed(CompressionEncoding::Gzip) diff --git a/ecosystem/indexer-grpc/indexer-grpc-data-service/src/service.rs b/ecosystem/indexer-grpc/indexer-grpc-data-service/src/service.rs index 95251c23a79c0..bd871b7518901 100644 --- a/ecosystem/indexer-grpc/indexer-grpc-data-service/src/service.rs +++ b/ecosystem/indexer-grpc/indexer-grpc-data-service/src/service.rs @@ -49,9 +49,8 @@ const AHEAD_OF_CACHE_RETRY_SLEEP_DURATION_MS: u64 = 50; // TODO(larry): fix all errors treated as transient errors. const TRANSIENT_DATA_ERROR_RETRY_SLEEP_DURATION_MS: u64 = 1000; -// Up to MAX_RESPONSE_CHANNEL_SIZE response can be buffered in the channel. If the channel is full, -// the server will not fetch more data from the cache and file store until the channel is not full. -const MAX_RESPONSE_CHANNEL_SIZE: usize = 80; +// Default max response channel size. +const DEFAULT_MAX_RESPONSE_CHANNEL_SIZE: usize = 3; // The server will retry to send the response to the client and give up after RESPONSE_CHANNEL_SEND_TIMEOUT. // This is to prevent the server from being occupied by a slow client. @@ -62,16 +61,22 @@ const SHORT_CONNECTION_DURATION_IN_SECS: u64 = 10; pub struct RawDataServerWrapper { pub redis_client: Arc, pub file_store_config: IndexerGrpcFileStoreConfig, + pub data_service_response_channel_size: Option, } impl RawDataServerWrapper { - pub fn new(redis_address: String, file_store_config: IndexerGrpcFileStoreConfig) -> Self { + pub fn new( + redis_address: String, + file_store_config: IndexerGrpcFileStoreConfig, + data_service_response_channel_size: Option, + ) -> Self { Self { redis_client: Arc::new( redis::Client::open(format!("redis://{}", redis_address)) .expect("Create redis client failed."), ), file_store_config, + data_service_response_channel_size, } } } @@ -114,7 +119,10 @@ impl RawData for RawDataServerWrapper { let transactions_count = request.transactions_count; // Response channel to stream the data to the client. - let (tx, rx) = channel(MAX_RESPONSE_CHANNEL_SIZE); + let (tx, rx) = channel( + self.data_service_response_channel_size + .unwrap_or(DEFAULT_MAX_RESPONSE_CHANNEL_SIZE), + ); let mut current_version = match &request.starting_version { Some(version) => *version, None => {