Cut tagger stream event responses into chunks of 4MB max size each #30192
Conversation
Test changes on VM

Use this command from test-infra-definitions to manually test this PR's changes on a VM:

inv create-vm --pipeline-id=47031106 --os-family=ubuntu

Note: This applies to commit a5c6aca
// The size of each item is calculated using computeSize
//
// This function assumes that the size of each single item of the initial slice is not larger than maxChunkSize
func splitBySize[T any](slice []T, maxChunkSize int, computeSize func(T) int) [][]T {
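For context, a minimal sketch of how the body of such a greedy splitter could look; only the doc comment and signature above come from the diff, the implementation below is an illustration:

func splitBySize[T any](slice []T, maxChunkSize int, computeSize func(T) int) [][]T {
	var chunks [][]T
	var current []T
	currentSize := 0
	for _, item := range slice {
		itemSize := computeSize(item)
		// Start a new chunk when adding this item would exceed the limit.
		if currentSize+itemSize > maxChunkSize && len(current) > 0 {
			chunks = append(chunks, current)
			current, currentSize = nil, 0
		}
		current = append(current, item)
		currentSize += itemSize
	}
	if len(current) > 0 {
		chunks = append(chunks, current)
	}
	return chunks
}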
Declared this generic function to make it easy to unit-test the split functionality.
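Since testability is the point of the generic function, here is a hedged example of what such a unit test might look like, using the standard testing and reflect packages; the expected grouping assumes a greedy left-to-right split:

func TestSplitBySize(t *testing.T) {
	// Each string's "size" is its length; no chunk may exceed 10.
	got := splitBySize([]string{"aaaa", "bbbb", "cccc"}, 10, func(s string) int { return len(s) })
	want := [][]string{{"aaaa", "bbbb"}, {"cccc"}}
	if !reflect.DeepEqual(got, want) {
		t.Errorf("splitBySize() = %v, want %v", got, want)
	}
}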
grpc.MaxRecvMsgSize(maxMessageSize),
grpc.MaxSendMsgSize(maxMessageSize),
The default message size in gRPC is 4 MB. I added these options explicitly to make sure the tagger server uses the same maximum message size when splitting the response into chunks.
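For reference, a sketch of how these options are typically passed when constructing the server; the newCMDServer wrapper is hypothetical, only the two option calls appear in the diff:

func newCMDServer() *grpc.Server {
	const maxMessageSize = 4 * 1024 * 1024 // 4 MB, matching gRPC's default limit
	// Pin both directions explicitly so the server and the chunking logic
	// agree on the maximum message size.
	return grpc.NewServer(
		grpc.MaxRecvMsgSize(maxMessageSize),
		grpc.MaxSendMsgSize(maxMessageSize),
	)
}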
grpc.MaxSendMsgSize(maxMessageSize),
grpc.MaxRecvMsgSize(maxMessageSize),
The default message size in gRPC is 4 MB. I added these options explicitly to make sure the tagger server uses the same maximum message size when splitting the response into chunks.
LGTM for files owned by ASC
Hi @adel121, the change looks good. I added 2 small comments.
comp/api/api/apiimpl/server_cmd.go (Outdated)

@@ -48,18 +48,21 @@ func (server *apiServer) startCMDServer(

// gRPC server
authInterceptor := grpcutil.AuthInterceptor(parseToken)
const maxMessageSize = 4 * 1024 * 1024 // 4 MB
nit: move the const definition to the beginning of the file (after line 33)
}

// splitEvents splits the array of events to chunks with at most maxChunkSize each
func splitEvents(events []*pb.StreamTagsEvent, maxChunkSize int) [][]*pb.StreamTagsEvent {
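A plausible way to wire this to the generic splitter, measuring each event by its serialised proto size; this wiring is an assumption for illustration, the diff only shows the signature:

import "google.golang.org/protobuf/proto"

func splitEvents(events []*pb.StreamTagsEvent, maxChunkSize int) [][]*pb.StreamTagsEvent {
	return splitBySize(events, maxChunkSize, func(e *pb.StreamTagsEvent) int {
		return proto.Size(e) // serialised size of a single event
	})
}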
This function works, and since the result is a slice of slices of *pb.StreamTagsEvent, the memory used to create the new slice should not be too large. However, as a memory optimisation, I'm wondering if you could return an iterator instead of creating new slices, the way the slices.Chunk function does: https://pkg.go.dev/slices#Chunk. It would help to create each new slice sequentially when building the proto messages.
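For illustration, a sketch of what an iterator-based variant could look like on Go 1.23+; this is hypothetical code in the spirit of slices.Chunk, not part of this PR:

import "iter"

// splitBySizeSeq yields chunks one at a time instead of materialising
// all of them up front.
func splitBySizeSeq[T any](slice []T, maxChunkSize int, computeSize func(T) int) iter.Seq[[]T] {
	return func(yield func([]T) bool) {
		var current []T
		currentSize := 0
		for _, item := range slice {
			itemSize := computeSize(item)
			if currentSize+itemSize > maxChunkSize && len(current) > 0 {
				if !yield(current) {
					return
				}
				current, currentSize = nil, 0
			}
			current = append(current, item)
			currentSize += itemSize
		}
		if len(current) > 0 {
			yield(current)
		}
	}
}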
Iterators were introduced in Go 1.23.0, and the slices.Chunk function you mention was itself added in 1.23.0. Currently, the agent is on Go 1.22.0. IMO it is not worth upgrading the Go version now just for this change. We can create a card to make the change once the agent's Go version is upgraded, so that we don't forget this optimisation. WDYT?
Yes, perfect.
Thanks for looking at it 🙇
Could you add a comment (TODO) in the code?
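For instance, something along these lines (wording is only a suggestion):

// TODO: once the agent is on Go 1.23+, return an iterator in the spirit of
// slices.Chunk instead of materialising all chunks up front.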
Thanks for this fix and optimisation 👍
Regression Detector
/merge
🚂 MergeQueue: pull request added to the queue.
What does this PR do?
This PR modifies the remote tagger server so that it streams tagger events in chunks, with each chunk having a size of at most 4 MB.
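Conceptually, the server-side send loop then becomes something like the following; the response type and its Events field are assumptions for illustration:

for _, chunk := range splitEvents(events, maxMessageSize) {
	// Each chunk is sized to stay within the 4 MB gRPC limit, so no
	// single Send should hit a message-too-large error.
	if err := stream.Send(&pb.StreamTagsResponse{Events: chunk}); err != nil {
		return err
	}
}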
Motivation
Avoid communication failures between client and server on large clusters, where the message size might exceed 4 MB.
Describe how to test/QA your changes
Ensure that the remote tagger still works as expected.
Possible Drawbacks / Trade-offs
Additional Notes