Работа с WebSocket в Effector
В этом руководстве мы рассмотрим как правильно организовать работу с WebSocket соединением используя Effector.
WebSocket API поддерживает передачу данных в виде строк или бинарных данных (Blob
/ArrayBuffer
). В этом руководстве мы сфокусируемся на работе со строками, так как это наиболее распространённый случай при обмене данными. При необходимости работы с бинарными данными, можно адаптировать примеры под нужный формат.
Базовая модель
Создадим простую, но рабочую модель WebSocket клиента. Для начала определим основные события и состояния:
import { createStore, createEvent, createEffect, sample } from "effector";
// События для работы с сокетом
const disconnected = createEvent();
const messageSent = createEvent<string>();
const rawMessageReceived = createEvent<string>();
const $connection = createStore<WebSocket | null>(null)
.on(connectWebSocketFx.doneData, (_, ws) => ws)
.reset(disconnected);
Создадим эффект для установки соединения:
const connectWebSocketFx = createEffect((url: string): Promise<WebSocket> => {
const ws = new WebSocket(url);
const scopeDisconnected = scopeBind(disconnected);
const scopeRawMessageReceived = scopeBind(rawMessageReceived);
return new Promise((res, rej) => {
ws.onopen = () => {
res(ws);
};
ws.onmessage = (event) => {
scopeRawMessageReceived(event.data);
};
ws.onclose = () => {
scopeDisconnected();
};
ws.onerror = (err) => {
scopeDisconnected();
rej(err);
};
});
});
Обратите внимание, что мы использовали здесь функцию scopeBind
, чтобы связать юниты с текущим скоупом выполнения, так как мы не знаем когда вызовется scopeMessageReceived
внутри socket.onmessage
. Иначе событие попадет в глобальный скоуп.
Читать более подробно.
Если вы по какой-то причине работаете в режиме без скоупа, то вам не нужно использовать scopeBind
.
Учитывайте, что работа со скоупом это рекомундуемый вариант работы!
Обработка сообщений
Создадим стор для последнего полученного сообщения:
const $lastMessage = createStore("");
$lastMessage.on(messageReceived, (_, newMessage) => newMessage);
А также реализуем эффект для отправки сообщения:
const sendMessageFx = createEffect((params: { socket: WebSocket; message: string }) => {
params.socket.send(params.message);
});
// Связываем отправку сообщения с текущим сокетом
sample({
clock: messageSent,
source: $connection,
filter: Boolean, // Отправляем только если есть соединение
fn: (socket, message) => ({
socket,
message,
}),
target: sendMessageFx,
});
WebSocket имеет несколько состояний подключения (CONNECTING
, OPEN
, CLOSING
, CLOSED
). В базовой модели мы упрощаем это до простой проверки через Boolean
, но в реальном приложении может потребоваться более детальное отслеживание состояния.
Обработка ошибок
При работе с WebSocket важно корректно обрабатывать различные типы ошибок для обеспечения надежности приложения.
Расширим нашу базовую модель добавив обработку ошибок:
const TIMEOUT = 5_000;
// Добавляем события для ошибок
const socketError = createEvent<Error>();
const connectWebSocketFx = createEffect((url: string): Promise<WebSocket> => {
const ws = new WebSocket(url);
const scopeDisconnected = scopeBind(disconnected);
const scopeRawMessageReceived = scopeBind(rawMessageReceived);
const scopeSocketError = scopeBind(socketError);
return new Promise((res, rej) => {
const timeout = setTimeout(() => {
const error = new Error("Connection timeout");
socketError(error);
reject(error);
socket.close();
}, TIMEOUT);
ws.onopen = () => {
clearTimeout(timeout);
res(ws);
};
ws.onmessage = (event) => {
scopeMessageReceived(event.data);
};
ws.onclose = () => {
disconnected();
};
ws.onerror = (err) => {
const error = new Error("WebSocket error");
scopeDisconnected();
scopeSocketError(error);
rej(err);
};
});
});
// Стор для хранения ошибки
const $error = createStore("")
.on(socketError, (_, error) => error.message)
.reset(connectWebSocketFx.done);
Всегда обрабатывайте ошибки WebSocket соединения, так как они могут возникнуть по множеству причин: проблемы с сетью, таймауты, невалидные данные и т.д.
Типизация сообщений
При работе с WebSocket важно обеспечить типобезопасность данных. Это позволяет предотвратить ошибки на этапе разработки и повысить надёжность приложения при обработке различных типов сообщений.
Для этого воспользуемся библиотекой Zod, хотя можно использовать любую другую библиотеку для валидации.
Даже если вы не используете Zod или другую библиотеку валидации, базовую типизацию WebSocket сообщений можно реализовать и с помощью обычных TypeScript-интерфейсов. Но помните — они проверяют типы только на этапе компиляции и не защитят вас от неожиданных данных во время выполнения.
Предположим, что мы ожидаем два типа сообщений: balanceChanged
и reportGenerated
, содержащие следующие поля:
export const messagesSchema = z.discriminatedUnion("type", [
z.object({
type: z.literal("balanceChanged"),
balance: z.number(),
}),
z.object({
type: z.literal("reportGenerated"),
reportId: z.string(),
reportName: z.string(),
}),
]);
// Получаем тип из схемы
type MessagesSchema = z.infer<typeof messagesSchema>;
Теперь добавим эффект обработки сообщений, чтобы гарантировать, что они соответствуют ожидаемым типам, а также логику получения сообщений:
const parsedMessageReceived = createEvent<MessagesSchema>();
const parseFx = createEffect((message: unknown): MessagesSchema => {
return messagesSchema.parse(JSON.parse(typeof message === "string" ? message : "{}"));
});
// Парсим сообщение при его получении
sample({
clock: rawMessageReceived,
target: parseFx,
});
// Если парсинг удался — отправляем сообщение дальше
sample({
clock: parseFx.doneData,
target: parsedMessageReceived,
});
Мы также должны обработать ситуацию, когда сообщение не соответствует схеме:
const validationError = createEvent<Error>();
// Если парсинг не удался — обрабатываем ошибку
sample({
clock: parseFx.failData,
target: validationError,
});
Вот и всё, теперь все входящие сообщения будут проверяться на соответствие схеме перед их обработкой, а также иметь типизацию.
Такой же подход можно применить и для исходящих сообщений. Это позволит проверять их структуру перед отправкой и избежать ошибок.
Если хочется более точечного контроля, можно сделать событие, которое будет срабатывать только для определенного типа сообщений:
type MessageType<T extends MessagesSchema["type"]> = Extract<MessagesSchema, { type: T }>;
export const messageReceivedByType = <T extends MessagesSchema["type"]>(type: T) => {
return sample({
clock: parsedMessageReceived,
filter: (message): message is MessageType<T> => {
return message.type === type;
},
});
};
Пример использования:
sample({
clock: messageReceivedByType("balanceChanged"),
fn: (message) => {
// Typescript знает структуру message
},
target: doWhateverYouWant,
});
Если вы не уверены, какие данные возвращает sample, рекомендуем ознакомиться с документацией по sample
.
Работа с Socket.IO
Socket.IO предоставляет более высокоуровневый API для работы с WebSocket, добавляя множество полезных возможностей “из коробки”.
- Автоматическое переподключение
- Поддержка комнат и пространств имён
- Fallback на HTTP Long-polling если WebSocket недоступен
- Встроенная поддержка событий и подтверждений (acknowledgments)
- Автоматическая сериализация/десериализация данных
import { io, Socket } from "socket.io-client";
import { createStore, createEvent, createEffect, sample } from "effector";
const API_URL = "wss://your.ws.server";
// События
const connected = createEvent();
const disconnected = createEvent();
const socketError = createEvent<Error>();
// Типизация для событий
type ChatMessage = {
room: string;
message: string;
author: string;
};
const messageSent = createEvent<ChatMessage>();
const messageReceived = createEvent<ChatMessage>();
const socketConnected = createEvent();
const connectSocket = createEvent();
const connectFx = createEffect((): Promise<Socket> => {
const socket = io(API_URL, {
//... ваша конфигурация
});
// нужно для корректной работы со скоупами
const scopeConnected = scopeBind(connected);
const scopeDisconnected = scopeBind(disconnected);
const scopeSocketError = scopeBind(socketError);
const scopeMessageReceived = scopeBind(messageReceived);
return new Promise((resolve, reject) => {
socket.on("connect", () => {
scopeConnected();
resolve(socket);
});
socket.on("disconnect", () => scopeDisconnected());
socket.on("connect_error", (error) => scopeSocketError(error));
socket.on("chat message", (msg: ChatMessage) => scopeMessageReceived(msg));
});
});
const sendMessageFx = createEffect(
({
socket,
name,
payload,
}: SocketResponse<any> & {
socket: Socket;
}) => {
socket.emit(name, payload);
},
);
// Состояния
const $socket = createStore<Socket | null>(null)
.on(connectFx.doneData, (_, socket) => socket)
.reset(disconnected);
// инициализация подключения
sample({
clock: connectSocket,
target: connectFx,
});
// вызываем событие после успешного подключения
sample({
clock: connectSocketFx.doneData,
target: socketConnected,
});
Документация на английском языке - самая актуальная, поскольку её пишет и обновляет команда effector. Перевод документации на другие языки осуществляется сообществом по мере наличия сил и желания.
Помните, что переведенные статьи могут быть неактуальными, поэтому для получения наиболее точной и актуальной информации рекомендуем использовать оригинальную англоязычную версию документации.