import '@main/polyfills/promise-with-resolvers';

import { BulkDownloaderLogger } from '../logger';
import { awaitTimeout } from '../promise-utils';
import { MessagableTarget, MessageStreamPortalExtra, WorkerStreamEvent } from './message';
import { StreamPortalSink } from './sink';
import { StreamPortalSource } from './source';

declare global {
  interface ReadableStream<R> {
    [Symbol.asyncIterator](): AsyncIterableIterator<R>;
  }
}

interface MessageChannelStreamData<TExtra> {
  id?: string;
  streamId?: string;
  data?: unknown;
  close?: boolean;
  error?: unknown;
  extra?: TExtra;
}

interface MessageChannelStreamResponse {
  id: string;
  streamId?: string;
  error?: unknown;
}

interface MessageChannelTargetData {
  type: 'message-channel-target';
}

export class MessageChannelStreamPortalSource<TExtra = void> implements StreamPortalSource<TExtra> {
  static CHANNEL_TIMEOUT = 10_000;
  protected readonly channel: MessagePort;
  protected readonly disposeController = new AbortController();
  protected readonly responses: Record<
    string,
    PromiseWithResolvers<MessageChannelStreamResponse> | undefined
  > = Object.create(null);
  protected readonly disposed = Promise.withResolvers<never>();
  protected readonly channelReady;
  protected lastResponse: Promise<unknown> = Promise.resolve();
  protected lastMessageId = 0n;
  protected isDisposing = false;

  constructor(
    protected readonly target: MessagableTarget,
    protected readonly logger: BulkDownloaderLogger = console,
    protected readonly channelTimeout = MessageChannelStreamPortalSource.CHANNEL_TIMEOUT,
  ) {
    this.disposeController.signal.addEventListener(
      'abort',
      () => this.disposed.reject(this.disposeController.signal.reason),
      { once: true },
    );

    const channel = new MessageChannel();

    this.channel = channel.port1;
    this.channel.addEventListener('message', this.handleChannelMessage.bind(this), {
      signal: this.disposeController.signal,
    });
    this.logger.debug('MessageChannelStreamPortalSource Starting own port');
    this.channel.start();

    this.logger.debug('MessageChannelStreamPortalSource Sending second port to target');
    this.target.postMessage({ type: 'message-channel-target' } as MessageChannelTargetData, [
      channel.port2,
    ]);

    this.channelReady = this.sendMessageAwait({});
  }

  async send(stream: ReadableStream, extra: TExtra): Promise<void> {
    if (this.disposeController.signal.aborted) {
      throw new Error('Resource is already disposed');
    }

    await this.channelReady;

    const streamId = this.createStreamId();

    this.logger.debug('MessageChannelStreamPortalSource Sending stream', { streamId, extra });

    try {
      await this.sendMessageAwait({ streamId, extra });
      await this.sendStream(streamId, stream);
      await this.sendMessageAwait({ streamId, close: true });
    } catch (error) {
      if (this.disposeController.signal.aborted) {
        return;
      }

      if (error instanceof TargetError) {
        this.logger.error(
          'MessageChannelStreamPortalSource Target error',
          { streamId },
          error.error,
        );
        this.abort(error.error);
        await stream.cancel(error.error);
        throw error.error;
      }

      this.logger.error('MessageChannelStreamPortalSource Error', { streamId }, error);
      await this.sendMessageAwait({ streamId, error });
    }
  }

  async dispose() {
    if (this.disposeController.signal.aborted || this.isDisposing) {
      return;
    }
    this.isDisposing = true;

    this.logger.debug('MessageChannelStreamPortalSource Disposing');

    try {
      await this.sendCloseAll();
    } finally {
      this.abort('Disposed');
    }
  }

  protected abort(reason?: unknown) {
    this.logger.debug('MessageChannelStreamPortalSource Aborting', reason);
    this.disposeController.abort(reason);
    this.channel.close();
  }

  protected async sendCloseAll() {
    await this.sendMessageAwait({ close: true });
  }

  protected async sendStream(streamId: string, stream: ReadableStream) {
    // Safari does not implement async iterator on ReadableStream
    if (Symbol.asyncIterator in stream === false) {
      return this.sendStreamLegacy(streamId, stream);
    }

    for await (const data of stream) {
      if (this.disposeController.signal.aborted) {
        throw this.disposeController.signal.reason;
      }
      await this.sendMessageAwait({ streamId, data });
    }
  }

  protected async sendStreamLegacy(streamId: string, stream: ReadableStream) {
    const reader = stream.getReader();
    let result: ReadableStreamReadResult<unknown>;

    try {
      do {
        result = await Promise.race([reader.read(), this.disposed.promise]);

        if (!result.done) {
          await this.sendMessageAwait({ streamId, data: result.value });
        }
      } while (!result.done);
    } finally {
      reader.releaseLock();
    }
  }

  protected async sendMessageAwait(data: MessageChannelStreamData<TExtra>) {
    if (this.disposeController.signal.aborted) {
      throw this.disposeController.signal.reason;
    }

    const id = String(++this.lastMessageId);
    const response = (this.responses[id] = Promise.withResolvers());
    const result = Promise.race([response.promise, this.disposed.promise]);

    const lastResponse = this.lastResponse;
    this.lastResponse = result.catch(() => undefined);

    try {
      await lastResponse;
      this.logger.debug('MessageChannelStreamPortalSource Sending message awaiting', { id });
      this.sendMessage({ ...data, id });

      // If channel does not respond in time - we assume the download has failed
      await awaitTimeout(
        result,
        this.channelTimeout,
        () => new TargetError(new Error('ChannelTimeout')),
      );
    } finally {
      delete this.responses[id];
    }
  }

  protected sendMessage(data: MessageChannelStreamData<TExtra>) {
    const transferables: Transferable[] = [];

    if (data.data && ArrayBuffer.isView(data.data)) {
      // Transfer underlying ArrayBuffer of TypedArray
      transferables.push(data.data.buffer);
    }

    this.logger.debug('MessageChannelStreamPortalSource Sending message', data);
    this.channel.postMessage(data, transferables);
  }

  protected async handleChannelMessage({ data }: MessageEvent<MessageChannelStreamResponse>) {
    if (data.id === undefined && data.error) {
      this.logger.error('MessageChannelStreamPortalSource Received error', data);
      this.abort(new TargetError(data.error));
      return;
    }

    const response = this.responses[data.id];

    if (!response) {
      return;
    }

    this.logger.debug('MessageChannelStreamPortalSource Received response', data);

    if (data.error) {
      response.reject(new TargetError(data.error));
    } else {
      response.resolve(data);
    }
  }

  protected createStreamId() {
    return `${Math.round(Math.random() * 1000000)}${Date.now()}`;
  }
}

export class MessageChannelStreamPortalSink<TExtra = void> implements StreamPortalSink<TExtra> {
  protected readonly disposeController = new AbortController();
  protected readonly handlersDisposed: Promise<void>[] = [];

  constructor(
    protected readonly target: EventTarget,
    protected readonly logger: BulkDownloaderLogger = console,
    protected readonly eventBus = new EventTarget(),
  ) {
    this.target.addEventListener('message', this.handleMessage.bind(this), {
      signal: this.disposeController.signal,
    });
  }

  onStream(
    cb: (stream: ReadableStream, extra: TExtra & MessageStreamPortalExtra) => void,
  ): () => void {
    if (this.disposeController.signal.aborted) {
      throw new Error('Resource is already disposed');
    }

    const listener = (event: Event) =>
      cb((event as unknown as WorkerStreamEvent<TExtra>).stream, {
        ...(event as WorkerStreamEvent<TExtra>).extra,
        event: (event as WorkerStreamEvent<TExtra>).event,
      });

    this.eventBus.addEventListener(WorkerStreamEvent.eventName, listener, {
      signal: this.disposeController.signal,
    });

    return () => this.eventBus.removeEventListener(WorkerStreamEvent.eventName, listener);
  }

  async dispose() {
    if (this.disposeController.signal.aborted) {
      return;
    }

    this.logger.debug('MessageChannelStreamPortalSink Disposing');
    this.disposeController.abort();
    await Promise.allSettled(this.handlersDisposed);
    this.logger.debug('MessageChannelStreamPortalSink Disposed');
  }

  protected handleMessage(e: Event) {
    const event = e as MessageEvent<MessageChannelTargetData>;

    if (event.data?.type !== 'message-channel-target') {
      return;
    }

    const port = event.ports[0];

    if (!port) {
      this.logger.error('MessageChannelStreamPortalSink Channel port is not received!');
      return;
    }

    this.logger.debug(
      'MessageChannelStreamPortalSink Creating MessageChannelHandler with channel port',
    );

    const handler = new MessageChannelHandler<TExtra>(
      port,
      event,
      this.eventBus,
      this.disposeController.signal,
      this.logger,
    );

    this.handlersDisposed.push(handler.whenDisposed());

    if (event instanceof ExtendableMessageEvent) {
      this.logger.debug('MessageChannelStreamPortalSink Extending event until handler completes');
      event.waitUntil(
        handler
          .whenDisposed()
          .finally(() =>
            this.logger.debug('MessageChannelStreamPortalSink Event handler completed'),
          ),
      );
    }
  }
}

export class MessageChannelHandler<TExtra = void> {
  protected readonly disposeController = new AbortController();
  protected readonly disposed = Promise.withResolvers<void>();
  protected readonly writers: Record<string, WritableStreamDefaultWriter> = Object.create(null);

  constructor(
    protected readonly port: MessagePort,
    protected readonly sourceEvent: MessageEvent<MessageChannelTargetData>,
    protected readonly eventBus: EventTarget,
    abortSignal: AbortSignal,
    protected readonly logger: BulkDownloaderLogger = console,
  ) {
    abortSignal.addEventListener('abort', () => this.dispose(abortSignal.reason), {
      signal: this.disposeController.signal,
    });
    this.port.addEventListener('message', this.handleMessage.bind(this), {
      signal: this.disposeController.signal,
    });
    this.port.start();
  }

  whenDisposed() {
    return this.disposed.promise.catch(() => undefined);
  }

  protected async handleMessage(event: MessageEvent<MessageChannelStreamData<TExtra>>) {
    const data = event.data;
    const response = { id: data.id, streamId: data.streamId } as MessageChannelStreamResponse;
    let writer: WritableStreamDefaultWriter | undefined;

    try {
      if (data.close && !data.streamId) {
        this.logger.debug('MessageChannelHandler Recived command to close all streams on channel');
        await this.dispose(new Error('Closed'));
        return;
      }

      if (!data.streamId) {
        return;
      }

      writer = this.writers[data.streamId];

      if (!writer) {
        this.logger.debug('MessageChannelHandler Creating new stream', { streamId: data.streamId });
        const stream = new TransformStream();
        writer = this.writers[data.streamId] = stream.writable.getWriter();

        Promise.race([writer.closed, this.disposed.promise])
          .then(() => {
            this.logger.debug('MessageChannelHandler Stream closed', { streamId: data.streamId });
            this.port.postMessage({ error: new Error('DestinationClosed') });
          })
          .catch((error) => {
            if (this.disposeController.signal.aborted) {
              return;
            }
            this.logger.error(
              'MessageChannelHandler Stream error',
              { streamId: data.streamId },
              error,
            );
            this.port.postMessage({ error });
          })
          // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
          .finally(() => delete this.writers[data.streamId!]);

        this.eventBus.dispatchEvent(
          new WorkerStreamEvent(stream.readable, data.extra, this.sourceEvent),
        );
      }

      if (data.error) {
        this.logger.error('MessageChannelHandler Received abort command', data, data.error);
        await writer?.abort(data.error);
        writer?.releaseLock();
        return;
      }

      if (data.close) {
        this.logger.debug('MessageChannelHandler Received command to close stream', {
          streamId: data.streamId,
        });
        await writer.ready;
        await writer.close();
        writer.releaseLock();
        return;
      }

      if (data.data) {
        this.logger.debug('MessageChannelHandler Writing data to stream', {
          streamId: data.streamId,
        });
        await writer.ready;
        await writer.write(data.data);
        this.logger.debug('MessageChannelHandler Data written to stream', {
          streamId: data.streamId,
        });
      }
    } catch (error) {
      this.logger.error('MessageChannelHandler Error during stream operation', data, error);
      try {
        await writer?.abort(error);
        writer?.releaseLock();
      } catch {
        /* empty */
      }
      response.error = error;
    } finally {
      if (data.id) {
        this.port.postMessage(response);
      }
    }
  }

  protected async dispose(reason?: unknown) {
    if (this.disposeController.signal.aborted) {
      return;
    }

    this.logger.debug('MessageChannelHandler Disposing', { reason });

    this.disposeController.abort(reason);
    this.disposed.reject(reason);

    await Promise.allSettled(
      Object.entries(this.writers).map(([id, writer]) => {
        delete this.writers[id];
        return awaitTimeout(writer.ready, 1000, () => new Error('DisposeTimeout'))
          .then(() => writer.close())
          .catch((error) => writer.abort(error))
          .catch(() => undefined);
      }),
    );

    this.logger.debug('MessageChannelHandler Disposed');

    // Allow response to be sent before closing the port
    setTimeout(() => this.port.close(), 10);
  }
}

class TargetError extends Error {
  override name = 'TargetError';
  constructor(public readonly error: unknown) {
    super(String(error));
  }
}
