Skip to content

Commit

Permalink
Post event stream (#363)
Browse files Browse the repository at this point in the history
* [timeseries][example] with many jobs

to reproduce front end performance issues

* [timeseries] bump react-select to 2.3.0

as it appears to be much performant with many items
looks like w/o much changes, but in case: https://react-select.com/upgrade-guide

* [timeseries][jobs] improvements

- avoid reloading jobs_definition too often
- update screen only if paused jobs changed (fixes the popup disappearing while opening it)
- force refresh on job action (prevents inconsistent ui / backend states)

* [timeseries] downgrade react-select to 1.3.0

as v2 requires much more changes :/ should be better though

* [timeseries][example] with many jobs fix

at some point it overflows

* [timeseries][jobs] fix filtering / ordering (props) and initial loading

* [examples] move TestTimeSeriesWithManyJobs to test

Also, include Tests in writeClasspath for loop support

* [examples] TestTimeSeriesWithManyJobs long ids / hundreds of jobs tags

* [timeseries] Add PostEventSource

to replace eventsource (GET) with poller (POST)

* [timeseries] PostEventSource: statistics

* [timeseries] PostEventSource: executions

* [timeseries] PostEventSource: calendar

* [timeseries] PostEventSource: calendar/focus

* [timeseries] js const vs let

* [timeseries] extract calendar/focus logic and support GET and POST

in order not to break external dependencies on calendar/focus

  • Loading branch information
vguerci authored and dufrannea committed Feb 19, 2019
1 parent 6a286f5 commit 23464af
Show file tree
Hide file tree
Showing 12 changed files with 548 additions and 385 deletions.
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ lazy val commonSettings = Seq(
devMode := Option(System.getProperty("devMode")).isDefined,
writeClasspath := {
val f = file(s"/tmp/classpath_${organization.value}.${name.value}")
val classpath = (fullClasspath in Runtime).value
val classpath = (fullClasspath in Test).value
IO.write(f, classpath.map(_.data).mkString(":"))
streams.value.log.info(f.getAbsolutePath)
f
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Test: Cuttle timeseries with many jobs!

// This a very simple cuttle project using the time series scheduler
// to execute a lot of jobs to do load tests
package com.criteo.cuttle.examples

import com.criteo.cuttle._
import com.criteo.cuttle.timeseries._
import java.time.ZoneOffset.UTC
import java.time._

import scala.concurrent.Future
import scala.concurrent.duration._

object TestTimeSeriesWithManyJobs {

def main(args: Array[String]): Unit = {

val start: Instant = LocalDate.now.atStartOfDay.toInstant(UTC)

val jobs: Workflow = (1 to 1500).toArray
.map({ i =>
Job(s"hello-with-a-relatively-long-id-just-for-the-fun-to-break-things$i",
daily(UTC, start),
s"Hello $i",
tags = Set(Tag("hello"), Tag(s"hello-${i / 100}xx"))) { implicit e =>
val partitionToCompute = e.context.start + "-" + e.context.end
e.streams.info(s"Hello $i for $partitionToCompute")
Future.successful(Completed)
}
})
.foldLeft(Workflow.empty)(_ and _)

val world: Job[TimeSeries] = Job("world", daily(UTC, start), "World", tags = Set(Tag("world"))) { implicit e =>
e.streams.info("World!")
e.park(1.seconds).map(_ => Completed)
}

CuttleProject("Hello World", version = "123", env = ("dev", false)) {
world dependsOn jobs
}.start(logsRetention = Some(1.minute))
}
}
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@
"react-motion-ui-pack": "^0.10.2",
"react-paginate": "^4.4.2",
"react-redux": "^5.0.2",
"react-select": "^1.0.0-rc.5",
"react-select": "^1.3.0",
"react-tooltip": "^3.3.0",
"react-vega-lite": "1.1.2",
"redux": "^3.6.0",
Expand Down
37 changes: 37 additions & 0 deletions timeseries/src/main/javascript/Utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,40 @@ export const listenEvents = (
}
};
};

export class PostEventSource {
constructor(url, body) {
this._url = url;
this._observers = [];
this._body = body;
}
stopPolling() {
this._poller && clearTimeout(this._poller);
}
startPolling() {
if (this._poller) return;
this._poller = 1;

const poll = () =>
fetch(this._url, {
includeCredentials: true,
method: "POST",
body: JSON.stringify(this._body)
})
.then(data => data.json())
.then(
data => {
this._observers.forEach(o => o({ data }));
this._poller = setTimeout(() => poll(), 5000);
},
err => {
this._poller = setTimeout(() => poll(), 15000);
}
);

poll();
}
onmessage(observer) {
this._observers.push(observer);
}
}
27 changes: 16 additions & 11 deletions timeseries/src/main/javascript/app/pages/Calendar.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@ import classNames from "classnames";
import { navigate } from "redux-url";
import injectSheet from "react-jss";
import moment from "moment";
import _ from "lodash";
import isEqual from "lodash/isEqual";

import { Calendar as MiniCalendar } from "react-calendar";
import Spinner from "../components/Spinner";

import { listenEvents } from "../../Utils";
import { PostEventSource } from "../../Utils";

type Props = {
classes: any,
Expand All @@ -30,7 +30,7 @@ type Day = {

type State = {
data: ?Array<Day>,
query: ?string,
query: ?any,
eventSource: ?any
};

Expand All @@ -45,16 +45,21 @@ class Calendar extends React.Component<Props, State> {
}

listenForUpdates(props: Props) {
let jobsFilter = props.selectedJobs.length
? `&jobs=${props.selectedJobs.join(",")}`
: "";
let query = `/api/timeseries/calendar?events=true${jobsFilter}`;
if (this.state.query != query) {
this.state.eventSource && this.state.eventSource.close();
let eventSource = listenEvents(query, this.updateData.bind(this));
const { selectedJobs } = props;
const { query } = this.state;
const newQuery = { jobs: selectedJobs };
let { eventSource } = this.state;
if (!isEqual(newQuery, query)) {
eventSource && eventSource.stopPolling();
eventSource = new PostEventSource("/api/timeseries/calendar", newQuery);
eventSource.onmessage(json => {
this.updateData(json.data);
});
eventSource.startPolling();
this.setState({
...this.state,
data: null,
query: newQuery,
eventSource
});
}
Expand All @@ -72,7 +77,7 @@ class Calendar extends React.Component<Props, State> {

componentWillUnmount() {
let { eventSource } = this.state;
eventSource && eventSource.close();
eventSource && eventSource.stopPolling();
}

updateData(json: Array<Day>) {
Expand Down
66 changes: 41 additions & 25 deletions timeseries/src/main/javascript/app/pages/CalendarFocus.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import Spinner from "../components/Spinner";

import type { Workflow } from "../../datamodel";

import { listenEvents } from "../../Utils";
import { PostEventSource } from "../../Utils";

type Props = {
classes: any,
Expand Down Expand Up @@ -71,7 +71,7 @@ type Stats = {

type State = {
data: ?Stats,
query: ?string,
query: ?any,
eventSource: ?any,
showVersion: boolean
};
Expand Down Expand Up @@ -127,12 +127,17 @@ let getMaxLabelWidth = (jobNames, svg, jobNameClass) => {
};

let getLabelForStatus = status => {
switch(status) {
case "failed": return "stuck"
case "successful": return "done"
case "running": return "started"
case "paused": return "paused"
default: return "todo"
switch (status) {
case "failed":
return "stuck";
case "successful":
return "done";
case "running":
return "started";
case "paused":
return "paused";
default:
return "todo";
}
};

Expand All @@ -149,7 +154,9 @@ const summaryPeriodHelper = (x1, x2) => ({
// drawing methods // job details
const jobPeriodsHelper = (x1, x2, showExecutions, drillDown) => ({
tip: ({ period, status, jobName }) =>
`<div>${jobName} is ${getLabelForStatus(status)}${formatDate(period.start)} to ${formatDate(period.end)} UTC</div>`,
`<div>${jobName} is ${getLabelForStatus(status)}${formatDate(
period.start
)} to ${formatDate(period.end)} UTC</div>`,
translate: ({ period }) => `translate(${x1(period) + MARGIN}, 0)`,
width: ({ period }) => x2(period) - x1(period),
fill: ({ status }) =>
Expand All @@ -161,9 +168,9 @@ const jobPeriodsHelper = (x1, x2, showExecutions, drillDown) => ({
? "#ffbc5a"
: status == "running"
? "#49d3e4"
: status == "paused"
? "#ffaaff"
: "#ecf1f5",
: status == "paused"
? "#ffaaff"
: "#ecf1f5",
// For aggregated periods, we want to zoom on click
click: ({ period, jobId, aggregated }) =>
aggregated
Expand Down Expand Up @@ -310,20 +317,27 @@ class CalendarFocus extends React.Component<Props, State> {
}

listenForUpdates(props: Props) {
let { start, end } = props;
let jobsFilter =
props.selectedJobs.length > 0
? `&jobs=${props.selectedJobs.join(",")}`
: "";
let query = `/api/timeseries/calendar/focus?start=${moment(
start
).toISOString()}&end=${moment(end).toISOString()}${jobsFilter}`;
let { query: currentQuery, eventSource: currentEventSource } = this.state;
if (currentQuery != query) {
currentEventSource && currentEventSource.close();
let eventSource = listenEvents(query, this.updateData.bind(this));
const { selectedJobs, start, end } = props;
const { query } = this.state;
const newQuery = {
jobs: selectedJobs,
start: moment(start).toISOString(),
end: moment(end).toISOString()
};
let { eventSource } = this.state;
if (!_.isEqual(newQuery, query)) {
eventSource && eventSource.stopPolling();
eventSource = new PostEventSource(
"/api/timeseries/calendar/focus",
newQuery
);
eventSource.onmessage(json => {
this.updateData(json.data);
});
eventSource.startPolling();
this.setState({
data: null,
query: newQuery,
eventSource
});
}
Expand Down Expand Up @@ -601,7 +615,9 @@ class CalendarFocus extends React.Component<Props, State> {
</h1>
<div>
<div className={classes.showVersion}>
<label className={classes.showVersionLabel}>Show versions for each execution</label>
<label className={classes.showVersionLabel}>
Show versions for each execution
</label>
<input
name="versionCheckBox"
type="checkbox"
Expand Down
Loading

0 comments on commit 23464af

Please sign in to comment.