import _ from 'lodash';
import angular from 'angular';
import moment from 'moment-timezone';
import pako from 'pako';
import { LoggerService } from '@/services/logger.service';
import { SubscriptionsApi } from 'sdk/api/SubscriptionsApi';
import { MonitorsApi } from 'sdk/api/MonitorsApi';
import { NotificationsService } from '@/services/notifications.service';
import { APP_STATE, APPSERVER_PATHS } from '@/main/app.constants';
import { MESSAGES } from '@/services/socket.constants';
import { UtilitiesService } from '@/services/utilities.service';
import SocketWorker from '@/webWorkers/socket.worker.shim';
import { AsyncResponsesService } from '@/services/asyncResponses.service';
import { PendingRequestsService } from '@/services/pendingRequests.service';
import { SeeqNames } from '@/main/app.constants.seeqnames';

const dependencies = [
  'Sq.Services.Logger'
];

/**
 * Angular service that manages a WebSocket connection to the Seeq Server.
 */
angular.module('Sq.Services.Socket', dependencies)
  .factory('sqSocket', sqSocket);

export type SocketService = ReturnType<typeof sqSocket>;

interface ChannelSubscription {
  /**
   * The URI that identifies the message channel on which to listen
   */
  channelId: string;
  /**
   * The callback to invoke when a message of the specified channel is received
   */
  onMessage: (message: any) => void;
  /**
   * Determines if this subscription should use the subscription API to subscribe and unsubscribe to the channel.
   * Defaults to true because there are only a few channels that the backend auto-subscribes a client to, such as
   * the async response channel. If this is true then when adding a callback the API will also be subscribed and
   * when the last callback for a channel is removed the API will be unsubscribed.
   */
  useSubscriptionsApi?: boolean;
  /**
   * Used when useSubscriptionsApi is true this callback is invoked after any successful API request to subscribe to
   * the channel.
   */
  onSubscribe?: (isReconnect: boolean) => void;
  /**
   * This callback is invoked when a socket closed event is received.
   */
  onClose?: (event: CloseEvent) => void;
  /**
   * This callback is invoked if an error occurs when either subscribing or unsubscribing from the API or if the
   * socket never opens successfully.
   */
  onError?: (error: any, action: string) => void;

  /**
   * A list of parameters associated with this subscription.
   */
  subscriberParameters?: {};
}

interface ChannelSubscriptionInput extends Omit<ChannelSubscription, 'channelId'> {
  channelId: string[];
}

// JSON structure of client-originated messages.
export type ClientMessage = {
  channelId: string,
  data: any,
};

function sqSocket(
  $injector: ng.auto.IInjectorService,
  $location: ng.ILocationService,
  $rootScope: ng.IRootScopeService,
  $q: ng.IQService,
  $interval: ng.IIntervalService,
  sqLogger: LoggerService,
  sqUtilities: UtilitiesService
) {

  let webWorker;
  let livelinessRetry;
  let reconnectStart = moment.invalid();
  let hasMessageBeenReceived = false;
  let errorCount = 0;
  let connectionTimer;
  let isCreated = false;
  let isOpen = false;
  const errorCountMax = 2;
  const channelSubscriptions: ChannelSubscription[] = [];
  const openCallbacks: ((evt: Event) => void)[] = [];
  const closeCallbacks: ((evt: CloseEvent) => void)[] = [];
  const errorStateCallbacks: (() => void)[] = [];

  const service = {
    open,
    close,
    subscribe,
    emit,
    onOpen, // Only for testing
    onClose,
    waitForOpen,
    init,

    /**
     * Indicator if a socket is created. Does not indicate whether the socket is ready.
     */
    get isCreated() {
      return isCreated;
    },

    /**
     * Indicator if a socket is open and ready to send/receive messages.
     */
    get isOpen() {
      return isOpen;
    }
  };

  $rootScope.$on('$destroy', destroy);

  return service;

  /**
   * Main HTML5 message handler for messages from the WebWorker thread. All messages are routed to the correct handler
   * function based on the type specified in the data.
   *
   * @param {Object} message - HTML5 message received from the WebWorker thread
   * @param {Object} message.data - Data payload for this message, as sent from the WebWorker thread
   * @param {String} message.data.type - Type of message, to use for routing to the appropriate handler
   * @param {String} message.data.payload - Data payload for the message (varies by type)
   */
  function onWorkerMessage({ data }: MessageEvent) {
    // WebWorker is outside AngularJS environment, so manually trigger a $digest
    $rootScope.$evalAsync(() => {
      switch (data.type) {
        case MESSAGES.ON_WEBSOCKET_DATA:
          onMessage(data.payload);
          break;
        case MESSAGES.ON_OPEN_COMPLETE:
          onOpenCallback(data.payload);
          break;
        case MESSAGES.ON_CLOSE_COMPLETE:
          onCloseCallback(data.payload);
          break;
        case MESSAGES.ON_ERROR:
          onErrorCallback();
          break;
        case MESSAGES.COMMAND_LOG:
          sqLogger.info(data.payload);
          break;
        default:
          sqLogger.warn(`Unknown message from webworker: ${JSON.stringify(data)}`);
          break;
      }
    });
  }

  /**
   * Initialize SocketWorker. Pulled into a function so service can be properly mocked for testing.
   */
  function init() {
    webWorker = new SocketWorker();
    webWorker.onmessage = onWorkerMessage;
  }

  /**
   * Opens a websocket connection using the specified interactiveId. Any previous websocket connection is automatically
   * closed.
   *
   * @param {string} interactiveId - Interactive ID to use when opening connection. This ID is used to associate HTTP
   *   requests with the websocket from the same client connection.
   * @param {string} csrfToken - The cross-site request forgery token. Necessary because websockets are vulnerable
   * to cross-site attacks since their authentication id is sent via cookie.
   * @returns {Promise} that resolves when the socket request to open has been sent, or rejects if the open fails
   */
  function open(interactiveId, csrfToken) {
    if (service.isCreated) {
      return $q.resolve();
    }

    const interactiveIdParamName = SeeqNames.API.QueryParams.ClientInteractiveSessionId;

    const url = `${$location.protocol() === 'https' ? 'wss' : 'ws'}://${$location.host()}:${$location.port()}` +
      `${APPSERVER_PATHS.WEBSOCKET_EVENTS}?${interactiveIdParamName}=${encodeURIComponent(interactiveId)}`;
    service.close();

    // Register this wait prior to sending the open attempt, so that there's no possibility of missing an open or error
    // response
    const waitingForOpen = service.waitForOpen()
      .then(() => {
        // These channels are automatically created by the backend for the unique interactiveSessionId and should be
        // subscribed for all users
        $injector.get<AsyncResponsesService>('sqAsyncResponses').subscribe(interactiveId);
        $injector.get<PendingRequestsService>('sqPendingRequests').subscribe(interactiveId);
      });

    sendToWorkerThread(MESSAGES.COMMAND_OPEN, { url, csrfToken });
    isCreated = true;

    livelinessRetry = () => {
      $injector.get<MonitorsApi>('sqMonitorsApi')
        .postMeterData({ monitor: 'Network/ClientWebSocket/LivelinessTimeoutExpired' })
        .catch(e => sqLogger.error(sqLogger.format`sqMonitorsApi.postMeterData Error: ${e}`));
      service.close();
      service.open(interactiveId, csrfToken);
    };

    restartConnectionTimer();
    return waitingForOpen;
  }

  /**
   * Closes the existing websocket connection, if open. Does not clear existing subscriptions since they are
   * expected to persist across network interruptions.
   */
  function close() {
    sendToWorkerThread(MESSAGES.COMMAND_CLOSE);

    isCreated = false;
    isOpen = false;
    errorCount = 0;
    stopConnectionTimer();
  }

  /**
   * Subscribes to a particular message channel, returning a function that is used to unsubscribe. It is assumed that
   * socket callbacks should persist across network interruptions. For that reason, if useSubscriptionsApi is
   * true then the user will be subscribed initially and also if the socket disconnects and reconnects. It is the
   * responsibility of the caller to ensure that the unsubscribe callback that is returned is invoked when the
   * subscription is no longer needed. If useSubscriptionsApi is true and the last callback for a channel is being
   * removed, the subscription will also be unsubscribed from the API. If called multiple times with the same
   * definition, the callback will only be registered once.
   *
   * @param definition - The channel subscription definition
   * @returns A callback that unregisters the listener locally and, if useSubscriptionsApi is true, also
   *   from the API.
   */
  function subscribe(definition: ChannelSubscriptionInput): () => void {
    const sqSubscriptionsApi = $injector.get<SubscriptionsApi>('sqSubscriptionsApi');
    const channelId = constructChannel(definition.channelId);
    const channelSubscription: ChannelSubscription = { ...definition, channelId };
    const useSubscriptionsApi = _.defaultTo(definition.useSubscriptionsApi, true);
    const onError = _.defaultTo(definition.onError, _.noop);
    let unsubscribeOnOpen = _.noop;
    let unsubscribeOnClose = _.noop;

    const errorHandler = action => (error) => {
      const errorMessage = sqLogger.format`sqSocket failed to ${action} to: ${channelId}, message: ${error}`;
      // If the caller has already unsubscribed, don't try to unsubscribe again and don't call the onError callback
      if (!_.some(channelSubscriptions, channelSubscription) && action !== 'unsubscribe') {
        sqLogger.warn(`${errorMessage} after unsubscribe`);
        return;
      }

      if (action !== 'unsubscribe') {
        unsubscribeCallback();
      }

      sqLogger.error(errorMessage);
      onError(error, action);
    };

    const onOpen = (isReconnect) => {
      if (useSubscriptionsApi && _.some(channelSubscriptions, { channelId })) {
        sqSubscriptionsApi.subscribe({
            channelId,
            subscriberParameters: channelSubscription.subscriberParameters || {},
          })
          .then(() => {
            if (_.isFunction(channelSubscription.onSubscribe)) {
              return channelSubscription.onSubscribe(isReconnect);
            }
          })
          .catch(errorHandler('subscribe'));
      }
    };

    const unsubscribeCallback = () => {
      _.remove(channelSubscriptions, channelSubscription);
      unsubscribeOnOpen();
      unsubscribeOnClose();
      if (useSubscriptionsApi && !_.some(channelSubscriptions, { channelId })) {
        sqSubscriptionsApi.unsubscribe({ channelId }).catch(errorHandler('unsubscribe'));
      }
    };

    if (_.some(channelSubscriptions, channelSubscription)) {
      return unsubscribeCallback;
    }

    channelSubscriptions.push(channelSubscription);
    $q.resolve(service.isOpen || service.waitForOpen())
      .then(() => {
        // If the subscription was unsubscribed while we were waiting for open, don't take any further action
        if (_.some(channelSubscriptions, channelSubscription)) {
          onOpen(false);
          unsubscribeOnOpen = service.onOpen(() => onOpen(true));
          if (_.isFunction(channelSubscription.onClose)) {
            unsubscribeOnClose = service.onClose(event => channelSubscription.onClose(event));
          }
        }
      })
      .catch(errorHandler('open'));

    return unsubscribeCallback;
  }

  /**
   * Send a message over the socket
   *
   * @param channelId - URI tokens that identifies the message channel
   * @param data - Data to send
   */
  function emit(channelId: string[], data: string | {}) {
    emitClientMessage({ data, channelId: constructChannel(channelId) });
  }

  function emitClientMessage(message: ClientMessage) {
    // Important! If socket isn't connected then this message will be lost!
    // Currently the messages emitted by the client don't contain critical
    // data. If this changes in the future, a queue/buffer may be needed to
    // store messages until the socket is connected
    sendToWorkerThread(MESSAGES.COMMAND_TRANSMIT, JSON.stringify(message));
  }

  /**
   * Construct a channelId by joining encoded parts
   *
   * @param channelIdTokens - Token of channelId to be constructed
   */
  function constructChannel(channelIdTokens: string[]) {
    return `channel://${_.map(channelIdTokens, encodeURIComponent).join('/')}`;
  }

  /**
   * Registers a callback that is called whenever the socket is opened, either during the initial connect or when there
   * is a re-connection after a broken connection.
   *
   * @param {Function} callback - the callback that handles the open event. The callback takes one parameter: the
   * event sent to WebSocket.onopen (https://developer.mozilla.org/en-US/docs/Web/API/WebSocket/onopen)
   * @returns {Function} that unregisters the listener when invoked
   */
  function onOpen(callback) {
    openCallbacks.push(callback);
    return () => _.pull(openCallbacks, callback);
  }

  /**
   * Registers a callback that is called whenever the socket is closed.
   *
   * @param {Function} callback - the callback that handle the close event. The callback takes one parameter: the
   * event sent to WebSocket.onclose (https://developer.mozilla.org/en-US/docs/Web/API/CloseEvent)
   * @returns {Function} that unregisters the listener when invoked
   */
  function onClose(callback) {
    closeCallbacks.push(callback);
    return () => _.pull(closeCallbacks, callback);
  }

  /**
   * Wait for the socket to be created and/or opened.
   *
   * @returns {Promise} resolves when the websocket is open or rejects if the websocket permanently fails to open
   */
  function waitForOpen() {
    if (service.isOpen) {
      return $q.resolve({});
    }
    return new $q((resolve, reject) => {
      const unsubscribes = [
        service.onOpen(() => {
          _.forEach(unsubscribes, u => u());
          resolve();
        }),
        onErrorState(() => {
          _.forEach(unsubscribes, u => u());
          reject();
        })
      ];
    });
  }

  /**
   * Cleans up all resources when destroying the application, closing the websocket and terminating the worker thread.
   */
  function destroy() {
    service.close();
    if (webWorker) {
      webWorker.terminate();
    }

  }

  /**** CONNECTION HANDLERS ****/

  /**
   * Starts a timer that will fire if no data is received via the socket within SOCKET_LIVELINESS_TIMEOUT. If
   * triggered, the timer will close the existing socket and attempt to open a new one.
   */
  function restartConnectionTimer() {
    stopConnectionTimer();
    connectionTimer = $interval(livelinessRetry, SOCKET_LIVELINESS_TIMEOUT, 1);
  }

  /**
   * Cancels any existing connection timer created with restartConnectionTimer.
   */
  function stopConnectionTimer() {
    $interval.cancel(connectionTimer);
    connectionTimer = undefined;
  }

  /**
   * Permanently redirect to the error state with "Websocket unsupported" error
   */
  function redirectToErrorState() {
    const $state = $injector.get<ng.ui.IStateService>('$state');
    $state.go(APP_STATE.LOAD_ERROR, {
      returnState: APP_STATE.LOGIN,
      returnParams: JSON.stringify($state.params),
      header: 'LOAD_ERROR.WEBSOCKET_HEADER',
      message1: 'LOAD_ERROR.WEBSOCKET_MESSAGE1',
      message2: 'LOAD_ERROR.WEBSOCKET_MESSAGE2',
      retryInterval: 0
    });
    _.forEach(_.clone(errorStateCallbacks), callback => callback());
  }

  /**** WEBSOCKET CALLBACK HANDLERS ****/

  /**
   * Registers a callback that is called when we navigate to the error page (when the websocket permanently fails).
   *
   * @param {Function} callback - a callback function (e.g. function())
   * @returns {Function} that unregisters the listener when invoked
   */
  function onErrorState(callback) {
    errorStateCallbacks.push(callback);
    return () => _.pull(errorStateCallbacks, callback);
  }

  /**
   * Service handler for socket onOpen notification
   *
   * @param {Object} event - the event
   */
  function onOpenCallback(event) {
    isOpen = true;
    if (reconnectStart.isValid()) {
      $injector.get<MonitorsApi>('sqMonitorsApi')
        .postMeterData({ monitor: 'Network/ClientWebSocket/Reconnect' })
        .catch(e => sqLogger.error(sqLogger.format`sqMonitorsApi.postMeterData Error: ${e}`));
    }

    reconnectStart = moment.invalid();
    _.forEach(_.clone(openCallbacks), callback => callback(event));
  }

  /**
   * Service handler for socket onClose notification
   *
   * @param {CloseEvent} event
   * @param {Number} event.code - status code for the close event
   * @param {boolean} event.wasClean - true if socket was closed 'cleanly', otherwise false
   */
  function onCloseCallback(event) {
    const { code, wasClean } = event;
    // See https://developer.mozilla.org/en-US/docs/Web/API/CloseEvent for what codes mean
    if (code !== 1000) { // 1000 is a 'normal' close code; anything else indicates we'll try to reconnect
      reconnectStart = moment.utc();
    } else {
      reconnectStart = moment.invalid();
    }

    if (!wasClean) {
      sqLogger.trace(`sqSocket closed unexpectedly (${code})`);
      $injector.get<NotificationsService>('sqNotifications').errorTranslate(
        'NOT_CONNECTED', undefined, undefined, { skipTracking: true });
      service.close();
    }
    _.forEach(_.clone(closeCallbacks), callback => callback(event));
  }

  /**
   * Service handler for socket onError notification.
   */
  function onErrorCallback() {
    errorCount += 1;
    sqLogger.error(`sqSocket error (count: ${errorCount})`);
    // If we haven't successfully received any data over this socket before we get multiple errors, then we assume
    // that this is a result of a "failure to communicate", aka a late-arriving failure of the open request.
    if (!hasMessageBeenReceived && errorCount >= errorCountMax) {
      service.close();
      redirectToErrorState();
    }
  }

  /**
   * Internal processor that calls each registered handler when a message received via the websocket.
   *
   * @param {Blob|String} stringOrBinaryMessage - Gzipped or string JSON message
   */
  function onMessage(stringOrBinaryMessage : Blob | string) {
    hasMessageBeenReceived = true;
    restartConnectionTimer();

    $q.resolve(stringOrBinaryMessage instanceof Blob ? unzipBlob(stringOrBinaryMessage) : stringOrBinaryMessage)
      .then((message) => {
        const jsonMessage = _.attempt(JSON.parse, message);

        if (_.isError(jsonMessage)) {
          sqLogger.error(sqLogger.format`sqSocket message parse error: ${jsonMessage.toString()}, message: ${message}`);
          return;
        }

        const channelId = jsonMessage.channelId;

        if (channelId) {
          _.chain(channelSubscriptions)
            .filter(subscription => channelId === subscription.channelId)
            .forEach(subscription => subscription.onMessage(jsonMessage))
            .value();
        }
      })
      .catch((e) => {
        sqLogger.error(e);
      });
  }

  /**** HELPER FUNCTIONS ****/

  /**
   * Helper method that converts a Blob to an ArrayBuffer and then unzips it.
   *
   * @param {Blob} blob - The blob to convert
   * @return {Promise<String>} - Promise that resolves with the unzipped string
   */
  function unzipBlob(blob: Blob): angular.IPromise<string> {
    return new $q((resolve, reject) => {
      const reader = new FileReader();
      reader.readAsArrayBuffer(blob);
      reader.onerror = reject;
      reader.onabort = reject;
      reader.onloadend = () => {
        try {
          resolve(pako.ungzip(reader.result, { to: 'string' }));
        } catch (e) {
          reject(sqLogger.format`sqSocket message decompress error: ${e}`);
        }
      };
    });
  }

  /**
   * Helper function to send messages to the worker thread
   *
   * @param {string} type - One of the defined message types
   * @param {object|string} [payload] - data payload to send
   */
  function sendToWorkerThread(type: MESSAGES, payload?) {
    if (webWorker) {
      webWorker.postMessage({ type, payload });
    }
  }
}

export const SOCKET_LIVELINESS_TIMEOUT = 5 * 60 * 1000;
