From 3235bd0ee9397d9ffe629fdcc76f4f2417f255c5 Mon Sep 17 00:00:00 2001 From: Oliver Weichhold Date: Thu, 22 Mar 2018 23:13:41 +0100 Subject: [PATCH] Revert "Revert "Revert "Streamlined block template updating""" This reverts commit d2bd667588048da5d45fe6b6182d6e2f92b74ba8. --- .../Blockchain/Bitcoin/BitcoinJobManager.cs | 111 +++++++++--------- src/MiningCore/Blockchain/JobManagerBase.cs | 5 - .../Blockchain/Monero/MoneroJobManager.cs | 98 ++++++++-------- 3 files changed, 104 insertions(+), 110 deletions(-) diff --git a/src/MiningCore/Blockchain/Bitcoin/BitcoinJobManager.cs b/src/MiningCore/Blockchain/Bitcoin/BitcoinJobManager.cs index 690a4be87..c24a8d1dc 100644 --- a/src/MiningCore/Blockchain/Bitcoin/BitcoinJobManager.cs +++ b/src/MiningCore/Blockchain/Bitcoin/BitcoinJobManager.cs @@ -23,7 +23,6 @@ SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. using System.Globalization; using System.Linq; using System.Net; -using System.Reactive; using System.Reactive.Linq; using System.Text; using System.Threading.Tasks; @@ -106,14 +105,10 @@ protected virtual void SetupJobUpdates() return; jobRebroadcastTimeout = TimeSpan.FromSeconds(Math.Max(1, poolConfig.JobRebroadcastTimeout)); - var blockSubmission = blockSubmissionSubject.Synchronize(); - var pollTimerRestart = blockSubmissionSubject.Synchronize(); - - var triggers = new List> - { - blockSubmission.Select(x=> (false, "Block-submission")) - }; - + + var sources = new List>(); + var cancelTimeout = new List>(); + // collect ports var zmq = poolConfig.Daemons .Where(x => !string.IsNullOrEmpty(x.Extra.SafeExtensionDataAs()?.ZmqBlockNotifySocket)) @@ -128,62 +123,70 @@ protected virtual void SetupJobUpdates() if (zmq.Count > 0) { - logger.Info(() => $"[{LogCat}] Subscribing to ZMQ push-updates from {string.Join(", ", zmq.Values)}"); - - var blockNotify = daemon.ZmqSubscribe(zmq, 2) - .Select(frames => - { - // We just take the second frame's raw data and turn it into a hex string. - // If that string changes, we got an update (DistinctUntilChanged) - var result = frames[1].ToHexString(); - frames.Dispose(); - return result; - }) - .DistinctUntilChanged() - .Select(_ => (false, "ZMQ pub/sub")) - .Publish() - .RefCount(); - - pollTimerRestart = Observable.Merge( - blockSubmission, - blockNotify.Select(_ => Unit.Default)) - .Publish() - .RefCount(); - - triggers.Add(blockNotify); + logger.Info(() => $"[{LogCat}] Subscribing to ZMQ push-updates from {string.Join(", ", zmq.Values)}"); + + var newJobsPubSub = daemon.ZmqSubscribe(zmq, 2) + .Select(frames => + { + // We just take the second frame's raw data and turn it into a hex string. + // If that string changes, we got an update (DistinctUntilChanged) + var result = frames[1].ToHexString(); + frames.Dispose(); + return result; + }) + .DistinctUntilChanged() + .Select(_ => Observable.FromAsync(() => UpdateJob(false, "ZMQ pub/sub"))) + .Concat() + .Publish() + .RefCount(); + + sources.Add(newJobsPubSub); + cancelTimeout.Add(newJobsPubSub); } if (poolConfig.BlockRefreshInterval > 0) { - // periodically update block-template - triggers.Add(Observable.Timer(TimeSpan.FromMilliseconds(poolConfig.BlockRefreshInterval)) - .TakeUntil(pollTimerRestart) - .Select(_ => (false, "RPC polling")) - .Repeat()); + // periodically update block-template from daemon + var newJobsPolled = Observable.Interval(TimeSpan.FromMilliseconds(poolConfig.BlockRefreshInterval)) + .Select(_ => Observable.FromAsync(() => UpdateJob(false, "RPC polling"))) + .Concat() + .Where(isNew => isNew) + .Publish() + .RefCount(); + + sources.Add(newJobsPolled); + cancelTimeout.Add(newJobsPolled); } else { - // get initial blocktemplate - triggers.Add(Observable.Interval(TimeSpan.FromMilliseconds(1000)) - .Select(_ => (false, "Initial template")) - .TakeWhile(_ => !hasInitialBlockTemplate)); + // poll for the first successful update after which polling is suspended forever + var newJobsPolled = Observable.Interval(TimeSpan.FromMilliseconds(poolConfig.BlockRefreshInterval)) + .Select(_ => Observable.FromAsync(() => UpdateJob(false, "RPC polling"))) + .Concat() + .Where(isNew => isNew) + .Take(1) + .Publish() + .RefCount(); + + sources.Add(newJobsPolled); + cancelTimeout.Add(newJobsPolled); } - // periodically update transactions for current template - triggers.Add(Observable.Timer(jobRebroadcastTimeout) - .TakeUntil(pollTimerRestart) - .Select(_ => (true, "Job-Refresh")) - .Repeat()); + // if there haven't been any new jobs for a while, force an update + var cancelRebroadcast = cancelTimeout.Count > 0 ? + cancelTimeout.Count > 1 ? Observable.Merge(cancelTimeout) : cancelTimeout.First() : + Observable.Never(); - Jobs = Observable.Merge(triggers) - .Select(x => Observable.FromAsync(() => UpdateJob(x.Force, x.Via))) + sources.Add(Observable.Timer(jobRebroadcastTimeout) + .TakeUntil(cancelRebroadcast) // cancel timeout if an actual new job has been detected + .Do(_ => logger.Debug(() => $"[{LogCat}] No new blocks for {jobRebroadcastTimeout.TotalSeconds} seconds - updating transactions & rebroadcasting work")) + .Select(x => Observable.FromAsync(() => UpdateJob(true, "Job-Refresh"))) .Concat() - .Where(isNew => isNew) - .Do(_ => hasInitialBlockTemplate = true) - .Select(GetJobParamsForStratum) - .Publish() - .RefCount(); + .Repeat()); + + Jobs = Observable.Merge(sources) + .Select(GetJobParamsForStratum); } protected virtual async Task> GetBlockTemplateAsync() @@ -495,8 +498,6 @@ public virtual async Task SubmitShareAsync(StratumClient worker, object s if (share.IsBlockCandidate) { logger.Info(() => $"[{LogCat}] Daemon accepted block {share.BlockHeight} [{share.BlockHash}] submitted by {minerName}"); - - blockSubmissionSubject.OnNext(Unit.Default); // persist the coinbase transaction-hash to allow the payment processor // to verify later on that the pool has received the reward for the block diff --git a/src/MiningCore/Blockchain/JobManagerBase.cs b/src/MiningCore/Blockchain/JobManagerBase.cs index 41cd484ae..6b9af018d 100644 --- a/src/MiningCore/Blockchain/JobManagerBase.cs +++ b/src/MiningCore/Blockchain/JobManagerBase.cs @@ -20,9 +20,6 @@ SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. using System; using System.Globalization; -using System.Reactive; -using System.Reactive.Linq; -using System.Reactive.Subjects; using System.Threading; using System.Threading.Tasks; using Autofac; @@ -51,8 +48,6 @@ protected JobManagerBase(IComponentContext ctx) protected object jobLock = new object(); protected ILogger logger; protected PoolConfig poolConfig; - protected bool hasInitialBlockTemplate = false; - protected Subject blockSubmissionSubject = new Subject(); protected virtual string LogCat { get; } = "Job Manager"; diff --git a/src/MiningCore/Blockchain/Monero/MoneroJobManager.cs b/src/MiningCore/Blockchain/Monero/MoneroJobManager.cs index 97a13e129..809d97546 100644 --- a/src/MiningCore/Blockchain/Monero/MoneroJobManager.cs +++ b/src/MiningCore/Blockchain/Monero/MoneroJobManager.cs @@ -320,8 +320,6 @@ public async Task SubmitShareAsync(StratumClient worker, { logger.Info(() => $"[{LogCat}] Daemon accepted block {share.BlockHeight} [{blobHash.Substring(0, 6)}] submitted by {context.MinerName}"); - blockSubmissionSubject.OnNext(Unit.Default); - share.TransactionConfirmationData = blobHash; } @@ -509,16 +507,11 @@ private void ConfigureRewards() protected virtual void SetupJobUpdates() { if (poolConfig.EnableInternalStratum == false) - return; - - var blockSubmission = blockSubmissionSubject.Synchronize(); - var pollTimerRestart = blockSubmissionSubject.Synchronize(); + return; + + var sources = new List>(); + var cancelTimeout = new List>(); - var triggers = new List> - { - blockSubmission.Select(x=> "Block-submission") - }; - // collect ports var zmq = poolConfig.Daemons .Where(x => !string.IsNullOrEmpty(x.Extra.SafeExtensionDataAs()?.ZmqBlockNotifySocket)) @@ -533,53 +526,58 @@ protected virtual void SetupJobUpdates() if (zmq.Count > 0) { - logger.Info(() => $"[{LogCat}] Subscribing to ZMQ push-updates from {string.Join(", ", zmq.Values)}"); - - var blockNotify = daemon.ZmqSubscribe(zmq, 2) - .Select(frames => - { - // We just take the second frame's raw data and turn it into a hex string. - // If that string changes, we got an update (DistinctUntilChanged) + logger.Info(() => $"[{LogCat}] Subscribing to ZMQ push-updates from {string.Join(", ", zmq.Values)}"); + + var newJobsPubSub = daemon.ZmqSubscribe(zmq, 2) + .Select(frames => + { + // We just take the second frame's raw data and turn it into a hex string. + // If that string changes, we got an update (DistinctUntilChanged) var result = frames[1].ToHexString(); frames.Dispose(); return result; - }) - .DistinctUntilChanged() - .Select(_ => "ZMQ pub/sub") - .Publish() - .RefCount(); - - pollTimerRestart = Observable.Merge( - blockSubmission, - blockNotify.Select(_ => Unit.Default)) - .Publish() - .RefCount(); - - triggers.Add(blockNotify); + }) + .DistinctUntilChanged() + .Select(_ => Observable.FromAsync(() => UpdateJob("ZMQ pub/sub"))) + .Concat() + .Publish() + .RefCount(); + + sources.Add(newJobsPubSub); + cancelTimeout.Add(newJobsPubSub); } - if (poolConfig.BlockRefreshInterval > 0) - { - // periodically update block-template - triggers.Add(Observable.Timer(TimeSpan.FromMilliseconds(poolConfig.BlockRefreshInterval)) - .TakeUntil(pollTimerRestart) - .Select(_ => "RPC polling") - .Repeat()); - } - - else - { - // get initial blocktemplate - triggers.Add(Observable.Interval(TimeSpan.FromMilliseconds(1000)) - .Select(_ => "Initial template") - .TakeWhile(_=> !hasInitialBlockTemplate)); + if (poolConfig.BlockRefreshInterval > 0) + { + // periodically update block-template from daemon + var newJobsPolled = Observable.Interval(TimeSpan.FromMilliseconds(poolConfig.BlockRefreshInterval)) + .Select(_ => Observable.FromAsync(() => UpdateJob("RPC polling"))) + .Concat() + .Where(isNew => isNew) + .Publish() + .RefCount(); + + sources.Add(newJobsPolled); + cancelTimeout.Add(newJobsPolled); + } + + else + { + // poll for the first successful update after which polling is suspended forever + var newJobsPolled = Observable.Interval(TimeSpan.FromMilliseconds(poolConfig.BlockRefreshInterval)) + .Select(_ => Observable.FromAsync(() => UpdateJob("RPC polling"))) + .Concat() + .Where(isNew => isNew) + .Take(1) + .Publish() + .RefCount(); + + sources.Add(newJobsPolled); + cancelTimeout.Add(newJobsPolled); } - Blocks = Observable.Merge(triggers) - .Select(via => Observable.FromAsync(() => UpdateJob(via))) - .Concat() + Blocks = Observable.Merge(sources) .Where(isNew => isNew) - .Do(_=> hasInitialBlockTemplate = true) .Select(_ => Unit.Default) .Publish() .RefCount();