This repository has been archived by the owner on Nov 6, 2020. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 1.7k
/
mod.rs
1199 lines (1005 loc) · 36.3 KB
/
mod.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
// Copyright 2015-2020 Parity Technologies (UK) Ltd.
// This file is part of Open Ethereum.
// Open Ethereum is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
// Open Ethereum is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with Open Ethereum. If not, see <http://www.gnu.org/licenses/>.
//! PIP Protocol Version 1 implementation.
//!
//! This uses a "Provider" to answer requests.
use common_types::transaction::UnverifiedTransaction;
use ethereum_types::{H256, U256};
use io::TimerToken;
use kvdb::DBValue;
use network::{NetworkProtocolHandler, NetworkContext, PeerId};
use parking_lot::{Mutex, RwLock};
use provider::Provider;
use request::{Request, NetworkRequests as Requests, Response};
use rlp::{RlpStream, Rlp};
use std::collections::{HashMap, HashSet, VecDeque};
use std::fmt;
use std::ops::{BitOr, BitAnd, Not};
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::{Duration, Instant};
use self::request_credits::{Credits, FlowParams};
use self::context::{Ctx, TickCtx};
use self::error::Punishment;
use self::load_timer::{LoadDistribution, NullStore, MOVING_SAMPLE_SIZE};
use self::request_set::RequestSet;
use self::id_guard::IdGuard;
mod context;
mod error;
mod load_timer;
mod status;
mod request_set;
#[cfg(test)]
mod tests;
pub mod request_credits;
pub use self::context::{BasicContext, EventContext, IoContext};
pub use self::error::Error;
pub use self::load_timer::{SampleStore, FileStore};
pub use self::status::{Status, Capabilities, Announcement};
// Timer tokens and their firing intervals. Each token is paired with the
// interval declared directly below it.
// NOTE(review): the code that registers and dispatches these timers is outside
// this excerpt; judging by the names they presumably drive `timeout_check`,
// `tick_handlers`, `propagate_transactions`, `begin_new_cost_period` and
// `tick_statistics` respectively -- confirm against the timer handler.
const TIMEOUT: TimerToken = 0;
const TIMEOUT_INTERVAL: Duration = Duration::from_secs(1);
const TICK_TIMEOUT: TimerToken = 1;
const TICK_TIMEOUT_INTERVAL: Duration = Duration::from_secs(5);
const PROPAGATE_TIMEOUT: TimerToken = 2;
const PROPAGATE_TIMEOUT_INTERVAL: Duration = Duration::from_secs(5);
const RECALCULATE_COSTS_TIMEOUT: TimerToken = 3;
// one hour.
const RECALCULATE_COSTS_INTERVAL: Duration = Duration::from_secs(60 * 60);
const STATISTICS_TIMEOUT: TimerToken = 4;
const STATISTICS_INTERVAL: Duration = Duration::from_secs(15);
/// Maximum load share for the light server.
///
/// Divided by a peer count to obtain the per-peer load share used when
/// building flow parameters.
pub const MAX_LIGHTSERV_LOAD: f64 = 0.5;
/// Factor to multiply leecher count to cater for
/// extra sudden connections (should be >= 1.0)
pub const LEECHER_COUNT_FACTOR: f64 = 1.25;
// minimum interval between announcements sent to the same peer
// (see `make_announcement`).
const UPDATE_INTERVAL: Duration = Duration::from_millis(5000);
/// Packet count for PIP.
const PACKET_COUNT_V1: u8 = 9;
/// Supported protocol versions, as `(version, packet count)` pairs.
pub const PROTOCOL_VERSIONS: &[(u8, u8)] = &[
(1, PACKET_COUNT_V1),
];
/// Max protocol version.
pub const MAX_PROTOCOL_VERSION: u8 = 1;
// packet ID definitions.
// These are the ids matched on in `handle_packet` and passed to `io.send`.
mod packet {
// the status packet (handshake).
pub const STATUS: u8 = 0x00;
// announcement of new block hashes or capabilities.
pub const ANNOUNCE: u8 = 0x01;
// request and response.
pub const REQUEST: u8 = 0x02;
pub const RESPONSE: u8 = 0x03;
// request credits update and acknowledgement.
pub const UPDATE_CREDITS: u8 = 0x04;
pub const ACKNOWLEDGE_UPDATE: u8 = 0x05;
// relay transactions to peers.
pub const SEND_TRANSACTIONS: u8 = 0x06;
// two packets were previously meant to be reserved for epoch proofs.
// these have since been moved to requests.
}
// timeouts for different kinds of requests.
// `HANDSHAKE` and `ACKNOWLEDGE_UPDATE` are `Duration`s; the bare `u64`
// values below are in milliseconds.
mod timeout {
use std::time::Duration;
// how long a pending peer may take to answer our status packet.
pub const HANDSHAKE: Duration = Duration::from_millis(4_000);
// how long a peer may take to acknowledge a credits update.
pub const ACKNOWLEDGE_UPDATE: Duration = Duration::from_millis(5_000);
pub const BASE: u64 = 2_500; // base timeout for packet.
// timeouts per request within packet.
// NOTE(review): these are not consumed in this excerpt; presumably the
// request set adds them on top of `BASE` -- confirm in `request_set`.
pub const HEADERS: u64 = 250; // per header?
pub const TRANSACTION_INDEX: u64 = 100;
pub const BODY: u64 = 50;
pub const RECEIPT: u64 = 50;
pub const PROOF: u64 = 100; // state proof
pub const CONTRACT_CODE: u64 = 100;
pub const HEADER_PROOF: u64 = 100;
pub const TRANSACTION_PROOF: u64 = 1000; // per gas?
pub const EPOCH_SIGNAL: u64 = 200;
}
/// A request id.
///
/// Outside of tests the wrapped counter value is private to this module.
#[cfg(not(test))]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Ord, PartialOrd, Hash)]
pub struct ReqId(usize);

/// A request id (test builds expose the wrapped value so tests can construct ids).
#[cfg(test)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Ord, PartialOrd, Hash)]
pub struct ReqId(pub usize);

impl fmt::Display for ReqId {
    /// Renders the id as `Request #<n>`.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let ReqId(id) = *self;
        write!(f, "Request #{}", id)
    }
}
// A pending peer: one we've sent our status to but
// may not have received one for.
struct PendingPeer {
// chain head hash we advertised in our status packet.
sent_head: H256,
// when our status was sent; the handshake timeout is measured from here.
last_update: Instant,
}
/// Relevant data to each peer. Not accessible publicly, only `pub` due to
/// limitations of the privacy system.
pub struct Peer {
local_credits: Credits, // their credits relative to us
// last status known for the peer; refreshed by announcements.
status: Status,
// capabilities advertised by the peer; refreshed by announcements.
capabilities: Capabilities,
// our credits with the peer and its flow parameters; `None` means the
// peer does not serve requests (`Error::NotServer`).
remote_flow: Option<(Credits, FlowParams)>,
sent_head: H256, // last chain head we've given them.
// time of the last announcement sent to the peer (initially the time our
// status packet was sent).
last_update: Instant,
// requests we've sent that have not been answered yet.
pending_requests: RequestSet,
// requests whose responses arrived but were not defused via `IdGuard`;
// reported as unfulfilled on disconnect.
failed_requests: Vec<ReqId>,
// hashes of transactions already propagated to this peer.
propagated_transactions: HashSet<H256>,
// when set, the next credits value reported by the peer is not applied;
// cleared once the pending request set drains.
skip_update: bool,
// flow parameters we apply to this peer's requests.
local_flow: Arc<FlowParams>,
// start time and parameters of a credits update still awaiting
// acknowledgement by the peer.
awaiting_acknowledge: Option<(Instant, Arc<FlowParams>)>,
}
/// Whether or not a peer was kept by a handler
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PeerStatus {
    /// The peer was kept
    Kept,
    /// The peer was not kept
    Unkept,
}

impl Not for PeerStatus {
    type Output = Self;

    /// Flips `Kept` to `Unkept` and vice versa.
    fn not(self) -> Self {
        if self == PeerStatus::Kept {
            PeerStatus::Unkept
        } else {
            PeerStatus::Kept
        }
    }
}

impl BitAnd for PeerStatus {
    type Output = Self;

    /// `Kept` only when both operands are `Kept`.
    fn bitand(self, other: Self) -> Self {
        if self == PeerStatus::Kept && other == PeerStatus::Kept {
            PeerStatus::Kept
        } else {
            PeerStatus::Unkept
        }
    }
}

impl BitOr for PeerStatus {
    type Output = Self;

    /// `Kept` when at least one operand is `Kept`.
    fn bitor(self, other: Self) -> Self {
        if self == PeerStatus::Kept || other == PeerStatus::Kept {
            PeerStatus::Kept
        } else {
            PeerStatus::Unkept
        }
    }
}
/// A light protocol event handler.
///
/// Each handler function takes a context which describes the relevant peer
/// and gives references to the IO layer and protocol structure so new messages
/// can be dispatched immediately.
///
/// Request responses are not guaranteed to be complete or valid, but passed IDs will be correct.
/// Response handlers are not given a copy of the original request; it is assumed
/// that relevant data will be stored by interested handlers.
///
/// Every method has a default implementation: `on_connect` returns
/// `PeerStatus::Kept` and all others do nothing.
pub trait Handler: Send + Sync {
/// Called when a peer connects.
/// The returned status tells the protocol whether this handler wants to keep the peer.
fn on_connect(
&self,
_ctx: &dyn EventContext,
_status: &Status,
_capabilities: &Capabilities
) -> PeerStatus { PeerStatus::Kept }
/// Called when a peer disconnects, with a list of unfulfilled request IDs as
/// of yet.
fn on_disconnect(&self, _ctx: &dyn EventContext, _unfulfilled: &[ReqId]) { }
/// Called when a peer makes an announcement.
fn on_announcement(&self, _ctx: &dyn EventContext, _announcement: &Announcement) { }
/// Called when a peer requests relay of some transactions.
fn on_transactions(&self, _ctx: &dyn EventContext, _relay: &[UnverifiedTransaction]) { }
/// Called when a peer responds to requests.
/// Responses not guaranteed to contain valid data and are not yet checked against
/// the requests they correspond to.
fn on_responses(&self, _ctx: &dyn EventContext, _req_id: ReqId, _responses: &[Response]) { }
/// Called when a peer responds with a transaction proof. Each proof is a vector of state items.
fn on_transaction_proof(&self, _ctx: &dyn EventContext, _req_id: ReqId, _state_items: &[DBValue]) { }
/// Called to "tick" the handler periodically.
fn tick(&self, _ctx: &dyn BasicContext) { }
/// Called on abort. This signals to handlers that they should clean up
/// and ignore peers.
// TODO: corresponding `on_activate`?
fn on_abort(&self) { }
}
/// Configuration.
pub struct Config {
    /// How many stored seconds of credits peers should be able to accumulate.
    pub max_stored_seconds: u64,
    /// The network config median peers (used as default peer count)
    pub median_peers: f64,
}

impl Default for Config {
    /// Defaults to five minutes of stored credits and a median of 25 peers.
    fn default() -> Self {
        // only charge for 5 minutes.
        let max_stored_seconds = 5 * 60;
        let median_peers = 25.0;

        Config { max_stored_seconds, median_peers }
    }
}
/// Protocol initialization parameters.
///
/// Consumed by `LightProtocol::new`.
pub struct Params {
/// Network id. Peers reporting a different id are rejected during the handshake.
pub network_id: u64,
/// Config.
pub config: Config,
/// Initial capabilities.
pub capabilities: Capabilities,
/// The sample store (`None` if data shouldn't persist between runs).
/// When `None`, a `NullStore` is used instead.
pub sample_store: Option<Box<dyn SampleStore>>,
}
/// Type alias for convenience: connected peers keyed by id, each behind its
/// own mutex so a single peer can be locked while the map is only read-locked.
pub type PeerMap = HashMap<PeerId, Mutex<Peer>>;
mod id_guard {
    use network::PeerId;
    use parking_lot::RwLockReadGuard;
    use super::{PeerMap, ReqId};

    // Tracks whether handling of a response succeeded.
    // If the guard is dropped without being defused, the request id is
    // recorded in the peer's "failed requests" list. Kept in its own module
    // so the fields cannot be touched from outside.
    pub struct IdGuard<'a> {
        peers: RwLockReadGuard<'a, PeerMap>,
        peer_id: PeerId,
        req_id: ReqId,
        active: bool,
    }

    impl<'a> IdGuard<'a> {
        /// Create a new `IdGuard`, which will prevent access of the inner ReqId
        /// (for forming responses, triggering handlers) until defused
        pub fn new(peers: RwLockReadGuard<'a, PeerMap>, peer_id: PeerId, req_id: ReqId) -> Self {
            IdGuard { peers, peer_id, req_id, active: true }
        }

        /// Defuse the guard, signalling that the request has been successfully decoded.
        pub fn defuse(mut self) -> ReqId {
            // `mem::forget` is not an option: the read guard on the peer map
            // still has to be released when `self` is dropped.
            self.active = false;
            self.req_id
        }
    }

    impl<'a> Drop for IdGuard<'a> {
        fn drop(&mut self) {
            if self.active {
                // still armed: remember the request as failed, if the peer is known.
                if let Some(peer) = self.peers.get(&self.peer_id) {
                    peer.lock().failed_requests.push(self.req_id);
                }
            }
        }
    }
}
/// Provides various statistics that could
/// be used to compute costs
pub struct Statistics {
    /// Samples of peer count, oldest first; holds at most
    /// `MOVING_SAMPLE_SIZE` entries.
    peer_counts: VecDeque<usize>,
}

impl Statistics {
    /// Create a new Statistics instance
    pub fn new() -> Self {
        Statistics {
            peer_counts: VecDeque::with_capacity(MOVING_SAMPLE_SIZE),
        }
    }

    /// Add a new peer_count sample, evicting the oldest samples so that no
    /// more than `MOVING_SAMPLE_SIZE` are retained.
    pub fn add_peer_count(&mut self, peer_count: usize) {
        while self.peer_counts.len() >= MOVING_SAMPLE_SIZE {
            self.peer_counts.pop_front();
        }
        self.peer_counts.push_back(peer_count);
    }

    /// Get the average peer count from previous samples. Is always >= 1.0
    ///
    /// Returns `1.0` when no sample has been recorded yet.
    pub fn avg_peer_count(&self) -> f64 {
        if self.peer_counts.is_empty() {
            return 1.0;
        }

        // Accumulate in `f64`: casting each sample to `u32` and saturating
        // the sum would silently distort the average for large values.
        let total: f64 = self.peer_counts.iter().map(|&count| count as f64).sum();
        let avg = total / self.peer_counts.len() as f64;
        avg.max(1.0)
    }
}
/// This is an implementation of the light ethereum network protocol, abstracted
/// over a `Provider` of data and a p2p network.
///
/// This is simply designed for request-response purposes. Higher level uses
/// of the protocol, such as synchronization, will function as wrappers around
/// this system.
//
// LOCK ORDER:
// Locks must be acquired in the order declared, and when holding a read lock
// on the peers, only one peer may be held at a time.
pub struct LightProtocol {
// source of chain data (chain info, reorg depths, transactions to propagate).
provider: Arc<dyn Provider>,
config: Config,
// genesis hash and network id a peer must report to pass the handshake.
genesis_hash: H256,
network_id: u64,
// peers we have sent our status to but whose status we have not processed yet.
pending_peers: RwLock<HashMap<PeerId, PendingPeer>>,
// peers that completed the handshake.
peers: RwLock<PeerMap>,
// our own capabilities; updated on every announcement we make.
capabilities: RwLock<Capabilities>,
// current flow parameters applied to regular peers; replaced at the start
// of each cost period.
flow_params: RwLock<Arc<FlowParams>>,
// flow parameters applied to reserved peers.
free_flow_params: Arc<FlowParams>,
handlers: Vec<Arc<dyn Handler>>,
// counter from which request ids are allocated.
req_id: AtomicUsize,
// store handed to the load distribution when it is loaded and when a
// period ends.
sample_store: Box<dyn SampleStore>,
// expected request times, used to derive flow parameters.
load_distribution: LoadDistribution,
// moving samples of the leecher count.
statistics: RwLock<Statistics>,
}
impl LightProtocol {
/// Create a new instance of the protocol manager.
pub fn new(provider: Arc<dyn Provider>, params: Params) -> Self {
debug!(target: "pip", "Initializing light protocol handler");
let genesis_hash = provider.chain_info().genesis_hash;
let sample_store = params.sample_store.unwrap_or_else(|| Box::new(NullStore));
let load_distribution = LoadDistribution::load(&*sample_store);
// Default load share relative to median peers
let load_share = MAX_LIGHTSERV_LOAD / params.config.median_peers;
let flow_params = FlowParams::from_request_times(
|kind| load_distribution.expected_time(kind),
load_share,
Duration::from_secs(params.config.max_stored_seconds),
);
LightProtocol {
provider,
config: params.config,
genesis_hash,
network_id: params.network_id,
pending_peers: RwLock::new(HashMap::new()),
peers: RwLock::new(HashMap::new()),
capabilities: RwLock::new(params.capabilities),
flow_params: RwLock::new(Arc::new(flow_params)),
free_flow_params: Arc::new(FlowParams::free()),
handlers: Vec::new(),
req_id: AtomicUsize::new(0),
sample_store,
load_distribution,
statistics: RwLock::new(Statistics::new()),
}
}
/// Attempt to get peer status.
///
/// Returns `None` when the peer has not completed the handshake.
pub fn peer_status(&self, peer: PeerId) -> Option<Status> {
    let peers = self.peers.read();
    let entry = peers.get(&peer)?;
    let status = entry.lock().status.clone();
    Some(status)
}
/// Get number of (connected, active) peers.
///
/// "Connected" counts both pending and handshaked peers; "active" counts
/// handshaked peers that have requests of ours outstanding.
pub fn peer_count(&self) -> (usize, usize) {
    let num_pending = self.pending_peers.read().len();
    let peers = self.peers.read();

    let connected = num_pending + peers.len();
    let active = peers.values()
        .filter(|peer| !peer.lock().pending_requests.is_empty())
        .count();

    (connected, active)
}
/// Get the number of active light peers downloading from the
/// node
pub fn leecher_count(&self) -> usize {
    let credit_limit = *self.flow_params.read().limit();

    // A peer counts as a leecher once it has spent some of its credits,
    // i.e. its current balance is below the limit.
    let peers = self.peers.read();
    let leechers = peers.values()
        .filter(|peer| peer.lock().local_credits.current() < credit_limit)
        .count();
    leechers
}
/// Make a request to a peer.
///
/// Fails on: nonexistent peer, network error, peer not server,
/// insufficient credits. Does not check capabilities before sending.
/// On success, returns a request id which can later be coordinated
/// with an event.
///
/// # Errors
///
/// - `Error::UnknownPeer` if the peer is not in the peer map.
/// - `Error::NotServer` if the peer has no remote flow parameters or the
///   cost of the requests cannot be computed from them.
/// - whatever error `deduct_cost` reports when the cost cannot be deducted.
pub fn request_from(&self, io: &dyn IoContext, peer_id: PeerId, requests: Requests) -> Result<ReqId, Error> {
let peers = self.peers.read();
let peer = match peers.get(&peer_id) {
Some(peer) => peer,
None => return Err(Error::UnknownPeer),
};
let mut peer = peer.lock();
// reborrow as a plain `&mut Peer` so distinct fields can be borrowed separately below.
let peer = &mut *peer;
match peer.remote_flow {
None => Err(Error::NotServer),
Some((ref mut creds, ref params)) => {
// apply recharge to credits if there's no pending requests.
if peer.pending_requests.is_empty() {
params.recharge(creds);
}
// compute and deduct cost.
// `pre_creds` is only kept for the trace message below.
let pre_creds = creds.current();
let cost = match params.compute_cost_multi(requests.requests()) {
Some(cost) => cost,
None => return Err(Error::NotServer),
};
creds.deduct_cost(cost)?;
trace!(target: "pip", "requesting from peer {}. Cost: {}; Available: {}",
peer_id, cost, pre_creds);
// allocate a fresh request id.
let req_id = ReqId(self.req_id.fetch_add(1, Ordering::SeqCst));
// packet body: [req_id, [requests...]]
io.send(peer_id, packet::REQUEST, {
let mut stream = RlpStream::new_list(2);
stream.append(&req_id.0).append_list(&requests.requests());
stream.out()
});
// begin timeout.
peer.pending_requests.insert(req_id, requests, cost, Instant::now());
Ok(req_id)
}
}
}
/// Make an announcement of new chain head and capabilities to all peers.
/// The announcement is expected to be valid.
///
/// A peer is skipped when it was already sent this head, when its own head
/// number is not behind the announced one, or when it was updated less than
/// `UPDATE_INTERVAL` ago. The `reorg_depth` field of the announcement is
/// overwritten per peer before sending.
pub fn make_announcement(&self, io: &dyn IoContext, mut announcement: Announcement) {
// cache of reorg depths keyed by the head previously sent, so the provider
// is asked at most once per distinct previous head.
let mut reorgs_map = HashMap::new();
let now = Instant::now();
// update stored capabilities
self.capabilities.write().update_from(&announcement);
// calculate reorg info and send packets
for (peer_id, peer_info) in self.peers.read().iter() {
let mut peer_info = peer_info.lock();
// TODO: "urgent" announcements like new blocks?
// the timer approach will skip 1 (possibly 2) in rare occasions.
if peer_info.sent_head == announcement.head_hash ||
peer_info.status.head_num >= announcement.head_num ||
// fix for underflow reported in
// https://github.com/openethereum/openethereum/issues/10419
// (`now - last_update` must not be evaluated when `last_update` is later than `now`)
now < peer_info.last_update ||
now - peer_info.last_update < UPDATE_INTERVAL {
continue
}
peer_info.last_update = now;
let reorg_depth = reorgs_map.entry(peer_info.sent_head)
.or_insert_with(|| {
match self.provider.reorg_depth(&announcement.head_hash, &peer_info.sent_head) {
Some(depth) => depth,
None => {
// both values will always originate locally -- this means something
// has gone really wrong
debug!(target: "pip", "couldn't compute reorganization depth between {:?} and {:?}",
&announcement.head_hash, &peer_info.sent_head);
0
}
}
});
peer_info.sent_head = announcement.head_hash;
announcement.reorg_depth = *reorg_depth;
io.send(*peer_id, packet::ANNOUNCE, status::write_announcement(&announcement));
}
}
/// Add an event handler.
///
/// Handlers are meant to be registered while the protocol structure is
/// being set up, to customize its behavior and to dispatch requests
/// immediately upon events. They are stored in registration order.
pub fn add_handler(&mut self, handler: Arc<dyn Handler>) {
    let registered = &mut self.handlers;
    registered.push(handler);
}
/// Signal to handlers that network activity is being aborted
/// and clear peer data.
pub fn abort(&self) {
    // notify every handler first, before any lock is taken.
    self.handlers.iter().for_each(|handler| handler.on_abort());

    // acquire in order and hold.
    let mut pending_peers = self.pending_peers.write();
    let mut peers = self.peers.write();

    pending_peers.clear();
    peers.clear();
}
// Does the common pre-verification of responses before the response itself
// is actually decoded:
// - check whether peer exists
// - check whether request was made
// - check whether request kinds match
// NOTE(review): no request-kind comparison is visible in this function; it
// presumably happens when the response is decoded -- confirm.
//
// Expects `raw` to hold the request id at index 0 and the credits value
// reported by the peer at index 1. On success, returns an `IdGuard` that owns
// the read lock on the peer map and marks the request as failed unless defused.
fn pre_verify_response(&self, peer: PeerId, raw: &Rlp) -> Result<IdGuard, Error> {
let req_id = ReqId(raw.val_at(0)?);
let cur_credits: U256 = raw.val_at(1)?;
trace!(target: "pip", "pre-verifying response for {} from peer {}", req_id, peer);
let peers = self.peers.read();
let res = match peers.get(&peer) {
Some(peer_info) => {
let mut peer_info = peer_info.lock();
// reborrow as `&mut Peer` so distinct fields can be borrowed separately.
let peer_info: &mut Peer = &mut *peer_info;
// the request is removed from the pending set regardless of the outcome below.
let req_info = peer_info.pending_requests.remove(req_id, Instant::now());
// true when no other request to this peer is still outstanding.
let last_batched = peer_info.pending_requests.is_empty();
let flow_info = peer_info.remote_flow.as_mut();
match (req_info, flow_info) {
(Some(_), Some(flow_info)) => {
let &mut (ref mut c, ref mut flow) = flow_info;
// only update if the cumulative cost of the request set is zero.
// and this response wasn't from before request costs were updated.
if !peer_info.skip_update && last_batched {
// never trust the peer with more credits than the flow limit allows.
let actual_credits = ::std::cmp::min(cur_credits, *flow.limit());
c.update_to(actual_credits);
}
if last_batched { peer_info.skip_update = false }
Ok(())
}
(None, _) => Err(Error::UnsolicitedResponse),
(_, None) => Err(Error::NotServer), // really should be impossible.
}
}
None => Err(Error::UnknownPeer), // probably only occurs in a race of some kind.
};
// the peer's mutex guard has been released by now; the map read guard moves into the `IdGuard`.
res.map(|_| IdGuard::new(peers, peer, req_id))
}
/// Handle a packet using the given io context.
/// Packet data is _untrusted_, which means that invalid data won't lead to
/// issues.
pub fn handle_packet(&self, io: &dyn IoContext, peer: PeerId, packet_id: u8, data: &[u8]) {
let rlp = Rlp::new(data);
trace!(target: "pip", "Incoming packet {} from peer {}", packet_id, peer);
// handle the packet
let res = match packet_id {
packet::STATUS => self.status(peer, io, &rlp),
packet::ANNOUNCE => self.announcement(peer, io, &rlp),
packet::REQUEST => self.request(peer, io, &rlp),
packet::RESPONSE => self.response(peer, io, &rlp),
packet::UPDATE_CREDITS => self.update_credits(peer, io, &rlp),
packet::ACKNOWLEDGE_UPDATE => self.acknowledge_update(peer, io, &rlp),
packet::SEND_TRANSACTIONS => self.relay_transactions(peer, io, &rlp),
other => {
Err(Error::UnrecognizedPacket(other))
}
};
if let Err(e) = res {
punish(peer, io, &e);
}
}
// check timeouts and punish peers.
// Covers three cases: pending peers that never completed the handshake,
// peers with timed-out requests, and peers that did not acknowledge a
// credits update in time. Offenders are disconnected.
fn timeout_check(&self, io: &dyn IoContext) {
    let now = Instant::now();

    // handshake timeout
    {
        let mut pending = self.pending_peers.write();
        let expired: Vec<_> = pending.iter()
            .filter_map(|(&peer_id, entry)| {
                if entry.last_update + timeout::HANDSHAKE <= now {
                    Some(peer_id)
                } else {
                    None
                }
            })
            .collect();

        for peer_id in expired {
            debug!(target: "pip", "Peer {} handshake timed out", peer_id);
            pending.remove(&peer_id);
            io.disconnect_peer(peer_id);
        }
    }

    // request and update ack timeouts
    let peers = self.peers.read();
    for (peer_id, peer) in peers.iter() {
        let peer = peer.lock();

        if peer.pending_requests.check_timeout(now) {
            debug!(target: "pip", "Peer {} request timeout", peer_id);
            io.disconnect_peer(*peer_id);
        }

        let ack_overdue = match peer.awaiting_acknowledge {
            Some((ref sent_at, _)) => *sent_at + timeout::ACKNOWLEDGE_UPDATE <= now,
            None => false,
        };
        if ack_overdue {
            debug!(target: "pip", "Peer {} update acknowledgement timeout", peer_id);
            io.disconnect_peer(*peer_id);
        }
    }
}
// propagate transactions to relay peers.
// if we aren't on the mainnet, we just propagate to all relay peers
//
// Does nothing when our own capabilities advertise `tx_relay`, or when the
// provider has no transactions ready. Each relay-capable peer only receives
// transactions it has not been sent before.
fn propagate_transactions(&self, io: &dyn IoContext) {
if self.capabilities.read().tx_relay { return }
let ready_transactions = self.provider.transactions_to_propagate();
if ready_transactions.is_empty() { return }
trace!(target: "pip", "propagate transactions: {} ready", ready_transactions.len());
let all_transaction_hashes: HashSet<_> = ready_transactions.iter().map(|tx| tx.hash()).collect();
// scratch buffer reused across peers; it is drained on every send.
let mut buf = Vec::new();
let peers = self.peers.read();
for (peer_id, peer_info) in peers.iter() {
let mut peer_info = peer_info.lock();
if !peer_info.capabilities.tx_relay { continue }
let prop_filter = &mut peer_info.propagated_transactions;
// forget hashes that are no longer among the ready transactions, so the
// per-peer set does not grow without bound.
*prop_filter = &*prop_filter & &all_transaction_hashes;
// fill the buffer with all non-propagated transactions.
// (`insert` returns true only for hashes not yet in the filter, and
// records them as propagated at the same time.)
let to_propagate = ready_transactions.iter()
.filter(|tx| prop_filter.insert(tx.hash()))
.map(|tx| &tx.transaction);
buf.extend(to_propagate);
// propagate to the given peer.
if buf.is_empty() { continue }
io.send(*peer_id, packet::SEND_TRANSACTIONS, {
let mut stream = RlpStream::new_list(buf.len());
for pending_tx in buf.drain(..) {
stream.append(pending_tx);
}
stream.out()
})
}
}
/// called when a peer connects.
pub fn on_connect(&self, peer: PeerId, io: &dyn IoContext) {
let proto_version = match io.protocol_version(peer).ok_or(Error::WrongNetwork) {
Ok(pv) => pv,
Err(e) => { punish(peer, io, &e); return }
};
if PROTOCOL_VERSIONS.iter().find(|x| x.0 == proto_version).is_none() {
punish(peer, io, &Error::UnsupportedProtocolVersion(proto_version));
return;
}
let chain_info = self.provider.chain_info();
let status = Status {
head_td: chain_info.total_difficulty,
head_hash: chain_info.best_block_hash,
head_num: chain_info.best_block_number,
genesis_hash: chain_info.genesis_hash,
protocol_version: proto_version as u32, // match peer proto version
network_id: self.network_id,
last_head: None,
};
let capabilities = self.capabilities.read();
let cost_local_flow = self.flow_params.read();
let local_flow = if io.is_reserved_peer(peer) {
&*self.free_flow_params
} else {
&**cost_local_flow
};
let status_packet = status::write_handshake(&status, &capabilities, Some(local_flow));
self.pending_peers.write().insert(peer, PendingPeer {
sent_head: chain_info.best_block_hash,
last_update: Instant::now(),
});
trace!(target: "pip", "Sending status to peer {}", peer);
io.send(peer, packet::STATUS, status_packet);
}
/// called when a peer disconnects.
///
/// Drops all state held for the peer. If the peer had completed the
/// handshake, every handler is told which requests remain unfulfilled:
/// those still pending plus those recorded as failed.
pub fn on_disconnect(&self, peer: PeerId, io: &dyn IoContext) {
    trace!(target: "pip", "Peer {} disconnecting", peer);

    self.pending_peers.write().remove(&peer);

    // nothing to report for peers that never finished the handshake.
    let peer_info = match self.peers.write().remove(&peer) {
        Some(entry) => entry.into_inner(),
        None => return,
    };

    let mut unfulfilled: Vec<_> = peer_info.pending_requests.collect_ids();
    unfulfilled.extend(peer_info.failed_requests);

    for handler in &self.handlers {
        let ctx = Ctx { peer, io, proto: self };
        handler.on_disconnect(&ctx, &unfulfilled);
    }
}
/// Execute the given closure with a basic context derived from the I/O context.
///
/// Returns whatever the closure returns.
pub fn with_context<F, T>(&self, io: &dyn IoContext, f: F) -> T
    where F: FnOnce(&dyn BasicContext) -> T
{
    let ctx = TickCtx { io, proto: self };
    f(&ctx)
}
// "tick" every registered handler, giving each a fresh basic context.
fn tick_handlers(&self, io: &dyn IoContext) {
    self.handlers.iter().for_each(|handler| {
        handler.tick(&TickCtx { io, proto: self })
    });
}
// Start a new cost period: close the current load-distribution period,
// derive fresh flow parameters from the average leecher count, install them
// and notify every peer, which then has to acknowledge the update.
fn begin_new_cost_period(&self, io: &dyn IoContext) {
    self.load_distribution.end_period(&*self.sample_store);

    let avg_peer_count = self.statistics.read().avg_peer_count();
    // Load share relative to average peer count +LEECHER_COUNT_FACTOR%
    let load_share = MAX_LIGHTSERV_LOAD / (avg_peer_count * LEECHER_COUNT_FACTOR);
    let max_stored = Duration::from_secs(self.config.max_stored_seconds);

    let new_params = Arc::new(FlowParams::from_request_times(
        |kind| self.load_distribution.expected_time(kind),
        load_share,
        max_stored,
    ));
    *self.flow_params.write() = Arc::clone(&new_params);
    trace!(target: "pip", "New cost period: avg_peers={} ; cost_table:{:?}", avg_peer_count, new_params.cost_table());

    let peers = self.peers.read();
    let now = Instant::now();

    // packet body: [limit, recharge rate, cost table]; identical for all peers.
    let mut stream = RlpStream::new_list(3);
    stream.append(new_params.limit())
        .append(new_params.recharge_rate())
        .append(new_params.cost_table());
    let packet_body = stream.out();

    for (peer_id, peer_info) in peers.iter() {
        let mut peer_info = peer_info.lock();

        io.send(*peer_id, packet::UPDATE_CREDITS, packet_body.clone());
        // remember when the update was sent so a missing acknowledgement can time out.
        peer_info.awaiting_acknowledge = Some((now, Arc::clone(&new_params)));
    }
}
// Record the current leecher count as a new statistics sample.
fn tick_statistics(&self) {
    // take the sample first: `leecher_count` locks `flow_params` and `peers`,
    // which must not happen while the `statistics` write lock is held.
    let sample = self.leecher_count();
    let mut statistics = self.statistics.write();
    statistics.add_peer_count(sample);
}
}
impl LightProtocol {
// Handle status message from peer.
fn status(&self, peer: PeerId, io: &dyn IoContext, data: &Rlp) -> Result<(), Error> {
let pending = match self.pending_peers.write().remove(&peer) {
Some(pending) => pending,
None => {
return Err(Error::UnexpectedHandshake);
}
};
let (status, capabilities, flow_params) = status::parse_handshake(data)?;
trace!(target: "pip", "Connected peer with chain head {:?}", (status.head_hash, status.head_num));
if (status.network_id, status.genesis_hash) != (self.network_id, self.genesis_hash) {
trace!(target: "pip", "peer {} wrong network: network_id is {} vs our {}, gh is {} vs our {}",
peer, status.network_id, self.network_id, status.genesis_hash, self.genesis_hash);
return Err(Error::WrongNetwork);
}
if Some(status.protocol_version as u8) != io.protocol_version(peer) {
return Err(Error::BadProtocolVersion);
}
let remote_flow = flow_params.map(|params| (params.create_credits(), params));
let local_flow = if io.is_reserved_peer(peer) {
self.free_flow_params.clone()
} else {
self.flow_params.read().clone()
};
self.peers.write().insert(peer, Mutex::new(Peer {
local_credits: local_flow.create_credits(),
status: status.clone(),
capabilities,
remote_flow,
sent_head: pending.sent_head,
last_update: pending.last_update,
pending_requests: RequestSet::default(),
failed_requests: Vec::new(),
propagated_transactions: HashSet::new(),
skip_update: false,
local_flow,
awaiting_acknowledge: None,
}));
let any_kept = self.handlers.iter().map(
|handler| handler.on_connect(
&Ctx {
peer,
io,
proto: self,
},
&status,
&capabilities
)
).fold(PeerStatus::Kept, PeerStatus::bitor);
if any_kept == PeerStatus::Unkept {
Err(Error::RejectedByHandlers)
} else {
Ok(())
}
}
// Handle an announcement.
// Updates the stored status and capabilities of the announcing peer and then
// forwards the announcement to every handler. Announcements from peers that
// are not in the peer map are ignored without error.
fn announcement(&self, peer: PeerId, io: &dyn IoContext, data: &Rlp) -> Result<(), Error> {
    if !self.peers.read().contains_key(&peer) {
        debug!(target: "pip", "Ignoring announcement from unknown peer");
        return Ok(())
    }

    let announcement = status::parse_announcement(data)?;

    // scope to ensure locks are dropped before moving into handler-space.
    {
        let peers = self.peers.read();
        // the peer may have gone away since the check above.
        let mut peer_info = match peers.get(&peer) {
            Some(entry) => entry.lock(),
            None => return Ok(()),
        };

        // update status.
        // TODO: punish peer if they've moved backwards.
        let previous_head = peer_info.status.head_hash;
        peer_info.status.head_hash = announcement.head_hash;
        peer_info.status.head_td = announcement.head_td;
        peer_info.status.head_num = announcement.head_num;
        peer_info.status.last_head = Some((previous_head, announcement.reorg_depth));

        // update capabilities.
        peer_info.capabilities.update_from(&announcement);
    }

    self.handlers.iter().for_each(|handler| {
        handler.on_announcement(&Ctx {
            peer,
            io,
            proto: self,
        }, &announcement);
    });

    Ok(())
}
// Receive requests from a peer.
fn request(&self, peer_id: PeerId, io: &dyn IoContext, raw: &Rlp) -> Result<(), Error> {
// the maximum amount of requests we'll fill in a single packet.
const MAX_REQUESTS: usize = 256;
use ::request::Builder;
use ::request::CompleteRequest;
let peers = self.peers.read();
let peer = match peers.get(&peer_id) {
Some(peer) => peer,
None => {
debug!(target: "pip", "Ignoring request from unknown peer");
return Ok(())
}
};
let mut peer = peer.lock();