diff --git a/.gitignore b/.gitignore
index d1738070c3f..e8db0d8f76b 100644
--- a/.gitignore
+++ b/.gitignore
@@ -21,7 +21,7 @@ config/
# Compiled source #
###################
-*.com
+!*.com/
*.class
*.dll
*.exe
diff --git a/assembly/assembly-main/pom.xml b/assembly/assembly-main/pom.xml
index c446491c365..edaafef072a 100644
--- a/assembly/assembly-main/pom.xml
+++ b/assembly/assembly-main/pom.xml
@@ -37,6 +37,18 @@
assembly-wsmaster-war
war
+
+ org.eclipse.che
+ exec-agent
+ tar.gz
+ linux_amd64
+
+
+ org.eclipse.che
+ exec-agent
+ tar.gz
+ linux_arm7
+
org.eclipse.che.core
che-core-ide-stacks
@@ -62,18 +74,6 @@
che-tomcat8-slf4j-logback
zip
-
- org.eclipse.che.lib
- che-websocket-terminal
- tar.gz
- linux_amd64
-
-
- org.eclipse.che.lib
- che-websocket-terminal
- tar.gz
- linux_arm7
-
org.eclipse.che.plugin
che-plugin-sdk-tools
diff --git a/assembly/assembly-main/src/assembly/assembly.xml b/assembly/assembly-main/src/assembly/assembly.xml
index 4676e15a942..422cd3ec63a 100644
--- a/assembly/assembly-main/src/assembly/assembly.xml
+++ b/assembly/assembly-main/src/assembly/assembly.xml
@@ -70,7 +70,7 @@
lib/linux_amd64/terminal
websocket-terminal-linux_amd64.tar.gz
- org.eclipse.che.lib:che-websocket-terminal:tar.gz:linux_amd64
+ org.eclipse.che:exec-agent:tar.gz:linux_amd64
@@ -79,7 +79,7 @@
lib/linux_arm7/terminal
websocket-terminal-linux_arm7.tar.gz
- org.eclipse.che.lib:che-websocket-terminal:tar.gz:linux_arm7
+ org.eclipse.che:exec-agent:tar.gz:linux_arm7
diff --git a/assembly/assembly-wsmaster-war/src/main/java/org/eclipse/che/api/deploy/WsMasterModule.java b/assembly/assembly-wsmaster-war/src/main/java/org/eclipse/che/api/deploy/WsMasterModule.java
index c2a8c4a09cb..3e694abc56f 100644
--- a/assembly/assembly-wsmaster-war/src/main/java/org/eclipse/che/api/deploy/WsMasterModule.java
+++ b/assembly/assembly-wsmaster-war/src/main/java/org/eclipse/che/api/deploy/WsMasterModule.java
@@ -104,6 +104,8 @@ protected void configure() {
bindConstant().annotatedWith(Names.named("machine.ws_agent.run_command"))
.to("export JPDA_ADDRESS=\"4403\" && ~/che/ws-agent/bin/catalina.sh jpda run");
+ bindConstant().annotatedWith(Names.named("machine.terminal_agent.run_command"))
+ .to("$HOME/che/terminal/che-websocket-terminal -addr :4411 -cmd ${SHELL_INTERPRETER} -static $HOME/che/terminal/");
bind(org.eclipse.che.api.workspace.server.WorkspaceValidator.class)
.to(org.eclipse.che.api.workspace.server.DefaultWorkspaceValidator.class);
diff --git a/exec-agent/.gitignore b/exec-agent/.gitignore
new file mode 100644
index 00000000000..7d245c33e26
--- /dev/null
+++ b/exec-agent/.gitignore
@@ -0,0 +1,2 @@
+logs/
+exec-agent
diff --git a/exec-agent/README.md b/exec-agent/README.md
new file mode 100644
index 00000000000..bc56c341fee
--- /dev/null
+++ b/exec-agent/README.md
@@ -0,0 +1,59 @@
+Summary
+---
+Golang based server for executing commands and streaming process output logs,
+also websocket-terminal.
+
+
+Requirements
+--
+- golang 1.6+
+
+
+Docs
+---
+- jsonrpc2.0 based [Webscoket API](docs/ws_api.md)
+- jsonrpc2.0 based [Events](docs/events.md)
+- [REST API](docs/rest_api.md)
+
+Development
+---
+
+##### Link the sources to standard go workspace
+
+```bash
+export CHE_PATH=~/code/che
+mkdir $GOPATH/src/github.com/eclipse/che -p
+ln -s $CHE_PATH/exec-agent/src $GOPATH/src/github.com/eclipse/che/exec-agent
+```
+
+##### Install godep
+```bash
+go get github.com/tools/godep
+```
+
+##### Get all dependencies
+
+```bash
+cd $GOPATH/src/github.com/eclipse/che/exec-agent
+$GOPATH/bin/godep restore
+```
+
+That's it, `$GOPATH/src/github.com/eclipse/che/exec-agent` project is ready.
+
+##### Building linked project
+
+```bash
+cd $GOPATH/src/github.com/eclipse/che/exec-agent && go build
+```
+
+##### Running linked project tests
+
+```bash
+cd $GOPATH/src/github.com/eclipse/che/exec-agent && go test ./...
+```
+
+##### Formatting linked project sources
+
+```bash
+cd $GOPATH/src/github.com/eclipse/che/exec-agent && go fmt ./...
+```
diff --git a/exec-agent/docs/events.md b/exec-agent/docs/events.md
new file mode 100644
index 00000000000..186e7dfaf07
--- /dev/null
+++ b/exec-agent/docs/events.md
@@ -0,0 +1,98 @@
+Events
+===
+Messages sent via websocket connections to clients
+
+Channel Events
+---
+
+#### Connected
+
+The first event in the channel, published when client successfully connected to the exec-agent.
+
+```json
+{
+ "jsonrpc": "2.0",
+ "method": "connected",
+ "params": {
+ "time": "2016-09-24T16:40:05.098478609+03:00",
+ "channel": "channel-1",
+ "text": "Hello!"
+ }
+}
+```
+
+Process Events
+---
+
+#### Process started
+
+Published when process is successfully started.
+This is the first event from all the events produced by process,
+it appears only once for one process
+
+```json
+{
+ "jsonrpc": "2.0",
+ "method": "process_started",
+ "params": {
+ "time": "2016-09-24T16:40:55.930743249+03:00",
+ "pid": 1,
+ "nativePid": 22164,
+ "name": "print",
+ "commandLine": "printf \"\n1\n2\n3\""
+ }
+}
+```
+
+#### STDOUT event
+
+Published when process writes to stdout.
+One stdout event describes one output line
+
+```json
+{
+ "jsonrpc": "2.0",
+ "method": "process_stdout",
+ "params": {
+ "time": "2016-09-24T16:40:55.933255297+03:00",
+ "pid": 1,
+ "text": "Starting server..."
+ }
+}
+```
+
+#### STDERR event
+
+Published when process writes to stderr.
+One stderr event describes one output line
+
+```json
+{
+ "jsonrpc": "2.0",
+ "method": "process_stderr",
+ "params": {
+ "time": "2016-09-24T16:40:55.933255297+03:00",
+ "pid": 1,
+ "text": "sh: ifconfig: command not found"
+ }
+}
+```
+
+#### Process died
+
+Published when process is done, or killed. This is the last event from the process,
+it appears only once for one process
+
+```json
+{
+ "jsonrpc": "2.0",
+ "method": "process_died",
+ "params": {
+ "time": "2016-09-24T16:40:55.93354086+03:00",
+ "pid": 1,
+ "nativePid": 22164,
+ "name": "print",
+ "commandLine": "printf \"\n1\n2\n3\""
+ }
+}
+```
diff --git a/exec-agent/docs/notes.md b/exec-agent/docs/notes.md
new file mode 100644
index 00000000000..906e8ee2bc3
--- /dev/null
+++ b/exec-agent/docs/notes.md
@@ -0,0 +1,10 @@
+##### Websocket messages order
+
+The order is respected
+```
+Message fragments MUST be delivered to the recipient in the order sent by the sender.
+```
+Helpful Sources
+* https://tools.ietf.org/html/rfc6455 (search the sentence above)
+* http://stackoverflow.com/questions/11804721/can-websocket-messages-arrive-out-of-order
+* http://stackoverflow.com/questions/14287224/processing-websockets-messages-in-order-of-receiving
diff --git a/exec-agent/docs/rest_api.md b/exec-agent/docs/rest_api.md
new file mode 100644
index 00000000000..b4566fc516e
--- /dev/null
+++ b/exec-agent/docs/rest_api.md
@@ -0,0 +1,233 @@
+REST API
+===
+
+Process API
+---
+
+### Start a new process
+
+#### Request
+
+_POST /process_
+
+- `channel`(optional) - the id of the channel which should be subscribed to the process events
+- `types`(optional) - comma separated types works only in couple with specified `channel`, defines
+the events which will be sent by the process to the `channel`. Several values may be specified,
+e.g. `channel=channel-1&types=stderr,stdout`. By default channel will be subscribed to
+all the existing types(listed below). Possible type values:
+ - `stderr` - output from the process stderr
+ - `stdout` - output from the process stdout
+ - `process_status` - the process status events(_started, died_)
+
+
+```json
+{
+ "name" : "build",
+ "commandLine" : "mvn clean install",
+ "type" : "maven"
+}
+```
+
+#### Response
+
+```json
+{
+ "pid": 1,
+ "name": "build",
+ "commandLine": "mvn clean install",
+ "type" : "maven",
+ "alive": true,
+ "nativePid": 9186
+}
+```
+- `200` if successfully started
+- `400` if incoming data is not valid e.g. name is empty
+- `404` if specified `channel` doesn't exist
+- `500` if any other error occurs
+
+
+### Get a process
+
+#### Request
+
+_GET /process/{pid}_
+
+- `pid` - the id of the process to get
+
+#### Response
+
+```json
+{
+ "pid": 1,
+ "name": "build",
+ "commandLine": "mvn clean install",
+ "type" : "maven",
+ "alive": false,
+ "nativePid": 9186,
+}
+```
+
+- `200` if response contains requested process
+- `400` if `pid` is not valid, unsigned int required
+- `404` if there is no such process
+- `500` if any other error occurs
+
+### Kill a process
+
+#### Request
+
+_DELETE /process/{pid}_
+
+- `pid` - the id of the process to kill
+
+#### Response
+
+```json
+{
+ "pid": 1,
+ "name": "build",
+ "commandLine": "mvn clean install",
+ "type" : "maven",
+ "alive": true,
+ "nativePid": 9186,
+}
+```
+- `200` if successfully killed
+- `400` if `pid` is not valid, unsigned int required
+- `404` if there is no such process
+- `500` if any other error occurs
+
+
+### Get process logs
+
+#### Request
+
+_GET /process/{pid}/logs_
+
+- `pid` - the id of the process to get logs
+- `from`(optional) - time to get logs from e.g. _2016-07-12T01:48:04.097980475+03:00_ the format is _RFC3339Nano_
+don't forget to encode this query parameter
+- `till`(optional) - time to get logs till e.g. _2016-07-12T01:49:04.097980475+03:00_ the format is _RFC3339Nano_
+don't forget to encode this query parameter
+- `format`(optional) - the format of the response, default is `json`, possible values are: `text`, `json`
+- `limit`(optional) - the limit of logs in result, the default value is _50_, logs are limited from the
+latest to the earliest
+- `skip` (optional) - the logs to skip, default value is `0`
+
+#### Response
+
+The result logs of the process with the command line `printf "Hello\nWorld\n"`
+
+Text:
+```text
+[STDOUT] 2016-07-04 08:37:56.315082296 +0300 EEST Hello
+[STDOUT] 2016-07-04 08:37:56.315128242 +0300 EEST World
+```
+
+Json:
+```json
+[
+ {
+ "Kind" : "STDOUT",
+ "Time" : "2016-07-16T19:51:32.313368463+03:00",
+ "Text" : "Hello"
+ },
+ {
+ "Kind" : "STDOUT",
+ "Time" : "2016-07-16T19:51:32.313603625+03:00",
+ "Text" : "World"
+ }
+]
+```
+
+- `200` if logs are successfully fetched
+- `400` if `from` or `till` format is invalid
+- `404` if there is no such process
+- `500` if any other error occurs
+
+### Get processes
+
+#### Request
+
+_GET /process_
+
+- `all`(optional) - if `true` then all the processes including _dead_ ones will be returned(respecting paging ofc),
+otherwise only _alive_ processes will be returnedg
+
+#### Response
+
+The result of the request _GET /process?all=true_
+```json
+[
+ {
+ "pid": 1,
+ "name": "build",
+ "commandLine": "mvn clean install",
+ "type" : "maven",
+ "alive": true,
+ "nativePid": 9186,
+ },
+ {
+ "pid": 2,
+ "name": "build",
+ "commandLine": "printf \"Hello World\"",
+ "alive": false,
+ "nativePid": 9588
+ }
+]
+```
+- `200` if processes are successfully retrieved
+- `500` if any error occurs
+
+### Subscribe to the process events
+
+#### Request
+
+_POST /process/{pid}/events/{channel}_
+
+- `pid` - the id of the process to subscribe to
+- `channel` - the id of the webscoket channel which is subscriber
+- `types`(optional) - the types of the events separated by comma e.g. `?types=stderr,stdout`
+- `after`(optional) - process logs which appeared after given time will
+be republished to the channel. This method may be useful in the reconnect process
+
+#### Response
+
+- `200` if successfully subscribed
+- `400` if any of the parameters is not valid
+- `404` if there is no such process or channel
+- `500` if any other error occurs
+
+### Unsubscribe from the process events
+
+#### Request
+
+_DELETE /process/{pid}/events/{channel}_
+
+- `pid` - the id of the process to unsubscribe from
+- `channel` - the id of the webscoket channel which currenly subscribed
+to the process events
+
+#### Response
+
+- `200` if successfully unsubsribed
+- `400` if any of the parameters is not valid
+- `404` if there is no such process or channel
+- `500` if any other error occurs
+
+### Update the process events subscriber
+
+#### Request
+
+_PUT /process/{pid}/events/{channel}_
+
+- `pid` - the id of the process
+- `channel` - the id of the websocket channel which is subscriber
+- `types` - the types of the events separated with comma e.g. `?types=stderr,stdout`
+
+#### Response
+
+- `200` if successfully updated
+- `400` if any of the parameters is not valid
+- `404` if there is no such process or channel
+- `500` if any other error occurs
diff --git a/exec-agent/docs/ws_api.md b/exec-agent/docs/ws_api.md
new file mode 100644
index 00000000000..81e1be8dfca
--- /dev/null
+++ b/exec-agent/docs/ws_api.md
@@ -0,0 +1,564 @@
+Websocket API
+---
+[JSON RPC 2.0](http://www.jsonrpc.org/specification) protocol is used for client-server
+communication, but:
+- `params` is always json object(never array)
+- server to client notifications are treated as [Events](events.md)
+
+the apis described below include some of the following fields:
+```json
+{
+ "jsonrpc" : "2.0",
+ "method": "...",
+ "id": "...",
+ "params": { },
+ "error" : { },
+ "result" : { }
+}
+```
+ these fields are part of the protocol so they are not documented.
+
+## Process API
+
+
+### Start process
+
+##### Request
+
+- __name__ - the name of the command
+- __commandLine__ - command line to execute
+- __type__(optional) - command type
+- __eventTypes__(optional) - comma separated types of events which will be
+ received by this channel. By default all the process events will be received.
+Possible values are: `stderr`, `stdout`, `process_status`
+
+```json
+{
+ "method": "process.start",
+ "id": "id1234567",
+ "params": {
+ "name": "print",
+ "commandLine": "printf \"1\n2\n3\"",
+ "type": "test"
+ }
+}
+```
+
+##### Response
+
+```json
+{
+ "jsonrpc": "2.0",
+ "id": "id1234567",
+ "result": {
+ "pid": 1,
+ "name": "print",
+ "commandLine": "printf \"1\n2\n3\"",
+ "type": "test",
+ "alive": true,
+ "nativePid": 19920
+ }
+}
+```
+
+#### Errors
+
+- when either `name` or `commandLine` is missing, e.g:
+
+```json
+{
+ "jsonrpc": "2.0",
+ "id": "id1234567",
+ "error": {
+ "code": -32602,
+ "message": "Command line required"
+ }
+}
+```
+
+### Kill process
+
+##### Request
+
+- __pid__ - the id of the process to kill
+
+```json
+{
+ "method": "process.kill",
+ "id": "id1234567",
+ "params": {
+ "pid": 2
+ }
+}
+```
+
+##### Response
+
+```json
+{
+ "jsonrpc": "2.0",
+ "id": "id1234567",
+ "result": {
+ "pid": 2,
+ "text": "Successfully killed"
+ }
+}
+```
+
+##### Errors
+
+- when there is no such process
+
+```json
+{
+ "jsonrpc": "2.0",
+ "id": "id1234567",
+ "error": {
+ "code": -32000,
+ "message": "Process with id '2' does not exist"
+ }
+}
+```
+
+- when process with given id is not alive
+
+```json
+{
+ "jsonrpc": "2.0",
+ "id": "id1234567",
+ "error": {
+ "code": -32001,
+ "message": "Process with id '2' is not alive"
+ }
+}
+```
+
+### Subscribe to process events
+
+##### Request
+
+- __pid__ - the id of the process to subscribe to
+- __eventTypes__(optional) - comma separated types of events which will be
+received by this channel. By default all the process events will be received,
+not supported even types are ignored. Possible values are: `stdout`, `stderr`, `process_status`.
+- __after__(optional) - process logs which appeared after given time will
+be republished to the channel. This parameter may be useful when reconnecting to the exec-agent
+
+```json
+{
+ "method": "process.subscribe",
+ "id": "0x12345",
+ "params": {
+ "pid": 2,
+ "eventTypes": "stdout,stderr",
+ "after" : "2016-07-26T09:36:44.920890113+03:00"
+ }
+}
+```
+
+
+##### Response
+
+```json
+{
+ "jsonrpc": "2.0",
+ "id": "0x12345",
+ "result": {
+ "pid": 2,
+ "eventTypes": "stdout,stderr",
+ "text": "Successfully subscribed"
+ }
+}
+```
+
+##### Errors
+
+- when there is no a single valid event type provided
+
+```json
+{
+ "jsonrpc": "2.0",
+ "id": "0x12345",
+ "error": {
+ "code": -32602,
+ "message": "Required at least 1 valid event type"
+ }
+}
+```
+
+- when `after` is not in valid format
+
+```json
+{
+ "jsonrpc": "2.0",
+ "id": "0x12345",
+ "error": {
+ "code": -32602,
+ "message": "Bad format of 'after', parsing time \"2016-07-26\" as \"2006-01-02T15:04:05.999999999Z07:00\": cannot parse \"\" as \"T\""
+ }
+}
+```
+
+- when this channel is already subscribed on process events
+
+```json
+{
+ "jsonrpc": "2.0",
+ "id": "0x12345",
+ "error": {
+ "code": -32603,
+ "message": "Already subscribed"
+ }
+}
+```
+
+- when there is no such process
+
+```json
+{
+ "jsonrpc": "2.0",
+ "id": "0x12345",
+ "error": {
+ "code": -32000,
+ "message": "Process with id '2' does not exist"
+ }
+}
+```
+
+- when process with given id is not alive
+
+```json
+{
+ "jsonrpc": "2.0",
+ "id": "0x12345",
+ "error": {
+ "code": -32001,
+ "message": "Process with id '2' is not alive"
+ }
+}
+```
+
+
+
+### Unsubscribe from process events
+
+##### Request
+
+- __pid__ - the id of the process to unsubscribe from
+
+```json
+{
+ "method": "process.unsubscribe",
+ "id": "0x12345",
+ "params": {
+ "pid": 2
+ }
+}
+```
+
+##### Response
+
+```json
+{
+ "jsonrpc": "2.0",
+ "id": "0x12345",
+ "result": {
+ "pid": 2,
+ "text": "Successfully unsubscribed"
+ }
+}
+```
+
+##### Errors
+
+- when there is no such process
+
+```json
+{
+ "jsonrpc": "2.0",
+ "id": "0x12345",
+ "error": {
+ "code": -32000,
+ "message": "Process with id '2' does not exist"
+ }
+}
+```
+
+- when process with given id is not alive
+
+```json
+{
+ "jsonrpc": "2.0",
+ "id": "0x12345",
+ "error": {
+ "code": -32001,
+ "message": "Process with id '2' is not alive"
+ }
+}
+```
+
+
+
+### Update process subscriber
+
+##### Request
+
+- __pid__ - the id of the process which subscriber should be updated
+- __eventTypes__ - comma separated types of events which will be
+received by this channel. Not supported even types are ignored.
+Possible values are: `stdout`, `stderr`, `process_status`.
+
+```json
+{
+ "method": "process.updateSubscriber",
+ "id": "0x12345",
+ "params": {
+ "pid": 2,
+ "eventTypes": "process_status"
+ }
+}
+```
+
+##### Response
+
+```json
+{
+ "jsonrpc": "2.0",
+ "id": "0x12345",
+ "result": {
+ "pid": 2,
+ "eventTypes": "process_status",
+ "text": "Subscriber successfully updated"
+ }
+}
+```
+
+##### Errors
+
+- when there is no such process
+
+```json
+{
+ "jsonrpc": "2.0",
+ "id": "0x12345",
+ "error": {
+ "code": -32000,
+ "message": "Process with id '2' does not exist"
+ }
+}
+```
+
+- when process with given id is not alive
+
+```json
+{
+ "jsonrpc": "2.0",
+ "id": "0x12345",
+ "error": {
+ "code": -32001,
+ "message": "Process with id '2' is not alive"
+ }
+}
+```
+
+- when this channel is not subscribed on process events
+
+```json
+{
+ "jsonrpc": "2.0",
+ "id": "0x12345",
+ "error": {
+ "code": -32603,
+ "message": "No subscriber with id 'channel-1'"
+ }
+}
+```
+
+
+### Get process logs
+
+##### Request
+
+- __pid__ - the id of the process to get logs
+- __from__(optional) - time to get logs from e.g. _2016-07-12T01:48:04.097980475+03:00_ the format is _RFC3339Nano_
+- __till__(optional) - time to get logs till e.g. _2016-07-12T01:49:04.097980475+03:00_ the format is _RFC3339Nano_
+- __limit__(optional) - the limit of logs in result, the default value is _50_, logs are limited from the
+latest to the earliest
+- __skip__ (optional) - the logs to skip, default value is `0`
+
+```json
+{
+ "method": "process.getLogs",
+ "id": "0x12345",
+ "params": {
+ "pid": 3,
+ "limit": 5,
+ "skip": 5
+ }
+}
+```
+
+##### Response
+
+For the command `printf "1\n2\n3\n4\n5\n6\n7\n8\n9\n10`, the result will look like
+
+```json
+{
+ "jsonrpc": "2.0",
+ "id": "0x12345",
+ "result": [
+ {
+ "kind": "STDOUT",
+ "time": "2016-09-24T17:18:30.757623274+03:00",
+ "text": "1"
+ },
+ {
+ "kind": "STDOUT",
+ "time": "2016-09-24T17:18:30.757701555+03:00",
+ "text": "2"
+ },
+ {
+ "kind": "STDOUT",
+ "time": "2016-09-24T17:18:30.757721423+03:00",
+ "text": "3"
+ },
+ {
+ "kind": "STDOUT",
+ "time": "2016-09-24T17:18:30.757841518+03:00",
+ "text": "4"
+ },
+ {
+ "kind": "STDOUT",
+ "time": "2016-09-24T17:18:30.757851622+03:00",
+ "text": "5"
+ }
+ ]
+}
+```
+
+##### Errors
+
+
+- when there is no such process
+
+```json
+{
+ "jsonrpc": "2.0",
+ "id": "0x12345",
+ "error": {
+ "code": -32000,
+ "message": "Process with id '2' does not exist"
+ }
+}
+```
+
+- when one of the time parameters is invalid, e.g:
+
+```json
+{
+ "jsonrpc": "2.0",
+ "id": "0x12345",
+ "error": {
+ "code": -32602,
+ "message": "Bad format of 'till', parsing time \"date\" as \"2006-01-02T15:04:05.999999999Z07:00\": cannot parse \"date\" as \"2006\""
+ }
+}
+```
+
+
+
+### Get process
+
+##### Request
+
+- __pid__ - the id of the process to get
+
+```json
+{
+ "method": "process.getProcess",
+ "id": "0x12345",
+ "params": {
+ "pid": 3
+ }
+}
+```
+
+##### Response
+
+When everything is okay
+
+```json
+{
+ "jsonrpc": "2.0",
+ "id": "0x12345",
+ "result": {
+ "pid": 1,
+ "name": "print",
+ "commandLine": "printf \n1\n2\n3\"",
+ "type": "test",
+ "alive": false,
+ "nativePid": 13158
+ }
+}
+```
+
+##### Errors
+
+
+- when there is no such process
+
+```json
+{
+ "jsonrpc": "2.0",
+ "id": "0x12345",
+ "error": {
+ "code": -32000,
+ "message": "Process with id '2' does not exist"
+ }
+}
+```
+
+
+### Get processes
+
+##### Request
+
+- __all__(optional) - if `true` then all the processes including _dead_ ones will be returned,
+otherwise only _alive_ processes will be returned
+
+```json
+{
+ "method": "process.getProcesses",
+ "id": "id1234567",
+ "params": {
+ "all": true
+ }
+}
+```
+
+##### Response
+
+```json
+{
+ "jsonrpc": "2.0",
+ "id": "id1234567",
+ "result": [
+ {
+ "pid": 1,
+ "name": "print",
+ "commandLine": "printf \"1\n2\n3\"",
+ "type": "test",
+ "alive": false,
+ "nativePid": 13553
+ },
+ {
+ "pid": 2,
+ "name": "print2",
+ "commandLine": "printf \"\n3\n2\n1\"",
+ "type": "test2",
+ "alive": false,
+ "nativePid": 13561
+ }
+ ]
+}
+```
diff --git a/exec-agent/pom.xml b/exec-agent/pom.xml
new file mode 100644
index 00000000000..e2e945e5a40
--- /dev/null
+++ b/exec-agent/pom.xml
@@ -0,0 +1,215 @@
+
+
+
+ 4.0.0
+
+ che-parent
+ org.eclipse.che
+ 5.0.0-M9-SNAPSHOT
+
+ exec-agent
+ Exec Agent
+
+ go-workspace
+
+
+
+
+ com.mycila
+ license-maven-plugin
+
+
+ src/term/server.go
+ src/docs/**
+ src/vendor/**
+ src/Godeps/**
+ src/static/term.js
+ src/.idea/**
+ src/exec-agent
+ src/exec-agent.iml
+
+
+
+
+ org.apache.maven.plugins
+ maven-antrun-plugin
+
+
+ copy-sources
+ compile
+
+ run
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ com.soebes.maven.plugins
+ iterator-maven-plugin
+ 0.4
+
+
+ compile-exec-agent
+ compile
+
+ iterator
+
+
+
+
+ linux_arm5
+
+ linux
+ arm
+ 5
+
+
+
+ linux_arm6
+
+ linux
+ arm
+ 6
+
+
+
+ linux_arm7
+
+ linux
+ arm
+ 7
+
+
+
+ linux_amd64
+
+ linux
+ amd64
+
+
+
+ linux_i386
+
+ linux
+ 386
+
+
+
+
+
+
+ org.codehaus.mojo
+ exec-maven-plugin
+
+ exec
+
+ go
+ ${project.build.directory}/${go.workspace.name}/src/github.com/eclipse/che/exec-agent
+
+ build
+ -a
+ -installsuffix
+ cgo
+ -o
+ ${project.build.directory}/${item}/che-websocket-terminal
+
+
+ ${project.build.directory}/${go.workspace.name}
+ ${terminal.target.os}
+ ${terminal.target.architecture}
+ ${terminal.target.arm.version}
+
+
+
+
+
+
+
+ assembly
+ package
+
+ iterator
+
+
+
+ - linux_arm5
+ - linux_arm6
+ - linux_arm7
+ - linux_amd64
+ - linux_i386
+
+
+
+
+ maven-assembly-plugin
+
+ single
+
+
+ ${basedir}/src/assembly/assembly.xml
+
+
+
+
+
+
+
+
+
+
+
+
+ integration
+
+ false
+
+
+
+
+ org.apache.maven.plugins
+ maven-antrun-plugin
+
+
+ run-tests
+ test
+
+ run
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/exec-agent/src/Godeps/Godeps.json b/exec-agent/src/Godeps/Godeps.json
new file mode 100644
index 00000000000..17b175c95ef
--- /dev/null
+++ b/exec-agent/src/Godeps/Godeps.json
@@ -0,0 +1,22 @@
+{
+ "ImportPath": "github.com/eclipse/che/exec-agent",
+ "GoVersion": "go1.6",
+ "GodepVersion": "v74",
+ "Deps": [
+ {
+ "ImportPath": "github.com/eclipse/che-lib/pty",
+ "Comment": "4.6.0-2-g69a9cee",
+ "Rev": "69a9cee57914086c88c8400f5f66cd25422f1fa7"
+ },
+ {
+ "ImportPath": "github.com/eclipse/che-lib/websocket",
+ "Comment": "4.6.0-2-g69a9cee",
+ "Rev": "69a9cee57914086c88c8400f5f66cd25422f1fa7"
+ },
+ {
+ "ImportPath": "github.com/julienschmidt/httprouter",
+ "Comment": "v1.1-38-g4563b0b",
+ "Rev": "4563b0ba73e4db6c6423b60a26f3cadd2e9a1ec9"
+ }
+ ]
+}
diff --git a/exec-agent/src/Godeps/Readme b/exec-agent/src/Godeps/Readme
new file mode 100644
index 00000000000..4cdaa53d56d
--- /dev/null
+++ b/exec-agent/src/Godeps/Readme
@@ -0,0 +1,5 @@
+This directory tree is generated automatically by godep.
+
+Please do not edit.
+
+See https://github.com/tools/godep for more information.
diff --git a/exec-agent/src/assembly/assembly.xml b/exec-agent/src/assembly/assembly.xml
new file mode 100644
index 00000000000..956a8dac74a
--- /dev/null
+++ b/exec-agent/src/assembly/assembly.xml
@@ -0,0 +1,36 @@
+
+
+ ${item}
+ true
+ terminal
+
+ zip
+ tar.gz
+
+
+
+ ${project.build.directory}/${item}
+
+ che-websocket-terminal
+
+
+
+
+ ${project.build.directory}/${go.workspace.name}/src/github.com/eclipse/che/exec-agent/static
+
+
+
+
diff --git a/exec-agent/src/auth/auth.go b/exec-agent/src/auth/auth.go
new file mode 100644
index 00000000000..bf83bc9df3f
--- /dev/null
+++ b/exec-agent/src/auth/auth.go
@@ -0,0 +1,128 @@
+//
+// Copyright (c) 2012-2016 Codenvy, S.A.
+// All rights reserved. This program and the accompanying materials
+// are made available under the terms of the Eclipse Public License v1.0
+// which accompanies this distribution, and is available at
+// http://www.eclipse.org/legal/epl-v10.html
+//
+// Contributors:
+// Codenvy, S.A. - initial API and implementation
+//
+
+package auth
+
+import (
+ "errors"
+ "fmt"
+ "net/http"
+ "sync"
+ "time"
+
+ "github.com/eclipse/che/exec-agent/rest"
+)
+
+const (
+ DefaultTokensExpirationTimeoutInMinutes = 10
+)
+
+// Authenticates all the http calls on workspace master
+// checking if provided by request token is valid, if authentication is successful
+// then calls ServerHTTP on delegate, otherwise if UnauthorizedHandler is configured
+// then uses it to handle the request if not then returns 401 with appropriate error message
+type Handler struct {
+ Delegate http.Handler
+ ApiEndpoint string
+ Cache *TokenCache
+ UnauthorizedHandler func(w http.ResponseWriter, req *http.Request)
+}
+
+func (handler Handler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
+ token := req.URL.Query().Get("token")
+ if handler.Cache.Contains(token) {
+ handler.Delegate.ServeHTTP(w, req)
+ } else if err := authenticateOnMaster(handler.ApiEndpoint, token); err == nil {
+ handler.Cache.Put(token)
+ handler.Delegate.ServeHTTP(w, req)
+ } else if handler.UnauthorizedHandler != nil {
+ handler.UnauthorizedHandler(w, req)
+ } else {
+ http.Error(w, err.Error(), http.StatusUnauthorized)
+ }
+}
+
+// Authentication tokens cache.
+type TokenCache struct {
+ sync.RWMutex
+ tokens map[string]time.Time
+ ticker *time.Ticker
+ expireTimeout time.Duration
+}
+
+func NewCache(expireDuration time.Duration, period time.Duration) *TokenCache {
+ cache := &TokenCache{
+ tokens: make(map[string]time.Time),
+ expireTimeout: expireDuration,
+ }
+ if period > 0 {
+ go cache.expirePeriodically(period)
+ }
+ return cache
+}
+
+// Puts token into the cache.
+func (cache *TokenCache) Put(token string) {
+ cache.Lock()
+ defer cache.Unlock()
+ cache.tokens[token] = time.Now().Add(cache.expireTimeout)
+}
+
+// Removes the token from the cache.
+func (cache *TokenCache) Expire(token string) {
+ cache.Lock()
+ defer cache.Unlock()
+ delete(cache.tokens, token)
+}
+
+// Returns true if token is present in the cache and false otherwise.
+func (cache *TokenCache) Contains(token string) bool {
+ cache.RLock()
+ defer cache.RUnlock()
+ _, ok := cache.tokens[token]
+ return ok
+}
+
+func (cache *TokenCache) expirePeriodically(period time.Duration) {
+ cache.ticker = time.NewTicker(period)
+ for range cache.ticker.C {
+ cache.expireAllBefore(time.Now())
+ }
+}
+
+func (cache *TokenCache) expireAllBefore(expirationPoint time.Time) {
+ cache.Lock()
+ defer cache.Unlock()
+ for token, expTime := range cache.tokens {
+ if expTime.Before(expirationPoint) {
+ delete(cache.tokens, token)
+ }
+ }
+}
+
+func authenticateOnMaster(apiEndpoint string, tokenParam string) error {
+ if tokenParam == "" {
+ return rest.Unauthorized(errors.New("Authentication failed: missing 'token' query parameter"))
+ }
+ req, err := http.NewRequest("GET", apiEndpoint+"/machine/token/user/"+tokenParam, nil)
+ if err != nil {
+ return rest.Unauthorized(err)
+ }
+ req.Header.Add("Authorization", tokenParam)
+ resp, err := http.DefaultClient.Do(req)
+ if err != nil {
+ return rest.Unauthorized(err)
+ }
+ if resp.StatusCode != 200 {
+ return rest.Unauthorized(errors.New(fmt.Sprintf("Authentication failed, token: %s is invalid", tokenParam)))
+ }
+ return nil
+}
diff --git a/exec-agent/src/auth/cache_test.go b/exec-agent/src/auth/cache_test.go
new file mode 100644
index 00000000000..c24a1918c28
--- /dev/null
+++ b/exec-agent/src/auth/cache_test.go
@@ -0,0 +1,56 @@
+//
+// Copyright (c) 2012-2016 Codenvy, S.A.
+// All rights reserved. This program and the accompanying materials
+// are made available under the terms of the Eclipse Public License v1.0
+// which accompanies this distribution, and is available at
+// http://www.eclipse.org/legal/epl-v10.html
+//
+// Contributors:
+// Codenvy, S.A. - initial API and implementation
+//
+
+package auth
+
+import (
+ "testing"
+ "time"
+)
+
+func TestTokenCache(t *testing.T) {
+ cache := &TokenCache{
+ tokens: make(map[string]time.Time),
+ expireTimeout: 0,
+ }
+
+ token := "my-token"
+
+ cache.Put(token)
+ if !cache.Contains(token) {
+ t.Fatalf("Cache must contain token %s", token)
+ }
+
+ cache.Expire(token)
+ if cache.Contains(token) {
+ t.Fatalf("Cache must not contain token %s", token)
+ }
+}
+
+func TestExpiresTokensCreatedBeforeGivenPointOfTime(t *testing.T) {
+ cache := &TokenCache{
+ tokens: make(map[string]time.Time),
+ expireTimeout: 0,
+ }
+
+ cache.Put("token1")
+ afterToken1Put := time.Now()
+ cache.Put("token2")
+
+ cache.expireAllBefore(afterToken1Put)
+
+ if cache.Contains("token1") {
+ t.Fatal("Cache must not contain token1")
+ }
+ if !cache.Contains("token2") {
+ t.Fatal("Cache must contain token2")
+ }
+}
diff --git a/exec-agent/src/main.go b/exec-agent/src/main.go
new file mode 100644
index 00000000000..144bdfb15e2
--- /dev/null
+++ b/exec-agent/src/main.go
@@ -0,0 +1,300 @@
+//
+// Copyright (c) 2012-2016 Codenvy, S.A.
+// All rights reserved. This program and the accompanying materials
+// are made available under the terms of the Eclipse Public License v1.0
+// which accompanies this distribution, and is available at
+// http://www.eclipse.org/legal/epl-v10.html
+//
+// Contributors:
+// Codenvy, S.A. - initial API and implementation
+//
+
+package main
+
+import (
+ "flag"
+ "fmt"
+ "log"
+ "net/http"
+ "net/url"
+ "os"
+ "time"
+
+ "regexp"
+
+ "github.com/eclipse/che/exec-agent/auth"
+ "github.com/eclipse/che/exec-agent/process"
+ "github.com/eclipse/che/exec-agent/rest"
+ "github.com/eclipse/che/exec-agent/rpc"
+ "github.com/eclipse/che/exec-agent/term"
+ "github.com/julienschmidt/httprouter"
+)
+
+var (
+ AppHttpRoutes = []rest.RoutesGroup{
+ process.HttpRoutes,
+ rpc.HttpRoutes,
+ term.HttpRoutes,
+ }
+
+ AppOpRoutes = []rpc.RoutesGroup{
+ process.RpcRoutes,
+ }
+
+ serverAddress string
+ staticDir string
+ basePath string
+ apiEndpoint string
+
+ authEnabled bool
+ tokensExpirationTimeoutInMinutes uint
+
+ processCleanupThresholdInMinutes int
+ processCleanupPeriodInMinutes int
+)
+
+func init() {
+ // server configuration
+ flag.StringVar(
+ &serverAddress,
+ "addr",
+ ":9000",
+ "IP:PORT or :PORT the address to start the server on",
+ )
+ flag.StringVar(
+ &staticDir,
+ "static",
+ "./static/",
+ "path to the directory where static content is located",
+ )
+ flag.StringVar(
+ &basePath,
+ "path",
+ "",
+ `the base path for all the rpc & rest routes, so route paths are treated not
+ as 'server_address + route_path' but 'server_address + path + route_path'.
+ For example for the server address 'localhost:9000', route path '/connect' and
+ configured path '/api/' exec-agent server will serve the following route:
+ 'localhost:9000/api/connect'.
+ Regexp syntax is supported`,
+ )
+
+ // terminal configuration
+ flag.StringVar(
+ &term.Cmd,
+ "cmd",
+ "/bin/bash",
+ "command to execute on slave side of the pty",
+ )
+
+ // workspace master server configuration
+ flag.StringVar(
+ &apiEndpoint,
+ "api-endpoint",
+ os.Getenv("CHE_API"),
+ `api-endpoint used by exec-agent modules(such as activity checker or authentication)
+ to request workspace master. By default the value from 'CHE_API' environment variable is used`,
+ )
+
+ // auth configuration
+ flag.BoolVar(
+ &authEnabled,
+ "enable-auth",
+ false,
+ "whether authenicate requests on workspace master before allowing them to proceed",
+ )
+ flag.UintVar(
+ &tokensExpirationTimeoutInMinutes,
+ "tokens-expiration-timeout",
+ auth.DefaultTokensExpirationTimeoutInMinutes,
+ "how much time machine tokens stay in cache(if auth is enabled)",
+ )
+
+ // terminal configuration
+ flag.BoolVar(
+ &term.ActivityTrackingEnabled,
+ "enable-activity-tracking",
+ false,
+ "whether workspace master will be notified about terminal activity",
+ )
+
+ // process executor configuration
+ flag.IntVar(
+ &processCleanupPeriodInMinutes,
+ "process-cleanup-period",
+ -1,
+ "how often processs cleanup job will be executed(in minutes)",
+ )
+ flag.IntVar(&processCleanupThresholdInMinutes,
+ "process-cleanup-threshold",
+ -1,
+ `how much time will dead and unused process stay(in minutes),
+ if -1 passed then processes won't be cleaned at all. Please note that the time
+ of real cleanup is between configured threshold and threshold + process-cleanup-period.`,
+ )
+ curDir, _ := os.Getwd()
+ curDir += string(os.PathSeparator) + "logs"
+ flag.StringVar(
+ &process.LogsDir,
+ "logs-dir",
+ curDir,
+ "base directory for process logs",
+ )
+}
+
+func main() {
+ flag.Parse()
+
+ log.SetOutput(os.Stdout)
+
+ // print configuration
+ fmt.Println("Exec-agent configuration")
+ fmt.Println(" Server")
+ fmt.Printf(" - Address: %s\n", serverAddress)
+ fmt.Printf(" - Static content: %s\n", staticDir)
+ fmt.Printf(" - Base path: '%s'\n", basePath)
+ fmt.Println(" Terminal")
+ fmt.Printf(" - Slave command: '%s'\n", term.Cmd)
+ fmt.Printf(" - Activity tracking enabled: %t\n", term.ActivityTrackingEnabled)
+ if authEnabled {
+ fmt.Println(" Authentication")
+ fmt.Printf(" - Enabled: %t\n", authEnabled)
+ fmt.Printf(" - Tokens expiration timeout: %dm\n", tokensExpirationTimeoutInMinutes)
+ }
+ fmt.Println(" Process executor")
+ fmt.Printf(" - Logs dir: %s\n", process.LogsDir)
+ if processCleanupPeriodInMinutes > 0 {
+ fmt.Printf(" - Cleanup job period: %dm\n", processCleanupPeriodInMinutes)
+ fmt.Printf(" - Not used & dead processes stay for: %dm\n", processCleanupThresholdInMinutes)
+ }
+ if authEnabled || term.ActivityTrackingEnabled {
+ fmt.Println(" Workspace master server")
+ fmt.Printf(" - API endpoint: %s\n", apiEndpoint)
+ }
+ fmt.Println()
+
+ term.ApiEndpoint = apiEndpoint
+
+ // process configuration
+ if err := os.RemoveAll(process.LogsDir); err != nil {
+ log.Fatal(err)
+ }
+
+ if processCleanupPeriodInMinutes > 0 {
+ if processCleanupThresholdInMinutes < 0 {
+ log.Fatal("Expected process cleanup threshold to be non negative value")
+ }
+ cleaner := process.NewCleaner(processCleanupPeriodInMinutes, processCleanupThresholdInMinutes)
+ cleaner.CleanPeriodically()
+ }
+
+ // terminal configuration
+ if term.ActivityTrackingEnabled {
+ go term.Activity.StartTracking()
+ }
+
+ // register routes and http handlers
+ router := httprouter.New()
+ router.NotFound = http.FileServer(http.Dir(staticDir))
+
+ fmt.Print("ā© Registered HttpRoutes:\n\n")
+ for _, routesGroup := range AppHttpRoutes {
+ fmt.Printf("%s:\n", routesGroup.Name)
+ for _, route := range routesGroup.Items {
+ router.Handle(
+ route.Method,
+ route.Path,
+ toHandle(route.HandleFunc),
+ )
+ fmt.Printf("ā %s\n", &route)
+ }
+ fmt.Println()
+ }
+
+ fmt.Print("\nā© Registered RpcRoutes:\n\n")
+ for _, routesGroup := range AppOpRoutes {
+ fmt.Printf("%s:\n", routesGroup.Name)
+ for _, route := range routesGroup.Items {
+ fmt.Printf("ā %s\n", route.Method)
+ rpc.RegisterRoute(route)
+ }
+ }
+
+ var handler http.Handler = router
+
+ // required authentication for all the requests, if it is configured
+ if authEnabled {
+ cache := auth.NewCache(time.Minute*time.Duration(tokensExpirationTimeoutInMinutes), time.Minute*5)
+
+ handler = auth.Handler{
+ Delegate: handler,
+ ApiEndpoint: apiEndpoint,
+ Cache: cache,
+ UnauthorizedHandler: func(w http.ResponseWriter, req *http.Request) {
+ dropChannelsWithExpiredToken(req.URL.Query().Get("token"))
+ http.Error(w, "Unauthorized", http.StatusUnauthorized)
+ },
+ }
+ }
+
+ // cut base path on requests, if it is configured
+ if basePath != "" {
+ if rx, err := regexp.Compile(basePath); err == nil {
+ handler = basePathChopper{rx, handler}
+ } else {
+ log.Fatal(err)
+ }
+ }
+
+ http.Handle("/", handler)
+
+ server := &http.Server{
+ Handler: handler,
+ Addr: serverAddress,
+ WriteTimeout: 10 * time.Second,
+ ReadTimeout: 10 * time.Second,
+ }
+ log.Fatal(server.ListenAndServe())
+}
+
+func dropChannelsWithExpiredToken(token string) {
+ for _, c := range rpc.GetChannels() {
+ u, err := url.ParseRequestURI(c.RequestURI)
+ if err != nil {
+ log.Printf("Couldn't parse the RequestURI '%s' of channel '%s'", c.RequestURI, c.Id)
+ } else if u.Query().Get("token") == token {
+ log.Printf("Token for channel '%s' is expired, trying to drop the channel", c.Id)
+ rpc.DropChannel(c.Id)
+ }
+ }
+}
+
+type routerParamsAdapter struct {
+ params httprouter.Params
+}
+
+func (pa routerParamsAdapter) Get(param string) string {
+ return pa.params.ByName(param)
+}
+
+func toHandle(f rest.HttpRouteHandlerFunc) httprouter.Handle {
+ return func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
+ if err := f(w, r, routerParamsAdapter{params: p}); err != nil {
+ rest.WriteError(w, err)
+ }
+ }
+}
+
+type basePathChopper struct {
+ pattern *regexp.Regexp
+ delegate http.Handler
+}
+
+func (c basePathChopper) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+ // if request path starts with given base path
+ if idx := c.pattern.FindStringSubmatchIndex(r.URL.Path); len(idx) != 0 && idx[0] == 0 {
+ r.URL.Path = r.URL.Path[idx[1]:]
+ r.RequestURI = r.RequestURI[idx[1]:]
+ }
+ c.delegate.ServeHTTP(w, r)
+}
diff --git a/exec-agent/src/process/common.go b/exec-agent/src/process/common.go
new file mode 100644
index 00000000000..7ef333e27ef
--- /dev/null
+++ b/exec-agent/src/process/common.go
@@ -0,0 +1,79 @@
+//
+// Copyright (c) 2012-2016 Codenvy, S.A.
+// All rights reserved. This program and the accompanying materials
+// are made available under the terms of the Eclipse Public License v1.0
+// which accompanies this distribution, and is available at
+// http://www.eclipse.org/legal/epl-v10.html
+//
+// Contributors:
+// Codenvy, S.A. - initial API and implementation
+//
+
+package process
+
+import (
+ "errors"
+ "strconv"
+ "strings"
+ "time"
+)
+
+const (
+ DefaultLogsPerPageLimit = 50
+)
+
+func maskFromTypes(types string) uint64 {
+ var mask uint64
+ for _, t := range strings.Split(types, ",") {
+ switch strings.ToLower(strings.TrimSpace(t)) {
+ case "stderr":
+ mask |= StderrBit
+ case "stdout":
+ mask |= StdoutBit
+ case "process_status":
+ mask |= ProcessStatusBit
+ }
+ }
+ return mask
+}
+
+func parseTypes(types string) uint64 {
+ var mask uint64 = DefaultMask
+ if types != "" {
+ mask = maskFromTypes(types)
+ }
+ return mask
+}
+
+// Checks whether pid is valid and converts it to the uint64
+func parsePid(strPid string) (uint64, error) {
+ intPid, err := strconv.Atoi(strPid)
+ if err != nil {
+ return 0, errors.New("Pid value must be unsigned integer")
+ }
+ if intPid <= 0 {
+ return 0, errors.New("Pid value must be unsigned integer")
+ }
+ return uint64(intPid), nil
+}
+
+// Checks whether command is valid
+func checkCommand(command *Command) error {
+ if command.Name == "" {
+ return errors.New("Command name required")
+ }
+ if command.CommandLine == "" {
+ return errors.New("Command line required")
+ }
+ return nil
+}
+
+// If time string is empty, then default time is returned
+// If time string is invalid, then appropriate error is returned
+// If time string is valid then parsed time is returned
+func parseTime(timeStr string, defTime time.Time) (time.Time, error) {
+ if timeStr == "" {
+ return defTime, nil
+ }
+ return time.Parse(DateTimeFormat, timeStr)
+}
diff --git a/exec-agent/src/process/events.go b/exec-agent/src/process/events.go
new file mode 100644
index 00000000000..8bf1e1cb981
--- /dev/null
+++ b/exec-agent/src/process/events.go
@@ -0,0 +1,72 @@
+//
+// Copyright (c) 2012-2016 Codenvy, S.A.
+// All rights reserved. This program and the accompanying materials
+// are made available under the terms of the Eclipse Public License v1.0
+// which accompanies this distribution, and is available at
+// http://www.eclipse.org/legal/epl-v10.html
+//
+// Contributors:
+// Codenvy, S.A. - initial API and implementation
+//
+
+package process
+
+import (
+ "github.com/eclipse/che/exec-agent/rpc"
+ "time"
+)
+
+const (
+ StartedEventType = "process_started"
+ DiedEventType = "process_died"
+ StdoutEventType = "process_stdout"
+ StderrEventType = "process_stderr"
+)
+
+type ProcessStatusEventBody struct {
+ rpc.Timed
+ Pid uint64 `json:"pid"`
+ NativePid int `json:"nativePid"`
+ Name string `json:"name"`
+ CommandLine string `json:"commandLine"`
+}
+
+type ProcessOutputEventBody struct {
+ rpc.Timed
+ Pid uint64 `json:"pid"`
+ Text string `json:"text"`
+}
+
+func newStderrEvent(pid uint64, text string, when time.Time) *rpc.Event {
+ return rpc.NewEvent(StderrEventType, &ProcessOutputEventBody{
+ Timed: rpc.Timed{Time: when},
+ Pid: pid,
+ Text: text,
+ })
+}
+
+func newStdoutEvent(pid uint64, text string, when time.Time) *rpc.Event {
+ return rpc.NewEvent(StdoutEventType, &ProcessOutputEventBody{
+ Timed: rpc.Timed{Time: when},
+ Pid: pid,
+ Text: text,
+ })
+}
+
+func newStatusEvent(mp MachineProcess, status string) *rpc.Event {
+ return rpc.NewEvent(status, &ProcessStatusEventBody{
+ Timed: rpc.Timed{Time: time.Now()},
+ Pid: mp.Pid,
+ NativePid: mp.NativePid,
+ Name: mp.Name,
+ CommandLine: mp.CommandLine,
+ })
+}
+
+func newStartedEvent(mp MachineProcess) *rpc.Event {
+ return newStatusEvent(mp, StartedEventType)
+}
+
+func newDiedEvent(mp MachineProcess) *rpc.Event {
+ return newStatusEvent(mp, DiedEventType)
+}
diff --git a/exec-agent/src/process/file_logger.go b/exec-agent/src/process/file_logger.go
new file mode 100644
index 00000000000..fc2be5f779a
--- /dev/null
+++ b/exec-agent/src/process/file_logger.go
@@ -0,0 +1,84 @@
+//
+// Copyright (c) 2012-2016 Codenvy, S.A.
+// All rights reserved. This program and the accompanying materials
+// are made available under the terms of the Eclipse Public License v1.0
+// which accompanies this distribution, and is available at
+// http://www.eclipse.org/legal/epl-v10.html
+//
+// Contributors:
+// Codenvy, S.A. - initial API and implementation
+//
+
+package process
+
+import (
+ "bytes"
+ "encoding/json"
+ "log"
+ "os"
+ "sync"
+ "time"
+)
+
+const (
+ flushThreshold = 8192
+)
+
+type FileLogger struct {
+ sync.RWMutex
+ filename string
+ buffer *bytes.Buffer
+ encoder *json.Encoder
+}
+
+func NewLogger(filename string) (*FileLogger, error) {
+ fl := &FileLogger{filename: filename}
+ fl.buffer = &bytes.Buffer{}
+ fl.encoder = json.NewEncoder(fl.buffer)
+
+ // Trying to create logs file
+ file, err := os.Create(filename)
+ if err != nil {
+ return nil, err
+ }
+ defer file.Close()
+
+ return fl, nil
+}
+
+func (fl *FileLogger) Flush() {
+ fl.Lock()
+ fl.doFlush()
+ fl.Unlock()
+}
+
+func (fl *FileLogger) OnStdout(line string, time time.Time) {
+ fl.writeLine(&LogMessage{StdoutKind, time, line})
+}
+
+func (fl *FileLogger) OnStderr(line string, time time.Time) {
+ fl.writeLine(&LogMessage{StderrKind, time, line})
+}
+
+func (fl *FileLogger) Close() {
+ fl.Flush()
+}
+
+func (fl *FileLogger) writeLine(message *LogMessage) {
+ fl.Lock()
+ fl.encoder.Encode(message)
+ if flushThreshold < fl.buffer.Len() {
+ fl.doFlush()
+ }
+ fl.Unlock()
+}
+
+func (fl *FileLogger) doFlush() {
+ f, err := os.OpenFile(fl.filename, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0666)
+ if err != nil {
+ log.Printf("Couldn't open file '%s' for flushing the buffer. %s \n", fl.filename, err.Error())
+ } else {
+ defer f.Close()
+ fl.buffer.WriteTo(f)
+ }
+}
diff --git a/exec-agent/src/process/file_logger_test.go b/exec-agent/src/process/file_logger_test.go
new file mode 100644
index 00000000000..0970a60d16e
--- /dev/null
+++ b/exec-agent/src/process/file_logger_test.go
@@ -0,0 +1,125 @@
+//
+// Copyright (c) 2012-2016 Codenvy, S.A.
+// All rights reserved. This program and the accompanying materials
+// are made available under the terms of the Eclipse Public License v1.0
+// which accompanies this distribution, and is available at
+// http://www.eclipse.org/legal/epl-v10.html
+//
+// Contributors:
+// Codenvy, S.A. - initial API and implementation
+//
+
+package process_test
+
+import (
+ "encoding/json"
+ "github.com/eclipse/che/exec-agent/process"
+ "io/ioutil"
+ "math/rand"
+ "os"
+ "testing"
+ "time"
+)
+
+var alphabet = []byte("abcdefgh123456789")
+
+func TestFileLoggerCreatesFileWhenFileDoesNotExist(t *testing.T) {
+ filename := os.TempDir() + string(os.PathSeparator) + randomName(10)
+ defer os.Remove(filename)
+
+ if _, err := os.Stat(filename); err == nil {
+ t.Fatalf("File '%s' already exists", filename)
+ }
+
+ if _, err := process.NewLogger(filename); err != nil {
+ t.Fatal(err)
+ }
+
+ if _, err := os.Stat(filename); os.IsNotExist(err) {
+ t.Fatalf("Expected file '%s' was created, but it wasn't", filename)
+ }
+}
+
+func TestFileLoggerTruncatesFileIfFileExistsOnCreate(t *testing.T) {
+ filename := os.TempDir() + string(os.PathSeparator) + randomName(10)
+ defer os.Remove(filename)
+
+ if _, err := os.Create(filename); err != nil {
+ t.Fatal(err)
+ }
+ if err := ioutil.WriteFile(filename, []byte("file-content"), 0666); err != nil {
+ t.Fatal(err)
+ }
+
+ if _, err := process.NewLogger(filename); err != nil {
+ t.Fatal(err)
+ }
+
+ content, err := ioutil.ReadFile(filename)
+ if err != nil {
+ t.Fatal(err)
+ }
+ if len(content) != 0 {
+ t.Errorf("Expected file '%s' content is empty", filename)
+ }
+}
+
+func TestLogsAreFlushedOnClose(t *testing.T) {
+ filename := os.TempDir() + string(os.PathSeparator) + randomName(10)
+ defer os.Remove(filename)
+
+ fl, err := process.NewLogger(filename)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ // Write something to the log
+ now := time.Now()
+ fl.OnStdout("stdout", now)
+ fl.OnStderr("stderr", now)
+ fl.Close()
+
+ // Read file content
+ f, err := os.Open(filename)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ // Read log messages
+ stdout := process.LogMessage{}
+ stderr := process.LogMessage{}
+ decoder := json.NewDecoder(f)
+ if err := decoder.Decode(&stdout); err != nil {
+ t.Fatal(err)
+ }
+ if err := decoder.Decode(&stderr); err != nil {
+ t.Fatal(err)
+ }
+
+ // Check logs are okay
+ expectedStdout := process.LogMessage{
+ Kind: process.StdoutKind,
+ Time: now,
+ Text: "stdout",
+ }
+ if stdout != expectedStdout {
+ t.Fatalf("Expected %v but found %v", expectedStdout, stdout)
+ }
+ expectedStderr := process.LogMessage{
+ Kind: process.StderrKind,
+ Time: now,
+ Text: "stderr",
+ }
+ if stdout != expectedStdout {
+ t.Fatalf("Expected %v but found %v", expectedStderr, stderr)
+ }
+}
+
+func randomName(length int) string {
+ rand.Seed(time.Now().UnixNano())
+ bytes := make([]byte, length)
+ for i := 0; i < length; i++ {
+ bytes[i] = alphabet[rand.Intn(len(alphabet))]
+ }
+ return string(bytes)
+}
diff --git a/exec-agent/src/process/logs_distributor.go b/exec-agent/src/process/logs_distributor.go
new file mode 100644
index 00000000000..9dd1c2e7909
--- /dev/null
+++ b/exec-agent/src/process/logs_distributor.go
@@ -0,0 +1,63 @@
+//
+// Copyright (c) 2012-2016 Codenvy, S.A.
+// All rights reserved. This program and the accompanying materials
+// are made available under the terms of the Eclipse Public License v1.0
+// which accompanies this distribution, and is available at
+// http://www.eclipse.org/legal/epl-v10.html
+//
+// Contributors:
+// Codenvy, S.A. - initial API and implementation
+//
+
+package process
+
+import (
+ "errors"
+ "fmt"
+ "os"
+)
+
+const (
+ DefaultMaxDirsCount = 16
+)
+
+// Distributes the logs between different directories
+type LogsDistributor interface {
+
+ // The implementor must guarantee that returned file name
+ // is always the same for the same pid.
+ // Returns an error if it is impossible to create hierarchy of
+ // logs file parent folders, otherwise returns file path
+ DirForPid(baseDir string, pid uint64) (string, error)
+}
+
+type DefaultLogsDistributor struct {
+ MaxDirsCount uint
+}
+
+func NewLogsDistributor() LogsDistributor {
+ return &DefaultLogsDistributor{
+ MaxDirsCount: DefaultMaxDirsCount,
+ }
+}
+
+func (ld *DefaultLogsDistributor) DirForPid(baseDir string, pid uint64) (string, error) {
+ // directories from 1 to maxDirsCount inclusive
+ subDirName := (pid % uint64(ld.MaxDirsCount))
+
+ // {baseLogsDir}/{subDirName}
+ pidLogsDir := fmt.Sprintf("%s%c%d", baseDir, os.PathSeparator, subDirName)
+
+ // Create subdirectory
+ if info, err := os.Stat(pidLogsDir); os.IsNotExist(err) {
+ if err := os.MkdirAll(pidLogsDir, os.ModePerm); err != nil {
+ return "", err
+ }
+ } else if err != nil {
+ return "", err
+ } else if !info.IsDir() {
+ m := fmt.Sprintf("Couldn't create a directory '%s', the name is taken by file", pidLogsDir)
+ return "", errors.New(m)
+ }
+ return pidLogsDir, nil
+}
diff --git a/exec-agent/src/process/logs_distributor_test.go b/exec-agent/src/process/logs_distributor_test.go
new file mode 100644
index 00000000000..ccfb39d0ca7
--- /dev/null
+++ b/exec-agent/src/process/logs_distributor_test.go
@@ -0,0 +1,72 @@
+//
+// Copyright (c) 2012-2016 Codenvy, S.A.
+// All rights reserved. This program and the accompanying materials
+// are made available under the terms of the Eclipse Public License v1.0
+// which accompanies this distribution, and is available at
+// http://www.eclipse.org/legal/epl-v10.html
+//
+// Contributors:
+// Codenvy, S.A. - initial API and implementation
+//
+
+package process_test
+
+import (
+ "fmt"
+ "github.com/eclipse/che/exec-agent/process"
+ "io/ioutil"
+ "os"
+ "testing"
+)
+
+func TestLogsDistributorCreatesSubdirectories(t *testing.T) {
+ baseDir := os.TempDir() + string(os.PathSeparator) + randomName(10)
+ defer os.RemoveAll(baseDir)
+
+ distributor := process.DefaultLogsDistributor{
+ MaxDirsCount: 4,
+ }
+
+ dir, err := distributor.DirForPid(baseDir, 1)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ if _, err := os.Stat(dir); os.IsNotExist(err) {
+ t.Fatal("Expected that logs file subdirectory was created")
+ } else if err != nil {
+ t.Fatal(err)
+ }
+}
+
+func TestLogsDistribution(t *testing.T) {
+ baseDir := os.TempDir() + string(os.PathSeparator) + randomName(10)
+ defer os.RemoveAll(baseDir)
+
+ distributor := process.DefaultLogsDistributor{
+ MaxDirsCount: 4,
+ }
+
+ // Those files should be evenly distributed in 4 directories
+ for pid := 1; pid <= 16; pid++ {
+ dir, err := distributor.DirForPid(baseDir, uint64(pid))
+ if err != nil {
+ t.Fatal(err)
+ }
+ filename := fmt.Sprintf("%s%cpid-%d", dir, os.PathSeparator, pid)
+ if _, err := os.Create(filename); err != nil {
+ t.Fatal(err)
+ }
+ }
+
+ for i := 0; i < 4; i++ {
+ dir := fmt.Sprintf("%s%c%d", baseDir, os.PathSeparator, i)
+ fi, err := ioutil.ReadDir(dir)
+ if err != nil {
+ t.Fatal(err)
+ }
+ if len(fi) != 4 {
+ t.Fatalf("Expected directory '%s' to contain 4 files", dir)
+ }
+ }
+}
diff --git a/exec-agent/src/process/logs_reader.go b/exec-agent/src/process/logs_reader.go
new file mode 100644
index 00000000000..73065461649
--- /dev/null
+++ b/exec-agent/src/process/logs_reader.go
@@ -0,0 +1,87 @@
+//
+// Copyright (c) 2012-2016 Codenvy, S.A.
+// All rights reserved. This program and the accompanying materials
+// are made available under the terms of the Eclipse Public License v1.0
+// which accompanies this distribution, and is available at
+// http://www.eclipse.org/legal/epl-v10.html
+//
+// Contributors:
+// Codenvy, S.A. - initial API and implementation
+//
+
+package process
+
+import (
+ "bufio"
+ "encoding/json"
+ "io"
+ "os"
+ "time"
+)
+
+type LogsReader struct {
+ filename string
+ readFrom *time.Time
+ readTill *time.Time
+}
+
+func NewLogsReader(filename string) *LogsReader {
+ return &LogsReader{filename: filename}
+}
+
+// Skip all the logs before the given time.
+// If the log message appeared at the given time, it won't be skipped.
+func (lr *LogsReader) From(time time.Time) *LogsReader {
+ lr.readFrom = &time
+ return lr
+}
+
+// Read logs which appeared before and right at a given time
+func (lr *LogsReader) Till(time time.Time) *LogsReader {
+ lr.readTill = &time
+ return lr
+}
+
+// Reads logs between [from, till] inclusive.
+// Returns an error if logs file is missing, or
+// decoding of file content failed.
+// If no logs matched time frame, an empty slice will be returned.
+func (lr *LogsReader) ReadLogs() ([]*LogMessage, error) {
+ // Open logs file for reading logs
+ logsFile, err := os.Open(lr.filename)
+ if err != nil {
+ return nil, err
+ }
+ defer logsFile.Close()
+
+ from := time.Time{}
+ if lr.readFrom != nil {
+ from = *lr.readFrom
+ }
+ till := time.Now()
+ if lr.readTill != nil {
+ till = *lr.readTill
+ }
+
+ // Read logs
+ logs := []*LogMessage{}
+ decoder := json.NewDecoder(bufio.NewReader(logsFile))
+ for {
+ message := &LogMessage{}
+ err = decoder.Decode(message)
+ if err != nil {
+ if err == io.EOF {
+ break
+ }
+ return nil, err
+ }
+ if message.Time.Before(from) {
+ continue
+ }
+ if message.Time.After(till) {
+ break
+ }
+ logs = append(logs, message)
+ }
+ return logs, nil
+}
diff --git a/exec-agent/src/process/logs_reader_test.go b/exec-agent/src/process/logs_reader_test.go
new file mode 100644
index 00000000000..1ee7cb4ca94
--- /dev/null
+++ b/exec-agent/src/process/logs_reader_test.go
@@ -0,0 +1,60 @@
+//
+// Copyright (c) 2012-2016 Codenvy, S.A.
+// All rights reserved. This program and the accompanying materials
+// are made available under the terms of the Eclipse Public License v1.0
+// which accompanies this distribution, and is available at
+// http://www.eclipse.org/legal/epl-v10.html
+//
+// Contributors:
+// Codenvy, S.A. - initial API and implementation
+//
+
+package process_test
+
+import (
+ "github.com/eclipse/che/exec-agent/process"
+ "os"
+ "testing"
+ "time"
+)
+
+func TestReadLogs(t *testing.T) {
+ filename := os.TempDir() + string(os.PathSeparator) + randomName(10)
+ defer os.Remove(filename)
+
+ fl, err := process.NewLogger(filename)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ // Write something to the log
+ now := time.Now()
+ fl.OnStdout("line1", now.Add(time.Second))
+ fl.OnStdout("line2", now.Add(time.Second*2))
+ fl.OnStdout("line3", now.Add(time.Second*3))
+ fl.OnStderr("line4", now.Add(time.Second*4))
+ fl.OnStderr("line5", now.Add(time.Second*5))
+ fl.Close()
+
+ // Read logs [2, 4]
+ logs, err :=
+ process.NewLogsReader(filename).
+ From(now.Add(time.Second * 2)).
+ Till(now.Add(time.Second * 4)).
+ ReadLogs()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ // Check everything is okay
+ expected := []process.LogMessage{
+ {Kind: process.StdoutKind, Time: now.Add(time.Second * 2), Text: "line2"},
+ {Kind: process.StdoutKind, Time: now.Add(time.Second * 3), Text: "line3"},
+ {Kind: process.StderrKind, Time: now.Add(time.Second * 4), Text: "line4"},
+ }
+ for i := 0; i < len(logs); i++ {
+ if *logs[i] != expected[i] {
+ t.Fatalf("Expected: '%v' Found '%v'", expected[i], *logs[i])
+ }
+ }
+}
diff --git a/exec-agent/src/process/process.go b/exec-agent/src/process/process.go
new file mode 100644
index 00000000000..25075916bae
--- /dev/null
+++ b/exec-agent/src/process/process.go
@@ -0,0 +1,483 @@
+//
+// Copyright (c) 2012-2016 Codenvy, S.A.
+// All rights reserved. This program and the accompanying materials
+// are made available under the terms of the Eclipse Public License v1.0
+// which accompanies this distribution, and is available at
+// http://www.eclipse.org/legal/epl-v10.html
+//
+// Contributors:
+// Codenvy, S.A. - initial API and implementation
+//
+
+package process
+
+import (
+ "errors"
+ "fmt"
+ "os"
+ "os/exec"
+ "sync"
+ "sync/atomic"
+ "syscall"
+ "time"
+
+ "github.com/eclipse/che/exec-agent/rpc"
+)
+
+const (
+ StdoutBit = 1 << iota
+ StderrBit = 1 << iota
+ ProcessStatusBit = 1 << iota
+ DefaultMask = StderrBit | StdoutBit | ProcessStatusBit
+
+ DateTimeFormat = time.RFC3339Nano
+
+ StdoutKind = "STDOUT"
+ StderrKind = "STDERR"
+)
+
+var (
+ prevPid uint64 = 0
+ processes = &processesMap{items: make(map[uint64]*MachineProcess)}
+ logsDist = NewLogsDistributor()
+ LogsDir string
+)
+
+type Command struct {
+ Name string `json:"name"`
+ CommandLine string `json:"commandLine"`
+ Type string `json:"type"`
+}
+
+// Defines machine process model
+type MachineProcess struct {
+
+ // The virtual id of the process, it is guaranteed that pid
+ // is always unique, while NativePid may occur twice or more(when including dead processes)
+ Pid uint64 `json:"pid"`
+
+ // The name of the process, it is equal to the Command.Name which this process created from.
+ // It doesn't have to be unique, at least machine agent doesn't need such constraint,
+ // as pid is used for identifying process
+ Name string `json:"name"`
+
+ // The command line executed by this process.
+ // It is equal to the Command.CommandLine which this process created from
+ CommandLine string `json:"commandLine"`
+
+ // The type of the command line, this field is rather useful meta
+ // information than something used for functioning. It is equal
+ // to the Command.Type which this process created from
+ Type string `json:"type"`
+
+ // Whether this process is alive or dead
+ Alive bool `json:"alive"`
+
+ // The native(OS) pid, it is unique per alive processes,
+ // but those which are not alive, may have the same NativePid
+ NativePid int `json:"nativePid"`
+
+ // Process log filename
+ logfileName string
+
+ // Command executed by this process.
+ // If process is not alive then the command value is set to nil
+ command *exec.Cmd
+
+ // Stdout/stderr pumper.
+ // If process is not alive then the pumper value is set to nil
+ pumper *LogsPumper
+
+ // Process subscribers, all the outgoing events are go through those subscribers.
+ // If process is not alive then the subscribers value is set to nil
+ subs []*Subscriber
+
+ // Process file logger
+ fileLogger *FileLogger
+
+ // Process mutex should be used to sync process data
+ // or block on process related operations such as events publications
+ mutex sync.RWMutex
+
+ // When the process was last time used by client
+ lastUsed time.Time
+ lastUsedLock sync.RWMutex
+
+ // Called once before any of process events is published
+ // and after process is started
+ beforeEventsHook func(process MachineProcess)
+}
+
+type Subscriber struct {
+ Id string
+ Mask uint64
+ Channel chan *rpc.Event
+}
+
+type LogMessage struct {
+ Kind string `json:"kind"`
+ Time time.Time `json:"time"`
+ Text string `json:"text"`
+}
+
+type NoProcessError struct {
+ error
+ Pid uint64
+}
+
+type NotAliveError struct {
+ error
+ Pid uint64
+}
+
+// Lockable map for storing processes
+type processesMap struct {
+ sync.RWMutex
+ items map[uint64]*MachineProcess
+}
+
+func Start(process MachineProcess) (MachineProcess, error) {
+ // wrap command to be able to kill child processes see https://github.com/golang/go/issues/8854
+ cmd := exec.Command("setsid", "sh", "-c", process.CommandLine)
+
+ // getting stdout pipe
+ stdout, err := cmd.StdoutPipe()
+ if err != nil {
+ return process, err
+ }
+
+ // getting stderr pipe
+ stderr, err := cmd.StderrPipe()
+ if err != nil {
+ return process, err
+ }
+
+ // starting a new process
+ err = cmd.Start()
+ if err != nil {
+ return process, err
+ }
+
+ // increment current pid & assign it to the value
+ pid := atomic.AddUint64(&prevPid, 1)
+
+ // Figure out the place for logs file
+ dir, err := logsDist.DirForPid(LogsDir, pid)
+ if err != nil {
+ return process, err
+ }
+ filename := fmt.Sprintf("%s%cpid-%d", dir, os.PathSeparator, pid)
+
+ fileLogger, err := NewLogger(filename)
+ if err != nil {
+ return process, err
+ }
+
+ // save process
+ process.Pid = pid
+ process.Alive = true
+ process.NativePid = cmd.Process.Pid
+ process.command = cmd
+ process.pumper = NewPumper(stdout, stderr)
+ process.logfileName = filename
+ process.fileLogger = fileLogger
+ process.updateLastUsedTime()
+
+ processes.Lock()
+ processes.items[pid] = &process
+ processes.Unlock()
+
+ // register logs consumers
+ process.pumper.AddConsumer(fileLogger)
+ process.pumper.AddConsumer(&process)
+
+ if process.beforeEventsHook != nil {
+ process.beforeEventsHook(process)
+ }
+
+ // before pumping is started publish process_started event
+ startPublished := make(chan bool)
+ go func() {
+ process.notifySubs(newStartedEvent(process), ProcessStatusBit)
+ startPublished <- true
+ }()
+
+ // start pumping after start event is published 'pumper.Pump' is blocking
+ go func() {
+ <-startPublished
+ process.pumper.Pump()
+ }()
+
+ return process, nil
+}
+
+// Gets process by pid.
+// If process doesn't exist then error of type NoProcessError is returned.
+func Get(pid uint64) (MachineProcess, error) {
+ p, ok := directGet(pid)
+ if ok {
+ return *p, nil
+ }
+ return MachineProcess{}, noProcess(pid)
+}
+
+func GetProcesses(all bool) []MachineProcess {
+ processes.RLock()
+ defer processes.RUnlock()
+
+ pArr := make([]MachineProcess, 0, len(processes.items))
+ for _, p := range processes.items {
+ if all {
+ pArr = append(pArr, *p)
+ } else {
+ p.mutex.RLock()
+ if p.Alive {
+ pArr = append(pArr, *p)
+ }
+ p.mutex.RUnlock()
+ }
+ }
+ return pArr
+}
+
+// Kills process by given pid.
+// Returns an error when any error occurs during process kill.
+// If process doesn't exist error of type NoProcessError is returned.
+func Kill(pid uint64) error {
+ p, ok := directGet(pid)
+ if !ok {
+ return noProcess(pid)
+ }
+ if !p.Alive {
+ return notAlive(pid)
+ }
+ // workaround for killing child processes see https://github.com/golang/go/issues/8854
+ return syscall.Kill(-p.NativePid, syscall.SIGKILL)
+}
+
+// Reads process logs between [from, till] inclusive.
+// Returns an error if any error occurs during logs reading.
+// If process doesn't exist error of type NoProcessError is returned.
+func ReadLogs(pid uint64, from time.Time, till time.Time) ([]*LogMessage, error) {
+ p, ok := directGet(pid)
+ if !ok {
+ return nil, noProcess(pid)
+ }
+ fl := p.fileLogger
+ if p.Alive {
+ fl.Flush()
+ }
+ return NewLogsReader(p.logfileName).From(from).Till(till).ReadLogs()
+}
+
+// Reads all process logs.
+// Returns an error if any error occurs during logs reading.
+// If process doesn't exist error of type NoProcessError is returned.
+func ReadAllLogs(pid uint64) ([]*LogMessage, error) {
+ return ReadLogs(pid, time.Time{}, time.Now())
+}
+
+// Unsubscribe subscriber with given id from process events.
+// If process doesn't exist then error of type NoProcessError is returned.
+func RemoveSubscriber(pid uint64, id string) error {
+ p, ok := directGet(pid)
+ if !ok {
+ return noProcess(pid)
+ }
+ if !p.Alive {
+ return notAlive(pid)
+ }
+ p.mutex.Lock()
+ defer p.mutex.Unlock()
+ for idx, sub := range p.subs {
+ if sub.Id == id {
+ p.subs = append(p.subs[0:idx], p.subs[idx+1:]...)
+ break
+ }
+ }
+ return nil
+}
+
+// Subscribe to the process output.
+// An error of type NoProcessError is returned when process
+// with given pid doesn't exist, a regular error is returned
+// if the process is dead or subscriber with such id already subscribed
+// to the process output.
+func AddSubscriber(pid uint64, subscriber Subscriber) error {
+ p, ok := directGet(pid)
+ if !ok {
+ return noProcess(pid)
+ }
+ p.mutex.Lock()
+ defer p.mutex.Unlock()
+ if !p.Alive && p.NativePid != 0 {
+ return errors.New("Can't subscribe to the events of dead process")
+ }
+ for _, sub := range p.subs {
+ if sub.Id == subscriber.Id {
+ return errors.New("Already subscribed")
+ }
+ }
+ p.subs = append(p.subs, &subscriber)
+ return nil
+}
+
+// Adds a new process subscriber by reading all the logs between
+// given 'after' and now and publishing them to the channel.
+// Returns an error of type NoProcessError if process with given id doesn't exist,
+// returns a regular error if process is alive an subscriber with such id
+// already subscribed.
+func RestoreSubscriber(pid uint64, subscriber Subscriber, after time.Time) error {
+ p, ok := directGet(pid)
+ if !ok {
+ return noProcess(pid)
+ }
+ p.mutex.Lock()
+ defer p.mutex.Unlock()
+
+ // Read logs between after and now
+ logs, err := ReadLogs(pid, after, time.Now())
+ if err != nil {
+ return err
+ }
+
+ // If process is dead there is no need to subscribe to it
+ // as it is impossible to get it alive again, but it is still
+ // may be useful for client to get missed logs, that's why this
+ // function doesn't throw any errors in the case of dead process
+ if p.Alive {
+ for _, sub := range p.subs {
+ if sub.Id == subscriber.Id {
+ return errors.New("Already subscribed")
+ }
+ }
+ p.subs = append(p.subs, &subscriber)
+ }
+
+ // Publish all the logs between (after, now]
+ for i := 0; i < len(logs); i++ {
+ message := logs[i]
+ if message.Time.After(after) {
+ if message.Kind == StdoutKind {
+ subscriber.Channel <- newStdoutEvent(p.Pid, message.Text, message.Time)
+ } else {
+ subscriber.Channel <- newStderrEvent(p.Pid, message.Text, message.Time)
+ }
+ }
+ }
+
+ // Publish died event after logs are published and process is dead
+ if !p.Alive {
+ subscriber.Channel <- newDiedEvent(*p)
+ }
+
+ return nil
+}
+
+// Updates subscriber with given id.
+// An error of type NoProcessError is returned when process
+// with given pid doesn't exist, a regular error is returned
+// if the process is dead.
+func UpdateSubscriber(pid uint64, id string, newMask uint64) error {
+ p, ok := directGet(pid)
+ if !ok {
+ return noProcess(pid)
+ }
+ if !p.Alive {
+ return notAlive(pid)
+ }
+ p.mutex.Lock()
+ defer p.mutex.Unlock()
+ for _, sub := range p.subs {
+ if sub.Id == id {
+ sub.Mask = newMask
+ return nil
+ }
+ }
+ return errors.New(fmt.Sprintf("No subscriber with id '%s'", id))
+}
+
+func (process *MachineProcess) OnStdout(line string, time time.Time) {
+ process.notifySubs(newStdoutEvent(process.Pid, line, time), StdoutBit)
+}
+
+func (process *MachineProcess) OnStderr(line string, time time.Time) {
+ process.notifySubs(newStderrEvent(process.Pid, line, time), StderrBit)
+}
+
+func (mp *MachineProcess) Close() {
+ // Cleanup command resources
+ mp.command.Wait()
+ // Cleanup machine process resources before dead event is sent
+ mp.mutex.Lock()
+ mp.Alive = false
+ mp.command = nil
+ mp.pumper = nil
+ mp.mutex.Unlock()
+
+ mp.notifySubs(newDiedEvent(*mp), ProcessStatusBit)
+
+ mp.mutex.Lock()
+ mp.subs = nil
+ mp.mutex.Unlock()
+
+ mp.updateLastUsedTime()
+}
+
+func (p *MachineProcess) notifySubs(event *rpc.Event, typeBit uint64) {
+ p.mutex.RLock()
+ subs := p.subs
+ for _, subscriber := range subs {
+ // Check whether subscriber needs such kind of event and then try to notify it
+ if subscriber.Mask&typeBit == typeBit && !tryWrite(subscriber.Channel, event) {
+ // Impossible to write to the channel, remove the channel from the subscribers list.
+ // It may happen when writing to the closed channel
+ defer RemoveSubscriber(p.Pid, subscriber.Id)
+ }
+ }
+ p.mutex.RUnlock()
+}
+
+func (mp *MachineProcess) updateLastUsedTime() {
+ mp.lastUsedLock.Lock()
+ mp.lastUsed = time.Now()
+ mp.lastUsedLock.Unlock()
+}
+
+// Writes to a channel and returns true if write is successful,
+// otherwise if write to the channel failed e.g. channel is closed then returns false
+func tryWrite(eventsChan chan *rpc.Event, event *rpc.Event) (ok bool) {
+ defer func() {
+ if r := recover(); r != nil {
+ ok = false
+ }
+ }()
+ eventsChan <- event
+ return true
+}
+
+func directGet(pid uint64) (*MachineProcess, bool) {
+ processes.RLock()
+ defer processes.RUnlock()
+ item, ok := processes.items[pid]
+ if ok {
+ item.updateLastUsedTime()
+ }
+ return item, ok
+}
+
+// Returns an error indicating that process with given pid doesn't exist
+func noProcess(pid uint64) *NoProcessError {
+ return &NoProcessError{
+ error: errors.New(fmt.Sprintf("Process with id '%d' does not exist", pid)),
+ Pid: pid,
+ }
+}
+
+// Returns an error indicating that process with given pid is not alive
+func notAlive(pid uint64) *NotAliveError {
+ return &NotAliveError{
+ error: errors.New(fmt.Sprintf("Process with id '%d' is not alive", pid)),
+ Pid: pid,
+ }
+}
diff --git a/exec-agent/src/process/process_builder.go b/exec-agent/src/process/process_builder.go
new file mode 100644
index 00000000000..3d18110df41
--- /dev/null
+++ b/exec-agent/src/process/process_builder.go
@@ -0,0 +1,72 @@
+//
+// Copyright (c) 2012-2016 Codenvy, S.A.
+// All rights reserved. This program and the accompanying materials
+// are made available under the terms of the Eclipse Public License v1.0
+// which accompanies this distribution, and is available at
+// http://www.eclipse.org/legal/epl-v10.html
+//
+// Contributors:
+// Codenvy, S.A. - initial API and implementation
+//
+
+package process
+
+type ProcessBuilder struct {
+ command Command
+ beforeEventsHook func(p MachineProcess)
+ firstSubscriber *Subscriber
+}
+
+func NewBuilder() *ProcessBuilder {
+ return &ProcessBuilder{}
+}
+
+func (pb *ProcessBuilder) Cmd(command Command) *ProcessBuilder {
+ pb.command = command
+ return pb
+}
+
+func (pb *ProcessBuilder) CmdLine(cmdLine string) *ProcessBuilder {
+ pb.command.CommandLine = cmdLine
+ return pb
+}
+
+func (pb *ProcessBuilder) CmdType(cmdType string) *ProcessBuilder {
+ pb.command.Type = cmdType
+ return pb
+}
+
+func (pb *ProcessBuilder) CmdName(cmdName string) *ProcessBuilder {
+ pb.command.Name = cmdName
+ return pb
+}
+
+// Sets the hook which will be called once before
+// process subscribers notified with any of the process events,
+// and after process is started.
+func (pb *ProcessBuilder) BeforeEventsHook(hook func(p MachineProcess)) *ProcessBuilder {
+ pb.beforeEventsHook = hook
+ return pb
+}
+
+func (pb *ProcessBuilder) FirstSubscriber(subscriber Subscriber) *ProcessBuilder {
+ pb.firstSubscriber = &subscriber
+ return pb
+}
+
+func (pb *ProcessBuilder) Build() MachineProcess {
+ p := MachineProcess{
+ Name: pb.command.Name,
+ CommandLine: pb.command.CommandLine,
+ Type: pb.command.Type,
+ beforeEventsHook: pb.beforeEventsHook,
+ }
+ if pb.firstSubscriber != nil {
+ p.subs = []*Subscriber{pb.firstSubscriber}
+ }
+ return p
+}
+
+func (pb *ProcessBuilder) Start() (MachineProcess, error) {
+ return Start(pb.Build())
+}
diff --git a/exec-agent/src/process/process_cleaner.go b/exec-agent/src/process/process_cleaner.go
new file mode 100644
index 00000000000..7068618fbc2
--- /dev/null
+++ b/exec-agent/src/process/process_cleaner.go
@@ -0,0 +1,56 @@
+//
+// Copyright (c) 2012-2016 Codenvy, S.A.
+// All rights reserved. This program and the accompanying materials
+// are made available under the terms of the Eclipse Public License v1.0
+// which accompanies this distribution, and is available at
+// http://www.eclipse.org/legal/epl-v10.html
+//
+// Contributors:
+// Codenvy, S.A. - initial API and implementation
+//
+
+package process
+
+import (
+ "log"
+ "os"
+ "time"
+)
+
+type Cleaner struct {
+ CleanupPeriod time.Duration
+ CleanupThreshold time.Duration
+}
+
+func NewCleaner(period int, threshold int) *Cleaner {
+ return &Cleaner{
+ time.Duration(period) * time.Minute,
+ time.Duration(threshold) * time.Minute,
+ }
+}
+
+func (c *Cleaner) CleanPeriodically() {
+ ticker := time.NewTicker(c.CleanupPeriod)
+ defer ticker.Stop()
+ for range ticker.C {
+ c.CleanOnce()
+ }
+}
+
+func (pc *Cleaner) CleanOnce() {
+ deadPoint := time.Now().Add(-pc.CleanupThreshold)
+ processes.Lock()
+ for _, mp := range processes.items {
+ mp.lastUsedLock.RLock()
+ if !mp.Alive && mp.lastUsed.Before(deadPoint) {
+ delete(processes.items, mp.Pid)
+ if err := os.Remove(mp.logfileName); err != nil {
+ if !os.IsNotExist(err) {
+ log.Printf("Couldn't remove process logs file, '%s'", mp.logfileName)
+ }
+ }
+ }
+ mp.lastUsedLock.RUnlock()
+ }
+ processes.Unlock()
+}
diff --git a/exec-agent/src/process/process_cleaner_test.go b/exec-agent/src/process/process_cleaner_test.go
new file mode 100644
index 00000000000..42f8fc1d12a
--- /dev/null
+++ b/exec-agent/src/process/process_cleaner_test.go
@@ -0,0 +1,60 @@
+//
+// Copyright (c) 2012-2016 Codenvy, S.A.
+// All rights reserved. This program and the accompanying materials
+// are made available under the terms of the Eclipse Public License v1.0
+// which accompanies this distribution, and is available at
+// http://www.eclipse.org/legal/epl-v10.html
+//
+// Contributors:
+// Codenvy, S.A. - initial API and implementation
+//
+
+package process_test
+
+import (
+ "testing"
+
+ "github.com/eclipse/che/exec-agent/process"
+ "time"
+)
+
+func TestCleanWithZeroThreshold(t *testing.T) {
+ p := startAndWaitTestProcess(testCmd, t)
+ defer cleanupLogsDir()
+
+ process.NewCleaner(0, 0).CleanOnce()
+
+ _, err := process.Get(p.Pid)
+ if err == nil {
+ t.Fatal("Must not exist")
+ }
+ if _, ok := err.(*process.NoProcessError); !ok {
+ t.Fatal(err)
+ }
+}
+
+func TestCleansOnlyUnusedProcesses(t *testing.T) {
+ p1 := startAndWaitTestProcess(testCmd, t)
+ p2 := startAndWaitTestProcess(testCmd, t)
+
+ time.Sleep(500 * time.Millisecond)
+
+ // use one of the processes, so it is used now
+ process.Get(p1.Pid)
+
+ // cleanup immediately
+ (&process.Cleaner{CleanupPeriod: 0, CleanupThreshold: 500 * time.Millisecond}).CleanOnce()
+
+ _, err1 := process.Get(p1.Pid)
+ _, err2 := process.Get(p2.Pid)
+
+ // process 1 must be cleaned
+ if err1 != nil {
+ t.Fatalf("Expected process 2 to exist, but got an error: %s", err1.Error())
+ }
+
+ // process 2 must exist
+ if _, ok := err2.(*process.NoProcessError); !ok {
+ t.Fatal("Expected process 2 to be cleaned")
+ }
+}
diff --git a/exec-agent/src/process/process_test.go b/exec-agent/src/process/process_test.go
new file mode 100644
index 00000000000..a535789922a
--- /dev/null
+++ b/exec-agent/src/process/process_test.go
@@ -0,0 +1,255 @@
+//
+// Copyright (c) 2012-2016 Codenvy, S.A.
+// All rights reserved. This program and the accompanying materials
+// are made available under the terms of the Eclipse Public License v1.0
+// which accompanies this distribution, and is available at
+// http://www.eclipse.org/legal/epl-v10.html
+//
+// Contributors:
+// Codenvy, S.A. - initial API and implementation
+//
+
+package process_test
+
+import (
+ "github.com/eclipse/che/exec-agent/process"
+ "github.com/eclipse/che/exec-agent/rpc"
+ "os"
+ "strings"
+ "testing"
+ "time"
+)
+
+const (
+ testCmd = "printf \"1\n2\n3\n4\n5\n6\n7\n8\n9\n10\""
+)
+
+func TestOneLineOutput(t *testing.T) {
+ defer cleanupLogsDir()
+ // create and start a process
+ p := startAndWaitTestProcess("echo test", t)
+
+ logs, _ := process.ReadAllLogs(p.Pid)
+
+ if len(logs) != 1 {
+ t.Fatalf("Expected logs size to be 1, but got %d", len(logs))
+ }
+
+ if logs[0].Text != "test" {
+ t.Fatalf("Expected to get 'test' output but got %s", logs[0].Text)
+ }
+}
+
+func TestEmptyLinesOutput(t *testing.T) {
+ defer cleanupLogsDir()
+ p := startAndWaitTestProcess("printf \"\n\n\n\n\n\"", t)
+
+ logs, _ := process.ReadAllLogs(p.Pid)
+
+ if len(logs) != 5 {
+ t.Fatal("Expected logs to be 4 sized")
+ }
+
+ for _, value := range logs {
+ if value.Text != "" {
+ t.Fatal("Expected all the logs to be empty files")
+ }
+ }
+}
+
+func TestAddSubscriber(t *testing.T) {
+ process.LogsDir = TmpFile()
+ defer cleanupLogsDir()
+
+ outputLines := []string{"1", "2", "3", "4", "5", "6", "7", "8", "9", "10"}
+
+ // create and start a process
+ pb := process.NewBuilder().
+ CmdName("test").
+ CmdType("test").
+ CmdLine("printf \"" + strings.Join(outputLines, "\n") + "\"")
+
+ // add a new subscriber
+ eventsChan := make(chan *rpc.Event)
+ pb.FirstSubscriber(process.Subscriber{
+ Id: "test",
+ Mask: process.DefaultMask,
+ Channel: eventsChan,
+ })
+
+ // start a new process
+ if _, err := pb.Start(); err != nil {
+ t.Fatal(err)
+ }
+
+ // read all the process output events
+ done := make(chan bool)
+ var received []string
+ go func() {
+ event := <-eventsChan
+ for event.EventType != process.DiedEventType {
+ if event.EventType == process.StdoutEventType {
+ out := event.Body.(*process.ProcessOutputEventBody)
+ received = append(received, out.Text)
+ }
+ event = <-eventsChan
+ }
+ done <- true
+ }()
+
+ // wait until process is done
+ <-done
+
+ if len(outputLines) != len(received) {
+ t.Fatalf("Expected the same size but got %d != %d", len(outputLines), len(received))
+ }
+
+ for idx, value := range outputLines {
+ if value != received[idx] {
+ t.Fatalf("Expected %s but got %s", value, received[idx])
+ }
+ }
+}
+
+func TestRestoreSubscriberForDeadProcess(t *testing.T) {
+ process.LogsDir = TmpFile()
+ defer cleanupLogsDir()
+ beforeStart := time.Now()
+ p := startAndWaitTestProcess("echo test", t)
+
+ // Read all the data from channel
+ channel := make(chan *rpc.Event)
+ done := make(chan bool)
+ var received []*rpc.Event
+ go func() {
+ statusReceived := false
+ timeoutReached := false
+ for !statusReceived && !timeoutReached {
+ select {
+ case v := <-channel:
+ received = append(received, v)
+ if v.EventType == process.DiedEventType {
+ statusReceived = true
+ }
+ case <-time.After(time.Second):
+ timeoutReached = true
+ }
+ }
+ done <- true
+ }()
+
+ process.RestoreSubscriber(p.Pid, process.Subscriber{
+ "test",
+ process.DefaultMask,
+ channel,
+ }, beforeStart)
+
+ <-done
+
+ if len(received) != 2 {
+ t.Fatalf("Expected to recieve 2 events but got %d", len(received))
+ }
+ e1Type := received[0].EventType
+ e1Text := received[0].Body.(*process.ProcessOutputEventBody).Text
+ if received[0].EventType != process.StdoutEventType || e1Text != "test" {
+ t.Fatalf("Expected to receieve output event with text 'test', but got '%s' event with text %s",
+ e1Type,
+ e1Text)
+ }
+ if received[1].EventType != process.DiedEventType {
+ t.Fatal("Expected to get 'process_died' event")
+ }
+}
+
+func TestMachineProcessIsNotAliveAfterItIsDead(t *testing.T) {
+ p := startAndWaitTestProcess(testCmd, t)
+ defer cleanupLogsDir()
+ if p.Alive {
+ t.Fatal("Process should not be alive")
+ }
+}
+
+func TestItIsNotPossibleToAddSubscriberToDeadProcess(t *testing.T) {
+ p := startAndWaitTestProcess(testCmd, t)
+ defer cleanupLogsDir()
+ if err := process.AddSubscriber(p.Pid, process.Subscriber{}); err == nil {
+ t.Fatal("Should not be able to add subscriber")
+ }
+}
+
+func TestReadProcessLogs(t *testing.T) {
+ p := startAndWaitTestProcess(testCmd, t)
+ defer cleanupLogsDir()
+ logs, err := process.ReadLogs(p.Pid, time.Time{}, time.Now())
+ if err != nil {
+ t.Fatal(err)
+ }
+ expected := []string{"1", "2", "3", "4", "5", "6", "7", "8", "9", "10"}
+
+ for idx := range expected {
+ if process.StdoutKind != logs[idx].Kind {
+ t.Fatalf("Expected log message kind to be '%s', while got '%s'", process.StdoutKind, logs[idx].Kind)
+ }
+ if expected[idx] != logs[idx].Text {
+ t.Fatalf("Expected log message to be '%s', but got '%s'", expected[idx], logs[idx].Text)
+ }
+ }
+}
+
+func startAndWaitTestProcess(cmd string, t *testing.T) process.MachineProcess {
+ process.LogsDir = TmpFile()
+ events := make(chan *rpc.Event)
+ done := make(chan bool)
+
+ // Create and start process
+ pb := process.NewBuilder().
+ CmdName("test").
+ CmdType("test").
+ CmdLine(cmd).
+ FirstSubscriber(process.Subscriber{
+ Id: "test",
+ Mask: process.DefaultMask,
+ Channel: events,
+ })
+
+ go func() {
+ statusReceived := false
+ timeoutReached := false
+ for !statusReceived && !timeoutReached {
+ select {
+ case event := <-events:
+ if event.EventType == process.DiedEventType {
+ statusReceived = true
+ }
+ case <-time.After(time.Second):
+ timeoutReached = true
+ }
+ }
+ done <- true
+ }()
+
+ p, err := pb.Start()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ // Wait until process is finished or timeout is reached
+ if ok := <-done; !ok {
+ t.Fatalf("Expected to receive %s process event", process.DiedEventType)
+ }
+
+ // Check process state after it is finished
+ result, err := process.Get(p.Pid)
+ if err != nil {
+ t.Fatal(err)
+ }
+ return result
+}
+
+func TmpFile() string {
+ return os.TempDir() + string(os.PathSeparator) + randomName(10)
+}
+
+func cleanupLogsDir() {
+ os.RemoveAll(process.LogsDir)
+}
diff --git a/exec-agent/src/process/pumper.go b/exec-agent/src/process/pumper.go
new file mode 100644
index 00000000000..86c01656ecd
--- /dev/null
+++ b/exec-agent/src/process/pumper.go
@@ -0,0 +1,108 @@
+//
+// Copyright (c) 2012-2016 Codenvy, S.A.
+// All rights reserved. This program and the accompanying materials
+// are made available under the terms of the Eclipse Public License v1.0
+// which accompanies this distribution, and is available at
+// http://www.eclipse.org/legal/epl-v10.html
+//
+// Contributors:
+// Codenvy, S.A. - initial API and implementation
+//
+
+package process
+
+import (
+ "bufio"
+ "io"
+ "log"
+ "sync"
+ "time"
+)
+
+type acceptLine func(line string)
+
+// LogsPumper client consumes a message read by pumper
+type LogsConsumer interface {
+
+ // called on each line pumped from process stdout
+ OnStdout(line string, time time.Time)
+
+ // called on each line pumped from process stderr
+ OnStderr(line string, time time.Time)
+
+ // called when pumping is finished either by normal return or by error
+ Close()
+}
+
+// Pumps lines from the stdout and stderr
+type LogsPumper struct {
+ stdout io.Reader
+ stderr io.Reader
+ clients []LogsConsumer
+ waitGroup sync.WaitGroup
+}
+
+func NewPumper(stdout io.Reader, stderr io.Reader) *LogsPumper {
+ return &LogsPumper{
+ stdout: stdout,
+ stderr: stderr,
+ }
+}
+
+func (pumper *LogsPumper) AddConsumer(consumer LogsConsumer) {
+ pumper.clients = append(pumper.clients, consumer)
+}
+
+// Start 'pumping' logs from the stdout and stderr
+// The method execution is synchronous and waits for
+// both stderr and stdout to complete closing all the clients after
+func (pumper *LogsPumper) Pump() {
+ pumper.waitGroup.Add(2)
+
+ // reading from stdout & stderr
+ go pump(pumper.stdout, pumper.notifyStdout, &pumper.waitGroup)
+ go pump(pumper.stderr, pumper.notifyStderr, &pumper.waitGroup)
+
+ // cleanup after pumping is complete
+ pumper.waitGroup.Wait()
+ pumper.notifyClose()
+}
+
+func pump(r io.Reader, lineConsumer acceptLine, wg *sync.WaitGroup) {
+ defer wg.Done()
+ br := bufio.NewReader(r)
+ for {
+ line, err := br.ReadBytes('\n')
+
+ if err != nil {
+ if err != io.EOF {
+ log.Println("Error pumping: " + err.Error())
+ } else if len(line) != 0 {
+ lineConsumer(string(line))
+ }
+ return
+ }
+
+ lineConsumer(string(line[:len(line)-1]))
+ }
+}
+
+func (pumper *LogsPumper) notifyStdout(line string) {
+ t := time.Now()
+ for _, client := range pumper.clients {
+ client.OnStdout(line, t)
+ }
+}
+
+func (pumper *LogsPumper) notifyStderr(line string) {
+ t := time.Now()
+ for _, client := range pumper.clients {
+ client.OnStderr(line, t)
+ }
+}
+
+func (pumper *LogsPumper) notifyClose() {
+ for _, client := range pumper.clients {
+ client.Close()
+ }
+}
diff --git a/exec-agent/src/process/rest_service.go b/exec-agent/src/process/rest_service.go
new file mode 100644
index 00000000000..c2c4b497a06
--- /dev/null
+++ b/exec-agent/src/process/rest_service.go
@@ -0,0 +1,194 @@
+//
+// Copyright (c) 2012-2016 Codenvy, S.A.
+// All rights reserved. This program and the accompanying materials
+// are made available under the terms of the Eclipse Public License v1.0
+// which accompanies this distribution, and is available at
+// http://www.eclipse.org/legal/epl-v10.html
+//
+// Contributors:
+// Codenvy, S.A. - initial API and implementation
+//
+
+package process
+
+import (
+ "errors"
+ "fmt"
+ "github.com/eclipse/che/exec-agent/rest"
+ "github.com/eclipse/che/exec-agent/rest/restutil"
+ "github.com/eclipse/che/exec-agent/rpc"
+ "io"
+ "math"
+ "net/http"
+ "strconv"
+ "strings"
+ "time"
+)
+
+var HttpRoutes = rest.RoutesGroup{
+ "Process Routes",
+ []rest.Route{
+ {
+ "POST",
+ "Start Process",
+ "/process",
+ startProcessHF,
+ },
+ {
+ "GET",
+ "Get Process",
+ "/process/:pid",
+ getProcessHF,
+ },
+ {
+ "DELETE",
+ "Kill Process",
+ "/process/:pid",
+ killProcessHF,
+ },
+ {
+ "GET",
+ "Get Process Logs",
+ "/process/:pid/logs",
+ getProcessLogsHF,
+ },
+ {
+ "GET",
+ "Get Processes",
+ "/process",
+ getProcessesHF,
+ },
+ },
+}
+
+func startProcessHF(w http.ResponseWriter, r *http.Request, p rest.Params) error {
+ command := Command{}
+ if err := restutil.ReadJson(r, &command); err != nil {
+ return err
+ }
+ if err := checkCommand(&command); err != nil {
+ return rest.BadRequest(err)
+ }
+
+ // If channel is provided then check whether it is ready to be
+ // first process subscriber and use it if it is
+ var subscriber *Subscriber
+ channelId := r.URL.Query().Get("channel")
+ if channelId != "" {
+ channel, ok := rpc.GetChannel(channelId)
+ if !ok {
+ m := fmt.Sprintf("Channel with id '%s' doesn't exist. Process won't be started", channelId)
+ return rest.NotFound(errors.New(m))
+ }
+ subscriber = &Subscriber{
+ Id: channelId,
+ Mask: parseTypes(r.URL.Query().Get("types")),
+ Channel: channel.Events,
+ }
+ }
+
+ pb := NewBuilder().Cmd(command)
+
+ if subscriber != nil {
+ pb.FirstSubscriber(*subscriber)
+ }
+
+ process, err := pb.Start()
+ if err != nil {
+ return err
+ }
+ return restutil.WriteJson(w, process)
+}
+
+func getProcessHF(w http.ResponseWriter, r *http.Request, p rest.Params) error {
+ pid, err := parsePid(p.Get("pid"))
+ if err != nil {
+ return rest.BadRequest(err)
+ }
+
+ process, err := Get(pid)
+ if err != nil {
+ return asHttpError(err)
+ }
+ return restutil.WriteJson(w, process)
+}
+
+func killProcessHF(w http.ResponseWriter, r *http.Request, p rest.Params) error {
+ pid, err := parsePid(p.Get("pid"))
+ if err != nil {
+ return rest.BadRequest(err)
+ }
+ if err := Kill(pid); err != nil {
+ return asHttpError(err)
+ }
+ return nil
+}
+
+func getProcessLogsHF(w http.ResponseWriter, r *http.Request, p rest.Params) error {
+ pid, err := parsePid(p.Get("pid"))
+ if err != nil {
+ return rest.BadRequest(err)
+ }
+
+ // Parse 'from', if 'from' is not specified then read all the logs from the start
+ // if 'from' format is different from the DATE_TIME_FORMAT then return 400
+ from, err := parseTime(r.URL.Query().Get("from"), time.Time{})
+ if err != nil {
+ return rest.BadRequest(errors.New("Bad format of 'from', " + err.Error()))
+ }
+
+ // Parse 'till', if 'till' is not specified then 'now' is used for it
+ // if 'till' format is different from the DATE_TIME_FORMAT then return 400
+ till, err := parseTime(r.URL.Query().Get("till"), time.Now())
+ if err != nil {
+ return rest.BadRequest(errors.New("Bad format of 'till', " + err.Error()))
+ }
+
+ logs, err := ReadLogs(pid, from, till)
+ if err != nil {
+ return asHttpError(err)
+ }
+
+ // limit logs from the latest to the earliest
+ // limit - how many the latest logs will be present
+ // skip - how many log lines should be skipped from the end
+ limit := restutil.IntQueryParam(r, "limit", DefaultLogsPerPageLimit)
+ skip := restutil.IntQueryParam(r, "skip", 0)
+ if limit < 1 {
+ return rest.BadRequest(errors.New("Required 'limit' to be > 0"))
+ }
+ if skip < 0 {
+ return rest.BadRequest(errors.New("Required 'skip' to be >= 0"))
+ }
+ len := len(logs)
+ fromIdx := int(math.Max(float64(len-limit-skip), 0))
+ toIdx := len - int(math.Min(float64(skip), float64(len)))
+
+ // Respond with an appropriate logs format, default json
+ format := r.URL.Query().Get("format")
+ switch strings.ToLower(format) {
+ case "text":
+ for _, item := range logs[fromIdx:toIdx] {
+ line := fmt.Sprintf("[%s] %s \t %s\n", item.Kind, item.Time.Format(DateTimeFormat), item.Text)
+ io.WriteString(w, line)
+ }
+ default:
+ return restutil.WriteJson(w, logs[fromIdx:toIdx])
+ }
+ return nil
+}
+
+func getProcessesHF(w http.ResponseWriter, r *http.Request, _ rest.Params) error {
+ all, err := strconv.ParseBool(r.URL.Query().Get("all"))
+ if err != nil {
+ all = false
+ }
+ return restutil.WriteJson(w, GetProcesses(all))
+}
+
+func asHttpError(err error) error {
+ if npErr, ok := err.(*NoProcessError); ok {
+ return rest.NotFound(npErr.error)
+ }
+ return err
+}
diff --git a/exec-agent/src/process/ws_service.go b/exec-agent/src/process/ws_service.go
new file mode 100644
index 00000000000..a75ea8bd1d8
--- /dev/null
+++ b/exec-agent/src/process/ws_service.go
@@ -0,0 +1,348 @@
+//
+// Copyright (c) 2012-2016 Codenvy, S.A.
+// All rights reserved. This program and the accompanying materials
+// are made available under the terms of the Eclipse Public License v1.0
+// which accompanies this distribution, and is available at
+// http://www.eclipse.org/legal/epl-v10.html
+//
+// Contributors:
+// Codenvy, S.A. - initial API and implementation
+//
+
+package process
+
+import (
+ "encoding/json"
+ "errors"
+ "github.com/eclipse/che/exec-agent/rpc"
+ "math"
+ "time"
+)
+
+const (
+ StartMethod = "process.start"
+ KillMethod = "process.kill"
+ SubscribeMethod = "process.subscribe"
+ UnsubscribeMethod = "process.unsubscribe"
+ UpdateSubscriberMethod = "process.updateSubscriber"
+ GetLogsMethod = "process.getLogs"
+ GetProcessMethod = "process.getProcess"
+ GetProcessesMethod = "process.getProcesses"
+
+ NoSuchProcessErrorCode = -32000
+ ProcessNotAliveErrorCode = -32001
+)
+
+var RpcRoutes = rpc.RoutesGroup{
+ "Process Routes",
+ []rpc.Route{
+ {
+ StartMethod,
+ func(body []byte) (interface{}, error) {
+ b := StartParams{}
+ err := json.Unmarshal(body, &b)
+ return b, err
+ },
+ startProcessReqHF,
+ },
+ {
+ KillMethod,
+ func(body []byte) (interface{}, error) {
+ b := KillParams{}
+ err := json.Unmarshal(body, &b)
+ return b, err
+ },
+ killProcessReqHF,
+ },
+ {
+ SubscribeMethod,
+ func(body []byte) (interface{}, error) {
+ b := SubscribeParams{}
+ err := json.Unmarshal(body, &b)
+ return b, err
+ },
+ subscribeReqHF,
+ },
+ {
+ UnsubscribeMethod,
+ func(body []byte) (interface{}, error) {
+ b := UnsubscribeParams{}
+ err := json.Unmarshal(body, &b)
+ return b, err
+ },
+ unsubscribeReqHF,
+ },
+ {
+ UpdateSubscriberMethod,
+ func(body []byte) (interface{}, error) {
+ b := UpdateSubscriberParams{}
+ err := json.Unmarshal(body, &b)
+ return b, err
+ },
+ updateSubscriberReqHF,
+ },
+ {
+ GetLogsMethod,
+ func(body []byte) (interface{}, error) {
+ b := GetLogsParams{}
+ err := json.Unmarshal(body, &b)
+ return b, err
+ },
+ getProcessLogsReqHF,
+ },
+ {
+ GetProcessMethod,
+ func(body []byte) (interface{}, error) {
+ b := GetProcessParams{}
+ err := json.Unmarshal(body, &b)
+ return b, err
+ },
+ getProcessReqHF,
+ },
+ {
+ GetProcessesMethod,
+ func(body []byte) (interface{}, error) {
+ b := GetProcessesParams{}
+ err := json.Unmarshal(body, &b)
+ return b, err
+ },
+ getProcessesReqHF,
+ },
+ },
+}
+
+type ProcessResult struct {
+ Pid uint64 `json:"pid"`
+ Text string `json:"text"`
+}
+
+//-- process start
+type StartParams struct {
+ Name string `json:"name"`
+ CommandLine string `json:"commandLine"`
+ Type string `json:"type"`
+ EventTypes string `json:"eventTypes"`
+}
+
+func startProcessReqHF(params interface{}, t *rpc.Transmitter) error {
+ startParams := params.(StartParams)
+ command := Command{
+ Name: startParams.Name,
+ CommandLine: startParams.CommandLine,
+ Type: startParams.Type,
+ }
+ if err := checkCommand(&command); err != nil {
+ return rpc.NewArgsError(err)
+ }
+
+ _, err := NewBuilder().
+ Cmd(command).
+ FirstSubscriber(Subscriber{
+ Id: t.Channel.Id,
+ Mask: parseTypes(startParams.EventTypes),
+ Channel: t.Channel.Events,
+ }).
+ BeforeEventsHook(func(process MachineProcess) {
+ t.Send(process)
+ }).
+ Start()
+ return err
+}
+
+//-- process kill
+type KillParams struct {
+ Pid uint64 `json:"pid"`
+ NativePid uint64 `json:"nativePid"`
+}
+
+func killProcessReqHF(params interface{}, t *rpc.Transmitter) error {
+ killParams := params.(KillParams)
+ if err := Kill(killParams.Pid); err != nil {
+ return asRpcError(err)
+ }
+ t.Send(&ProcessResult{
+ Pid: killParams.Pid,
+ Text: "Successfully killed",
+ })
+ return nil
+}
+
+//-- process subscribe
+type SubscribeResult struct {
+ Pid uint64 `json:"pid"`
+ EventTypes string `json:"eventTypes"`
+ Text string `json:"text"`
+}
+
+type SubscribeParams struct {
+ Pid uint64 `json:"pid"`
+ EventTypes string `json:"eventTypes"`
+ After string `json:"after"`
+}
+
+func subscribeReqHF(params interface{}, t *rpc.Transmitter) error {
+ subscribeParams := params.(SubscribeParams)
+
+ mask := maskFromTypes(subscribeParams.EventTypes)
+ if mask == 0 {
+ return rpc.NewArgsError(errors.New("Required at least 1 valid event type"))
+ }
+
+ subscriber := Subscriber{
+ Id: t.Channel.Id,
+ Mask: mask,
+ Channel: t.Channel.Events,
+ }
+ // Check whether subscriber should see previous logs or not
+ if subscribeParams.After == "" {
+ if err := AddSubscriber(subscribeParams.Pid, subscriber); err != nil {
+ return asRpcError(err)
+ }
+ } else {
+ after, err := time.Parse(DateTimeFormat, subscribeParams.After)
+ if err != nil {
+ return rpc.NewArgsError(errors.New("Bad format of 'after', " + err.Error()))
+ }
+ if err := RestoreSubscriber(subscribeParams.Pid, subscriber, after); err != nil {
+ return err
+ }
+ }
+ t.Send(&SubscribeResult{
+ Pid: subscribeParams.Pid,
+ EventTypes: subscribeParams.EventTypes,
+ Text: "Successfully subscribed",
+ })
+ return nil
+}
+
+//-- process unsubscribe
+type UnsubscribeParams struct {
+ Pid uint64 `json:"pid"`
+}
+
+func unsubscribeReqHF(params interface{}, t *rpc.Transmitter) error {
+ unsubscribeParams := params.(UnsubscribeParams)
+ if err := RemoveSubscriber(unsubscribeParams.Pid, t.Channel.Id); err != nil {
+ return asRpcError(err)
+ }
+ t.Send(&ProcessResult{
+ Pid: unsubscribeParams.Pid,
+ Text: "Successfully unsubscribed",
+ })
+ return nil
+}
+
+//-- process update subscriber
+type UpdateSubscriberParams struct {
+ Pid uint64 `json:"pid"`
+ EventTypes string `json:"eventTypes"`
+}
+
+func updateSubscriberReqHF(params interface{}, t *rpc.Transmitter) error {
+ updateParams := params.(UpdateSubscriberParams)
+ if updateParams.EventTypes == "" {
+ return rpc.NewArgsError(errors.New("'eventTypes' required for subscriber update"))
+ }
+ if err := UpdateSubscriber(updateParams.Pid, t.Channel.Id, maskFromTypes(updateParams.EventTypes)); err != nil {
+ return asRpcError(err)
+ }
+ t.Send(&SubscribeResult{
+ Pid: updateParams.Pid,
+ EventTypes: updateParams.EventTypes,
+ Text: "Subscriber successfully updated",
+ })
+ return nil
+}
+
+//-- process get logs
+type GetLogsParams struct {
+ Pid uint64 `json:"pid"`
+ From string `json:"from"`
+ Till string `json:"till"`
+ Limit int `json:"limit"`
+ Skip int `json:"skip"`
+}
+
+func getProcessLogsReqHF(params interface{}, t *rpc.Transmitter) error {
+ getLogsParams := params.(GetLogsParams)
+
+ if getLogsParams.Skip < 0 {
+ getLogsParams.Skip = 0
+ }
+ if getLogsParams.Limit < 0 {
+ getLogsParams.Limit = 0
+ }
+
+ from, err := parseTime(getLogsParams.From, time.Time{})
+ if err != nil {
+ return rpc.NewArgsError(errors.New("Bad format of 'from', " + err.Error()))
+ }
+
+ till, err := parseTime(getLogsParams.Till, time.Now())
+ if err != nil {
+ return rpc.NewArgsError(errors.New("Bad format of 'till', " + err.Error()))
+ }
+
+ logs, err := ReadLogs(getLogsParams.Pid, from, till)
+ if err != nil {
+ return asRpcError(err)
+ }
+
+ limit := DefaultLogsPerPageLimit
+ if getLogsParams.Limit != 0 {
+ if limit < 1 {
+ return rpc.NewArgsError(errors.New("Required 'limit' to be > 0"))
+ }
+ limit = getLogsParams.Limit
+ }
+
+ skip := 0
+ if getLogsParams.Skip != 0 {
+ if skip < 0 {
+ return rpc.NewArgsError(errors.New("Required 'skip' to be >= 0"))
+ }
+ skip = getLogsParams.Skip
+ }
+
+ logsLen := len(logs)
+ fromIdx := int(math.Max(float64(logsLen-limit-skip), 0))
+ toIdx := logsLen - int(math.Min(float64(skip), float64(logsLen)))
+
+ t.Send(logs[fromIdx:toIdx])
+ return nil
+}
+
+//-- get process
+type GetProcessParams struct {
+ Pid uint64 `json:"pid"`
+}
+
+func getProcessReqHF(body interface{}, t *rpc.Transmitter) error {
+ params := body.(GetProcessParams)
+ p, err := Get(params.Pid)
+ if err != nil {
+ return asRpcError(err)
+ }
+ t.Send(p)
+ return nil
+}
+
+//-- get processes
+type GetProcessesParams struct {
+ All bool `json:"all"`
+}
+
+func getProcessesReqHF(body interface{}, t *rpc.Transmitter) error {
+ params := body.(GetProcessesParams)
+ t.Send(GetProcesses(params.All))
+ return nil
+}
+
+func asRpcError(err error) error {
+ if npErr, ok := err.(*NoProcessError); ok {
+ return rpc.NewError(npErr.error, NoSuchProcessErrorCode)
+ } else if naErr, ok := err.(*NotAliveError); ok {
+ return rpc.NewError(naErr.error, ProcessNotAliveErrorCode)
+ }
+ return err
+}
diff --git a/exec-agent/src/rest/errors.go b/exec-agent/src/rest/errors.go
new file mode 100644
index 00000000000..096771b392b
--- /dev/null
+++ b/exec-agent/src/rest/errors.go
@@ -0,0 +1,41 @@
+//
+// Copyright (c) 2012-2016 Codenvy, S.A.
+// All rights reserved. This program and the accompanying materials
+// are made available under the terms of the Eclipse Public License v1.0
+// which accompanies this distribution, and is available at
+// http://www.eclipse.org/legal/epl-v10.html
+//
+// Contributors:
+// Codenvy, S.A. - initial API and implementation
+//
+
+package rest
+
+import (
+ "net/http"
+)
+
+type ApiError struct {
+ error
+ Code int
+}
+
+func BadRequest(err error) error {
+ return ApiError{err, http.StatusBadRequest}
+}
+
+func NotFound(err error) error {
+ return ApiError{err, http.StatusNotFound}
+}
+
+func Conflict(err error) error {
+ return ApiError{err, http.StatusConflict}
+}
+
+func Forbidden(err error) error {
+ return ApiError{err, http.StatusForbidden}
+}
+
+func Unauthorized(err error) error {
+ return ApiError{err, http.StatusUnauthorized}
+}
diff --git a/exec-agent/src/rest/restutil/util.go b/exec-agent/src/rest/restutil/util.go
new file mode 100644
index 00000000000..10e667d234c
--- /dev/null
+++ b/exec-agent/src/rest/restutil/util.go
@@ -0,0 +1,45 @@
+//
+// Copyright (c) 2012-2016 Codenvy, S.A.
+// All rights reserved. This program and the accompanying materials
+// are made available under the terms of the Eclipse Public License v1.0
+// which accompanies this distribution, and is available at
+// http://www.eclipse.org/legal/epl-v10.html
+//
+// Contributors:
+// Codenvy, S.A. - initial API and implementation
+//
+
+package restutil
+
+import (
+ "encoding/json"
+ "net/http"
+ "strconv"
+)
+
+// Writes body as json to the response writer
+func WriteJson(w http.ResponseWriter, body interface{}) error {
+ w.Header().Set("Content-Type", "application/json")
+ return json.NewEncoder(w).Encode(body)
+}
+
+// Reads json body from the request
+func ReadJson(r *http.Request, v interface{}) error {
+ return json.NewDecoder(r.Body).Decode(v)
+}
+
+func IntQueryParam(r *http.Request, name string, defaultValue int) int {
+ qp := r.URL.Query().Get(name)
+ if qp == "" {
+ return defaultValue
+ } else {
+ v, err := strconv.Atoi(qp)
+ if err != nil {
+ return defaultValue
+ }
+ if v < 0 {
+ return defaultValue
+ }
+ return v
+ }
+}
diff --git a/exec-agent/src/rest/route.go b/exec-agent/src/rest/route.go
new file mode 100644
index 00000000000..2531acd0e1e
--- /dev/null
+++ b/exec-agent/src/rest/route.go
@@ -0,0 +1,80 @@
+//
+// Copyright (c) 2012-2016 Codenvy, S.A.
+// All rights reserved. This program and the accompanying materials
+// are made available under the terms of the Eclipse Public License v1.0
+// which accompanies this distribution, and is available at
+// http://www.eclipse.org/legal/epl-v10.html
+//
+// Contributors:
+// Codenvy, S.A. - initial API and implementation
+//
+
+package rest
+
+import (
+ "fmt"
+ "net/http"
+ "strings"
+)
+
+const (
+ maxNameLen = 40
+ maxMethodLen = len("DELETE")
+)
+
+// Handler for http routes
+// vars variable contain only path parameters if any specified for given route
+type HttpRouteHandlerFunc func(w http.ResponseWriter, r *http.Request, params Params) error
+
+// An interface for getting mapped path parameters by their names
+type Params interface {
+
+ // Gets path parameter by it's name e.g.
+ // for url template `/process/:id` and actual value `/process/123`
+ // this method will return string '123'
+ Get(name string) string
+}
+
+// Describes route for http requests
+type Route struct {
+
+ // Http method e.g. 'GET'
+ Method string
+
+ // The name of the http route, used in logs
+ // this name is unique for all the application http routes
+ // example: 'StartProcess'
+ Name string
+
+ // The path of the http route which this route is mapped to
+ // example: '/process'
+ Path string
+
+ // The function used for handling http request
+ HandleFunc HttpRouteHandlerFunc
+}
+
+// Named group of http routes, those groups
+// should be defined by separate apis, and then combined together
+type RoutesGroup struct {
+
+ // The name of this group e.g.: 'ProcessRoutes'
+ Name string
+
+ // The http routes of this group
+ Items []Route
+}
+
+func (r *Route) String() string {
+ name := r.Name + " " + strings.Repeat(".", maxNameLen-len(r.Name))
+ method := r.Method + strings.Repeat(" ", maxMethodLen-len(r.Method))
+ return fmt.Sprintf("%s %s %s", name, method, r.Path)
+}
+
+func WriteError(w http.ResponseWriter, err error) {
+ if apiErr, ok := err.(ApiError); ok {
+ http.Error(w, apiErr.Error(), apiErr.Code)
+ } else {
+ http.Error(w, err.Error(), http.StatusInternalServerError)
+ }
+}
diff --git a/exec-agent/src/rpc/channels.go b/exec-agent/src/rpc/channels.go
new file mode 100644
index 00000000000..9e73a2c05ec
--- /dev/null
+++ b/exec-agent/src/rpc/channels.go
@@ -0,0 +1,334 @@
+//
+// Copyright (c) 2012-2016 Codenvy, S.A.
+// All rights reserved. This program and the accompanying materials
+// are made available under the terms of the Eclipse Public License v1.0
+// which accompanies this distribution, and is available at
+// http://www.eclipse.org/legal/epl-v10.html
+//
+// Contributors:
+// Codenvy, S.A. - initial API and implementation
+//
+
+package rpc
+
+import (
+ "encoding/json"
+ "errors"
+ "fmt"
+ "log"
+ "net/http"
+ "strconv"
+ "strings"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ "github.com/eclipse/che-lib/websocket"
+ "github.com/eclipse/che/exec-agent/rest"
+)
+
+const (
+ ConnectedEventType = "connected"
+)
+
+var (
+ upgrader = websocket.Upgrader{
+ ReadBufferSize: 1024,
+ WriteBufferSize: 1024,
+ CheckOrigin: func(r *http.Request) bool {
+ return true
+ },
+ }
+
+ prevChanId uint64 = 0
+
+ channels = channelsMap{items: make(map[string]Channel)}
+
+ HttpRoutes = rest.RoutesGroup{
+ "Channel Routes",
+ []rest.Route{
+ {
+ "GET",
+ "Connect to Exec-Agent(webscoket)",
+ "/connect",
+ registerChannel,
+ },
+ },
+ }
+
+ messageHandler = jsonrpc2_0MessageHandler{}
+)
+
+// Published when websocket connection is established
+// and channel is ready for interaction
+type ChannelConnected struct {
+ Timed
+ ChannelId string `json:"channel"`
+ Text string `json:"text"`
+}
+
+// Describes channel which is websocket connection
+// with additional properties required by the application
+type Channel struct {
+ // Unique channel identifier
+ Id string `json:"id"`
+
+ // When the connection was established
+ Connected time.Time `json:"connected"`
+
+ // the uri of the request that established this connection
+ RequestURI string `json:"-"`
+
+ // Go channel for sending events to the websocket.
+ // All the events are encoded to the json messages and
+ // send to websocket connection defined by this channel.
+ Events chan *Event
+
+ // Everything passed to this channel will be encoded
+ // to json and send to the client.
+ output chan interface{}
+
+ // If any value is send to this channel then
+ // physical connection associated with it along with
+ // output channel will be immediately closed.
+ drop chan bool
+
+ // Websocket connection
+ conn *websocket.Conn
+}
+
+// A struct for reading raw websocket messages
+type WsMessage struct {
+ err error
+ bytes []byte
+}
+
+// Handles raw messages received from websocket channel
+type MessageHandler interface {
+ // handles a message in implementation specific way
+ handle(message *WsMessage, channel Channel)
+}
+
+// Defines lockable map for managing channels
+type channelsMap struct {
+ sync.RWMutex
+ items map[string]Channel
+}
+
+// Gets channel by the channel id, if there is no such channel
+// then returned 'ok' is false.
+func GetChannel(chanId string) (Channel, bool) {
+ channels.RLock()
+ defer channels.RUnlock()
+ item, ok := channels.items[chanId]
+ return item, ok
+}
+
+// Returns all the currently registered channels.
+func GetChannels() []Channel {
+ channels.RLock()
+ defer channels.RUnlock()
+ all := make([]Channel, len(channels.items))
+ idx := 0
+ for _, v := range channels.items {
+ all[idx] = v
+ idx++
+ }
+ return all
+}
+
+// Drops the channel with the given id.
+func DropChannel(id string) {
+ if c, ok := GetChannel(id); ok {
+ c.drop <- true
+ }
+}
+
+// Saves the channel with the given identifier and returns true.
+// If the channel with the given identifier already exists then false is returned
+// and the channel is not saved.
+func saveChannel(channel Channel) bool {
+ channels.Lock()
+ defer channels.Unlock()
+ _, ok := channels.items[channel.Id]
+ if ok {
+ return false
+ }
+ channels.items[channel.Id] = channel
+ return true
+}
+
+// Removes channel
+func removeChannel(channel Channel) {
+ channels.Lock()
+ defer channels.Unlock()
+ delete(channels.items, channel.Id)
+}
+
+func registerChannel(w http.ResponseWriter, r *http.Request, _ rest.Params) error {
+ conn, err := upgrader.Upgrade(w, r, nil)
+ if err != nil {
+ log.Println("Couldn't establish websocket connection " + err.Error())
+ return nil
+ }
+
+ channel := Channel{
+ Id: "channel-" + strconv.Itoa(int(atomic.AddUint64(&prevChanId, 1))),
+ Connected: time.Now(),
+ RequestURI: r.RequestURI,
+ Events: make(chan *Event),
+ output: make(chan interface{}),
+ drop: make(chan bool),
+ conn: conn,
+ }
+ saveChannel(channel)
+
+ log.Printf("A new channel with id '%s' successfully opened", channel.Id)
+
+ go transferAsJson(conn, channel.output)
+ go redirectEventsToOutput(channel)
+ go handleMessages(readMessages(conn), channel)
+
+ // Say hello to the client
+ channel.Events <- NewEvent(ConnectedEventType, &ChannelConnected{
+ Timed: Timed{Time: channel.Connected},
+ ChannelId: channel.Id,
+ Text: "Hello!",
+ })
+ return nil
+}
+
+// Handles all the messages from the given channel
+// until an error occurs or a drop signal is sent.
+// Clears all the associated resources.
+func handleMessages(messageChan chan *WsMessage, channel Channel) {
+ for {
+ select {
+ case message := <-messageChan:
+ if message.err == nil {
+ messageHandler.handle(message, channel)
+ } else {
+ closeErr, ok := message.err.(*websocket.CloseError)
+ if !ok || !isNormallyClosed(closeErr.Code) {
+ log.Println("Error reading message, " + message.err.Error())
+ }
+ closeChannel(channel)
+ return
+ }
+ case <-channel.drop:
+ closeChannel(channel)
+ return
+ }
+ }
+}
+
+// Closes all associated go channels(events, output, drop)
+// and physical websocket connection.
+func closeChannel(channel Channel) {
+ close(channel.Events)
+ close(channel.output)
+ close(channel.drop)
+ if err := channel.conn.Close(); err != nil {
+ log.Println("Error closing connection, " + err.Error())
+ }
+ removeChannel(channel)
+ log.Printf("Channel with id '%s' successfully closed", channel.Id)
+}
+
+// Reads the message from the websocket connection until error is received,
+// returns the channel which should be used for reading such messages.
+func readMessages(conn *websocket.Conn) chan *WsMessage {
+ messagesChan := make(chan *WsMessage)
+ go func() {
+ for {
+ _, bytes, err := conn.ReadMessage()
+ messagesChan <- &WsMessage{err: err, bytes: bytes}
+ if err != nil {
+ close(messagesChan)
+ break
+ }
+ }
+ }()
+ return messagesChan
+}
+
+func redirectEventsToOutput(channel Channel) {
+ for event := range channel.Events {
+ channel.output <- event
+ }
+}
+
+// transfers data from channel to physical connection,
+// tries to transform data to json.
+func transferAsJson(conn *websocket.Conn, c chan interface{}) {
+ for message := range c {
+ err := conn.WriteJSON(message)
+ if err != nil {
+ log.Printf("Couldn't write message to the channel. Message: %T, %v", message, message)
+ }
+ }
+}
+
+// handles messages as jsonrpc as described by package doc
+type jsonrpc2_0MessageHandler struct{}
+
+func (h *jsonrpc2_0MessageHandler) handle(message *WsMessage, channel Channel) {
+ req := &Request{}
+
+ // try to unmarshal the request
+ if err := json.Unmarshal(message.bytes, req); err != nil {
+ // Respond parse error according to specification
+ channel.output <- &Response{
+ Version: "2.0",
+ Error: &Error{
+ Code: ParseErrorCode,
+ Message: "Invalid json object",
+ },
+ }
+ log.Printf("Error decoding request '%s', Error: %s \n", string(message.bytes), err.Error())
+ return
+ }
+
+ // ensure provided version is supported
+ if req.Version != "" && strings.Trim(req.Version, " ") != "2.0" {
+ channel.output <- &Response{
+ Version: "2.0",
+ Error: &Error{
+ Code: InvalidRequestErrorCode,
+ Message: "'2.0' is the only supported version, use it or omit version at all",
+ },
+ }
+ return
+ }
+
+ transmitter := &Transmitter{Channel: channel, id: req.Id}
+
+ opRoute, ok := routes.get(req.Method)
+ if !ok {
+ m := fmt.Sprintf("No route for the operation '%s'", req.Method)
+ transmitter.SendError(NewError(errors.New(m), MethodNotFoundErrorCode))
+ return
+ }
+
+ decodedBody, err := opRoute.DecoderFunc(req.RawParams)
+ if err != nil {
+ m := fmt.Sprintf("Error decoding body for the operation '%s'. Error: '%s'", req.Method, err.Error())
+ transmitter.SendError(NewError(errors.New(m), InvalidRequestErrorCode))
+ return
+ }
+
+ if err := opRoute.HandlerFunc(decodedBody, transmitter); err != nil {
+ opError, ok := err.(Error)
+ if ok {
+ transmitter.SendError(opError)
+ } else {
+ transmitter.SendError(NewError(err, InternalErrorCode))
+ }
+ }
+}
+
+func isNormallyClosed(code int) bool {
+ return code == websocket.CloseGoingAway ||
+ code == websocket.CloseNormalClosure ||
+ code == websocket.CloseNoStatusReceived
+}
diff --git a/exec-agent/src/rpc/model.go b/exec-agent/src/rpc/model.go
new file mode 100644
index 00000000000..f9e461f7bef
--- /dev/null
+++ b/exec-agent/src/rpc/model.go
@@ -0,0 +1,173 @@
+//
+// Copyright (c) 2012-2016 Codenvy, S.A.
+// All rights reserved. This program and the accompanying materials
+// are made available under the terms of the Eclipse Public License v1.0
+// which accompanies this distribution, and is available at
+// http://www.eclipse.org/legal/epl-v10.html
+//
+// Contributors:
+// Codenvy, S.A. - initial API and implementation
+//
+
+// Provides a lightweight implementation of jsonrpc2.0 protocol.
+// The original jsonrpc2.0 specification - http://www.jsonrpc.org/specification
+//
+// The implementations does not fully implement the protocol
+// and introduces a few modifications to its terminology in
+// term of exec-agent transport needs.
+//
+// From the specification:
+// The Client is defined as the origin of Request objects and the handler of Response objects.
+// The Server is defined as the origin of Response objects and the handler of Request objects.
+//
+// Exec-agent serves as both, server and client as it receives
+// Responses and sends Notifications in the same time.
+//
+// Request.
+// It's a message from the physical websocket connection client to the exec-agent server.
+// Request(in it's origin form) is considered to be unidirectional.
+// WS Client =---> WS Server.
+//
+// Response.
+// It's a message from the the exec-agent server to a websocket client,
+// indicates the result of the operation execution requested by certain request.
+// Response doesn't exist without request. The response is considered to be unidirectional.
+// WS Client <---= WS Server
+//
+// Event.
+// Is a message from the exec-agent server to a websocket client, the analogue
+// from the specification is Notification, which is defined as a request
+// which doesn't need any response, that's also true for events.
+// Events may happen periodically and don't need to be indicated by request.
+// WS Client <---X WS Server
+package rpc
+
+import (
+ "encoding/json"
+ "time"
+)
+
+const (
+
+ // Invalid JSON was received by the server.
+ ParseErrorCode = -32700
+
+ // Request object is not valid, fails
+ // when route decoder can't decode params.
+ InvalidRequestErrorCode = -32600
+
+ // There is no route for such method.
+ MethodNotFoundErrorCode = -32601
+
+ // When handler parameters are considered as not valid
+ // this error type should be returned directly from the HandlerFunc
+ InvalidParamsErrorCode = -32602
+
+ // When error returned from the Route HandlerFunc is different from Error type
+ InternalErrorCode = -32603
+
+ // -32000 to -32099 Reserved for implementation-defined server-errors.
+)
+
+// Describes named operation which is called
+// on the websocket client's side and performed
+// on the servers's side, if appropriate Route exists.
+type Request struct {
+
+ // Version of this request e.g. '2.0'.
+ Version string `json:"jsonrpc"`
+
+ // The method name which should be proceeded by this call
+ // usually dot separated resource and action e.g. 'process.start'.
+ Method string `json:"method"`
+
+ // The unique identifier of this operation request.
+ // If a client needs to identify the result of the operation execution,
+ // the id should be passed by the client, then it is guaranteed
+ // that the client will receive the result frame with the same id.
+ // The uniqueness of the identifier must be controlled by the client,
+ // if client doesn't specify the identifier in the operation call,
+ // the response won't contain the identifier as well.
+ //
+ // It is preferable to specify identifier for those calls which may
+ // either validate data, or produce such information which can't be
+ // identified by itself.
+ Id interface{} `json:"id"`
+
+ // Request data, parameters which are needed for operation execution.
+ RawParams json.RawMessage `json:"params"`
+}
+
+// A message from the server to the client,
+// which represents the result of the certain operation execution.
+// The result is sent to the client only once per operation.
+type Response struct {
+
+ // Version of this response e.g. '2.0'.
+ Version string `json:"jsonrpc"`
+
+ // The operation call identifier, will be set only
+ // if the operation contains it. See 'rpc.Request.Id'
+ Id interface{} `json:"id"`
+
+ // The actual result data, the operation execution result.
+ Result interface{} `json:"result,omitempty"`
+
+ // Body and Error are mutual exclusive.
+ // Present only if the operation execution fails due to an error.
+ Error *Error `json:"error,omitempty"`
+}
+
+// A message from the server to the client,
+// which may notify client about any activity that the client is interested in.
+// The difference from the 'rpc.Response' is that the event may happen periodically,
+// before or even after some operation calls, while the 'rpc.Response' is more like
+// result of the operation call execution, which is sent to the client immediately
+// after the operation execution is done.
+type Event struct {
+
+ // Version of this notification e.g. '2.0'
+ Version string `json:"jsonrpc"`
+
+ // A type of this operation event, must be always set.
+ // The type must be generally unique.
+ EventType string `json:"method"`
+
+ // Event related data.
+ Body interface{} `json:"params"`
+}
+
+// May be returned by any of route HandlerFunc.
+type Error struct {
+ error `json:"-"`
+
+ // An error code
+ Code int `json:"code"`
+
+ // A short description of the occurred error.
+ Message string `json:"message"`
+}
+
+type Timed struct {
+ Time time.Time `json:"time"`
+}
+
+func NewEvent(eType string, body interface{}) *Event {
+ return &Event{
+ Version: "2.0",
+ EventType: eType,
+ Body: body,
+ }
+}
+
+func NewArgsError(err error) Error {
+ return NewError(err, InvalidParamsErrorCode)
+}
+
+func NewError(err error, code int) Error {
+ return Error{
+ error: err,
+ Code: code,
+ Message: err.Error(),
+ }
+}
diff --git a/exec-agent/src/rpc/route.go b/exec-agent/src/rpc/route.go
new file mode 100644
index 00000000000..f1653cba6f4
--- /dev/null
+++ b/exec-agent/src/rpc/route.go
@@ -0,0 +1,88 @@
+//
+// Copyright (c) 2012-2016 Codenvy, S.A.
+// All rights reserved. This program and the accompanying materials
+// are made available under the terms of the Eclipse Public License v1.0
+// which accompanies this distribution, and is available at
+// http://www.eclipse.org/legal/epl-v10.html
+//
+// Contributors:
+// Codenvy, S.A. - initial API and implementation
+//
+
+package rpc
+
+import (
+ "log"
+ "sync"
+)
+
+var (
+ routes = &routesMap{items: make(map[string]Route)}
+)
+
+// Describes route for rpc requests
+type Route struct {
+
+ // The operation name like defined by Request.Method
+ Method string
+
+ // The decoder used for decoding raw request parameters
+ // into the certain object. If decoding is okay, then
+ // decoded value will be passed to the HandlerFunc
+ // of this request route, so it is up to the route
+ // - to define type safe couple of DecoderFunc & HandlerFunc.
+ DecoderFunc func(body []byte) (interface{}, error)
+
+ // Defines handler for decoded request parameters.
+ // If handler function can't perform the operation then
+ // handler function should either return an error, or
+ // send it directly within transmitter#SendError func.
+ // Params is a value returned from the DecoderFunc.
+ // If an error is returned from this function and the type
+ // of the error is different from rpc.Error, it will be
+ // published as internal rpc error(-32603).
+ HandlerFunc func(params interface{}, t *Transmitter) error
+}
+
+// Named group of rpc routes
+type RoutesGroup struct {
+ // The name of this group e.g.: 'ProcessRpcRoutes'
+ Name string
+
+ // Rpc routes of this group
+ Items []Route
+}
+
+// Defines lockable map for storing operation routes
+type routesMap struct {
+ sync.RWMutex
+ items map[string]Route
+}
+
+// Gets route by the operation name
+func (routes *routesMap) get(method string) (Route, bool) {
+ routes.RLock()
+ defer routes.RUnlock()
+ item, ok := routes.items[method]
+ return item, ok
+}
+
+// Returns true if route is added and false if route for such method
+// already present(won't override it).
+func (or *routesMap) add(r Route) bool {
+ routes.Lock()
+ defer routes.Unlock()
+ _, ok := routes.items[r.Method]
+ if ok {
+ return false
+ }
+ routes.items[r.Method] = r
+ return true
+}
+
+// Adds a new route, panics if such route already exists.
+func RegisterRoute(route Route) {
+ if !routes.add(route) {
+ log.Fatalf("Couldn't register a new route, route for the operation '%s' already exists", route.Method)
+ }
+}
diff --git a/exec-agent/src/rpc/transmitter.go b/exec-agent/src/rpc/transmitter.go
new file mode 100644
index 00000000000..72a2349352b
--- /dev/null
+++ b/exec-agent/src/rpc/transmitter.go
@@ -0,0 +1,41 @@
+//
+// Copyright (c) 2012-2016 Codenvy, S.A.
+// All rights reserved. This program and the accompanying materials
+// are made available under the terms of the Eclipse Public License v1.0
+// which accompanies this distribution, and is available at
+// http://www.eclipse.org/legal/epl-v10.html
+//
+// Contributors:
+// Codenvy, S.A. - initial API and implementation
+//
+
+package rpc
+
+// Transmitter is used for sending
+// results of the operation executions to the channel.
+type Transmitter struct {
+
+ // The id of the request behind this transmitter.
+ id interface{}
+
+ // The channel to which the message will be send.
+ Channel Channel
+}
+
+// Wraps the given message with 'rpc.Result' and sends it to the client.
+func (t *Transmitter) Send(message interface{}) {
+ t.Channel.output <- &Response{
+ Version: "2.0",
+ Id: t.id,
+ Result: message,
+ }
+}
+
+// Wraps the given error with 'rpc.Result' and sends it to the client.
+func (t *Transmitter) SendError(err Error) {
+ t.Channel.output <- &Response{
+ Version: "2.0",
+ Id: t.id,
+ Error: &err,
+ }
+}
diff --git a/exec-agent/src/static/term.js b/exec-agent/src/static/term.js
new file mode 100644
index 00000000000..9d80e4bfc30
--- /dev/null
+++ b/exec-agent/src/static/term.js
@@ -0,0 +1,5793 @@
+/**
+ * term.js - an xterm emulator
+ * Copyright (c) 2012-2013, Christopher Jeffrey (MIT License)
+ * https://github.com/chjj/term.js
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+ * THE SOFTWARE.
+ *
+ * Originally forked from (with the author's permission):
+ * Fabrice Bellard's javascript vt100 for jslinux:
+ * http://bellard.org/jslinux/
+ * Copyright (c) 2011 Fabrice Bellard
+ * The original design remains. The terminal itself
+ * has been extended to include xterm CSI codes, among
+ * other features.
+ */
+
+;(function() {
+
+/**
+ * Terminal Emulation References:
+ * http://vt100.net/
+ * http://invisible-island.net/xterm/ctlseqs/ctlseqs.txt
+ * http://invisible-island.net/xterm/ctlseqs/ctlseqs.html
+ * http://invisible-island.net/vttest/
+ * http://www.inwap.com/pdp10/ansicode.txt
+ * http://linux.die.net/man/4/console_codes
+ * http://linux.die.net/man/7/urxvt
+ */
+
+'use strict';
+
+/**
+ * Shared
+ */
+
+var window = this
+ , document = this.document;
+
+/**
+ * EventEmitter
+ */
+
+function EventEmitter() {
+ this._events = this._events || {};
+}
+
+EventEmitter.prototype.addListener = function(type, listener) {
+ this._events[type] = this._events[type] || [];
+ this._events[type].push(listener);
+};
+
+EventEmitter.prototype.on = EventEmitter.prototype.addListener;
+
+EventEmitter.prototype.removeListener = function(type, listener) {
+ if (!this._events[type]) return;
+
+ var obj = this._events[type]
+ , i = obj.length;
+
+ while (i--) {
+ if (obj[i] === listener || obj[i].listener === listener) {
+ obj.splice(i, 1);
+ return;
+ }
+ }
+};
+
+EventEmitter.prototype.off = EventEmitter.prototype.removeListener;
+
+EventEmitter.prototype.removeAllListeners = function(type) {
+ if (this._events[type]) delete this._events[type];
+};
+
+EventEmitter.prototype.once = function(type, listener) {
+ function on() {
+ var args = Array.prototype.slice.call(arguments);
+ this.removeListener(type, on);
+ return listener.apply(this, args);
+ }
+ on.listener = listener;
+ return this.on(type, on);
+};
+
+EventEmitter.prototype.emit = function(type) {
+ if (!this._events[type]) return;
+
+ var args = Array.prototype.slice.call(arguments, 1)
+ , obj = this._events[type]
+ , l = obj.length
+ , i = 0;
+
+ for (; i < l; i++) {
+ obj[i].apply(this, args);
+ }
+};
+
+EventEmitter.prototype.listeners = function(type) {
+ return this._events[type] = this._events[type] || [];
+};
+
+/**
+ * States
+ */
+
+var normal = 0
+ , escaped = 1
+ , csi = 2
+ , osc = 3
+ , charset = 4
+ , dcs = 5
+ , ignore = 6;
+
+/**
+ * Terminal
+ */
+
+function Terminal(options) {
+ var self = this;
+
+ if (!(this instanceof Terminal)) {
+ return new Terminal(arguments[0], arguments[1], arguments[2]);
+ }
+
+ EventEmitter.call(this);
+
+ if (typeof options === 'number') {
+ options = {
+ cols: arguments[0],
+ rows: arguments[1],
+ handler: arguments[2]
+ };
+ }
+
+ options = options || {};
+
+ each(keys(Terminal.defaults), function(key) {
+ if (options[key] == null) {
+ options[key] = Terminal.options[key];
+ // Legacy:
+ if (Terminal[key] !== Terminal.defaults[key]) {
+ options[key] = Terminal[key];
+ }
+ }
+ self[key] = options[key];
+ });
+
+ if (options.colors.length === 8) {
+ options.colors = options.colors.concat(Terminal._colors.slice(8));
+ } else if (options.colors.length === 16) {
+ options.colors = options.colors.concat(Terminal._colors.slice(16));
+ } else if (options.colors.length === 10) {
+ options.colors = options.colors.slice(0, -2).concat(
+ Terminal._colors.slice(8, -2), options.colors.slice(-2));
+ } else if (options.colors.length === 18) {
+ options.colors = options.colors.slice(0, -2).concat(
+ Terminal._colors.slice(16, -2), options.colors.slice(-2));
+ }
+ this.colors = options.colors;
+
+ this.options = options;
+
+ // this.context = options.context || window;
+ // this.document = options.document || document;
+ this.parent = options.body || options.parent
+ || (document ? document.getElementsByTagName('body')[0] : null);
+
+ this.cols = options.cols || options.geometry[0];
+ this.rows = options.rows || options.geometry[1];
+
+ if (options.handler) {
+ this.on('data', options.handler);
+ }
+
+ this.ybase = 0;
+ this.ydisp = 0;
+ this.x = 0;
+ this.y = 0;
+ this.cursorState = 0;
+ this.cursorHidden = false;
+ this.convertEol;
+ this.state = 0;
+ this.queue = '';
+ this.scrollTop = 0;
+ this.scrollBottom = this.rows - 1;
+
+ // modes
+ this.applicationKeypad = false;
+ this.applicationCursor = false;
+ this.originMode = false;
+ this.insertMode = false;
+ this.wraparoundMode = false;
+ this.normal = null;
+
+ // select modes
+ this.selectMode = false;
+ this.visualMode = false;
+ this.searchMode = false;
+ this.searchDown;
+ this.entry = '';
+ this.entryPrefix = 'Search: ';
+ this._real;
+ this._selected;
+ this._textarea;
+
+ // charset
+ this.charset = null;
+ this.gcharset = null;
+ this.glevel = 0;
+ this.charsets = [null];
+
+ // mouse properties
+ this.decLocator;
+ this.x10Mouse;
+ this.vt200Mouse;
+ this.vt300Mouse;
+ this.normalMouse;
+ this.mouseEvents;
+ this.sendFocus;
+ this.utfMouse;
+ this.sgrMouse;
+ this.urxvtMouse;
+
+ // misc
+ this.element;
+ this.children;
+ this.refreshStart;
+ this.refreshEnd;
+ this.savedX;
+ this.savedY;
+ this.savedCols;
+
+ // stream
+ this.readable = true;
+ this.writable = true;
+
+ this.defAttr = (0 << 18) | (257 << 9) | (256 << 0);
+ this.curAttr = this.defAttr;
+
+ this.params = [];
+ this.currentParam = 0;
+ this.prefix = '';
+ this.postfix = '';
+
+ this.lines = [];
+ var i = this.rows;
+ while (i--) {
+ this.lines.push(this.blankLine());
+ }
+
+ this.tabs;
+ this.setupStops();
+}
+
+inherits(Terminal, EventEmitter);
+
+// back_color_erase feature for xterm.
+Terminal.prototype.eraseAttr = function() {
+ // if (this.is('screen')) return this.defAttr;
+ return (this.defAttr & ~0x1ff) | (this.curAttr & 0x1ff);
+};
+
+/**
+ * Colors
+ */
+
+// Colors 0-15
+Terminal.tangoColors = [
+ // dark:
+ '#2e3436',
+ '#cc0000',
+ '#4e9a06',
+ '#c4a000',
+ '#3465a4',
+ '#75507b',
+ '#06989a',
+ '#d3d7cf',
+ // bright:
+ '#555753',
+ '#ef2929',
+ '#8ae234',
+ '#fce94f',
+ '#729fcf',
+ '#ad7fa8',
+ '#34e2e2',
+ '#eeeeec'
+];
+
+Terminal.xtermColors = [
+ // dark:
+ '#000000', // black
+ '#cd0000', // red3
+ '#00cd00', // green3
+ '#cdcd00', // yellow3
+ '#0000ee', // blue2
+ '#cd00cd', // magenta3
+ '#00cdcd', // cyan3
+ '#e5e5e5', // gray90
+ // bright:
+ '#7f7f7f', // gray50
+ '#ff0000', // red
+ '#00ff00', // green
+ '#ffff00', // yellow
+ '#5c5cff', // rgb:5c/5c/ff
+ '#ff00ff', // magenta
+ '#00ffff', // cyan
+ '#ffffff' // white
+];
+
+// Colors 0-15 + 16-255
+// Much thanks to TooTallNate for writing this.
+Terminal.colors = (function() {
+ var colors = Terminal.tangoColors.slice()
+ , r = [0x00, 0x5f, 0x87, 0xaf, 0xd7, 0xff]
+ , i;
+
+ // 16-231
+ i = 0;
+ for (; i < 216; i++) {
+ out(r[(i / 36) % 6 | 0], r[(i / 6) % 6 | 0], r[i % 6]);
+ }
+
+ // 232-255 (grey)
+ i = 0;
+ for (; i < 24; i++) {
+ r = 8 + i * 10;
+ out(r, r, r);
+ }
+
+ function out(r, g, b) {
+ colors.push('#' + hex(r) + hex(g) + hex(b));
+ }
+
+ function hex(c) {
+ c = c.toString(16);
+ return c.length < 2 ? '0' + c : c;
+ }
+
+ return colors;
+})();
+
+// Default BG/FG
+Terminal.colors[256] = '#000000';
+Terminal.colors[257] = '#f0f0f0';
+
+Terminal._colors = Terminal.colors.slice();
+
+Terminal.vcolors = (function() {
+ var out = []
+ , colors = Terminal.colors
+ , i = 0
+ , color;
+
+ for (; i < 256; i++) {
+ color = parseInt(colors[i].substring(1), 16);
+ out.push([
+ (color >> 16) & 0xff,
+ (color >> 8) & 0xff,
+ color & 0xff
+ ]);
+ }
+
+ return out;
+})();
+
+/**
+ * Options
+ */
+
+Terminal.defaults = {
+ colors: Terminal.colors,
+ convertEol: false,
+ termName: 'xterm',
+ geometry: [80, 24],
+ cursorBlink: true,
+ visualBell: false,
+ popOnBell: false,
+ scrollback: 1000,
+ screenKeys: false,
+ debug: false,
+ useStyle: false
+ // programFeatures: false,
+ // focusKeys: false,
+};
+
+Terminal.options = {};
+
+each(keys(Terminal.defaults), function(key) {
+ Terminal[key] = Terminal.defaults[key];
+ Terminal.options[key] = Terminal.defaults[key];
+});
+
+/**
+ * Focused Terminal
+ */
+
+Terminal.focus = null;
+
+Terminal.prototype.focus = function() {
+ if (Terminal.focus === this) return;
+
+ if (Terminal.focus) {
+ Terminal.focus.blur();
+ }
+
+ if (this.sendFocus) this.send('\x1b[I');
+ this.showCursor();
+
+ try {
+ this.element.focus();
+ } catch (e) {
+ ;
+ }
+
+ this.emit('focus');
+
+ Terminal.focus = this;
+};
+
+Terminal.prototype.blur = function() {
+ if (Terminal.focus !== this) return;
+
+ this.cursorState = 0;
+ this.refresh(this.y, this.y);
+ if (this.sendFocus) this.send('\x1b[O');
+
+ // try {
+ // this.element.blur();
+ // } catch (e) {
+ // ;
+ // }
+
+ // this.emit('blur');
+
+ Terminal.focus = null;
+};
+
+/**
+ * Initialize global behavior
+ */
+
+Terminal.prototype.initGlobal = function() {
+ var document = this.document;
+
+ Terminal._boundDocs = Terminal._boundDocs || [];
+ if (~indexOf(Terminal._boundDocs, document)) {
+ return;
+ }
+ Terminal._boundDocs.push(document);
+
+ Terminal.bindPaste(document);
+
+ Terminal.bindKeys(document);
+
+ Terminal.bindCopy(document);
+
+ if (this.isMobile) {
+ this.fixMobile(document);
+ }
+
+ if (this.useStyle) {
+ Terminal.insertStyle(document, this.colors[256], this.colors[257]);
+ }
+};
+
+/**
+ * Bind to paste event
+ */
+
+Terminal.bindPaste = function(document) {
+ // This seems to work well for ctrl-V and middle-click,
+ // even without the contentEditable workaround.
+ var window = document.defaultView;
+ on(window, 'paste', function(ev) {
+ var term = Terminal.focus;
+ if (!term) return;
+ if (ev.clipboardData) {
+ term.send(ev.clipboardData.getData('text/plain'));
+ } else if (term.context.clipboardData) {
+ term.send(term.context.clipboardData.getData('Text'));
+ }
+ // Not necessary. Do it anyway for good measure.
+ term.element.contentEditable = 'inherit';
+ return cancel(ev);
+ });
+};
+
+/**
+ * Global Events for key handling
+ */
+
+Terminal.bindKeys = function(document) {
+ // We should only need to check `target === body` below,
+ // but we can check everything for good measure.
+ on(document, 'keydown', function(ev) {
+ if (!Terminal.focus) return;
+ var target = ev.target || ev.srcElement;
+ if (!target) return;
+ if (target === Terminal.focus.element
+ || target === Terminal.focus.context
+ || target === Terminal.focus.document
+ || target === Terminal.focus.body
+ || target === Terminal._textarea
+ || target === Terminal.focus.parent) {
+ return Terminal.focus.keyDown(ev);
+ }
+ }, true);
+
+ on(document, 'keypress', function(ev) {
+ if (!Terminal.focus) return;
+ var target = ev.target || ev.srcElement;
+ if (!target) return;
+ if (target === Terminal.focus.element
+ || target === Terminal.focus.context
+ || target === Terminal.focus.document
+ || target === Terminal.focus.body
+ || target === Terminal._textarea
+ || target === Terminal.focus.parent) {
+ return Terminal.focus.keyPress(ev);
+ }
+ }, true);
+
+ // If we click somewhere other than a
+ // terminal, unfocus the terminal.
+ on(document, 'mousedown', function(ev) {
+ if (!Terminal.focus) return;
+
+ var el = ev.target || ev.srcElement;
+ if (!el) return;
+
+ do {
+ if (el === Terminal.focus.element) return;
+ } while (el = el.parentNode);
+
+ Terminal.focus.blur();
+ });
+};
+
+/**
+ * Copy Selection w/ Ctrl-C (Select Mode)
+ */
+
+Terminal.bindCopy = function(document) {
+ var window = document.defaultView;
+
+ // if (!('onbeforecopy' in document)) {
+ // // Copies to *only* the clipboard.
+ // on(window, 'copy', function fn(ev) {
+ // var term = Terminal.focus;
+ // if (!term) return;
+ // if (!term._selected) return;
+ // var text = term.grabText(
+ // term._selected.x1, term._selected.x2,
+ // term._selected.y1, term._selected.y2);
+ // term.emit('copy', text);
+ // ev.clipboardData.setData('text/plain', text);
+ // });
+ // return;
+ // }
+
+ // Copies to primary selection *and* clipboard.
+ // NOTE: This may work better on capture phase,
+ // or using the `beforecopy` event.
+ on(window, 'copy', function(ev) {
+ var term = Terminal.focus;
+ if (!term) return;
+ if (!term._selected) return;
+ var textarea = term.getCopyTextarea();
+ var text = term.grabText(
+ term._selected.x1, term._selected.x2,
+ term._selected.y1, term._selected.y2);
+ term.emit('copy', text);
+ textarea.focus();
+ textarea.textContent = text;
+ textarea.value = text;
+ textarea.setSelectionRange(0, text.length);
+ setTimeout(function() {
+ term.element.focus();
+ term.focus();
+ }, 1);
+ });
+};
+
+/**
+ * Fix Mobile
+ */
+
+Terminal.prototype.fixMobile = function(document) {
+ var self = this;
+
+ var textarea = document.createElement('textarea');
+ textarea.style.position = 'absolute';
+ textarea.style.left = '-32000px';
+ textarea.style.top = '-32000px';
+ textarea.style.width = '0px';
+ textarea.style.height = '0px';
+ textarea.style.opacity = '0';
+ textarea.style.backgroundColor = 'transparent';
+ textarea.style.borderStyle = 'none';
+ textarea.style.outlineStyle = 'none';
+ textarea.autocapitalize = 'none';
+ textarea.autocorrect = 'off';
+
+ document.getElementsByTagName('body')[0].appendChild(textarea);
+
+ Terminal._textarea = textarea;
+
+ setTimeout(function() {
+ textarea.focus();
+ }, 1000);
+
+ if (this.isAndroid) {
+ on(textarea, 'change', function() {
+ var value = textarea.textContent || textarea.value;
+ textarea.value = '';
+ textarea.textContent = '';
+ self.send(value + '\r');
+ });
+ }
+};
+
+/**
+ * Insert a default style
+ */
+
+Terminal.insertStyle = function(document, bg, fg) {
+ var style = document.getElementById('term-style');
+ if (style) return;
+
+ var head = document.getElementsByTagName('head')[0];
+ if (!head) return;
+
+ var style = document.createElement('style');
+ style.id = 'term-style';
+
+ // textContent doesn't work well with IE for