import { Injectable } from '@angular/core';
import { SessionMessage, SessionMessageHandler } from '../common/web-socket/session-message';
import { WebSocketMessageDispatcher } from '../common/web-socket/web-socket-message-dispatcher';
import { CollectionDataEntity } from './collection-manager.service';
import { BehaviorSubject, ReplaySubject, tap, filter, concatMap } from 'rxjs';
import { LiveDataEvent } from '@common/web-socket/session-message';
import { ProcessLiveUpdatesService } from './live-data/process-live-updates.service';

@Injectable({
  providedIn: 'root',
})
export class CollectionManagerLiveDataHandler implements SessionMessageHandler {
  private eventBuffer = new ReplaySubject<CollectionDataEntity[]>(1);
  private bufferedEvents: CollectionDataEntity[][] = [];
  private processSignal = new BehaviorSubject<boolean>(true);

  constructor(
    private websocketMessageDispatcher: WebSocketMessageDispatcher,
    private processLiveUpdatesService: ProcessLiveUpdatesService,
  ) {
    this.init();
  }

  private init() {
    this.websocketMessageDispatcher.registerHandler('LIVE_DATA', this);
    this.startProcessingEvents();
  }

  public async handleMessage(message: SessionMessage) {
    switch (message.action) {
      case 'LIVE_DATA': {
        const changes = message?.liveDataEvents?.events as CollectionDataEntity[];
        console.log('EVENT RECEIVED');
        if (this.processSignal.value) {
          this.eventBuffer.next(changes);
        } else {
          this.bufferedEvents.push(changes);
        }
        break;
      }
    }
  }

  private startProcessingEvents() {
    this.processSignal
      .pipe(
        tap((processing) => console.log('PROCESS SIGNAL', processing)),
        filter((processing) => processing),
        concatMap(() =>
          this.eventBuffer.pipe(
            tap(async (entities: LiveDataEvent[]) => {
              console.log('EVENT PROCESSING');
              await this.processLiveUpdatesService.processLiveUpdates(entities);
            }),
          ),
        ),
      )
      .subscribe();
  }

  public startProcessing() {
    console.log('START PROCESSING');
    this.processSignal.next(true);
    // Flush buffered events
    this.bufferedEvents.forEach((events) => this.eventBuffer.next(events));
    this.bufferedEvents = [];
  }

  public stopProcessing() {
    console.log('STOP PROCESSING');
    this.processSignal.next(false);
  }
}
