Работа с WebSocket в Effector

В этом руководстве мы рассмотрим как правильно организовать работу с WebSocket соединением используя Effector.

WebSocket и типы данных

WebSocket API поддерживает передачу данных в виде строк или бинарных данных (Blob/ArrayBuffer). В этом руководстве мы сфокусируемся на работе со строками, так как это наиболее распространённый случай при обмене данными. При необходимости работы с бинарными данными, можно адаптировать примеры под нужный формат.

Базовая модель

Создадим простую, но рабочую модель WebSocket клиента. Для начала определим основные события и состояния:

import { createStore, createEvent, createEffect, sample } from "effector";

// События для работы с сокетом
const disconnected = createEvent();
const messageSent = createEvent<string>();
const messageReceived = createEvent<string>();

const $connection = createStore<WebSocket | null>(null)
  .on(connectWebSocketFx.doneData, (_, ws) => ws)
  .reset(disconnected);

Создадим эффект для установки соединения:

const connectWebSocketFx = createEffect((url: string): Promise<WebSocket> => {
  const ws = new WebSocket(url);

  const scopeDisconnected = scopeBind(disconnected);
  const scopeMessageReceived = scopeBind(messageReceived);

  return new Promise((res, rej) => {
    ws.onopen = () => {
      res(ws);
    };

    ws.onmessage = (event) => {
      scopeMessageReceived(event.data);
    };

    ws.onclose = () => {
      scopeDisconnected();
    };

    ws.onerror = (err) => {
      scopeDisconnected();
      rej(err);
    };
  });
});

Обратите внимание, что мы использовали здесь функцию scopeBind, чтобы связать юниты с текущим скоупом выполнения, так как мы не знаем когда вызовется scopeMessageReceived внутри socket.onmessage. Иначе событие попадет в глобальный скоуп. Читать более подробно.

Работа в режиме 'без скоупа'

Если вы по какой-то причине работаете в режиме без скоупа, то вам не нужно использовать scopeBind.
Учитывайте, что работа со скоупом это рекомундуемый вариант работы!

Обработка сообщений

Создадим хранилище для последнего полученного сообщения:

const $lastMessage = createStore("");

$lastMessage.on(messageReceived, (_, newMessage) => newMessage);

А также реализуем эффект для отправки сообщения:

const sendMessageFx = createEffect((params: { socket: WebSocket; message: string }) => {
  params.socket.send(params.message);
});

// Связываем отправку сообщения с текущим сокетом
sample({
  clock: messageSent,
  source: $connection,
  filter: Boolean, // Отправляем только если есть соединение
  fn: (socket, message) => ({
    socket,
    message,
  }),
  target: sendMessageFx,
});
Состояния соединения

WebSocket имеет несколько состояний подключения (CONNECTING, OPEN, CLOSING, CLOSED). В базовой модели мы упрощаем это до простой проверки через Boolean, но в реальном приложении может потребоваться более детальное отслеживание состояния.

Обработка ошибок

При работе с WebSocket важно корректно обрабатывать различные типы ошибок для обеспечения надежности приложения.

Расширим нашу базовую модель добавив обработку ошибок:

const TIMEOUT = 5_000;

// Добавляем события для ошибок
const socketError = createEvent<Error>();

const connectWebSocketFx = createEffect((url: string): Promise<WebSocket> => {
  const ws = new WebSocket(url);

  const scopeDisconnected = scopeBind(disconnected);
  const scopeMessageReceived = scopeBind(messageReceived);
  const scopeSocketError = scopeBind(socketError);

  return new Promise((res, rej) => {
    const timeout = setTimeout(() => {
      const error = new Error("Connection timeout");

      socketError(error);
      reject(error);
      socket.close();
    }, TIMEOUT);

    ws.onopen = () => {
      clearTimeout(timeout);
      res(ws);
    };

    ws.onmessage = (event) => {
      scopeMessageReceived(event.data);
    };

    ws.onclose = () => {
      disconnected();
    };

    ws.onerror = (err) => {
      const error = new Error("WebSocket error");
      scopeDisconnected();
      scopeSocketError(error);
      rej(err);
    };
  });
});

// Стор для хранения ошибки
const $error = createStore("")
  .on(socketError, (_, error) => error.message)
  .reset(connectWebSocketFx.done);
Обработка ошибок

Всегда обрабатывайте ошибки WebSocket соединения, так как они могут возникнуть по множеству причин: проблемы с сетью, таймауты, невалидные данные и т.д.

Работа с Socket.IO

Socket.IO предоставляет более высокоуровневый API для работы с WebSocket, добавляя множество полезных возможностей “из коробки”.

Преимущества Socket.IO
  • Автоматическое переподключение
  • Поддержка комнат и пространств имён
  • Fallback на HTTP Long-polling если WebSocket недоступен
  • Встроенная поддержка событий и подтверждений (acknowledgments)
  • Автоматическая сериализация/десериализация данных
import { io, Socket } from "socket.io-client";
import { createStore, createEvent, createEffect, sample } from "effector";

const API_URL = "wss://your.ws.server";

// События
const connected = createEvent();
const disconnected = createEvent();
const socketError = createEvent<Error>();

// Типизация для событий
type ChatMessage = {
  room: string;
  message: string;
  author: string;
};

const messageSent = createEvent<ChatMessage>();
const messageReceived = createEvent<ChatMessage>();
const socketConnected = createEvent();
const connectSocket = createEvent();

const connectFx = createEffect((): Promise<Socket> => {
  const socket = io(API_URL, {
    //... ваша конфигурация
  });

  // нужно для корректной работы со скоупами
  const scopeConnected = scopeBind(connected);
  const scopeDisconnected = scopeBind(disconnected);
  const scopeSocketError = scopeBind(socketError);
  const scopeMessageReceived = scopeBind(messageReceived);

  return new Promise((resolve, reject) => {
    socket.on("connect", () => {
      scopeConnected();
      resolve(socket);
    });

    socket.on("disconnect", () => scopeDisconnected());
    socket.on("connect_error", (error) => scopeSocketError(error));
    socket.on("chat message", (msg: ChatMessage) => scopeMessageReceived(msg));
  });
});

const sendMessageFx = createEffect(
  ({
    socket,
    name,
    payload,
  }: SocketResponse<any> & {
    socket: Socket;
  }) => {
    socket.emit(name, payload);
  },
);

// Состояния
const $socket = createStore<Socket | null>(null)
  .on(connectFx.doneData, (_, socket) => socket)
  .reset(disconnected);

// инициализация подключения
sample({
  clock: connectSocket,
  target: connectFx,
});

// вызываем событие после успешного подключения
sample({
  clock: connectSocketFx.doneData,
  target: socketConnected,
});
Перевод поддерживается сообществом

Документация на английском языке - самая актуальная, поскольку её пишет и обновляет команда effector. Перевод документации на другие языки осуществляется сообществом по мере наличия сил и желания.

Помните, что переведенные статьи могут быть неактуальными, поэтому для получения наиболее точной и актуальной информации рекомендуем использовать оригинальную англоязычную версию документации.

Соавторы