-
Notifications
You must be signed in to change notification settings - Fork 56
/
mod.rs
4517 lines (4061 loc) · 179 KB
/
mod.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
//! # ChainManager actor
//!
//! This module contains the ChainManager actor which is in charge
//! of managing the blocks and transactions of the Witnet blockchain
//! received through the protocol, and also encapsulates the logic of the
//! _unspent transaction outputs_.
//!
//! Among its responsibilities are the following:
//!
//! * Initializing the chain info upon running the node for the first time and persisting it into storage [StorageManager](actors::storage_manager::StorageManager)
//! * Recovering the chain info from storage and keeping it in its state.
//! * Validating block candidates as they come from a session.
//! * Consolidating multiple block candidates for the same checkpoint into a single valid block.
//! * Putting valid blocks into storage by sending them to the inventory manager actor.
//! * Having a method for letting other components get blocks by *hash* or *checkpoint*.
//! * Having a method for letting other components get the epoch of the current tip of the
//! blockchain (e.g. the last epoch field required for the handshake in the Witnet network
//! protocol).
//! * Validating transactions as they come from any [Session](actors::session::Session). This includes:
//! - Iterating over its inputs, adding the value of the inputs to calculate the value of the transaction.
//! - Running the output scripts, expecting them all to return `TRUE` and leave an empty stack.
//! - Verifying that the sum of all inputs is greater than or equal to the sum of all the outputs.
//! * Keeping valid transactions in memory. This in-memory transaction pool is what we call the _mempool_. Valid transactions are immediately appended to the mempool.
//! * Keeping every unspent transaction output (UTXO) in the block chain in memory. This is called the _UTXO set_.
//! * Updating the UTXO set with valid transactions that have already been anchored into a valid block. This includes:
//! - Removing the UTXOs that the transaction spends as inputs.
//! - Adding a new UTXO for every output in the transaction.
use std::path::PathBuf;
use std::{
cmp::{max, min, Ordering},
collections::{HashMap, HashSet, VecDeque},
convert::TryFrom,
future,
net::SocketAddr,
pin::Pin,
time::Duration,
};
use actix::{
prelude::*, ActorFutureExt, ActorTryFutureExt, AsyncContext, Context, ContextFutureSpawner,
Supervised, SystemService, WrapFuture,
};
use ansi_term::Color::{Purple, White, Yellow};
use derive_more::{Display, Error};
use failure::Fail;
use futures::future::{try_join_all, FutureExt};
use glob::glob;
use itertools::Itertools;
use rand::Rng;
use witnet_config::{
config::Tapi,
defaults::{
PSEUDO_CONSENSUS_CONSTANTS_WIP0022_REWARD_COLLATERAL_RATIO,
PSEUDO_CONSENSUS_CONSTANTS_WIP0027_COLLATERAL_AGE,
},
};
use witnet_crypto::hash::calculate_sha256;
use witnet_data_structures::{
chain::{
penalize_factor,
priority::{Priorities, PriorityEngine, PriorityVisitor},
reputation_issuance,
tapi::{after_second_hard_fork, current_active_wips, in_emergency_period, ActiveWips},
Alpha, AltKeys, Block, BlockHeader, Bn256PublicKey, ChainImport, ChainInfo, ChainState,
CheckpointBeacon, CheckpointVRF, ConsensusConstants, ConsensusConstantsWit2,
DataRequestInfo, DataRequestOutput, DataRequestStage, Epoch, EpochConstants, Hash,
Hashable, InventoryEntry, InventoryItem, NodeStats, PublicKeyHash, Reputation,
ReputationEngine, SignaturesToVerify, StateMachine, SuperBlock, SuperBlockVote,
TransactionsPool,
},
data_request::DataRequestPool,
get_environment, get_protocol_version, get_protocol_version_activation_epoch,
proto::versioning::ProtocolVersion,
radon_error::RadonError,
radon_report::{RadonReport, ReportContext},
register_protocol_version,
staking::prelude::*,
superblock::{ARSIdentities, AddSuperBlockVote, SuperBlockConsensus},
transaction::{RevealTransaction, TallyTransaction, Transaction},
types::{
visitor::{StatefulVisitor, Visitor},
LastBeacon,
},
utxo_pool::{Diff, OwnUnspentOutputsPool, UnspentOutputsPool, UtxoDiff, UtxoWriteBatch},
vrf::VrfCtx,
wit::Wit,
};
use witnet_rad::{error::RadError::TooManyWitnesses, types::RadonTypes};
use witnet_util::timestamp::seconds_to_human_string;
use witnet_validations::{
eligibility::legacy::VrfSlots,
validations::{
compare_block_candidates, dr_transaction_fee, st_transaction_fee, validate_block,
validate_block_transactions, validate_new_transaction, validate_rad_request,
verify_signatures, vt_transaction_fee,
},
};
use crate::{
actors::{
chain_manager::handlers::SYNCED_BANNER,
inventory_manager::InventoryManager,
json_rpc::JsonRpcServer,
messages::{
AddItem, AddItems, AddTransaction, Anycast, BlockNotify, Broadcast, DropOutboundPeers,
GetBlocksEpochRange, GetItemBlock, NodeStatusNotify, RemoveAddressesFromTried,
SendInventoryItem, SendInventoryRequest, SendLastBeacon, SendSuperBlockVote,
SetLastBeacon, SetSuperBlockTargetBeacon, StoreInventoryItem, SuperBlockNotify,
},
node::{NodeOps, PutNodeOps},
peers_manager::PeersManager,
sessions_manager::SessionsManager,
storage_keys,
},
signature_mngr, storage_mngr,
utils::{deserialize_from_file, file_name_compose, stop_system_if_panicking, Force},
};
mod actor;
mod handlers;
/// Block and data request mining
pub mod mining;
/// Maximum number of blocks to be sent during the synchronization process
pub const MAX_BLOCKS_SYNC: usize = 500;
/// Possible errors when interacting with ChainManager
#[derive(Debug, PartialEq, Eq, Fail)]
pub enum ChainManagerError {
    /// A block being processed was already known to this node
    #[fail(display = "A block being processed was already known to this node")]
    BlockAlreadyExists,
    /// A block does not exist
    #[fail(display = "A block does not exist")]
    BlockDoesNotExist,
    /// Optional fields of ChainManager are not properly initialized yet
    #[fail(display = "ChainManager is not ready yet. This may self-fix in a little while")]
    ChainNotReady,
    /// The node attempted to do an action that is only allowed while `ChainManager`
    /// is in `Synced` state.
    #[fail(
        display = "The node is not yet in `Synced` state (current state is {:?})",
        current_state
    )]
    NotSynced {
        /// Tells what the current state is, so users can better get an idea of why an action is
        /// not possible at this time.
        current_state: StateMachine,
    },
    /// The node is trying to mine a block so commits are not allowed
    #[fail(display = "Commit received while node is trying to mine a block")]
    TooLateToCommit,
    /// The node received a batch of blocks that is inconsistent with the current index
    // NOTE(review): the message labels `current_superblock_index` as an "epoch" and `wrong_index`
    // as a "number of blocks"; confirm which wording matches what callers actually pass in.
    #[fail(
        display = "Wrong number of blocks provided {:?} for superblock index {:?} and epoch {:?}",
        wrong_index, consolidated_superblock_index, current_superblock_index
    )]
    WrongBlocksForSuperblock {
        /// Tells what the wrong index was
        wrong_index: u32,
        /// Tells what the consolidated superblock index was (printed as "superblock index")
        consolidated_superblock_index: u32,
        /// Tells what the current superblock index was (printed as "epoch" in the message)
        current_superblock_index: u32,
    },
    /// Tried to mine block candidates but mining is disabled through configuration.
    #[fail(display = "Mining is disabled through configuration")]
    MiningIsDisabled,
    /// A staking-related error happened.
    #[fail(display = "A staking-related error happened: {:?}", _0)]
    Staking(StakesError<PublicKeyHash, witnet_data_structures::wit::Wit, Epoch>),
    /// The node is not eligible to perform a certain action.
    #[fail(display = "The node is not eligible to perform this action")]
    NotEligible,
}
/// Synchronization target determined by the beacons received from outbound peers
#[derive(Clone, Copy, Debug)]
pub struct SyncTarget {
// Beacon of the target block.
// TODO: the target block must be set, but the node will not assume that it is valid
block: CheckpointBeacon,
// Beacon of the target superblock.
// The target superblock must always be set. Here we only know the superblock index and hash,
// we do not know the block hash. The block index can be derived from the superblock index.
// This must be a superblock beacon consolidated with more than 2/3 of the votes, and it must be
// irreversibly consolidated when reached.
superblock: CheckpointBeacon,
}
////////////////////////////////////////////////////////////////////////////////////////
// ACTOR BASIC STRUCTURE
////////////////////////////////////////////////////////////////////////////////////////
/// ChainManager actor
///
/// Holds the blockchain state plus all the transient data (candidates, mempool, synchronization
/// targets, node configuration) that the actor needs while processing blocks and transactions.
#[derive(Default)]
pub struct ChainManager {
/// Blockchain state data structure
chain_state: ChainState,
/// ChainState backup used to reset the state after a reorganization
chain_state_snapshot: ChainStateSnapshot,
/// Current epoch, or `None` if it has not been set yet
current_epoch: Option<Epoch>,
/// Transactions Pool (_mempool_)
transactions_pool: TransactionsPool,
/// Whether mining is enabled
mining_enabled: bool,
/// Current state of the synchronization state machine
sm_state: StateMachine,
/// The best beacon known to this node—to which it will try to catch up
sync_target: Option<SyncTarget>,
/// The superblock hash and superblock according to a majority of peers
sync_superblock: Option<(Hash, SuperBlock)>,
/// The node asked for a batch of blocks on this epoch. This is used to implement a timeout
/// that will move the node back to WaitingConsensus state if it does not receive any AddBlocks
/// message after a certain number of epochs
sync_waiting_for_add_blocks_since: Option<Epoch>,
/// Map that stores candidate blocks for further validation and consolidation as tip of the
/// blockchain, keyed by block hash (block_hash -> blocks)
candidates: HashMap<Hash, Vec<Block>>,
/// Best candidate validated so far, if any
best_candidate: Option<BlockCandidate>,
/// Set that stores all the recently received candidates
seen_candidates: HashSet<Block>,
/// Set that stores all the recently received transactions
seen_transactions: HashSet<Transaction>,
/// Our public key hash, used to create the mint transaction
own_pkh: Option<PublicKeyHash>,
/// Our BLS public key, used to append in commit transactions
bn256_public_key: Option<Bn256PublicKey>,
/// VRF context
vrf_ctx: Option<VrfCtx>,
/// Flag about the reception of peers beacons
/// (NOTE(review): where it is set and cleared is not visible in this part of the file)
peers_beacons_received: bool,
/// Consensus parameter (in %)
consensus_c: u32,
/// Constants used to convert between epoch and timestamp
epoch_constants: Option<EpochConstants>,
/// Maximum number of sources to retrieve in a single epoch
data_request_max_retrievals_per_epoch: u16,
/// Timeout for data request retrieval and aggregation execution
data_request_timeout: Option<Duration>,
/// Pending transaction timeout
/// (NOTE(review): the time unit is not visible in this part of the file)
tx_pending_timeout: u64,
/// Magic number from ConsensusConstants
magic: u16,
/// External mint address
external_address: Option<PublicKeyHash>,
/// Mint Percentage to share with the external address
external_percentage: u8,
/// List of superblock votes received while we are synchronizing
temp_superblock_votes: HashSet<SuperBlockVote>,
/// Reveal transactions to process later
/// (NOTE(review): earlier wording also mentioned commits, but only reveals fit this type)
temp_reveals: Vec<RevealTransaction>,
/// Value transfers and data requests to process later
temp_vts_and_drs: VecDeque<Transaction>,
/// Maximum number of recovered transactions to include by epoch
max_reinserted_transactions: usize,
/// Last beacon received from each peer, keyed by the peer's socket address
last_received_beacons: Vec<(SocketAddr, Option<LastBeacon>)>,
/// Last SuperBlock consensus
last_superblock_consensus: Option<CheckpointBeacon>,
/// Settings for Threshold Activation of Protocol Improvements
tapi: Tapi,
/// Transaction priority engine
priority_engine: PriorityEngine,
/// Chain snapshot to be imported
import: Force<ChainImport<ImportError>>,
/// Signals that a chain snapshot export is due.
export: Force<PathBuf>,
/// Consensus constants for wit/2
consensus_constants_wit2: ConsensusConstantsWit2,
}
impl ChainManager {
/// Drop the value of the `import` field.
fn drop_import(&mut self) {
self.import = Force::None;
}
/// Order a chain snapshot export upon starting.
fn put_export(&mut self, export: Force<PathBuf>) {
self.export = export;
}
/// Put a chain export into the `import` field.
fn put_import(&mut self, import: Force<ChainImport<ImportError>>) {
self.import = import;
}
/// Try to read and load a chain snapshot from the filesystem into a ChainManager.
///
/// This method is intentionally best-effort.
pub fn put_import_from_path(&mut self, path: Force<PathBuf>) {
// Deconstruct path and force degree, and abort if there's no path
let (path, force) = match path {
Force::All(path) => (path, true),
Force::Some(path) => (path, false),
Force::None => {
return;
}
};
// A future for reading and deserializing the chain state from a single file
let chain_state_path = path.clone();
let chain_state = Box::pin(async move {
let path_display = chain_state_path.display().to_string();
log::debug!("Trying to read chain state from file {}", path_display);
// Open file, create reader, and decode using bincode
let chain_state =
deserialize_from_file(&chain_state_path).map_err(|e: ImportError| match e {
ImportError::Bincode(_) => ImportError::Deserialize {
path: path_display.clone(),
},
ImportError::Io(_) => ImportError::FileRead {
path: path_display.clone(),
},
e => e,
})?;
Ok(chain_state)
});
// A future for reading and deserializing superblocks from a single file
let superblocks_path = file_name_compose(path.clone(), Some("superblocks".into()));
let superblocks = Box::pin(async move {
// Derive superblocks file path from the base path
let path_display = superblocks_path.display().to_string();
log::debug!("Trying to read superblocks file at {}", path_display);
// Open file, create reader, and decode using bincode
let superblocks: Vec<_> =
deserialize_from_file(&superblocks_path).map_err(|e: ImportError| match e {
ImportError::Bincode(_) => ImportError::Deserialize {
path: path_display.clone(),
},
ImportError::Io(_) => ImportError::FileRead {
path: path_display.clone(),
},
e => e,
})?;
log::info!(
"Read {} superblocks from file {}",
superblocks.len(),
path_display
);
Ok(superblocks)
});
// A vector of futures for reading and deserializing blocks from multiple files
let blocks_path = file_name_compose(path, Some("blocks_batch_*".into()));
let path_display = blocks_path.display().to_string();
let blocks = match glob(&path_display) {
Ok(entries) => {
let mut blocks = Vec::new();
for batch_path in entries.flatten() {
let fut = futurize_batch_read(batch_path);
blocks.push(fut);
}
Ok(blocks)
}
Err(_) => Err(ImportError::FileRead { path: path_display }),
};
let import = Force::new(
ChainImport {
blocks,
chain_state,
superblocks,
},
force,
);
self.put_import(import);
}
}
/// Build a boxed future that reads and decodes one batch of blocks from `batch_path`.
///
/// Bincode and I/O failures are reported as `ImportError::Deserialize` and
/// `ImportError::FileRead` respectively, both carrying the displayed path of the batch file. Any
/// other error is passed through untouched.
fn futurize_batch_read(
    batch_path: PathBuf,
) -> Pin<
    Box<
        (dyn futures_util::Future<
            Output = Result<Vec<witnet_data_structures::chain::Block>, ImportError>,
        > + std::marker::Send
             + 'static),
    >,
> {
    Box::pin(async move {
        let shown_path = batch_path.display().to_string();

        // Decode first, then translate the low-level errors into path-aware ones
        let outcome: Result<Vec<_>, ImportError> = deserialize_from_file(&batch_path);
        let batch = match outcome {
            Ok(batch) => batch,
            Err(ImportError::Bincode(_)) => {
                return Err(ImportError::Deserialize { path: shown_path });
            }
            Err(ImportError::Io(_)) => {
                return Err(ImportError::FileRead { path: shown_path });
            }
            Err(other) => return Err(other),
        };
        log::info!("Read {} blocks from file {}", batch.len(), shown_path);

        Ok(batch)
    })
}
/// Logs the drop of the actor and then calls `stop_system_if_panicking` with the actor name.
impl Drop for ChainManager {
    fn drop(&mut self) {
        // Name under which this actor is reported to `stop_system_if_panicking`
        const ACTOR_NAME: &str = "ChainManager";

        log::trace!("Dropping ChainManager");
        stop_system_if_panicking(ACTOR_NAME);
    }
}
/// Lets node operations configure the snapshot import and export of the ChainManager.
impl PutNodeOps for ChainManager {
    /// Take the snapshot import path and the snapshot export order from `ops`, in that order.
    fn put_node_ops(&mut self, ops: NodeOps) {
        let import_path = ops.snapshot_import();
        self.put_import_from_path(import_path);

        let export_order = ops.snapshot_export();
        self.put_export(export_order);
    }
}
/// Wrapper around a block candidate that contains additional metadata regarding
/// needed chain state mutations in case the candidate gets consolidated.
#[derive(Debug)]
pub struct BlockCandidate {
/// The candidate block itself
pub block: Block,
/// UTXO diff produced when validating the block
pub utxo_diff: Diff,
/// Reputation of the block signer at the time the candidate was validated
pub reputation: Reputation,
/// Hash derived from the VRF proof included in the block header
pub vrf_proof: Hash,
/// Priority info collected from the transactions in the block
pub priorities: Priorities,
}
/// Required trait for being able to retrieve ChainManager address from registry.
/// The implementation is empty, so only the methods provided by the trait itself apply.
impl Supervised for ChainManager {}
/// Required trait for being able to retrieve ChainManager address from registry.
/// The implementation is empty, so only the methods provided by the trait itself apply.
impl SystemService for ChainManager {}
/// Auxiliary methods for ChainManager actor
impl ChainManager {
/// Persist a chain state into storage.
///
/// * `Some(superblock_index)`: persist the state restored from the chain state snapshot for
///   that superblock index. If the snapshot has nothing to restore, nothing is persisted and
///   the returned future resolves to `Ok(())` right away.
/// * `None`: persist a copy of the current chain state (used during synchronization).
///
/// In both cases the persisted state carries the current highest superblock checkpoint and the
/// current superblock state. The returned future resolves to `Err(())` if the storage write
/// fails; the failure itself is only logged.
///
/// # Panics
///
/// Panics if the state to persist has no chain info, or if the superblock beacon of the state
/// to persist does not match the provided `superblock_index`.
fn persist_chain_state(
    &mut self,
    superblock_index: Option<u32>,
) -> ResponseActFuture<Self, Result<(), ()>> {
    let previous_chain_state = match superblock_index {
        Some(index) => match self.chain_state_snapshot.restore(index) {
            Some(snapshot) => snapshot,
            // Nothing could be restored for this index, so there is nothing to persist
            None => return Box::pin(actix::fut::ok(())),
        },
        // None case is used to persist chain_state during synchronization
        None => self.chain_state.clone(),
    };

    // The highest superblock checkpoint is the highest superblock that obtained a majority of
    // votes, and it must survive a state restoration, so it is always taken from the live state.
    let chain_info = ChainInfo {
        highest_superblock_checkpoint: self.get_superblock_beacon(),
        ..previous_chain_state.chain_info.as_ref().unwrap().clone()
    };
    let mut state = ChainState {
        chain_info: Some(chain_info),
        superblock_state: self.chain_state.superblock_state.clone(),
        ..previous_chain_state
    };

    let chain_beacon = state.get_chain_beacon();
    let superblock_beacon = state.get_superblock_beacon();
    match superblock_index {
        Some(index) => {
            log::debug!(
                "Persisting chain state for superblock #{} with chain beacon {:?} and super beacon {:?}",
                index,
                chain_beacon,
                superblock_beacon
            );
            assert_eq!(superblock_beacon.checkpoint, index);
        }
        None => {
            log::debug!(
                "Persisting chain state during synchronization, chain beacon: {:?}",
                chain_beacon
            );
        }
    }

    // Update the UTXO set: whatever is about to be persisted leaves the in-memory pool, and the
    // consolidated UTXOs are queued in the same write batch as the chain state.
    self.chain_state
        .unspent_outputs_pool
        .remove_persisted_from_memory(&state.unspent_outputs_pool.diff);
    let mut batch = UtxoWriteBatch::default();
    state.unspent_outputs_pool.persist_add_to_batch(&mut batch);

    let storage_key = storage_keys::chain_state_key(self.get_magic());
    let persisted = storage_mngr::put_chain_state_in_batch(&storage_key, &state, batch)
        .into_actor(self)
        .and_then(|_, _, _| {
            log::debug!("Successfully persisted previous_chain_state into storage");
            fut::ok(())
        })
        .map_err(|err, _, _| {
            log::error!(
                "Failed to persist previous_chain_state into storage: {}",
                err
            )
        });

    Box::pin(persisted)
}
/// Overwrite the stored chain state with an empty `ChainState`, delete every UTXO from the
/// database, move the node to `WaitingConsensus` and initialize again from storage.
///
/// This can be used to recover from a forked chain without manually deleting the storage.
/// The returned future resolves to `Err(())` if the empty state cannot be written; the failure
/// is only logged and the state machine is left untouched in that case.
fn delete_chain_state_and_reinitialize(&mut self) -> ResponseActFuture<Self, Result<(), ()>> {
    // Queue the deletion of all the UTXOs in the database
    let mut batch = UtxoWriteBatch::default();
    self.chain_state
        .unspent_outputs_pool
        .delete_all_from_db_batch(&mut batch);

    let blank_state = ChainState::default();
    let storage_key = storage_keys::chain_state_key(self.get_magic());
    let reinitialized = storage_mngr::put_chain_state_in_batch(&storage_key, &blank_state, batch)
        .into_actor(self)
        .map_err(|err, _, _| {
            log::error!("Failed to persist empty chain state into storage: {}", err);
        })
        .and_then(|(), act, ctx| {
            log::info!("Successfully persisted empty chain state into storage");
            act.update_state_machine(StateMachine::WaitingConsensus, ctx);

            act.initialize_from_storage_fut(true)
        });

    Box::pin(reinitialized)
}
/// Resynchronize block chain using a list of blocks that are already in the storage.
///
/// The blocks are assumed to be valid, so validations are skipped, and block metadata is not
/// persisted to the storage because it is assumed to already be there.
///
/// Blocks are fetched from the inventory manager and processed one at a time, front to back.
/// Each processed block schedules the processing of the rest of the list, and `done` is called
/// once the list is empty.
///
/// # Panics
///
/// Panics if a block cannot be retrieved from the inventory manager or if processing it fails.
fn resync_from_storage<F>(
    &mut self,
    mut block_list: VecDeque<(Epoch, Hash)>,
    ctx: &mut Context<Self>,
    done: F,
) where
    F: FnOnce(&mut Self, &mut Context<Self>) + 'static,
{
    // The epoch of the last block is only used to report progress
    let last_epoch = match block_list.back() {
        Some(&(last_epoch, _)) => last_epoch,
        None => {
            // Done, all the blocks have been processed
            done(self, ctx);
            return;
        }
    };
    // Cannot fail: the list was just found to have at least one element
    let (epoch, hash) = block_list.pop_front().unwrap();

    InventoryManager::from_registry()
        .send(GetItemBlock { hash })
        .into_actor(self)
        .map(move |res, act, ctx| {
            let block = match res {
                Ok(Ok(block)) => block,
                Ok(Err(e)) => panic!("{:?}", e),
                Err(e) => panic!("{:?}", e),
            };
            log::info!(
                "REWIND [{}/{}] Got block {} from storage",
                epoch,
                last_epoch,
                hash
            );
            act.process_requested_block(ctx, block, true)
                .expect("resync from storage fail");

            // We need to persist the chain state periodically, otherwise the entire
            // UTXO set will be in memory, consuming a huge amount of memory.
            if block_list.len() % 1000 == 0 {
                act.persist_chain_state(None)
                    .map(|_res: Result<(), ()>, _act, _ctx| ())
                    .wait(ctx);
            }

            // Keep going with the remaining blocks
            act.resync_from_storage(block_list, ctx, done);
        })
        .spawn(ctx);
}
/// Hand the current `chain_state` over to the chain state snapshot, under the given
/// superblock index.
fn move_chain_state_forward(&mut self, superblock_index: u32) {
    let snapshot = &mut self.chain_state_snapshot;
    snapshot.take(superblock_index, &self.chain_state);
}
/// Send a list of inventory items to the InventoryManager so that they get persisted.
///
/// The returned future resolves to `Ok(())` as soon as the message is delivered and answered,
/// whatever the answer is. It resolves to an error, which is also logged, only when the
/// communication with the InventoryManager itself fails.
fn persist_items(
    &self,
    items: Vec<StoreInventoryItem>,
) -> ResponseActFuture<Self, Result<(), failure::Error>> {
    let inventory_manager = InventoryManager::from_registry();
    let request = AddItems { items };

    let delivery = inventory_manager
        .send(request)
        .into_actor(self)
        // Upon success, ignore any response and simply let the future resolve
        .map_ok(|_, _, _| {})
        // The only possible error at this point is a failure to send the message
        .map_err(|err, _, _| {
            log::error!("Unsuccessful communication with InventoryManager: {}", err);

            err.into()
        });

    Box::pin(delivery)
}
/// Persist the reports of a list of data requests into the storage.
///
/// Each report is stored under the `DR-REPORT-<dr_pointer>` key, where the pointer is taken
/// from the tally of the data request. The write is waited on in the actor context, and a
/// failure is only logged.
///
/// # Panics
///
/// Panics if any of the data requests has no tally.
fn persist_data_requests(&self, ctx: &mut Context<Self>, dr_infos: Vec<DataRequestInfo>) {
    let mut kvs = Vec::with_capacity(dr_infos.len());
    for dr_info in dr_infos {
        let storage_key = format!(
            "DR-REPORT-{}",
            dr_info.tally.as_ref().unwrap().dr_pointer
        );
        kvs.push((storage_key, dr_info));
    }
    let report_count = kvs.len();

    storage_mngr::put_batch(&kvs)
        .into_actor(self)
        .map_err(|e, _, _| {
            log::error!("Failed to persist data request report into storage: {}", e)
        })
        .and_then(move |_, _, _| {
            log::trace!(
                "Successfully persisted reports for {} data requests into storage",
                report_count
            );
            fut::ok(())
        })
        .map(|_res: Result<(), ()>, _act, _ctx| ())
        .wait(ctx);
}
/// Ask the SessionsManager to broadcast an inventory item to the peers, not only to the
/// inbound ones. The message is sent without waiting for any response.
fn broadcast_item(&self, item: InventoryItem) {
    let sessions_manager = SessionsManager::from_registry();
    let announcement = Broadcast {
        command: SendInventoryItem { item },
        only_inbound: false,
    };

    sessions_manager.do_send(announcement);
}
/// Validate a block and, if it is valid, consolidate it.
///
/// The block goes through `process_validations` against the current chain state, and the
/// resulting UTXO diff plus the collected transaction priorities are handed over to
/// `consolidate_block`. The `resynchronizing` flag is forwarded untouched to both of them.
///
/// # Errors
///
/// Returns `ChainManagerError::ChainNotReady` if the epoch constants, the chain info, the
/// reputation engine or the VRF context are not set yet, and forwards any error coming from
/// `process_validations`.
///
/// # Panics
///
/// Panics if the UTXO pool has no database attached.
fn process_requested_block(
&mut self,
ctx: &mut Context<Self>,
block: Block,
resynchronizing: bool,
) -> Result<(), failure::Error> {
if let (Some(epoch_constants), Some(chain_info), Some(rep_engine), Some(vrf_ctx)) = (
self.epoch_constants,
self.chain_state.chain_info.as_ref(),
self.chain_state.reputation_engine.as_ref(),
self.vrf_ctx.as_mut(),
) {
// A missing current epoch is not fatal here: it is only traced, and the default epoch is
// used in its place when calling `process_validations` below
if self.current_epoch.is_none() {
log::trace!("Called process_requested_block when current_epoch is None");
}
if self.chain_state.unspent_outputs_pool.db.is_none() {
panic!("NO UTXO DB");
}
let block_number = self.chain_state.block_number();
// The VRF input is the highest VRF output known, moved to the checkpoint of this block
let mut vrf_input = chain_info.highest_vrf_output;
vrf_input.checkpoint = block.block_header.beacon.checkpoint;
let active_wips = ActiveWips {
active_wips: self.chain_state.tapi_engine.wip_activation.clone(),
block_epoch: block.block_header.beacon.checkpoint,
};
// This visitor collects the priorities of the transactions in the block while validating
let mut transaction_visitor = PriorityVisitor::default();
// NOTE(review): the protocol version is derived from the current epoch here, whereas
// `process_candidate` derives it from the epoch of the block — confirm this is intended
let protocol_version = get_protocol_version(self.current_epoch);
let utxo_diff = process_validations(
&block,
self.current_epoch.unwrap_or_default(),
vrf_input,
chain_info.highest_block_checkpoint,
rep_engine,
epoch_constants,
&self.chain_state.unspent_outputs_pool,
&mut self.chain_state.data_request_pool,
vrf_ctx,
block_number,
&chain_info.consensus_constants,
&self.consensus_constants_wit2,
resynchronizing,
&active_wips,
Some(&mut transaction_visitor),
&self.chain_state.stakes,
protocol_version,
)?;
// Extract the collected priorities from the internal state of the visitor
let priorities = transaction_visitor.take_state();
// Persist block and update ChainState
self.consolidate_block(ctx, block, utxo_diff, priorities, resynchronizing);
Ok(())
} else {
Err(ChainManagerError::ChainNotReady.into())
}
}
/// Process a block candidate received from the network.
///
/// * Candidates whose checkpoint is neither the current epoch nor the next one are ignored, and
/// so are candidates that were already seen.
/// * While the node is in `WaitingConsensus` or `Synchronizing` state, new candidates are stored
/// in `candidates` and broadcast without being validated.
/// * Otherwise the candidate is compared against the best candidate known so far (if any) and
/// then validated. If it passes, it becomes the new best candidate and it is broadcast.
/// * Candidates that fail validation, or whose eligibility proof is invalid, are only broadcast
/// if the node is in `AlmostSynced` state.
///
/// Nothing is done, other than logging a warning, if the current epoch, the chain info, the
/// reputation engine or the VRF context are not set.
#[allow(clippy::map_entry)]
fn process_candidate(&mut self, block: Block) {
if let (Some(current_epoch), Some(chain_info), Some(rep_engine), Some(vrf_ctx)) = (
self.current_epoch,
self.chain_state.chain_info.as_ref(),
self.chain_state.reputation_engine.as_ref(),
self.vrf_ctx.as_mut(),
) {
// To continue processing, received block epoch should equal to `current_epoch` or `current_epoch + 1`
// NOTE(review): candidates from epochs later than `current_epoch + 1` are rejected here as
// well, although the log message only talks about an old epoch
if !(block.block_header.beacon.checkpoint == current_epoch
|| block.block_header.beacon.checkpoint == current_epoch + 1)
{
log::debug!(
"Ignoring received block candidate because its beacon shows an old epoch ({}). The current epoch is {}.",
block.block_header.beacon.checkpoint,
current_epoch,
);
return;
}
let hash_block = block.hash();
// If this candidate has not been seen before, validate it
if !self.seen_candidates.contains(&block) {
self.seen_candidates.insert(block.clone());
if self.sm_state == StateMachine::WaitingConsensus
|| self.sm_state == StateMachine::Synchronizing
{
self.candidates
.entry(hash_block)
.or_default()
.push(block.clone());
// If the node is not synced, broadcast recent candidates without validating them
self.broadcast_item(InventoryItem::Block(block));
return;
}
// The VRF input is the highest VRF output known, moved to the current epoch
let mut vrf_input = chain_info.highest_vrf_output;
vrf_input.checkpoint = current_epoch;
let active_wips = ActiveWips {
active_wips: self.chain_state.tapi_engine.wip_activation.clone(),
block_epoch: block.block_header.beacon.checkpoint,
};
let target_vrf_slots = VrfSlots::from_rf(
u32::try_from(rep_engine.ars().active_identities_number()).unwrap(),
chain_info.consensus_constants.mining_replication_factor,
chain_info.consensus_constants.mining_backup_factor,
block.block_header.beacon.checkpoint,
chain_info.consensus_constants.minimum_difficulty,
chain_info
.consensus_constants
.epochs_with_minimum_difficulty,
&active_wips,
);
// Data about the signer of this candidate, as needed for comparing it with the best one
let block_pkh = &block.block_sig.public_key.pkh();
let reputation = rep_engine.trs().get(block_pkh);
let is_active = rep_engine.ars().contains(block_pkh);
let vrf_proof = match block.block_header.proof.proof.proof_to_hash(vrf_ctx) {
Ok(vrf) => vrf,
Err(e) => {
log::warn!(
"Block candidate has an invalid mining eligibility proof: {}",
e
);
// In order to do not block possible validate candidates in AlmostSynced
// state, we would broadcast the errors too
if self.sm_state == StateMachine::AlmostSynced {
self.broadcast_item(InventoryItem::Block(block));
}
return;
}
};
let protocol_version =
ProtocolVersion::from_epoch(block.block_header.beacon.checkpoint);
// Only go through the validations if this candidate beats the best one known so far
if let Some(best_candidate) = &self.best_candidate {
let best_hash = best_candidate.block.hash();
let best_pkh = best_candidate.block.block_sig.public_key.pkh();
let best_candidate_is_active =
if after_second_hard_fork(current_epoch, get_environment()) {
rep_engine.ars().contains(&best_pkh)
} else {
// In case of being before to second hard fork we would use the same bool
// than the other to avoid the "activeness" comparison
is_active
};
// Mining power of both signers; a failed query counts as no power at all
let power = self
.chain_state
.stakes
.query_power(
*block_pkh,
Capability::Mining,
block.block_header.beacon.checkpoint,
)
.unwrap_or(0);
let best_candidate_power = self
.chain_state
.stakes
.query_power(
best_pkh,
Capability::Mining,
best_candidate.block.block_header.beacon.checkpoint,
)
.unwrap_or(0);
if compare_block_candidates(
hash_block,
reputation,
vrf_proof,
is_active,
power,
best_hash,
best_candidate.reputation,
best_candidate.vrf_proof,
best_candidate_is_active,
best_candidate_power,
&target_vrf_slots,
protocol_version,
) != Ordering::Greater
{
log::debug!("Ignoring new block candidate ({}) because a better one ({}) has been already validated", hash_block, best_hash);
return;
}
}
// This visitor will be used to derive a `Priorities` value from the transactions
// in this block candidate.
let mut transaction_visitor = PriorityVisitor::default();
let block_number = self.chain_state.block_number();
match process_validations(
&block,
current_epoch,
vrf_input,
chain_info.highest_block_checkpoint,
rep_engine,
self.epoch_constants.unwrap(),
&self.chain_state.unspent_outputs_pool,
&mut self.chain_state.data_request_pool,
// The unwrap is safe because if there is no VRF context,
// the actor should have stopped execution
self.vrf_ctx.as_mut().expect("No initialized VRF context"),
block_number,
&chain_info.consensus_constants,
&self.consensus_constants_wit2,
false,
&active_wips,
Some(&mut transaction_visitor),
&self.chain_state.stakes,
protocol_version,
) {
Ok(utxo_diff) => {
// The candidate is valid: it becomes the best candidate and gets broadcast
let priorities = transaction_visitor.take_state();
self.best_candidate = Some(BlockCandidate {
block: block.clone(),
utxo_diff,
reputation,
vrf_proof,
priorities,
});
self.broadcast_item(InventoryItem::Block(block));
}
Err(e) => {
log::warn!(
"Error when processing a block candidate {}: {}",
hash_block,
e
);
// In order to do not block possible validate candidates in AlmostSynced
// state, we would broadcast the errors too
if self.sm_state == StateMachine::AlmostSynced {
self.broadcast_item(InventoryItem::Block(block));
}
}
}
} else {
log::trace!("Block candidate already seen: {}", hash_block);
}
} else {
// NOTE(review): this branch is also reached when the chain info, the reputation engine or
// the VRF context are missing, not only when the current epoch is
log::warn!("ChainManager doesn't have current epoch");
}
}
fn persist_blocks_batch(&self, ctx: &mut Context<Self>, blocks: Vec<Block>) {
let mut to_persist = Vec::with_capacity(blocks.len());
for block in blocks {
to_persist.push(StoreInventoryItem::Block(Box::new(block)));
}
ctx.wait(self.persist_items(to_persist).map(|_, _, _| ()));
}
fn consolidate_block(
&mut self,
ctx: &mut Context<Self>,
block: Block,
utxo_diff: Diff,
priorities: Priorities,
resynchronizing: bool,
) {
// Update chain_info and reputation_engine
let own_pkh = match self.own_pkh {
Some(x) => x,
None => {
log::error!("No OwnPkh loaded in ChainManager");
return;
}
};
let current_epoch = if let Some(epoch) = self.current_epoch {
epoch
} else {
// If there is no epoch set, it's because the chain is yet to be bootstrapped, or because of a data race
match self.chain_state.chain_info.as_ref() {
// If the chain is yet to be bootstrapped (the block we are processing is the genesis block), set the epoch to zero
Some(chain_info) if chain_info.consensus_constants.genesis_hash == block.hash() => {
0
}
// In case of data race, shortcut the function
_ => {
log::error!("Current epoch not loaded in ChainManager");
return;
}
}
};
match self.chain_state {
ChainState {
chain_info: Some(ref mut chain_info),
reputation_engine: Some(ref mut reputation_engine),
ref mut stakes,
..
} => {
let block_hash = block.hash();
let block_epoch = block.block_header.beacon.checkpoint;
let block_signals = block.block_header.signals;
let validator_count = stakes.validator_count();
// Update `highest_block_checkpoint`
let beacon = CheckpointBeacon {
checkpoint: block_epoch,
hash_prev_block: block_hash,
};
// Get VRF context
let vrf_ctx = match self.vrf_ctx.as_mut() {
Some(x) => x,
None => {
log::error!("No VRF context available");
return;
}
};