Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(storage): change gRPC writes to use bi-directional streams #8930

Merged
merged 6 commits into from
Nov 8, 2023
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 34 additions & 14 deletions storage/grpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1583,9 +1583,9 @@ func (w *gRPCWriter) queryProgress() (int64, error) {
return persistedSize, err
}

// uploadBuffer opens a bi-directional Write stream and uploads the buffer at
// the given offset, and will mark the write as finished if we are done
// receiving data from the user. The resulting write offset after uploading the
// uploadBuffer uploads the buffer at the given offset using a bi-directional
// Write stream. It will open a new stream if necessary (on the first call or
// after resuming from failure). The resulting write offset after uploading the
// buffer is returned, as well as well as the final Object if the upload is
// completed.
//
Expand Down Expand Up @@ -1670,16 +1670,19 @@ func (w *gRPCWriter) uploadBuffer(recvd int, start int64, doneReading bool) (*st
if err == io.EOF {
// err was io.EOF. The client-side of a stream only gets an EOF on Send
// when the backend closes the stream and wants to return an error
// status. Closing the stream receives the status as an error.
err = w.stream.CloseSend()

// Drop the stream reference as a new one will need to be created
w.stream = nil
// status.

// Receive from the stream Recv() until it returns a non-nil error
// to receive the server's status as an error. We may get multiple
// messages before the error due to buffering.
err = nil
for err == nil {
_, err = w.stream.Recv()
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As I mentioned in another comment, I don't see why we would make this change; stream.CloseAndRecv should do what we want?


// Retriable errors mean we should start over and attempt to
// resend the entire buffer via a new stream.
// If not retriable, falling through will return the error received
// from closing the stream.
// If not retriable, falling through will return the error received.
if shouldRetry(err) {
// TODO: Add test case for failure modes of querying progress.
writeOffset, err = w.determineOffset(start)
Expand All @@ -1688,6 +1691,9 @@ func (w *gRPCWriter) uploadBuffer(recvd int, start int64, doneReading bool) (*st
}
sent = int(writeOffset) - int(start)

// Drop the stream reference as a new one will need to be created.
w.stream = nil
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't this happen outside of the if ShouldRetry() conditional?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it doesn't matter too much either way here. If we don't go into the conditional (ie. shouldRetry == false) it'll return from uploadBuffer with the err and we won't reuse the stream. If we do go into the conditional, I don't think it matters if it happened before or during. Unless I'm missing something?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right but it seems more readable to drop the stream reference immediately after the error if that's the reason we are doing it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I moved this line


// Continue sending requests, opening a new stream and resending
// any bytes not yet persisted as per QueryWriteStatus
continue
Expand Down Expand Up @@ -1718,6 +1724,7 @@ func (w *gRPCWriter) uploadBuffer(recvd int, start int64, doneReading bool) (*st
// Done sending data (remainingDataFitsInSingleReq should == true if we
// reach this code). Receive from the stream to confirm the persisted data.
resp, err := w.stream.Recv()

// Retriable errors mean we should start over and attempt to
// resend the entire buffer via a new stream.
// If not retriable, falling through will return the error received
Expand All @@ -1728,30 +1735,35 @@ func (w *gRPCWriter) uploadBuffer(recvd int, start int64, doneReading bool) (*st
return nil, 0, err
}
sent = int(writeOffset) - int(start)

// Drop the stream reference as a new one will need to be created.
w.stream = nil
tritone marked this conversation as resolved.
Show resolved Hide resolved

continue
}
if err != nil {
return nil, 0, err
}

// Confirm the persisted data
// Confirm the persisted data if we have not finished uploading the object.
if !lastWriteOfEntireObject {
if resp.GetPersistedSize() != writeOffset {
tritone marked this conversation as resolved.
Show resolved Hide resolved
// retry
// Retry if not all bytes were persisted.
writeOffset = resp.GetPersistedSize()
sent = int(writeOffset) - int(start)
continue
}
} else {
// If the object is done uploading, close the send stream and check for errors.
// If the object is done uploading, close the send stream to receive
// from the stream without blocking.
err = w.stream.CloseSend()
if err != nil {
tritone marked this conversation as resolved.
Show resolved Hide resolved
return nil, 0, err
}

// Stream receives do not block once send is closed, but we may not
// receive the response with the object right away; loop until we
// receive the object or error out
// receive the object or error out.
var obj *storagepb.Object
for obj == nil {
resp, err := w.stream.Recv()
Expand All @@ -1762,6 +1774,14 @@ func (w *gRPCWriter) uploadBuffer(recvd int, start int64, doneReading bool) (*st
obj = resp.GetResource()
}

// Even though we received the object response, continue reading
// until we receive a non-nil error, to ensure the stream does not
// leak even if the context isn't cancelled. See:
// https://github.com/grpc/grpc-go/commit/365770fcbd7dfb9d921cb44827ede770f33be44f
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are there docs that we can link to that officially document the "stream protocol" rather than just a commit message?

If not I think it'd be good to get a review from someone on the gRPC Go team if possible, given that these contracts are a little convoluted seemingly.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the actual docs: https://pkg.go.dev/google.golang.org/grpc#ClientConn.NewStream

I'll update it in the comment as well.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After looking at these docs I'm wondering if we can use the CloseAndRecv helper instead to avoid writing these loops? We already do this here:

resp, err := w.stream.CloseAndRecv()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't have that helper function for the bidi stream

for err == nil {
_, err = w.stream.Recv()
}

return obj, writeOffset, nil
}

Expand Down
Loading