import { Subject, BehaviorSubject, Observable } from 'rxjs';
import {distinctUntilChanged, filter, takeUntil} from 'rxjs/operators';
import Envelope from '../../common/messaging/envelope';
import SocketFactory from './socketFactory';
import LoggerFactory from '../../common/utils/loggerFactory';

const logger = LoggerFactory.createLogger('socketSubject.ts');

export default class SocketSubject {
	private _socket: WebSocket;
	private _message$: Subject<Envelope<any>>;
	private _socketIsValid$: BehaviorSubject<boolean>;
	private _messageBuffer: Array<Envelope> = [];

	public constructor(private _socketFactory: SocketFactory) {
		this._socketIsValid$ = new BehaviorSubject(undefined);
		this._message$ = new Subject();

		this._setupSocket();
		this._handleDisconnection();
	}

	public get socketValid$() {
		return this._socketIsValid$.pipe(
			distinctUntilChanged()
		);
	}

	public getMessageStream(envelope: Envelope): Observable<Envelope> {
		return this._message$.pipe(
			filter((m: Envelope<any>) => m.correlationId === envelope.correlationId)
		);
	}

	public send = (envelope: Envelope<any>): void => {
		if (this._socketIsValid$.getValue()) {
			this.sendToSocket(envelope);
		} else {
			this.sendToBuffer(envelope);
		}
	}

	private sendToBuffer(envelope: Envelope) {
		this._messageBuffer.push(envelope);
	}

	private sendToSocket(envelope: Envelope) {
		const msg = envelope.serialize();
		logger.info('--> ', envelope);
		this._socket.send(msg);
	}

	private _handleDisconnection(): void {
		let intervalId;

		this._socketIsValid$.pipe(
			distinctUntilChanged()
		)
			.subscribe(isValid => {
				if (!isValid) {
					intervalId = setInterval(() => {
						this._socket && this._socket.close();
						this._setupSocket();
					}, 5000);
				} else {
					if (intervalId) {
						clearInterval(intervalId);
					}
				}
			});
	}

	private _setupSocket(): void {
		this._socket = this._socketFactory.getSocket();

		this._socket.addEventListener('open', () => {
			logger.info('Socket has been opened');
			this._socketIsValid$.next(true);
			this._createMessageStream()
				.pipe(takeUntil(this._socketIsValid$.pipe(filter(x => !x))))
				.subscribe(next => this._message$.next(next), error => this._message$.error(error), () => {});

			this._messageBuffer.forEach(this.send);
			this._messageBuffer = [];
		});

		this._socket.addEventListener('close', () => {
			logger.warn('Socket has been closed');
			this._socketIsValid$.next(false);
		});

		this._socket.addEventListener('error', (e) => {
			logger.error('Socked errored', e);
		});
	}

	private _createMessageStream(): Observable<Envelope<any>> {
		return new Observable(observer => {
			logger.info('Creating new message stream');
			const handler = (event: MessageEvent) => {
				const data: Envelope<any> = Envelope.deserialize(event.data);
				logger.info('<-- ', data);
				observer.next(data);
			};
			this._socket.addEventListener('message', handler);
			return () => {
				logger.info('Unsubscribing from message stream');
				this._socket.removeEventListener('message', handler);
			};
		});
	}
}
