Private messaging - Part IV
This guide has four distinct parts:
Part I: initial implementation
Part II: persistent user ID
Part III: persistent messages
Part IV (current): scaling up
Here's where we were at the end of the 3rd part:

We will now see how we can scale to multiple Socket.IO servers, for high availability / load-balancing purposes.
Installation
Let's check out the branch for part IV:
git checkout examples/private-messaging-part-4
Here's what you should see in the current directory:
├── babel.config.js
├── package.json
├── public
│   ├── favicon.ico
│   ├── fonts
│   │   └── Lato-Regular.ttf
│   └── index.html
├── README.md
├── server
│   ├── cluster.js (created)
│   ├── docker-compose.yml (created)
│   ├── index.js (updated)
│   ├── messageStore.js (updated)
│   ├── package.json (updated)
│   └── sessionStore.js (updated)
└── src
    ├── App.vue
    ├── components
    │   ├── Chat.vue
    │   ├── MessagePanel.vue
    │   ├── SelectUsername.vue
    │   ├── StatusIcon.vue
    │   └── User.vue
    ├── main.js
    └── socket.js
The complete diff can be found here.
Updating the server
For this last part, we need 3 additional dependencies on the server-side:
ioredis: a great Redis client
socket.io-redis: a Socket.IO adapter based on the Redis pub/sub mechanism
@socket.io/sticky: a module for running Socket.IO within a Node.js cluster
We also need a Redis instance. For your convenience, a docker-compose.yml file is provided:
cd server
docker-compose up -d
npm install
npm start
This will create 4 Node.js workers, each running the same index.js file.
On the client-side, no change is needed; we will focus on the server-side here.
How it works
Creating multiple servers
When creating multiple Socket.IO servers, there are two things to do:
you need to enable sticky-session (please see here for the complete explanation)
you need to replace the default in-memory adapter with the Redis adapter (or another compatible adapter)
In our example, the @socket.io/sticky module is used to ensure that requests from a given client are always routed to the same Socket.IO server. This is what is called "sticky-session":

Note: we could also have created several processes listening on different ports (or used multiple hosts), and added a reverse-proxy in front of them. Enabling sticky-session for common reverse-proxy solutions like NginX or HAProxy is covered in the documentation.
The cluster is created in the server/cluster.js file:
const cluster = require("cluster");
const http = require("http");
const { setupMaster } = require("@socket.io/sticky");

const WORKERS_COUNT = 4;

if (cluster.isMaster) {
  console.log(`Master ${process.pid} is running`);

  for (let i = 0; i < WORKERS_COUNT; i++) {
    cluster.fork();
  }

  cluster.on("exit", (worker) => {
    console.log(`Worker ${worker.process.pid} died`);
    cluster.fork();
  });

  const httpServer = http.createServer();
  setupMaster(httpServer, {
    loadBalancingMethod: "least-connection", // either "random", "round-robin" or "least-connection"
  });
  const PORT = process.env.PORT || 3000;

  httpServer.listen(PORT, () =>
    console.log(`server listening at http://localhost:${PORT}`)
  );
} else {
  console.log(`Worker ${process.pid} started`);
  require("./index");
}
In our existing server/index.js file, there is a single change: the HTTP server created by the worker process does not actually listen on a given port; the requests will be handled by the master process and then forwarded to the right worker.
Before:
httpServer.listen(PORT, () =>
  console.log(`server listening at http://localhost:${PORT}`)
);
After:
setupWorker(io);
The setupWorker method provided by the @socket.io/sticky module will take care of the synchronization between the master and the worker.
Sessions & messages
Now that sticky-session is enabled, we need to share sessions and messages across the Socket.IO servers.
We create a new SessionStore based on Redis. We will store each session in a Redis hash, with the HSET command:
class RedisSessionStore extends SessionStore {
  // ...
  saveSession(id, { userID, username, connected }) {
    this.redisClient
      .multi()
      .hset(`session:${id}`, "userID", userID, "username", username, "connected", connected)
      .expire(`session:${id}`, SESSION_TTL)
      .exec();
  }
  // ...
}
We also set an expiry on the key in order to clean up old sessions.
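For context, the store simply wraps a Redis client, and the TTL is a plain constant; a minimal sketch of that part (the one-day value and the constructor shape are assumptions, the actual file may differ):

const SESSION_TTL = 24 * 60 * 60; // one day, in seconds (assumed value)

class RedisSessionStore extends SessionStore {
  constructor(redisClient) {
    super();
    this.redisClient = redisClient;
  }
  // ...
}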
Fetching the session is quite straightforward, with the HMGET command:
const mapSession = ([userID, username, connected]) =>
  userID ? { userID, username, connected: connected === "true" } : undefined;

class RedisSessionStore extends SessionStore {
  // ...
  findSession(id) {
    return this.redisClient
      .hmget(`session:${id}`, "userID", "username", "connected")
      .then(mapSession);
  }
  // ...
}
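Since findSession now returns a Promise, the authentication middleware from the previous parts has to await it; a rough sketch of that change (not the exact code from the branch):

io.use(async (socket, next) => {
  const sessionID = socket.handshake.auth.sessionID;
  if (sessionID) {
    // the lookup is now asynchronous
    const session = await sessionStore.findSession(sessionID);
    if (session) {
      socket.sessionID = sessionID;
      socket.userID = session.userID;
      socket.username = session.username;
      return next();
    }
  }
  // ... otherwise, create a new session, as in part II
});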
Fetching all the sessions is a bit more complex:
class RedisSessionStore extends SessionStore {
  // ...
  async findAllSessions() {
    // first, we fetch all the keys with the SCAN command
    const keys = new Set();
    let nextIndex = 0;
    do {
      const [nextIndexAsStr, results] = await this.redisClient.scan(
        nextIndex,
        "MATCH",
        "session:*",
        "COUNT",
        "100"
      );
      nextIndex = parseInt(nextIndexAsStr, 10);
      results.forEach((s) => keys.add(s));
    } while (nextIndex !== 0);
    // and then we retrieve the session details with multiple HMGET commands
    const commands = [];
    keys.forEach((key) => {
      commands.push(["hmget", key, "userID", "username", "connected"]);
    });
    return this.redisClient
      .multi(commands)
      .exec()
      .then((results) => {
        return results
          .map(([err, session]) => (err ? undefined : mapSession(session)))
          .filter((v) => !!v);
      });
  }
}
Similarly, we create a new MessageStore based on Redis. We will store all the messages linked to a given user in a Redis list, with the RPUSH command:
class RedisMessageStore extends MessageStore {
  // ...
  saveMessage(message) {
    const value = JSON.stringify(message);
    this.redisClient
      .multi()
      .rpush(`messages:${message.from}`, value)
      .rpush(`messages:${message.to}`, value)
      .expire(`messages:${message.from}`, CONVERSATION_TTL)
      .expire(`messages:${message.to}`, CONVERSATION_TTL)
      .exec();
  }
  // ...
}
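As a reminder from the previous parts, this store is called from the "private message" handler, which does not need any other change; roughly:

socket.on("private message", ({ content, to }) => {
  const message = {
    content,
    from: socket.userID,
    to,
  };
  socket.to(to).to(socket.userID).emit("private message", message);
  messageStore.saveMessage(message);
});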
Retrieving the messages is done with the LRANGE command:
class RedisMessageStore extends MessageStore {
  // ...
  findMessagesForUser(userID) {
    return this.redisClient
      .lrange(`messages:${userID}`, 0, -1)
      .then((results) => {
        return results.map((result) => JSON.parse(result));
      });
  }
}
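Both stores are now asynchronous, so the connection handler has to await them before emitting the user list; one possible shape (a sketch, the exact code in the branch may differ):

io.on("connection", async (socket) => {
  // fetch the sessions and the messages of the current user in parallel
  const [sessions, messages] = await Promise.all([
    sessionStore.findAllSessions(),
    messageStore.findMessagesForUser(socket.userID),
  ]);

  // group the messages by the other participant of the conversation
  const messagesPerUser = new Map();
  messages.forEach((message) => {
    const { from, to } = message;
    const otherUser = socket.userID === from ? to : from;
    if (messagesPerUser.has(otherUser)) {
      messagesPerUser.get(otherUser).push(message);
    } else {
      messagesPerUser.set(otherUser, [message]);
    }
  });

  // ... then build and emit the "users" event, as in part III
});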
Forwarding messages
There is one last modification that is needed: we need to make sure that messages actually reach the recipient, even if this recipient is not connected on the same Socket.IO server:

This is the duty of the Redis adapter, which relies on the Redis pub/sub mechanism to broadcast messages between the Socket.IO servers and eventually reach all clients.
const httpServer = require("http").createServer();
const Redis = require("ioredis");
const redisClient = new Redis();

const io = require("socket.io")(httpServer, {
  cors: {
    origin: "http://localhost:8080",
  },
  adapter: require("socket.io-redis")({
    pubClient: redisClient,
    subClient: redisClient.duplicate(),
  }),
});
And that's it! If you have a Redis CLI on your machine, you can check the messages that are sent on the wire:
$ redis-cli
127.0.0.1:6379> PSUBSCRIBE socket.io*
Reading messages... (press Ctrl-C to quit)
1) "psubscribe"
2) "socket.io*"
3) (integer) 1
1) "pmessage"
2) "socket.io*"
3) "socket.io#/#"
4) "\x93\xa6XFD3OF\x83..."
Documentation:
Note: with the Redis adapter, the allSockets() method which is used in the "disconnect" handler automatically returns the Socket IDs across all Socket.IO servers, so there is nothing to update.
Review
OK, so let's sum it up: we have created a fully functional chat (yes, once again!), robust, ready to scale horizontally, which allowed us to introduce some useful Socket.IO features:
scaling to multiple Socket.IO servers
Thanks for reading!