From d2bd667588048da5d45fe6b6182d6e2f92b74ba8 Mon Sep 17 00:00:00 2001 From: Oliver Weichhold Date: Thu, 22 Mar 2018 23:00:55 +0100 Subject: [PATCH] Revert "Revert "Streamlined block template updating"" This reverts commit ae123a1ae26dd1d91a64dbc23b5b4b0a7eba8d67. --- .../Blockchain/Bitcoin/BitcoinJobManager.cs | 111 +++++++++--------- src/MiningCore/Blockchain/JobManagerBase.cs | 5 + .../Blockchain/Monero/MoneroJobManager.cs | 98 ++++++++-------- 3 files changed, 110 insertions(+), 104 deletions(-) diff --git a/src/MiningCore/Blockchain/Bitcoin/BitcoinJobManager.cs b/src/MiningCore/Blockchain/Bitcoin/BitcoinJobManager.cs index c24a8d1dc..690a4be87 100644 --- a/src/MiningCore/Blockchain/Bitcoin/BitcoinJobManager.cs +++ b/src/MiningCore/Blockchain/Bitcoin/BitcoinJobManager.cs @@ -23,6 +23,7 @@ SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. using System.Globalization; using System.Linq; using System.Net; +using System.Reactive; using System.Reactive.Linq; using System.Text; using System.Threading.Tasks; @@ -105,10 +106,14 @@ protected virtual void SetupJobUpdates() return; jobRebroadcastTimeout = TimeSpan.FromSeconds(Math.Max(1, poolConfig.JobRebroadcastTimeout)); - - var sources = new List>(); - var cancelTimeout = new List>(); - + var blockSubmission = blockSubmissionSubject.Synchronize(); + var pollTimerRestart = blockSubmissionSubject.Synchronize(); + + var triggers = new List> + { + blockSubmission.Select(x=> (false, "Block-submission")) + }; + // collect ports var zmq = poolConfig.Daemons .Where(x => !string.IsNullOrEmpty(x.Extra.SafeExtensionDataAs()?.ZmqBlockNotifySocket)) @@ -123,70 +128,62 @@ protected virtual void SetupJobUpdates() if (zmq.Count > 0) { - logger.Info(() => $"[{LogCat}] Subscribing to ZMQ push-updates from {string.Join(", ", zmq.Values)}"); - - var newJobsPubSub = daemon.ZmqSubscribe(zmq, 2) - .Select(frames => - { - // We just take the second frame's raw data and turn it into a hex string. - // If that string changes, we got an update (DistinctUntilChanged) - var result = frames[1].ToHexString(); - frames.Dispose(); - return result; - }) - .DistinctUntilChanged() - .Select(_ => Observable.FromAsync(() => UpdateJob(false, "ZMQ pub/sub"))) - .Concat() - .Publish() - .RefCount(); - - sources.Add(newJobsPubSub); - cancelTimeout.Add(newJobsPubSub); + logger.Info(() => $"[{LogCat}] Subscribing to ZMQ push-updates from {string.Join(", ", zmq.Values)}"); + + var blockNotify = daemon.ZmqSubscribe(zmq, 2) + .Select(frames => + { + // We just take the second frame's raw data and turn it into a hex string. + // If that string changes, we got an update (DistinctUntilChanged) + var result = frames[1].ToHexString(); + frames.Dispose(); + return result; + }) + .DistinctUntilChanged() + .Select(_ => (false, "ZMQ pub/sub")) + .Publish() + .RefCount(); + + pollTimerRestart = Observable.Merge( + blockSubmission, + blockNotify.Select(_ => Unit.Default)) + .Publish() + .RefCount(); + + triggers.Add(blockNotify); } if (poolConfig.BlockRefreshInterval > 0) { - // periodically update block-template from daemon - var newJobsPolled = Observable.Interval(TimeSpan.FromMilliseconds(poolConfig.BlockRefreshInterval)) - .Select(_ => Observable.FromAsync(() => UpdateJob(false, "RPC polling"))) - .Concat() - .Where(isNew => isNew) - .Publish() - .RefCount(); - - sources.Add(newJobsPolled); - cancelTimeout.Add(newJobsPolled); + // periodically update block-template + triggers.Add(Observable.Timer(TimeSpan.FromMilliseconds(poolConfig.BlockRefreshInterval)) + .TakeUntil(pollTimerRestart) + .Select(_ => (false, "RPC polling")) + .Repeat()); } else { - // poll for the first successful update after which polling is suspended forever - var newJobsPolled = Observable.Interval(TimeSpan.FromMilliseconds(poolConfig.BlockRefreshInterval)) - .Select(_ => Observable.FromAsync(() => UpdateJob(false, "RPC polling"))) - .Concat() - .Where(isNew => isNew) - .Take(1) - .Publish() - .RefCount(); - - sources.Add(newJobsPolled); - cancelTimeout.Add(newJobsPolled); + // get initial blocktemplate + triggers.Add(Observable.Interval(TimeSpan.FromMilliseconds(1000)) + .Select(_ => (false, "Initial template")) + .TakeWhile(_ => !hasInitialBlockTemplate)); } - // if there haven't been any new jobs for a while, force an update - var cancelRebroadcast = cancelTimeout.Count > 0 ? - cancelTimeout.Count > 1 ? Observable.Merge(cancelTimeout) : cancelTimeout.First() : - Observable.Never(); - - sources.Add(Observable.Timer(jobRebroadcastTimeout) - .TakeUntil(cancelRebroadcast) // cancel timeout if an actual new job has been detected - .Do(_ => logger.Debug(() => $"[{LogCat}] No new blocks for {jobRebroadcastTimeout.TotalSeconds} seconds - updating transactions & rebroadcasting work")) - .Select(x => Observable.FromAsync(() => UpdateJob(true, "Job-Refresh"))) - .Concat() + // periodically update transactions for current template + triggers.Add(Observable.Timer(jobRebroadcastTimeout) + .TakeUntil(pollTimerRestart) + .Select(_ => (true, "Job-Refresh")) .Repeat()); - Jobs = Observable.Merge(sources) - .Select(GetJobParamsForStratum); + Jobs = Observable.Merge(triggers) + .Select(x => Observable.FromAsync(() => UpdateJob(x.Force, x.Via))) + .Concat() + .Where(isNew => isNew) + .Do(_ => hasInitialBlockTemplate = true) + .Select(GetJobParamsForStratum) + .Publish() + .RefCount(); } protected virtual async Task> GetBlockTemplateAsync() @@ -498,6 +495,8 @@ public virtual async Task SubmitShareAsync(StratumClient worker, object s if (share.IsBlockCandidate) { logger.Info(() => $"[{LogCat}] Daemon accepted block {share.BlockHeight} [{share.BlockHash}] submitted by {minerName}"); + + blockSubmissionSubject.OnNext(Unit.Default); // persist the coinbase transaction-hash to allow the payment processor // to verify later on that the pool has received the reward for the block diff --git a/src/MiningCore/Blockchain/JobManagerBase.cs b/src/MiningCore/Blockchain/JobManagerBase.cs index 6b9af018d..41cd484ae 100644 --- a/src/MiningCore/Blockchain/JobManagerBase.cs +++ b/src/MiningCore/Blockchain/JobManagerBase.cs @@ -20,6 +20,9 @@ SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. using System; using System.Globalization; +using System.Reactive; +using System.Reactive.Linq; +using System.Reactive.Subjects; using System.Threading; using System.Threading.Tasks; using Autofac; @@ -48,6 +51,8 @@ protected JobManagerBase(IComponentContext ctx) protected object jobLock = new object(); protected ILogger logger; protected PoolConfig poolConfig; + protected bool hasInitialBlockTemplate = false; + protected Subject blockSubmissionSubject = new Subject(); protected virtual string LogCat { get; } = "Job Manager"; diff --git a/src/MiningCore/Blockchain/Monero/MoneroJobManager.cs b/src/MiningCore/Blockchain/Monero/MoneroJobManager.cs index 809d97546..97a13e129 100644 --- a/src/MiningCore/Blockchain/Monero/MoneroJobManager.cs +++ b/src/MiningCore/Blockchain/Monero/MoneroJobManager.cs @@ -320,6 +320,8 @@ public async Task SubmitShareAsync(StratumClient worker, { logger.Info(() => $"[{LogCat}] Daemon accepted block {share.BlockHeight} [{blobHash.Substring(0, 6)}] submitted by {context.MinerName}"); + blockSubmissionSubject.OnNext(Unit.Default); + share.TransactionConfirmationData = blobHash; } @@ -507,11 +509,16 @@ private void ConfigureRewards() protected virtual void SetupJobUpdates() { if (poolConfig.EnableInternalStratum == false) - return; - - var sources = new List>(); - var cancelTimeout = new List>(); + return; + + var blockSubmission = blockSubmissionSubject.Synchronize(); + var pollTimerRestart = blockSubmissionSubject.Synchronize(); + var triggers = new List> + { + blockSubmission.Select(x=> "Block-submission") + }; + // collect ports var zmq = poolConfig.Daemons .Where(x => !string.IsNullOrEmpty(x.Extra.SafeExtensionDataAs()?.ZmqBlockNotifySocket)) @@ -526,58 +533,53 @@ protected virtual void SetupJobUpdates() if (zmq.Count > 0) { - logger.Info(() => $"[{LogCat}] Subscribing to ZMQ push-updates from {string.Join(", ", zmq.Values)}"); - - var newJobsPubSub = daemon.ZmqSubscribe(zmq, 2) - .Select(frames => - { - // We just take the second frame's raw data and turn it into a hex string. - // If that string changes, we got an update (DistinctUntilChanged) + logger.Info(() => $"[{LogCat}] Subscribing to ZMQ push-updates from {string.Join(", ", zmq.Values)}"); + + var blockNotify = daemon.ZmqSubscribe(zmq, 2) + .Select(frames => + { + // We just take the second frame's raw data and turn it into a hex string. + // If that string changes, we got an update (DistinctUntilChanged) var result = frames[1].ToHexString(); frames.Dispose(); return result; - }) - .DistinctUntilChanged() - .Select(_ => Observable.FromAsync(() => UpdateJob("ZMQ pub/sub"))) - .Concat() - .Publish() - .RefCount(); - - sources.Add(newJobsPubSub); - cancelTimeout.Add(newJobsPubSub); - } - - if (poolConfig.BlockRefreshInterval > 0) - { - // periodically update block-template from daemon - var newJobsPolled = Observable.Interval(TimeSpan.FromMilliseconds(poolConfig.BlockRefreshInterval)) - .Select(_ => Observable.FromAsync(() => UpdateJob("RPC polling"))) - .Concat() - .Where(isNew => isNew) - .Publish() - .RefCount(); - - sources.Add(newJobsPolled); - cancelTimeout.Add(newJobsPolled); + }) + .DistinctUntilChanged() + .Select(_ => "ZMQ pub/sub") + .Publish() + .RefCount(); + + pollTimerRestart = Observable.Merge( + blockSubmission, + blockNotify.Select(_ => Unit.Default)) + .Publish() + .RefCount(); + + triggers.Add(blockNotify); } - else - { - // poll for the first successful update after which polling is suspended forever - var newJobsPolled = Observable.Interval(TimeSpan.FromMilliseconds(poolConfig.BlockRefreshInterval)) - .Select(_ => Observable.FromAsync(() => UpdateJob("RPC polling"))) - .Concat() - .Where(isNew => isNew) - .Take(1) - .Publish() - .RefCount(); - - sources.Add(newJobsPolled); - cancelTimeout.Add(newJobsPolled); + if (poolConfig.BlockRefreshInterval > 0) + { + // periodically update block-template + triggers.Add(Observable.Timer(TimeSpan.FromMilliseconds(poolConfig.BlockRefreshInterval)) + .TakeUntil(pollTimerRestart) + .Select(_ => "RPC polling") + .Repeat()); + } + + else + { + // get initial blocktemplate + triggers.Add(Observable.Interval(TimeSpan.FromMilliseconds(1000)) + .Select(_ => "Initial template") + .TakeWhile(_=> !hasInitialBlockTemplate)); } - Blocks = Observable.Merge(sources) + Blocks = Observable.Merge(triggers) + .Select(via => Observable.FromAsync(() => UpdateJob(via))) + .Concat() .Where(isNew => isNew) + .Do(_=> hasInitialBlockTemplate = true) .Select(_ => Unit.Default) .Publish() .RefCount();