const MAX_IN_FLIGHT = 5 // underscore means PRIVATE // NO TOUCHY class ChatConnection { constructor(debug) { this._debug = debug || false this._currentlyInFlight = 0 this._waitingBuffer = [] this._inFlightMessages = [] this._authenticated = false this._connected = false this._socket = undefined this._messagesSubject = new Rx.Subject() this._connStatusSubject = new Rx.Subject() this._connStatusSubject.next(ConnectionStatus.DISCONNECTED) this._timeouts = {} } connect(userId, token) { this._userId = userId this._token = token if (this._socket) { this._socket.disconnect() this._socket.close() } this._socket = io() this._socket.on('connect', () => { if (this._debug) { console.log('connected') } this._connStatusSubject.next(ConnectionStatus.CONNECTED) this._connected = true this._triggerBufferFlush() this._authenticate() }) this._socket.on('error', err => { console.error(err) }) this._socket.on('message', msg => { if (msg.type === MsgType.DISCONNECT) { if (msg.payload.reason === 'kicked') { this._connected = false this._connStatusSubject.next(ConnectionStatus.KICKED) } else if (msg.payload.reason === 'new-session-opened') { this._connected = false this._connStatusSubject.next(ConnectionStatus.NEW_SESSION_OPENED) } } else { this._messagesSubject.next(msg) } }) this._socket.on('disconnect', () => { if (this._connected === true) { this._connected = false this._connStatusSubject.next(ConnectionStatus.DISCONNECTED) } this._inFlightMessages = [] this._waitingBuffer = [] if (this._debug) { console.warn('disconnected') } }) this._socket.on('ack', this._handleAck.bind(this)) this._socket.on('err', this._handleErr.bind(this)) } connectionStatus() { return this._connStatusSubject.asObservable() } listen() { return this._messagesSubject.asObservable() } send(envelopedMessage) { if (envelopedMessage.type === MsgType.CHAT_MESSAGE && !envelopedMessage.messageId) { throw new Error('Chat messages have to have externally generated UUID messageId !!!') } if (this._debug) { console.log('received send message request:', envelopedMessage) } if (envelopedMessage.type !== MsgType.CHAT_MESSAGE) { const messageWithId = ChatConnection.addMessageId(envelopedMessage) this._waitingBuffer.push(messageWithId) } else { this._waitingBuffer.push(envelopedMessage) } this._triggerBufferFlush() } _authenticate() { const authMessage = { messageId: ChatConnection.getNextMessageId(), type: MsgType.AUTH, payload: { userId: this._userId, token: this._token } } this._inFlightMessages.push(authMessage) this._currentlyInFlight++ this._socket.emit('auth', authMessage) if (this._debug) { console.log('authenticating', authMessage) } } _triggerBufferFlush() { if (this._debug) { console.log('triggered buffer flush') } if (this._waitingBuffer.length > 0) { if (this._debug) { console.log('there are ' + this._waitingBuffer.length + ' messages to send') } if (this._connected && this._authenticated && this._currentlyInFlight <= MAX_IN_FLIGHT) { const nextMessage = this._waitingBuffer.shift() if (this._debug) { console.log('sending next message', nextMessage) } this._inFlightMessages.push(nextMessage) this._socket.emit('message', nextMessage) this._currentlyInFlight++ if (this._debug) { console.log('currently in flight: ', this._currentlyInFlight) } this._timeouts[nextMessage.messageId] = setTimeout(() => { this._inFlightMessages = this._inFlightMessages.filter(msg => msg.messageId !== nextMessage.messageId) this._messagesSubject.next({error: 'timeout', messageId: nextMessage.messageId}) }, 3000) } } } _handleAck(ack) { const msg = this._inFlightMessages.find(msg => msg.messageId === ack.messageId) if (this._debug) { console.log('got ack, tried to find matching message', ack, msg) } if (msg) { this._removeFromInFlight(msg) if (msg.type === MsgType.AUTH) { this._authenticated = true } if (msg.type === MsgType.CHAT_MESSAGE) { if (this._debug) { console.log('got ack for sent chat message, propagating', msg) } this._messagesSubject.next({ type: MsgType.ACK, messageId: msg.messageId, roomId: msg.payload.roomId }) } if (this._debug) { console.log('currently in flight: ', this._currentlyInFlight) } this._triggerBufferFlush() } else { console.error('Received ack for message that is not in flight!', ack, this._inFlightMessages) } } _handleErr(err) { const msg = this._inFlightMessages.find(msg => msg.messageId === err.messageId) if (this._debug) { console.log('got err, tired to find matchin message', err, msg) } if (msg) { this._removeFromInFlight(msg) this._messagesSubject.next(err) if (this._debug) { console.log('currently in flight: ', this._currentlyInFlight) } this._triggerBufferFlush() } else { console.error('Received err for message that is not in flight!', err, this._inFlightMessages) } } _removeFromInFlight(msg) { if (this._timeouts.hasOwnProperty(msg.messageId)) { clearTimeout(this._timeouts[msg.messageId]) delete this._timeouts[msg.messageId] } this._inFlightMessages = this._inFlightMessages.filter(inFlightMsg => inFlightMsg !== msg) this._currentlyInFlight-- } static addMessageId(envelopedMessage) { return Object.assign({}, envelopedMessage, { messageId: ChatConnection.getNextMessageId() }) } static getNextMessageId() { return uuid.v4() } }