From acd6bd3acbe0a9378068d0c7e9743a892350c7ce Mon Sep 17 00:00:00 2001 From: ffranr Date: Tue, 20 Aug 2024 14:06:21 +0100 Subject: [PATCH] multi: make UniverseRPC proof courier connection attempts lazy This commit modifies the Universe RPC proof courier handler to allow connection attempts to be optionally lazy. Connection attempts are now integrated into the backoff procedure where feasible, improving the robustness of the connection handling process. --- proof/courier.go | 144 ++++++++++++++++++++++++++++++++---------- proof/courier_test.go | 7 +- 2 files changed, 113 insertions(+), 38 deletions(-) diff --git a/proof/courier.go b/proof/courier.go index 6f0f6a749..ff81628c2 100644 --- a/proof/courier.go +++ b/proof/courier.go @@ -185,9 +185,6 @@ func NewCourierDispatch(cfg *CourierCfg) *URLDispatch { // address. func (u *URLDispatch) NewCourier(ctx context.Context, addr *url.URL, lazyConnect bool) (Courier, error) { - - subscribers := make(map[uint64]*fn.EventReceiver[fn.Event]) - // Create new courier addr based on URL scheme. switch addr.Scheme { case HashmailCourierType: @@ -197,33 +194,11 @@ func (u *URLDispatch) NewCourier(ctx context.Context, addr *url.URL, ) case UniverseRpcCourierType: - cfg := u.cfg.UniverseRpcCfg - backoffHandler := NewBackoffHandler( - cfg.BackoffCfg, u.cfg.TransferLog, + return NewUniverseRpcCourier( + ctx, u.cfg.UniverseRpcCfg, u.cfg.TransferLog, + u.cfg.LocalArchive, addr, lazyConnect, ) - // Connect to the universe RPC server. - dialOpts, err := serverDialOpts() - if err != nil { - return nil, err - } - - serverAddr := fmt.Sprintf("%s:%s", addr.Hostname(), addr.Port()) - conn, err := grpc.Dial(serverAddr, dialOpts...) - if err != nil { - return nil, err - } - - client := unirpc.NewUniverseClient(conn) - - return &UniverseRpcCourier{ - client: client, - backoffHandle: backoffHandler, - cfg: u.cfg, - subscribers: subscribers, - rawConn: conn, - }, nil - default: return nil, fmt.Errorf("unknown courier address protocol "+ "(consider updating tapd): %v", addr.Scheme) @@ -1169,17 +1144,24 @@ type UniverseRpcCourierCfg struct { // UniverseRpcCourier is a universe RPC proof courier service handle. It // implements the Courier interface. type UniverseRpcCourier struct { - // client is the RPC client that the courier will use to interact with - // the universe RPC server. - client unirpc.UniverseClient + // cfg is the courier configuration. + cfg *UniverseRpcCourierCfg - // cfg is the general courier configuration. - cfg *CourierCfg + // addr is the address of the courier service. + addr *url.URL + + // localArchive is the local archive that the courier will use to + // store and query for proofs. + localArchive Archiver // rawConn is the raw connection that the courier will use to interact // with the remote gRPC service. rawConn *grpc.ClientConn + // client is the RPC client that the courier will use to interact with + // the universe RPC server. + client unirpc.UniverseClient + // backoffHandle is a handle to the backoff procedure used in proof // delivery. backoffHandle *BackoffHandler @@ -1193,6 +1175,82 @@ type UniverseRpcCourier struct { subscriberMtx sync.Mutex } +// NewUniverseRpcCourier creates a new universe RPC proof courier service +// handle. +func NewUniverseRpcCourier(ctx context.Context, cfg *UniverseRpcCourierCfg, + transferLog TransferLog, localArchive Archiver, addr *url.URL, + lazyConnect bool) (*UniverseRpcCourier, error) { + + courier := UniverseRpcCourier{ + cfg: cfg, + addr: addr, + localArchive: localArchive, + backoffHandle: NewBackoffHandler(cfg.BackoffCfg, transferLog), + subscribers: make(map[uint64]*fn.EventReceiver[fn.Event]), + } + + // If we're not lazy connecting, then we'll attempt to connect to the + // courier service immediately. + if !lazyConnect { + err := courier.ensureConnect(ctx) + if err != nil { + return nil, fmt.Errorf("unable to connect to courier "+ + "service during courier handle "+ + "instantiation: %w", err) + } + } + + return &courier, nil +} + +// ensureConnect ensures that the courier handle is connected to the remote +// courier service. +// +// This method does nothing if a service connection is already established. +func (c *UniverseRpcCourier) ensureConnect(ctx context.Context) error { + // If we're already connected, we'll return early. + if c.rawConn != nil && c.client != nil { + // Check the gRPC connection state to determine if the + // connection is ready. + gRpcConnState := c.rawConn.GetState() + + connStatus, err := NewCourierConnStatusFromRpcStatus( + gRpcConnState, + ) + if err != nil { + return fmt.Errorf("universe RPC courier unable to "+ + "determine connection status: %w", err) + } + + if connStatus.IsPending() { + return nil + } + } + + // At this point, we know that the connection is not ready. We'll now + // attempt to establish a new connection to the courier service. + dialOpts, err := serverDialOpts() + if err != nil { + return err + } + + // Ensure that the addr field has been set correctly. + if c.addr == nil { + return fmt.Errorf("universe RPC courier address is not set") + } + + serverAddr := fmt.Sprintf("%s:%s", c.addr.Hostname(), c.addr.Port()) + conn, err := grpc.DialContext(ctx, serverAddr, dialOpts...) + if err != nil { + return err + } + + c.client = unirpc.NewUniverseClient(conn) + c.rawConn = conn + + return nil +} + // DeliverProof attempts to delivery a proof file to the receiver. func (c *UniverseRpcCourier) DeliverProof(ctx context.Context, recipient Recipient, annotatedProof *AnnotatedProof) error { @@ -1272,6 +1330,15 @@ func (c *UniverseRpcCourier) DeliverProof(ctx context.Context, // Setup delivery routine and start backoff procedure. deliverFunc := func() error { + // Connect to the courier service if a connection hasn't + // been established yet. + err := c.ensureConnect(ctx) + if err != nil { + return fmt.Errorf("unable to connect to "+ + "courier service during delivery "+ + "attempt: %w", err) + } + // Submit proof to courier. _, err = c.client.InsertProof(ctx, &unirpc.AssetProof{ Key: &universeKey, @@ -1327,6 +1394,15 @@ func (c *UniverseRpcCourier) ReceiveProof(ctx context.Context, // procedure. var proofBlob []byte receiveFunc := func() error { + // Connect to the courier service if a connection hasn't + // been established yet. + err := c.ensureConnect(ctx) + if err != nil { + return fmt.Errorf("unable to connect to "+ + "courier service during delivery "+ + "attempt: %w", err) + } + // Retrieve proof from courier. resp, err := c.client.QueryProof(ctx, &universeKey) if err != nil { @@ -1352,7 +1428,7 @@ func (c *UniverseRpcCourier) ReceiveProof(ctx context.Context, } proofFile, err := FetchProofProvenance( - ctx, c.cfg.LocalArchive, originLocator, fetchProof, + ctx, c.localArchive, originLocator, fetchProof, ) if err != nil { return nil, fmt.Errorf("error fetching proof provenance: %w", diff --git a/proof/courier_test.go b/proof/courier_test.go index 0c684721a..6108509c2 100644 --- a/proof/courier_test.go +++ b/proof/courier_test.go @@ -43,10 +43,9 @@ func TestUniverseRpcCourierLocalArchiveShortCut(t *testing.T) { recipient := Recipient{} courier := &UniverseRpcCourier{ - client: nil, - cfg: &CourierCfg{ - LocalArchive: localArchive, - }, + client: nil, + cfg: &UniverseRpcCourierCfg{}, + localArchive: localArchive, rawConn: nil, backoffHandle: nil, subscribers: nil,