Работа с WebSocket в Effector

В этом руководстве мы рассмотрим как правильно организовать работу с WebSocket соединением используя Effector.

WebSocket и типы данных

WebSocket API поддерживает передачу данных в виде строк или бинарных данных (Blob/ArrayBuffer). В этом руководстве мы сфокусируемся на работе со строками, так как это наиболее распространённый случай при обмене данными. При необходимости работы с бинарными данными, можно адаптировать примеры под нужный формат.

Базовая модель

Создадим простую, но рабочую модель WebSocket клиента. Для начала определим основные события и состояния:

import { createStore, createEvent, createEffect, sample } from "effector";

// События для работы с сокетом
const disconnected = createEvent();
const messageSent = createEvent<string>();
const rawMessageReceived = createEvent<string>();

const $connection = createStore<WebSocket | null>(null)
  .on(connectWebSocketFx.doneData, (_, ws) => ws)
  .reset(disconnected);

Создадим эффект для установки соединения:

const connectWebSocketFx = createEffect((url: string): Promise<WebSocket> => {
  const ws = new WebSocket(url);

  const scopeDisconnected = scopeBind(disconnected);
  const scopeRawMessageReceived = scopeBind(rawMessageReceived);

  return new Promise((res, rej) => {
    ws.onopen = () => {
      res(ws);
    };

    ws.onmessage = (event) => {
      scopeRawMessageReceived(event.data);
    };

    ws.onclose = () => {
      scopeDisconnected();
    };

    ws.onerror = (err) => {
      scopeDisconnected();
      rej(err);
    };
  });
});

Обратите внимание, что мы использовали здесь функцию scopeBind, чтобы связать юниты с текущим скоупом выполнения, так как мы не знаем когда вызовется scopeMessageReceived внутри socket.onmessage. Иначе событие попадет в глобальный скоуп. Читать более подробно.

Работа в режиме 'без скоупа'

Если вы по какой-то причине работаете в режиме без скоупа, то вам не нужно использовать scopeBind.
Учитывайте, что работа со скоупом это рекомундуемый вариант работы!

Обработка сообщений

Создадим стор для последнего полученного сообщения:

const $lastMessage = createStore("");

$lastMessage.on(messageReceived, (_, newMessage) => newMessage);

А также реализуем эффект для отправки сообщения:

const sendMessageFx = createEffect((params: { socket: WebSocket; message: string }) => {
  params.socket.send(params.message);
});

// Связываем отправку сообщения с текущим сокетом
sample({
  clock: messageSent,
  source: $connection,
  filter: Boolean, // Отправляем только если есть соединение
  fn: (socket, message) => ({
    socket,
    message,
  }),
  target: sendMessageFx,
});
Состояния соединения

WebSocket имеет несколько состояний подключения (CONNECTING, OPEN, CLOSING, CLOSED). В базовой модели мы упрощаем это до простой проверки через Boolean, но в реальном приложении может потребоваться более детальное отслеживание состояния.

Обработка ошибок

При работе с WebSocket важно корректно обрабатывать различные типы ошибок для обеспечения надежности приложения.

Расширим нашу базовую модель добавив обработку ошибок:

const TIMEOUT = 5_000;

// Добавляем события для ошибок
const socketError = createEvent<Error>();

const connectWebSocketFx = createEffect((url: string): Promise<WebSocket> => {
  const ws = new WebSocket(url);

  const scopeDisconnected = scopeBind(disconnected);
  const scopeRawMessageReceived = scopeBind(rawMessageReceived);
  const scopeSocketError = scopeBind(socketError);

  return new Promise((res, rej) => {
    const timeout = setTimeout(() => {
      const error = new Error("Connection timeout");

      socketError(error);
      reject(error);
      socket.close();
    }, TIMEOUT);

    ws.onopen = () => {
      clearTimeout(timeout);
      res(ws);
    };

    ws.onmessage = (event) => {
      scopeMessageReceived(event.data);
    };

    ws.onclose = () => {
      disconnected();
    };

    ws.onerror = (err) => {
      const error = new Error("WebSocket error");
      scopeDisconnected();
      scopeSocketError(error);
      rej(err);
    };
  });
});

// Стор для хранения ошибки
const $error = createStore("")
  .on(socketError, (_, error) => error.message)
  .reset(connectWebSocketFx.done);
Обработка ошибок

Всегда обрабатывайте ошибки WebSocket соединения, так как они могут возникнуть по множеству причин: проблемы с сетью, таймауты, невалидные данные и т.д.

Типизация сообщений

При работе с WebSocket важно обеспечить типобезопасность данных. Это позволяет предотвратить ошибки на этапе разработки и повысить надёжность приложения при обработке различных типов сообщений.

Для этого воспользуемся библиотекой Zod, хотя можно использовать любую другую библиотеку для валидации.

TypeScript и проверка типов

Даже если вы не используете Zod или другую библиотеку валидации, базовую типизацию WebSocket сообщений можно реализовать и с помощью обычных TypeScript-интерфейсов. Но помните — они проверяют типы только на этапе компиляции и не защитят вас от неожиданных данных во время выполнения.

Предположим, что мы ожидаем два типа сообщений: balanceChanged и reportGenerated, содержащие следующие поля:

export const messagesSchema = z.discriminatedUnion("type", [
  z.object({
    type: z.literal("balanceChanged"),
    balance: z.number(),
  }),
  z.object({
    type: z.literal("reportGenerated"),
    reportId: z.string(),
    reportName: z.string(),
  }),
]);

// Получаем тип из схемы
type MessagesSchema = z.infer<typeof messagesSchema>;

Теперь добавим эффект обработки сообщений, чтобы гарантировать, что они соответствуют ожидаемым типам, а также логику получения сообщений:

const parsedMessageReceived = createEvent<MessagesSchema>();

const parseFx = createEffect((message: unknown): MessagesSchema => {
  return messagesSchema.parse(JSON.parse(typeof message === "string" ? message : "{}"));
});

// Парсим сообщение при его получении
sample({
  clock: rawMessageReceived,
  target: parseFx,
});

// Если парсинг удался — отправляем сообщение дальше
sample({
  clock: parseFx.doneData,
  target: parsedMessageReceived,
});

Мы также должны обработать ситуацию, когда сообщение не соответствует схеме:

const validationError = createEvent<Error>();

// Если парсинг не удался — обрабатываем ошибку
sample({
  clock: parseFx.failData,
  target: validationError,
});

Вот и всё, теперь все входящие сообщения будут проверяться на соответствие схеме перед их обработкой, а также иметь типизацию.

Типизация отправляемых сообщений

Такой же подход можно применить и для исходящих сообщений. Это позволит проверять их структуру перед отправкой и избежать ошибок.

Если хочется более точечного контроля, можно сделать событие, которое будет срабатывать только для определенного типа сообщений:

type MessageType<T extends MessagesSchema["type"]> = Extract<MessagesSchema, { type: T }>;

export const messageReceivedByType = <T extends MessagesSchema["type"]>(type: T) => {
  return sample({
    clock: parsedMessageReceived,
    filter: (message): message is MessageType<T> => {
      return message.type === type;
    },
  });
};

Пример использования:

sample({
  clock: messageReceivedByType("balanceChanged"),
  fn: (message) => {
    // Typescript знает структуру message
  },
  target: doWhateverYouWant,
});
Возвращаемые значения sample

Если вы не уверены, какие данные возвращает sample, рекомендуем ознакомиться с документацией по sample.

Работа с Socket.IO

Socket.IO предоставляет более высокоуровневый API для работы с WebSocket, добавляя множество полезных возможностей “из коробки”.

Преимущества Socket.IO
  • Автоматическое переподключение
  • Поддержка комнат и пространств имён
  • Fallback на HTTP Long-polling если WebSocket недоступен
  • Встроенная поддержка событий и подтверждений (acknowledgments)
  • Автоматическая сериализация/десериализация данных
import { io, Socket } from "socket.io-client";
import { createStore, createEvent, createEffect, sample } from "effector";

const API_URL = "wss://your.ws.server";

// События
const connected = createEvent();
const disconnected = createEvent();
const socketError = createEvent<Error>();

// Типизация для событий
type ChatMessage = {
  room: string;
  message: string;
  author: string;
};

const messageSent = createEvent<ChatMessage>();
const messageReceived = createEvent<ChatMessage>();
const socketConnected = createEvent();
const connectSocket = createEvent();

const connectFx = createEffect((): Promise<Socket> => {
  const socket = io(API_URL, {
    //... ваша конфигурация
  });

  // нужно для корректной работы со скоупами
  const scopeConnected = scopeBind(connected);
  const scopeDisconnected = scopeBind(disconnected);
  const scopeSocketError = scopeBind(socketError);
  const scopeMessageReceived = scopeBind(messageReceived);

  return new Promise((resolve, reject) => {
    socket.on("connect", () => {
      scopeConnected();
      resolve(socket);
    });

    socket.on("disconnect", () => scopeDisconnected());
    socket.on("connect_error", (error) => scopeSocketError(error));
    socket.on("chat message", (msg: ChatMessage) => scopeMessageReceived(msg));
  });
});

const sendMessageFx = createEffect(
  ({
    socket,
    name,
    payload,
  }: SocketResponse<any> & {
    socket: Socket;
  }) => {
    socket.emit(name, payload);
  },
);

// Состояния
const $socket = createStore<Socket | null>(null)
  .on(connectFx.doneData, (_, socket) => socket)
  .reset(disconnected);

// инициализация подключения
sample({
  clock: connectSocket,
  target: connectFx,
});

// вызываем событие после успешного подключения
sample({
  clock: connectSocketFx.doneData,
  target: socketConnected,
});
Перевод поддерживается сообществом

Документация на английском языке - самая актуальная, поскольку её пишет и обновляет команда effector. Перевод документации на другие языки осуществляется сообществом по мере наличия сил и желания.

Помните, что переведенные статьи могут быть неактуальными, поэтому для получения наиболее точной и актуальной информации рекомендуем использовать оригинальную англоязычную версию документации.

Соавторы