Skip to content

Commit

Permalink
Merge pull request #141 from dusk-network/kadcast
Browse files Browse the repository at this point in the history
Kadcast functional implementation
  • Loading branch information
autholykos authored Dec 11, 2019
2 parents 64a5ebd + 5593b62 commit 3b47dba
Show file tree
Hide file tree
Showing 12 changed files with 1,122 additions and 0 deletions.
44 changes: 44 additions & 0 deletions cmd/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package main

import (
log "github.com/sirupsen/logrus"

"github.com/dusk-network/dusk-blockchain/pkg/p2p/kadcast"
"github.com/dusk-network/dusk-blockchain/pkg/util/container/ring"
)

func main() {
log.Infoln("Starting Kadcast Node!")
// Our node info.
var port uint16 = 25519
ip := [4]byte{62, 57, 180, 247}
router := kadcast.MakeRouter(ip, port)
log.Infoln("Router was created Successfully.")

// Create buffer.
queue := ring.NewBuffer(500)

// Launch PacketProcessor rutine.
go kadcast.ProcessPacket(queue, &router)

// Launch a listener for our node.
go kadcast.StartUDPListener("udp", queue, router.MyPeerInfo)

// Create BootstrapNodes Peer structs
var port1 uint16 = 25519
ip1 := [4]byte{157, 230, 219, 77}
boot1 := kadcast.MakePeer(ip1, port1)
var bootstrapNodes []kadcast.Peer
bootstrapNodes = append(bootstrapNodes[:], boot1)

// Start Bootstrapping process.
err := kadcast.InitBootstrap(&router, bootstrapNodes)
if err != nil {
log.Panic("Error during the Bootstrap Process. Job terminated.")
}

// Once the bootstrap succeeded, start the network discovery.
kadcast.StartNetworkDiscovery(&router)

select {}
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ require (
golang.org/x/net v0.0.0-20190926025831-c00fd9afed17 // indirect
golang.org/x/sys v0.0.0-20191010194322-b09406accb47 // indirect
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect
gotest.tools v2.2.0+incompatible
)

go 1.13
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8l
github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
github.com/google/go-cmp v0.2.0 h1:+dTQ8DZQJz0Mb/HjFlkptS1FeQ4cWSnN941F8aEG4SQ=
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
github.com/graphql-go/graphql v0.7.8 h1:769CR/2JNAhLG9+aa8pfLkKdR0H+r5lsQqling5WwpU=
Expand Down Expand Up @@ -124,6 +125,7 @@ github.com/prometheus/common v0.4.0/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y8
github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk=
github.com/prometheus/procfs v0.0.0-20190507164030-5867b95ac084/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA=
github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU=
github.com/robpike/ivy v0.0.0-20180326033303-3dd8a2d16657 h1:gKsXF1HCXqryC0W9Y2W4Nw4R/GAPj/SjnsXby0jZ62s=
github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/sirupsen/logrus v1.4.2 h1:SPIRibHv4MatM3XXNO2BJeFLZwZ2LvZgfQ5+UNI2im4=
Expand Down Expand Up @@ -231,4 +233,6 @@ gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.4 h1:/eiJrUcujPVeJ3xlSWaiNi3uSVmDGBK1pDHUHAnao1I=
gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gotest.tools v2.2.0+incompatible h1:VsBPFP1AI068pPrMxtb/S8Zkgf9xEmTLJjfM+P5UIEo=
gotest.tools v2.2.0+incompatible/go.mod h1:DsYFclhRJ6vuDpmuTbkuFWG+y2sxOXAzmJt81HFBacw=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
99 changes: 99 additions & 0 deletions pkg/p2p/kadcast/bucket.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package kadcast

// MaxBucketPeers represents the maximum
//number of peers that a `bucket` can hold.
var MaxBucketPeers uint8 = 25

// bucket stores peer info of the peers that are at a certain
// distance range to the peer itself.
type bucket struct {
idLength uint8
peerCount uint8
totalPeersPassed uint64
// Should always be less than `MaxBucketPeers`
entries []Peer
// This map keeps the order of arrivals for LRU
lru map[Peer]uint64
// This map allows us to quickly see if a Peer is
// included on a entries set without iterating over
// it.
lruPresent map[Peer]bool
}

// Allocates space for a `bucket` and returns a instance
// of it with the specified `idLength`.
func makeBucket(idlen uint8) bucket {
return bucket{
idLength: idlen,
totalPeersPassed: 0,
peerCount: 0,
entries: make([]Peer, 0, MaxBucketPeers),
lru: make(map[Peer]uint64),
lruPresent: make(map[Peer]bool),
}
}

// Finds the Least Recently Used Peer on the entries set
// of the `bucket` and returns it's index on the entries
// set and the `Peer` info that is hold on it.
func (b bucket) findLRUPeerIndex() (int, uint64) {
var val = b.totalPeersPassed
i := 0
for index, p := range b.entries {
if b.lru[p] <= val {
val = b.lru[p]
i = index
}
}
return i, val
}

// Remove a `Peer` from the entries set without
// caring about the order.
// It also maps the `Peer` to false on the LRU map.
// The resulting slice of entries is then returned.
func (b *bucket) removePeerAtIndex(index int) []Peer {
// Remove peer from the lruPresent map.
b.lruPresent[b.entries[index]] = false

b.entries[index] = b.entries[len(b.entries)-1]
// We do not need to put s[i] at the end, as it will be discarded anyway
return b.entries[:len(b.entries)-1]
}

// Adds a `Peer` to the `bucket` entries list.
// It also increments the peerCount all according
// the LRU policy.
func (b *bucket) addPeer(peer Peer) {
// Check if the entries set can hold more peers.
if len(b.entries) < int(MaxBucketPeers) {
// Insert it into the set if not present
// on the current entries set.
if b.lruPresent[peer] == false {
b.entries = append(b.entries, peer)
b.peerCount++
b.lruPresent[peer] = true
}
// Store recently used peer.
b.lru[peer] = b.totalPeersPassed
b.totalPeersPassed++
return
}
// If the entries set is full, we perform
// LRU and remove a peer to include the new one.
//
// Check if peer is not already present into the
// entries set
if b.lruPresent[peer] == false {
// Search for the least recently used peer.
var index, _ = b.findLRUPeerIndex()
// Remove it from the entries set and from
// the lruPresent map.
b.entries = b.removePeerAtIndex(index)
// Add the new peer to the entries set.
b.entries = append(b.entries, peer)
b.lruPresent[peer] = true
b.totalPeersPassed++
}
b.lru[peer] = b.totalPeersPassed
}
65 changes: 65 additions & 0 deletions pkg/p2p/kadcast/network.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package kadcast

import (
log "github.com/sirupsen/logrus"
"net"
"time"

"github.com/dusk-network/dusk-blockchain/pkg/util/container/ring"
)

// StartUDPListener listens infinitely for UDP packet arrivals and
// executes it's processing inside a gorutine by sending
// the packets to the circularQueue.
func StartUDPListener(netw string, queue *ring.Buffer, MyPeerInfo Peer, ) {

lAddr := getLocalUDPAddress()
// Set listening port.
lAddr.Port = int(MyPeerInfo.port)
PacketConnCreation:
// listen to incoming udp packets
pc, err := net.ListenUDP(netw, &lAddr)
if err != nil {
log.Panic(err)
}
// Set initial deadline.
pc.SetDeadline(time.Now().Add(time.Minute))

// Instanciate the buffer
buffer := make([]byte, 1024)
for {
// Read UDP packet.
byteNum, uAddr, err := pc.ReadFromUDP(buffer)

if err != nil {
log.WithError(err).Warn("Error on packet read")
pc.Close()
goto PacketConnCreation
}
// Set a new deadline for the connection.
pc.SetDeadline(time.Now().Add(5 * time.Minute))
// Serialize the packet.
encodedPack := encodeRedPacket(uint16(byteNum), *uAddr, buffer[0:byteNum])
// Send the packet to the Consumer putting it on the queue.
queue.Put(encodedPack)
}
}

// Gets the local address of the sender `Peer` and the UDPAddress of the
// reciever `Peer` and sends to it a UDP Packet with the payload inside.
func sendUDPPacket(netw string, addr net.UDPAddr, payload []byte) {
localAddr := getLocalUDPAddress()
conn, err := net.DialUDP(netw, &localAddr, &addr)
if err != nil {
log.WithError(err).Warn("Could not stablish a connection with the dest Peer.")
return
}
defer conn.Close()

// Simple write
_, err = conn.Write(payload)
if err != nil {
log.WithError(err).Warn("Error while writting to the filedescriptor.")
return
}
}
Loading

0 comments on commit 3b47dba

Please sign in to comment.