Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SYSTEST-9250 1-1 WS Mapping for Proxy Mode #115

Merged
merged 14 commits into from
Mar 15, 2023
9 changes: 2 additions & 7 deletions server/src/messageHandler.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -161,12 +161,7 @@ async function handleMessage(message, userId, ws) {
//bypass JSON-RPC calls and hit proxy server endpoint
//init websocket connection for proxy request to be sent and use receiver client to send events back to caller.
try {
if(await proxyManagement.initialize(proxyManagement.actOnResponseObject, ws)) {
proxyManagement.sendRequest(JSON.stringify(oMsg))
response = await proxyManagement.getResponseMessageFromProxy(oMsg.id)
} else {
console.log("Websocket connection not initialized")
}
response = await proxyManagement.initializeAndSendRequest(ws, JSON.stringify(oMsg))
} catch (err) {
logger.error(`ERROR: Unable to establish proxy connection due to ${err}`)
process.exit(1)
Expand Down Expand Up @@ -285,7 +280,7 @@ async function handleMessage(message, userId, ws) {

logger.debug(`Sending response for method ${oMsg.method}`);
let finalResponse = (newResponse ? newResponse : response);
if (!process.env.proxy) {
if (!process.env.proxy || stateManagement.hasOverride(userId, oMsg.method)) {
ksentak marked this conversation as resolved.
Show resolved Hide resolved
const oResponseMessage = {
jsonrpc: '2.0',
id: oMsg.id,
Expand Down
169 changes: 92 additions & 77 deletions server/src/proxyManagement.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -21,97 +21,114 @@
import { parse } from 'url';
import WebSocket from 'ws';

let ws = null
let responseObject = {}
const wsMap = new Map();
const wsMsgMap = new Map();

async function initialize(callback, returnWS) {
if(ws) {
return true
async function initializeAndSendRequest(returnWs, command) {
ksentak marked this conversation as resolved.
Show resolved Hide resolved
let outgoingWs = wsMap.get(returnWs);
/* Checks to see if ws connection is in map.
* If connection exists and is active, it will be used.
* Else, a new connection will be created and mapped to returnWs.
*/

// Set up listeners once and only once
if (!outgoingWs) {
outgoingWs = await setupOutgoingWs(returnWs);

outgoingWs.on('message', (data) => {
const buf = Buffer.from(data, 'utf8');
const response = JSON.parse(buf.toString());

if (response.id === undefined) {
// In case of event, send the event to caller directly.
returnWs.send(buf.toString());
} else {
wsMsgMap.set(returnWs, buf.toString());
}
});
}
const url = buildWSUrl()
ws = new WebSocket(url)

outgoingWs.send(command);

return new Promise(async (res, rej) => {
try {
const response = await getResponseMessageFromProxy(returnWs);
// Clear for next message
wsMsgMap.set(returnWs, null);
res(response);
} catch (err) {
rej('Timeout occurred');
kschrief marked this conversation as resolved.
Show resolved Hide resolved
}
});
}

function setupOutgoingWs(returnWs) {
const url = buildWSUrl();
const ws = new WebSocket(url);
try {
return new Promise((res, rej) => {
ws.on('open', function open() {
console.log("Connection to websocket proxy server established")
res(true)
console.log('connection established');
kschrief marked this conversation as resolved.
Show resolved Hide resolved
// add ws connection to map
wsMap.set(returnWs, ws);
res(ws);
});

ws.on('close', function close() {
console.log('disconnected');
});

ws.on('message', function message(data) {
const buf = Buffer.from(data, 'utf8');
//send response to callback
callback(buf.toString(), returnWS)
// remove closed connection from map
wsMap.delete(returnWs);
});

ws.on('error', function message(err) {
rej(err)
rej(err);
});
})
});
} catch (err) {
return err
return err;
}
}

/* Consume response from server. In case of event, send the event to caller directly.
In case of response for requested method, store it in responseObject array
*/
function actOnResponseObject(data, returnWS) {
const response = JSON.parse(data)
if(response.id === undefined) {
returnWS.send(data)
}
responseObject[response.id] = data
}

function sendRequest(command) {
if(ws) {
ws.send(command)
console.log("Request sent to proxy server: ", command);
} else {
console.log("WS Client not initialized. Unable to send request to proxy server: ", command);
}
}

//Poll for proxy response. Fetch response using requestId. If timedout, terminate the connection
function getResponseMessageFromProxy(id) {
let timeout = 2000
let counter = 0
let interval = 100
return new Promise((resolve, reject) => {
var timer = setInterval(function() {
if(counter >= timeout) {
console.log("response not received for given id: " + id)
reject(false)
function getResponseMessageFromProxy(returnWs) {
let timeout = 10000;
let counter = 0;
let interval = 100;
return new Promise((res, rej) => {
var timer = setInterval(function () {
if (counter >= timeout) {
console.log('response not received for given returnWs');
rej(false);
}
if(responseObject[id]) {
counter = timeout + interval
//clear interval if response received for given id.
clearInterval(timer)
resolve(responseObject[id])

const returnMsg = wsMsgMap.get(returnWs);
if (returnMsg) {
counter = timeout + interval;
kschrief marked this conversation as resolved.
Show resolved Hide resolved
//clear interval if response received for given returnWs.
clearInterval(timer);
res(returnMsg);
}
counter = counter + interval
counter = counter + interval;
}, interval);
})

});
}

function buildWSUrl() {
let proxyUrl = process.env.proxyServerIP
if( ! proxyUrl ) {
throw Error('ERROR: Proxy Url not found in env')
} else if ( ! proxyUrl.includes(":") ) {
proxyUrl = proxyUrl + ":" + 9998
let proxyUrl = process.env.proxyServerIP;
if (!proxyUrl) {
throw Error('ERROR: Proxy Url not found in env');
} else if (!proxyUrl.includes(':')) {
proxyUrl = proxyUrl + ':' + 9998;
kschrief marked this conversation as resolved.
Show resolved Hide resolved
}
//support ws
const wsUrlProtocol = 'ws://'
const path = '/jsonrpc'
const hostPort = proxyUrl
return [wsUrlProtocol, hostPort, path, process.env.MF_TOKEN ? '?token=' + process.env.MF_TOKEN : null,
].join('')
const wsUrlProtocol = 'ws://';
const path = '/jsonrpc';
const hostPort = proxyUrl;
return [
wsUrlProtocol,
hostPort,
path,
process.env.MF_TOKEN ? '?token=' + process.env.MF_TOKEN : null,
].join('');
}

// Get token from request param or env variable
Expand All @@ -121,22 +138,20 @@ function getMFToken(request) {
error: '',
};
// If token already exists, return token
if(process.env.MF_TOKEN) {
output.token = process.env.MF_TOKEN
return output
if (process.env.MF_TOKEN) {
output.token = process.env.MF_TOKEN;
return output;
}
const { query } = parse(request.url);
if(query && query.includes("token=") && query.length > 6) {
if (query && query.includes('token=') && query.length > 6) {
const token = query.split('token=').pop().split('&')[0];
output.token = token
return output
output.token = token;
return output;
} else {
output.error = "Unable to get token from connection param or not present in env"
return output
output.error = 'Unable to get token from connection param or not present in env';
return output;
}
}

// --- Exports ---
export {
getMFToken, initialize, actOnResponseObject, getResponseMessageFromProxy, sendRequest
};
export { getMFToken, initializeAndSendRequest };
18 changes: 3 additions & 15 deletions server/test/suite/proxyManagement.test.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -29,31 +29,19 @@ describe('sequentially run tests', () => {
expect(token.error).toBe("Unable to get token from connection param or not present in env");
});

test(`proxyManagement.actOnResponseObject works properly with mock`, async () => {
const data = {"jsonrpc":"2.0","id":1,"result":{"type":"device","value":"<XACT Token>"}}
proxyManagement.actOnResponseObject(JSON.stringify(data), null)
const res = await proxyManagement.getResponseMessageFromProxy(data.id)
expect(res).toBe(JSON.stringify(data))
})

test(`Handle error when url not passed`, async () => {
delete process.env.proxyServerIP
proxyManagement.initialize(null, null).catch(function (err) {
proxyManagement.initializeAndSendRequest(null, null).catch(function (err) {
// Only executed if rejects the promise
expect(err.toString()).toContain('Error: ERROR: Proxy Url not found in env')
});

})

test(`proxyManagement.sendRequest works properly`, async () => {
proxyManagement.sendRequest(null)
})

test(`proxyManagement.initialize works properly`, async () => {
test(`proxyManagement.initializeAndSendRequest works properly`, async () => {
try {
process.env.proxyServerIP = "localhost.test"
const response = await proxyManagement.initialize(proxyManagement.actOnResponseObject, null)
console.log("response: ", response)
await proxyManagement.initializeAndSendRequest(null, null)
} catch (e) {
expect(e.errno).toBe(-3008);
expect(e.code).toBe("ENOTFOUND");
Expand Down