Skip to content

Commit

Permalink
Merge pull request #2117 from linas/gah
Browse files Browse the repository at this point in the history
Implement a parallelizable query
  • Loading branch information
linas authored Mar 1, 2019
2 parents 101697f + 1b7ead6 commit 4bb63a5
Show file tree
Hide file tree
Showing 18 changed files with 657 additions and 160 deletions.
1 change: 1 addition & 0 deletions examples/pattern-matcher/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ are the logics of theorem-proving, in general.)
* `presence.scm` -- Testing for the presence of an Atom.
* `absent.scm` -- Using the AbsentLink.
* `value-of.scm` -- Looking for high or low TruthValues.
* `query.scm` -- Running queries in parallel.


Pattern Recognition
Expand Down
196 changes: 196 additions & 0 deletions examples/pattern-matcher/query.scm
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
;
; query.scm -- Parallel queries (parallel pattern matching)
;
; QueryLink usage example.
;
; The QueryLink and the BindLink are both very similar; both search
; the AtomSpace for groundings of the query pattern, and then perform
; a re-write, based on the results. The only difference between the two
; is that the BindLink returns a SetLink containing the results, whereas
; the QueryLink returns a LinkValue containing the results. This makes
; the QueryLink a bit nicer, because it does not pollute the AtomSpace
; with nearly-useless SetLinks.
;
; Although both can be run in parallel (i.e. run in different threads),
; there's almost no point to doing so for the BindLink, since you have
; to wait for it to complete, and provide you with the resulting
; SetLink. By contrast, the QueryLink can drop off results at a
; "well-known location" in the AtomSpace, as they appear, so that
; processing can happen in parallel: processing can start on some
; results, even while others are still being found.
;
; This example uses an AnchorNode to establish a "well-known location",
; a QueryLink to attache them there, and a DeleteLink to detach results
; from the AnchorNode. The ParallelLink is used to run multiple threads,
; and SleepLink to slow it down enough to see what is happening.
;
; Taken as a whole, it demos a toy parallel processing pipeline.
;

(use-modules (opencog) (opencog exec))

; -------------
; Create three bits of "knowledge".
(Evaluation
(Predicate "foobar") (List (Concept "funny") (Concept "thing")))
(Evaluation
(Predicate "foobar") (List (Concept "funny") (Concept "story")))
(Evaluation
(Predicate "foobar") (List (Concept "funny") (Concept "joke")))

; -------------
; Define a simple query. It looks for the funny stuff, and attaches
; the result to an AnchorNode
(define query
(Query
(TypedVariable (Variable "$x") (Type 'ConceptNode))
(Evaluation
(Predicate "foobar")
(List (Concept "funny") (Variable "$x")))
(ListLink
(Anchor "*-query results-*")
(Implication (Variable "$x") (Concept "laughable")))
))

; Actually run it - this should return all of the results, wrapped
; in a LinkValue.
(cog-execute! query)

; Take a look at the incoming set of the anchor point, and verify
; that the expected content is there.
(cog-incoming-set (Anchor "*-query results-*"))

; -------------
; Define a second stage to the processing pipeline
(define absurd
(Query
(TypedVariable (Variable "$x") (Type 'ConceptNode))
(And
; Search at the earlier anchor point.
(Present (ListLink
(Anchor "*-query results-*")
(Implication (Variable "$x") (Concept "laughable"))))

; Immediately detach from that anchor, by deleting the
; ListLink that couples the two together.
(True (Delete (ListLink
(Anchor "*-query results-*")
(Implication (Variable "$x") (Concept "laughable"))))))

; After matching the above, create an attachment to the
; second stage anchor point.
(ListLink
(Anchor "*-risible results-*")
(Implication (Variable "$x") (Concept "ludicrous")))
))

; Run the query. See what happens.
(cog-execute! absurd)

; Verify that the old anchor point has been vacated, as expected.
(cog-incoming-set (Anchor "*-query results-*"))

; Verify that the results are now at the new anchor
(cog-incoming-set (Anchor "*-risible results-*"))

; -------------
; Now define a third stage of processing. This will generate output,
; i.e. it will print something to stdout.
;
(define (report-stuff NODE-A NODE-B)
(format #t "I think that ~A is ~A. -- ~A\n"
(cog-name NODE-A) (cog-name NODE-B)
(strftime "%c" (localtime (current-time)))
)
(SimpleTruthValue 1 1))

(define output
(Query
(VariableList
(TypedVariable (Variable "$x") (Type 'ConceptNode))
(TypedVariable (Variable "$y") (Type 'ConceptNode)))
(And
; Search at the earlier anchor point.
(Present (ListLink
(Anchor "*-risible results-*")
(Implication (Variable "$x") (Variable "$y"))))

; Immediately detach from that anchor, by deleting the
; ListLink that couples the two together.
(True (Delete (ListLink
(Anchor "*-risible results-*")
(Implication (Variable "$x") (Variable "$y"))))))

; After matching the above, print a report.
(ExecutionOutput
(GroundedSchema "scm:report-stuff")
(ListLink (Variable "$x") (Variable "$y")))
))

; Run it. Verify that it works.
(cog-execute! output)

; Run it a second time, verify that all inputs have been consumed.
(cog-execute! output)

; Double-check that inputs have been consumed, by looking at the anchor
; point.
(cog-incoming-set (AnchorNode "*-risible results-*"))

; -------------
; Now, assemble an automated processing pipeline.

; Print the current time
(define (prti N)
(format #t "Thread ~A. -- ~A\n" (cog-name N)
(strftime "%c" (localtime (current-time))))
(SimpleTruthValue 1 1))

; Atom to print the current time
(define (prtime STR)
(Evaluation
(GroundedPredicate "scm:prti")
(Concept STR)))

; When executed, this launches three threads, and returns to the
; caller without any delay. The threads run to conclusion, and
; then quietly exit.
(define threads
(Parallel
(SequentialAnd
(prtime "step one A")
(True query)
(True (Sleep (Number 4)))
(prtime "step one B")
(True query)
(True (Sleep (Number 4)))
(prtime "step one C")
(True query))

(SequentialAnd
(True (Sleep (Number 1)))
(prtime "step two A")
(True absurd)
(True (Sleep (Number 4)))
(prtime "step two B")
(True absurd)
(True (Sleep (Number 4)))
(prtime "step two C")
(True absurd))

(SequentialAnd
(True (Sleep (Number 2)))
(prtime "step three A")
(True output)
(True (Sleep (Number 4)))
(prtime "step three B")
(True output)
(True (Sleep (Number 4)))
(prtime "step three C")
(True output))
))

; Run the multi-threaded pipeline. This should print some fairly verbose
; messages, which hopefully makes clear what is going on.
;
; (cog-execute! threads)
11 changes: 7 additions & 4 deletions opencog/atoms/atom_types/atom_types.script
Original file line number Diff line number Diff line change
Expand Up @@ -355,18 +355,21 @@ PATTERN_LINK <- PRENEX_LINK
// executed. The distinction is made for the benefit of the C++ code,
// so that it can dispatch appropriately, based on the base type.

// Finds all groundings, return TV
// Finds all groundings, return binary truth value (true/false)
SATISFACTION_LINK <- PATTERN_LINK,EVALUATABLE_LINK
// Finds all groundings, return LinkValue
SATISFYING_LINK <- PATTERN_LINK,EVALUATABLE_LINK
// Finds all groundings, returns generic Value
SATISFYING_LINK <- PATTERN_LINK

// The GetLink is almost exactly the same thing as a SatsifyingSetLink,
// except that GetLink is imperative, while SatisfyingSetLink is
// declarative. Likewise, BindLink is exactly the same thing as an
// ImplicationLink, except that its imperative, not declarative.
// Both return SetLinks holding the results.
// QueryLink is identical to BindLink, except it returns a LinkValue
// holding the result, instead of a SetLink. (Less atomspace pollution).
GET_LINK <- SATISFYING_LINK // Finds all groundings, returns them
BIND_LINK <- SATISFYING_LINK // Finds all groundings, substitutes.
QUERY_LINK <- SATISFYING_LINK // Finds all groundings, substitutes.
BIND_LINK <- QUERY_LINK // Finds all groundings, substitutes.

// Adjoint to the GetLink. This is "adjoint" in the sense that the roles
// of the pattern and the grounding are reversed: given a grounding, the
Expand Down
2 changes: 1 addition & 1 deletion opencog/atoms/execution/EvaluationLink.cc
Original file line number Diff line number Diff line change
Expand Up @@ -586,7 +586,7 @@ TruthValuePtr EvaluationLink::do_eval_scratch(AtomSpace* as,
{
Instantiator inst(as);
Handle result(HandleCast(inst.execute(term, silent)));
scratch->add_atom(result);
if (result) scratch->add_atom(result);
}
}
if (TRUE_LINK == t)
Expand Down
17 changes: 15 additions & 2 deletions opencog/atoms/execution/Instantiator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -536,8 +536,8 @@ bool Instantiator::not_self_match(Type t)
* added to the atomspace, and its handle is returned.
*/
ValuePtr Instantiator::instantiate(const Handle& expr,
const HandleMap &vars,
bool silent)
const HandleMap &vars,
bool silent)
{
// throw, not assert, because this is a user error ...
if (nullptr == expr)
Expand Down Expand Up @@ -592,6 +592,19 @@ ValuePtr Instantiator::instantiate(const Handle& expr,
return pap;
}

// If there is a SatisfyingLink, we have to perform it
// and return the satisfying set.
if (nameserver().isA(t, SATISFYING_LINK))
{
return expr->execute(_as, silent);
}

// The thread-links are ambiguously executable/evaluatable.
if (nameserver().isA(t, PARALLEL_LINK))
{
return ValueCast(EvaluationLink::do_evaluate(_as, expr, silent));
}

// Instantiate.
Handle grounded(walk_tree(expr, silent));

Expand Down
6 changes: 3 additions & 3 deletions opencog/atoms/execution/Instantiator.h
Original file line number Diff line number Diff line change
Expand Up @@ -135,12 +135,12 @@ class Instantiator
}

ValuePtr instantiate(const Handle& expr,
const HandleMap& vars,
bool silent=false);
const HandleMap& vars,
bool silent=false);
ValuePtr execute(const Handle& expr, bool silent=false)
{
// If no actual instantiation is involved, then do not consume
// quotations, as it might change the semantics. (??)
// quotations, as it might change the semantics. (Huh ??)
_consume_quotations = false;
return instantiate(expr, HandleMap(), silent);
}
Expand Down
Loading

0 comments on commit 4bb63a5

Please sign in to comment.