Skip to content

CQELS SLIDING WINDOW OPERATOR

Riccardo Tommasini edited this page Jan 2, 2021 · 1 revision

Factory

/**
 * Factory that builds a {@link CQELSStreamToRelationOp} for a stream and wires it to an SDS.
 *
 * <p>The factory is configured once with the window parameter {@code a}, the time/tick/report
 * settings and the SDS context; every call to {@link #apply} creates a fresh operator using
 * those settings. {@code t0} is stored but is not read anywhere in this class.
 */
public class CQELSTimeWindowOperatorFactory implements StreamToRelationOperatorFactory<Graph, Graph> {

    private final long a;
    private final long t0;
    private final Time time;
    private final Tick tick;
    private final Report report;
    private final ReportGrain grain;
    private SDS<Graph> context;

    /**
     * Stores the configuration that will be handed to every operator this factory creates.
     *
     * @param a       window parameter forwarded to the operator constructor
     * @param t0      stored only; not forwarded to the operator by this class
     * @param time    time instance forwarded to the operator
     * @param tick    tick policy forwarded to the operator
     * @param report  report policy forwarded to the operator
     * @param grain   report grain forwarded to the operator
     * @param context SDS passed to the operator's {@code set} method in {@link #apply}
     */
    public CQELSTimeWindowOperatorFactory(long a, long t0, Time time, Tick tick, Report report, ReportGrain grain, SDS<Graph> context) {
        this.a = a;
        this.t0 = t0;
        this.time = time;
        this.tick = tick;
        this.report = report;
        this.grain = grain;
        this.context = context;
    }

    /**
     * Creates a new operator for {@code iri}, registers it as a consumer of {@code s}, and
     * returns whatever the operator's {@code set} method yields for the configured SDS.
     *
     * @param s   stream the new operator is added to as a consumer
     * @param iri identifier passed to the operator constructor
     * @return the result of calling {@code set(context)} on the new operator
     */
    @Override
    public TimeVarying<Graph> apply(WebDataStream<Graph> s, IRI iri) {
        StreamToRelationOp<Graph, Graph> op = new CQELSStreamToRelationOp(iri, a, time, tick, report, grain);
        s.addConsumer(op);
        return op.set(this.context);
    }
}

Window Instances

public class WindowImpl implements Window {

    private long c, o;

    /**
     * Creates a window from its two bounds.
     *
     * @param o value later returned by {@code getO()}
     * @param c value later returned by {@code getC()}
     */
    public WindowImpl(long o, long c) {
        this.c = c;
        this.o = o;
    }

    /**
     * @return the {@code c} bound given to the constructor
     */
    public long getC() {
        return this.c;
    }

    /**
     * @return the {@code o} bound given to the constructor
     */
    public long getO() {
        return this.o;
    }
...

Operator

/**
 * Stream-to-relation operator that, for every incoming element with timestamp {@code t_e},
 * materialises the window {@code [t_e - a, t_e]} and fills it with the elements that are
 * still inside that range.
 *
 * <p>Internal state:
 * <ul>
 *   <li>{@code windows} - content of every window that has not been evicted yet;</li>
 *   <li>{@code r_stream} - elements currently considered live, mapped to their timestamp;</li>
 *   <li>{@code d_stream} - elements that fell out of the active window during the current
 *       {@link #windowing} call; emptied at the end of that call;</li>
 *   <li>{@code to_evict} - scratch set of windows to drop at the end of the call.</li>
 * </ul>
 */
public class CQELSStreamToRelationOp extends ObservableStreamToRelationOp<Graph, Graph> {

    private final long a;

    private final Map<Window, Content<Graph, Graph>> windows;
    private final Map<Graph, Long> r_stream;
    private final Map<Graph, Long> d_stream;

    private final Set<Window> to_evict;
    private long tc0;
    private long toi;


    /**
     * @param iri      identifier passed to the superclass
     * @param a        distance between the opening and closing bound of each window
     * @param instance time instance passed to the superclass; its scope is stored in {@code tc0}
     * @param tick     tick policy passed to the superclass
     * @param report   report policy passed to the superclass
     * @param grain    report grain passed to the superclass
     */
    public CQELSStreamToRelationOp(IRI iri, long a, Time instance, Tick tick, Report report, ReportGrain grain) {
        super(iri, instance, tick, report, grain);
        this.a = a;
        this.tc0 = instance.getScope();
        this.toi = 0;
        this.windows = new HashMap<>();
        this.to_evict = new HashSet<>();
        this.r_stream = new HashMap<>();
        this.d_stream = new HashMap<>();
    }

    /**
     * @return the time instance this operator works with
     */
    @Override
    public Time time() {
        return time;
    }

    /**
     * Returns the content of the window with the greatest closing bound among the windows
     * that opened strictly before {@code t_e} and closed at or before {@code t_e}.
     *
     * @param t_e reference timestamp
     * @return the matching window content, or a new {@code EmptyGraphContent} if none matches
     */
    @Override
    public Content<Graph, Graph> getContent(long t_e) {
        Optional<Window> max = windows.keySet().stream()
                .filter(w -> w.getO() < t_e && w.getC() <= t_e)
                .max(Comparator.comparingLong(Window::getC));

        if (max.isPresent()) {
            return windows.get(max.get());
        }

        return new EmptyGraphContent();
    }

    /**
     * Returns the content of every window {@code w} with {@code w.getO() <= t_e < w.getC()}.
     *
     * @param t_e reference timestamp
     * @return the contents of all matching windows (possibly empty)
     */
    @Override
    public List<Content<Graph, Graph>> getContents(long t_e) {
        return windows.keySet().stream()
                .filter(w -> w.getO() <= t_e && t_e < w.getC())
                .map(windows::get).collect(Collectors.toList());
    }


    /**
     * Processes one stream element: opens the window ending at {@code ts}, moves expired
     * elements out of the live set, fills the window, reports if required, and evicts
     * windows that closed before any of the expired elements.
     *
     * @param e  incoming element
     * @param ts timestamp of {@code e}
     * @throws OutOfOrderElementException if {@code ts} is earlier than the current application time
     */
    protected void windowing(Graph e, long ts) {
        log.debug("Received element (" + e + "," + ts + ")");
        long t_e = ts;

        if (time.getAppTime() > t_e) {
            log.error("OUT OF ORDER NOT HANDLED");
            throw new OutOfOrderElementException("(" + e + "," + ts + ")");
        }

        Window active = scope(t_e);
        Content<Graph, Graph> content = windows.get(active);

        // Elements older than the active window's opening bound have expired.
        r_stream.entrySet().stream()
                .filter(ee -> ee.getValue() < active.getO())
                .forEach(ee -> d_stream.put(ee.getKey(), ee.getValue()));

        // Everything still inside the active window belongs to its content.
        r_stream.entrySet().stream()
                .filter(ee -> ee.getValue() >= active.getO())
                .map(Map.Entry::getKey)
                .forEach(content::add);

        r_stream.put(e, ts);
        content.add(e);

        if (report.report(active, content, t_e, System.currentTimeMillis())) {
            ticker.tick(t_e, active);
        }


        //REMOVE ALL THE WINDOWS THAT CONTAIN DSTREAM ELEMENTS
        //Theoretically active window has always size 1
        for (Map.Entry<Graph, Long> ee : d_stream.entrySet()) {
            log.debug("Evicting [" + ee + "]");

            windows.forEach((window, content1) -> {
                if (window.getO() <= ee.getValue() && window.getC() < ee.getValue()) {
                    schedule_for_eviction(window);
                }
            });

            // Remove by key; the previous code passed the Map.Entry itself, which never
            // matches a Graph key, so expired elements were never dropped. The two-argument
            // form keeps an element that has re-arrived with a newer timestamp.
            r_stream.remove(ee.getKey(), ee.getValue());
        }

        to_evict.forEach(windows::remove);
        to_evict.clear();
        // Expired elements have been fully handled; keeping them would make the map grow
        // without bound and re-run the eviction scan for them on every call.
        d_stream.clear();
    }

    /**
     * Ensures the window {@code [t_e - a, t_e]} exists in {@code windows} and returns it.
     *
     * @param t_e closing bound of the window
     * @return the window instance used as lookup key
     */
    private Window scope(long t_e) {
        long o_i = t_e - a;
        log.debug("Calculating the Windows to Open. First one opens at [" + o_i + "] and closes at [" + t_e + "]");
        log.debug("Computing Window [" + o_i + "," + (o_i + a) + ") if absent");

        WindowImpl active = new WindowImpl(o_i, t_e);
        windows.computeIfAbsent(active, window -> new ContentGraph());
        return active;
    }

    /** Marks {@code w} for removal at the end of the current {@link #windowing} call. */
    private void schedule_for_eviction(Window w) {
        to_evict.add(w);
    }

    /**
     * Looks up the content of {@code w} (a new {@code EmptyGraphContent} if the window is not
     * known), advances the application time to {@code t_e}, and delegates to {@code setVisible}.
     *
     * @param t_e timestamp to set as application time
     * @param w   window whose content is requested
     * @return the value returned by {@code setVisible(t_e, w, content)}
     */
    public Content<Graph, Graph> compute(long t_e, Window w) {
        Content<Graph, Graph> content = windows.containsKey(w) ? windows.get(w) : new EmptyGraphContent();
        time.setAppTime(t_e);
        return setVisible(t_e, w, content);
    }


    /**
     * Registers {@code content} as an observer of this operator and returns a new
     * time-varying graph bound to this operator and its IRI.
     *
     * @param content SDS to register; it is cast to {@code Observer}
     * @return a new {@code TimeVaryingGraph} backed by a freshly created graph
     */
    @Override
    public TimeVaryingGraph set(SDS<Graph> content) {
        this.addObserver((Observer) content);
        //TODO Generalize the type of content using an ENUM
        return new TimeVaryingGraph(this, iri, RDFUtils.createGraph());
    }

}