Skip to main content

私有消息 - 第四部分

¥Private messaging - Part IV

本指南分为四个不同的部分:

¥This guide has four distinct parts:

这是 第三部分 结束时的情况:

¥Here's where we were at the end of the 3rd part:

Chat

现在我们将了解如何扩展到多个 Socket.IO 服务器,以实现高可用性/负载平衡目的。

¥We will see now how we can scale to multiple Socket.IO servers, for high availability / load-balancing purposes.

安装

¥Installation

让我们检查一下第四部分的分支:

¥Let's checkout the branch for part IV:

git checkout examples/private-messaging-part-4

你应该在当前目录中看到以下内容:

¥Here's what you should see in the current directory:

├── babel.config.js
├── package.json
├── public
│ ├── favicon.ico
│ ├── fonts
│ │ └── Lato-Regular.ttf
│ └── index.html
├── README.md
├── server
│ ├── cluster.js (created)
│ ├── docker-compose.yml (created)
│ ├── index.js (updated)
│ ├── messageStore.js (updated)
│ ├── package.json (updated)
│ └── sessionStore.js (updated)
└── src
├── App.vue
├── components
│ ├── Chat.vue
│ ├── MessagePanel.vue
│ ├── SelectUsername.vue
│ ├── StatusIcon.vue
│ └── User.vue
├── main.js
└── socket.js

完整的差异可以在 此处 中找到。

¥The complete diff can be found here.

更新服务器

¥Updating the server

对于最后一部分,我们需要在服务器端添加 3 个额外的依赖:

¥For this last part, we need 3 additional dependencies on the server-side:

我们还需要一个 Redis 实例。为了你的方便,提供了 docker-compose.yml 文件:

¥We also need a Redis instance. For your convenience, a docker-compose.yml file is provided:

cd server
docker-compose up -d

npm install
npm start

这将创建 4 个 Node.js 工作线程,每个工作线程运行相同的 index.js 文件。

¥This will create 4 Node.js workers, each running the same index.js file.

客户端不需要做任何改变,这里我们重点关注服务器端。

¥On the client-side, no change is needed, we will focus on the server-side here.

工作原理

¥How it works

创建多个服务器

¥Creating multiple servers

创建多个 Socket.IO 服务器时,需要做两件事:

¥When creating multiple Socket.IO servers, there are two things to do:

  • 你需要启用粘性会话(请参阅 此处 以获得完整说明)

    ¥you need to enable sticky-session (please see here for the complete explanation)

  • 你需要将默认内存适配器替换为 Redis 适配器(或其他兼容适配器)

    ¥you need to replace the default in-memory adapter with the Redis adapter (or another compatible adapter)

在我们的示例中,@socket.io/sticky 模块用于确保来自给定客户端的请求始终路由到同一 Socket.IO 服务器。这就是所谓的 "sticky-session":

¥In our example, the @socket.io/sticky module is used to ensure that requests from a given client are always routed to the same Socket.IO server. This is what is called "sticky-session":

Sticky session

注意:我们还可以创建多个监听不同端口(或使用多个主机)的进程,并在它们前面添加一个反向代理。documentation 中介绍了为 NginX 或 HAProxy 等常见反向代理解决方案启用粘性会话。

¥Note: we could also have created several processes listening to different ports (or used multiple hosts), and add a reverse-proxy in front of them. Enabling sticky-session for common reverse-proxy solutions like NginX or HAProxy is covered in the documentation.

集群在 server/cluster.js 文件中创建:

¥The cluster is created in the server/cluster.js file:

const cluster = require("cluster");
const http = require("http");
const { setupMaster } = require("@socket.io/sticky");

const WORKERS_COUNT = 4;

if (cluster.isMaster) {
console.log(`Master ${process.pid} is running`);

for (let i = 0; i < WORKERS_COUNT; i++) {
cluster.fork();
}

cluster.on("exit", (worker) => {
console.log(`Worker ${worker.process.pid} died`);
cluster.fork();
});

const httpServer = http.createServer();
setupMaster(httpServer, {
loadBalancingMethod: "least-connection", // either "random", "round-robin" or "least-connection"
});
const PORT = process.env.PORT || 3000;

httpServer.listen(PORT, () =>
console.log(`server listening at http://localhost:${PORT}`)
);
} else {
console.log(`Worker ${process.pid} started`);
require("./index");
}

在我们现有的 server/index.js 文件中,有一个更改:工作进程创建的 HTTP 服务器实际上并不监听给定的端口,请求将由主进程处理,然后转发到正确的工作进程。

¥In our existing server/index.js file, there is a single change: the HTTP server created by the worker process does not actually listen to a given port, the requests will be handled by the master process and then forwarded to the right worker.

前:

¥Before:

httpServer.listen(PORT, () =>
console.log(`server listening at http://localhost:${PORT}`)
);

后:

¥After:

setupWorker(io);

@socket.io/sticky 提供的 setupWorker 方法将负责 master 和 worker 之间的同步。

¥The setupWorker method provided by the @socket.io/sticky will take care of the synchronization between the master and the worker.

会话和消息

¥Sessions & messages

现在粘性会话已启用,我们需要在 Socket.IO 服务器之间共享会话和消息。

¥Now that sticky-session is enabled, we need to share sessions and messages across the Socket.IO servers.

我们基于 Redis 创建一个新的 SessionStore。我们将使用 HSET 命令将每个会话存储在 Redis 哈希中:

¥We create a new SessionStore based on Redis. We will store each session in a Redis hash, with the HSET command:

class RedisSessionStore extends SessionStore {
// ...
saveSession(id, { userID, username, connected }) {
this.redisClient
.multi()
.hset(`session:${id}`, "userID", userID, "username", username, "connected", connected)
.expire(`session:${id}`, SESSION_TTL)
.exec();
}
// ...
}

我们还为密钥设置了过期时间,以便清理旧会话。

¥We also set an expiry to the key in order to clean up old sessions.

使用 HMGET 命令获取会话非常简单:

¥Fetching the session is quite straightforward, with the HMGET command:

const mapSession = ([userID, username, connected]) =>
userID ? { userID, username, connected: connected === "true" } : undefined;

class RedisSessionStore extends SessionStore {
// ...
findSession(id) {
return this.redisClient
.hmget(`session:${id}`, "userID", "username", "connected")
.then(mapSession);
}
// ...
}

获取所有会话有点复杂:

¥Fetching all sessions is a bit more complex:

class RedisSessionStore extends SessionStore {
// ...
async findAllSessions() {
// first, we fetch all the keys with the SCAN command
const keys = new Set();
let nextIndex = 0;
do {
const [nextIndexAsStr, results] = await this.redisClient.scan(
nextIndex,
"MATCH",
"session:*",
"COUNT",
"100"
);
nextIndex = parseInt(nextIndexAsStr, 10);
results.forEach((s) => keys.add(s));
} while (nextIndex !== 0);

// and then we retrieve the session details with multiple HMGET commands
const commands = [];
keys.forEach((key) => {
commands.push(["hmget", key, "userID", "username", "connected"]);
});
return this.redisClient
.multi(commands)
.exec()
.then((results) => {
return results
.map(([err, session]) => (err ? undefined : mapSession(session)))
.filter((v) => !!v);
});
}
}

同样,我们基于 Redis 创建一个新的 MessageStore。我们将使用 RPUSH 命令将链接到给定用户的所有消息存储在 Redis 列表中:

¥Similarly, we create a new MessageStore based on Redis. We will store all the messages linked to a given user in a Redis list, with the RPUSH command:

class RedisMessageStore extends MessageStore {
// ...
saveMessage(message) {
const value = JSON.stringify(message);
this.redisClient
.multi()
.rpush(`messages:${message.from}`, value)
.rpush(`messages:${message.to}`, value)
.expire(`messages:${message.from}`, CONVERSATION_TTL)
.expire(`messages:${message.to}`, CONVERSATION_TTL)
.exec();
}
// ...
}

使用 LRANGE 命令检索消息:

¥Retrieving the messages is done with the LRANGE command:

class RedisMessageStore extends MessageStore {
// ...
findMessagesForUser(userID) {
return this.redisClient
.lrange(`messages:${userID}`, 0, -1)
.then((results) => {
return results.map((result) => JSON.parse(result));
});
}
}

转发消息

¥Forwarding messages

还需要最后一项修改:我们需要确保消息确实到达接收者,即使该接收者没有连接到同一个 Socket.IO 服务器上:

¥There is one last modification that is needed: we need to make sure that messages actually reach the recipient, even if this recipient is not connected on the same Socket.IO server:

Broadcasting with the Redis adapter

这是 Redis 适配器的职责,它依靠 Redis 发布/订阅机制在 Socket.IO 服务器之间广播消息并最终到达所有客户端。

¥This is the duty of the Redis adapter, which relies on the Redis pub/sub mechanism to broadcast messages between the Socket.IO servers and eventually reach all clients.

const httpServer = require("http").createServer();
const Redis = require("ioredis");
const redisClient = new Redis();
const io = require("socket.io")(httpServer, {
cors: {
origin: "http://localhost:8080",
},
adapter: require("socket.io-redis")({
pubClient: redisClient,
subClient: redisClient.duplicate(),
}),
});

就是这样!如果你的计算机上有 Redis CLI,你可以检查通过网络发送的消息:

¥And that's it! If you have a Redis CLI on your machine, you can check the messages that are sent on the wire:

$ redis-cli
127.0.0.1:6379> PSUBSCRIBE socket.io*
Reading messages... (press Ctrl-C to quit)
1) "psubscribe"
2) "socket.io*"
3) (integer) 1
1) "pmessage"
2) "socket.io*"
3) "socket.io#/#"
4) "\x93\xa6XFD3OF\x83..."

文档:

¥Documentation:

注意:使用 Redis 适配器,"disconnect" 处理程序中使用的 allSockets() 方法会自动返回所有 Socket.IO 服务器上的 Socket ID,因此无需更新任何内容。

¥Note: with the Redis adapter, the allSockets() method which is used in the "disconnect" handler automatically returns the Socket IDs across all Socket.IO servers, so there is nothing to update.

审查

¥Review

好的,那么我们总结一下:我们已经创建了一个功能齐全的聊天(是的,再一次!),健壮,可以水平扩展,这使我们能够引入一些有用的 Socket.IO 功能:

¥OK, so let's sum it up: we have created a fully functional chat (yes, once again!), robust, ready to scale horizontally, which allowed us to introduce some useful Socket.IO features:

谢谢阅读!

¥Thanks for reading!