This repository has been archived by the owner on Sep 20, 2022. It is now read-only.
forked from dedis/onet
-
Notifications
You must be signed in to change notification settings - Fork 0
/
protocol.go
260 lines (230 loc) · 8.69 KB
/
protocol.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
package onet
import (
"sync"
"go.dedis.ch/onet/v3/log"
"go.dedis.ch/onet/v3/network"
"golang.org/x/xerrors"
uuid "gopkg.in/satori/go.uuid.v1"
)
// ProtocolID uniquely identifies a protocol. Its underlying type is
// uuid.UUID; ProtocolNameToID derives the ID of a protocol from its name.
type ProtocolID uuid.UUID
// String renders the ID using the string form of the underlying uuid.UUID.
func (pid ProtocolID) String() string {
	asUUID := uuid.UUID(pid)
	return asUUID.String()
}
// Equal reports whether pid and pid2 are the same ProtocolID. Both values are
// converted to uuid.UUID and compared with uuid.Equal.
func (pid ProtocolID) Equal(pid2 ProtocolID) bool {
	left, right := uuid.UUID(pid), uuid.UUID(pid2)
	return uuid.Equal(left, right)
}
// IsNil reports whether the ProtocolID equals the nil UUID (uuid.Nil).
func (pid ProtocolID) IsNil() bool {
	nilID := ProtocolID(uuid.Nil)
	return pid.Equal(nilID)
}
// NewProtocol is the function signature needed to instantiate a new protocol.
// It takes a *TreeNodeInstance and returns the created ProtocolInstance, or an
// error if the instance could not be created.
type NewProtocol func(*TreeNodeInstance) (ProtocolInstance, error)
// ProtocolInstance is the interface that instances have to implement in order
// to be recognized as protocols.
type ProtocolInstance interface {
	// Start is called when a leader has created its tree configuration and
	// wants to start a protocol. The leader calls
	// host.StartProtocol(protocolID), which in turn instantiates a new
	// protocol (with a fresh token) and then calls Start on it.
	Start() error
	// Dispatch is called at the beginning by onet for listening on the
	// channels.
	Dispatch() error
	// ProcessProtocolMsg is called each time a message arrives for this
	// ProtocolInstance. TreeNodeInstance implements that method for you
	// using channels or handlers.
	ProcessProtocolMsg(*ProtocolMsg)
	// Token returns the token representing this ProtocolInstance.
	Token() *Token
	// Shutdown cleans up the resources used by this protocol instance.
	Shutdown() error
}
// protocols is the package-level protocolStorage. It is the storage written
// to by GlobalProtocolRegister and the one whose 'serverStarted' flag is
// toggled by InformServerStarted and InformAllServersStopped.
var protocols = newProtocolStorage()
// protocolStorage holds all protocols either globally or per-Server.
type protocolStorage struct {
	// The embedded mutex is taken by the methods of this type before they
	// touch the fields below. It is needed in particular because of the
	// 'serverStarted' flag: it can be changed from a call to 'Server.Start'
	// and is checked when calling 'GlobalProtocolRegister'.
	sync.Mutex
	// instantiators maps the name of each registered protocol to its
	// `NewProtocol` function.
	instantiators map[string]NewProtocol
	// serverStarted indicates whether a server has already started; it is
	// here to reject calls to 'GlobalProtocolRegister' once a server has
	// started.
	serverStarted bool
}
// newProtocolStorage returns a protocolStorage whose instantiators map is
// allocated and empty. All other fields keep their zero value, so
// serverStarted starts out false.
func newProtocolStorage() *protocolStorage {
	storage := new(protocolStorage)
	storage.instantiators = make(map[string]NewProtocol)
	return storage
}
// ProtocolIDToName returns the registered protocol name whose ID, as computed
// by ProtocolNameToID, equals id. It returns the empty string when no
// registered name matches. The storage lock is held for the whole lookup.
func (ps *protocolStorage) ProtocolIDToName(id ProtocolID) string {
	ps.Lock()
	defer ps.Unlock()

	found := ""
	for candidate := range ps.instantiators {
		if id.Equal(ProtocolNameToID(candidate)) {
			found = candidate
			break
		}
	}
	return found
}
// ProtocolExists reports whether a protocol whose name maps to protoID (via
// ProtocolNameToID) has been registered in this storage.
//
// The registry is scanned under a single acquisition of the lock, so the
// answer reflects one consistent snapshot. The ID is matched directly instead
// of being translated to a name that is then looked up: the name lookup
// yields "" for an unknown ID, which would wrongly report the ID as existing
// if a protocol had been registered under the empty name.
func (ps *protocolStorage) ProtocolExists(protoID ProtocolID) bool {
	ps.Lock()
	defer ps.Unlock()

	for name := range ps.instantiators {
		if protoID.Equal(ProtocolNameToID(name)) {
			return true
		}
	}
	return false
}
// Register stores protocol under name and returns the ProtocolID derived from
// that name by ProtocolNameToID. If an entry already exists for name, it is
// left untouched and the nil ProtocolID is returned together with an error.
func (ps *protocolStorage) Register(name string, protocol NewProtocol) (ProtocolID, error) {
	ps.Lock()
	defer ps.Unlock()

	if _, taken := ps.instantiators[name]; taken {
		err := xerrors.Errorf("Protocol -%s- already exists - not overwriting", name)
		return ProtocolID(uuid.Nil), err
	}

	ps.instantiators[name] = protocol
	id := ProtocolNameToID(name)
	log.Lvl4("Registered", name, "to", id)
	return id, nil
}
// ProtocolNameToID returns the ProtocolID corresponding to the given name.
// The ID is produced by uuid.NewV3 in the uuid.NamespaceURL namespace from
// the string network.NamespaceURL + "protocolname/" + name.
func ProtocolNameToID(name string) ProtocolID {
	protoURL := network.NamespaceURL + "protocolname/" + name
	id := uuid.NewV3(uuid.NamespaceURL, protoURL)
	return ProtocolID(id)
}
// GlobalProtocolRegister registers a protocol in the global namespace.
// This is used by protocols that register themselves in their `init` function.
// All registered protocols will be copied to every instantiated Server. If a
// protocol is tied to a service, use `Server.ProtocolRegisterName` instead.
//
// It panics if a server has already started. A failure of the underlying
// Register call is returned as an error.
func GlobalProtocolRegister(name string, protocol NewProtocol) (ProtocolID, error) {
	// Read the flag in its own short critical section: "Register" takes the
	// same lock, so the lock must not be held when it is called.
	protocols.Lock()
	started := protocols.serverStarted
	protocols.Unlock()
	if started {
		panic("Cannot call 'GlobalProtocolRegister' when a server has already started.")
	}

	id, err := protocols.Register(name, protocol)
	if err == nil {
		return id, nil
	}
	return id, xerrors.Errorf("registering protocol: %v", err)
}
// InformServerStarted sets the 'serverStarted' flag of the global protocol
// storage to true, under the storage lock.
func InformServerStarted() {
	protocols.Lock()
	protocols.serverStarted = true
	protocols.Unlock()
}
// InformAllServersStopped resets the 'serverStarted' flag of the global
// protocol storage to false, under the storage lock.
func InformAllServersStopped() {
	protocols.Lock()
	protocols.serverStarted = false
	protocols.Unlock()
}
// MessageProxy is an interface that allows one protocol to completely define
// its wire protocol format while still using the Overlay.
// Cothority sends different messages dynamically as slices of bytes, whereas
// Google proposes to use union-types:
// https://developers.google.com/protocol-buffers/docs/techniques#union
// This is a wrapper to enable union-types while still keeping compatibility
// with the dynamic cothority-messages. Implementations must provide methods to
// convert between the 'union-types' and the 'cothority-dynamic-messages' with
// the Wrap and Unwrap methods.
// A default one is provided with defaultMessageProxy so the regular
// wire-format protocol can still be used.
type MessageProxy interface {
	// Wrap takes a message and the overlay information and returns the
	// message that has to be sent directly to the network, along with any
	// error that happened.
	// If msg is nil, it is only an internal message of the Overlay.
	Wrap(msg interface{}, info *OverlayMsg) (interface{}, error)
	// Unwrap takes the message coming from the network and returns the
	// inner message that is going to be dispatched to the ProtocolInstance,
	// the OverlayMsg needed by the Overlay to function correctly, and any
	// error that might have occurred.
	Unwrap(msg interface{}) (interface{}, *OverlayMsg, error)
	// PacketType returns the packet type ID that this Protocol expects from
	// the network. This is needed in order for the Overlay to receive those
	// messages and dispatch them to the correct MessageProxy.
	PacketType() network.MessageTypeID
	// Name returns the name associated with this MessageProxy. When creating
	// a protocol, if one uses a name used by a MessageProxy, this
	// MessageProxy will be used to Wrap and Unwrap messages.
	Name() string
}
// NewMessageProxy is the function type used to instantiate a new
// MessageProxy. It takes no arguments and returns the created proxy.
type NewMessageProxy func() MessageProxy
// messageProxyFactoryStruct collects MessageProxy constructors.
type messageProxyFactoryStruct struct {
	// factories holds the registered constructors in registration order.
	factories []NewMessageProxy
}
// RegisterMessageProxy appends the given MessageProxy constructor to the list
// of factories. No locking and no duplicate check is performed.
func (mpfs *messageProxyFactoryStruct) RegisterMessageProxy(n NewMessageProxy) {
	extended := append(mpfs.factories, n)
	mpfs.factories = extended
}
// messageProxyFactory is the package-level list of MessageProxy constructors.
// It is filled through RegisterMessageProxy and read by newMessageProxyStore.
var messageProxyFactory = messageProxyFactoryStruct{}
// RegisterMessageProxy adds a NewMessageProxy constructor to the package-level
// factory list. When a Server is instantiated, all MessageProxys will be
// generated from that list and stored for this Server.
func RegisterMessageProxy(n NewMessageProxy) {
	messageProxyFactory.factories = append(messageProxyFactory.factories, n)
}
// messageProxyStore contains all created MessageProxys. It also contains the
// default MessageProxy used by the Overlay for backwards-compatibility.
type messageProxyStore struct {
	// The embedded mutex is taken by the methods of this type while they
	// read or modify protos.
	sync.Mutex
	// protos lists the stored proxies in the order they were added.
	protos []MessageProxy
	// defaultIO is the proxy returned by the lookups when no entry of protos
	// matches.
	defaultIO MessageProxy
}
// RegisterMessageProxy saves the given MessageProxy directly in this store.
// It's useful if one wants a different message proxy per server/overlay.
//
// If a proxy with the same name is already stored, the call is a no-op, so a
// name never maps to more than one proxy. The check and the append happen
// under one acquisition of the lock.
//
// The previous guard compared the result of getByName with defaultIO, but
// getByName returns defaultIO precisely when the name is NOT stored. The
// guard was therefore inverted: new proxies were silently dropped and only
// duplicates were appended.
func (p *messageProxyStore) RegisterMessageProxy(mp MessageProxy) {
	name := mp.Name()

	p.Lock()
	defer p.Unlock()
	for _, known := range p.protos {
		if known.Name() == name {
			// Already registered under this name: keep the existing one.
			return
		}
	}
	p.protos = append(p.protos, mp)
}
// getByName returns the first stored MessageProxy whose Name() equals name,
// or the default proxy when none matches. The store lock is held during the
// lookup.
func (p *messageProxyStore) getByName(name string) MessageProxy {
	p.Lock()
	defer p.Unlock()

	match := p.defaultIO
	for i := range p.protos {
		if p.protos[i].Name() == name {
			match = p.protos[i]
			break
		}
	}
	return match
}
// getByPacketType returns the first stored MessageProxy whose PacketType()
// equals mid, or the default proxy when none matches. The store lock is held
// during the lookup.
func (p *messageProxyStore) getByPacketType(mid network.MessageTypeID) MessageProxy {
	p.Lock()
	defer p.Unlock()

	match := p.defaultIO
	for i := range p.protos {
		if p.protos[i].PacketType().Equal(mid) {
			match = p.protos[i]
			break
		}
	}
	return match
}
// newMessageProxyStore builds a messageProxyStore holding the default proxy
// plus one proxy per constructor registered in messageProxyFactory.
//
// For every created proxy, proc is registered on disp as the processor for
// that proxy's packet type. The proxies are stored in the order of the
// factory list.
func newMessageProxyStore(disp network.Dispatcher, proc network.Processor) *messageProxyStore {
	pstore := &messageProxyStore{
		// also add the default one
		defaultIO: &defaultProtoIO{},
	}
	for _, newProxy := range messageProxyFactory.factories {
		proxy := newProxy()
		pstore.protos = append(pstore.protos, proxy)
		disp.RegisterProcessor(proc, proxy.PacketType())
		// Log the proxy's own name; the range key of the factory slice is
		// only an index and says nothing about which proxy was created.
		log.Lvl3("Instantiating MessageProxy", proxy.Name(), "at position", len(pstore.protos))
	}
	return pstore
}