fetchSocket not returning response for redis-stream-adapter ioredis #13

Open
nilkanth987 opened this issue May 20, 2024 · 2 comments

nilkanth987 commented May 20, 2024

Bug description

Hello, I am facing an issue where fetchSockets does not return a response in cluster mode. I have tried both the ioredis and redis clients, disabled polling so that sticky sessions are not required, and tried both redis-adapter and redis-streams-adapter, all with the same result.

Steps to reproduce

I was able to reproduce it via a mocha test. Here is my configuration.

node v19.7.0
// package.json
"ioredis": "^5.3.2",
"socket.io": "^4.7.4",
"@socket.io/redis-streams-adapter": "^0.2.2",

socket.js

import { Server } from 'socket.io'
import { createAdapter } from '@socket.io/redis-streams-adapter'
import { getIoRedisClientForSocket } from './redis.connection.js'

// httpServer is the Node HTTP server supplied by the caller (see socket.test.js)
const io = new Server(httpServer, {
  transports: ['websocket'] // polling disabled, so sticky sessions are not needed
})
io.adapter(createAdapter(getIoRedisClientForSocket()))

io.on('connection', (socket) => {
  socket.on('join_a_room', async (params, callback) => {
    try {
      // Count the sockets already in the room across all server instances
      const socketsInRoom = (await io.in(params.roomName).fetchSockets())?.length
      console.log(':::::::::: SOCKETS IN ROOM ::::::: ', socketsInRoom)
      const isRoomEmpty = socketsInRoom === 0
      if (isRoomEmpty) {
        // Do some action here
      }
      socket.join(params.roomName)
      callback(true, { message: 'Joined room', socketsInRoom })
    } catch (error) {
      console.log('>>> ERRR::: ', error)
      callback(false, { message: error.message })
    }
  })
})

redis.connection.js

import { Redis } from 'ioredis'
export const getIoRedisClientForSocket = () => {
  return new Redis(config.redis.url)
}

socket.test.js

describe.only('Multiple instance testing', () => {
    before(() => {
       // The default server is listening on 4001. Create a second server listening on 4002.
      const httpServer = http.Server(app)
      new Socket().listen(httpServer)
      httpServer.listen(4002, () => { console.log('Test server started on 4002') })
    })
    it.only('should get proper response', async () => {
      const userA = await createUser()
      const userB = await createUser({ phone: '9898989888', email: '[email protected]' })
      const userAToken = await getLoginToken(userA)
      const userBToken = await getLoginToken(userB)
      const userAsocket = await connectUserSocket(userAToken)
      const userBsocket = await connectUserSocket(userBToken, 'ws://127.0.0.1:4002')
      await new Promise((resolve) => {
        const eventCallback = (isSuccess, data) => {
          console.log('>>>>> userA joined:: ', data)
          expect(isSuccess).to.equal(true)
          resolve()
        }
        userAsocket.emit('join_a_room', {
          roomName: 'test'
        }, eventCallback)
      })
      await new Promise((resolve) => {
        const eventCallback = (isSuccess, data) => {
          console.log('>>>>> userB joined:: ', data)
          expect(isSuccess).to.equal(true)
          resolve()
        }
        userBsocket.emit('join_a_room', {
          roomName: 'test'
        }, eventCallback)
      })
    })
})

OUTPUT

I added log statements in a few places:

>>>> Stream Adapter >>  {"type":1,"uid":"c42282ebea7991ca","nsp":"/"}
Test server started on 4002
>>>>> Stream Adapter :: OnRawMessage   {"uid":"c42282ebea7991ca","nsp":"/","type":1}
>>>>>> SOC Cluster adapter message ::  {"uid":"c42282ebea7991ca","nsp":"/","type":1} 1716207983034-0
[0d79ac3ff828190f] new event of type 1 from c42282ebea7991ca
>>>> Stream Adapter >>  {"type":2,"uid":"0d79ac3ff828190f","nsp":"/"}
>>>>> Stream Adapter :: OnRawMessage   {"uid":"0d79ac3ff828190f","nsp":"/","type":2}
>>>>>> SOC Cluster adapter message ::  {"uid":"0d79ac3ff828190f","nsp":"/","type":2} 1716207983037-0
[0d79ac3ff828190f] ignore message from self
>>>>> Stream Adapter :: OnRawMessage   {"uid":"0d79ac3ff828190f","nsp":"/","type":2}
>>>>>> SOC Cluster adapter message ::  {"uid":"0d79ac3ff828190f","nsp":"/","type":2} 1716207983037-0
[c42282ebea7991ca] new event of type 2 from 0d79ac3ff828190f
>>>> SOC : broadcast operator rooms Set(1) { '664b416ea5b08951db0a5c5d' }
>>>> SOC : broadcast operator flags {}
>>>>> SOC cluster adapter counts [] 2
>>>> Stream Adapter >>  {"type":7,"data":{"opts":{"rooms":["664b416ea5b08951db0a5c5d"],"except":[],"flags":{}},"requestId":"6a404d7bfcfbcffa"},"uid":"0d79ac3ff828190f","nsp":"/"}
>>>>> Stream Adapter :: OnRawMessage   {"uid":"0d79ac3ff828190f","nsp":"/","type":7,"data":{"opts":{"rooms":["664b416ea5b08951db0a5c5d"],"except":[],"flags":{}},"requestId":"6a404d7bfcfbcffa"}}
>>>>>> SOC Cluster adapter message ::  {"uid":"0d79ac3ff828190f","nsp":"/","type":7,"data":{"opts":{"rooms":["664b416ea5b08951db0a5c5d"],"except":[],"flags":{}},"requestId":"6a404d7bfcfbcffa"}} 1716207983152-0
[c42282ebea7991ca] new event of type 7 from 0d79ac3ff828190f
>>>> SOC onMessage ::  {"uid":"0d79ac3ff828190f","nsp":"/","type":7,"data":{"opts":{"rooms":["664b416ea5b08951db0a5c5d"],"except":[],"flags":{}},"requestId":"6a404d7bfcfbcffa"}}
>>>> SOC :: cluster adapter :: [c42282ebea7991ca] calling fetchSockets with opts {"rooms":["664b416ea5b08951db0a5c5d"],"except":[],"flags":{}}
>>>> Stream Adapter >>  {"type":8,"data":{"requestId":"6a404d7bfcfbcffa","sockets":[]},"uid":"c42282ebea7991ca","nsp":"/"}
>>>>> Stream Adapter :: OnRawMessage   {"uid":"0d79ac3ff828190f","nsp":"/","type":7,"data":{"opts":{"rooms":["664b416ea5b08951db0a5c5d"],"except":[],"flags":{}},"requestId":"6a404d7bfcfbcffa"}}
>>>>>> SOC Cluster adapter message ::  {"uid":"0d79ac3ff828190f","nsp":"/","type":7,"data":{"opts":{"rooms":["664b416ea5b08951db0a5c5d"],"except":[],"flags":{}},"requestId":"6a404d7bfcfbcffa"}} 1716207983152-0
[0d79ac3ff828190f] ignore message from self
>>>>> Stream Adapter :: OnRawMessage   {"uid":"c42282ebea7991ca","nsp":"/","type":8,"data":{"requestId":"6a404d7bfcfbcffa","sockets":[]}}
>>>>>> SOC Cluster adapter message ::  {"uid":"c42282ebea7991ca","nsp":"/","type":8,"data":{"requestId":"6a404d7bfcfbcffa","sockets":[]}} 1716207983343-0
[c42282ebea7991ca] ignore message from self
>>>>> Stream Adapter :: OnRawMessage   {"uid":"c42282ebea7991ca","nsp":"/","type":8,"data":{"requestId":"6a404d7bfcfbcffa","sockets":[]}}
>>>>>> SOC Cluster adapter message ::  {"uid":"c42282ebea7991ca","nsp":"/","type":8,"data":{"requestId":"6a404d7bfcfbcffa","sockets":[]}} 1716207983343-0
[0d79ac3ff828190f] new event of type 8 from c42282ebea7991ca
>>>> SOC onMessage ::  {"uid":"c42282ebea7991ca","nsp":"/","type":8,"data":{"requestId":"6a404d7bfcfbcffa","sockets":[]}}
>>>>> SOC cluster adapter Fetch response received ::  6a404d7bfcfbcffa
>>>> SOC : broadcast operator rooms Set(1) { '664b416fa5b08951db0a5c5f' }
>>>> SOC : broadcast operator flags {}
>>>>> SOC cluster adapter counts [] 2
>>>> Stream Adapter >>  {"type":7,"data":{"opts":{"rooms":["664b416fa5b08951db0a5c5f"],"except":[],"flags":{}},"requestId":"49de3c88dbea8de6"},"uid":"c42282ebea7991ca","nsp":"/"}
>>>>> Stream Adapter :: OnRawMessage   {"uid":"c42282ebea7991ca","nsp":"/","type":7,"data":{"opts":{"rooms":["664b416fa5b08951db0a5c5f"],"except":[],"flags":{}},"requestId":"49de3c88dbea8de6"}}
>>>>>> SOC Cluster adapter message ::  {"uid":"c42282ebea7991ca","nsp":"/","type":7,"data":{"opts":{"rooms":["664b416fa5b08951db0a5c5f"],"except":[],"flags":{}},"requestId":"49de3c88dbea8de6"}} 1716207983546-0
[0d79ac3ff828190f] new event of type 7 from c42282ebea7991ca
>>>> SOC onMessage ::  {"uid":"c42282ebea7991ca","nsp":"/","type":7,"data":{"opts":{"rooms":["664b416fa5b08951db0a5c5f"],"except":[],"flags":{}},"requestId":"49de3c88dbea8de6"}}
>>>> SOC :: cluster adapter :: [0d79ac3ff828190f] calling fetchSockets with opts {"rooms":["664b416fa5b08951db0a5c5f"],"except":[],"flags":{}}
>>>> Stream Adapter >>  {"type":8,"data":{"requestId":"49de3c88dbea8de6","sockets":[]},"uid":"0d79ac3ff828190f","nsp":"/"}
>>>>> Stream Adapter :: OnRawMessage   {"uid":"c42282ebea7991ca","nsp":"/","type":7,"data":{"opts":{"rooms":["664b416fa5b08951db0a5c5f"],"except":[],"flags":{}},"requestId":"49de3c88dbea8de6"}}
>>>>>> SOC Cluster adapter message ::  {"uid":"c42282ebea7991ca","nsp":"/","type":7,"data":{"opts":{"rooms":["664b416fa5b08951db0a5c5f"],"except":[],"flags":{}},"requestId":"49de3c88dbea8de6"}} 1716207983546-0
[c42282ebea7991ca] ignore message from self
>>>>> Stream Adapter :: OnRawMessage   {"uid":"0d79ac3ff828190f","nsp":"/","type":8,"data":{"requestId":"49de3c88dbea8de6","sockets":[]}}
>>>>>> SOC Cluster adapter message ::  {"uid":"0d79ac3ff828190f","nsp":"/","type":8,"data":{"requestId":"49de3c88dbea8de6","sockets":[]}} 1716207983748-0
[c42282ebea7991ca] new event of type 8 from 0d79ac3ff828190f
>>>> SOC onMessage ::  {"uid":"0d79ac3ff828190f","nsp":"/","type":8,"data":{"requestId":"49de3c88dbea8de6","sockets":[]}}
>>>>> SOC cluster adapter Fetch response received ::  49de3c88dbea8de6
>>>>> Stream Adapter :: OnRawMessage   {"uid":"0d79ac3ff828190f","nsp":"/","type":8,"data":{"requestId":"49de3c88dbea8de6","sockets":[]}}
>>>>>> SOC Cluster adapter message ::  {"uid":"0d79ac3ff828190f","nsp":"/","type":8,"data":{"requestId":"49de3c88dbea8de6","sockets":[]}} 1716207983748-0
[0d79ac3ff828190f] ignore message from self
>>>> SOC : broadcast operator rooms Set(1) { 'test' }
>>>> SOC : broadcast operator flags {}
>>>>> SOC cluster adapter counts [] 2
>>>> Stream Adapter >>  {"type":7,"data":{"opts":{"rooms":["test"],"except":[],"flags":{}},"requestId":"de66c29c37ca54b5"},"uid":"0d79ac3ff828190f","nsp":"/"}
>>>>> Stream Adapter :: OnRawMessage   {"uid":"0d79ac3ff828190f","nsp":"/","type":7,"data":{"opts":{"rooms":["test"],"except":[],"flags":{}},"requestId":"de66c29c37ca54b5"}}
>>>>>> SOC Cluster adapter message ::  {"uid":"0d79ac3ff828190f","nsp":"/","type":7,"data":{"opts":{"rooms":["test"],"except":[],"flags":{}},"requestId":"de66c29c37ca54b5"}} 1716207983951-0
[0d79ac3ff828190f] ignore message from self
>>>>> Stream Adapter :: OnRawMessage   {"uid":"0d79ac3ff828190f","nsp":"/","type":7,"data":{"opts":{"rooms":["test"],"except":[],"flags":{}},"requestId":"de66c29c37ca54b5"}}
>>>>>> SOC Cluster adapter message ::  {"uid":"0d79ac3ff828190f","nsp":"/","type":7,"data":{"opts":{"rooms":["test"],"except":[],"flags":{}},"requestId":"de66c29c37ca54b5"}} 1716207983951-0
[c42282ebea7991ca] new event of type 7 from 0d79ac3ff828190f
>>>> SOC onMessage ::  {"uid":"0d79ac3ff828190f","nsp":"/","type":7,"data":{"opts":{"rooms":["test"],"except":[],"flags":{}},"requestId":"de66c29c37ca54b5"}}
>>>> SOC :: cluster adapter :: [c42282ebea7991ca] calling fetchSockets with opts {"rooms":["test"],"except":[],"flags":{}}
>>>> Stream Adapter >>  {"type":8,"data":{"requestId":"de66c29c37ca54b5","sockets":[]},"uid":"c42282ebea7991ca","nsp":"/"}
>>>>> Stream Adapter :: OnRawMessage   {"uid":"c42282ebea7991ca","nsp":"/","type":8,"data":{"requestId":"de66c29c37ca54b5","sockets":[]}}
>>>>>> SOC Cluster adapter message ::  {"uid":"c42282ebea7991ca","nsp":"/","type":8,"data":{"requestId":"de66c29c37ca54b5","sockets":[]}} 1716207984153-0
[c42282ebea7991ca] ignore message from self
>>>>> Stream Adapter :: OnRawMessage   {"uid":"c42282ebea7991ca","nsp":"/","type":8,"data":{"requestId":"de66c29c37ca54b5","sockets":[]}}
>>>>>> SOC Cluster adapter message ::  {"uid":"c42282ebea7991ca","nsp":"/","type":8,"data":{"requestId":"de66c29c37ca54b5","sockets":[]}} 1716207984153-0
[0d79ac3ff828190f] new event of type 8 from c42282ebea7991ca
>>>> SOC onMessage ::  {"uid":"c42282ebea7991ca","nsp":"/","type":8,"data":{"requestId":"de66c29c37ca54b5","sockets":[]}}
>>>>> SOC cluster adapter Fetch response received ::  de66c29c37ca54b5
:::::::::: SOCKETS IN ROOM :::::::  0
>>>>> userA joined::  { message: 'Joined room', socketsInRoom: 0 }
>>>> SOC : broadcast operator rooms Set(1) { 'test' }
>>>> SOC : broadcast operator flags {}
>>>>> SOC cluster adapter counts [] 2
>>>> Stream Adapter >>  {"type":7,"data":{"opts":{"rooms":["test"],"except":[],"flags":{}},"requestId":"3d9e7ef46e884ca0"},"uid":"c42282ebea7991ca","nsp":"/"}
>>>>> Stream Adapter :: OnRawMessage   {"uid":"c42282ebea7991ca","nsp":"/","type":7,"data":{"opts":{"rooms":["test"],"except":[],"flags":{}},"requestId":"3d9e7ef46e884ca0"}}
>>>>>> SOC Cluster adapter message ::  {"uid":"c42282ebea7991ca","nsp":"/","type":7,"data":{"opts":{"rooms":["test"],"except":[],"flags":{}},"requestId":"3d9e7ef46e884ca0"}} 1716207984356-0
[0d79ac3ff828190f] new event of type 7 from c42282ebea7991ca
>>>> SOC onMessage ::  {"uid":"c42282ebea7991ca","nsp":"/","type":7,"data":{"opts":{"rooms":["test"],"except":[],"flags":{}},"requestId":"3d9e7ef46e884ca0"}}
>>>> SOC :: cluster adapter :: [0d79ac3ff828190f] calling fetchSockets with opts {"rooms":["test"],"except":[],"flags":{}}
>>>>> Stream Adapter :: OnRawMessage   {"uid":"c42282ebea7991ca","nsp":"/","type":7,"data":{"opts":{"rooms":["test"],"except":[],"flags":{}},"requestId":"3d9e7ef46e884ca0"}}
>>>>>> SOC Cluster adapter message ::  {"uid":"c42282ebea7991ca","nsp":"/","type":7,"data":{"opts":{"rooms":["test"],"except":[],"flags":{}},"requestId":"3d9e7ef46e884ca0"}} 1716207984356-0
[c42282ebea7991ca] ignore message from self
>>> ERRR:::  Error: timeout reached: missing 1 responses
    at Timeout._onTimeout (/Users/nilkanthparmar/Documents/Projects/Ellu/ellu_backend/node_modules/socket.io-adapter/dist/cluster-adapter.js:611:28)
    at listOnTimeout (node:internal/timers:568:17)
    at processTimers (node:internal/timers:511:7)
>>>> Stream Adapter >>  {"type":2,"uid":"977b7eaed0b6bef1","nsp":"/"}
>>>>> userB joined::  { message: 'timeout reached: missing 1 responses' }
      1) should get proper response
      1) should get proper response
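
In the log above, node 0d79ac3ff828190f receives the last type 7 request (requestId 3d9e7ef46e884ca0) and calls fetchSockets, but no type 8 response is ever published, and the requester then fails with "timeout reached: missing 1 responses". That is consistent with a request/response pattern in which the requester waits for one reply per peer and rejects when a timer fires. The following is only a minimal sketch of that pattern, not the adapter's actual code; `collectResponses` and its parameters are hypothetical names.

```javascript
// Hypothetical sketch: wait for `expected` replies, or reject after `timeoutMs`.
function collectResponses(expected, timeoutMs) {
  const responses = [];
  let onResponse;

  const done = new Promise((resolve, reject) => {
    // If a peer never answers, report how many replies are still missing.
    const timer = setTimeout(() => {
      const missing = expected - responses.length;
      reject(new Error(`timeout reached: missing ${missing} responses`));
    }, timeoutMs);

    // Called once per reply; resolves as soon as every peer has answered.
    onResponse = (response) => {
      responses.push(response);
      if (responses.length === expected) {
        clearTimeout(timer);
        resolve(responses);
      }
    };
  });

  return { done, onResponse };
}
```

With this shape, a peer that crashes while building its reply never calls `onResponse`, so the requester only finds out through the timeout.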
@nilkanth987
Author

This looks related to #11.

@nilkanth987
Author

I have been debugging the library and was able to find the bug.
I had assigned an object containing functions to socket.data, which caused the encoding library to fail and crash. The line below is where the server crashed, which is why it never responded.

https://github.com/socketio/socket.io-redis-streams-adapter/blob/71ed4e4450ce0e59dd99e9d8c60e4e62bc8b15e0/lib/adapter.ts#L189

Proposed Solution

  • Update the docs to state that objects containing functions or circular references cannot be assigned to socket.data. I referred to this SOCKET DOCUMENTATION LINK
  • Wrap the statement in a try/catch and either log the error or handle it.
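
Both proposals can be sketched in a few lines. This is a rough illustration only, not code from the adapter: `findUnserializable` and `safeEncode` are hypothetical helpers, and `encode` stands in for whatever serializer the adapter uses. The check covers only functions and circular references, the two cases mentioned above.

```javascript
// Hypothetical helper: returns a reason string if `value` contains a function
// or a circular reference, or null if neither is found.
function findUnserializable(value, path = 'data', ancestors = new Set()) {
  if (typeof value === 'function') return `${path} is a function`;
  if (value === null || typeof value !== 'object') return null;
  if (ancestors.has(value)) return `${path} is a circular reference`;

  ancestors.add(value);
  for (const [key, child] of Object.entries(value)) {
    const reason = findUnserializable(child, `${path}.${key}`, ancestors);
    if (reason) return reason;
  }
  // Only objects on the current path count as ancestors, so shared
  // (non-circular) references are not reported.
  ancestors.delete(value);
  return null;
}

// Hypothetical wrapper: log and report an encoding failure instead of crashing.
function safeEncode(encode, message) {
  try {
    return { ok: true, payload: encode(message) };
  } catch (err) {
    console.error('failed to encode message:', err.message);
    return { ok: false, error: err };
  }
}
```

An application could call `findUnserializable(value)` before assigning `value` to socket.data and refuse anything that returns a reason, while a wrapper like `safeEncode` would at least surface the error in the logs instead of leaving the requesting node to time out.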
