import { HttpClient } from "@angular/common/http";
import { Injectable } from "@angular/core";
import { BehaviorSubject, Subject } from "rxjs";
import {
    GenericRequestDto,
    GenericResponseDto,
    KeyValueDto,
    newSequentialId,
    SharedHelper,
    SyncCommand,
    SyncCommandDto,
    SyncCommandTypes,
    SystemInformationKeys
} from "shield.shared";
import { DatabaseService } from "../services/database.service";
import Dexie from "dexie";

const defaultRequeueSeconds = 30;

@Injectable({
    providedIn: "root"
})
export class DataSyncQueueService {
    constructor(
        private dbService: DatabaseService,
        private http: HttpClient
    ) { }

    private queueItemAdded = new Subject<SyncCommand>();

    get queueItemAdded$() {
        return this.queueItemAdded.asObservable();
    }

    enqueue(message: SyncCommand): Promise<number> {
        const response = this.dbService.syncQueue.add(message);
        this.queueItemAdded.next(message);
        return response;
    }

    /**
     * Returns the number of pending items in the queue, organized by entity type.
     */
    async getPendingSyncCounts(): Promise<{ [key: string]: number }> {
        const returnVal: { [key: string]: number } = {};
        const syncItems = SharedHelper.groupBy(await this.peekQueueEntries(), e => e.entity);
        [...syncItems.entries()].forEach(([command, group]) => returnVal[command] = group.length);
        return returnVal;
    }
    async process<T extends SyncCommand>(
        entity: SyncCommandTypes,
        processor: (command: T) => Promise<void>
    ): Promise<void> {
        let cmd = await this.dequeue<T>(entity);
        while (cmd) {
            try {
                await processor(cmd);
                await this.delete(cmd.id);
            } catch (e) {
                cmd.lastError = e;
                console.error(
                    `Error processing message id ${cmd.id} for entity ${entity}:`,
                    e
                );
                await this.requeue(cmd);
            }
            // next message
            cmd = await this.dequeue<T>(entity);
        }
    }

    async pushCommand<T, K extends SyncCommand>(
        entity: T,
        syncCommand: K,
        params: KeyValueDto[] = []
    ): Promise<void> {
        const command = new SyncCommandDto<T, K>();
        command.id = newSequentialId();
        command.command = syncCommand;
        command.payload = entity;
        command.params = params;

        const request = new GenericRequestDto<SyncCommandDto<T, K>>();
        request.id = command.id;
        request.values = command;

        await this.http
            .post<GenericResponseDto<undefined>>(
                "/api/offline-sync/create",
                request
            )
            .toPromise();
    }

    async requeue(
        message: SyncCommand,
        exponentialRequeueSeconds?: number
    ): Promise<number> {
        const maxRetriesObj = await this.dbService.systemInformation.where("key").equals(SystemInformationKeys.dataSyncQueueMaxRetries)?.first();
        const maxRetries = SharedHelper.parseInt(maxRetriesObj?.value) ?? 10;

        if (message.dequeueCount >= maxRetries) {
            const errorText = `Out of tries attempting to requeue sync message with message id ${message.id}.`;
            console.error(errorText);

            await this.dbService.deadLetterQueue.put(message);
            await this.delete(message.id);
            return;
        }
        const nextVisibleSeconds =
            Math.pow(message.dequeueCount, 2) *
            (exponentialRequeueSeconds ?? defaultRequeueSeconds);
        message.nextVisibleTime = new Date();
        message.nextVisibleTime.setSeconds(
            message.nextVisibleTime.getSeconds() + nextVisibleSeconds
        );
        message.dequeuedAt = null;
        await this.dbService.syncQueue.put(message);
    }

    async dequeue<T extends SyncCommand>(entity: SyncCommandTypes): Promise<T> {
        const messages = await this.dbService.syncQueue
            .where({ entity })
            .sortBy("nextVisibleTime");

        if (!messages || messages.length === 0) return null;

        const message = messages[0];

        const isVisible = message.nextVisibleTime.valueOf() <= Date.now();

        if (!isVisible || message.dequeuedAt !== null) return;

        message.dequeueCount++;
        message.dequeuedAt = new Date();

        await this.dbService.syncQueue.delete(message.id);

        return (message as unknown) as T;
    }

    delete(id: number): Promise<void> {
        return this.dbService.syncQueue.delete(id);
    }

    peekQueueEntries(): Promise<SyncCommand[]> {
        return this.dbService.syncQueue.toArray();
    }

    clearQueue(): Promise<void> {
        return this.dbService.syncQueue.clear();
    }

    // this is used during startup in case the app had been shutdown while something was dequeued
    async requeueAllDequeuedCommandsOnStartup(): Promise<void> {
        const commands = await this.dbService.syncQueue
            .where("dequeuedAt").above(Dexie.minKey)
            .toArray();

        if (commands.length > 0) {
            for (const command of commands) {
                await this.requeue(command);
            }
        }
    }
}
