import { Inject, Injectable } from '@angular/core';
import { EMPTY as ObservableEmpty, fromEvent, Observable, Subject } from 'rxjs';
import { AuthService } from '@app/auth';
import { KeycloakService } from '@app/core/auth/keycloak.service';
import { filter, map } from 'rxjs/operators';
import { generateUUID } from '@dagility-ui/kit';
import { ENV_TOKEN } from '@app/tokens';

@Injectable({
    providedIn: 'root'
})
export class StompService {
    public static readonly CI_CD_PIPELINE_USER_TOPIC = 'cicd-pipeline-topic-';
    public static readonly CI_CD_WORKFLOW_USER_TOPIC = 'cicd-workflow-topic-';
    public static readonly MONITORING_CHANGE_STATE_TOPIC = 'monitoringChangeStateTopic';
    public static readonly CUSTOM_ACTION_STATE_TOPIC = 'customActionStateTopic';
    public static readonly CI_CD_DP_TASK_PIPELINE_TOPIC = 'CiCdDPTaskPipelineTopic';
    public static readonly WC_WIDGET_SELF_TEST_TOPIC = 'wcSelfTestTopic';
    public static readonly WIDGET_SELF_TEST_TOPIC = 'selfTestTopic';
    public static readonly NOTIFICATION_TOPIC = 'notificationTopic';
    public static readonly PROJECT_WIZARD_PROGRESS_TOPIC = 'project-wizard-progress-topic';
    public static readonly PROJECT_WIZARD_RESULT_TOPIC = 'project-wizard-result-topic';
    public static readonly QUERY_BUILDER_SYNC_STATUS_TOPIC = 'querybuilder-sync-statuses';

    public readonly onMessage = new Subject<ServerMessage>();

    public readonly reconnected$ = new Subject<void>();

    sharedWorker: SharedWorker;

    constructor(@Inject(ENV_TOKEN) private env: Env, private authService: AuthService, private keycloakService: KeycloakService) {
        this.authService.onAfterLogin.subscribe(() => {
            this.init();
            this.subscribeOnGenericTopic('user-' + this.authService.getUser().id + '/*');
            this.subscribeOnGenericTopic('broadcast/*');

        });

        this.keycloakService.onRefreshToken$.subscribe(() => {
            this.sharedWorker.port.postMessage({
                type: 'reconnect',
                payload: {
                    token: this.keycloakService.getTokenApi()
                }
            });
            this.reconnected$.next();
        });
    }

    listenTopic(topic: string): Observable<any> {
        if (!this.sharedWorker) {
            return ObservableEmpty;
        }
        const id = generateUUID();
        this.sharedWorker.port.postMessage({
            type: 'listen',
            payload: {
                id,
                 topic
            }
        });

        return new Observable<any>(observer => {
            const subscription = fromEvent(this.sharedWorker.port, 'message')
                .pipe(
                    map((msg: any) => msg.data),
                    filter(msg => msg.topic === topic && msg.id === id),
                )
                .subscribe((p) => observer.next(p));

            return () => {
                this.sharedWorker.port.postMessage({
                    type: 'unlisten',
                    payload: id
                });
                subscription.unsubscribe();
            };
        });
    }

    init() {
        if (!this.sharedWorker) {
            this.sharedWorker = new SharedWorker('/assets/stomp/stomp-shared-worker.js');
            this.sharedWorker.port.start();
            this.sharedWorker.port.postMessage({
                type: 'init',
                payload: {
                    stompApiUrl: this.env.stompApiUrl,
                    token: this.keycloakService.getTokenApi()
                }
            });
        }
    }

    private subscribeOnGenericTopic(topic: string) {
        this.listenTopic(topic).subscribe(data => {
            const topicName: string = data.headers.destination; // like this '/topic/user-123123/type'
            const msg = {
                topicName: topicName.substring(topic.length + 6),
                body: JSON.parse(data.body),
            };
            this.onMessage.next(msg);
        });
    }
}

export interface ServerMessage {
    topicName: string;
    body: any;
}
