'use strict'
var EventEmitter = require('events').EventEmitter,
FrameEncoder
/**
* Fired when the connection is ready
* @event Peer#connect
*/
/**
* @event Peer#error
* @type {Error}
*/
/**
* Emited when the connection is closed and no more activity will happen
* @event Peer#close
*/
/**
* @callback Peer~AuthHandler
* @this {Peer}
* @param {string} user
* @param {string} password
* @param {function(?Error)} done
*/
/**
* @typedef {Object} Peer~Auth
* @property {string} user
* @property {boolean} required
* @property {Peer~AuthHandler} handler
* @property {string} remoteUser
*/
/**
* @typedef {Object} Peer~PendingCall
* @property {Function} callback
* @property {Call} call
* @property {?Timer} timer - timeout timer
*/
/**
* Represent one of the sides in the protocol.
* This class exposes most of the interface the final user will interact with
* NOTE: This class should not instantiated directly. See {@link Context#_createPeer}
* @class
* @extends EventEmitter
* @param {Object} calls
* @param {Object} messages
* @param {Object} auth
* @param {string} [auth.user='']
* @param {string} [auth.password='']
* @param {boolean} [auth.required=false] - if true, will require remote authentication
* @param {Peer~AuthHandler} [auth.handler] - the default handler accepts any user/password
* @param {Connection} connection
*/
function Peer(calls, messages, auth, connection) {
var that = this
EventEmitter.call(this)
/**
* @member {Peer~Auth}
*/
this.auth = {
user: auth.user || '',
required: Boolean(auth.required),
handler: auth.handler || function (user, password, done) {
done()
},
remoteUser: ''
}
/**
* Whether the connection has been closed and nothing more can be done.
* Note that 'error' events may still be raised after 'close'
* @member {boolean}
* @readonly
*/
this.closed = false
/**
* Whether the handshake has been completed
* @member {boolean} Peer#handshakeDone
* @readonly
*/
Object.defineProperty(this, 'handshakeDone', {
enumerable: true,
get: function () {
return this._localHandshake === Peer._HANDSHAKE_STATE.DONE &&
this._remoteHandshake === Peer._HANDSHAKE_STATE.DONE
}
})
/**
* @member {Object}
* @property {Object} local
* @property {Object<Call>} local.map
* @property {Array<Call>} local.list
* @property {Object<boolean>} local.available - a map of available calls (true=compatible)
* @property {Object} remote
* @property {Object<Call>} remote.map
* @property {Array<Call>} remote.list
* @private
*/
this._calls = {
local: {
map: calls.local.map,
list: calls.local.list,
available: Object.create(null)
},
remote: {
map: calls.remote.map,
list: calls.remote.list
}
}
/**
* @member {Object}
* @property {Object} local
* @property {Object<Message>} local.map
* @property {Array<Message>} local.list
* @property {Object<boolean>} local.available - a map of available messages (true=compatible)
* @property {Object} remote
* @property {Object<Message>} remote.map
* @property {Array<Message>} remote.list
* @private
*/
this._messages = {
local: {
map: messages.local.map,
list: messages.local.list,
available: Object.create(null)
},
remote: {
map: messages.remote.map,
list: messages.remote.list
}
}
/**
* @member {Connection}
* @private
*/
this._connection = connection
/**
* @member {FrameEncoder}
* @private
*/
this._encoder = new FrameEncoder(this, connection)
/**
* @member {Array<Peer~PendingCall>}
* @private
*/
this._pendingCalls = []
/**
* @member {Peer.HANDSHAKE_STATE}
* @private
*/
this._localHandshake = Peer._HANDSHAKE_STATE.STARTING
/**
* @member {Peer.HANDSHAKE_STATE}
* @private
*/
this._remoteHandshake = Peer._HANDSHAKE_STATE.STARTING
// Set up listeners
connection.once('close', this.close.bind(this))
connection.on('frame', this._processFrame.bind(this))
connection.on('error', this._error.bind(this))
// Start the handshake process (or wait for connect)
if (connection.hasConnected) {
start()
} else {
connection.once('connect', start)
}
function start() {
var password = auth.password || ''
that._encoder.doHandshake(that.auth.user, password, calls.remote.list, messages.remote.list)
that._localHandshake = Peer._HANDSHAKE_STATE.WAITING
}
}
require('util').inherits(Peer, EventEmitter)
module.exports = Peer
FrameEncoder = require('./FrameEncoder')
/**
* Protocol error codes
* Negative codes are always generated locally. Codes 1xxx are reserved for protocol use
* @enum {number}
*/
Peer.ERROR = {
/**
* The call has timed out.
* If the answer is received after the timeout, it will be ignored
*/
TIMEOUT: -1,
/**
* The connection was closed before this call could be answer.
*/
CLOSED: -2,
/**
* The remote has not attached a handler to this call type
*/
NOT_IMPLEMENTED: 1000,
/**
* The call input data was badly formatted.
* You should never see this error, since the local code validates the data before sending it
*/
INVALID_DATA: 1001,
/**
* The remote misbehaved and could not answer the call correctly
* It's caused by invalid answer format
*/
INTERNAL: 1002
}
/**
* @enum {number}
* @private
*/
Peer._HANDSHAKE_STATE = {
STARTING: 0,
WAITING: 1,
DONE: 2
}
/**
* Check if the remote can answer to a call
* @param {string} name
*/
Peer.prototype.canCall = function (name) {
return Boolean(this._calls.local.available[name])
}
/**
* Make a call to the other side (remote)
* The callback will be executed asynchronously and only once
* Making an unsupported call is considered error
* If the remote answers the call after the timeout has fired, the response will be ignored
* If no response is expected, you should send messages, not make calls!
* All errors related to this call will be routed to the callback (including invalid input format)
* One can check if the error was raised locally or by the remote (as a response) by checking the `isLocal` flag in the Error object
* @param {string} name - call name
* @param {*} [data=null] - must follow the call input format
* @param {number} [timeout=10e3] - call timeout in ms (0 means no timeout)
* @param {function(?Error,*)} callback - required
*/
Peer.prototype.call = function (name, data, timeout, callback) {
var sequenceId, timer
if (typeof data === 'function') {
callback = data
data = null
timeout = 10e3
} else if (typeof timeout === 'function') {
callback = timeout
timeout = 10e3
} else if (typeof callback !== 'function') {
throw new TypeError('A callback must be supplied. ' +
'If no response is expected, you should send messages, not make calls')
}
var localCall = this._calls.local.map[name]
if (this.closed) {
return asyncError(this._createLocalError('Connection is closed', Peer.ERROR.CLOSED))
} else if (!this.handshakeDone) {
return asyncError(this._createLocalError('Connection is not ready'))
} else if (!localCall) {
return asyncError(this._createLocalError('Local call ' + name + ' not found'))
} else if (!this.canCall(name)) {
return asyncError(this._createLocalError('The remote does not give support for call ' + name))
}
sequenceId = this._pendingCalls.length
// Encode the data
try {
this._encoder.doCall(sequenceId, localCall.id, data, localCall.input)
} catch (e) {
e.isLocal = true
return asyncError(e)
}
// Set timeout
if (timeout) {
timer = setTimeout(this._timeoutCall.bind(this, sequenceId), timeout)
}
// Save call info
this._pendingCalls.push({
callback: callback,
call: localCall,
timer: timer
})
/**
* Mixing async and sync is a bad idea, so we make sure the callback is called async-ly
* @param {Error} err
*/
function asyncError(err) {
process.nextTick(function () {
callback(err)
})
}
}
/**
* Check if the remote accepts a message
* @param {string} name
*/
Peer.prototype.canSend = function (name) {
return Boolean(this._messages.local.available[name])
}
/**
* Send a message to the other side (remote)
* This is a fire-and-forget operation, any error will not be informed
* If it is important to know whether the remote has received the message, consider using calls
* By default (strict=false), no feedback is given
* @param {string} name - message name
* @param {*} [data] - must follow the message format
* @param {boolean} [strict=false] - whether to throw error if the operation is invalid
* @throws if strict and some error occurs locally
*/
Peer.prototype.send = function (name, data, strict) {
var localMessage = this._messages.local.map[name],
that = this
if (strict) {
// Let errors be thrown
return _send()
}
try {
_send()
} catch (e) {
// Ignore errors
}
function _send() {
if (that.closed) {
throw new Error('Connection is closed')
} else if (!that.handshakeDone) {
throw new Error('Connection is not ready')
} else if (!localMessage) {
throw new Error('Local message ' + name + ' not found. Have you added it?')
} else if (!that.canSend(name)) {
throw new Error('The remote does not give support for message ' + name)
}
// Encode and send (let errors be thrown)
that._encoder.doSend(localMessage.id, data, localMessage.input)
}
}
/**
* Close the connection and drop all pending calls
*/
Peer.prototype.close = function () {
if (this.closed) {
return
}
this.closed = true
this._connection.close()
// Send closed error to all pending calls
this._pendingCalls.forEach(function (pendingCall) {
var err = this._createLocalError('Connection has closed', Peer.ERROR.CLOSED)
process.nextTick(pendingCall.callback.bind(this, err))
pendingCall.timer && clearTimeout(pendingCall.timer)
}, this)
this._pendingCalls = []
// Clean up events
this._connection.removeAllListeners('close').removeAllListeners('frame').removeAllListeners('connect')
this.emit('close')
this.removeAllListeners('close')
}
/**
* Process the timeout of a call
* @param {number} sid - sequential id
* @private
*/
Peer.prototype._timeoutCall = function (sid) {
var pendingCall = this._pendingCalls[sid],
err = this._createLocalError('Timed out', Peer.ERROR.TIMEOUT)
if (pendingCall) {
delete this._pendingCalls[sid]
process.nextTick(pendingCall.callback.bind(this, err))
}
}
/**
* Process each incoming frame (called by the Connection)
* @param {Buffer} frame
* @private
*/
Peer.prototype._processFrame = function (frame) {
if (this.closed) {
return
}
try {
if (this.handshakeDone) {
this._encoder.processFrame(frame)
} else if (this._remoteHandshake === Peer._HANDSHAKE_STATE.STARTING) {
this._processHandshake(this._encoder.readHandshake(frame))
} else if (this._localHandshake === Peer._HANDSHAKE_STATE.WAITING) {
this._processHandshakeAnswer(this._encoder.readHandshakeAnswer(frame))
} else {
throw new Error('Unexpected frame before handshake is completed')
}
} catch (e) {
this._error(e)
}
}
/**
* Emit an error event and close the connection
* @param {Error} error
* @private
*/
Peer.prototype._error = function (error) {
this.close()
this.emit('error', error)
}
/**
* @param {string} message
* @param {number} [code] - protocol errors are negative
* @return {Error}
* @private
*/
Peer.prototype._createLocalError = function (message, code) {
var err = new Error(message)
err.isLocal = true
if (code) {
err.code = code
}
return err
}
/**
* Process incoming handshake data
* @param {Object} data
* @param {Object} data.auth
* @param {string} data.auth.user
* @param {string} data.auth.password
* @param {Array} data.calls
* @param {number} data.calls.id
* @param {Buffer} data.calls.hash
* @param {Array} data.messages
* @param {number} data.messages.id
* @param {Buffer} data.messages.hash
* @private
*/
Peer.prototype._processHandshake = function (data) {
var that = this
this._remoteHandshake = Peer._HANDSHAKE_STATE.WAITING
// Check if remote has authenticated when we want it to
if (this.auth.required && !(data.auth.user || data.auth.password)) {
this._encoder.doAnswerHandshake('Authentication is required')
this._remoteHandshake = Peer._HANDSHAKE_STATE.DONE
return this._error(new Error('Remote authentication was expected'))
}
this.auth.remoteUser = data.auth.user
// Save available calls and messages
var localCalls = this._calls.local,
localMessages = this._messages.local
data.calls.forEach(function (call) {
var localCall = localCalls.map[call.id]
if (localCall) {
localCalls.available[localCall.name] = checkHash(call.hash, localCall.hash)
}
})
data.messages.forEach(function (message) {
var localMessage = localMessages.map[message.id]
if (localMessage) {
localMessages.available[localMessage.name] = checkHash(message.hash, localMessage.hash)
}
}, this)
// Execute handshake handler
process.nextTick(this.auth.handler.bind(this, data.auth.user, data.auth.password, function (err) {
that._remoteHandshake = Peer._HANDSHAKE_STATE.DONE
if (that.closed) {
// Nothing to do here
return
}
that._encoder.doAnswerHandshake(err)
if (err) {
that._error(err)
} else if (that.handshakeDone) {
that.emit('connect')
that.removeAllListeners('connect')
}
}))
}
/**
* Check if two hashes are equal
* @param {Buffer} a
* @param {Buffer} b
* @return {boolean}
* @priavate
*/
function checkHash(a, b) {
var i, len
if (a.length !== b.length) {
return false
}
for (i = 0, len = a.length; i < len; i++) {
if (a[i] !== b[i]) {
return false
}
}
return true
}
/**
* Process the handshake answer
* @param {Object} data
* @param {boolean} data.ok
* @private
*/
Peer.prototype._processHandshakeAnswer = function (data) {
this._localHandshake = Peer._HANDSHAKE_STATE.DONE
if (data.error !== undefined) {
this._error(new Error('Handshake failed: ' + data.error))
} else if (this.handshakeDone) {
this.emit('connect')
this.removeAllListeners('connect')
}
}
/**
* Process incoming call (called by FrameEncoder)
* @param {number} sid
* @param {number} id
* @param {ReadState} data
* @private
*/
Peer.prototype._processCall = function (sid, id, data) {
var remoteCall = this._calls.remote.map[id],
inputData = null,
done
if (!remoteCall || !remoteCall.handler) {
return this._encoder.doAnswerError(sid, 'Not implemented', Peer.ERROR.NOT_IMPLEMENTED)
}
if (remoteCall.input) {
try {
inputData = remoteCall.input.read(data)
} catch (e) {
// This should not happen normally, because signatures are checked on handshake
// and data format is checked by remote peer
return this._encoder.doAnswerError(sid, 'Invalid input data', Peer.ERROR.INVALID_DATA)
}
}
done = this._encoder.doAnswer.bind(this._encoder, sid, remoteCall)
process.nextTick(remoteCall.handler.bind(this, inputData, done))
}
/**
* Process incoming message (called by FrameEncoder)
* @param {number} id
* @param {ReadState} data
* @private
*/
Peer.prototype._processMessage = function (id, data) {
var remoteMessage = this._messages.remote.map[id],
inputData = null
if (!remoteMessage || !remoteMessage.handler) {
// Not implemented
return
}
if (remoteMessage.input) {
try {
inputData = remoteMessage.input.read(data)
} catch (e) {
// Encoding errors are ignored for messages
return
}
}
process.nextTick(remoteMessage.handler.bind(this, inputData))
}
/**
* Process incoming call success response (called by FrameEncoder)
* @param {number} sid
* @param {ReadState} data
* @private
*/
Peer.prototype._processResponse = function (sid, data) {
var pendingCall = this._pendingCalls[sid],
outputData = null
if (!pendingCall) {
// Not found (may have already been answered or timed out)
return
}
// Remove it from the list of pending calls
delete this._pendingCalls[sid]
pendingCall.timer && clearTimeout(pendingCall.timer)
if (pendingCall.call.output) {
try {
outputData = pendingCall.call.output.read(data)
} catch (e) {
e.isLocal = true
return process.nextTick(pendingCall.callback.bind(this, e))
}
}
process.nextTick(pendingCall.callback.bind(this, null, outputData))
}
/**
* Process incoming call error response (called by FrameEncoder)
* @param {number} sid
* @param {string} reason
* @param {number} code
* @private
*/
Peer.prototype._processError = function (sid, reason, code) {
var pendingCall = this._pendingCalls[sid],
err
if (!pendingCall) {
// Not found (may have already been answered or timed out)
return
}
// Remove it from the list of pending calls
delete this._pendingCalls[sid]
pendingCall.timer && clearTimeout(pendingCall.timer)
// Call with error
err = new Error(reason)
err.isLocal = false
if (code) {
err.code = code
}
process.nextTick(pendingCall.callback.bind(this, err))
}