-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathwiring.h
117 lines (92 loc) · 2.92 KB
/
wiring.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
/*
* See wireup.md for a discussion of the "wireup" protocol
* whose data structures and message format are defined here.
*/
#ifndef _WIRES_H_
#define _WIRES_H_
#include <assert.h>
#include <inttypes.h> /* PRId32 */
#include <stdbool.h>
#include <stdint.h> /* int32_t, SIZE_MAX */
#include <unistd.h> /* size_t */
#include <sys/queue.h>
#include <ucp/api/ucp.h>
#include "wiring_compat.h"
#include "wireup.h"
/*
 * Integer type for sender IDs: a signed 32-bit value.  A wire_id_t wraps
 * one, and the event/accept records each carry one as `sender_id`.
 */
typedef int32_t sender_id_t;

/* Reserved "no ID" value; wire_id_nil and wire_is_valid() are built on it. */
#define sender_id_nil ((sender_id_t)(-1))
/* Largest value a sender_id_t can represent. */
#define SENDER_ID_MAX INT32_MAX
/* printf(3) conversion for a sender_id_t: "%" PRIdSENDER */
#define PRIdSENDER PRId32
/* Incomplete at this point; struct _wiring is completed later in this header. */
typedef struct _wiring wiring_t;
/* Opaque: this header never completes struct _wstorage. */
typedef struct _wstorage wstorage_t;
/*
 * A set of locking callbacks plus one opaque pointer.  wiring_create() and
 * wiring_init() each take a pointer to one, and struct _wiring holds one by
 * value in its `lkb` member.
 *
 * Every callback receives a wiring_t pointer and a void pointer.
 * NOTE(review): presumably the void pointer is `arg` below -- confirm in
 * the implementation.
 */
struct _wiring_lock_bundle {
	/* Acquire the lock. */
	void (*lock)(wiring_t *, void *);
	/* Release the lock. */
	void (*unlock)(wiring_t *, void *);
	/*
	 * NOTE(review): presumably returns true iff the lock is currently
	 * held -- confirm against the callers.
	 */
	bool (*assert_locked)(wiring_t *, void *);
	/* Opaque pointer supplied along with the callbacks. */
	void *arg;
};
typedef struct _wiring_lock_bundle wiring_lock_bundle_t;
/* Opaque: this header never completes struct _rxpool. */
typedef struct _rxpool rxpool_t;
/*
 * Wire ID: a one-member wrapper around a sender_id_t.  The nil ID
 * (wire_id_nil) has `id` equal to sender_id_nil.
 *
 * TBD: a wire ID could embed a generation number to guard against wire
 * reassignment.  Or, add a "reclaimed" state after "dead" to the wire
 * state machine?  "Dead" wires will not be reused.
 */
struct _wire_id {
	/*
	 * `wiring_atomic` is not defined in this header.
	 * NOTE(review): presumably an atomic qualifier supplied by
	 * wiring_compat.h -- confirm.
	 */
	sender_id_t wiring_atomic id;
};
typedef struct _wire_id wire_id_t;
/* Kinds of event reported through a wire_event_cb_t. */
typedef enum {
	wire_ev_estd = 0,	/* a wire was established */
	wire_ev_died = 1	/* a wire died */
} wire_event_t;
/* Event record handed by value to a wire_event_cb_t. */
struct _wire_event_info {
	/* Which event occurred: wire_ev_estd or wire_ev_died. */
	wire_event_t event;
	/*
	 * UCP endpoint handle.
	 * NOTE(review): presumably the endpoint of the wire the event
	 * concerns -- confirm in the implementation.
	 */
	ucp_ep_h ep;
	/*
	 * NOTE(review): presumably the ID the remote peer assigned to
	 * this wire -- see wireup.md and confirm.
	 */
	sender_id_t sender_id;
};
typedef struct _wire_event_info wire_event_info_t;
/* Record handed by value to a wire_accept_cb_t. */
struct _wire_accept_info {
	/*
	 * A UCP address and a size.
	 * NOTE(review): presumably the remote peer's address and its
	 * length in bytes -- confirm in the implementation.
	 */
	const ucp_address_t *addr;
	size_t addrlen;
	/* ID of the wire this record refers to. */
	wire_id_t wire_id;
	/*
	 * NOTE(review): presumably the ID the remote peer assigned to
	 * the wire -- see wireup.md and confirm.
	 */
	sender_id_t sender_id;
	/* UCP endpoint handle accompanying the wire. */
	ucp_ep_h ep;
};
typedef struct _wire_accept_info wire_accept_info_t;
/* Indication of a new wire accepted from a remote peer. */
typedef void *(*wire_accept_cb_t)(wire_accept_info_t, void *);
/* Indication of a wire established or a wire that died. */
typedef bool (*wire_event_cb_t)(wire_event_info_t, void *);
/* Completes wiring_t, the type that every wiring_*() / wireup_*() call takes. */
struct _wiring {
	/* Locking callbacks and their opaque argument. */
	wiring_lock_bundle_t lkb;
	/*
	 * Callback indicating a new wire accepted from a remote peer,
	 * stored together with an opaque pointer for it.
	 */
	wire_accept_cb_t accept_cb;
	void *accept_cb_arg;
	/* Opaque in this header: neither pointee type is completed here. */
	rxpool_t *rxpool;
	wstorage_t *storage;
	/* assoc[i] is a pointer to wire i's optional "associated data" */
	void **assoc;
};
/*
 * The nil wire ID: a wire_id_t compound literal whose `id` is
 * sender_id_nil, so wire_is_valid() returns false for it.
 *
 * NOTE(review): the expansion is deliberately left as a bare compound
 * literal here; check how callers use it (e.g. as an initializer) before
 * wrapping it in parentheses.
 */
#define wire_id_nil (wire_id_t){.id = sender_id_nil}
/*
 * Public entry points.  Only the prototypes are visible in this header,
 * so the notes below describe the signatures; anything about behavior is
 * marked as an assumption to be confirmed against the implementation and
 * wireup.md.
 */

/*
 * Returns a wiring_t pointer, given a UCP worker handle, a size_t, a lock
 * bundle, an accept callback and an opaque pointer.
 * NOTE(review): presumably allocates the wiring and returns NULL on
 * failure; the last three arguments presumably end up in the `lkb`,
 * `accept_cb` and `accept_cb_arg` members of struct _wiring.  The meaning
 * of the size_t is not visible here -- confirm.
 */
wiring_t *wiring_create(ucp_worker_h, size_t, const wiring_lock_bundle_t *,
wire_accept_cb_t, void *);
/*
 * Same argument list as wiring_create(), preceded by a caller-supplied
 * wiring_t; returns bool.
 * NOTE(review): presumably true on success -- confirm.
 */
bool wiring_init(wiring_t *, ucp_worker_h, size_t,
const wiring_lock_bundle_t *, wire_accept_cb_t, void *);
/*
 * NOTE(review): the convention for the int result is not visible here --
 * confirm before testing it.
 */
int wireup_once(wiring_t *);
/*
 * Both take a wiring_t and a bool.
 * NOTE(review): presumably wiring_destroy() pairs with wiring_create()
 * and wiring_teardown() with wiring_init(); the meaning of the bool is
 * not visible here -- confirm.
 */
void wiring_destroy(wiring_t *, bool);
void wiring_teardown(wiring_t *, bool);
/*
 * Takes two (UCP address, size) pairs, an event callback and two opaque
 * pointers; returns a wire ID.
 * NOTE(review): which address is local and which remote, what each
 * opaque pointer is for, and whether failure is reported as wire_id_nil
 * are not visible here -- confirm.  wire_is_valid() tests the result
 * against the nil ID.
 */
wire_id_t wireup_start(wiring_t *, ucp_address_t *, size_t,
ucp_address_t *, size_t, wire_event_cb_t, void *, void *);
/*
 * NOTE(review): the meanings of the bool argument and of the bool result
 * are not visible here -- confirm.
 */
bool wireup_stop(wiring_t *, wire_id_t, bool);
/*
 * Takes two uint64_t pointers.
 * NOTE(review): presumably both are outputs (a tag and a tag mask?) --
 * confirm.
 */
void wireup_app_tag(wiring_t *, uint64_t *, uint64_t *);
/*
 * Maps a wire_event_t to a string.
 * NOTE(review): presumably a printable name in static storage -- confirm.
 */
const char *wire_event_string(wire_event_t);
/* Look up the sender ID for the given wire of the given wiring. */
sender_id_t wire_get_sender_id(wiring_t *, wire_id_t);
/*
 * Look up a pointer for the given wire of the given wiring.
 * NOTE(review): presumably the wire's "associated data" (see `assoc` in
 * struct _wiring) -- confirm.
 */
void *wire_get_data(wiring_t *, wire_id_t);
/*
 * Constant pointer defined outside this header.
 * NOTE(review): presumably the value that stands for "no associated
 * data" -- confirm.
 */
extern void * const wire_data_nil;
/*
 * Tell whether `wid` is something other than the nil wire ID.
 *
 * Returns false when `wid.id` equals sender_id_nil (as in wire_id_nil)
 * and true for every other value.  Only the ID value is examined; no
 * wiring_t is consulted.
 */
static inline bool
wire_is_valid(wire_id_t wid)
{
	const bool is_nil = (wid.id == sender_id_nil);

	return !is_nil;
}
#endif /* _WIRES_H_ */