Skip to content

Commit

Permalink
More on #189
Browse files Browse the repository at this point in the history
  • Loading branch information
enridaga committed Jan 20, 2022
1 parent b63400e commit 754f44d
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public FileStreamIndex(){

}

public synchronized void add(Quad q){
public void add(Quad q){

if(completed == true) {
throw new RuntimeException();
Expand All @@ -84,7 +84,7 @@ public synchronized void add(Quad q){
add(spo_index, Triple.of(q.getSubject(),q.getPredicate(),q.getObject()), q);
}

public synchronized void add(Map index, Object key, Quad value){
public void add(Map index, Object key, Quad value){
safeGetIndex(index, key).add(value);
// Generate Union Graph Equivalent
Quad uq = new Quad(UNIONGRAPH, value.getSubject(), value.getPredicate(), value.getObject());
Expand All @@ -94,10 +94,12 @@ public synchronized void add(Map index, Object key, Quad value){
}

private List<Quad> safeGetIndex(Map ix, Object key){
if(!ix.containsKey(key)){
ix.put(key, new ArrayList<>());
synchronized (ix) {
if (!ix.containsKey(key)) {
ix.put(key, new ArrayList<>());
}
return (List<Quad>) ix.get(key);
}
return (List<Quad>) ix.get(key);
}

private List<Quad> selectIndex(Quad target){
Expand Down Expand Up @@ -150,41 +152,65 @@ private List<Quad> selectIndex(Quad target){

return index;
}
public synchronized Iterator<Quad> find (Node graph, Node s, Node p, Node o ) {
public Iterator<Quad> find (Node graph, Node s, Node p, Node o ) {
final Quad target = new Quad(graph, s,p,o);
final List<Quad> ix = selectIndex(target);
return new Iterator<Quad>() {
int x = 0;
Quad next = null;
@Override
public boolean hasNext() {
if(index.contains(target) && next == null){
next = target;
return true;
}else if(index.contains(target)){
return false;
}
// Otherwise, iterate over index
while(!isCompleted() || x < ix.size() ){
if(ix.size() <= x){
// Wait
continue;
}
Quad qq = ix.get(x);
x++;
if(target.getGraph().matches(qq.getGraph())
&& target.getSubject().matches(qq.getSubject()) &&
target.getPredicate().matches(qq.getPredicate()) &&
target.getObject().matches(qq.getObject())){
next = qq;
L.trace("hasNext() seeking target: {}", target);
synchronized (ix) {
if (ix.contains(target) && next == null) {
next = target;
return true;
} else if (ix.contains(target)) {
return false;
}
// Otherwise, iterate over index
while (!isCompleted() || x < ix.size()) {
if (ix.size() <= x) {
// Wait
continue;
}
Quad qq = ix.get(x);
x++;

boolean mg = false;
boolean ms = false;
boolean mp = false;
boolean mo = false;
if (!target.getGraph().isConcrete() || target.getGraph().matches(qq.getGraph())){
// If target graph is variable or [], only match with qq not in Union Graph
if(!target.getGraph().isConcrete() && qq.getGraph().matches(UNIONGRAPH)){
mg = false;
} else {
mg = true;
}
}
if (!target.getSubject().isConcrete() || target.getSubject().matches(qq.getSubject())){
ms = true;
}
if (!target.getPredicate().isConcrete() || target.getPredicate().matches(qq.getPredicate())){
mp = true;
}
if (!target.getObject().isConcrete() || target.getObject().matches(qq.getObject())){
mo = true;
}
if(mg && ms && mp && mo){
next = qq;
L.trace("hasNext() {}", next);
return true;
}
}
}
return false;
}

@Override
public Quad next() {
L.trace("next() {}", next);
Quad ret = next;
next = null;
return ret;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,28 +45,34 @@ public FileStreamManager(Context context, Op op, Properties properties, FileStre
this.triplifier = triplifier;
}

private Iterator<Quad> streamFromFile(Quad target){
private void streamFromFile(Quad target){
// Run Triplifier, intercept triples which are useful to answer the pattern, return them as quads
// To store indexes of relevant quads
index = new FileStreamIndex(); // Collections.synchronizedList(new ArrayList<Object>());
// To stream the currently requested Quads
LinkedBlockingQueue<Object> buffer = new LinkedBlockingQueue<Object>();
StreamQuadHandler handler = new StreamQuadHandler(properties, target, op, buffer, index);
FileStreamer streamer = new FileStreamer(properties, triplifier, buffer, index, handler);
// LinkedBlockingQueue<Object> buffer = new LinkedBlockingQueue<Object>();
StreamQuadHandler handler = new StreamQuadHandler(properties, target, op, index);
FileStreamer streamer = new FileStreamer(properties, triplifier, index, handler);
Thread worker = new Thread(streamer);
log.debug("Starting thread to seek {}", target);
worker.start();
return new FileStreamQuadIterator(buffer);
//return new FileStreamQuadIterator(buffer);
}

public Iterator<Quad> find(Node g, Node s, Node p, Node o){
Quad target = new Quad(g, s, p, o);
if (streamInProgress) {
return index.find(g,s,p,o);
} else {
log.debug("find {}", target);
if (!streamInProgress) {
//
// } else {
streamInProgress = true;
return streamFromFile(target);
log.debug("start reading file {}", target);
streamFromFile(target);
}
log.debug("stream in progress {}", target);
Iterator<Quad> it = index.find(g,s,p,o);
log.debug("iterator ready {}", it);
return it;
}

public List<String> getDataSourceIds(){
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,28 +32,27 @@ public class FileStreamer implements Runnable {
private final Properties properties;
private final Triplifier triplifier;
private final StreamQuadHandler handler;
private final LinkedBlockingQueue<Object> buffer;
// private final LinkedBlockingQueue<Object> buffer;
private final FileStreamIndex index;

public FileStreamer(Properties properties, Triplifier triplifier, LinkedBlockingQueue<Object> buffer, FileStreamIndex index, StreamQuadHandler handler){
public FileStreamer(Properties properties, Triplifier triplifier, FileStreamIndex index, StreamQuadHandler handler){
this.properties = properties;
this.triplifier = triplifier;
this.handler = handler;
this.buffer = buffer;
// this.buffer = buffer;
this.index = index;
}
@Override
public void run() {
try {
log.debug("start seeking quad: {}", handler.getTarget());
triplifier.triplify(properties, handler);
buffer.put(FileStreamQuadIterator.ENDSIGNAL);
index.setCompleted();

if(log.isDebugEnabled()) {
log.debug("finished seeking quad ({} found): {}", handler.debug, handler.getTarget());
log.debug("streaming started, seeking quad: {}", handler.getTarget());
}
} catch (IOException | TriplifierHTTPException | InterruptedException e) {
triplifier.triplify(properties, handler);
// buffer.put(FileStreamQuadIterator.ENDSIGNAL);
index.setCompleted();
log.debug("streaming finished");
} catch (IOException | TriplifierHTTPException e) {
throw new RuntimeException(e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,23 +40,18 @@
*/
public class StreamQuadHandler extends TripleFilteringFacadeXGraphBuilder {
protected static final Logger log = LoggerFactory.getLogger(StreamQuadHandler.class);
private LinkedBlockingQueue<Object> queue;
// private LinkedBlockingQueue<Object> queue;
private static final Node unionGraph = NodeFactory.createURI("urn:x-arq:UnionGraph");
public int debug = 0;
private Quad target;
private FileStreamIndex index;

protected StreamQuadHandler(Properties properties, Quad target, Op op, LinkedBlockingQueue<Object> queue, FileStreamIndex index) {
protected StreamQuadHandler(Properties properties, Quad target, Op op, FileStreamIndex index) {
super(Triplifier.getResourceId(properties), op, DatasetGraphFactory.create(), properties);
this.queue = queue;
// this.queue = queue;
this.target = target;
this.index = index;
}

// public Node getRoot(){
// return root;
// }

public Quad getTarget(){
return target;
}
Expand All @@ -67,19 +62,14 @@ public Quad getTarget(){
@Override
public boolean add(Node graph, Node subject, Node predicate, Node object) {
Quad q = new Quad(graph, subject, predicate, object);
if(log.isDebugEnabled()){
log.trace("{} matches ", q);
debug++;
}

if (match(graph, subject, predicate, object)) {
// Relevant to any of following invocations
try {
// try {
index.add(q);
queue.put(q);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
// queue.put(q);
// } catch (InterruptedException e) {
// throw new RuntimeException(e);
// }
return true;
}
return false;
Expand Down

0 comments on commit 754f44d

Please sign in to comment.