From b768be547b23bfff8ea93962ae64b19254029b6b Mon Sep 17 00:00:00 2001 From: ClaytonNorthey92 Date: Thu, 17 Oct 2024 15:51:13 -0400 Subject: [PATCH] allowing BFGs to connect to another BFG to receive keystone notifications allow a BFG to connect to another BFG. the client BFG will receive L2 Keystone notifications via the public websocket endpoint, it will then query for new keystones when it gets a notification. you must use BFG_BFG_URL + BFG_BTC_PRIVKEY to connect to another BFG. BFG will request the latest 3 keystones upon each notification then save them in the DB and broadcast the keystones to its connected clients add ON CONFLICT DO NOTHING to no-op with keystones we've already received updated readme --- README.md | 11 ++ cmd/bfgd/bfgd.go | 12 ++ database/bfgd/database_ext_test.go | 87 ++++++++- database/bfgd/postgres/postgres.go | 11 +- e2e/e2e_ext_test.go | 248 +++++++++++++++++++++++++- e2e/monitor/go.mod | 2 +- service/bfg/bfg.go | 274 +++++++++++++++++++++++++++-- service/bfg/bfg_test.go | 17 ++ 8 files changed, 626 insertions(+), 36 deletions(-) diff --git a/README.md b/README.md index fb213131..499a3669 100644 --- a/README.md +++ b/README.md @@ -140,6 +140,17 @@ go run ./integrationtest - A **PostgreSQL database**, bfgd expects the sql scripts in `./database/bfgd/scripts/` to be run to set up your schema. - A **connection to an Electrs node** on the proper Bitcoin network (testnet or mainnet). +### Running your own bfgd and PoP mining with it + +If you'd like to run your own `bfgd` and don't want to rely on Hemi Labs (or any third party) for _broadcasting transactions_, you may run `bfgd` and connect it to a _trusted_ `bfgd` run by a third party to _receive l2 keystones only_ (l2 keystones represent l2 state and are what are mined in PoP transactions). In this case, the third party `bfgd` will only send you l2 keystones, your `bfgd` can notify your local pop miner and this will broadcast them to your Electrs+bitcoind setup so you don't rely on Hemi Labs--or any third party--which may be congested. + +If you'd like to do so, use the following environment variables when running bfgd: + +* `BFG_BFG_URL`: the _trusted_ `bfgd`'s websocket url that you will connect to +* `BFG_BTC_PRIVKEY`: your btc private key. note that this can be an unfunded private key and you'll still receive l2 keystones to mine + +then you may connect your local `popmd` to your aforementioned local `bfgd` + ## ▶️ Running bssd ### 🏁 Prerequisites diff --git a/cmd/bfgd/bfgd.go b/cmd/bfgd/bfgd.go index cadeac73..afeb1ef7 100644 --- a/cmd/bfgd/bfgd.go +++ b/cmd/bfgd/bfgd.go @@ -123,6 +123,18 @@ var ( Help: "list of headers used to obtain the client IP address (requires trusted proxies)", Print: config.PrintAll, }, + "BFG_BFG_URL": config.Config{ + Value: &cfg.BFGURL, + DefaultValue: "", + Help: "public websocket address of another BFG you'd like to receive L2Keystones from", + Print: config.PrintAll, + }, + "BFG_BTC_PRIVKEY": config.Config{ + Value: &cfg.BTCPrivateKey, + DefaultValue: "", + Help: "your btc private key, this is only needed when connecting to another BFG", + Print: config.PrintSecret, + }, } ) diff --git a/database/bfgd/database_ext_test.go b/database/bfgd/database_ext_test.go index 51fc5865..6450c228 100644 --- a/database/bfgd/database_ext_test.go +++ b/database/bfgd/database_ext_test.go @@ -11,6 +11,7 @@ import ( "encoding/hex" "errors" "fmt" + "log" mathrand "math/rand/v2" "net/url" "os" @@ -475,6 +476,79 @@ func TestL2KeystoneInsertMultipleSuccess(t *testing.T) { } } +func TestL2KeystoneInsertIgnoreDuplicates(t *testing.T) { + ctx, cancel := defaultTestContext() + defer cancel() + + db, sdb, cleanup := createTestDB(ctx, t) + defer func() { + db.Close() + sdb.Close() + cleanup() + }() + + l2Keystone := bfgd.L2Keystone{ + Version: 1, + L1BlockNumber: 11, + L2BlockNumber: 22, + ParentEPHash: fillOutBytes("parentephash", 32), + PrevKeystoneEPHash: fillOutBytes("prevkeystoneephash", 32), + StateRoot: fillOutBytes("stateroot", 32), + EPHash: fillOutBytes("ephash", 32), + Hash: fillOutBytes("mockhash", 32), + } + + otherL2Keystone := bfgd.L2Keystone{ + Version: 1, + L1BlockNumber: 11, + L2BlockNumber: 22, + ParentEPHash: fillOutBytes("parentephash", 32), + PrevKeystoneEPHash: fillOutBytes("prevkeystoneephash", 32), + StateRoot: fillOutBytes("stateroot", 32), + EPHash: fillOutBytes("ephash", 32), + Hash: fillOutBytes("mockhashz", 32), + } + + err := db.L2KeystonesInsert(ctx, []bfgd.L2Keystone{l2Keystone, otherL2Keystone}) + if err != nil { + t.Fatal(err) + } + + err = db.L2KeystonesInsert(ctx, []bfgd.L2Keystone{l2Keystone, otherL2Keystone}) + if err != nil { + t.Fatal(err) + } + + saved, err := db.L2KeystoneByAbrevHash(ctx, [32]byte(l2Keystone.Hash)) + if err != nil { + t.Fatal(err) + } + + diff := deep.Equal(saved, &l2Keystone) + if len(diff) != 0 { + t.Fatalf("unexpected diff %s", diff) + } + + otherSaved, err := db.L2KeystoneByAbrevHash(ctx, [32]byte(otherL2Keystone.Hash)) + if err != nil { + t.Fatal(err) + } + + diff = deep.Equal(otherSaved, &otherL2Keystone) + if len(diff) != 0 { + t.Fatalf("unexpected diff %s", diff) + } + + count, err := l2KeystonesCount(ctx, sdb) + if err != nil { + t.Fatal(err) + } + + if count != 2 { + t.Fatalf("unexpected count %d", count) + } +} + func TestL2KeystoneInsertInvalidHashLength(t *testing.T) { ctx, cancel := defaultTestContext() defer cancel() @@ -915,7 +989,7 @@ func TestL2KeystoneInsertMultipleAtomicFailure(t *testing.T) { } } -func TestL2KeystoneInsertMultipleDuplicateError(t *testing.T) { +func TestL2KeystoneInsertDuplicateOK(t *testing.T) { ctx, cancel := defaultTestContext() defer cancel() @@ -949,9 +1023,18 @@ func TestL2KeystoneInsertMultipleDuplicateError(t *testing.T) { } err := db.L2KeystonesInsert(ctx, []bfgd.L2Keystone{l2Keystone, otherL2Keystone}) - if err == nil || errors.Is(err, database.DuplicateError("")) == false { + if err != nil { t.Fatalf("received unexpected error: %s", err) } + + l2Keystones, err := db.L2KeystonesMostRecentN(ctx, 5) + if err != nil { + t.Fatal(err) + } + + if diff := deep.Equal([]bfgd.L2Keystone{l2Keystone}, l2Keystones); len(diff) != 0 { + log.Fatalf("unexpected diff %v", diff) + } } func TestPopBasisInsertNilMerklePath(t *testing.T) { diff --git a/database/bfgd/postgres/postgres.go b/database/bfgd/postgres/postgres.go index 0ff86ac9..1ccdee89 100644 --- a/database/bfgd/postgres/postgres.go +++ b/database/bfgd/postgres/postgres.go @@ -133,10 +133,12 @@ func (p *pgdb) L2KeystonesInsert(ctx context.Context, l2ks []bfgd.L2Keystone) er ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8) + + ON CONFLICT DO NOTHING ` for _, v := range l2ks { - result, err := tx.ExecContext(ctx, qInsertL2Keystone, v.Hash, + _, err := tx.ExecContext(ctx, qInsertL2Keystone, v.Hash, v.L1BlockNumber, v.L2BlockNumber, v.ParentEPHash, v.PrevKeystoneEPHash, v.StateRoot, v.EPHash, v.Version) if err != nil { @@ -156,13 +158,6 @@ func (p *pgdb) L2KeystonesInsert(ctx context.Context, l2ks []bfgd.L2Keystone) er } return fmt.Errorf("insert l2 keystone: %w", err) } - rows, err := result.RowsAffected() - if err != nil { - return fmt.Errorf("insert l2 keystone rows affected: %w", err) - } - if rows < 1 { - return fmt.Errorf("insert l2 keystone rows: %v", rows) - } } err = tx.Commit() diff --git a/e2e/e2e_ext_test.go b/e2e/e2e_ext_test.go index 26ca63ee..e1abc3c4 100644 --- a/e2e/e2e_ext_test.go +++ b/e2e/e2e_ext_test.go @@ -38,6 +38,7 @@ import ( btcwire "github.com/btcsuite/btcd/wire" "github.com/coder/websocket" "github.com/coder/websocket/wsjson" + "github.com/decred/dcrd/dcrec/secp256k1/v4" dcrsecp256k1 "github.com/decred/dcrd/dcrec/secp256k1/v4" dcrecdsa "github.com/decred/dcrd/dcrec/secp256k1/v4/ecdsa" "github.com/ethereum/go-ethereum/common" @@ -283,11 +284,11 @@ func nextPort(ctx context.Context, t *testing.T) int { } } -func createBfgServerWithAuth(ctx context.Context, t *testing.T, pgUri string, electrsAddr string, btcStartHeight uint64, auth bool) (*bfg.Server, string, string, string) { +func createBfgServerWithAuth(ctx context.Context, t *testing.T, pgUri string, electrsAddr string, btcStartHeight uint64, auth bool, otherBfgUrl string) (*bfg.Server, string, string, string) { bfgPrivateListenAddress := fmt.Sprintf(":%d", nextPort(ctx, t)) bfgPublicListenAddress := fmt.Sprintf(":%d", nextPort(ctx, t)) - bfgServer, err := bfg.NewServer(&bfg.Config{ + cfg := &bfg.Config{ PrivateListenAddress: bfgPrivateListenAddress, PublicListenAddress: bfgPublicListenAddress, PgURI: pgUri, @@ -296,7 +297,19 @@ func createBfgServerWithAuth(ctx context.Context, t *testing.T, pgUri string, el PublicKeyAuth: auth, RequestLimit: bfgapi.DefaultRequestLimit, RequestTimeout: bfgapi.DefaultRequestTimeout, - }) + BFGURL: otherBfgUrl, + } + + if cfg.BFGURL != "" { + privKey, err := secp256k1.GeneratePrivateKey() + if err != nil { + t.Fatal(err) + } + + cfg.BTCPrivateKey = hex.EncodeToString(privKey.Serialize()) + } + + bfgServer, err := bfg.NewServer(cfg) if err != nil { t.Fatal(err) } @@ -323,7 +336,11 @@ func createBfgServerWithAuth(ctx context.Context, t *testing.T, pgUri string, el } func createBfgServer(ctx context.Context, t *testing.T, pgUri string, electrsAddr string, btcStartHeight uint64) (*bfg.Server, string, string, string) { - return createBfgServerWithAuth(ctx, t, pgUri, electrsAddr, btcStartHeight, false) + return createBfgServerWithAuth(ctx, t, pgUri, electrsAddr, btcStartHeight, false, "") +} + +func createBfgServerConnectedToAnother(ctx context.Context, t *testing.T, pgUri string, electrsAddr string, btcStartHeight uint64, otherBfgUrl string) (*bfg.Server, string, string, string) { + return createBfgServerWithAuth(ctx, t, pgUri, electrsAddr, btcStartHeight, false, otherBfgUrl) } func createBssServer(ctx context.Context, t *testing.T, bfgWsurl string) (*bss.Server, string, string) { @@ -3133,6 +3150,221 @@ func TestNotifyOnL2KeystonesBFGClients(t *testing.T) { wg.Wait() } +func TestNotifyOnL2KeystonesBFGClientsViaOtherBFG(t *testing.T) { + db, pgUri, sdb, cleanup := createTestDB(context.Background(), t) + defer func() { + db.Close() + sdb.Close() + cleanup() + }() + + otherDb, otherPgUri, otherSdb, otherCleanup := createTestDB(context.Background(), t) + defer func() { + otherDb.Close() + otherSdb.Close() + otherCleanup() + }() + + ctx, cancel := defaultTestContext() + defer cancel() + + _, _, _, bfgPublicWsUrl := createBfgServer(ctx, t, pgUri, "", 1) + _, _, _, otherBfgPublicWsUrl := createBfgServerWithAuth(ctx, t, otherPgUri, "", 1, false, bfgPublicWsUrl) + + c, _, err := websocket.Dial(ctx, otherBfgPublicWsUrl, nil) + if err != nil { + t.Fatal(err) + } + defer c.CloseNow() + + protocolConn := protocol.NewWSConn(c) + + if err := authClient.HandshakeClient(ctx, protocolConn); err != nil { + t.Fatal(err) + } + assertPing(ctx, t, c, bfgapi.CmdPingRequest) + + l2Keystone := bfgd.L2Keystone{ + Hash: fillOutBytes("somehashone", 32), + Version: 1, + L1BlockNumber: 11, + L2BlockNumber: 22, + ParentEPHash: fillOutBytes("parentephashone", 32), + PrevKeystoneEPHash: fillOutBytes("prevkeystoneephashone", 32), + StateRoot: fillOutBytes("staterootone", 32), + EPHash: fillOutBytes("ephashone", 32), + } + + // insert the l2 keystone into the first bfg server's postgres, + // this should send a notification to the "other" bfg which should + // broadcast the notification + + if err := db.L2KeystonesInsert(ctx, []bfgd.L2Keystone{ + l2Keystone, + }); err != nil { + t.Fatal(err) + } + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + for { + var v protocol.Message + if err = wsjson.Read(ctx, c, &v); err != nil { + panic(fmt.Sprintf("error reading from ws: %s", err)) + } + + if v.Header.Command == bfgapi.CmdL2KeystonesNotification { + return + } + } + }() + + wg.Wait() +} + +func TestOtherBFGSavesL2KeystonesOnNotifications(t *testing.T) { + db, pgUri, sdb, cleanup := createTestDB(context.Background(), t) + defer func() { + db.Close() + sdb.Close() + cleanup() + }() + + otherDb, otherPgUri, otherSdb, otherCleanup := createTestDB(context.Background(), t) + defer func() { + otherDb.Close() + otherSdb.Close() + otherCleanup() + }() + + ctx, cancel := defaultTestContext() + defer cancel() + + _, _, _, bfgPublicWsUrl := createBfgServer(ctx, t, pgUri, "", 1) + _, _, _, otherBfgPublicWsUrl := createBfgServerWithAuth(ctx, t, otherPgUri, "", 1, false, bfgPublicWsUrl) + + c, _, err := websocket.Dial(ctx, otherBfgPublicWsUrl, nil) + if err != nil { + t.Fatal(err) + } + defer c.CloseNow() + + protocolConn := protocol.NewWSConn(c) + + if err := authClient.HandshakeClient(ctx, protocolConn); err != nil { + t.Fatal(err) + } + assertPing(ctx, t, c, bfgapi.CmdPingRequest) + + l2Keystones := []bfgd.L2Keystone{ + { + Hash: fillOutBytes("somehash22", 32), + Version: 1, + L1BlockNumber: 11, + L2BlockNumber: 22, + ParentEPHash: fillOutBytes("parentephashone", 32), + PrevKeystoneEPHash: fillOutBytes("prevkeystoneephashone", 32), + StateRoot: fillOutBytes("staterootone", 32), + EPHash: fillOutBytes("ephashone", 32), + }, + { + Hash: fillOutBytes("somehash23", 32), + Version: 1, + L1BlockNumber: 11, + L2BlockNumber: 23, + ParentEPHash: fillOutBytes("parentephashone", 32), + PrevKeystoneEPHash: fillOutBytes("prevkeystoneephashone", 32), + StateRoot: fillOutBytes("staterootone", 32), + EPHash: fillOutBytes("ephashone", 32), + }, + { + Hash: fillOutBytes("somehash24", 32), + Version: 1, + L1BlockNumber: 11, + L2BlockNumber: 24, + ParentEPHash: fillOutBytes("parentephashone", 32), + PrevKeystoneEPHash: fillOutBytes("prevkeystoneephashone", 32), + StateRoot: fillOutBytes("staterootone", 32), + EPHash: fillOutBytes("ephashone", 32), + }, + } + + if err := db.L2KeystonesInsert(ctx, l2Keystones); err != nil { + t.Fatal(err) + } + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + for { + var v protocol.Message + if err = wsjson.Read(ctx, c, &v); err != nil { + panic(fmt.Sprintf("error reading from ws: %s", err)) + } + + if v.Header.Command == bfgapi.CmdL2KeystonesNotification { + return + } + } + }() + + wg.Wait() + + wg.Add(1) + go func() { + defer wg.Done() + for { + l2KeystonesRequest := bfgapi.L2KeystonesRequest{ + NumL2Keystones: 3, + } + + if err := bfgapi.Write(ctx, protocolConn, "someid", &l2KeystonesRequest); err != nil { + panic(err) + } + + for { + command, _, payload, err := bfgapi.Read(ctx, protocolConn) + if err != nil { + panic(err) + } + + if command != bfgapi.CmdL2KeystonesResponse { + continue + } + + l2KeystonesResponse := payload.(*bfgapi.L2KeystonesResponse) + + hemiL2Keystones := make([]hemi.L2Keystone, 0, len(l2Keystones)) + for _, v := range l2Keystones { + hemiL2Keystones = append(hemiL2Keystones, hemi.L2Keystone{ + Version: uint8(v.Version), + L1BlockNumber: v.L1BlockNumber, + L2BlockNumber: v.L2BlockNumber, + ParentEPHash: api.ByteSlice(v.ParentEPHash), + PrevKeystoneEPHash: api.ByteSlice(v.PrevKeystoneEPHash), + StateRoot: api.ByteSlice(v.StateRoot), + EPHash: api.ByteSlice(v.EPHash), + }) + } + + slices.Reverse(hemiL2Keystones) + + if diff := deep.Equal(l2KeystonesResponse.L2Keystones, hemiL2Keystones); len(diff) != 0 { + panic(fmt.Sprintf("l2keystones are not equal: %v", diff)) + } + + return + } + + } + }() + + wg.Wait() +} + // TestNotifyOnNewBtcBlockBSSClients tests that upon getting a new btc block, // in this case from (mock) electrs, that a new btc notification // will be sent to all clients connected to BSS @@ -3433,7 +3665,7 @@ func TestBFGAuthNoKey(t *testing.T) { t.Fatal(err) } - _, _, _, bfgPublicWsUrl := createBfgServerWithAuth(ctx, t, pgUri, "", 1, true) + _, _, _, bfgPublicWsUrl := createBfgServerWithAuth(ctx, t, pgUri, "", 1, true, "") c, _, err := websocket.Dial(ctx, bfgPublicWsUrl, nil) if err != nil { @@ -3465,7 +3697,7 @@ func TestBFGAuthPing(t *testing.T) { t.Fatal(err) } - _, _, _, bfgPublicWsUrl := createBfgServerWithAuth(ctx, t, pgUri, "", 1, true) + _, _, _, bfgPublicWsUrl := createBfgServerWithAuth(ctx, t, pgUri, "", 1, true, "") c, _, err := websocket.Dial(ctx, bfgPublicWsUrl, nil) if err != nil { @@ -3519,7 +3751,7 @@ func TestBFGAuthPingThenRemoval(t *testing.T) { t.Fatal(err) } - _, _, bfgWsPrivateUrl, bfgPublicWsUrl := createBfgServerWithAuth(ctx, t, pgUri, "", 1, true) + _, _, bfgWsPrivateUrl, bfgPublicWsUrl := createBfgServerWithAuth(ctx, t, pgUri, "", 1, true, "") c, _, err := websocket.Dial(ctx, bfgPublicWsUrl, nil) if err != nil { @@ -3576,7 +3808,7 @@ func TestBFGAuthWrongKey(t *testing.T) { t.Fatal(err) } - _, _, _, bfgPublicWsUrl := createBfgServerWithAuth(ctx, t, pgUri, "", 1, true) + _, _, _, bfgPublicWsUrl := createBfgServerWithAuth(ctx, t, pgUri, "", 1, true, "") c, _, err := websocket.Dial(ctx, bfgPublicWsUrl, nil) if err != nil { diff --git a/e2e/monitor/go.mod b/e2e/monitor/go.mod index eda59c89..3a151a52 100644 --- a/e2e/monitor/go.mod +++ b/e2e/monitor/go.mod @@ -1,6 +1,6 @@ module github.com/hemilabs/heminetwork/e2e/monitor -go 1.22 +go 1.23 toolchain go1.23.0 diff --git a/service/bfg/bfg.go b/service/bfg/bfg.go index 0977ce12..dde7d348 100644 --- a/service/bfg/bfg.go +++ b/service/bfg/bfg.go @@ -22,6 +22,7 @@ import ( "github.com/btcsuite/btcd/wire" "github.com/coder/websocket" "github.com/davecgh/go-spew/spew" + secp256k1 "github.com/decred/dcrd/dcrec/secp256k1/v4" "github.com/juju/loggo" "github.com/prometheus/client_golang/prometheus" @@ -38,6 +39,7 @@ import ( "github.com/hemilabs/heminetwork/hemi/pop" "github.com/hemilabs/heminetwork/service/deucalion" "github.com/hemilabs/heminetwork/service/pprof" + "github.com/hemilabs/heminetwork/version" ) // XXX this code needs to be a bit smarter when syncing bitcoin. We should @@ -57,6 +59,8 @@ const ( var log = loggo.GetLogger("bfg") +var ErrBTCPrivateKeyMissing error = errors.New("you must specify a BTC private key") + func init() { loggo.ConfigureLoggers(logLevel) } @@ -70,6 +74,7 @@ func NewDefaultConfig() *Config { PublicListenAddress: ":8383", RequestLimit: bfgapi.DefaultRequestLimit, RequestTimeout: bfgapi.DefaultRequestTimeout, + BFGURL: "", } } @@ -87,6 +92,12 @@ type btcClient interface { Close() error } +// Wrap for calling bfg commands +type bfgCmd struct { + msg any + ch chan any +} + type Config struct { BTCStartHeight uint64 EXBTCAddress string @@ -103,6 +114,8 @@ type Config struct { RequestTimeout int // in seconds RemoteIPHeaders []string TrustedProxies []string + BFGURL string + BTCPrivateKey string } type Server struct { @@ -144,6 +157,15 @@ type Server struct { l2keystonesCache []hemi.L2Keystone btcHeightCache uint64 + + bfgWG sync.WaitGroup // wait group for connecting to other bfgs + + holdoffTimeout time.Duration // Time in between connections attempt to BFG + bfgCallTimeout time.Duration + + bfgCmdCh chan bfgCmd // commands to send to bfg + + btcPrivateKey *secp256k1.PrivateKey } // metrics stores prometheus metrics. @@ -213,6 +235,13 @@ func NewServer(cfg *Config) (*Server, error) { return nil, fmt.Errorf("invalid request timeout (minimum %v): %v", minRequestTimeout, cfg.RequestTimeout) } + + if cfg.BTCPrivateKey == "" && cfg.BFGURL != "" { + return nil, errors.Join( + ErrBTCPrivateKeyMissing, + errors.New("btc private key required when connecting to another BFG"), + ) + } s := &Server{ cfg: cfg, requestLimiter: make(chan bool, cfg.RequestLimit), @@ -222,12 +251,24 @@ func NewServer(cfg *Config) (*Server, error) { metrics: newMetrics(), sessions: make(map[string]*bfgWs), checkForInvalidBlocks: make(chan struct{}), + holdoffTimeout: 6 * time.Second, + bfgCallTimeout: 20 * time.Second, + bfgCmdCh: make(chan bfgCmd), } for range cfg.RequestLimit { s.requestLimiter <- true } var err error + + if cfg.BTCPrivateKey != "" { + s.btcPrivateKey, err = bitcoin.PrivKeyFromHexString(cfg.BTCPrivateKey) + if err != nil { + return nil, err + } + + } + s.btcClient, err = electrs.NewClient(cfg.EXBTCAddress, &electrs.ClientOptions{ InitialConnections: cfg.EXBTCInitialConns, MaxConnections: cfg.EXBTCMaxConns, @@ -1402,7 +1443,7 @@ func (s *Server) handleBtcBlockNotification() error { return nil } -func (s *Server) handleL2KeystonesNotification() error { +func (s *Server) handleL2KeystonesNotification() { log.Tracef("handleL2KeystonesNotification") defer log.Tracef("handleL2KeystonesNotification exit") @@ -1414,8 +1455,6 @@ func (s *Server) handleL2KeystonesNotification() error { go writeNotificationResponse(bws, &bfgapi.L2KeystonesNotification{}) } s.mtx.Unlock() - - return nil } func hemiL2KeystoneToDb(l2ks hemi.L2Keystone) bfgd.L2Keystone { @@ -1439,25 +1478,36 @@ func hemiL2KeystonesToDb(l2ks []hemi.L2Keystone) []bfgd.L2Keystone { return dbks } +func (s *Server) refreshCacheAndNotifiyL2Keystones() { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + s.refreshL2KeystoneCache(ctx) + go s.handleL2KeystonesNotification() +} + +func (s *Server) saveL2Keystones(ctx context.Context, l2k []hemi.L2Keystone) { + ctx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + + ks := hemiL2KeystonesToDb(l2k) + + err := s.db.L2KeystonesInsert(ctx, ks) + if err != nil { + log.Errorf("error saving keystone %v", err) + return + } + + go s.refreshCacheAndNotifiyL2Keystones() +} + func (s *Server) handleNewL2Keystones(ctx context.Context, nlkr *bfgapi.NewL2KeystonesRequest) (any, error) { log.Tracef("handleNewL2Keystones") defer log.Tracef("handleNewL2Keystones exit") response := bfgapi.NewL2KeystonesResponse{} - go func() { - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - - ks := hemiL2KeystonesToDb(nlkr.L2Keystones) - err := s.db.L2KeystonesInsert(ctx, ks) - if err != nil { - log.Errorf("error saving keystone %v", err) - return - } - - s.refreshL2KeystoneCache(ctx) - }() + go s.saveL2Keystones(context.Background(), nlkr.L2Keystones) return response, nil } @@ -1551,7 +1601,192 @@ func (s *Server) handleAccessPublicKeys(table string, action string, payload, pa } func (s *Server) handleL2KeystonesChange(table string, action string, payload, payloadOld any) { - go s.handleL2KeystonesNotification() + go s.refreshCacheAndNotifiyL2Keystones() +} + +func (s *Server) fetchRemoteL2Keystones(pctx context.Context) { + ctx, cancel := context.WithTimeout(pctx, 10*time.Second) + defer cancel() + + resp, err := s.callBFG(ctx, &bfgapi.L2KeystonesRequest{ + NumL2Keystones: 3, + }) + if err != nil { + log.Errorf("callBFG error: %v", err) + return + } + + l2ksr := resp.(*bfgapi.L2KeystonesResponse) + s.saveL2Keystones(ctx, l2ksr.L2Keystones) +} + +func (s *Server) handleBFGWebsocketReadUnauth(ctx context.Context, conn *protocol.Conn) { + defer s.bfgWG.Done() + + log.Tracef("handleBFGWebsocketReadUnauth") + defer log.Tracef("handleBFGWebsocketReadUnauth exit") + for { + log.Tracef("handleBFGWebsocketReadUnauth %v", "ReadConn") + cmd, _, _, err := bfgapi.ReadConn(ctx, conn) + if err != nil { + // See if we were terminated + select { + case <-ctx.Done(): + return + case <-time.After(s.holdoffTimeout): + } + continue + } + log.Tracef("handleBFGWebsocketReadUnauth %v", cmd) + + switch cmd { + case bfgapi.CmdL2KeystonesNotification: + go s.fetchRemoteL2Keystones(ctx) + default: + log.Errorf("unknown command: %v", cmd) + return + } + } +} + +func (s *Server) callBFG(parrentCtx context.Context, msg any) (any, error) { + log.Tracef("callBFG %T", msg) + defer log.Tracef("callBFG exit %T", msg) + + bc := bfgCmd{ + msg: msg, + ch: make(chan any), + } + + ctx, cancel := context.WithTimeout(parrentCtx, s.bfgCallTimeout) + defer cancel() + + // attempt to send + select { + case <-ctx.Done(): + return nil, protocol.NewInternalErrorf("callBFG send context error: %w", + ctx.Err()) + case s.bfgCmdCh <- bc: + default: + return nil, protocol.NewInternalErrorf("bfg command queue full") + } + + // Wait for response + select { + case <-ctx.Done(): + return nil, protocol.NewInternalErrorf("callBFG received context error: %w", + ctx.Err()) + case payload := <-bc.ch: + if err, ok := payload.(error); ok { + return nil, err // XXX is this an error or internal error + } + return payload, nil + } + + // Won't get here +} + +func (s *Server) handleBFGCallCompletion(parrentCtx context.Context, conn *protocol.Conn, bc bfgCmd) { + log.Tracef("handleBFGCallCompletion") + defer log.Tracef("handleBFGCallCompletion exit") + + ctx, cancel := context.WithTimeout(parrentCtx, s.bfgCallTimeout) + defer cancel() + + log.Tracef("handleBFGCallCompletion: %v", spew.Sdump(bc.msg)) + + _, _, payload, err := bfgapi.Call(ctx, conn, bc.msg) + if err != nil { + log.Errorf("handleBFGCallCompletion %T: %v", bc.msg, err) + select { + case bc.ch <- err: + default: + } + } + select { + case bc.ch <- payload: + log.Tracef("handleBFGCallCompletion returned: %v", spew.Sdump(payload)) + default: + } +} + +func (s *Server) handleBFGWebsocketCallUnauth(ctx context.Context, conn *protocol.Conn) { + defer s.bfgWG.Done() + + log.Tracef("handleBFGWebsocketCallUnauth") + defer log.Tracef("handleBFGWebsocketCallUnauth exit") + for { + select { + case <-ctx.Done(): + return + case bc := <-s.bfgCmdCh: + go s.handleBFGCallCompletion(ctx, conn, bc) + } + } +} + +func (s *Server) connectBFG(pctx context.Context) error { + log.Tracef("connectBFG") + defer log.Tracef("connectBFG exit") + + headers := http.Header{} + headers.Add("User-Agent", version.UserAgent()) + + authenticator, err := auth.NewSecp256k1AuthClient(s.btcPrivateKey) + if err != nil { + return err + } + + conn, err := protocol.NewConn(s.cfg.BFGURL, &protocol.ConnOptions{ + Authenticator: authenticator, + Headers: headers, + }) + if err != nil { + return err + } + + ctx, cancel := context.WithCancel(pctx) + defer cancel() + + err = conn.Connect(ctx) + if err != nil { + return err + } + + s.bfgWG.Add(1) + go s.handleBFGWebsocketCallUnauth(ctx, conn) + + s.bfgWG.Add(1) + go s.handleBFGWebsocketReadUnauth(ctx, conn) + + // Wait for exit + s.bfgWG.Wait() + + return nil +} + +func (s *Server) bfg(ctx context.Context) { + defer s.wg.Done() + + log.Tracef("bfg") + defer log.Tracef("bfg exit") + + for { + if err := s.connectBFG(ctx); err != nil { + // Do nothing + log.Tracef("connectBFG: %v", err) + } else { + log.Infof("Connected to BFG: %s", s.cfg.BFGURL) + } + // See if we were terminated + select { + case <-ctx.Done(): + return + case <-time.After(s.holdoffTimeout): + } + + log.Debugf("Reconnecting to: %v", s.cfg.BFGURL) + } } func (s *Server) Run(pctx context.Context) error { @@ -1785,6 +2020,11 @@ func (s *Server) Run(pctx context.Context) error { log.Errorf("bitcoin client clean shutdown") }() + if s.cfg.BFGURL != "" { + s.wg.Add(1) + go s.bfg(ctx) + } + s.wg.Add(1) go s.trackBitcoin(ctx) go s.invalidBlockChecker(ctx) diff --git a/service/bfg/bfg_test.go b/service/bfg/bfg_test.go index ee3dbd92..cc135176 100644 --- a/service/bfg/bfg_test.go +++ b/service/bfg/bfg_test.go @@ -6,6 +6,7 @@ package bfg import ( "bytes" + "errors" "fmt" "net" "net/http" @@ -703,3 +704,19 @@ func TestSingleCIDR(t *testing.T) { }) } } + +func TestErrorIfNotPrivKeyConnectingToBFG(t *testing.T) { + _, err := NewServer(&Config{ + RequestLimit: 1, + RequestTimeout: 4, + BFGURL: "something", + }) + + if err == nil { + t.Fatalf("expecting error") + } + + if !errors.Is(err, ErrBTCPrivateKeyMissing) { + t.Fatalf("unexpected error: %v", err) + } +}