-
Notifications
You must be signed in to change notification settings - Fork 8
/
client.js
119 lines (103 loc) · 4.3 KB
/
client.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
var net = require('net'),
events = require('events'),
util = require('util'),
os = require('os');
var AMF = require('./amf');
var RTMPHandshake = require('./handshake');
var RTMPMessage = require('./message');
var log = require('./log');
/**
 * RTMP client bound to a socket-like object. Exported as the module itself.
 *
 * The client starts in the 'connecting' state and begins the RTMP handshake
 * (via onSocketConnect) as soon as the socket emits 'connect'.
 *
 * @constructor
 * @param {Object} socket - object exposing `on(event, listener)`; RTMPClient.connect
 *     passes the result of `net.connect`.
 */
var RTMPClient = module.exports = function(socket) {
    // Run the EventEmitter constructor so per-instance emitter state is
    // initialised up front instead of relying on lazy initialisation.
    events.EventEmitter.call(this);

    this.socket = socket;
    this.state = 'connecting';
    socket.on('connect', this.onSocketConnect.bind(this));
};
util.inherits(RTMPClient, events.EventEmitter);
/**
 * Socket 'connect' handler: creates the RTMP handshake for this client and
 * sends the client side of it.
 *
 * Handshake errors are only logged. Once the handshake reports 'complete',
 * incoming socket data is routed to onData and the client emits 'connect'.
 */
RTMPClient.prototype.onSocketConnect = function() {
    var self = this;
    var handshake = new RTMPHandshake(self);
    self.handshake = handshake;

    handshake.on('error', function(err) {
        log.warn('handshake error:', err);
    });

    handshake.on('complete', function() {
        log('handshake complete');
        // Only start consuming socket data as RTMP after the handshake is done.
        self.socket.on('data', self.onData.bind(self));
        self.emit('connect');
    });

    handshake.sendClientHandshake();
};
/**
 * Socket 'data' handler: logs the chunk and feeds it to the RTMP message
 * currently being assembled.
 *
 * A fresh RTMPMessage is started when there is no current message or the
 * current one reports no bytes remaining; its 'complete' event is routed to
 * onMessage.
 *
 * @param {Buffer} data - chunk received from the socket (only `length` is
 *     read here; the rest is handled by RTMPMessage and the logger).
 */
RTMPClient.prototype.onData = function(data) {
    var sizeLabel = "(" + data.length + " bytes)";
    log("recieved RTMP data...", sizeLabel);
    log.logHex(data);

    var current = this.message;
    var needsNewMessage = !current || current.bytesRemaining == 0;
    if (needsNewMessage) {
        current = this.message = new RTMPMessage(data);
        current.on('complete', this.onMessage.bind(this));
    }

    current.parseData(data);
};
/**
 * Called when the current RTMP message is complete.
 *
 * Always emits 'message' with the message. For invoke messages it also
 * reacts to two command names:
 *   - "_error": emits 'error' with the command's arguments;
 *   - "close":  ends the socket, then emits 'close'.
 */
RTMPClient.prototype.onMessage = function() {
    this.emit("message", this.message);

    // Read the message only after the emit above, as listeners run first.
    var message = this.message;
    if (message.messageHeader.messageType != RTMPMessage.RTMP_MESSAGE_TYPE_INVOKE) {
        return;
    }

    var commandName = message.data.commandName;
    if (commandName === "_error") {
        this.emit("error", message.data.arguments);
    } else if (commandName === "close") {
        this.socket.end();
        this.emit("close");
    }
};
/**
 * Serialises an RTMP invoke command as consecutive AMF values and sends it
 * through sendPacket on channel 0x03 with message type
 * RTMPMessage.RTMP_MESSAGE_TYPE_INVOKE.
 *
 * The payload layout is: commandName, transactionId, commandObj and, only
 * when `invokeArguments` is not `undefined`, invokeArguments.
 *
 * @param {*} commandName - value serialised first (presumably the command name string).
 * @param {*} transactionId - value serialised second.
 * @param {*} commandObj - value serialised third.
 * @param {*} [invokeArguments] - optional value serialised last; omitted
 *     entirely when `undefined` (a `null` is still serialised).
 */
RTMPClient.prototype.sendInvoke = function(commandName, transactionId, commandObj, invokeArguments) {
    // TODO: create RTMPInvoke class to parse and/or handle this (that inherits from a general RTMPPacket class)
    var serialisers = [
        new AMF.AMFSerialiser(commandName),
        new AMF.AMFSerialiser(transactionId),
        new AMF.AMFSerialiser(commandObj)
    ];
    if (invokeArguments !== undefined) {
        // Serialise the arguments themselves, not a second copy of commandObj.
        serialisers.push(new AMF.AMFSerialiser(invokeArguments));
    }

    var amfLength = serialisers.reduce(function(total, serialiser) {
        return total + serialiser.byteLength;
    }, 0);

    // Zero-filled allocation: `new Buffer(size)` is deprecated and uninitialised.
    var amfData = Buffer.alloc(amfLength);
    var amfOffset = 0;
    serialisers.forEach(function(serialiser) {
        // Each serialiser writes into its own window of the shared buffer.
        var end = amfOffset + serialiser.byteLength;
        serialiser.write(amfData.slice(amfOffset, end));
        amfOffset = end;
    });

    this.sendPacket(0x03, RTMPMessage.RTMP_MESSAGE_TYPE_INVOKE, amfData);
};
/**
 * Wraps `data` in an RTMP message and writes it to the socket.
 *
 * If the handshake has not been created yet, or is not in
 * RTMPHandshake.STATE_HANDSHAKE_DONE, the send is deferred until this client
 * next emits 'connect' and then retried with the same arguments.
 *
 * @param {*} channel - passed through to RTMPMessage#sendData (sendInvoke uses 0x03).
 * @param {*} messageType - passed through to RTMPMessage#sendData.
 * @param {*} data - payload passed through to RTMPMessage#sendData.
 */
RTMPClient.prototype.sendPacket = function(channel, messageType, data) {
    //TODO: Check if given a RTMPPacket object that specifies channel, messageType, data inside the object (e.g. RTMPInvoke)

    // If we aren't handshaken, then defer sending until we have
    if (!this.handshake || this.handshake.state != RTMPHandshake.STATE_HANDSHAKE_DONE) {
        var self = this;
        // `once`, not `on`: the deferred send must fire a single time and then
        // unregister, otherwise every later 'connect' would re-send the packet.
        this.once('connect', function() {
            self.sendPacket(channel, messageType, data);
        });
        return;
    }

    var message = new RTMPMessage();
    var rawData = message.sendData(channel, messageType, data);
    log("sending RTMP packet...", "(" + rawData.length + " bytes)");
    this.socket.write(rawData);
    log.logHex(rawData);
};
/**
 * Writes an already-encoded packet straight to the socket, logging its size
 * and a hex dump first. No RTMP framing or handshake check is applied here.
 *
 * @param {Buffer} packet - bytes to write (only `length` is read here).
 */
RTMPClient.prototype.sendRawData = function(packet) {
    var sizeLabel = "(" + packet.length + " bytes)";
    log("sending raw data...", sizeLabel);
    log.logHex(packet);

    var socket = this.socket;
    socket.write(packet);
};
/**
 * Opens a TCP connection with `net.connect` and wraps it in an RTMPClient.
 *
 * May be called as `connect(host, port, listener)` or `connect(host, listener)`;
 * a missing or falsy port falls back to 1935.
 *
 * @param {string} host - host passed to `net.connect`.
 * @param {number|Function} [port] - port, or the connect listener when the
 *     third argument is omitted.
 * @param {Function} [connectListener] - registered for the client's 'connect'
 *     event; non-function values are ignored.
 * @returns {RTMPClient} the new client.
 */
RTMPClient.connect = function(host, port, connectListener) {
    const DEFAULT_PORT = 1935;

    var listener = connectListener;
    var targetPort = port;
    // Two-argument form: the second argument is actually the listener.
    if (!listener && typeof targetPort == "function") {
        listener = targetPort;
        targetPort = DEFAULT_PORT;
    }

    var socket = net.connect(targetPort || DEFAULT_PORT, host);
    var client = new RTMPClient(socket);

    if (typeof listener == "function") {
        client.on('connect', listener);
    }
    return client;
};