import '@main/polyfills/promise-with-resolvers';

import { BulkDownloaderLogger } from '../logger';
import { MergedAbortController, mergeSignals } from '../merge-abort-controller';
import { assertNotAborted, awaitTimeout, sleep } from '../promise-utils';
import { MessagableTarget, MessageStreamPortalExtra, WorkerStreamEvent } from './message';
import { StreamPortalSink } from './sink';
import { StreamPortalSource } from './source';

declare global {
  interface ReadableStream<R> {
    [Symbol.asyncIterator](): AsyncIterableIterator<R>;
  }
}

interface MessageChannelStreamData<TExtra> {
  id?: string;
  streamId?: string;
  data?: unknown;
  close?: boolean;
  error?: unknown;
  extra?: TExtra;
}

interface MessageChannelStreamResponse {
  id: string;
  streamId?: string;
  error?: unknown;
}

interface MessageChannelTargetData {
  type: 'message-channel-target';
}

export class MessageChannelStreamPortalSource<TExtra = void> implements StreamPortalSource<TExtra> {
  static CHANNEL_TIMEOUT = 10_000;
  protected readonly channel: MessagePort;
  protected readonly disposeController = new AbortController();
  protected readonly responses = Object.create(null) as Record<
    string,
    PromiseWithResolvers<MessageChannelStreamResponse> | undefined
  >;
  protected readonly disposed = Promise.withResolvers<never>();
  protected readonly channelReady;
  protected lastResponse: Promise<unknown> = Promise.resolve();
  protected lastMessageId = 0n;
  protected isDisposing = false;

  constructor(
    protected readonly target: MessagableTarget,
    protected readonly logger: BulkDownloaderLogger = console,
    protected readonly channelTimeout = MessageChannelStreamPortalSource.CHANNEL_TIMEOUT,
  ) {
    this.disposeController.signal.addEventListener(
      'abort',
      () => this.disposed.reject(this.disposeController.signal.reason),
      { once: true },
    );

    const channel = new MessageChannel();

    this.channel = channel.port1;
    this.channel.addEventListener(
      'message',
      (event: MessageEvent<MessageChannelStreamResponse>) => this.handleChannelMessage(event),
      { signal: this.disposeController.signal },
    );
    this.logger.debug('MessageChannelStreamPortalSource Starting own port');
    this.channel.start();

    this.logger.debug('MessageChannelStreamPortalSource Sending second port to target');
    this.target.postMessage({ type: 'message-channel-target' } as MessageChannelTargetData, [
      channel.port2,
    ]);

    this.channelReady = this.sendMessageAwait({});
  }

  async send(stream: ReadableStream, extra: TExtra): Promise<void> {
    assertNotAborted(this.disposeController.signal);

    await Promise.race([this.channelReady, this.disposed.promise]);

    const streamId = this.createStreamId();

    this.logger.debug('MessageChannelStreamPortalSource Sending stream', { streamId, extra });

    try {
      // Init stream on target
      await this.sendMessageAwait({ streamId, extra });

      // Start sending stream data without blocking to allow for data pumping
      // eslint-disable-next-line @typescript-eslint/no-floating-promises
      this.sendStream(streamId, stream);
    } catch (error) {
      await this.handleStreamError(error, stream, streamId);
    }
  }

  async dispose() {
    if (this.disposeController.signal.aborted || this.isDisposing) {
      return;
    }
    this.isDisposing = true;

    this.logger.debug('MessageChannelStreamPortalSource Disposing');

    try {
      await this.sendCloseAll();
    } finally {
      this.abort(new Error('Resource is disposed'));
    }
  }

  protected async handleStreamError(error: unknown, stream: ReadableStream, streamId: string) {
    if (this.disposeController.signal.aborted) {
      return;
    }

    if (error instanceof TargetError) {
      await stream.cancel(error.error);
      throw error.error;
    }

    this.logger.error('MessageChannelStreamPortalSource Error', { streamId }, error);
    await stream.cancel(error);
    await this.sendMessageAwait({ streamId, error });
  }

  protected abort(reason?: unknown) {
    this.logger.debug('MessageChannelStreamPortalSource Aborting', reason);
    this.disposeController.abort(reason);
    this.channel.close();
  }

  protected async sendCloseAll() {
    await this.sendMessageAwait({ close: true });
  }

  protected async sendStream(streamId: string, stream: ReadableStream) {
    try {
      this.logger.debug('MessageChannelStreamPortalSource Sending stream', { streamId });

      // Safari does not implement async iterator on ReadableStream
      if (Symbol.asyncIterator in stream === false) {
        await this.sendStreamLegacy(streamId, stream);
      } else {
        for await (const data of stream) {
          await this.sendMessageAwait({ streamId, data });
        }
      }

      await this.sendMessageAwait({ streamId, close: true });
    } catch (error) {
      this.handleStreamError(error, stream, streamId).catch(() => undefined);
    }
  }

  protected async sendStreamLegacy(streamId: string, stream: ReadableStream) {
    const reader = stream.getReader();
    let result: ReadableStreamReadResult<unknown>;

    try {
      do {
        result = await Promise.race([reader.read(), this.disposed.promise]);

        if (!result.done) {
          await this.sendMessageAwait({ streamId, data: result.value });
        }
      } while (!result.done);
    } finally {
      reader.releaseLock();
    }
  }

  protected async sendMessageAwait(data: MessageChannelStreamData<TExtra>) {
    assertNotAborted(this.disposeController.signal);

    const id = String(++this.lastMessageId);
    const response = (this.responses[id] = Promise.withResolvers());
    const result = Promise.race([response.promise, this.disposed.promise]);

    const lastResponse = this.lastResponse;
    this.lastResponse = result;

    await lastResponse;
    this.logger.debug('MessageChannelStreamPortalSource Sending message awaiting', { id });
    this.sendMessage({ ...data, id });

    // If channel does not respond in time - we assume the download has failed
    await awaitTimeout(
      result,
      this.channelTimeout,
      () => new TargetError(new Error('ChannelTimeout')),
    );
  }

  protected sendMessage(data: MessageChannelStreamData<TExtra>) {
    const transferables: Transferable[] = [];

    if (data.data && ArrayBuffer.isView(data.data)) {
      // Transfer underlying ArrayBuffer of TypedArray
      transferables.push(data.data.buffer);
    }

    this.logger.debug('MessageChannelStreamPortalSource Sending message', data);
    this.channel.postMessage(data, transferables);
  }

  protected async handleChannelMessage({ data }: MessageEvent<MessageChannelStreamResponse>) {
    if (data.id === undefined && data.error) {
      if (this.isDisposing && data.error instanceof Error && data.error.message === 'Closed') {
        // This is a graceful shutdown error when disposing the target
        this.logger.debug('MessageChannelStreamPortalSource Shutdown confirmed', data);
        return;
      }

      this.logger.debug('MessageChannelStreamPortalSource Received error', data);
      this.abort(new TargetError(data.error));
      return;
    }

    const response = this.responses[data.id];

    if (!response) {
      return;
    }

    this.logger.debug('MessageChannelStreamPortalSource Received response', data);

    if (data.error) {
      response.reject(new TargetError(data.error));
    } else {
      response.resolve(data);
    }

    delete this.responses[data.id];
  }

  protected createStreamId(): string {
    return crypto.randomUUID();
  }
}

export class MessageChannelStreamPortalSink<TExtra = void> implements StreamPortalSink<TExtra> {
  protected readonly disposeController = new AbortController();
  protected readonly handlersDisposed: Promise<void>[] = [];

  constructor(
    protected readonly target: EventTarget,
    protected readonly logger: BulkDownloaderLogger = console,
    protected readonly eventBus = new EventTarget(),
  ) {
    this.target.addEventListener('message', (event) => this.handleMessage(event), {
      signal: this.disposeController.signal,
    });
  }

  onStream(
    cb: (stream: ReadableStream, extra: TExtra & MessageStreamPortalExtra) => void,
  ): () => void {
    assertNotAborted(this.disposeController.signal);

    const cancelCtrl = new MergedAbortController(this.disposeController.signal);

    this.eventBus.addEventListener(
      WorkerStreamEvent.eventName,
      (event: Event) =>
        cb((event as unknown as WorkerStreamEvent<TExtra>).stream, {
          ...(event as WorkerStreamEvent<TExtra>).extra,
          event: (event as WorkerStreamEvent<TExtra>).event,
        }),
      { signal: cancelCtrl.signal },
    );

    return () => cancelCtrl.abort();
  }

  async dispose() {
    if (this.disposeController.signal.aborted) {
      return;
    }

    this.logger.debug('MessageChannelStreamPortalSink Disposing');
    this.disposeController.abort(new Error('Resource is disposed'));
    await Promise.all(this.handlersDisposed);
    this.logger.debug('MessageChannelStreamPortalSink Disposed');
  }

  protected handleMessage(e: Event) {
    const event = e as MessageEvent<MessageChannelTargetData>;

    if (event.data?.type !== 'message-channel-target') {
      return;
    }

    if (event.ports.length === 0) {
      this.logger.error('MessageChannelStreamPortalSink Channel port is not received!');
      return;
    }

    this.logger.debug(
      'MessageChannelStreamPortalSink Creating MessageChannelHandler for channel ports',
      { receivedPorts: event.ports.length },
    );

    const handler = new MessageChannelHandler<TExtra>(
      event,
      this.eventBus,
      this.disposeController.signal,
      this.logger,
    );

    event.ports.forEach((port) => this.handlersDisposed.push(handler.handle(port)));
  }
}

export class MessageChannelHandler<TExtra = void> {
  protected readonly disposeController = new AbortController();
  protected readonly disposed = Promise.withResolvers<void>();
  protected readonly writers = Object.create(null) as Record<string, WritableStreamDefaultWriter>;

  constructor(
    protected readonly sourceEvent: MessageEvent<MessageChannelTargetData>,
    protected readonly eventBus: EventTarget,
    abortSignal: AbortSignal,
    protected readonly logger: BulkDownloaderLogger = console,
  ) {
    this.handleMessage = this.handleMessage.bind(this);

    abortSignal.addEventListener('abort', () => this.dispose(abortSignal.reason), {
      signal: this.disposeController.signal,
    });

    if (sourceEvent instanceof ExtendableMessageEvent) {
      this.logger.debug('MessageChannelHandler Extending event until disposed');
      sourceEvent.waitUntil(
        this.whenDisposed().finally(() =>
          this.logger.debug('MessageChannelHandler Event handler disposed'),
        ),
      );
    }
  }

  async handle(port: MessagePort, signal?: AbortSignal) {
    port.addEventListener('message', this.handleMessage, {
      signal: mergeSignals(this.disposeController.signal, signal),
    });

    try {
      port.start();
      await this.whenDisposed();
    } finally {
      // Allow responses to be sent before closing the port
      await sleep(10);
      port.close();
    }
  }

  whenDisposed() {
    return this.disposed.promise.catch(() => undefined);
  }

  protected async handleMessage(event: MessageEvent<MessageChannelStreamData<TExtra>>) {
    const port = event.target as MessagePort;
    const data = event.data;
    const response = {
      id: data.id,
      streamId: data.streamId,
    } as Partial<MessageChannelStreamResponse>;
    let writer: WritableStreamDefaultWriter | undefined;

    try {
      if (data.close && !data.streamId) {
        this.logger.debug('MessageChannelHandler Recived command to close all streams on channel');
        await this.dispose(new Error('Closed'));
        return;
      }

      if (!data.streamId) {
        return;
      }

      writer = this.writers[data.streamId];

      if (!writer) {
        this.logger.debug('MessageChannelHandler Creating new stream', { streamId: data.streamId });
        const stream = new TransformStream();
        writer = this.writers[data.streamId] = stream.writable.getWriter();

        // Monitor state of the destination without blocking message handling
        // eslint-disable-next-line @typescript-eslint/no-floating-promises
        Promise.race([
          writer.closed.finally(() => {
            if (this.disposeController.signal.aborted) {
              return;
            }
            this.logger.debug('MessageChannelHandler Stream destination closed', {
              streamId: data.streamId,
            });
            port.postMessage({
              error: new Error('DestinationClosed'),
              streamId: data.streamId,
            });
          }),
          this.disposed.promise.catch((error: unknown) => {
            if (this.disposeController.signal.aborted) {
              return;
            }
            this.logger.debug('MessageChannelHandler Disposed', { streamId: data.streamId }, error);
            port.postMessage({ error, streamId: data.streamId });
          }),
        ])
          // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
          .finally(() => delete this.writers[data.streamId!]);

        this.eventBus.dispatchEvent(
          new WorkerStreamEvent(stream.readable, data.extra, this.sourceEvent),
        );
      }

      if (data.error) {
        this.logger.error('MessageChannelHandler Received abort command', data, data.error);
        await writer.abort(data.error);
        writer.releaseLock();
        return;
      }

      if (data.close) {
        this.logger.debug('MessageChannelHandler Received command to close stream', {
          streamId: data.streamId,
        });
        await writer.ready;
        await writer.close();
        writer.releaseLock();
        return;
      }

      if (data.data) {
        this.logger.debug('MessageChannelHandler Writing data to stream', {
          streamId: data.streamId,
        });
        await writer.ready;
        await writer.write(data.data);
        this.logger.debug('MessageChannelHandler Data written to stream', {
          streamId: data.streamId,
        });
      }
    } catch (error) {
      this.logger.error('MessageChannelHandler Error during stream operation', data, error);
      try {
        await writer?.abort(error);
        writer?.releaseLock();
      } catch {
        /* empty */
      }
      response.error = error;
    } finally {
      if (response.id) {
        port.postMessage(response);
      }
    }
  }

  protected async dispose(reason: unknown = new Error('Disposed')) {
    if (this.disposeController.signal.aborted) {
      return;
    }

    this.logger.debug('MessageChannelHandler Disposing', { reason });

    this.disposeController.abort(reason);
    this.disposed.reject(reason);

    await Promise.allSettled(
      Object.entries(this.writers).map(([id, writer]) => {
        delete this.writers[id];
        return awaitTimeout(writer.ready, 1000, () => new Error('DisposeTimeout'))
          .then(() => writer.close())
          .catch((error) => writer.abort(error))
          .catch(() => undefined);
      }),
    );

    this.logger.debug('MessageChannelHandler Disposed');
  }
}

class TargetError extends Error {
  override name = 'TargetError';
  constructor(public readonly error: unknown) {
    super(String(error));
  }
}
