Skip to content

Commit

Permalink
multi: make UniverseRPC proof courier connection attempts lazy
Browse files Browse the repository at this point in the history
This commit modifies the Universe RPC proof courier handler to allow
connection attempts to be optionally lazy. Connection attempts are now
integrated into the backoff procedure where feasible, improving the
robustness of the connection handling process.
  • Loading branch information
ffranr committed Aug 22, 2024
1 parent 5f10d7f commit acd6bd3
Show file tree
Hide file tree
Showing 2 changed files with 113 additions and 38 deletions.
144 changes: 110 additions & 34 deletions proof/courier.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,9 +185,6 @@ func NewCourierDispatch(cfg *CourierCfg) *URLDispatch {
// address.
func (u *URLDispatch) NewCourier(ctx context.Context, addr *url.URL,
lazyConnect bool) (Courier, error) {

subscribers := make(map[uint64]*fn.EventReceiver[fn.Event])

// Create new courier addr based on URL scheme.
switch addr.Scheme {
case HashmailCourierType:
Expand All @@ -197,33 +194,11 @@ func (u *URLDispatch) NewCourier(ctx context.Context, addr *url.URL,
)

case UniverseRpcCourierType:
cfg := u.cfg.UniverseRpcCfg
backoffHandler := NewBackoffHandler(
cfg.BackoffCfg, u.cfg.TransferLog,
return NewUniverseRpcCourier(
ctx, u.cfg.UniverseRpcCfg, u.cfg.TransferLog,
u.cfg.LocalArchive, addr, lazyConnect,
)

// Connect to the universe RPC server.
dialOpts, err := serverDialOpts()
if err != nil {
return nil, err
}

serverAddr := fmt.Sprintf("%s:%s", addr.Hostname(), addr.Port())
conn, err := grpc.Dial(serverAddr, dialOpts...)
if err != nil {
return nil, err
}

client := unirpc.NewUniverseClient(conn)

return &UniverseRpcCourier{
client: client,
backoffHandle: backoffHandler,
cfg: u.cfg,
subscribers: subscribers,
rawConn: conn,
}, nil

default:
return nil, fmt.Errorf("unknown courier address protocol "+
"(consider updating tapd): %v", addr.Scheme)
Expand Down Expand Up @@ -1169,17 +1144,24 @@ type UniverseRpcCourierCfg struct {
// UniverseRpcCourier is a universe RPC proof courier service handle. It
// implements the Courier interface.
type UniverseRpcCourier struct {
// client is the RPC client that the courier will use to interact with
// the universe RPC server.
client unirpc.UniverseClient
// cfg is the courier configuration.
cfg *UniverseRpcCourierCfg

// cfg is the general courier configuration.
cfg *CourierCfg
// addr is the address of the courier service.
addr *url.URL

// localArchive is the local archive that the courier will use to
// store and query for proofs.
localArchive Archiver

// rawConn is the raw connection that the courier will use to interact
// with the remote gRPC service.
rawConn *grpc.ClientConn

// client is the RPC client that the courier will use to interact with
// the universe RPC server.
client unirpc.UniverseClient

// backoffHandle is a handle to the backoff procedure used in proof
// delivery.
backoffHandle *BackoffHandler
Expand All @@ -1193,6 +1175,82 @@ type UniverseRpcCourier struct {
subscriberMtx sync.Mutex
}

// NewUniverseRpcCourier creates a new universe RPC proof courier service
// handle.
func NewUniverseRpcCourier(ctx context.Context, cfg *UniverseRpcCourierCfg,
transferLog TransferLog, localArchive Archiver, addr *url.URL,
lazyConnect bool) (*UniverseRpcCourier, error) {

courier := UniverseRpcCourier{
cfg: cfg,
addr: addr,
localArchive: localArchive,
backoffHandle: NewBackoffHandler(cfg.BackoffCfg, transferLog),
subscribers: make(map[uint64]*fn.EventReceiver[fn.Event]),
}

// If we're not lazy connecting, then we'll attempt to connect to the
// courier service immediately.
if !lazyConnect {
err := courier.ensureConnect(ctx)
if err != nil {
return nil, fmt.Errorf("unable to connect to courier "+
"service during courier handle "+
"instantiation: %w", err)
}
}

return &courier, nil
}

// ensureConnect ensures that the courier handle is connected to the remote
// courier service.
//
// This method does nothing if a service connection is already established.
func (c *UniverseRpcCourier) ensureConnect(ctx context.Context) error {
// If we're already connected, we'll return early.
if c.rawConn != nil && c.client != nil {
// Check the gRPC connection state to determine if the
// connection is ready.
gRpcConnState := c.rawConn.GetState()

connStatus, err := NewCourierConnStatusFromRpcStatus(
gRpcConnState,
)
if err != nil {
return fmt.Errorf("universe RPC courier unable to "+
"determine connection status: %w", err)
}

if connStatus.IsPending() {
return nil
}
}

// At this point, we know that the connection is not ready. We'll now
// attempt to establish a new connection to the courier service.
dialOpts, err := serverDialOpts()
if err != nil {
return err
}

// Ensure that the addr field has been set correctly.
if c.addr == nil {
return fmt.Errorf("universe RPC courier address is not set")
}

serverAddr := fmt.Sprintf("%s:%s", c.addr.Hostname(), c.addr.Port())
conn, err := grpc.DialContext(ctx, serverAddr, dialOpts...)
if err != nil {
return err
}

c.client = unirpc.NewUniverseClient(conn)
c.rawConn = conn

return nil
}

// DeliverProof attempts to delivery a proof file to the receiver.
func (c *UniverseRpcCourier) DeliverProof(ctx context.Context,
recipient Recipient, annotatedProof *AnnotatedProof) error {
Expand Down Expand Up @@ -1272,6 +1330,15 @@ func (c *UniverseRpcCourier) DeliverProof(ctx context.Context,

// Setup delivery routine and start backoff procedure.
deliverFunc := func() error {
// Connect to the courier service if a connection hasn't
// been established yet.
err := c.ensureConnect(ctx)
if err != nil {
return fmt.Errorf("unable to connect to "+
"courier service during delivery "+
"attempt: %w", err)
}

// Submit proof to courier.
_, err = c.client.InsertProof(ctx, &unirpc.AssetProof{
Key: &universeKey,
Expand Down Expand Up @@ -1327,6 +1394,15 @@ func (c *UniverseRpcCourier) ReceiveProof(ctx context.Context,
// procedure.
var proofBlob []byte
receiveFunc := func() error {
// Connect to the courier service if a connection hasn't
// been established yet.
err := c.ensureConnect(ctx)
if err != nil {
return fmt.Errorf("unable to connect to "+
"courier service during delivery "+
"attempt: %w", err)
}

// Retrieve proof from courier.
resp, err := c.client.QueryProof(ctx, &universeKey)
if err != nil {
Expand All @@ -1352,7 +1428,7 @@ func (c *UniverseRpcCourier) ReceiveProof(ctx context.Context,
}

proofFile, err := FetchProofProvenance(
ctx, c.cfg.LocalArchive, originLocator, fetchProof,
ctx, c.localArchive, originLocator, fetchProof,
)
if err != nil {
return nil, fmt.Errorf("error fetching proof provenance: %w",
Expand Down
7 changes: 3 additions & 4 deletions proof/courier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,9 @@ func TestUniverseRpcCourierLocalArchiveShortCut(t *testing.T) {

recipient := Recipient{}
courier := &UniverseRpcCourier{
client: nil,
cfg: &CourierCfg{
LocalArchive: localArchive,
},
client: nil,
cfg: &UniverseRpcCourierCfg{},
localArchive: localArchive,
rawConn: nil,
backoffHandle: nil,
subscribers: nil,
Expand Down

0 comments on commit acd6bd3

Please sign in to comment.