Skip to content

Commit

Permalink
dkg: add minimal exchanger (#545)
Browse files Browse the repository at this point in the history
Adds minimal exchanger struct with methods to get []core.ParSignedDataSet.

category: feature
ticket: #522
  • Loading branch information
dB2510 authored May 18, 2022
1 parent 5d482b5 commit ca016ef
Show file tree
Hide file tree
Showing 2 changed files with 238 additions and 0 deletions.
114 changes: 114 additions & 0 deletions dkg/exchanger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
// Copyright © 2022 Obol Labs Inc.
//
// This program is free software: you can redistribute it and/or modify it
// under the terms of the GNU General Public License as published by the Free
// Software Foundation, either version 3 of the License, or (at your option)
// any later version.
//
// This program is distributed in the hope that it will be useful, but WITHOUT
// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
// FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
// more details.
//
// You should have received a copy of the GNU General Public License along with
// this program. If not, see <http://www.gnu.org/licenses/>.

package dkg

import (
"context"

"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/peer"

"github.com/obolnetwork/charon/core"
"github.com/obolnetwork/charon/core/parsigdb"
"github.com/obolnetwork/charon/core/parsigex"
)

// Note: Following duty types shouldn't be confused with the duty types in core workflow. This was
// to get support from parsigex and parsigdb core components. This may subject to change when DKG
// package has its own networking and database components.
// Values of following constants should not change as it can break backwards compatibility.
const (
// dutyLock is responsible for lock hash signed partial signatures exchange and aggregation.
dutyLock core.DutyType = 101
// dutyDepositData is responsible for deposit data signed partial signatures exchange and aggregation.
// TODO(dhruv): get rid of this nolint in next PR
//nolint:deadcode,varcheck
dutyDepositData core.DutyType = 102
)

// sigData includes the fields obtained from sigdb when threshold is reached.
type sigData struct {
duty core.DutyType
pubkey core.PubKey
psigs []core.ParSignedData
}

// exchanger is responsible for exchanging partial signatures between peers on libp2p.
type exchanger struct {
sigChan chan sigData
sigex *parsigex.ParSigEx
sigdb *parsigdb.MemDB
numVals int
}

func newExchanger(tcpNode host.Host, peerIdx int, peers []peer.ID, vals int) *exchanger {
ex := &exchanger{
// threshold is len(peers) to wait until we get all the partial sigs from all the peers per DV
sigdb: parsigdb.NewMemDB(len(peers)),
sigex: parsigex.NewParSigEx(tcpNode, peerIdx, peers),
sigChan: make(chan sigData),
numVals: vals,
}

// Wiring core workflow components
ex.sigdb.SubscribeInternal(ex.sigex.Broadcast)
ex.sigdb.SubscribeThreshold(ex.pushPsigs)
ex.sigex.Subscribe(ex.sigdb.StoreExternal)

return ex
}

// exchange exhanges partial signatures of lockhash/deposit-data among dkg participants and returns all the partial
// signatures of the group according to public key of each DV.
func (e *exchanger) exchange(ctx context.Context, duty core.DutyType, set core.ParSignedDataSet) (map[core.PubKey][]core.ParSignedData, error) {
// Start the process by storing current peer's ParSignedDataSet
err := e.sigdb.StoreInternal(ctx, core.Duty{Type: duty}, set)
if err != nil {
return nil, err
}

sets := make(map[core.PubKey][]core.ParSignedData)
for {
select {
case <-ctx.Done():
return nil, ctx.Err()
case peerSet := <-e.sigChan:
if duty != peerSet.duty {
// Do nothing if duty doesn't match
continue
}
sets[peerSet.pubkey] = peerSet.psigs
}

// We are done when we have ParSignedData of all the DVs from all each peer
if len(sets) == e.numVals {
break
}
}

return sets, nil
}

// pushPsigs is responsible for writing partial signature data to sigChan obtained from other peers.
func (e *exchanger) pushPsigs(_ context.Context, duty core.Duty, pk core.PubKey, psigs []core.ParSignedData) error {
e.sigChan <- sigData{
duty: duty.Type,
pubkey: pk,
psigs: psigs,
}

return nil
}
124 changes: 124 additions & 0 deletions dkg/exchanger_internal_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
// Copyright © 2022 Obol Labs Inc.
//
// This program is free software: you can redistribute it and/or modify it
// under the terms of the GNU General Public License as published by the Free
// Software Foundation, either version 3 of the License, or (at your option)
// any later version.
//
// This program is distributed in the hope that it will be useful, but WITHOUT
// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
// FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
// more details.
//
// You should have received a copy of the GNU General Public License along with
// this program. If not, see <http://www.gnu.org/licenses/>.

package dkg

import (
"context"
"reflect"
"sync"
"testing"

"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/peerstore"
"github.com/stretchr/testify/require"

"github.com/obolnetwork/charon/core"
"github.com/obolnetwork/charon/testutil"
)

// TODO(dhruv): add tests for negative scenarios (take inspiration from core/qbft/qbft_internal_test).
func TestExchanger(t *testing.T) {
ctx := context.Background()

const (
dvs = 3
nodes = 4
)

// Create pubkeys for each DV
pubkeys := make([]core.PubKey, dvs)
for i := 0; i < dvs; i++ {
pubkeys[i] = testutil.RandomCorePubKey(t)
}

// Expected data is what is desired at the end of exchange
expectedData := make(map[core.PubKey][]core.ParSignedData)
for i := 0; i < dvs; i++ {
set := make([]core.ParSignedData, nodes)
for j := 0; j < nodes; j++ {
set[j] = core.ParSignedData{
Signature: testutil.RandomCoreSignature(),
ShareIdx: j + 1,
}
}
expectedData[pubkeys[i]] = set
}

dataToBeSent := make(map[int]core.ParSignedDataSet)
for pk, psigs := range expectedData {
for _, psig := range psigs {
_, ok := dataToBeSent[psig.ShareIdx-1]
if !ok {
dataToBeSent[psig.ShareIdx-1] = make(core.ParSignedDataSet)
}
dataToBeSent[psig.ShareIdx-1][pk] = psig
}
}

var (
peers []peer.ID
hosts []host.Host
hostsInfo []peer.AddrInfo
exchangers []*exchanger
)

// Create hosts
for i := 0; i < nodes; i++ {
h := testutil.CreateHost(t, testutil.AvailableAddr(t))
info := peer.AddrInfo{
ID: h.ID(),
Addrs: h.Addrs(),
}
hostsInfo = append(hostsInfo, info)
peers = append(peers, h.ID())
hosts = append(hosts, h)
}

// Connect each host with its peers
for i := 0; i < nodes; i++ {
for j := 0; j < nodes; j++ {
if i == j {
continue
}
hosts[i].Peerstore().AddAddrs(hostsInfo[j].ID, hostsInfo[j].Addrs, peerstore.PermanentAddrTTL)
}
}

for i := 0; i < nodes; i++ {
ex := newExchanger(hosts[i], i, peers, dvs)
exchangers = append(exchangers, ex)
}

var actual []map[core.PubKey][]core.ParSignedData
var wg sync.WaitGroup
for i := 0; i < nodes; i++ {
wg.Add(1)
go func(node int) {
defer wg.Done()

data, err := exchangers[node].exchange(ctx, dutyLock, dataToBeSent[node])
require.NoError(t, err)

actual = append(actual, data)
}(i)
}
wg.Wait()

for i := 0; i < nodes; i++ {
reflect.DeepEqual(actual[i], expectedData)
}
}

0 comments on commit ca016ef

Please sign in to comment.