Skip to content

Commit

Permalink
Merge pull request #1118 from lightninglabs/aux_signer_batching_fixes
Browse files Browse the repository at this point in the history
tapchannel: improve aux signer signal handling
  • Loading branch information
Roasbeef authored Sep 12, 2024
2 parents 566b409 + 4eb1aeb commit 2f0947d
Show file tree
Hide file tree
Showing 3 changed files with 194 additions and 63 deletions.
14 changes: 14 additions & 0 deletions internal/test/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/lightningnetwork/lnd/input"
"github.com/lightningnetwork/lnd/keychain"
"github.com/lightningnetwork/lnd/lnrpc/signrpc"
"github.com/lightningnetwork/lnd/lnwallet"
"github.com/stretchr/testify/require"
"golang.org/x/exp/constraints"
)
Expand Down Expand Up @@ -137,6 +138,19 @@ func RandPubKey(t testing.TB) *btcec.PublicKey {
return SchnorrPubKey(t, RandPrivKey(t))
}

func RandCommitmentKeyRing(t *testing.T) lnwallet.CommitmentKeyRing {
return lnwallet.CommitmentKeyRing{
CommitPoint: RandPubKey(t),
LocalCommitKeyTweak: RandBytes(32),
LocalHtlcKeyTweak: RandBytes(32),
LocalHtlcKey: RandPubKey(t),
RemoteHtlcKey: RandPubKey(t),
ToLocalKey: RandPubKey(t),
ToRemoteKey: RandPubKey(t),
RevocationKey: RandPubKey(t),
}
}

func RandBytes(num int) []byte {
randLock.Lock()
defer randLock.Unlock()
Expand Down
191 changes: 142 additions & 49 deletions tapchannel/auf_leaf_signer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"github.com/lightningnetwork/lnd/input"
"github.com/lightningnetwork/lnd/lnwallet"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/tlv"
"github.com/stretchr/testify/require"
)

Expand All @@ -40,10 +39,58 @@ var (
)

testTimeout = time.Second

chanState = &channeldb.OpenChannel{
ChanType: channeldb.AnchorOutputsBit |
channeldb.ScidAliasChanBit | channeldb.SingleFunderBit |
channeldb.SimpleTaprootFeatureBit |
channeldb.TapscriptRootBit,
IsInitiator: true,
}

// sig job batch size when making more that one sig job.
numSigJobs = int32(10)

// Threshold for trying to cancel or quit the aux leaf signer (allow
// the signer to complete a third of the batch).
sigJobCancelThreshold = numSigJobs / 3
)

// TestAuxLeafSigner tests the AuxLeafSigner implementation.
func TestAuxLeafSigner(t *testing.T) {
// RandAuxSigJob generates a basic aux signer job with random key material.
func RandAuxSigJob(t *testing.T, cancelChan chan struct{},
commitBlob lfn.Option[[]byte], outputIdx int32) lnwallet.AuxSigJob {

keyDesc, _ := test.RandKeyDesc(t)
keyRing := test.RandCommitmentKeyRing(t)

return lnwallet.AuxSigJob{
SignDesc: input.SignDescriptor{
KeyDesc: keyDesc,
},
BaseAuxJob: lnwallet.BaseAuxJob{
OutputIndex: outputIdx,
KeyRing: keyRing,
HTLC: lnwallet.PaymentDescriptor{
HtlcIndex: 0,
Amount: lnwire.NewMSatFromSatoshis(
354,
),
EntryType: lnwallet.Add,
},
Incoming: false,
CommitBlob: commitBlob,
HtlcLeaf: input.AuxTapLeaf{},
},
Resp: make(chan lnwallet.AuxSigJobResp, 1),
Cancel: cancelChan,
}
}

// setupAuxLeafSigner sets up an AuxLeafSigner instance and a batch of sig jobs
// to use in unit tests.
func setupAuxLeafSigner(t *testing.T, numJobs int32) (*AuxLeafSigner,
chan struct{}, *wire.MsgTx, []lnwallet.AuxSigJob) {

cfg := &LeafSignerConfig{
ChainParams: testChainParams,
Signer: &mockVirtualSigner{},
Expand All @@ -52,30 +99,8 @@ func TestAuxLeafSigner(t *testing.T) {
signer := NewAuxLeafSigner(cfg)
require.NoError(t, signer.Start())

defer func() {
require.NoError(t, signer.Stop())
}()

chanState := &channeldb.OpenChannel{
ChanType: channeldb.AnchorOutputsBit |
channeldb.ScidAliasChanBit | channeldb.SingleFunderBit |
channeldb.SimpleTaprootFeatureBit |
channeldb.TapscriptRootBit,
IsInitiator: true,
}
randInputProof := randProof(t)
commitTx := &randInputProof.AnchorTx
keyRing := lnwallet.CommitmentKeyRing{
CommitPoint: test.RandPubKey(t),
LocalCommitKeyTweak: test.RandBytes(32),
LocalHtlcKeyTweak: test.RandBytes(32),
LocalHtlcKey: test.RandPubKey(t),
RemoteHtlcKey: test.RandPubKey(t),
ToLocalKey: test.RandPubKey(t),
ToRemoteKey: test.RandPubKey(t),
RevocationKey: test.RandPubKey(t),
}

outgoingHtlcs := make(map[input.HtlcIndex][]*cmsg.AssetOutput)
outgoingHtlcs[0] = []*cmsg.AssetOutput{
cmsg.NewAssetOutput(
Expand All @@ -87,33 +112,28 @@ func TestAuxLeafSigner(t *testing.T) {
com := cmsg.NewCommitment(
nil, nil, outgoingHtlcs, nil, lnwallet.CommitAuxLeaves{},
)
cancelChan := make(chan struct{})

randKeyDesc, _ := test.RandKeyDesc(t)

jobs := []lnwallet.AuxSigJob{
{
SignDesc: input.SignDescriptor{
KeyDesc: randKeyDesc,
},
BaseAuxJob: lnwallet.BaseAuxJob{
OutputIndex: 0,
KeyRing: keyRing,
HTLC: lnwallet.PaymentDescriptor{
HtlcIndex: 0,
Amount: lnwire.NewMSatFromSatoshis(
354,
),
EntryType: lnwallet.Add,
},
Incoming: false,
CommitBlob: lfn.Some[tlv.Blob](com.Bytes()),
HtlcLeaf: input.AuxTapLeaf{},
},
Resp: make(chan lnwallet.AuxSigJobResp),
Cancel: make(chan struct{}),
},
// Constructing multiple jobs will allow us to assert that later jobs
// are cancelled successfully.
jobs := make([]lnwallet.AuxSigJob, 0, numJobs)
for idx := range numJobs {
newJob := RandAuxSigJob(
t, cancelChan, lfn.Some(com.Bytes()), idx,
)
jobs = append(jobs, newJob)
}

return signer, cancelChan, commitTx, jobs
}

// TestAuxLeafSigner tests the AuxLeafSigner implementation.
func TestAuxLeafSigner(t *testing.T) {
signer, _, commitTx, jobs := setupAuxLeafSigner(t, 1)
defer func() {
require.NoError(t, signer.Stop())
}()

err := signer.SubmitSecondLevelSigBatch(chanState, commitTx, jobs)
require.NoError(t, err)

Expand All @@ -131,6 +151,79 @@ func TestAuxLeafSigner(t *testing.T) {
}
}

// TestAuxLeafSignerCancel tests that the AuxLeafSigner will handle a cancel
// signal correctly, which involves skipping all remaining sig jobs.
func TestAuxLeafSignerCancel(t *testing.T) {
// Constructing multiple jobs will allow us to assert that later jobs
// are cancelled successfully.
signer, cancelChan, commitTx, jobs := setupAuxLeafSigner(t, numSigJobs)
defer func() {
require.NoError(t, signer.Stop())
}()

err := signer.SubmitSecondLevelSigBatch(chanState, commitTx, jobs)
require.NoError(t, err)

select {
case <-time.After(testTimeout):
t.Fatalf("timeout waiting for response")
case <-jobs[sigJobCancelThreshold].Resp:
// Send the cancel signal; jobs at the end of the batch should
// not be processed.
close(cancelChan)
}

signer.Wg.Wait()

// Once the aux signer finishes handling the batch, the last job of the
// batch should have an empty response channel. Otherwise, the signer
// failed to skip that job after the cancel channel was closed.
select {
case <-jobs[numSigJobs-1].Resp:
t.Fatalf("Job cancellation failed")
default:
}
}

// TestAuxLeafSignerCancelAndQuit tests that the AuxLeafSigner will handle a
// quit signal correctly, which involves ending sig job handling as soon as
// possible. This test also sends a cancel signal before the quit signal, to
// check that quits are handled correctly alongside other sent signals.
func TestAuxLeafSignerCancelAndQuit(t *testing.T) {
// Constructing multiple jobs will allow us to assert that later jobs
// are skipped successfully after sending the quit signal.
signer, cancelChan, commitTx, jobs := setupAuxLeafSigner(t, numSigJobs)
defer func() {
require.NoError(t, signer.Stop())
}()

err := signer.SubmitSecondLevelSigBatch(chanState, commitTx, jobs)
require.NoError(t, err)

select {
case <-time.After(testTimeout):
t.Fatalf("timeout waiting for response")
case <-jobs[sigJobCancelThreshold].Resp:
// Another component could have sent the cancel signal; we'll
// send that before the quit signal.
close(cancelChan)
time.Sleep(time.Millisecond)

// Send the quit signal; jobs at the end of the batch should not
// be processed.
require.NoError(t, signer.Stop())
}

// Once the aux signer stops, the last job of the batch should have an
// an empty response. Otherwise, the signer failed to stop as soon as
// the quit signal was sent.
select {
case <-jobs[numSigJobs-1].Resp:
t.Fatalf("Aux signer quitting failed")
default:
}
}

// mockVirtualSigner is a mock implementation of the VirtualSigner interface.
type mockVirtualSigner struct {
}
Expand Down
52 changes: 38 additions & 14 deletions tapchannel/aux_leaf_signer.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ import (
"github.com/lightningnetwork/lnd/tlv"
)

// shutdownErr is used in multiple spots when exiting the sig batch processor.
var shutdownErr = fmt.Errorf("tapd is shutting down")

// VirtualPacketSigner is an interface that can be used to sign virtual packets.
type VirtualPacketSigner interface {
// SignVirtualPacket signs the virtual transaction of the given packet
Expand Down Expand Up @@ -241,43 +244,49 @@ func (s *AuxLeafSigner) processAuxSigBatch(chanState *channeldb.OpenChannel,
defer s.Wg.Done()

log.Tracef("Processing %d aux sig jobs", len(sigJobs))

for idx := range sigJobs {
sigJob := sigJobs[idx]
cancelAndErr := func(err error) {
respondErr := func(err error) {
log.Errorf("Error processing aux sig job: %v", err)

close(sigJob.Cancel)
sigJob.Resp <- lnwallet.AuxSigJobResp{
Err: err,
}
}

// If we're shutting down, we cancel the job and return.
// Check for cancel or quit signals before beginning the job.
select {
case <-sigJob.Cancel:
continue
case <-s.Quit:
cancelAndErr(fmt.Errorf("tapd is shutting down"))
respondErr(shutdownErr)
return

default:
}

// If there is no commit blob, this isn't a custom channel. We
// still need to signal the job as done though, even if we don't
// have a signature to return.
if sigJob.CommitBlob.IsNone() {
sigJob.Resp <- lnwallet.AuxSigJobResp{
select {
case sigJob.Resp <- lnwallet.AuxSigJobResp{
HtlcIndex: sigJob.HTLC.HtlcIndex,
}:
continue
case <-sigJob.Cancel:
continue
case <-s.Quit:
respondErr(shutdownErr)
return
}
continue
}

com, err := cmsg.DecodeCommitment(
sigJob.CommitBlob.UnsafeFromSome(),
)
if err != nil {
cancelAndErr(fmt.Errorf("error decoding commitment: "+
"%w", err))
respondErr(fmt.Errorf("error decoding commitment: %w",
err))
return
}

Expand All @@ -299,26 +308,41 @@ func (s *AuxLeafSigner) processAuxSigBatch(chanState *channeldb.OpenChannel,
// If the HTLC doesn't have any asset outputs, it's not an
// asset HTLC, so we can skip it.
if len(htlcOutputs) == 0 {
sigJob.Resp <- lnwallet.AuxSigJobResp{
select {
case sigJob.Resp <- lnwallet.AuxSigJobResp{
HtlcIndex: sigJob.HTLC.HtlcIndex,
}:
continue
case <-sigJob.Cancel:
continue
case <-s.Quit:
respondErr(shutdownErr)
return
}
continue
}

resp, err := s.generateHtlcSignature(
chanState, commitTx, htlcOutputs, sigJob.SignDesc,
sigJob.BaseAuxJob,
)
if err != nil {
cancelAndErr(fmt.Errorf("error generating HTLC "+
respondErr(fmt.Errorf("error generating HTLC "+
"signature: %w", err))
return
}

// Success!
log.Tracef("Generated HTLC signature for HTLC with index %d",
sigJob.HTLC.HtlcIndex)
sigJob.Resp <- resp

select {
case sigJob.Resp <- resp:
case <-sigJob.Cancel:
continue
case <-s.Quit:
respondErr(shutdownErr)
return
}
}
}

Expand Down

0 comments on commit 2f0947d

Please sign in to comment.