Skip to content

Commit

Permalink
fix: set max decode size of proto message (#2275)
Browse files Browse the repository at this point in the history
Signed-off-by: Yashash H L <[email protected]>
Co-authored-by: Yashash H L <[email protected]>
  • Loading branch information
kohlisid and yhl25 authored Dec 12, 2024
1 parent 346f2a7 commit fea792b
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 7 deletions.
8 changes: 4 additions & 4 deletions rust/numaflow-core/src/monovertex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ pub(crate) async fn start_forwarder(
let mut source_grpc_client =
SourceClient::new(create_rpc_channel(source_config.socket_path.clone().into()).await?)
.max_encoding_message_size(source_config.grpc_max_message_size)
.max_encoding_message_size(source_config.grpc_max_message_size);
.max_decoding_message_size(source_config.grpc_max_message_size);

wait_until_source_ready(&cln_token, &mut source_grpc_client).await?;
Some(source_grpc_client)
Expand Down Expand Up @@ -92,7 +92,7 @@ pub(crate) async fn start_forwarder(
let mut sink_grpc_client =
SinkClient::new(create_rpc_channel(udsink_config.socket_path.clone().into()).await?)
.max_encoding_message_size(udsink_config.grpc_max_message_size)
.max_encoding_message_size(udsink_config.grpc_max_message_size);
.max_decoding_message_size(udsink_config.grpc_max_message_size);

wait_until_sink_ready(&cln_token, &mut sink_grpc_client).await?;
Some(sink_grpc_client)
Expand Down Expand Up @@ -125,7 +125,7 @@ pub(crate) async fn start_forwarder(
create_rpc_channel(fb_sink_config.socket_path.clone().into()).await?,
)
.max_encoding_message_size(fb_sink_config.grpc_max_message_size)
.max_encoding_message_size(fb_sink_config.grpc_max_message_size);
.max_decoding_message_size(fb_sink_config.grpc_max_message_size);

wait_until_sink_ready(&cln_token, &mut fb_sink_grpc_client).await?;
Some(fb_sink_grpc_client)
Expand Down Expand Up @@ -164,7 +164,7 @@ pub(crate) async fn start_forwarder(
create_rpc_channel(transformer_config.socket_path.clone().into()).await?,
)
.max_encoding_message_size(transformer_config.grpc_max_message_size)
.max_encoding_message_size(transformer_config.grpc_max_message_size);
.max_decoding_message_size(transformer_config.grpc_max_message_size);

wait_until_transformer_ready(&cln_token, &mut transformer_grpc_client).await?;
Some(transformer_grpc_client.clone())
Expand Down
4 changes: 2 additions & 2 deletions rust/numaflow-core/src/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ async fn create_source_type(
create_rpc_channel(udsource_config.socket_path.clone().into()).await?,
)
.max_encoding_message_size(udsource_config.grpc_max_message_size)
.max_encoding_message_size(udsource_config.grpc_max_message_size);
.max_decoding_message_size(udsource_config.grpc_max_message_size);
wait_until_source_ready(&cln_token, &mut source_grpc_client).await?;
let (ud_read, ud_ack, ud_lag) = new_source(
source_grpc_client.clone(),
Expand Down Expand Up @@ -279,7 +279,7 @@ async fn create_transformer(
create_rpc_channel(ud_transformer.socket_path.clone().into()).await?,
)
.max_encoding_message_size(ud_transformer.grpc_max_message_size)
.max_encoding_message_size(ud_transformer.grpc_max_message_size);
.max_decoding_message_size(ud_transformer.grpc_max_message_size);
wait_until_transformer_ready(&cln_token, &mut transformer_grpc_client).await?;
return Ok((
Some(SourceTransformHandle::new(transformer_grpc_client.clone()).await?),
Expand Down
2 changes: 1 addition & 1 deletion rust/numaflow-core/src/shared/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ pub(crate) async fn create_sink_handle(
let mut sink_grpc_client =
SinkClient::new(create_rpc_channel(ud_config.socket_path.clone().into()).await?)
.max_encoding_message_size(ud_config.grpc_max_message_size)
.max_encoding_message_size(ud_config.grpc_max_message_size);
.max_decoding_message_size(ud_config.grpc_max_message_size);
wait_until_sink_ready(cln_token, &mut sink_grpc_client).await?;
// TODO: server info?

Expand Down

0 comments on commit fea792b

Please sign in to comment.