import { Injectable } from '@angular/core';
import { catchError, map } from 'rxjs/operators';
import { defer, forkJoin, Observable, of } from 'rxjs';

import { DataProcessorState } from '../models/processor-monitoring/data-processor-state.model';
import { ProcessorMonitoringService } from './processor-monitoring.service';
import { polling } from '@dagility-ui/kit';
import { ToolGroup, ToolModel, ToolService } from '@dagility-ui/shared-components';

/**
 * Emits the job definition state reported by `ProcessorMonitoringService`,
 * optionally together with the flattened list of tools from `ToolService`,
 * through the `polling` helper.
 */
@Injectable()
export class DataProcessorStateListener {
    constructor(private processorMonitoringService: ProcessorMonitoringService, private toolService: ToolService) {}

    /**
     * Starts polling for the processor state.
     *
     * @param params Request options. `params.full` is read every time the polling
     *               callback runs, so a change made to the object between
     *               invocations is picked up by the next one.
     * @returns Stream of state snapshots; failures are reported as a value with
     *          `isError: true` rather than as a stream error.
     */
    listen(params: DataProcessorStateParams): Observable<ExtendedDataProcessorState> {
        // NOTE(review): `polling` comes from `@dagility-ui/kit`; 5000 is presumably
        // the interval in milliseconds — confirm against the kit.
        return polling(5000, () => this.getState(params.full));
    }

    /**
     * Builds a single state request.
     *
     * `defer` postpones the service calls until subscription. When `full` is true the
     * job definition state and the tool list are requested together via `forkJoin`;
     * otherwise only the state is requested and `tools` ends up `undefined`.
     *
     * @param full Whether the tool list should be requested as well.
     * @returns Observable of the combined state. Any error raised by the requests or
     *          while flattening the tool response is replaced with `{ isError: true }`.
     */
    private getState(full: boolean): Observable<ExtendedDataProcessorState> {
        return defer(() =>
            full
                ? forkJoin([
                      this.processorMonitoringService.getJobDefinitionState(),
                      // Arrow wrapper keeps `this` bound should `flatResponse` ever need it.
                      this.toolService.getAllTools().pipe(map(response => this.flatResponse(response))),
                  ])
                : this.processorMonitoringService.getJobDefinitionState().pipe<[DataProcessorState]>(map(state => [state]))
        ).pipe(
            map(
                ([state, tools]): ExtendedDataProcessorState => ({
                    // Explicit flag so successful values satisfy the declared `isError: boolean`.
                    isError: false,
                    state,
                    tools,
                    full,
                })
            ),
            // The fallback deliberately carries only the error flag; `state` and `full`
            // are not available here, hence the assertion.
            catchError(() => of({ isError: true } as ExtendedDataProcessorState))
        );
    }

    /**
     * Flattens the tool groups found in `response.result.content` into one list of plugins.
     *
     * A missing `content` or a group without `plugins` contributes no items.
     * A missing `response` or `response.result` still throws, which `getState`
     * turns into the error value.
     *
     * @param response Raw response of `ToolService.getAllTools()`.
     * @returns Plugins of every group, in group order.
     */
    private flatResponse(response: any): ToolModel[] {
        const groups: ToolGroup[] = (response.result.content as ToolGroup[]) || [];
        const tools: ToolModel[] = [];

        for (const group of groups) {
            tools.push(...(group.plugins || []));
        }

        return tools;
    }
}

/** Options accepted by `DataProcessorStateListener.listen`. */
export interface DataProcessorStateParams {
    /** When true, the tool list is requested alongside the job definition state. */
    full: boolean;
}

/**
 * Value emitted by `DataProcessorStateListener.listen`.
 *
 * NOTE(review): when a request fails the listener emits `{ isError: true }` only, so
 * `state` and `full` are absent on that emission even though they are typed as required.
 */
export interface ExtendedDataProcessorState {
    /** `true` on the fallback emission produced when a request fails. */
    isError: boolean;
    /** Result of `ProcessorMonitoringService.getJobDefinitionState()`. */
    state: DataProcessorState;
    /** Echo of the `full` flag the state was requested with. */
    full: boolean;
    /** Plugins of all tool groups flattened into one list; only populated when `full` is true. */
    tools?: ToolModel[];
}
