如何构建基本的 Socket.IO 客户端
¥How to build a basic Socket.IO client
在本指南中,我们将用 JavaScript 实现一个基本的 Socket.IO 客户端,以便更好地理解 Socket.IO 协议。
¥In this guide, we will implement a basic Socket.IO client in JavaScript, in order to get a better understanding of the Socket.IO protocol.
我们将实现以下功能:
¥We will implement the following features:
创建 WebSocket 连接
¥creating a WebSocket connection
管理重新连接
¥managing reconnections
发送事件
¥sending events
接收事件
¥receiving events
手动断开连接
¥disconnecting manually
官方客户端显然包含更多功能:
¥The official client obviously contains a lot more features:
二进制有效负载
¥binary payloads
...
但这应该足以让你很好地了解该库的底层工作原理。
¥But that should be sufficient to give you a good overview of how the library works under the hood.
我们的目标是实现这样的目标:
¥Our goal is to achieve something like this:
import { io } from "./basic-client.js";
const socket = io();
// connection
socket.on("connect", () => {
// ...
});
// receiving an event
socket.on("foo", (value) => {
// ...
});
// sending an event
socket.emit("bar", "abc");
准备好?我们开工吧!
¥Ready? Let's do this!
事件触发器
¥Event emitter
Socket.IO API 很大程度上受到 Node.js EventEmitter 类的启发。
¥The Socket.IO API is heavily inspired from the Node.js EventEmitter class.
import { EventEmitter } from "node:events";
const myEmitter = new EventEmitter();
myEmitter.on("foo", () => {
console.log("foo!");
});
myEmitter.emit("foo");
该库提供了类似的 API,但在服务器和客户端之间:
¥The library provides a similar API, but between a server and a client:
服务器
¥server
io.on("connection", (socket) => {
// send a "foo" event to the client
socket.emit("foo");
// receive a "bar" event from the client
socket.on("bar", () => {
// ...
});
});
- client
import { io } from "socket.io-client";
const socket = io();
// receive a "foo" event from the server
socket.on("foo", () => {
// ...
});
// send a "bar" event to the server
socket.emit("bar");
服务器和客户端之间的底层连接(WebSocket 或 HTTP 长轮询)被抽象出来并由库管理。
¥The underlying connection between the server and the client (WebSocket or HTTP long-polling) is abstracted away and managed by the library.
让我们创建一个简约的 EventEmitter
类:
¥Let's create a minimalistic EventEmitter
class:
class EventEmitter {
#listeners = new Map();
on(event, listener) {
let listeners = this.#listeners.get(event);
if (!listeners) {
this.#listeners.set(event, listeners = []);
}
listeners.push(listener);
}
emit(event, ...args) {
const listeners = this.#listeners.get(event);
if (listeners) {
for (const listener of listeners) {
listener.apply(null, args);
}
}
}
}
然后,我们的 Socket
类将扩展该类,以便公开 on()
和 emit()
方法:
¥Our Socket
class will then extend this class, in order to expose both the on()
and the emit()
methods:
class Socket extends EventEmitter {
constructor(uri, opts) {
super();
}
}
在我们的构造函数中,uri
参数是:
¥In our constructor, the uri
argument is either:
用户提供:
¥provided by the user:
const socket = io("https://example.com");
或从
window.location
对象推断¥or inferred from the
window.location
object
const socket = io();
让我们创建一个入口点:
¥Let's create an entrypoint:
export function io(uri, opts) {
if (typeof uri !== "string") {
opts = uri;
uri = location.origin;
}
return new Socket(uri, opts);
}
好的,这是一个好的开始!
¥OK, so that's a good start!
WebSocket 连接
¥WebSocket connection
现在,让我们创建到服务器的 WebSocket 连接:
¥Now, let's create the WebSocket connection to the server:
class Socket extends EventEmitter {
+ #uri;
+ #opts;
+ #ws;
constructor(uri, opts) {
super();
+ this.#uri = uri;
+ this.#opts = Object.assign({
+ path: "/socket.io/"
+ }, opts);
+ this.#open();
}
+ #open() {
+ this.#ws = new WebSocket(this.#createUrl());
+ }
+
+ #createUrl() {
+ const uri = this.#uri.replace(/^http/, "ws");
+ const queryParams = "?EIO=4&transport=websocket";
+ return `${uri}${this.#opts.path}${queryParams}`;
+ }
}
参考:https://web.nodejs.cn/en-US/docs/Web/API/WebSocket
¥Reference: https://web.nodejs.cn/en-US/docs/Web/API/WebSocket
关于 createUrl()
方法的一些解释:
¥Some explanations about the createUrl()
method:
WebSocket URL 以
ws://
或wss://
开头,因此我们在replace()
调用中处理它¥a WebSocket URL starts with
ws://
orwss://
, so we handle this in thereplace()
callSocket.IO URL 始终包含特定的请求路径,默认为
/socket.io/
¥a Socket.IO URL always contains a specific request path, which defaults to
/socket.io/
有两个强制查询参数:
¥there are two mandatory query parameters:
EIO=4
:Engine.IO 协议的版本¥
EIO=4
: the version of the Engine.IO protocoltransport=websocket
:使用的传输工具¥
transport=websocket
: the transport used
所以最终的 URL 将如下所示:wss://example.com/socket.io/?EIO=4&transport=websocket
¥So the final URL will look like: wss://example.com/socket.io/?EIO=4&transport=websocket
Engine.IO 协议
¥The Engine.IO protocol
Socket.IO 代码库分为两个不同的层:
¥The Socket.IO codebase is split into two distinct layers:
低层管道:我们所说的 Engine.IO,Socket.IO 内部的引擎
¥the low-level plumbing: what we call Engine.IO, the engine inside Socket.IO
高级 API:Socket.IO 本身
¥the high-level API: Socket.IO itself
也可以看看:
¥See also:
使用 WebSocket 时,通过线路发送的消息的格式很简单:<packet type><payload>
¥When using WebSocket, the format of the messages sent over the wire is simply: <packet type><payload>
以下是协议第四版(因此是上面的 EIO=4
)中的不同数据包类型:
¥Here are the different packet types in the 4th version (hence the EIO=4
above) of the protocol:
名称 | 表示 | 描述 |
---|---|---|
打开 | 0 | 在握手时使用。 |
关闭 | 1 | 用于指示可以关闭传输。 |
PING | 2 | 用于心跳机制。 |
PONG | 3 | 用于心跳机制。 |
信息 | 4 | 用于向另一端发送有效负载。 |
升级 | 5 | 在升级过程中使用(此处未使用)。 |
NOOP | 6 | 在升级过程中使用(此处未使用)。 |
示例:
¥Example:
4hello
with:
4 => MESSAGE packet type
hello => message payload (UTF-8 encoded)
让我们处理 WebSocket 消息:
¥Let's handle the WebSocket messages:
+const EIOPacketType = {
+ OPEN: "0",
+ CLOSE: "1",
+ PING: "2",
+ PONG: "3",
+ MESSAGE: "4",
+};
+function noop() {}
class Socket extends EventEmitter {
[...]
#open() {
this.#ws = new WebSocket(this.#createUrl());
+ this.#ws.onmessage = ({ data }) => this.#onMessage(data);
+ this.#ws.onclose = () => this.#onClose("transport close");
}
+ #onMessage(data) {
+ if (typeof data !== "string") {
+ // TODO handle binary payloads
+ return;
+ }
+
+ switch (data[0]) {
+ case EIOPacketType.CLOSE:
+ this.#onClose("transport close");
+ break;
+
+ default:
+ this.#onClose("parse error");
+ break;
+ }
+ }
+
+ #onClose(reason) {
+ if (this.#ws) {
+ this.#ws.onclose = noop;
+ this.#ws.close();
+ }
+ }
+}
心跳
¥Heartbeat
实现了心跳机制来保证服务器和客户端之间的连接是健康的。
¥A heartbeat mechanism is implemented to ensure that the connection between the server and the client is healthy.
服务器在初始握手期间发送两个值:pingInterval
和 pingTimeout
¥The server sends two values during the initial handshake: pingInterval
and pingTimeout
然后,它将每 pingInterval
毫秒发送一个 PING 数据包,并期望从客户端返回一个 PONG 数据包。我们开工吧:
¥It will then send a PING packet every pingInterval
ms, and expect a PONG packet back from the client. Let's do this:
class Socket extends EventEmitter {
+ #pingTimeoutTimer;
+ #pingTimeoutDelay;
[...]
#onMessage(data) {
if (typeof data !== "string") {
// TODO handle binary payloads
return;
}
switch (data[0]) {
+ case EIOPacketType.OPEN:
+ this.#onOpen(data);
+ break;
+
case EIOPacketType.CLOSE:
this.#onClose("transport close");
break;
+ case EIOPacketType.PING:
+ this.#resetPingTimeout();
+ this.#send(EIOPacketType.PONG);
+ break;
default:
this.#onClose("parse error");
break;
}
}
+ #onOpen(data) {
+ let handshake;
+ try {
+ handshake = JSON.parse(data.substring(1));
+ } catch (e) {
+ return this.#onClose("parse error");
+ }
+ this.#pingTimeoutDelay = handshake.pingInterval + handshake.pingTimeout;
+ this.#resetPingTimeout();
+ }
+
+ #resetPingTimeout() {
+ clearTimeout(this.#pingTimeoutTimer);
+ this.#pingTimeoutTimer = setTimeout(() => {
+ this.#onClose("ping timeout");
+ }, this.#pingTimeoutDelay);
+ }
+
+ #send(data) {
+ if (this.#ws.readyState === WebSocket.OPEN) {
+ this.#ws.send(data);
+ }
+ }
#onClose(reason) {
if (this.#ws) {
this.#ws.onclose = noop;
this.#ws.close();
}
+ clearTimeout(this.#pingTimeoutTimer);
}
}
重连
¥Reconnection
当我们这样做时,我们还将处理重新连接。WebSocket 非常棒,但它们可能(并且在现实生活中)会断开连接,因此我们必须注意这一点:
¥While we're at it, we will also handle reconnections. WebSockets are awesome, but they can (and they will, in real-life conditions) get disconnected, so we must take care of that:
class Socket extends EventEmitter {
[...]
constructor(uri, opts) {
super();
this.#uri = uri;
this.#opts = Object.assign(
{
path: "/socket.io/",
+ reconnectionDelay: 2000,
},
opts
);
this.#open();
}
#onClose(reason) {
if (this.#ws) {
this.#ws.onclose = noop;
this.#ws.close();
}
clearTimeout(this.#pingTimeoutTimer);
+ setTimeout(() => this.#open(), this.#opts.reconnectionDelay);
}
}
官方 Socket.IO 客户端使用具有一定随机性的奇特指数延迟,以防止大量客户端同时重新连接时出现负载峰值,但我们在这里保持简单并使用常量值。
¥The official Socket.IO client uses a fancy exponential delay with some randomness in order to prevent spikes of load when a lot of clients reconnect at the same time, but we'll keep it simple here and use a constant value.
好的,让我们总结一下,我们现在有一个客户端可以:
¥OK, so let's sum up, we now have a client that can:
打开与服务器的 WebSocket 连接
¥open a WebSocket connection to the server
通过响应 PING 数据包来遵守心跳机制
¥honor the heartbeat mechanism by responding to PING packets
失败时自动重连
¥automatically reconnect upon failure
Engine.IO 协议就这样了!现在让我们深入研究 Socket.IO 协议。
¥That's it for the Engine.IO protocol! Let's dig into the Socket.IO protocol now.
Socket.IO 协议
¥The Socket.IO protocol
Socket.IO 协议构建在 earlier 描述的 Engine.IO 协议之上,这意味着每个 Socket.IO 数据包在通过线路发送时都会以 "4"(Engine.IO MESSAGE 数据包类型)为前缀。
¥The Socket.IO protocol is built on top of the Engine.IO protocol described earlier, which means that every Socket.IO packet will be prefixed by "4" (the Engine.IO MESSAGE packet type) when sent over the wire.
参考:Socket.IO 协议
¥Reference: the Socket.IO protocol
如果没有二进制元素,则格式如下:
¥Without binary elements, the format is the following:
<packet type>[JSON-stringified payload]
以下是可用数据包类型的列表:
¥Here is the list of available packet types:
类型 | ID | 用法 |
---|---|---|
连接 | 0 | 在连接到命名空间期间使用。 |
断开 | 1 | 当与名称空间断开连接时使用。 |
事件 | 2 | 用于向对方发送数据。 |
确认 | 3 | 用于确认事件(此处未使用)。 |
连接错误 | 4 | 在连接到命名空间期间使用(此处未使用)。 |
BINARY_EVENT | 5 | 用于向对方发送二进制数据(此处未使用)。 |
二进制确认 | 6 | 用于确认事件(响应包括二进制数据)(此处未使用)。 |
示例:
¥Example:
2["hello","world"]
with:
2 => EVENT packet type
["hello","world"] => JSON.stringified() payload
正在连接
¥Connecting
客户端必须在 Socket.IO 会话开始时发送 CONNECT 数据包:
¥The client must send a CONNECT packet at the beginning of the Socket.IO session:
+const SIOPacketType = {
+ CONNECT: 0,
+ DISCONNECT: 1,
+ EVENT: 2,
+};
class Socket extends EventEmitter {
[...]
#onOpen(data) {
let handshake;
try {
handshake = JSON.parse(data.substring(1));
} catch (e) {
return this.#onClose("parse error");
}
this.#pingTimeoutDelay = handshake.pingInterval + handshake.pingTimeout;
this.#resetPingTimeout();
+ this.#doConnect();
}
+ #doConnect() {
+ this.#sendPacket({ type: SIOPacketType.CONNECT });
+ }
+
+ #sendPacket(packet) {
+ this.#send(EIOPacketType.MESSAGE + encode(packet));
+ }
}
+function encode(packet) {
+ let output = "" + packet.type;
+
+ return output;
+}
如果允许连接,那么服务器将发回一个 CONNECT 数据包:
¥If the connection is allowed, then the server will send a CONNECT packet back:
class Socket extends EventEmitter {
+ id;
[...]
#onMessage(data) {
switch (data[0]) {
[...]
+ case EIOPacketType.MESSAGE:
+ let packet;
+ try {
+ packet = decode(data);
+ } catch (e) {
+ return this.#onClose("parse error");
+ }
+ this.#onPacket(packet);
+ break;
}
}
+ #onPacket(packet) {
+ switch (packet.type) {
+ case SIOPacketType.CONNECT:
+ this.#onConnect(packet);
+ break;
+ }
+ }
+ #onConnect(packet) {
+ this.id = packet.data.sid;
+
+ super.emit("connect");
+ }
}
+function decode(data) {
+ let i = 1; // skip "4" prefix
+
+ const packet = {
+ type: parseInt(data.charAt(i++), 10),
+ };
+
+ if (!isPacketValid(packet)) {
+ throw new Error("invalid format");
+ }
+
+ return packet;
+}
+
+function isPacketValid(packet) {
+ switch (packet.type) {
+ case SIOPacketType.CONNECT:
+ return typeof packet.data === "object";
+ default:
+ return false;
+ }
+}
我们使用 super.emit(...)
,以便稍后能够重写 emit()
方法来发送事件。
¥We are using super.emit(...)
so that we will be able to override the emit()
method later to send an event.
发送事件
¥Sending an event
让我们向服务器发送一些数据。我们需要跟踪底层连接的状态并缓冲数据包,直到连接准备好:
¥Let's send some data to the server. We need to track the state of the underlying connection and buffer the packets until the connection is ready:
class Socket extends EventEmitter {
+ connected = false;
+ #sendBuffer = [];
[...]
+ emit(...args) {
+ const packet = {
+ type: SIOPacketType.EVENT,
+ data: args,
+ };
+
+ if (this.connected) {
+ this.#sendPacket(packet);
+ } else {
+ this.#sendBuffer.push(packet);
+ }
+ }
#onConnect(packet) {
this.id = packet.data.sid;
+ this.connected = true;
+ this.#sendBuffer.forEach((packet) => this.#sendPacket(packet));
+ this.#sendBuffer.slice(0);
super.emit("connect");
}
}
function encode(packet) {
let output = "" + packet.type;
+ if (packet.data) {
+ output += JSON.stringify(packet.data);
+ }
return output;
}
接收事件
¥Receiving an event
相反,我们来处理服务器发送的 EVENT 数据包:
¥Conversely, let's handle the EVENT packets sent by the server:
class Socket extends EventEmitter {
[...]
#onPacket(packet) {
switch (packet.type) {
case SIOPacketType.CONNECT:
this.#onConnect(packet);
break;
+ case SIOPacketType.EVENT:
+ super.emit.apply(this, packet.data);
+ break;
}
}
}
function decode(data) {
let i = 1; // skip "4" prefix
const packet = {
type: parseInt(data.charAt(i++), 10),
};
+ if (data.charAt(i)) {
+ packet.data = JSON.parse(data.substring(i));
+ }
if (!isPacketValid(packet)) {
throw new Error("invalid format");
}
return packet;
}
function isPacketValid(packet) {
switch (packet.type) {
case SIOPacketType.CONNECT:
return typeof packet.data === "object";
+ case SIOPacketType.EVENT: {
+ const args = packet.data;
+ return (
+ Array.isArray(args) && args.length > 0 && typeof args[0] === "string"
+ );
+ }
default:
return false;
}
}
手动断开连接
¥Disconnecting manually
最后,让我们处理套接字不应该尝试重新连接的几种情况:
¥And finally, let's handle the few cases where the socket shouldn't try to reconnect:
当客户端调用
socket.disconnect()
时¥when the client calls
socket.disconnect()
当服务器调用
socket.disconnect()
时¥when the server calls
socket.disconnect()
class Socket extends EventEmitter {
+ #reconnectTimer;
+ #shouldReconnect = true;
[...]
#onPacket(packet) {
switch (packet.type) {
case SIOPacketType.CONNECT:
this.#onConnect(packet);
break;
+ case SIOPacketType.DISCONNECT:
+ this.#shouldReconnect = false;
+ this.#onClose("io server disconnect");
+ break;
case SIOPacketType.EVENT:
super.emit.apply(this, packet.data);
break;
}
}
#onClose(reason) {
if (this.#ws) {
this.#ws.onclose = noop;
this.#ws.close();
}
clearTimeout(this.#pingTimeoutTimer);
+ clearTimeout(this.#reconnectTimer);
+
+ if (this.#shouldReconnect) {
+ this.#reconnectTimer = setTimeout(
+ () => this.#open(),
+ this.#opts.reconnectionDelay
+ );
+ }
- setTimeout(() => this.#open(), this.#opts.reconnectionDelay);
}
+ disconnect() {
+ this.#shouldReconnect = false;
+ this.#onClose("io client disconnect");
+ }
}
function isPacketValid(packet) {
switch (packet.type) {
case SIOPacketType.CONNECT:
return typeof packet.data === "object";
+ case SIOPacketType.DISCONNECT:
+ return packet.data === undefined;
case SIOPacketType.EVENT: {
const args = packet.data;
return (
Array.isArray(args) && args.length > 0 && typeof args[0] === "string"
);
}
default:
return false;
}
}
结束语
¥Ending notes
这就是我们的基本 Socket.IO 客户端!让我们回顾一下。
¥That's it for our basic Socket.IO client! So let's recap.
我们实现了以下功能:
¥We have implemented the following features:
创建 WebSocket 连接
¥creating a WebSocket connection
管理重新连接
¥managing reconnections
发送事件
¥sending events
接收事件
¥receiving events
手动断开连接
¥disconnecting manually
希望你现在能够更好地了解该库的底层工作原理。
¥Hopefully, you now have a better understanding of how the library works under the hood.
完整的源代码可以找到 there。
¥The complete source code can be found there.
谢谢阅读!
¥Thanks for reading!