Skip to content
This repository has been archived by the owner on Oct 20, 2023. It is now read-only.

Commit

Permalink
Revert "Revert "Streamlined block template updating""
Browse files Browse the repository at this point in the history
This reverts commit ae123a1.
  • Loading branch information
Oliver Weichhold committed Mar 22, 2018
1 parent c24b393 commit d2bd667
Show file tree
Hide file tree
Showing 3 changed files with 110 additions and 104 deletions.
111 changes: 55 additions & 56 deletions src/MiningCore/Blockchain/Bitcoin/BitcoinJobManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
using System.Globalization;
using System.Linq;
using System.Net;
using System.Reactive;
using System.Reactive.Linq;
using System.Text;
using System.Threading.Tasks;
Expand Down Expand Up @@ -105,10 +106,14 @@ protected virtual void SetupJobUpdates()
return;

jobRebroadcastTimeout = TimeSpan.FromSeconds(Math.Max(1, poolConfig.JobRebroadcastTimeout));

var sources = new List<IObservable<bool>>();
var cancelTimeout = new List<IObservable<bool>>();

var blockSubmission = blockSubmissionSubject.Synchronize();
var pollTimerRestart = blockSubmissionSubject.Synchronize();

var triggers = new List<IObservable<(bool Force, string Via)>>
{
blockSubmission.Select(x=> (false, "Block-submission"))
};

// collect ports
var zmq = poolConfig.Daemons
.Where(x => !string.IsNullOrEmpty(x.Extra.SafeExtensionDataAs<BitcoinDaemonEndpointConfigExtra>()?.ZmqBlockNotifySocket))
Expand All @@ -123,70 +128,62 @@ protected virtual void SetupJobUpdates()

if (zmq.Count > 0)
{
logger.Info(() => $"[{LogCat}] Subscribing to ZMQ push-updates from {string.Join(", ", zmq.Values)}");

var newJobsPubSub = daemon.ZmqSubscribe(zmq, 2)
.Select(frames =>
{
// We just take the second frame's raw data and turn it into a hex string.
// If that string changes, we got an update (DistinctUntilChanged)
var result = frames[1].ToHexString();
frames.Dispose();
return result;
})
.DistinctUntilChanged()
.Select(_ => Observable.FromAsync(() => UpdateJob(false, "ZMQ pub/sub")))
.Concat()
.Publish()
.RefCount();

sources.Add(newJobsPubSub);
cancelTimeout.Add(newJobsPubSub);
logger.Info(() => $"[{LogCat}] Subscribing to ZMQ push-updates from {string.Join(", ", zmq.Values)}");

var blockNotify = daemon.ZmqSubscribe(zmq, 2)
.Select(frames =>
{
// We just take the second frame's raw data and turn it into a hex string.
// If that string changes, we got an update (DistinctUntilChanged)
var result = frames[1].ToHexString();
frames.Dispose();
return result;
})
.DistinctUntilChanged()
.Select(_ => (false, "ZMQ pub/sub"))
.Publish()
.RefCount();

pollTimerRestart = Observable.Merge(
blockSubmission,
blockNotify.Select(_ => Unit.Default))
.Publish()
.RefCount();

triggers.Add(blockNotify);
}

if (poolConfig.BlockRefreshInterval > 0)
{
// periodically update block-template from daemon
var newJobsPolled = Observable.Interval(TimeSpan.FromMilliseconds(poolConfig.BlockRefreshInterval))
.Select(_ => Observable.FromAsync(() => UpdateJob(false, "RPC polling")))
.Concat()
.Where(isNew => isNew)
.Publish()
.RefCount();

sources.Add(newJobsPolled);
cancelTimeout.Add(newJobsPolled);
// periodically update block-template
triggers.Add(Observable.Timer(TimeSpan.FromMilliseconds(poolConfig.BlockRefreshInterval))
.TakeUntil(pollTimerRestart)
.Select(_ => (false, "RPC polling"))
.Repeat());
}

else
{
// poll for the first successful update after which polling is suspended forever
var newJobsPolled = Observable.Interval(TimeSpan.FromMilliseconds(poolConfig.BlockRefreshInterval))
.Select(_ => Observable.FromAsync(() => UpdateJob(false, "RPC polling")))
.Concat()
.Where(isNew => isNew)
.Take(1)
.Publish()
.RefCount();

sources.Add(newJobsPolled);
cancelTimeout.Add(newJobsPolled);
// get initial blocktemplate
triggers.Add(Observable.Interval(TimeSpan.FromMilliseconds(1000))
.Select(_ => (false, "Initial template"))
.TakeWhile(_ => !hasInitialBlockTemplate));
}

// if there haven't been any new jobs for a while, force an update
var cancelRebroadcast = cancelTimeout.Count > 0 ?
cancelTimeout.Count > 1 ? Observable.Merge(cancelTimeout) : cancelTimeout.First() :
Observable.Never<bool>();

sources.Add(Observable.Timer(jobRebroadcastTimeout)
.TakeUntil(cancelRebroadcast) // cancel timeout if an actual new job has been detected
.Do(_ => logger.Debug(() => $"[{LogCat}] No new blocks for {jobRebroadcastTimeout.TotalSeconds} seconds - updating transactions & rebroadcasting work"))
.Select(x => Observable.FromAsync(() => UpdateJob(true, "Job-Refresh")))
.Concat()
// periodically update transactions for current template
triggers.Add(Observable.Timer(jobRebroadcastTimeout)
.TakeUntil(pollTimerRestart)
.Select(_ => (true, "Job-Refresh"))
.Repeat());

Jobs = Observable.Merge(sources)
.Select(GetJobParamsForStratum);
Jobs = Observable.Merge(triggers)
.Select(x => Observable.FromAsync(() => UpdateJob(x.Force, x.Via)))
.Concat()
.Where(isNew => isNew)
.Do(_ => hasInitialBlockTemplate = true)
.Select(GetJobParamsForStratum)
.Publish()
.RefCount();
}

protected virtual async Task<DaemonResponse<TBlockTemplate>> GetBlockTemplateAsync()
Expand Down Expand Up @@ -498,6 +495,8 @@ public virtual async Task<Share> SubmitShareAsync(StratumClient worker, object s
if (share.IsBlockCandidate)
{
logger.Info(() => $"[{LogCat}] Daemon accepted block {share.BlockHeight} [{share.BlockHash}] submitted by {minerName}");

blockSubmissionSubject.OnNext(Unit.Default);

// persist the coinbase transaction-hash to allow the payment processor
// to verify later on that the pool has received the reward for the block
Expand Down
5 changes: 5 additions & 0 deletions src/MiningCore/Blockchain/JobManagerBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

using System;
using System.Globalization;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;
using System.Threading.Tasks;
using Autofac;
Expand Down Expand Up @@ -48,6 +51,8 @@ protected JobManagerBase(IComponentContext ctx)
protected object jobLock = new object();
protected ILogger logger;
protected PoolConfig poolConfig;
protected bool hasInitialBlockTemplate = false;
protected Subject<Unit> blockSubmissionSubject = new Subject<Unit>();

protected virtual string LogCat { get; } = "Job Manager";

Expand Down
98 changes: 50 additions & 48 deletions src/MiningCore/Blockchain/Monero/MoneroJobManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,8 @@ public async Task<Share> SubmitShareAsync(StratumClient worker,
{
logger.Info(() => $"[{LogCat}] Daemon accepted block {share.BlockHeight} [{blobHash.Substring(0, 6)}] submitted by {context.MinerName}");

blockSubmissionSubject.OnNext(Unit.Default);

share.TransactionConfirmationData = blobHash;
}

Expand Down Expand Up @@ -507,11 +509,16 @@ private void ConfigureRewards()
protected virtual void SetupJobUpdates()
{
if (poolConfig.EnableInternalStratum == false)
return;

var sources = new List<IObservable<bool>>();
var cancelTimeout = new List<IObservable<bool>>();
return;

var blockSubmission = blockSubmissionSubject.Synchronize();
var pollTimerRestart = blockSubmissionSubject.Synchronize();

var triggers = new List<IObservable<string>>
{
blockSubmission.Select(x=> "Block-submission")
};

// collect ports
var zmq = poolConfig.Daemons
.Where(x => !string.IsNullOrEmpty(x.Extra.SafeExtensionDataAs<MoneroDaemonEndpointConfigExtra>()?.ZmqBlockNotifySocket))
Expand All @@ -526,58 +533,53 @@ protected virtual void SetupJobUpdates()

if (zmq.Count > 0)
{
logger.Info(() => $"[{LogCat}] Subscribing to ZMQ push-updates from {string.Join(", ", zmq.Values)}");

var newJobsPubSub = daemon.ZmqSubscribe(zmq, 2)
.Select(frames =>
{
// We just take the second frame's raw data and turn it into a hex string.
// If that string changes, we got an update (DistinctUntilChanged)
logger.Info(() => $"[{LogCat}] Subscribing to ZMQ push-updates from {string.Join(", ", zmq.Values)}");

var blockNotify = daemon.ZmqSubscribe(zmq, 2)
.Select(frames =>
{
// We just take the second frame's raw data and turn it into a hex string.
// If that string changes, we got an update (DistinctUntilChanged)
var result = frames[1].ToHexString();
frames.Dispose();
return result;
})
.DistinctUntilChanged()
.Select(_ => Observable.FromAsync(() => UpdateJob("ZMQ pub/sub")))
.Concat()
.Publish()
.RefCount();

sources.Add(newJobsPubSub);
cancelTimeout.Add(newJobsPubSub);
}

if (poolConfig.BlockRefreshInterval > 0)
{
// periodically update block-template from daemon
var newJobsPolled = Observable.Interval(TimeSpan.FromMilliseconds(poolConfig.BlockRefreshInterval))
.Select(_ => Observable.FromAsync(() => UpdateJob("RPC polling")))
.Concat()
.Where(isNew => isNew)
.Publish()
.RefCount();

sources.Add(newJobsPolled);
cancelTimeout.Add(newJobsPolled);
})
.DistinctUntilChanged()
.Select(_ => "ZMQ pub/sub")
.Publish()
.RefCount();

pollTimerRestart = Observable.Merge(
blockSubmission,
blockNotify.Select(_ => Unit.Default))
.Publish()
.RefCount();

triggers.Add(blockNotify);
}

else
{
// poll for the first successful update after which polling is suspended forever
var newJobsPolled = Observable.Interval(TimeSpan.FromMilliseconds(poolConfig.BlockRefreshInterval))
.Select(_ => Observable.FromAsync(() => UpdateJob("RPC polling")))
.Concat()
.Where(isNew => isNew)
.Take(1)
.Publish()
.RefCount();

sources.Add(newJobsPolled);
cancelTimeout.Add(newJobsPolled);
if (poolConfig.BlockRefreshInterval > 0)
{
// periodically update block-template
triggers.Add(Observable.Timer(TimeSpan.FromMilliseconds(poolConfig.BlockRefreshInterval))
.TakeUntil(pollTimerRestart)
.Select(_ => "RPC polling")
.Repeat());
}

else
{
// get initial blocktemplate
triggers.Add(Observable.Interval(TimeSpan.FromMilliseconds(1000))
.Select(_ => "Initial template")
.TakeWhile(_=> !hasInitialBlockTemplate));
}

Blocks = Observable.Merge(sources)
Blocks = Observable.Merge(triggers)
.Select(via => Observable.FromAsync(() => UpdateJob(via)))
.Concat()
.Where(isNew => isNew)
.Do(_=> hasInitialBlockTemplate = true)
.Select(_ => Unit.Default)
.Publish()
.RefCount();
Expand Down

0 comments on commit d2bd667

Please sign in to comment.