import * as Sentry from "@sentry/react";
import { WsMessageType } from "Enums";
import { errorToast } from "Services";
import { updateEmployeeLive } from "Store/employees";
import { createWsInstance, parseWsMessage, serializeWsMessage } from "Utils";
import { eventChannel, Subscribe, type EventChannel } from "redux-saga";
import {
  all,
  call,
  cancel,
  delay,
  fork,
  put,
  select,
  take,
  takeEvery,
} from "redux-saga/effects";
import { AUTH_ACTION_TYPES } from "../auth/action-types";
import { refreshTokenSaga } from "../auth/sagas";
import { updateDriverLive, updateMachineLive } from "../dashboard/actions";
import { GENERAL_ACTION_TYPES } from "./action-types";
import { updateJobLive } from "./actions";

// Module-level queue of serialized subscription messages that could not be sent
// yet. `changeSubscriptionsSaga` appends to it while the socket's `readyState`
// is not 1, and `handleSocketMessage` sends every entry and resets the queue
// when an AUTHENTICATED message arrives.
// NOTE(review): the name reads like a typo for "Queue" — confirm before renaming.
let socketSubscriptionsQuery: string[] = [];

/**
 * Handles a single incoming WebSocket message.
 *
 * The raw event is parsed with `parseWsMessage`; a falsy result is ignored.
 * Otherwise the message type (`t`) decides what happens:
 * - live-update types dispatch the matching store action with the parsed data,
 * - AUTHENTICATED sends every queued subscription message and empties the queue,
 * - NOT_AUTHORIZED / ABOUT_TO_EXPIRE run `refreshTokenSaga`,
 * - ERROR shows an error toast built from the payload.
 * Any other type is ignored.
 *
 * Anything thrown while parsing or handling is swallowed on purpose, so a
 * single bad message never propagates to the caller.
 *
 * @param message - The "message" event received from the socket.
 * @param socket - The socket the event came from; used to flush the queue.
 */
export function* handleSocketMessage(message: MessageEvent, socket: WebSocket) {
  try {
    const parsed = parseWsMessage(message);

    if (parsed) {
      switch (parsed.t) {
        case WsMessageType.ONLINE_QUERY: {
          yield put(updateEmployeeLive(parsed));
          break;
        }

        case WsMessageType.MACHINES: {
          yield put(updateMachineLive(parsed));
          break;
        }

        case WsMessageType.JOB: {
          yield put(updateJobLive(parsed));
          break;
        }

        case WsMessageType.DRIVERS: {
          yield put(updateDriverLive(parsed));
          break;
        }

        case WsMessageType.AUTHENTICATED: {
          // Subscriptions requested before the socket was usable are sent now.
          for (const queuedMessage of socketSubscriptionsQuery) {
            socket.send(queuedMessage);
          }
          socketSubscriptionsQuery = [];
          break;
        }

        case WsMessageType.NOT_AUTHORIZED:
        case WsMessageType.ABOUT_TO_EXPIRE: {
          yield call(refreshTokenSaga);
          break;
        }

        case WsMessageType.ERROR: {
          errorToast(undefined, undefined, Error(parsed.payload));
          break;
        }
      }
    }
  } catch {
    // Intentionally ignored: a failing message must not break the caller.
  }
}

/**
 * Wraps a WebSocket in a redux-saga event channel.
 *
 * The socket's "open", "close", "message" and "error" events are forwarded
 * unchanged into the channel. Closing the channel detaches those listeners
 * again and then closes the socket.
 *
 * No buffer is passed to `eventChannel`, so the channel uses redux-saga's
 * default buffering behaviour.
 *
 * @param socketInstance - The socket whose events should be emitted.
 * @returns A channel emitting the raw socket events.
 */
export const createSocketChannel = (
  socketInstance: WebSocket,
): EventChannel<Event> => {
  const forwardedEvents = ["open", "close", "message", "error"] as const;

  const subscribe: Subscribe<Event> = (emitter) => {
    for (const eventName of forwardedEvents) {
      socketInstance.addEventListener(eventName, emitter);
    }

    return () => {
      // Detach first so the socket no longer holds references to the channel's
      // emitter once the channel has been closed.
      for (const eventName of forwardedEvents) {
        socketInstance.removeEventListener(eventName, emitter);
      }
      socketInstance.close();
    };
  };

  return eventChannel(subscribe);
};

/**
 * Authenticates a socket by sending a serialized AUTHENTICATE message that
 * carries the given token.
 *
 * @param socket - The socket to send the message on.
 * @param token - Access token placed in the AUTHENTICATE message.
 */
export function* openSocketConnection(socket: WebSocket, token: string) {
  // `send` is invoked with the socket as `this` through the call effect.
  yield call(
    [socket, socket.send],
    serializeWsMessage(WsMessageType.AUTHENTICATE, token, undefined),
  );
}

/**
 * Endless watcher for CHANGE_WS_SUBSCRIPTION actions.
 *
 * Each action's `payload.subscriptionMessage` is sent on the socket when the
 * socket is open; otherwise it is appended to the module-level queue of
 * pending subscription messages.
 *
 * @param socket - The socket subscription messages are sent on.
 */
export function* changeSubscriptionsSaga(socket: WebSocket) {
  // Value of `readyState` for an open connection (WebSocket.OPEN).
  const openState = 1;

  for (;;) {
    const action = yield take(GENERAL_ACTION_TYPES.CHANGE_WS_SUBSCRIPTION);
    const { subscriptionMessage } = action.payload;

    if (socket.readyState === openState) {
      yield call([socket, socket.send], subscriptionMessage);
    } else {
      socketSubscriptionsQuery.push(subscriptionMessage);
    }
  }
}

/**
 * Endless watcher for REFRESH_TOKEN_SUCCESS actions.
 *
 * When the socket is open, a new AUTHENTICATE message carrying the refreshed
 * `payload.accessToken` is sent on it. When it is not open, `controlWsSaga` is
 * called instead, and this watcher waits until that saga returns.
 *
 * NOTE(review): `controlWsSaga` forks its own `openWsSaga`, so this branch
 * appears to start a second connection loop — confirm that this is intended.
 *
 * @param socket - The socket to re-authenticate.
 */
export function* reAuthorizeWsSaga(socket: WebSocket) {
  // Value of `readyState` for an open connection (WebSocket.OPEN).
  const openState = 1;

  for (;;) {
    const action = yield take(AUTH_ACTION_TYPES.REFRESH_TOKEN_SUCCESS);
    const { accessToken } = action.payload;

    if (socket.readyState !== openState) {
      yield call(controlWsSaga);
    } else {
      const reAuthMessage = serializeWsMessage(
        WsMessageType.AUTHENTICATE,
        accessToken,
        undefined,
      );

      yield call([socket, socket.send], reAuthMessage);
    }
  }
}

/**
 * Owns the WebSocket connection and keeps re-establishing it.
 *
 * Each iteration of the outer loop:
 * 1. reads the store, creates a socket and wraps it in an event channel,
 * 2. forks the helper sagas bound to that socket (subscription changes and
 *    re-authentication), cancelling the helpers of the previous socket first,
 * 3. processes channel events until the socket closes or reports an error,
 * 4. closes the channel (which also closes the socket) and waits before the
 *    next attempt.
 *
 * Errors raised inside an iteration are reported to Sentry and do not stop the
 * loop. The saga only ends when it is cancelled.
 */
export function* openWsSaga() {
  const reconnectDelayMs = 30000;

  // Helper sagas of the most recently created socket. They are kept across
  // iterations on purpose: while the connection is down the old helpers keep
  // running, and they are only cancelled once replacements for the new socket
  // are about to be forked. Without that cancellation every reconnect would
  // leave another pair of helpers reacting to the same actions with a dead
  // socket.
  let subscriptionsTask;
  let reAuthTask;

  while (true) {
    let socketChannel: EventChannel<Event> | undefined;

    try {
      const store = yield select();
      const authReducer = store.auth;
      const socketInstance = yield call(createWsInstance);
      const channel: EventChannel<Event> = yield call(
        createSocketChannel,
        socketInstance,
      );
      socketChannel = channel;

      if (subscriptionsTask) {
        yield cancel(subscriptionsTask);
      }
      if (reAuthTask) {
        yield cancel(reAuthTask);
      }
      subscriptionsTask = yield fork(changeSubscriptionsSaga, socketInstance);
      reAuthTask = yield fork(reAuthorizeWsSaga, socketInstance);

      messageLoop: while (true) {
        const message = yield take(channel);

        switch (message.type) {
          case "open":
            yield call(
              openSocketConnection,
              socketInstance,
              authReducer.tokens.accessToken,
            );
            break;
          case "message":
            yield call(handleSocketMessage, message, socketInstance);
            break;
          case "close":
            break messageLoop;
          case "error":
            // A socket "error" event normally carries no `data`; fall back to
            // a fixed text so the reported error is not message-less.
            throw new Error(message.data ?? "WebSocket connection error");
          default:
            break;
        }
      }
    } catch (error) {
      Sentry.captureException(error);
    } finally {
      // Runs on close, on error and on cancellation of this saga.
      if (socketChannel) socketChannel.close();
    }
    // In case the web socket was closed, we would next reestablish the connection,
    // but we'll wait 30s so that we do not flood anything in case there is a permanent error.
    yield delay(reconnectDelayMs);
  }
}

/**
 * Runs `openWsSaga` in the background until a CLOSE_WEB_SOCKET action is
 * dispatched, then cancels it.
 */
export function* controlWsSaga() {
  const connectionTask = yield fork(openWsSaga);

  // Block here; the connection saga keeps running as a forked task meanwhile.
  yield take(GENERAL_ACTION_TYPES.CLOSE_WEB_SOCKET);
  yield cancel(connectionTask);
}

/**
 * Root saga of this module: starts `controlWsSaga` for every OPEN_WEB_SOCKET
 * action.
 */
export default function* generalSaga() {
  const watchers = [
    takeEvery(GENERAL_ACTION_TYPES.OPEN_WEB_SOCKET, controlWsSaga),
  ];

  yield all(watchers);
}
