Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle PeerExchangeManager on the right thread #696

Merged
merged 1 commit into from
Sep 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions src/MonoTorrent.Client/MonoTorrent.Client.Modes/Mode.cs
Original file line number Diff line number Diff line change
Expand Up @@ -568,13 +568,21 @@ public virtual void HandlePeerConnected (PeerId id)
AppendFastPieces (id, bundle);

id.MessageQueue.Enqueue (bundle, releaser);

foreach (var peer in Manager.Peers.ConnectedPeers)
if (peer != id && peer.PeerExchangeManager != null)
peer.PeerExchangeManager.OnAdd (id);
} else {
ConnectionManager.CleanupSocket (Manager, id);
}
}

public virtual void HandlePeerDisconnected (PeerId id)
{
foreach (var peer in Manager.Peers.ConnectedPeers)
if (peer != id && peer.PeerExchangeManager != null)
peer.PeerExchangeManager.OnDrop (id);

Manager.RaisePeerDisconnected (id);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,6 @@ namespace MonoTorrent.Client
{
interface IPeerExchangeSource
{
event EventHandler<PeerConnectedEventArgs> PeerConnected;
event EventHandler<PeerDisconnectedEventArgs> PeerDisconnected;

TorrentSettings Settings { get; }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ namespace MonoTorrent.Client
/// <summary>
/// This class is used to send each minute a peer excahnge message to peer who have enable this protocol
/// </summary>
class PeerExchangeManager : IDisposable
class PeerExchangeManager
{
#region Member Variables

Expand All @@ -49,7 +49,6 @@ class PeerExchangeManager : IDisposable

readonly Queue<PeerId> added6Peers;
readonly Queue<PeerId> dropped6Peers;
bool disposed;

// Peers are about 7 bytes each (if you include the 'dotf' data)
// Calculate the max peers we can fit in the buffer.
Expand All @@ -65,32 +64,33 @@ class PeerExchangeManager : IDisposable

internal PeerExchangeManager (IPeerExchangeSource manager, PeerId id)
{
Manager = manager;
PeerId = id;
Manager = manager ?? throw new ArgumentNullException(nameof(manager));
PeerId = id ?? throw new ArgumentNullException(nameof(id));

addedPeers = new Queue<PeerId> ();
droppedPeers = new Queue<PeerId> ();

added6Peers = new Queue<PeerId> ();
dropped6Peers = new Queue<PeerId> ();
manager.PeerConnected += OnAdd;
}

internal void OnAdd (object? source, PeerConnectedEventArgs args)
internal void OnAdd (PeerId peer)
{
ClientEngine.MainLoop.CheckThread ();
// IPv4 peers will share with IPv4 peers, IPv6 share with
if (args.Peer.Uri.Scheme == "ipv4")
addedPeers.Enqueue (args.Peer);
else if (args.Peer.Uri.Scheme == "ipv6")
added6Peers.Enqueue (args.Peer);
if (peer.Peer.Info.ConnectionUri.Scheme == "ipv4")
addedPeers.Enqueue (peer);
else if (peer.Peer.Info.ConnectionUri.Scheme == "ipv6")
added6Peers.Enqueue (peer);
}

internal void OnDrop (object? source, PeerDisconnectedEventArgs args)
internal void OnDrop (PeerId peer)
{
if (args.Peer.Uri.Scheme == "ipv4")
droppedPeers.Enqueue (args.Peer);
else if (args.Peer.Uri.Scheme == "ipv6")
dropped6Peers.Enqueue (args.Peer);
ClientEngine.MainLoop.CheckThread ();
if (peer.Peer.Info.ConnectionUri.Scheme == "ipv4")
droppedPeers.Enqueue (peer);
else if (peer.Peer.Info.ConnectionUri.Scheme == "ipv6")
dropped6Peers.Enqueue (peer);
}
#endregion

Expand Down Expand Up @@ -164,15 +164,6 @@ internal void OnTick ()
return (added, addedDotF, dropped, memoryReleaser);
}

public void Dispose ()
{
if (disposed)
return;

disposed = true;
Manager.PeerConnected -= OnAdd;
}

#endregion
}
}
1 change: 0 additions & 1 deletion src/MonoTorrent.Client/MonoTorrent.Client/PeerId.cs
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,6 @@ internal void Dispose ()
Disposed = true;
Connection.SafeDispose ();
MessageQueue.Dispose ();
PeerExchangeManager?.Dispose ();
}

public override string ToString ()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,27 +15,25 @@ public class PeerExchangeManagerTests
class PeerExchangeSource : IPeerExchangeSource
{
public TorrentSettings Settings { get; } = new TorrentSettings ();

#pragma warning disable CS0067
public event EventHandler<PeerConnectedEventArgs> PeerConnected;
public event EventHandler<PeerDisconnectedEventArgs> PeerDisconnected;
#pragma warning restore CS0067
}

byte counter = 0;
PeerId CreatePeer () => PeerId.CreateNull (10, new InfoHash (Enumerable.Repeat<byte> (counter++, 20).ToArray ()));

[Test]
public void TestPeerExchangeManager ()
public async Task TestPeerExchangeManager ()
{
var peer = CreatePeer ();
var pex = new PeerExchangeManager (new PeerExchangeSource (), peer);
pex.OnAdd (null, new PeerConnectedEventArgs (null, CreatePeer ()));
pex.OnAdd (null, new PeerConnectedEventArgs (null, CreatePeer ()));
pex.OnAdd (null, new PeerConnectedEventArgs (null, CreatePeer ()));
pex.OnAdd (null, new PeerConnectedEventArgs (null, CreatePeer ()));
pex.OnDrop (null, new PeerDisconnectedEventArgs (null, CreatePeer ()));
pex.OnDrop (null, new PeerDisconnectedEventArgs (null, CreatePeer ()));

await ClientEngine.MainLoop;

pex.OnAdd (CreatePeer ());
pex.OnAdd (CreatePeer ());
pex.OnAdd (CreatePeer ());
pex.OnAdd (CreatePeer ());
pex.OnDrop (CreatePeer ());
pex.OnDrop (CreatePeer ());

pex.OnTick ();

Expand Down
Loading