Skip to content

Commit

Permalink
perf(sn): reuse buffer for ReplicateRequest unmarshaling
Browse files Browse the repository at this point in the history
Improve unmarshaling performance by reusing buffers for ReplicateRequest in the
backup replica.

The protobuf message `github.com/kakao/varlog/proto/snpb.(ReplicateRequest)` has
two slice fields—LLSN (`[]uint64`) and Data (`[][]byte`). The backup replica
receives replicated log entries from the primary replica via the gRPC service
`github.com/kakao/varlog/proto/snpb.(ReplicatorServer).Replicate`, which sends
`ReplicateRequest` messages.

Upon receiving a `ReplicateRequest`, the backup replica unmarshals the message,
which involves growing slices for fields such as LLSN and Data. This growth
causes copy overhead whenever the slice capacities need to expand.

To address this, we introduce a new method, `ResetReuse`, for reusing slices
instead of resetting them completely. The `ResetReuse` method shrinks the slice
lengths while preserving their capacities, thus avoiding the overhead of
reallocating memory.

Example implementation:

```go
type Message struct {
    Buffer []byte
    // Other fields
}

func (m *Message) Reset() {
    *m = Message{}
}

func (m *Message) ResetReuse() {
    s := m.Buffer[:0]
    *m = Message{}
    m.Buffer = s
}
```

Risks:

This approach has potential downsides. Since the heap space consumed by the
slices is not reclaimed, the storage node's memory consumption may increase.
Currently, there is no mechanism to shrink the heap usage.

Additionally, this PR changes the generated code. The protobuf compiler can
revert it, which is contrary to our intention. To catch this mistake, this PR
includes a unit test (github.com/kakao/varlog/proto/snpb.TestReplicateRequest)
to verify that the buffer backing the slices is reused.

Resolves: #795

See also: #806
  • Loading branch information
ijsong committed Jun 15, 2024
1 parent 6a93655 commit 8ef1886
Show file tree
Hide file tree
Showing 6 changed files with 830 additions and 22 deletions.
2 changes: 2 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,8 @@ $(PROTO_PBS): $(PROTO_SRCS)
$(PROTOC) $(PROTO_INCS) \
--gogo_out=plugins=grpc,Mgoogle/protobuf/empty.proto=github.com/gogo/protobuf/types,Mgoogle/protobuf/any.proto=github.com/gogo/protobuf/types,Mgoogle/protobuf/duration.proto=github.com/gogo/protobuf/types,paths=source_relative:. $$src ; \
done
$(MAKE) fmt
git apply -v proto/patches/*.patch

proto-check:
$(MAKE) proto
Expand Down
16 changes: 6 additions & 10 deletions internal/storagenode/replication_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,15 +93,12 @@ type replicationServerTask struct {
err error
}

func newReplicationServerTask(req snpb.ReplicateRequest, err error) *replicationServerTask {
rst := replicationServerTaskPool.Get().(*replicationServerTask)
rst.req = req
rst.err = err
return rst
func newReplicationServerTask() *replicationServerTask {
return replicationServerTaskPool.Get().(*replicationServerTask)
}

func (rst *replicationServerTask) release() {
rst.req = snpb.ReplicateRequest{}
rst.req.ResetReuse()
rst.err = nil
replicationServerTaskPool.Put(rst)
}
Expand All @@ -113,11 +110,10 @@ func (rs *replicationServer) recv(ctx context.Context, stream snpb.Replicator_Re
go func() {
defer wg.Done()
defer close(c)
req := &snpb.ReplicateRequest{}
for {
req.Reset()
err := stream.RecvMsg(req)
rst := newReplicationServerTask(*req, err)
rst := newReplicationServerTask()
err := stream.RecvMsg(&rst.req)
rst.err = err
select {
case c <- rst:
if err != nil {
Expand Down
Loading

0 comments on commit 8ef1886

Please sign in to comment.