diff --git a/proof/courier.go b/proof/courier.go index 6f0f6a749..ff81628c2 100644 --- a/proof/courier.go +++ b/proof/courier.go @@ -185,9 +185,6 @@ func NewCourierDispatch(cfg *CourierCfg) *URLDispatch { // address. func (u *URLDispatch) NewCourier(ctx context.Context, addr *url.URL, lazyConnect bool) (Courier, error) { - - subscribers := make(map[uint64]*fn.EventReceiver[fn.Event]) - // Create new courier addr based on URL scheme. switch addr.Scheme { case HashmailCourierType: @@ -197,33 +194,11 @@ func (u *URLDispatch) NewCourier(ctx context.Context, addr *url.URL, ) case UniverseRpcCourierType: - cfg := u.cfg.UniverseRpcCfg - backoffHandler := NewBackoffHandler( - cfg.BackoffCfg, u.cfg.TransferLog, + return NewUniverseRpcCourier( + ctx, u.cfg.UniverseRpcCfg, u.cfg.TransferLog, + u.cfg.LocalArchive, addr, lazyConnect, ) - // Connect to the universe RPC server. - dialOpts, err := serverDialOpts() - if err != nil { - return nil, err - } - - serverAddr := fmt.Sprintf("%s:%s", addr.Hostname(), addr.Port()) - conn, err := grpc.Dial(serverAddr, dialOpts...) - if err != nil { - return nil, err - } - - client := unirpc.NewUniverseClient(conn) - - return &UniverseRpcCourier{ - client: client, - backoffHandle: backoffHandler, - cfg: u.cfg, - subscribers: subscribers, - rawConn: conn, - }, nil - default: return nil, fmt.Errorf("unknown courier address protocol "+ "(consider updating tapd): %v", addr.Scheme) @@ -1169,17 +1144,24 @@ type UniverseRpcCourierCfg struct { // UniverseRpcCourier is a universe RPC proof courier service handle. It // implements the Courier interface. type UniverseRpcCourier struct { - // client is the RPC client that the courier will use to interact with - // the universe RPC server. - client unirpc.UniverseClient + // cfg is the courier configuration. + cfg *UniverseRpcCourierCfg - // cfg is the general courier configuration. - cfg *CourierCfg + // addr is the address of the courier service. + addr *url.URL + + // localArchive is the local archive that the courier will use to + // store and query for proofs. + localArchive Archiver // rawConn is the raw connection that the courier will use to interact // with the remote gRPC service. rawConn *grpc.ClientConn + // client is the RPC client that the courier will use to interact with + // the universe RPC server. + client unirpc.UniverseClient + // backoffHandle is a handle to the backoff procedure used in proof // delivery. backoffHandle *BackoffHandler @@ -1193,6 +1175,82 @@ type UniverseRpcCourier struct { subscriberMtx sync.Mutex } +// NewUniverseRpcCourier creates a new universe RPC proof courier service +// handle. +func NewUniverseRpcCourier(ctx context.Context, cfg *UniverseRpcCourierCfg, + transferLog TransferLog, localArchive Archiver, addr *url.URL, + lazyConnect bool) (*UniverseRpcCourier, error) { + + courier := UniverseRpcCourier{ + cfg: cfg, + addr: addr, + localArchive: localArchive, + backoffHandle: NewBackoffHandler(cfg.BackoffCfg, transferLog), + subscribers: make(map[uint64]*fn.EventReceiver[fn.Event]), + } + + // If we're not lazy connecting, then we'll attempt to connect to the + // courier service immediately. + if !lazyConnect { + err := courier.ensureConnect(ctx) + if err != nil { + return nil, fmt.Errorf("unable to connect to courier "+ + "service during courier handle "+ + "instantiation: %w", err) + } + } + + return &courier, nil +} + +// ensureConnect ensures that the courier handle is connected to the remote +// courier service. +// +// This method does nothing if a service connection is already established. +func (c *UniverseRpcCourier) ensureConnect(ctx context.Context) error { + // If we're already connected, we'll return early. + if c.rawConn != nil && c.client != nil { + // Check the gRPC connection state to determine if the + // connection is ready. + gRpcConnState := c.rawConn.GetState() + + connStatus, err := NewCourierConnStatusFromRpcStatus( + gRpcConnState, + ) + if err != nil { + return fmt.Errorf("universe RPC courier unable to "+ + "determine connection status: %w", err) + } + + if connStatus.IsPending() { + return nil + } + } + + // At this point, we know that the connection is not ready. We'll now + // attempt to establish a new connection to the courier service. + dialOpts, err := serverDialOpts() + if err != nil { + return err + } + + // Ensure that the addr field has been set correctly. + if c.addr == nil { + return fmt.Errorf("universe RPC courier address is not set") + } + + serverAddr := fmt.Sprintf("%s:%s", c.addr.Hostname(), c.addr.Port()) + conn, err := grpc.DialContext(ctx, serverAddr, dialOpts...) + if err != nil { + return err + } + + c.client = unirpc.NewUniverseClient(conn) + c.rawConn = conn + + return nil +} + // DeliverProof attempts to delivery a proof file to the receiver. func (c *UniverseRpcCourier) DeliverProof(ctx context.Context, recipient Recipient, annotatedProof *AnnotatedProof) error { @@ -1272,6 +1330,15 @@ func (c *UniverseRpcCourier) DeliverProof(ctx context.Context, // Setup delivery routine and start backoff procedure. deliverFunc := func() error { + // Connect to the courier service if a connection hasn't + // been established yet. + err := c.ensureConnect(ctx) + if err != nil { + return fmt.Errorf("unable to connect to "+ + "courier service during delivery "+ + "attempt: %w", err) + } + // Submit proof to courier. _, err = c.client.InsertProof(ctx, &unirpc.AssetProof{ Key: &universeKey, @@ -1327,6 +1394,15 @@ func (c *UniverseRpcCourier) ReceiveProof(ctx context.Context, // procedure. var proofBlob []byte receiveFunc := func() error { + // Connect to the courier service if a connection hasn't + // been established yet. + err := c.ensureConnect(ctx) + if err != nil { + return fmt.Errorf("unable to connect to "+ + "courier service during delivery "+ + "attempt: %w", err) + } + // Retrieve proof from courier. resp, err := c.client.QueryProof(ctx, &universeKey) if err != nil { @@ -1352,7 +1428,7 @@ func (c *UniverseRpcCourier) ReceiveProof(ctx context.Context, } proofFile, err := FetchProofProvenance( - ctx, c.cfg.LocalArchive, originLocator, fetchProof, + ctx, c.localArchive, originLocator, fetchProof, ) if err != nil { return nil, fmt.Errorf("error fetching proof provenance: %w", diff --git a/proof/courier_test.go b/proof/courier_test.go index 0c684721a..6108509c2 100644 --- a/proof/courier_test.go +++ b/proof/courier_test.go @@ -43,10 +43,9 @@ func TestUniverseRpcCourierLocalArchiveShortCut(t *testing.T) { recipient := Recipient{} courier := &UniverseRpcCourier{ - client: nil, - cfg: &CourierCfg{ - LocalArchive: localArchive, - }, + client: nil, + cfg: &UniverseRpcCourierCfg{}, + localArchive: localArchive, rawConn: nil, backoffHandle: nil, subscribers: nil,