Skip to content

Execution Semantics

Riccardo Tommasini edited this page Jan 2, 2021 · 5 revisions

Stream Processing Engine

This package contains all the necessary abstractions to define the exeuction semantics of an RSP engine It wraps the RSP-QL portion that is based on the SECRET MODEL

Content

This package is used to specify the content of the window as for the SECRET definition. the method coalesce() provides a time-agnostic representation of the data inside the window, as it is required by the SECRET MODEL.

In the below implementation, the status is maintained as a List whose content is merged together upon request.

public class ContentGraph implements Content<Graph> {
    private List<Graph> elements;

 @Override
    public Graph coalesce() {
        if (elements.size() == 1)
            return elements.get(0);
        else {
            Graph g = RDFUtils.createGraph();
            elements.stream().flatMap(Graph::stream).forEach(g::add);
            return g;
        }
    }

}

Report

This package is used to describe the reporting approach adopted by a given SPE w.r.t. the windowing mechanism it implements.

To represent SECRET definition of the report, we decoupled report implementation and the strategies it employs.

public class ReportImpl implements Report {
    List<ReportingStrategy> strategies = new ArrayList<>();
    @Override
    public boolean report(Window w, Content c, long application_time, long system_time) {
        return strategies.stream().allMatch(strategy -> strategy.match(w, c, tapp, tsys));
    }

}

In the SECRET MODEL, the reporting approach is the result of the combination of four strategies.

  • ON CONTENT CHANGE, the content is reported anytime it changes
  • ON WINDOW CLOSE, the content is reported when the active window closes
  • PERIODIC, the content is reported with a period p
  • NON-EMPTY CONTENT, only non-empty content is reported

To make the reporting extensible, we defined the ReportingStrategy as an interface, and we included the aforementioned one.

/**
 * Window close (Rwc): reporting is done for t
 * only when the active window closes (i.e., |Scope(t)| = w ).
 **/
public class OnWindowClose implements ReportingStrategy {
    @Override
    public boolean match(Window w, Content c, long tapp, long tsys) {
        return w.getC() < tapp;
    }
}

Tick

The Tick dimension in our model defines the condition which drives an SPE to take action on its input (also referred to as “window state change” or “window re-evaluation” ).

Like Report, Tick is also part of a system’s internal execution model. While some systems react to individual tuples as they arrive, others collectively react to all or subsets of tuples with the same tapp value. RSP-QL, like SECRET, identifies three main ways that different systems “tick”:

  • tuple-driven, where each tuple arrival causes a system to react;
  • time-driven, where the progress of tapp causes a system to react;
  • batch-driven, where either a new batch arrival or the progress of tapp causes a system to react.

An enumeration ``it.polimi.deib.sr.rsp.api.enums.Tick``` enlits the default ticks options for parsing purposes.

public interface Ticker {
    void tick(long t_e, Window w);

}

All the default Tick definition are implemented in YASPER using the Ticker interface. Moreover, the Tickerinterface (see above) allows RSP4J's users to implement custom tick logics, i.e.,

Tuple Ticker

The tuple ticker triggers the operator evaluation every time a new item arrives.

@RequiredArgsConstructor
public class TupleTicker implements Ticker {

    private final StreamToRelationOp<?, ?> op;

    @Override
    public void tick(long t_e, Window w) {
        op.compute(t_e, w);
    }
}

Time Ticker

The time ticker triggers the operator evaluation when time advances.

@RequiredArgsConstructor
public class TimeTicker implements Ticker {

    private final StreamToRelationOp<?, ?> op;
    private final Time time;

    @Override
    public void tick(long t_e, Window w) {
        time.addEvaluationTimeInstants(new TimeInstant(t_e));
        if (t_e > time.getAppTime()) {
            op.compute(t_e, w);
        }
    }

}

Batch Ticker

The time ticker triggers the operator evaluation when a new batch passes.

@RequiredArgsConstructor
public class BatchTicker implements Ticker {

    private int curr = 0;
    protected final StreamToRelationOp<?, ?> op;

    @Setter
    private int batch;

    @Override
    public void tick(long t_e, Window w) {
        curr++;
        if (curr == batch) {
            op.compute(t_e, w);
            curr = 0;
        }
    }
}

Scope