Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: concurrent map problem and set radius at startup #213

Merged
merged 1 commit into from
Oct 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion p2p/discover/portal_protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ func (p *PortalProtocol) setupUDPListening() error {
func(buf []byte, addr *net.UDPAddr) (int, error) {
p.Log.Info("will send to target data", "ip", addr.IP.To4().String(), "port", addr.Port, "bufLength", len(buf))

if n, ok := p.DiscV5.cachedAddrNode[addr.String()]; ok {
if n, ok := p.DiscV5.GetCachedNode(addr.String()); ok {
//_, err := p.DiscV5.TalkRequestToID(id, addr, string(portalwire.UTPNetwork), buf)
req := &v5wire.TalkRequest{Protocol: string(portalwire.Utp), Message: buf}
p.DiscV5.sendFromAnotherThreadWithNode(n, netip.AddrPortFrom(netutil.IPToAddr(addr.IP), uint16(addr.Port)), req)
Expand Down
25 changes: 19 additions & 6 deletions p2p/discover/v5_udp.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ type UDPv5 struct {
// static fields
conn UDPConn
tab *Table
cachedIds map[enode.ID]*enode.Node
nodeMu sync.Mutex
cachedAddrNode map[string]*enode.Node
netrestrict *netutil.Netlist
priv *ecdsa.PrivateKey
Expand Down Expand Up @@ -155,7 +155,6 @@ func newUDPv5(conn UDPConn, ln *enode.LocalNode, cfg Config) (*UDPv5, error) {
// static fields
conn: newMeteredConn(conn),
cachedAddrNode: make(map[string]*enode.Node),
cachedIds: make(map[enode.ID]*enode.Node),
localNode: ln,
db: ln.Database(),
netrestrict: cfg.NetRestrict,
Expand Down Expand Up @@ -729,8 +728,7 @@ func (t *UDPv5) send(toID enode.ID, toAddr netip.AddrPort, packet v5wire.Packet,
return nonce, err
}
if c != nil && c.Node != nil {
t.cachedIds[toID] = c.Node
t.cachedAddrNode[toAddr.String()] = c.Node
t.putCache(toAddr.String(), c.Node)
}

_, err = t.conn.WriteToUDPAddrPort(enc, toAddr)
Expand Down Expand Up @@ -793,8 +791,7 @@ func (t *UDPv5) handlePacket(rawpacket []byte, fromAddr netip.AddrPort) error {
if fromNode != nil {
// Handshake succeeded, add to table.
t.tab.addInboundNode(fromNode)
t.cachedIds[fromID] = fromNode
t.cachedAddrNode[fromAddr.String()] = fromNode
t.putCache(fromAddr.String(), fromNode)
}
if packet.Kind() != v5wire.WhoareyouPacket {
// WHOAREYOU logged separately to report errors.
Expand Down Expand Up @@ -999,3 +996,19 @@ func packNodes(reqid []byte, nodes []*enode.Node) []*v5wire.Nodes {
}
return resp
}

func (t *UDPv5) putCache(addr string, node *enode.Node) {
t.nodeMu.Lock()
defer t.nodeMu.Unlock()
if n, ok := t.cachedAddrNode[addr]; ok {
t.log.Debug("Update cached node", "old", n.ID(), "new", node.ID())
}
t.cachedAddrNode[addr] = node
}

func (t *UDPv5) GetCachedNode(addr string) (*enode.Node, bool) {
t.nodeMu.Lock()
defer t.nodeMu.Unlock()
n, ok := t.cachedAddrNode[addr]
return n, ok
}
35 changes: 34 additions & 1 deletion portalnetwork/history/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ const (
deleteSql = "DELETE FROM kvstore WHERE key = (?1);"
containSql = "SELECT 1 FROM kvstore WHERE key = (?1);"
getAllOrderedByDistanceSql = "SELECT key, length(value), xor(key, (?1)) as distance FROM kvstore ORDER BY distance DESC;"
getFarthestDistanceSql = "SELECT key, xor(key, (?1)) as distance FROM kvstore ORDER BY distance DESC Limit 1;"
deleteOutOfRadiusStmt = "DELETE FROM kvstore WHERE greater(xor(key, (?1)), (?2)) = 1"
XorFindFarthestQuery = `SELECT
xor(key, (?1)) as distance
Expand Down Expand Up @@ -117,8 +118,8 @@ func NewHistoryStorage(config storage.PortalStorageConfig) (storage.ContentStora
}

err = hs.initStmts()

// Check whether we already have data, and use it to set radius
hs.setRadiusToFarthestDistance()

// necessary to test NetworkName==history because state also initialize HistoryStorage
if strings.ToLower(config.NetworkName) == "history" {
Expand Down Expand Up @@ -376,6 +377,38 @@ func (p *ContentStorage) EstimateNewRadius(currentRadius *uint256.Int) (*uint256
return currentRadius, nil
}

func (p *ContentStorage) setRadiusToFarthestDistance() {
rows, err := p.sqliteDB.Query(getFarthestDistanceSql, p.nodeId[:])
if err != nil {
p.log.Error("failed to query farthest distance ", "err", err)
return
}
defer func(rows *sql.Rows) {
if rows != nil {
return
}
err = rows.Close()
if err != nil {
p.log.Error("failed to close rows", "err", err)
}
}(rows)

if rows.Next() {
var contentId []byte
var distance []byte
err = rows.Scan(&contentId, &distance)
if err != nil {
p.log.Error("failed to scan rows for farthest distance", "err", err)
}
dis := uint256.NewInt(0)
err = dis.UnmarshalSSZ(distance)
if err != nil {
p.log.Error("failed to unmarshal ssz for farthest distance", "err", err)
}
p.radius.Store(dis)
}
}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you should first check whether the capacity is full. if capacity is not full, there is no need to update radius

func (p *ContentStorage) deleteContentFraction(fraction float64) (deleteCount int, err error) {
if fraction <= 0 || fraction >= 1 {
return deleteCount, errors.New("fraction should be between 0 and 1")
Expand Down