-
Notifications
You must be signed in to change notification settings - Fork 52
/
materialize.proto
381 lines (354 loc) · 15 KB
/
materialize.proto
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
syntax = "proto3";
package materialize;
option go_package = "github.com/estuary/flow/go/protocols/materialize";
import "consumer/protocol/protocol.proto";
import "go/protocols/flow/flow.proto";
import "gogoproto/gogo.proto";
option (gogoproto.marshaler_all) = true;
option (gogoproto.protosizer_all) = true;
option (gogoproto.unmarshaler_all) = true;
option (gogoproto.goproto_getters_all) = false;
// Connector is the RPC service through which materialization Request and
// Response messages are exchanged.
service Connector {
  // Materialize is a bidirectional streaming RPC: the caller sends a stream
  // of Request messages and receives a stream of Response messages.
  rpc Materialize(stream Request) returns (stream Response);
}
// Request is a message of the Connector.Materialize request stream.
// Each sub-message field below corresponds to one kind of request.
// NOTE(review): the fields are not declared as a oneof; presumably exactly
// one of them is set in any given Request -- confirm against implementations.
message Request {
  // Spec requests the specification definition of this connector.
  // Notably this includes its configuration JSON schemas.
  message Spec {
    // Connector type addressed by this request.
    flow.MaterializationSpec.ConnectorType connector_type = 1;
    // Connector configuration, as an encoded JSON object.
    // This may be a partial specification (for example, a Docker image),
    // providing only enough information to fetch the remainder of the
    // specification schema.
    string config_json = 2 [
      (gogoproto.casttype) = "encoding/json.RawMessage",
      json_name = "config"
    ];
  }
  Spec spec = 1;

  // Validate a materialization configuration and proposed bindings.
  // Validate is run out-of-band with ongoing connector invocations.
  // Its purpose is to confirm that the proposed configuration
  // is likely to succeed if applied and run, or to report any
  // potential issues for the user to address.
  message Validate {
    // Name of the materialization being validated.
    string name = 1
        [ (gogoproto.casttype) =
              "github.com/estuary/flow/go/protocols/flow.Materialization" ];
    // Connector type addressed by this request.
    flow.MaterializationSpec.ConnectorType connector_type = 2;
    // Connector configuration, as an encoded JSON object.
    string config_json = 3 [
      (gogoproto.casttype) = "encoding/json.RawMessage",
      json_name = "config"
    ];

    // Bindings of endpoint resources and collections from which they would be
    // materialized. Bindings are ordered and unique on the bound collection
    // name.
    message Binding {
      // JSON-encoded object which specifies the endpoint resource to be
      // materialized.
      string resource_config_json = 1 [
        (gogoproto.casttype) = "encoding/json.RawMessage",
        json_name = "resourceConfig"
      ];
      // Collection to be materialized.
      flow.CollectionSpec collection = 2 [ (gogoproto.nullable) = false ];
      // Projection configuration, keyed by the projection field name,
      // with JSON-encoded and driver-defined configuration objects.
      map<string, string> field_config_json_map = 3 [
        (gogoproto.castvalue) = "encoding/json.RawMessage",
        json_name = "fieldConfig"
      ];
      // Backfill counter for this binding.
      uint32 backfill = 4;
    }
    repeated Binding bindings = 4;

    // Last MaterializationSpec which was validated and published.
    // Note that this MaterializationSpec may not have been applied.
    flow.MaterializationSpec last_materialization = 5;
    // Version of the last validated MaterializationSpec.
    string last_version = 6;
  }
  Validate validate = 2;

  // Apply a materialization configuration and bindings to its endpoint.
  // Apply is run out-of-band with ongoing connector invocations,
  // and may be run many times for a single materialization name,
  // where each invocation has varying bindings, or even no bindings.
  // The connector performs any required setup or cleanup.
  message Apply {
    // Materialization to be applied.
    flow.MaterializationSpec materialization = 1;
    // Version of the MaterializationSpec being applied.
    string version = 2;
    // Field number 3 is not assigned to any field of this message.
    // It is reserved so that it cannot be bound to a new field by accident.
    // NOTE(review): presumably 3 belonged to a since-removed field; confirm.
    reserved 3;
    // Last MaterializationSpec which was successfully applied.
    flow.MaterializationSpec last_materialization = 4;
    // Version of the last applied MaterializationSpec.
    string last_version = 5;
  }
  Apply apply = 3;

  // Open a materialization stream.
  //
  // If the Flow recovery log is authoritative:
  // The driver is given its last committed checkpoint state in this request.
  // It MAY return a runtime checkpoint in its opened response -- perhaps an
  // older Flow checkpoint which was previously embedded within its driver
  // checkpoint.
  //
  // If the remote store is authoritative:
  // The driver MUST fence off other streams of this materialization that
  // overlap the provided [key_begin, key_end) range, such that those streams
  // cannot issue further commits. The driver MUST return its stored runtime
  // checkpoint for this materialization and range [key_begin, key_end]
  // in its Opened response.
  // NOTE(review): the range is written half-open in one sentence above and
  // closed in the next; confirm which convention flow.RangeSpec uses.
  //
  // After Open, the runtime will send only Load, Flush, Store,
  // StartCommit, and Acknowledge.
  message Open {
    // Materialization to be transacted.
    flow.MaterializationSpec materialization = 1;
    // Version of the opened MaterializationSpec.
    // The driver may want to require that this match the version last
    // provided to a successful Apply RPC. It's possible that it won't,
    // due to expected propagation races in Flow's distributed runtime.
    string version = 2;
    // Range of documents to be processed by this invocation.
    flow.RangeSpec range = 3;
    // Last-persisted connector checkpoint state from a previous invocation.
    string state_json = 4 [
      (gogoproto.casttype) = "encoding/json.RawMessage",
      json_name = "state"
    ];
  }
  Open open = 4;

  // Load a document identified by its key. The given key may have never
  // before been stored, but a given key will be sent in a transaction Load
  // just one time.
  message Load {
    // Index of the Open binding for which this document is to be loaded.
    uint32 binding = 1;
    // Key tuple, as an array of key components.
    // Ordering matches `keys` of the materialization's field selection.
    string key_json = 2 [
      (gogoproto.casttype) = "encoding/json.RawMessage",
      json_name = "key"
    ];
    // Packed tuple of the document key to load.
    bytes key_packed = 3;
  }
  Load load = 5;

  // Flush loads. No further Loads will be sent in this transaction,
  // and the runtime will await the connector's remaining Loaded
  // responses followed by one Flushed response.
  message Flush {}
  Flush flush = 6;

  // Store documents updated by the current transaction.
  message Store {
    // Index of the Open binding for which this document is to be stored.
    uint32 binding = 1;
    // Key tuple, as an array of key components.
    // Ordering matches `keys` of the materialization's field selection.
    string key_json = 2 [
      (gogoproto.casttype) = "encoding/json.RawMessage",
      json_name = "key"
    ];
    // Packed FoundationDB tuple of the document key to store.
    bytes key_packed = 3;
    // Values tuple, as an array of value components.
    // Ordering matches `values` of the materialization's field selection.
    string values_json = 4 [
      (gogoproto.casttype) = "encoding/json.RawMessage",
      json_name = "values"
    ];
    // Packed FoundationDB tuple of the document values to store.
    bytes values_packed = 5;
    // JSON document to store.
    string doc_json = 6 [
      (gogoproto.casttype) = "encoding/json.RawMessage",
      json_name = "doc"
    ];
    // Exists is true if this document has previously been loaded or stored.
    bool exists = 7;
    // Delete is true if this document is being deleted, which also implies
    // `exists`.
    bool delete = 8;
  }
  Store store = 7;

  // Mark the end of the Store phase, and if the remote store is authoritative,
  // instruct it to start committing its transaction.
  message StartCommit {
    // Flow runtime checkpoint to commit with this transaction.
    consumer.Checkpoint runtime_checkpoint = 1;
  }
  StartCommit start_commit = 8;

  // Acknowledge to the connector that the previous transaction
  // has committed to the Flow runtime's recovery log.
  message Acknowledge {}
  Acknowledge acknowledge = 9;

  // Reserved for internal use.
  bytes internal = 100 [ json_name = "$internal" ];
}
// Response is a message of the Connector.Materialize response stream.
// Each sub-message field below responds to (or, for Acknowledged, notifies
// about) one kind of Request.
// NOTE(review): the fields are not declared as a oneof; presumably exactly
// one of them is set in any given Response -- confirm against implementations.
message Response {
  // Spec responds to Request.Spec.
  message Spec {
    // Protocol version must be 3032023.
    uint32 protocol = 1;
    // JSON schema of the connector's configuration.
    string config_schema_json = 2 [
      (gogoproto.casttype) = "encoding/json.RawMessage",
      json_name = "configSchema"
    ];
    // JSON schema of the connector's resource configuration.
    string resource_config_schema_json = 3 [
      (gogoproto.casttype) = "encoding/json.RawMessage",
      json_name = "resourceConfigSchema"
    ];
    // URL for connector's documentation.
    string documentation_url = 4;
    // Optional OAuth2 configuration.
    flow.OAuth2 oauth2 = 5;
  }
  Spec spec = 1;

  // Validated responds to Request.Validate.
  message Validated {
    // Constraint constrains the use of a flow.Projection within a
    // materialization.
    message Constraint {
      // Field number 1 is not assigned to any field of this message.
      // It is reserved so that it cannot be bound to a new field by accident.
      // NOTE(review): presumably 1 belonged to a since-removed field; confirm.
      reserved 1;

      // Type encodes a constraint type for this flow.Projection.
      enum Type {
        INVALID = 0;
        // This specific projection must be present.
        FIELD_REQUIRED = 1;
        // At least one projection with this location pointer must be present.
        LOCATION_REQUIRED = 2;
        // A projection with this location is recommended, and should be
        // included by default.
        LOCATION_RECOMMENDED = 3;
        // This projection may be included, but should be omitted by default.
        FIELD_OPTIONAL = 4;
        // This projection must not be present in the materialization.
        FIELD_FORBIDDEN = 5;
        // This specific projection is required but is also unacceptable (e.g.,
        // because it uses an incompatible type with a previous applied
        // version).
        UNSATISFIABLE = 6;
      }
      Type type = 2;
      // Optional human readable reason for the given constraint.
      // Implementations are strongly encouraged to supply a descriptive
      // message.
      string reason = 3;
    }

    // Validation responses for each binding of the request, and matching the
    // request ordering. Each Binding must have a unique resource_path.
    message Binding {
      // Constraints over collection projections imposed by the Driver,
      // keyed by the projection field name. Projections of the CollectionSpec
      // which are missing from constraints are implicitly forbidden.
      map<string, Constraint> constraints = 1;
      // Components of the resource path which fully qualify the resource
      // identified by this binding.
      // - For an RDBMS, this might be []{dbname, schema, table}.
      // - For Kafka, this might be []{topic}.
      // - For Redis, this might be []{key_prefix}.
      repeated string resource_path = 2;
      // Materialize combined delta updates of documents rather than full
      // reductions.
      //
      // When set, the Flow runtime will not attempt to load documents via
      // Request.Load, and also disables re-use of cached documents
      // stored in prior transactions. Each stored document is exclusively
      // combined from updates processed by the runtime within the current
      // transaction only.
      //
      // This is appropriate for drivers over streams, WebHooks, and
      // append-only files.
      //
      // For example, given a collection which reduces a sum count for each
      // key, its materialization will produce a stream of delta updates to the
      // count, such that a reader of the stream will arrive at the correct
      // total count.
      bool delta_updates = 3;
    }
    repeated Binding bindings = 1;
  }
  Validated validated = 2;

  // Applied responds to Request.Apply.
  message Applied {
    // Human-readable description of the action that the connector took.
    // If empty, this Apply is to be considered a "no-op".
    string action_description = 1;
  }
  Applied applied = 3;

  // Opened responds to Request.Open.
  // After Opened, the connector sends only Loaded, Flushed,
  // StartedCommit, and Acknowledged as per the materialization
  // protocol.
  message Opened {
    // Flow runtime checkpoint to begin processing from.
    // If empty, the most recent checkpoint of the Flow recovery log is used.
    //
    // Or, a driver may send the value []byte{0xf8, 0xff, 0xff, 0xff, 0xf, 0x1}
    // to explicitly begin processing from a zero-valued checkpoint, effectively
    // rebuilding the materialization from scratch. This sentinel is a trivial
    // encoding of the max-value 2^29-1 protobuf tag with boolean true.
    consumer.Checkpoint runtime_checkpoint = 1;
  }
  Opened opened = 4;

  // Loaded responds to Request.Load.
  // It returns documents of requested keys which have previously been stored.
  // Keys not found in the store MUST be omitted. Documents may be in any order,
  // both within and across Loaded response messages, but a document of a given
  // key MUST be sent at most one time in a Transaction.
  message Loaded {
    // Index of the Open binding for which this document was loaded.
    uint32 binding = 1;
    // Loaded JSON document.
    string doc_json = 2 [
      (gogoproto.casttype) = "encoding/json.RawMessage",
      json_name = "doc"
    ];
  }
  Loaded loaded = 5;

  // Flushed responds to a Request.Flush.
  // The driver will send no further Loaded responses.
  message Flushed {
    // Optional update to ConnectorState.
    // This update is durably written before the connector receives a following
    // Store or StartCommit request.
    flow.ConnectorState state = 1;
  }
  Flushed flushed = 6;

  // StartedCommit responds to a Request.StartCommit.
  // The driver has processed all Store requests, it has started to commit its
  // transaction (if it has one), and it is now ready for the runtime to start
  // committing to its own recovery log.
  message StartedCommit {
    // Optional *transactional* update to ConnectorState.
    // This update commits atomically with the Flow recovery log checkpoint.
    flow.ConnectorState state = 1;
  }
  StartedCommit started_commit = 7;

  // Notify the runtime that the previous transaction has committed.
  // On receipt, the runtime may begin to flush, store, and commit a
  // next (pipelined) transaction.
  //
  // Acknowledged is _not_ a direct response to Request.Acknowledge,
  // and Acknowledge vs Acknowledged may be written in either order.
  message Acknowledged {
    // Optional *non-transactional* update to ConnectorState.
    // This update is not transactional and the connector must tolerate a
    // future, duplicate Request.Acknowledge of this same checkpoint and
    // connector state, even after having previously responded with
    // Acknowledged and a (discarded) connector state update.
    flow.ConnectorState state = 1;
  }
  Acknowledged acknowledged = 8;

  // Reserved for internal use.
  bytes internal = 100 [ json_name = "$internal" ];
}
// Extra is a namespace for auxiliary messages used by connectors.
// It declares no fields of its own, only the nested message types below.
// TODO(johnny): Do we still need this?
message Extra {
  // Pairs an existing binding of a MaterializationSpec with a proposed
  // binding of a Validate request.
  // NOTE(review): presumably used to check the proposal against what already
  // exists -- confirm with the connector code which consumes this message.
  message ValidateExistingProjectionRequest {
    // Binding as it exists in a MaterializationSpec.
    flow.MaterializationSpec.Binding existing_binding = 1;
    // Binding as proposed in Request.Validate.
    Request.Validate.Binding proposed_binding = 2;
  }

  // Pairs a MaterializationSpec binding with a map of constraints.
  message ValidateBindingAgainstConstraints {
    // Binding to be checked.
    flow.MaterializationSpec.Binding binding = 1;
    // Constraints to check the binding against.
    // NOTE(review): presumably keyed by projection field name, as in
    // Response.Validated.Binding.constraints -- confirm.
    map<string, Response.Validated.Constraint> constraints = 2;
  }
}