import { Observable } from 'rxjs';
import Envelope from '../../common/messaging/envelope';
import SocketSubject from './socketSubject';
import SocketFactory from './socketFactory';
import isProduction from '../../common/utils/isProduction';
import MessagingTopics from '../../common/messaging/messagingTopics';
import LoggerFactory from '../../common/utils/loggerFactory';
import {tap,takeWhile} from 'rxjs/operators';

// Logger shared by everything in this module, created via LoggerFactory under the name 'messagingHub.ts'.
const logger = LoggerFactory.createLogger('messagingHub.ts');

/**
 * Singleton entry point for client/server messaging.
 *
 * Offers topic-based streams over a socket (see {@link MessagingHub.getStream})
 * and thin Observable wrappers around `fetch` for plain HTTP GET/POST requests.
 * Streams opened without an explicit correlation id are remembered so that
 * their registration can be re-sent when the socket status goes from invalid
 * back to valid.
 */
export default class MessagingHub {
	private static _instance: MessagingHub;
	/** Backing field for the `_socketSubject` getter; stays unset until first access. */
	private _underlyingSocketSubject?: SocketSubject;
	private readonly _socketFactory: SocketFactory;
	/** Registration envelopes of the streams currently tracked, keyed by correlation id. */
	private readonly _currentStreams = new Map<string, Envelope>();
	/** Last value received from `socketValid$`; `undefined` until the first emission. */
	private _previousSocketStatus: boolean | undefined = undefined;

	/**
	 * Private: obtain the hub through {@link MessagingHub.getInstance}.
	 *
	 * Subscribes to `socketValid$` for the lifetime of the hub. Note that reading
	 * `socketValid$` goes through the `_socketSubject` getter, so the socket
	 * subject is created here, during construction.
	 *
	 * @param socketFactory factory handed to the `SocketSubject` when it is created
	 */
	private constructor(socketFactory: SocketFactory) {
		this._socketFactory = socketFactory;

		this.socketValid$.subscribe(isValid => {
			// Only a transition from an explicit `false` to a truthy status counts as
			// a recovery; the very first emission (previous === undefined) does not.
			const recovered = this._previousSocketStatus === false && isValid;
			if (recovered && this._currentStreams.size > 0) {
				logger.info('Connection recovered, going to reconnect to streams', this._currentStreams.size);
				this._currentStreams.forEach(envelope => {
					// Re-send the registration under the ORIGINAL correlation id; the
					// stream is already tracked, so nothing is added to the map here.
					this.sendRegistration(envelope.topic, envelope.payload, envelope.correlationId);
				});
			}
			this._previousSocketStatus = isValid;
		});
	}

	/**
	 * Returns the shared hub, creating it on first use.
	 *
	 * The socket URL is derived from the current page: `wss` when the page was
	 * loaded over `https:`, `ws` otherwise, on the same host.
	 */
	public static getInstance(): MessagingHub {
		// Reference the class rather than `this` so the method keeps working when
		// it is passed around detached from the class.
		if (!MessagingHub._instance) {
			const wsProtocol = window.location.protocol === 'https:' ? 'wss' : 'ws';
			const socketFactory = new SocketFactory(`${wsProtocol}://${window.location.host}`);
			MessagingHub._instance = new MessagingHub(socketFactory);
		}
		return MessagingHub._instance;
	}

	/** Socket subject, created on first access and reused afterwards. */
	private get _socketSubject(): SocketSubject {
		if (!this._underlyingSocketSubject) {
			this._underlyingSocketSubject = new SocketSubject(this._socketFactory);
		}
		return this._underlyingSocketSubject;
	}

	/** The `socketValid$` stream exposed by the underlying socket subject. */
	public get socketValid$() {
		return this._socketSubject.socketValid$;
	}

	/**
	 * Registers for a topic and returns the stream of envelopes answering that
	 * registration.
	 *
	 * The registration message is sent immediately when this method is called,
	 * not when the returned Observable is subscribed to.
	 *
	 * The returned Observable:
	 * - emits every envelope from the message stream except the
	 *   `MessagingTopics.StreamCompleted` one, on which it completes;
	 * - forwards errors and completion of the underlying message stream;
	 * - on unsubscription before a `StreamCompleted` envelope was seen, sends an
	 *   `UnregisterStream` message for the stream.
	 *
	 * @param topic topic to register for
	 * @param payload optional payload placed in the registration envelope
	 * @param correlationId when provided, the call is treated as a retry of an
	 *   existing stream and the stream is NOT added to the tracked set again
	 * @returns the envelopes received for this registration
	 */
	public getStream(topic: string, payload?: any, correlationId?: string): Observable<Envelope> {
		const topicRegistrationMessage = this.sendRegistration(topic, payload, correlationId);
		const streamId = topicRegistrationMessage.correlationId;

		if (!correlationId) {
			// if this is set it means we are retrying an existing stream -- no need to add it again
			this._currentStreams.set(streamId, topicRegistrationMessage);
		}

		return new Observable(observer => {
			// Set once the server announced the end of the stream, so that the
			// teardown below does not send a pointless unregister message.
			let isCompleted = false;

			const subscription = this._socketSubject.getMessageStream(topicRegistrationMessage).pipe(
				// inclusive = true: the StreamCompleted envelope itself is still delivered to `next`
				takeWhile((value: Envelope<{}>) => value.topic !== MessagingTopics.StreamCompleted, true)
			).subscribe({
				next: response => {
					if (response.topic === MessagingTopics.StreamCompleted) {
						this._currentStreams.delete(streamId);
						isCompleted = true;
						observer.complete();
					} else {
						observer.next(response);
					}
				},
				// Without these two handlers a failing or silently ending message
				// stream would leave the consumer waiting forever.
				error: err => observer.error(err),
				complete: () => observer.complete()
			});

			return () => {
				subscription.unsubscribe();
				if (!isCompleted) {
					this.cancelStream(streamId);
				}
			};
		});
	}

	/**
	 * Builds the registration envelope for a topic and sends it over the socket.
	 *
	 * @returns the envelope that was sent
	 */
	private sendRegistration(topic: string, payload?: any, correlationId?: string): Envelope<string> {
		const topicRegistrationMessage: Envelope<string> = Envelope.create(topic, payload, correlationId);
		this._socketSubject.send(topicRegistrationMessage);
		return topicRegistrationMessage;
	}

	/**
	 * Stops tracking the stream and sends an `UnregisterStream` message for it.
	 *
	 * The message is sent even when the stream was not tracked; that case is
	 * only logged as a warning.
	 */
	private cancelStream(correlationId: string): void {
		const topicCancellationMessage: Envelope<string> = Envelope.create(MessagingTopics.UnregisterStream, undefined, correlationId);
		// Map.delete reports whether the key was present, which is all we need to know.
		if (!this._currentStreams.delete(correlationId)) {
			logger.warn('Received invalid cancel stream request', topicCancellationMessage);
		}
		this._socketSubject.send(topicCancellationMessage);
	}

	/**
	 * Issues a GET request against the page's origin.
	 *
	 * The request is made on subscription. The Observable emits the parsed JSON
	 * body once and completes, or errors when the request or JSON parsing fails.
	 *
	 * @param path path appended to `window.location.origin` after a `/`
	 */
	public get(path: string): Observable<any> {
		return this.request(
			path,
			() => ({
				headers: new Headers({'content-type': 'application/json'}),
			}),
			'Completed GET request '
		);
	}

	/**
	 * Issues a POST request with a JSON body against the page's origin.
	 *
	 * The request is made on subscription. The Observable emits the parsed JSON
	 * body of the response once and completes, or errors when the request or
	 * JSON parsing fails.
	 *
	 * @param path path appended to `window.location.origin` after a `/`
	 * @param body value serialized with `JSON.stringify` as the request body
	 */
	public post(path: string, body: any): Observable<any> {
		return this.request(
			path,
			() => ({
				method: 'POST',
				headers: new Headers({'content-type': 'application/json'}),
				redirect: 'manual',
				mode: 'cors',
				credentials: 'include',
				body: JSON.stringify(body)
			}),
			'Completed POST request: '
		);
	}

	/**
	 * Shared fetch-to-Observable plumbing for {@link MessagingHub.get} and
	 * {@link MessagingHub.post}.
	 *
	 * @param path path appended to `window.location.origin` after a `/`
	 * @param createInit builds the fetch options; invoked on every subscription
	 *   so that serialization happens at subscription time
	 * @param completionMessage log message written together with the parsed result
	 */
	private request(path: string, createInit: () => RequestInit, completionMessage: string): Observable<any> {
		return new Observable(subscriber => {
			const url = window.location.origin;
			fetch(`${url}/${path}`, createInit())
				.then(response => response.json())
				.then(
					result => {
						logger.info(completionMessage, result);
						subscriber.next(result);
						subscriber.complete();
					},
					// Network failures and invalid JSON end up here; surface them to
					// the consumer instead of leaving an unhandled promise rejection.
					error => subscriber.error(error)
				);
		});
	}
}
