如何统计已连接用户数量
¥How to count the number of connected users
计算已连接用户比 统计客户端数量 稍微复杂一些,因为单个用户可以跨多个选项卡、浏览器甚至设备连接。
¥Counting connected users is a bit more complex than counting clients, because a single user can be connected across multiple tabs, browsers or even devices.
独立式
¥Standalone
使用单个 Socket.IO 服务器时,映射 应该足够:
¥When using a single Socket.IO server, a Map should be sufficient:
function computeUserId(socket) {
// parse cookie / read JWT token / ... and retrieve the user ID (to be implemented)
}
const users = new Map();
function handleConnection(userId) {
const count = users.get(userId) || 0;
users.set(userId, count + 1);
return count === 0;
}
function handleDisconnection(userId) {
const count = users.get(userId) - 1;
if (count === 0) {
users.delete(userId);
} else {
users.set(userId, count);
}
return count === 0;
}
io.on("connection", (socket) => {
const userId = computeUserId(socket);
const hasConnected = handleConnection(userId);
if (hasConnected) {
io.emit("user has connected", userId);
}
socket.on("disconnect", () => {
const hasDisconnected = handleDisconnection(userId);
if (hasDisconnected) {
io.emit("user has disconnected", userId);
}
});
});
在这种情况下,计算用户存在非常简单:
¥In that case, computing the user presence is quite straightforward:
function isUserConnected(userId) {
return users.has(userId);
}
function usersCount() {
return users.size;
}
function usersList() {
return [...users.keys()];
}
集群
¥Cluster
单个用户在线
¥Single user presence
检查给定用户是否在线的一个简单而有效的解决方案是使用 fetchSockets()
方法,每个用户 ID 对应一个房间:
¥A simple yet effective solution to check whether a given user is online is to use the fetchSockets()
method with one room per user ID:
fetchSockets()
方法向集群中的每个节点发送请求,这些节点使用其本地套接字实例(当前连接到节点的实例)进行响应。
¥The fetchSockets()
method sends a request to every node in the cluster, which respond with their local socket instances (the ones that are currently connected to the node).
¥Reference: fetchSockets()
io.on("connection", (socket) => {
const userId = computeUserId(socket);
socket.join(userId);
});
async function isUserConnected(userId) {
const sockets = await io.in(userId).fetchSockets();
return sockets.length > 0;
}
这有效,但是 fetchSockets()
方法包含有关套接字实例的大量详细信息(id、房间、握手数据)。这可以通过 serverSideEmit()
方法稍微改进:
¥This works, however the fetchSockets()
method includes a lot of details about the socket instances (id, rooms, handshake data). This can be slightly improved with the serverSideEmit()
method:
serverSideEmit()
方法向集群中的每个节点发送事件,并等待它们的响应。
¥The serverSideEmit()
method sends an event to every node in the cluster, and waits for their responses.
¥Reference: serverSideEmitWithAck()
const users = new Map();
io.on("connection", (socket) => {
// update the `users` map (see the "Standalone" section above)
});
function isUserConnectedOnThisNode(userId) {
return users.has(userId);
}
io.on("isUserConnected", (userId, cb) => {
cb(isUserConnectedOnThisNode(userId));
});
async function isUserConnected(userId) {
if (isUserConnectedOnThisNode(userId)) {
return true;
}
const responses = await io.serverSideEmitWithAck("isUserConnected", userId);
return responses.some(r => r);
}
但是,这两种方法都不能有效地计数和/或列出所有连接的用户。
¥However, both methods do not allow to efficiently count and/or list all connected users.
所有用户在线
¥All users presence
此用例最有效的解决方案是使用 Redis 等外部存储。
¥The most efficient solution for this use case is to use an external store like Redis.
在 Redis 中:
¥In Redis:
键 | 类型 | 内容 |
---|---|---|
processes | Set | [process1, process2] |
process1:is-up | 字符串(+ 到期时间) | 1 |
process2:is-up | 字符串(+ 到期时间) | 1 |
users | 哈希 | { user1: 2, user2: 1 } |
process1:users | 哈希 | { user1: 1, user2: 1 } |
process2:users | 哈希 | { user1: 1 } |
注意:
¥Notes:
users
哈希跟踪每个用户 ID 的套接字实例数¥the
users
hash tracks the number of socket instances per user ID如果一台服务器突然崩溃并且无法更新
users
哈希,则使用<processId>:users
哈希¥the
<processId>:users
hashes are used in case one server abruptly crashes and fails to update theusers
hash
让我们从创建一个自定义 HDECR
方法开始,它将减少哈希的一个字段,并在它降至 0 时删除它。我们将使用 Lua 脚本,因此两个命令都是原子执行的:
¥Let's start by creating a custom HDECR
method, which will decrement a field of the hash, and delete it if it drops to 0. We will use a Lua script, so both commands are executed atomically:
import { createClient, defineScript } from "redis";
const redisClient = createClient({
url: "redis://...",
scripts: {
hDecr: defineScript({
NUMBER_OF_KEYS: 1,
SCRIPT:
`
local count = redis.call('HINCRBY', KEYS[1], ARGV[1], -1)
if count == 0 then
redis.call('HDEL', KEYS[1], ARGV[1])
end
return count
`,
transformArguments(key, userId) {
return [key, userId];
}
}),
},
});
现在我们只需:
¥Now we'll simply:
连接时调用
HINCRBY
¥call
HINCRBY
upon connection断开连接时调用我们的自定义
HDECR
命令¥call our custom
HDECR
command upon disconnection
开始:
¥Here we go:
const processId = randomUUID();
// add the process ID to the "processes" set
await redisClient.multi()
.sAdd("processes", processId)
.set(`${processId}:is-up`, "1", { EX: 10 })
.exec();
setInterval(async () => {
// notify that the process is still alive
await redisClient.expire(`${processId}:is-up`, 10);
}, 5000);
async function handleConnection(userId) {
// atomically increment the `userId` field in both hashes
const [res] = await redisClient.multi()
.hIncrBy("users", userId, 1)
.hIncrBy(`${processId}:users`, userId, 1)
.exec();
return res === 1;
}
async function handleDisconnection(userId) {
// atomically decrement the `userId` field in both hashes
const [res] = await redisClient.multi()
.hDecr("users", userId)
.hDecr(`${processId}:users`, userId)
.exec();
return res === 0;
}
io.on("connection", async (socket) => {
const userId = computeUserId(socket);
const hasConnected = await handleConnection(userId);
if (hasConnected) {
io.emit("user has connected", userId);
}
socket.on("disconnect", async () => {
const hasDisconnected = await handleDisconnection(userId);
if (hasDisconnected) {
io.emit("user has disconnected", userId);
}
});
});
最后,我们可以使用 users
哈希来计算用户存在:
¥Finally, we can use the users
hash to compute the user presence:
function isUserConnected(userId) {
return redisClient.hExists("users", userId);
}
function usersCount() {
return redisClient.hLen("users");
}
function usersList() {
return redisClient.hKeys("users");
}
清理过程定期检查死进程:
¥The cleanup process periodically checks for dead processes:
import { createClient, defineScript } from "redis";
const redisClient = createClient({
url: "redis://...",
scripts: {
cleanup: defineScript({
NUMBER_OF_KEYS: 2,
SCRIPT:
`
local disconnected_users = {}
local values = redis.call('HGETALL', KEYS[2])
for i = 1, #values, 2 do
local user_id = values[i]
local socket_count = tonumber(values[i + 1])
local count = redis.call('HINCRBY', KEYS[1], user_id, -socket_count)
if count == 0 then
redis.call('HDEL', KEYS[1], user_id)
table.insert(disconnected_users, user_id)
end
end
redis.call('DEL', KEYS[2])
return disconnected_users
`,
transformArguments(key, processKey) {
return [key, processKey];
}
}),
},
});
await redisClient.connect();
setInterval(async () => {
const processes = await redisClient.sMembers("processes");
const states = await redisClient.mGet(processes.map(p => `${p}:is-up`));
for (let i = 0; i < processes.length; i++) {
if (states[i] === "1") {
continue;
}
const processId = processes[i];
await redisClient.multi()
.cleanup("users", `${processId}:users`)
.sRem("processes", processId)
.exec();
// TODO emit the "user has disconnected" events
}
}, 5000);
以上就是全部内容了,感谢大家的阅读!
¥That's all folks, thanks for reading!
也可以看看:如何统计已连接客户端数量
¥See also: How to count the number of connected clients