import { BulkDownloaderLogger } from '../logger';
import { MergedAbortController } from '../merge-abort-controller';
import { assertNotAborted, awaitUntil } from '../promise-utils';
import { StreamPortalSink } from './sink';
import { StreamPortalSource } from './source';

/**
 * Minimal shape of an object that messages can be posted to.
 *
 * `MessageStreamPortalSource` requires its target to satisfy this interface
 * (in addition to `EventTarget`) so it can post the stream message and pass
 * the stream in the transfer list.
 */
export interface MessagableTarget {
  /**
   * Posts `message` to the peer.
   *
   * @param message - Payload to deliver.
   * @param transfer - Optional list of objects to pass as transferables.
   */
  postMessage(message: unknown, transfer?: Transferable[]): void;
}

/**
 * Additional data that `MessageStreamPortalSink` merges into the `extra`
 * object handed to `onStream` callbacks.
 */
export interface MessageStreamPortalExtra {
  /** The message event that delivered the stream to the sink. */
  event: Event;
}

/**
 * Wire format of the message posted by `MessageStreamPortalSource.send` and
 * recognised by `MessageStreamPortalSink`.
 */
interface MessageStreamPortalMessage<TExtra = void> {
  /** Discriminator the sink checks to recognise a stream message. */
  type: 'message-stream-portal';
  /** Identifier generated by the source; echoed back in the acknowledgement. */
  streamId: string;
  /** The stream being sent; the source also lists it in the transfer list. */
  stream: ReadableStream;
  /** Caller-supplied data forwarded alongside the stream. */
  extra: TExtra;
}

/**
 * Acknowledgement posted by `MessageStreamPortalSink` after it has dispatched
 * a received stream; `MessageStreamPortalSource` uses it to settle the
 * matching pending `send` call.
 */
interface MessageStreamPortalMessageResponse {
  /** Discriminator the source checks to recognise an acknowledgement. */
  type: 'message-stream-portal-response';
  /** Identifier copied from the stream message being acknowledged. */
  streamId: string;
}

/**
 * Sending side of a message-based stream portal.
 *
 * Each {@link send} call posts a `ReadableStream` to `target` (listing it in
 * the transfer list) together with a freshly generated stream id, then waits
 * until a `'message-stream-portal-response'` message carrying the same id is
 * observed on `target`.
 */
export class MessageStreamPortalSource<TExtra = void> implements StreamPortalSource<TExtra> {
  /** Aborted by {@link dispose}; also detaches the `message` listener. */
  protected readonly disposeController = new AbortController();
  /** Acknowledgements that are still awaited, keyed by stream id. */
  protected readonly responses = new Map<string, PromiseWithResolvers<void>>();

  /**
   * @param target - Object used both to post stream messages and to listen
   *   for acknowledgement `message` events.
   * @param logger - Receives debug traces; defaults to `console`.
   */
  constructor(
    protected readonly target: EventTarget & MessagableTarget,
    protected readonly logger: BulkDownloaderLogger = console,
  ) {
    this.target.addEventListener('message', (event) => this.handleMessage(event as MessageEvent), {
      signal: this.disposeController.signal,
    });
  }

  /**
   * Posts `stream` to the target and resolves once the peer acknowledges it.
   *
   * @param stream - Stream to send; it is passed in the transfer list.
   * @param extra - Additional data delivered with the stream.
   * @throws The dispose reason when the source has already been disposed, and
   *   any error thrown by `target.postMessage`. The wait itself is delegated
   *   to `awaitUntil` with the dispose signal — presumably it rejects when the
   *   source is disposed while waiting (confirm against `promise-utils`).
   */
  async send(stream: ReadableStream, extra: TExtra): Promise<void> {
    // Fail before the stream is handed over: after dispose() the ACK listener
    // is gone, so posting would give the stream away without any way to
    // observe the acknowledgement.
    this.disposeController.signal.throwIfAborted();

    const streamId = this.createStreamId();
    const response = Promise.withResolvers<void>();
    this.responses.set(streamId, response);

    try {
      this.logger.debug('MessageStreamPortalSource Sending stream', { streamId, extra });
      this.target.postMessage(
        {
          type: 'message-stream-portal',
          streamId,
          stream,
          extra,
        } satisfies MessageStreamPortalMessage<TExtra>,
        [stream],
      );

      this.logger.debug('MessageStreamPortalSource Waiting ACK', { streamId, extra });
      await awaitUntil(response.promise, this.disposeController.signal);
      this.logger.debug('MessageStreamPortalSource ACK received', { streamId, extra });
    } finally {
      // Drop the pending entry on every exit path so a failed post or an
      // aborted wait does not leave a stale resolver behind. On success this
      // is a no-op because handleMessage() already removed it.
      this.responses.delete(streamId);
    }
  }

  /**
   * Forgets all pending acknowledgements and aborts the dispose controller
   * with an `Error('Disposed')` reason, which also removes the `message`
   * listener registered in the constructor.
   */
  dispose() {
    this.responses.clear();
    this.disposeController.abort(new Error('Disposed'));
  }

  /**
   * Resolves and removes the pending acknowledgement matching an incoming
   * response message; every other message is ignored.
   */
  protected handleMessage(event: MessageEvent) {
    if (!this.isResponseMessage(event)) {
      return;
    }

    const { streamId } = event.data;
    const pending = this.responses.get(streamId);
    if (!pending) {
      // Unknown or already-settled id: nothing to resolve.
      return;
    }

    this.responses.delete(streamId);
    pending.resolve();
  }

  /**
   * Type guard for acknowledgement messages: the payload must be an object
   * whose `type` is `'message-stream-portal-response'` and whose `streamId`
   * is a string.
   */
  protected isResponseMessage(
    event: MessageEvent<unknown>,
  ): event is MessageEvent<MessageStreamPortalMessageResponse> {
    const data: unknown = event.data;
    return (
      typeof data === 'object' &&
      data !== null &&
      'type' in data &&
      data.type === 'message-stream-portal-response' &&
      'streamId' in data &&
      typeof data.streamId === 'string'
    );
  }

  /** Generates the id used to correlate a stream with its acknowledgement. */
  protected createStreamId(): string {
    return crypto.randomUUID();
  }
}

/**
 * Receiving side of a message-based stream portal.
 *
 * Listens for `'message-stream-portal'` messages on `target`, re-dispatches
 * each received stream on an internal event bus for {@link onStream}
 * subscribers, and then posts a `'message-stream-portal-response'`
 * acknowledgement carrying the same stream id.
 */
export class MessageStreamPortalSink<TExtra = void> implements StreamPortalSink<TExtra> {
  /** Aborted by {@link dispose}; also detaches the `message` listener. */
  protected readonly disposeController = new AbortController();

  /**
   * @param target - Object whose `message` events carry incoming streams.
   * @param logger - Receives debug traces; defaults to `console`.
   * @param eventBus - Internal bus on which received streams are re-dispatched.
   */
  constructor(
    protected readonly target: EventTarget,
    protected readonly logger: BulkDownloaderLogger = console,
    protected readonly eventBus = new EventTarget(),
  ) {
    this.target.addEventListener('message', (event) => this.handleMessage(event as MessageEvent), {
      signal: this.disposeController.signal,
    });
  }

  /**
   * Subscribes `cb` to every stream received from now on.
   *
   * The callback receives the stream and the sender's `extra` data spread
   * into a new object together with the originating message event as `event`.
   * `assertNotAborted` is called with the dispose signal first — presumably it
   * throws once the sink has been disposed (confirm against `promise-utils`).
   *
   * @param cb - Invoked for each received stream.
   * @returns A function that cancels this subscription.
   */
  onStream(
    cb: (stream: ReadableStream, extra: TExtra & MessageStreamPortalExtra) => void,
  ): () => void {
    assertNotAborted(this.disposeController.signal);

    // The subscription's signal is built from the dispose signal so that the
    // listener can be cancelled individually via the returned function.
    const cancelCtrl = new MergedAbortController(this.disposeController.signal);

    this.eventBus.addEventListener(
      WorkerStreamEvent.eventName,
      (event: Event) => {
        const streamEvent = event as WorkerStreamEvent<TExtra>;
        cb(streamEvent.stream, {
          ...streamEvent.extra,
          event: streamEvent.event,
        });
      },
      { signal: cancelCtrl.signal },
    );

    return () => cancelCtrl.abort();
  }

  /** Aborts the dispose controller, detaching the `message` listener. */
  dispose() {
    this.disposeController.abort();
  }

  /**
   * Handles an incoming message: ignores anything that is not a stream
   * message, otherwise dispatches the stream to subscribers and acknowledges
   * it to the sender.
   *
   * The acknowledgement is posted to `event.source` when the event provides
   * one. `event.source` is not populated for every channel (for example it is
   * `null` for messages delivered to a dedicated worker or a `MessagePort`),
   * so in that case the acknowledgement is posted through `target` when it
   * exposes `postMessage`.
   *
   * @throws Error when neither `event.source` nor `target` can post the
   *   acknowledgement.
   */
  protected async handleMessage(event: MessageEvent): Promise<void> {
    if (!this.isStreamMessage(event)) {
      return;
    }

    const { stream, extra, streamId } = event.data;

    this.logger.debug('MessageStreamPortalSink Received stream', { extra });
    this.eventBus.dispatchEvent(new WorkerStreamEvent(stream, extra, event));

    this.logger.debug('MessageStreamPortalSink Sending ACK', { extra });
    const ack = {
      type: 'message-stream-portal-response',
      streamId,
    } satisfies MessageStreamPortalMessageResponse;

    if (event.source) {
      event.source.postMessage(ack);
      return;
    }

    const fallback = this.target;
    if (this.isMessagableTarget(fallback)) {
      fallback.postMessage(ack);
      return;
    }

    throw new Error(
      'MessageStreamPortalSink cannot send ACK: the message event has no source and the target does not support postMessage',
    );
  }

  /**
   * Type guard for stream messages: the payload must be an object whose
   * `type` is `'message-stream-portal'`.
   */
  protected isStreamMessage(
    event: MessageEvent<unknown>,
  ): event is MessageEvent<MessageStreamPortalMessage<TExtra>> {
    return (
      typeof event.data === 'object' &&
      !!event.data &&
      'type' in event.data &&
      event.data.type === 'message-stream-portal'
    );
  }

  /** Type guard telling whether `target` exposes a callable `postMessage`. */
  protected isMessagableTarget(target: EventTarget): target is EventTarget & MessagableTarget {
    return 'postMessage' in target && typeof target.postMessage === 'function';
  }
}

/**
 * Event that carries a received stream, the data sent along with it, and the
 * event it originated from. Its type is always {@link WorkerStreamEvent.eventName}.
 */
export class WorkerStreamEvent<TExtra = void> extends Event {
  /** Event type used for every instance of this class. */
  static readonly eventName = 'stream';

  /** The stream carried by this event. */
  readonly stream: ReadableStream;
  /** Data supplied together with the stream. */
  readonly extra: TExtra;
  /** The event this stream event was created from. */
  readonly event: Event;

  constructor(stream: ReadableStream, extra: TExtra, event: Event) {
    super(WorkerStreamEvent.eventName);
    this.stream = stream;
    this.extra = extra;
    this.event = event;
  }
}
