import { Injectable } from '@angular/core';
import * as io from 'socket.io-client';
import { environment } from 'src/environments/environment';
import { HttpClient } from '@angular/common/http';
import { Observable, EMPTY, fromEvent, Subscription, Subject } from 'rxjs';
import { tap, map, switchMap, take } from 'rxjs/operators';

/**
 * Mutable bookkeeping record for one logical socket channel.
 *
 * The service keeps one record per {@link ChannelType} and updates it in
 * place while a channel is being established, renewed or destroyed.
 */
export interface Channel {
  /** Which logical channel this record describes. */
  type: ChannelType;
  /**
   * Token returned by the backend for this channel. It is used as the
   * socket event name to listen on and is sent back with `unsubscribe`.
   */
  token?: string;
  /** Active subscription that forwards socket messages for this channel. */
  subscription?: Subscription;
  /** Endpoint the socket id is POSTed to in order to obtain `token`. */
  url?: string;
  /** True while a token request for this channel is in progress. */
  renewing?: boolean;
  /**
   * Optional formatter for socket messages: receives the raw message and
   * returns the value that is published to consumers instead.
   * The payload shape is channel-specific, hence the loose typing.
   */
  formatter?: (message: any) => any;
}

/**
 * Identifies one of the logical socket channels handled by the service.
 *
 * The numeric values are spelled out because callers are documented to rely
 * on them (0 = charge point, 1 = user); do not reorder or renumber.
 */
export enum ChannelType {
  /** Charge point channel. */
  CHARGE_POINT = 0,
  /** User channel. */
  USER = 1,
  /** Roaming session channel. */
  ROAMING_SESSION = 2,
  /** Transaction prices channel. */
  TRANSACTION_PRICES = 3,
}

/**
 * Application-wide socket.io client.
 *
 * Owns a single socket connection plus four logical channels (user,
 * transaction prices, charge point, roaming session). For each channel a
 * token is requested over HTTP using the current socket id; that token is
 * then used as the socket event name to listen on. Incoming messages are
 * republished on the channel's public `...Messages$` observable.
 *
 * Typical lifecycle: `setupConnection()` once, `createChannel()` /
 * `destroyChannel()` as needed, `destroySocket()` at application teardown.
 */
@Injectable({ providedIn: 'root' })
export class WebsocketService {
  private websocketSubject = new Subject();
  // NOTE(review): nothing in this class emits on this stream — confirm
  // whether it is still needed by consumers.
  public websocketMessages$ = this.websocketSubject.asObservable();

  private chargePointSubject = new Subject();
  /** Messages received on the charge point channel. */
  public chargePointMessages$ = this.chargePointSubject.asObservable();

  private userSubject = new Subject();
  /** Messages received on the user channel. */
  public userMessages$ = this.userSubject.asObservable();

  private transactionSubject = new Subject();
  /** Messages received on the transaction prices channel. */
  public transactionMessages$ = this.transactionSubject.asObservable();

  private roamingSessionSubject = new Subject();
  /** Formatted messages received on the roaming session channel. */
  public roamingSessionMessages$ = this.roamingSessionSubject.asObservable();

  /** Underlying socket; undefined until `setupConnection()` has been called. */
  private socket: SocketIOClient.Socket;
  /** Emits the socket id every time the socket reports `connect`. */
  private socketId = new Subject();
  /** True once the socket has connected at least once. */
  private inited = false;
  /** Handle of the periodic connection watchdog, if it is running. */
  private connectionCheckInterval: ReturnType<typeof setInterval> | undefined;

  private userChannel: Channel = {
    type: ChannelType.USER,
  };

  private transactionChannel: Channel = {
    type: ChannelType.TRANSACTION_PRICES,
  };

  private chargePointChannel: Channel = {
    type: ChannelType.CHARGE_POINT,
  };

  private roamingSessionChannel: Channel = {
    type: ChannelType.ROAMING_SESSION,
    // Maps a raw roaming session message onto the field names used for
    // regular transaction messages.
    formatter: (message: any) => ({
      chargeBoxId: message.EvseID,
      chargeBoxUniqueId: message.EvseID,
      energy: message.ConsumedEnergyInWh,
      id: message.SessionID,
      price: { price: { total: message.Total || 0 } },
      roaming: true,
      timestampStart: message.SessionStart,
      timestampStop: message.SessionEnd,
      transactionId: message.SessionID,
    }),
  };

  constructor(private http: HttpClient) {}

  /** All channel records, in the order they are renewed. */
  private getAllChannels(): Channel[] {
    return [
      this.userChannel,
      this.transactionChannel,
      this.chargePointChannel,
      this.roamingSessionChannel,
    ];
  }

  /**
   * Look up the bookkeeping record for a channel type.
   * @param type Channel type
   * @throws Error when `type` is not a known channel type
   */
  private getChannel(type: ChannelType): Channel {
    switch (type) {
      case ChannelType.CHARGE_POINT:
        return this.chargePointChannel;
      case ChannelType.USER:
        return this.userChannel;
      case ChannelType.ROAMING_SESSION:
        return this.roamingSessionChannel;
      case ChannelType.TRANSACTION_PRICES:
        return this.transactionChannel;
      default:
        // Previously this fell through and returned undefined, which every
        // caller then dereferenced; fail with a readable message instead.
        throw new Error(`Unknown channel type: ${type}`);
    }
  }

  /**
   * Look up the subject that backs the public observable of a channel type.
   * @param type Channel type
   * @throws Error when `type` is not a known channel type
   */
  private getChannelSubject(type: ChannelType) {
    switch (type) {
      case ChannelType.CHARGE_POINT:
        return this.chargePointSubject;
      case ChannelType.USER:
        return this.userSubject;
      case ChannelType.ROAMING_SESSION:
        return this.roamingSessionSubject;
      case ChannelType.TRANSACTION_PRICES:
        return this.transactionSubject;
      default:
        throw new Error(`Unknown channel type: ${type}`);
    }
  }

  /**
   * Create a socket.io channel of the given type.
   *
   * If the socket is not connected yet (no socket, or no socket id), the
   * channel is established as soon as the next `connect` is reported.
   * @param type socket channel type. 0 = Charge Point , 1 = User
   * @param url websocket url, where socket token is fetched
   */
  public createChannel(type: ChannelType, url: string) {
    if (this.socket && this.socket.id) {
      this.establishChannel(type, this.socket.id, url);
      return;
    }

    // No id means the connection is not ready; wait for the next connect.
    this.socketId.pipe(take(1)).subscribe(() => {
      this.establishChannel(type, this.socket.id, url);
    });
  }

  /**
   * Initialize socket channel values and establish the socket channel
   * connection: fetch a token, swap it in for the old one, then forward
   * every non-Alert message to the channel's subject.
   * @param type socket channel type.  0 = Charge Point , 1 = User
   * @param socketId current socket.io connection id
   * @param url websocket url, where socket token is fetched
   */
  private establishChannel(type: ChannelType, socketId: string, url: string) {
    const channel = this.getChannel(type);
    const channelSubject = this.getChannelSubject(type);

    channel.url = url;
    channel.renewing = true;
    const subscription = this.getSocketToken(url, socketId)
      .pipe(
        tap((token) => {
          // The old subscription stays active until the new token is here,
          // so messages keep flowing while the request is in flight.
          this.destroyChannel(type);
          channel.subscription = subscription;
          this.setSocketToken(token, type);
          channel.renewing = false;
        }),
        switchMap((token) => this.listenChannel<any>(token, type))
      )
      .subscribe({
        next: (message) => {
          // Alerts only trigger a renewal (see listenChannel); they are not
          // forwarded to consumers.
          if (message['messageType'] === 'Alert') {
            return;
          }
          channelSubject.next(
            channel.formatter ? channel.formatter(message) : message
          );
        },
        error: (error) => {
          // Without this reset a failed token request would leave the
          // channel flagged as renewing forever and block later renewals.
          channel.renewing = false;
          console.error(error);
        },
        complete: () => {
          // Reached when no token was requested at all (missing socket id).
          channel.renewing = false;
        },
      });
  }

  /**
   * Get Socket token for channel
   * @param socketUrl socket url where token is fetched
   * @param socketId current socket.io connection id. This should be known.
   * @returns the token from the response, or an empty stream when
   *          `socketId` is missing
   */
  private getSocketToken(
    socketUrl: string,
    socketId: string
  ): Observable<string> {
    if (!socketId) {
      return EMPTY;
    }
    return this.http
      .post<{ token: string }>(socketUrl, { socketId })
      .pipe(map((res) => res.token));
  }

  /**
   * Removes channel listeners for the old token (if any) and sets the new
   * token for the channel.
   * @param token Backend Socket token
   * @param type Channel type
   */
  private setSocketToken(token: string, type: ChannelType) {
    const channel = this.getChannel(type);

    // Skip when there is no old token: emitting 'unsubscribe' with
    // undefined would only send a meaningless message to the server.
    if (channel.token) {
      this.socket.emit('unsubscribe', channel.token);
      this.socket.removeListener(channel.token);
    }

    channel.token = token;
  }

  /**
   * Setup socket.io websocket connection.
   *
   * Calling this again replaces the current socket; the previous one is
   * disconnected and its listeners are removed first.
   */
  public setupConnection() {
    if (this.socket) {
      // Listeners go first so the teardown does not log a disconnect.
      this.socket.removeAllListeners();
      this.socket.disconnect();
    }

    this.socket = io(environment.SOCKET_URL, {
      transports: ['websocket'],
      reconnection: true,
      reconnectionDelay: 1000,
    });

    // This happens on socket connect
    this.socket.on('connect', () => {
      console.log('socket connected');
      this.socketId.next(this.socket.id);
      if (this.inited) {
        // Not the first connect: existing tokens belong to the old socket id.
        this.renewChannels();
      }
      if (this.connectionCheckInterval === undefined) {
        this.connectionCheckInterval = setInterval(() => {
          this.isConnected();
        }, 5000);
      }
      this.inited = true;
    });

    const logFailure = (payload: unknown) => {
      console.error(payload);
    };
    [
      'connect_error',
      'connect_failed',
      'connect_timeout',
      'error',
      'disconnect',
    ].forEach((event) => this.socket.on(event, logFailure));

    // NOTE(review): if the client also fires 'connect' on a reconnect, the
    // channels are renewed twice; the second pass finds no subscriptions and
    // only repeats the 'unsubscribe' emits — confirm against the client
    // version in use.
    this.socket.on('reconnect', () => {
      console.log('socket reconnected');
      this.renewChannels();
    });
  }

  /**
   * Stop listening on a channel and tell the server to unsubscribe its token.
   *
   * A token request that is still in flight is not cancelled here: the
   * channel's subscription is only recorded once the token has arrived.
   * @param type Channel type
   * @throws Error when `type` is not a known channel type
   */
  public destroyChannel(type: ChannelType) {
    const channel = this.getChannel(type);
    if (channel.subscription) {
      channel.subscription.unsubscribe();
      channel.subscription = undefined;
    }
    if (channel.token) {
      this.socket.emit('unsubscribe', channel.token);
      this.socket.removeListener(channel.token);
      channel.token = undefined;
    }
  }

  /**
   * Re-establish every channel that currently has an active subscription,
   * using the url it was created with. Old tokens are unsubscribed on the
   * server. Does nothing when the socket has not been set up.
   */
  renewChannels() {
    if (!this.socket) {
      return;
    }

    this.getAllChannels().forEach((channel) => {
      if (channel.token) {
        this.socket.emit('unsubscribe', channel.token);
      }
      if (channel.subscription) {
        channel.subscription.unsubscribe();
        channel.subscription = undefined;
        this.createChannel(channel.type, channel.url);
      }
    });
  }

  /**
   * Renew a channel in response to an Alert message, unless a renewal for
   * that channel is already in progress.
   * @param type Channel type
   */
  private renewChannelAfterAlert(type: ChannelType) {
    const channel = this.getChannel(type);
    if (channel.renewing) {
      return;
    }

    channel.renewing = true;
    this.createChannel(type, channel.url);
  }

  /**
   * Listen for socket events named after the channel token.
   * Alert messages additionally trigger a renewal of the channel.
   * @param eventToken socket event name (the channel token)
   * @param type Channel type
   * @returns stream of every message received for the token, Alerts included
   */
  private listenChannel<T>(
    eventToken: string,
    type: ChannelType
  ): Observable<T> {
    return fromEvent<T>(this.socket, eventToken).pipe(
      tap((data) => {
        if (data['messageType'] === 'Alert') {
          this.renewChannelAfterAlert(type);
        }
      })
    );
  }

  /**
   * Check if the socket is connected. If not, connect it manually.
   * Connect will trigger channel renewing.
   * Does nothing when the socket has not been set up.
   */
  isConnected() {
    if (this.socket && !this.socket.connected) {
      this.socket.disconnect();
      this.socket.connect();
    }
  }

  /**
   * Call this only at app destroy.
   *
   * Stops the connection watchdog, destroys every channel and disconnects
   * the socket.
   */
  public destroySocket() {
    if (this.connectionCheckInterval !== undefined) {
      clearInterval(this.connectionCheckInterval);
      this.connectionCheckInterval = undefined;
    }

    // Iterate the channel records rather than Object.keys(ChannelType): a
    // numeric enum also exposes reverse-mapping keys ('0', '1', ...), which
    // resolve to name strings instead of channel types.
    this.getAllChannels().forEach((channel) => {
      this.destroyChannel(channel.type);
    });

    if (this.socket) {
      this.socket.disconnect();
      this.socket.removeAllListeners();
    }
  }
}
