forked from libp2p/go-libp2p-kad-dht
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdht_options.go
405 lines (361 loc) · 13 KB
/
dht_options.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
package dht
import (
"fmt"
"time"
ds "github.com/ipfs/go-datastore"
dssync "github.com/ipfs/go-datastore/sync"
"github.com/ipfs/go-ipns"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/protocol"
"github.com/libp2p/go-libp2p-kad-dht/providers"
record "github.com/libp2p/go-libp2p-record"
)
// ModeOpt describes what mode the DHT should operate in.
type ModeOpt int
const (
// ModeAuto utilizes EvtLocalReachabilityChanged events sent over the event bus to dynamically switch the DHT
// between Client and Server modes based on network conditions.
//
// ModeAuto is the first constant of the iota sequence and therefore the zero value of ModeOpt.
ModeAuto ModeOpt = iota
// ModeClient operates the DHT as a client only; it cannot respond to incoming queries.
ModeClient
// ModeServer operates the DHT as a server; it can both send and respond to queries.
ModeServer
// ModeAutoServer operates in the same way as ModeAuto, but acts as a server when reachability is unknown.
ModeAutoServer
)
// DefaultPrefix is the application specific prefix attached to all DHT protocols by default.
//
// A config that still uses this prefix is subject to the extra checks in
// (*config).validate (bucket size, providers/values enabled, validators).
const DefaultPrefix protocol.ID = "/ipfs"
// config is the internal structure holding every setting that the Option
// functions in this file can change when constructing a DHT.
type config struct {
datastore ds.Batching
validator record.Validator
validatorChanged bool // if true implies that the validator has been changed and that defaults should not be used
// mode is not assigned by defaults, so it starts out as the zero value ModeAuto.
mode ModeOpt
protocolPrefix protocol.ID
bucketSize int
concurrency int
resiliency int
maxRecordAge time.Duration
enableProviders bool
enableValues bool
providersOptions []providers.Option
queryPeerFilter QueryFilterFunc
// routingTable groups the settings that apply to the routing table.
routingTable struct {
refreshQueryTimeout time.Duration
refreshInterval time.Duration
autoRefresh bool
latencyTolerance time.Duration
// checkInterval is not assigned by defaults and has no Option in this file.
checkInterval time.Duration
peerFilter RouteTableFilterFunc
}
// set to true if we're operating in v1 dht compatible mode
v1CompatibleMode bool
bootstrapPeers []peer.AddrInfo
}
// emptyQueryFilter is the query filter installed by defaults. It ignores both
// arguments and approves every peer.
func emptyQueryFilter(_ *IpfsDHT, ai peer.AddrInfo) bool {
	return true
}
// emptyRTFilter is the routing table filter installed by defaults. It ignores
// both arguments and approves every peer.
func emptyRTFilter(_ *IpfsDHT, conns []network.Conn) bool {
	return true
}
// apply runs each of the given options against this config, in the order
// given, and stops at the first option that fails.
//
// It returns nil when every option succeeds (including when no options are
// passed). Otherwise it returns an error that names the zero-based index of
// the failing option and wraps the option's own error, so callers can still
// inspect the cause with errors.Is / errors.As. Options that ran before the
// failing one have already modified the config.
func (c *config) apply(opts ...Option) error {
	for i, opt := range opts {
		if err := opt(c); err != nil {
			// %w (rather than %s) keeps the original error in the chain;
			// the rendered message is unchanged.
			return fmt.Errorf("dht option %d failed: %w", i, err)
		}
	}
	return nil
}
// applyFallbacks fills in default values that could not be set while the
// config was being built because they depend on other configuration
// parameters and/or on the Host.
//
// When the validator has not been marked as changed, it must still be a
// record.NamespacedValidator; any missing "pk" or "ipns" entry is then added
// (the IPNS validator uses h's peerstore as its key book). If the validator
// was replaced without being marked as changed, an error is returned.
func (c *config) applyFallbacks(h host.Host) error {
	// A validator supplied through the Validator option is used as-is.
	if c.validatorChanged {
		return nil
	}

	namespaced, isNamespaced := c.validator.(record.NamespacedValidator)
	if !isNamespaced {
		return fmt.Errorf("the default validator was changed without being marked as changed")
	}

	// Only fill the gaps: entries that already exist are left alone.
	if _, present := namespaced["pk"]; !present {
		namespaced["pk"] = record.PublicKeyValidator{}
	}
	if _, present := namespaced["ipns"]; !present {
		namespaced["ipns"] = ipns.Validator{KeyBook: h.Peerstore()}
	}
	return nil
}
// Option is a single DHT configuration step: a function that modifies the
// config it is given. Returning a non-nil error makes (*config).apply stop
// and report the index of the failing option.
type Option func(*config) error
// defaultBucketSize is the bucket size assigned by defaults, and the only
// bucket size (*config).validate accepts when the protocol prefix is
// DefaultPrefix.
const defaultBucketSize = 20
// defaults holds the default DHT options. This option will be automatically
// prepended to any options you pass to the DHT constructor.
//
// Fields it does not assign (mode, routingTable.checkInterval,
// providersOptions, bootstrapPeers, validatorChanged) are left untouched.
var defaults = func(cfg *config) error {
	// Record storage and validation.
	cfg.validator = record.NamespacedValidator{}
	cfg.datastore = dssync.MutexWrap(ds.NewMapDatastore())
	cfg.maxRecordAge = 36 * time.Hour
	cfg.enableProviders = true
	cfg.enableValues = true

	// Protocol identity.
	cfg.protocolPrefix = DefaultPrefix
	cfg.v1CompatibleMode = true

	// Query behaviour.
	cfg.queryPeerFilter = emptyQueryFilter
	cfg.bucketSize = defaultBucketSize
	cfg.concurrency = 10
	cfg.resiliency = 3

	// Routing table maintenance.
	rt := &cfg.routingTable
	rt.latencyTolerance = time.Minute
	rt.refreshQueryTimeout = time.Minute
	rt.refreshInterval = 10 * time.Minute
	rt.autoRefresh = true
	rt.peerFilter = emptyRTFilter

	return nil
}
// validate checks the constraints attached to DefaultPrefix. A config using
// any other protocol prefix is accepted without further checks.
//
// With DefaultPrefix the config must use defaultBucketSize, keep providers
// and values enabled, and use a record.NamespacedValidator containing exactly
// the "pk" (record.PublicKeyValidator) and "ipns" (ipns.Validator) entries.
// The first violated constraint is reported as an error.
func (c *config) validate() error {
	if c.protocolPrefix != DefaultPrefix {
		return nil
	}

	switch {
	case c.bucketSize != defaultBucketSize:
		return fmt.Errorf("protocol prefix %s must use bucket size %d", DefaultPrefix, defaultBucketSize)
	case !c.enableProviders:
		return fmt.Errorf("protocol prefix %s must have providers enabled", DefaultPrefix)
	case !c.enableValues:
		return fmt.Errorf("protocol prefix %s must have values enabled", DefaultPrefix)
	}

	namespaced, isNamespaced := c.validator.(record.NamespacedValidator)
	if !isNamespaced {
		return fmt.Errorf("protocol prefix %s must use a namespaced validator", DefaultPrefix)
	}
	if len(namespaced) != 2 {
		return fmt.Errorf("protocol prefix %s must have exactly two namespaced validators - /pk and /ipns", DefaultPrefix)
	}

	// The /pk namespace must exist and use the stock public key validator.
	pkValidator, hasPK := namespaced["pk"]
	if !hasPK {
		return fmt.Errorf("protocol prefix %s must support the /pk namespaced validator", DefaultPrefix)
	}
	if _, isPK := pkValidator.(record.PublicKeyValidator); !isPK {
		return fmt.Errorf("protocol prefix %s must use the record.PublicKeyValidator for the /pk namespace", DefaultPrefix)
	}

	// The /ipns namespace must exist and use the stock IPNS validator.
	ipnsValidator, hasIPNS := namespaced["ipns"]
	if !hasIPNS {
		return fmt.Errorf("protocol prefix %s must support the /ipns namespaced validator", DefaultPrefix)
	}
	if _, isIPNS := ipnsValidator.(ipns.Validator); !isIPNS {
		return fmt.Errorf("protocol prefix %s must use ipns.Validator for the /ipns namespace", DefaultPrefix)
	}

	return nil
}
// RoutingTableLatencyTolerance sets the maximum acceptable latency for peers
// in the routing table's cluster.
//
// The default value is one minute.
func RoutingTableLatencyTolerance(latency time.Duration) Option {
	setTolerance := func(cfg *config) error {
		cfg.routingTable.latencyTolerance = latency
		return nil
	}
	return setTolerance
}
// RoutingTableRefreshQueryTimeout sets the timeout applied to routing table
// refresh queries.
//
// The default value is one minute.
func RoutingTableRefreshQueryTimeout(timeout time.Duration) Option {
	return Option(func(cfg *config) error {
		rt := &cfg.routingTable
		rt.refreshQueryTimeout = timeout
		return nil
	})
}
// RoutingTableRefreshPeriod sets the period for refreshing buckets in the
// routing table. The DHT will refresh buckets every period by:
//
//  1. First searching for nearby peers to figure out how many buckets we
//     should try to fill.
//  2. Then searching for a random key in each bucket that hasn't been queried
//     in the last refresh period.
//
// The default value is ten minutes.
func RoutingTableRefreshPeriod(period time.Duration) Option {
	setPeriod := func(cfg *config) error {
		cfg.routingTable.refreshInterval = period
		return nil
	}
	return setPeriod
}
// Datastore configures the DHT to use the specified datastore.
//
// Defaults to an in-memory (temporary) map.
func Datastore(ds ds.Batching) Option {
	useDatastore := func(cfg *config) error {
		cfg.datastore = ds
		return nil
	}
	return useDatastore
}
// Mode configures which mode the DHT operates in (ModeAuto, ModeClient,
// ModeServer or ModeAutoServer).
//
// Defaults to ModeAuto.
func Mode(m ModeOpt) Option {
	return Option(func(cfg *config) error {
		cfg.mode = m
		return nil
	})
}
// Validator configures the DHT to use the specified validator.
//
// Defaults to a namespaced validator that can validate both public key (under
// the "pk" namespace) and IPNS records (under the "ipns" namespace). Setting
// the validator marks it as changed, which signals that the user wants to
// control the validators; the default public key and IPNS validators are then
// not added.
func Validator(v record.Validator) Option {
	replaceValidator := func(cfg *config) error {
		// Flag the override so the fallbacks leave the validator alone.
		cfg.validatorChanged = true
		cfg.validator = v
		return nil
	}
	return replaceValidator
}
// NamespacedValidator adds a validator namespaced under `ns`, replacing any
// validator already registered under that namespace. This option fails if the
// DHT is not using a `record.NamespacedValidator` as its validator (it uses
// one by default but this can be overridden with the `Validator` option).
//
// Adding a namespaced validator without changing the `Validator` results in a
// new validator in addition to the default public key and IPNS validators.
// This option does not itself restrict `ns`, but a configuration that keeps
// DefaultPrefix is rejected by validate unless "pk" and "ipns" are the only
// namespaces and use the standard validators.
//
// Example: Given a validator registered as `NamespacedValidator("ipns",
// myValidator)`, all records with keys starting with `/ipns/` will be
// validated with `myValidator`.
func NamespacedValidator(ns string, v record.Validator) Option {
	return func(cfg *config) error {
		if namespaced, isNamespaced := cfg.validator.(record.NamespacedValidator); isNamespaced {
			namespaced[ns] = v
			return nil
		}
		return fmt.Errorf("can only add namespaced validators to a NamespacedValidator")
	}
}
// ProtocolPrefix sets an application specific prefix to be attached to all
// DHT protocols. For example, /myapp/kad/1.0.0 instead of /ipfs/kad/1.0.0.
// Prefix should be of the form /myapp.
//
// Defaults to dht.DefaultPrefix
func ProtocolPrefix(prefix protocol.ID) Option {
	setPrefix := func(cfg *config) error {
		cfg.protocolPrefix = prefix
		return nil
	}
	return setPrefix
}
// ProtocolExtension adds an application specific protocol to the DHT
// protocol. For example, /ipfs/lan/kad/1.0.0 instead of /ipfs/kad/1.0.0.
// extension should be of the form /lan.
//
// The extension is appended to whatever protocol prefix the config holds when
// this option runs.
func ProtocolExtension(ext protocol.ID) Option {
	return Option(func(cfg *config) error {
		cfg.protocolPrefix = cfg.protocolPrefix + ext
		return nil
	})
}
// BucketSize configures the bucket size (k in the Kademlia paper) of the
// routing table.
//
// The default value is 20.
func BucketSize(bucketSize int) Option {
	setBucketSize := func(cfg *config) error {
		cfg.bucketSize = bucketSize
		return nil
	}
	return setBucketSize
}
// Concurrency configures the number of concurrent requests (alpha in the
// Kademlia paper) for a given query path.
//
// The default value is 10.
func Concurrency(alpha int) Option {
	return Option(func(cfg *config) error {
		cfg.concurrency = alpha
		return nil
	})
}
// Resiliency configures the number of peers closest to a target that must
// have responded in order for a given query path to complete.
//
// The default value is 3.
func Resiliency(beta int) Option {
	setResiliency := func(cfg *config) error {
		cfg.resiliency = beta
		return nil
	}
	return setResiliency
}
// MaxRecordAge specifies the maximum time that any node will hold onto a
// record ("PutValue record") from the time it is received. This does not
// apply to any other forms of validity that the record may contain.
//
// For example, a record may contain an IPNS entry with an EOL far in the
// future. For that record to stick around it must still be rebroadcast more
// frequently than once every MaxRecordAge.
//
// The default value is 36 hours.
func MaxRecordAge(maxAge time.Duration) Option {
	return Option(func(cfg *config) error {
		cfg.maxRecordAge = maxAge
		return nil
	})
}
// DisableAutoRefresh completely disables 'auto-refresh' on the DHT routing
// table. This means that we will neither refresh the routing table
// periodically nor when the routing table size goes below the minimum
// threshold.
//
// Auto-refresh is enabled by default.
func DisableAutoRefresh() Option {
	return func(cfg *config) error {
		rt := &cfg.routingTable
		rt.autoRefresh = false
		return nil
	}
}
// DisableProviders disables storing and retrieving provider records.
//
// Defaults to enabled.
//
// WARNING: do not change this unless you're using a forked DHT (i.e., a
// private network and/or distinct DHT protocols). A config that keeps
// DefaultPrefix and has providers disabled is rejected by validate.
func DisableProviders() Option {
	turnOff := func(cfg *config) error {
		cfg.enableProviders = false
		return nil
	}
	return turnOff
}
// DisableValues disables storing and retrieving value records (including
// public keys).
//
// Defaults to enabled.
//
// WARNING: do not change this unless you're using a forked DHT (i.e., a
// private network and/or distinct DHT protocols). A config that keeps
// DefaultPrefix and has values disabled is rejected by validate.
func DisableValues() Option {
	return Option(func(cfg *config) error {
		cfg.enableValues = false
		return nil
	})
}
// ProvidersOptions are options passed directly to the provider manager.
//
// The provider manager adds and gets provider records from the datastore,
// caching them in between. These options are passed to the provider manager,
// allowing customisation of things like the GC interval and cache
// implementation.
//
// The slice is stored as given (not copied) and replaces any provider options
// set by an earlier ProvidersOptions option.
func ProvidersOptions(opts []providers.Option) Option {
	storeOpts := func(cfg *config) error {
		cfg.providersOptions = opts
		return nil
	}
	return storeOpts
}
// QueryFilter sets a function that approves which peers may be dialed in a
// query.
//
// The default filter, emptyQueryFilter, approves every peer.
func QueryFilter(filter QueryFilterFunc) Option {
	return Option(func(cfg *config) error {
		cfg.queryPeerFilter = filter
		return nil
	})
}
// RoutingTableFilter sets a function that approves which peers may be added
// to the routing table. The host should already have at least one connection
// to the peer under consideration.
//
// The default filter, emptyRTFilter, approves every peer.
func RoutingTableFilter(filter RouteTableFilterFunc) Option {
	setFilter := func(cfg *config) error {
		rt := &cfg.routingTable
		rt.peerFilter = filter
		return nil
	}
	return setFilter
}
// V1CompatibleMode sets the DHT to operate in V1 compatible mode. In this
// mode, the DHT node will act like a V1 DHT node (use the V1 protocol names)
// but will use the V2 query and routing table logic.
//
// For now, this option defaults to true for backwards compatibility. In the
// near future, it will switch to false.
//
// This option is perma-unstable and may be removed in the future.
func V1CompatibleMode(enable bool) Option {
	setCompat := func(cfg *config) error {
		cfg.v1CompatibleMode = enable
		return nil
	}
	return setCompat
}
// BootstrapPeers configures the bootstrapping nodes that we will connect to
// in order to seed and refresh our Routing Table if it becomes empty.
//
// The given peers replace any bootstrap peers set by an earlier
// BootstrapPeers option; the slice is stored as passed, without copying.
func BootstrapPeers(bootstrappers ...peer.AddrInfo) Option {
	return Option(func(cfg *config) error {
		cfg.bootstrapPeers = bootstrappers
		return nil
	})
}