-
Notifications
You must be signed in to change notification settings - Fork 138
df: types: Input validation option to use operations within DataFlow #382
Comments
working on this |
Some ideas. The orchestrator waits for new input sets to enter the network, then it dispatches operations. Since we want to run an operation, and we need to operation to be fully setup (instantiated and configured just like the rest of the operations in the dataflow) we'll want to dispatch the operation in the same place we're dispatching the other operations (
|
elif task is input_set_enters_network: | |
( | |
more, | |
new_input_sets, | |
) = input_set_enters_network.result() | |
for new_input_set in new_input_sets: | |
# Identify which operations have complete contextually | |
# appropriate input sets which haven't been run yet |
To keep things separate, instead of ctx.add
ing the input_set
(Side note: #401), we should remove any unvalidated
inputs from the input_set
(probably would be good to add a method to BaseInputSet
for this which takes the Input
to be removed, then based on uid
searches through its internal list and removes the matching one). The first argument in the call to ctx.add
should now be a tuple, the first element being an input set containing the unvalidated inputs (so add them to a new set as you remove them), and the second being the valid input set. If there are no invalid inputs, make None
the first element of the tuple.
Lines 250 to 254 in ad18dcc
# Add the input set to the incoming inputs | |
async with self.input_notification_set[handle_string]() as ctx: | |
await ctx.add( | |
input_set, [item async for item in input_set.inputs()] | |
) |
Also, we need to make sure that Input
s which are not validated
do not get mingled with the rest of the inputs (so they don't end up in any permutations). By adding the remove
method to the input set then the input won't show up as an item
in this loop and everything should be okay.
Lines 263 to 278 in ad18dcc
async for item in input_set.inputs(): | |
# Create set for item definition if not present | |
if ( | |
not item.definition | |
in self.ctxhd[handle_string].definitions | |
): | |
self.ctxhd[handle_string].definitions[item.definition] = [] | |
# Add input to by defintion set | |
self.ctxhd[handle_string].definitions[item.definition].append( | |
item | |
) | |
# Create set for item origin if not present | |
if not item.origin in self.ctxhd[handle_string].by_origin: | |
self.ctxhd[handle_string].by_origin[item.origin] = [] | |
# Add input to by origin set | |
self.ctxhd[handle_string].by_origin[item.origin].append(item) |
MemoryOrchestrator
In run_operations_for_ctx
when we are waiting for new input sets, we'll now use the modification we did to ctx.add
. The loop will need to be modified to be for unvalidated_input_set, new_input_set in new_input_sets:
You'll want to add another function similar to dispatch
(and maybe run_dispatch
) to grab the result of running the operation. Remember that we can only use single input / output operations here. So figure out what the name of the input should be and make it that for the inputs
dict, and then do the same for the outputs, figure out the name and then extract that and use it as the value. You won't want to use the run_dispatch
function because that creates inputs which assume they aren't valid if their is a validation parameter. Maybe just add an optional parameter to run_dispatch
to make it so that when it creates the Input
instances it sets their validated
properties to True
.
Lines 1361 to 1398 in ad18dcc
elif task is input_set_enters_network: | |
( | |
more, | |
new_input_sets, | |
) = input_set_enters_network.result() | |
for new_input_set in new_input_sets: | |
# Identify which operations have complete contextually | |
# appropriate input sets which haven't been run yet | |
async for operation, parameter_set in self.nctx.operations_parameter_set_pairs( | |
self.ictx, | |
self.octx, | |
self.rctx, | |
ctx, | |
self.config.dataflow, | |
new_input_set=new_input_set, | |
): | |
# Add inputs and operation to redundancy checker before | |
# dispatch | |
await self.rctx.add(operation, parameter_set) | |
# Dispatch the operation and input set for running | |
dispatch_operation = await self.nctx.dispatch( | |
self, operation, parameter_set | |
) | |
dispatch_operation.operation = operation | |
dispatch_operation.parameter_set = ( | |
parameter_set | |
) | |
tasks.add(dispatch_operation) | |
self.logger.debug( | |
"[%s]: dispatch operation: %s", | |
ctx_str, | |
operation.instance_name, | |
) | |
# Create a another task to waits for new input sets | |
input_set_enters_network = asyncio.create_task( | |
self.ictx.added(ctx) | |
) | |
tasks.add(input_set_enters_network) |
…dataflow policy engine: README: Add inital sketch Related: w3c/vc-jose-cose#51 Related: #1400 Related: #1315 Related: #476 Related: #349 Related: #382 Signed-off-by: John Andersen <[email protected]>
If
validation
parameter of definition is set to an operation's instance name within a dataflow use the output of that operation as the validation.The text was updated successfully, but these errors were encountered: