Skip to main content
Version: 4.x

客户端交付

¥Client delivery

让我们看看如何确保服务器始终收到客户端发送的消息。

¥Let's see how we can make sure that the server always receives the messages sent by the clients.

info

默认情况下,Socket.IO 提供 "最多一次" 传送保证(也称为 "即发即忘"),这意味着如果消息未到达服务器,则不会重试。

¥By default, Socket.IO provides an "at most once" guarantee of delivery (also known as "fire and forget"), which means that there will be no retry in case the message does not reach the server.

缓冲事件

¥Buffered events

当客户端断开连接时,对 socket.emit() 的任何调用都会被缓冲,直到重新连接:

¥When a client gets disconnected, any call to socket.emit() is buffered until reconnection:

/videos/tutorial/buffered-events.mp4

在上面的视频中,"realtime" 消息被缓冲,直到重新建立连接。

¥In the video above, the "realtime" message is buffered until the connection is reestablished.

这种行为对于你的应用来说可能完全足够了。但是,在某些情况下消息可能会丢失:

¥This behavior might be totally sufficient for your application. However, there are a few cases where a message could be lost:

  • 发送事件时连接被切断

    ¥the connection is severed while the event is being sent

  • 服务器在处理事件时崩溃或重新启动

    ¥the server crashes or get restarted while processing the event

  • 数据库暂时不可用

    ¥the database is temporarily not available

至少一次

¥At least once

我们可以实现 "至少一次" 保证:

¥We can implement an "at least once" guarantee:

  • 手动确认:

    ¥manually with an acknowledgement:

function emit(socket, event, arg) {
socket.timeout(5000).emit(event, arg, (err) => {
if (err) {
// no ack from the server, let's retry
emit(socket, event, arg);
}
});
}

emit(socket, 'hello', 'world');
  • 或使用 retries 选项:

    ¥or with the retries option:

const socket = io({
ackTimeout: 10000,
retries: 3
});

socket.emit('hello', 'world');

在这两种情况下,客户端都会重试发送消息,直到收到服务器的确认:

¥In both cases, the client will retry to send the message until it gets an acknowledgement from the server:

io.on('connection', (socket) => {
socket.on('hello', (value, callback) => {
// once the event is successfully handled
callback();
});
})
tip

使用 retries 选项,可以保证消息的顺序,因为消息会被排队并一条一条地发送。第一个选项的情况并非如此。

¥With the retries option, the order of the messages is guaranteed, as the messages are queued and sent one by one. This is not the case with the first option.

正好一次

¥Exactly once

重试的问题是,服务器现在可能会多次收到相同的消息,因此它需要一种方法来唯一标识每条消息,并且只在数据库中存储一次。

¥The problem with retries is that the server might now receive the same message multiple times, so it needs a way to uniquely identify each message, and only store it once in the database.

让我们看看如何在聊天应用中实现 "正好一次" 保证。

¥Let's see how we can implement an "exactly once" guarantee in our chat application.

我们首先为客户端的每条消息分配一个唯一的标识符:

¥We will start by assigning a unique identifier to each message on the client side:

index.html
<script>
let counter = 0;

const socket = io({
auth: {
serverOffset: 0
},
// enable retries
ackTimeout: 10000,
retries: 3,
});

const form = document.getElementById('form');
const input = document.getElementById('input');
const messages = document.getElementById('messages');

form.addEventListener('submit', (e) => {
e.preventDefault();
if (input.value) {
// compute a unique offset
const clientOffset = `${socket.id}-${counter++}`;
socket.emit('chat message', input.value, clientOffset);
input.value = '';
}
});

socket.on('chat message', (msg, serverOffset) => {
const item = document.createElement('li');
item.textContent = msg;
messages.appendChild(item);
window.scrollTo(0, document.body.scrollHeight);
socket.auth.serverOffset = serverOffset;
});
</script>
note

socket.id 属性是分配给每个连接的随机 20 个字符标识符。

¥The socket.id attribute is a random 20-characters identifier which is assigned to each connection.

我们还可以使用 getRandomValues() 来生成唯一的偏移量。

¥We could also have used getRandomValues() to generate a unique offset.

然后我们将这个偏移量与消息一起存储在服务器端:

¥And then we store this offset alongside the message on the server side:

index.js
// [...]

io.on('connection', async (socket) => {
socket.on('chat message', async (msg, clientOffset, callback) => {
let result;
try {
result = await db.run('INSERT INTO messages (content, client_offset) VALUES (?, ?)', msg, clientOffset);
} catch (e) {
if (e.errno === 19 /* SQLITE_CONSTRAINT */ ) {
// the message was already inserted, so we notify the client
callback();
} else {
// nothing to do, just let the client retry
}
return;
}
io.emit('chat message', msg, result.lastID);
// acknowledge the event
callback();
});

if (!socket.recovered) {
try {
await db.each('SELECT id, content FROM messages WHERE id > ?',
[socket.handshake.auth.serverOffset || 0],
(_err, row) => {
socket.emit('chat message', row.content, row.id);
}
)
} catch (e) {
// something went wrong
}
}
});

// [...]

这样,client_offset 列上的 UNIQUE 约束就可以防止消息重复。

¥This way, the UNIQUE constraint on the client_offset column prevents the duplication of the message.

caution

不要忘记确认该事件,否则客户端将不断重试(最多 retries 次)。

¥Do not forget to acknowledge the event, or else the client will keep retrying (up to retries times).

socket.on('chat message', async (msg, clientOffset, callback) => {
// ... and finally
callback();
});
info

同样,默认保证 ("最多一次") 可能足以满足你的应用的需求,但现在你知道如何使其更加可靠。

¥Again, the default guarantee ("at most once") might be sufficient for your application, but now you know how it can be made more reliable.

在下一步中,我们将了解如何水平扩展我们的应用。

¥In the next step, we will see how we can scale our application horizontally.

info

你可以直接在浏览器中运行此示例:

¥You can run this example directly in your browser on: