import { type ProducerGlobalConfig } from "@confluentinc/kafka-javascript";

import {
  KAFKA_PRODUCER_CLIENT_ID,
  CONTACT_BATCH_CREATIONS_TOPIC,
  LIST_EVALUATIONS_TOPIC,
  OVERVIEW_EVALUATIONS_TOPIC,
  LIST_PRIORITIY_CUTOFF,
  PRIORITY_PARTITION_COUNT,
  REPO_EVALUATION_TOPIC,
  USER_EVALUATION_TOPIC,
  REPO_DEPS_EVALUATION_TOPIC,
  USER_DEPS_EVALUATION_TOPIC,
  REEVALUATE_BATCH_TOPIC,
} from "./kafkaConstants";

import { Constants, createOrUpdateTeamEvals } from ".";
import { setListContactWaiting } from "./setListContactWaiting";
import { PrismaTXN } from "@openqlabs/drm-db";
import { CommonConstructorConfig } from "@confluentinc/kafka-javascript/types/kafkajs";
import {
  type KafkaConsumer as KafkaConsumerType,
  type Producer as ProducerType,
} from "@confluentinc/kafka-javascript";

/**
 * Owns the application's Kafka producer and offers typed helpers for
 * publishing evaluation / batch messages to their topics.
 *
 * The producer class is injected so callers decide which implementation is
 * instantiated. When `process.env.DATABASE_URL` contains "test" no producer is
 * created at all and every publish call becomes a no-op.
 */
export class KafkaSingleton {
  private _consumerReferences: KafkaConsumerType[] = [];
  private _producer: ProducerType | undefined;
  private producerConnected = false;
  /** Handle of the periodic `poll()` timer, kept so `disconnect()` can stop it. */
  private pollTimer: ReturnType<typeof setInterval> | undefined;

  globalConfig: CommonConstructorConfig = {};

  /**
   * @param Producer    Producer constructor to instantiate.
   * @param clientProps Client configuration; `bootstrap.servers` is also
   *                    forwarded as `metadata.broker.list`.
   * @param _logLevel   Unused; kept for signature compatibility.
   * @param nodeEnv     When not "development", a default `client.id` is added
   *                    (values in `clientProps` take precedence).
   */
  constructor(
    Producer: new (config: ProducerGlobalConfig) => ProducerType,
    clientProps: CommonConstructorConfig,
    _logLevel: 1,
    nodeEnv?: string
  ) {
    // Test databases never talk to Kafka: leave the instance producer-less.
    if (process.env.DATABASE_URL?.includes("test")) return;

    const isDevelopment = nodeEnv === "development";
    this.globalConfig = isDevelopment
      ? clientProps
      : { "client.id": KAFKA_PRODUCER_CLIENT_ID, ...clientProps };

    console.log(`Booting ${isDevelopment ? "LOCAL" : "REMOTE"} Kafka client...`);

    const producer = new Producer({
      ...this.globalConfig,
      "metadata.broker.list": this.globalConfig["bootstrap.servers"],
      // Delivery-report callback: logs the outcome of each produced message.
      dr_cb: (err: Error, report: { topic: string }) => {
        if (err) {
          console.error(`Error producing to topic ${report.topic}`);
          console.error(err);
        } else {
          console.log(
            `Successfully produced to topic ${JSON.stringify(report)}`
          );
        }
      },
    });

    // Register the listener before connecting so the event cannot be missed.
    producer.on("ready", () => {
      this.producerConnected = true;
      console.log("producer ready.");
    });
    producer.connect();

    this._producer = producer;

    // Poll once per second (only after "ready") so queued delivery reports
    // are dispatched to `dr_cb`.
    this.pollTimer = setInterval(() => {
      if (this.producerConnected) {
        producer.poll();
      }
    }, 1000);
  }

  /**
   * Stops the poll timer and disconnects the producer and every stored
   * consumer. Errors are logged, never thrown. Consumers are only touched
   * when a producer exists.
   */
  public async disconnect(): Promise<void> {
    try {
      // Stop polling first so the timer no longer fires against a producer
      // that is being torn down and no longer keeps the process alive.
      if (this.pollTimer !== undefined) {
        clearInterval(this.pollTimer);
        this.pollTimer = undefined;
      }

      if (this._producer) {
        this._producer.disconnect();
        console.log("Producer disconnected");
        for (const consumer of this._consumerReferences) {
          // Isolate failures so one bad consumer does not skip the others.
          try {
            consumer.disconnect();
            console.log("Consumer disconnected");
          } catch (err) {
            console.error(err);
          }
        }
      }
    } catch (err) {
      console.error(err);
    }
  }

  /** Remembers a consumer so `disconnect()` can close it; falsy values are ignored. */
  public storeConsumerReference(
    consumer: KafkaConsumerType | void | undefined
  ): void {
    if (consumer) {
      this._consumerReferences.push(consumer);
    }
  }

  /**
   * Publishes a contact-batch-creation message, attaching the number of
   * `githubUrl` rows that belong to `listId` as `listSize`.
   */
  public async publishContactBatchCreation(
    contactBatchCreationId: string,
    teamAccountId: string,
    listId: string,
    prisma: PrismaTXN
  ): Promise<void> {
    const listSize = await prisma.githubUrl.count({
      where: {
        listId,
      },
    });
    await this.publishMessage(CONTACT_BATCH_CREATIONS_TOPIC, {
      contactBatchCreationId,
      teamAccountId,
      listId,
      listSize,
    });
  }

  /**
   * Publishes an evaluation message to the topic mapped from `type`.
   * `listSize` only influences partition selection; it is not part of the
   * payload.
   *
   * @throws Error when `Constants.getEvaluationTopic(type)` returns `null`.
   */
  public async publishEvaluation(
    evaluationId: string,
    teamAccountId: string,
    listId: string,
    type: string,
    contactId: string,
    accountId: string,
    listSize: number
  ): Promise<void> {
    const topic = Constants.getEvaluationTopic(type);

    if (topic === null) {
      throw new Error(`No topic found for evaluation type: ${type}`);
    }

    await this.publishMessage(
      topic,
      {
        evaluationId,
        teamAccountId,
        listId,
        type,
        contactId,
        accountId,
      },
      listSize
    );
  }

  /** Publishes a list-level evaluation message. */
  public async publishListEvaluation(
    evaluationId: string,
    teamAccountId: string,
    listId: string,
    type: string
  ): Promise<void> {
    await this.publishMessage(LIST_EVALUATIONS_TOPIC, {
      evaluationId,
      teamAccountId,
      listId,
      type,
    });
  }

  /** Publishes an overview-level evaluation message. */
  public async publishOverviewEvaluation(
    evaluationId: string,
    teamAccountId: string,
    type: string
  ): Promise<void> {
    await this.publishMessage(OVERVIEW_EVALUATIONS_TOPIC, {
      evaluationId,
      teamAccountId,
      type,
    });
  }

  /**
   * Maps a topic to the environment variable that holds its consumer count,
   * or `undefined` for topics without a dedicated variable.
   */
  private consumerCountEnvVar(topic: string): string | undefined {
    switch (topic) {
      case LIST_EVALUATIONS_TOPIC:
        return "LIST_EVALUATIONS_CONSUMER_COUNT";
      case OVERVIEW_EVALUATIONS_TOPIC:
        return "OVERVIEW_EVALUATIONS_CONSUMER_COUNT";
      case REPO_EVALUATION_TOPIC:
        return "REPO_EVALUATIONS_CONSUMER_COUNT";
      case USER_EVALUATION_TOPIC:
        return "USER_EVALUATIONS_CONSUMER_COUNT";
      case REPO_DEPS_EVALUATION_TOPIC:
        return "REPO_DEPENDENCIES_EVALUATIONS_CONSUMER_COUNT";
      case USER_DEPS_EVALUATION_TOPIC:
        return "USER_DEPENDENCIES_EVALUATIONS_CONSUMER_COUNT";
      case CONTACT_BATCH_CREATIONS_TOPIC:
        return "CONTACT_BATCH_CREATIONS_CONSUMER_COUNT";
      case REEVALUATE_BATCH_TOPIC:
        return "REEVALUATE_BATCH_CONSUMER_COUNT";
      default:
        return undefined;
    }
  }

  /**
   * Number of partitions to spread messages over for `topic`.
   * Unknown topics use 10. A missing or non-positive/non-numeric env value
   * falls back to 1 so the computed partition is always a valid number.
   */
  private resolvePartitionCount(topic: string): number {
    const envVar = this.consumerCountEnvVar(topic);
    if (envVar === undefined) {
      return 10;
    }
    const parsed = Number.parseInt(process.env[envVar] ?? "", 10);
    if (!Number.isInteger(parsed) || parsed < 1) {
      console.error(
        `Missing or invalid ${envVar}; using a single partition for topic ${topic}`
      );
      return 1;
    }
    return parsed;
  }

  /**
   * Picks a random partition for a message.
   *
   * - No `listSize`: any partition of the topic.
   * - `listSize` at or below `LIST_PRIORITIY_CUTOFF`: one of the first
   *   `PRIORITY_PARTITION_COUNT` partitions (capped at the topic's total).
   * - Larger lists: one of the remaining partitions, or any partition when
   *   the topic has no partitions beyond the priority ones.
   */
  private async getPartition(
    topic: string,
    listSize?: number
  ): Promise<number> {
    const total = this.resolvePartitionCount(topic);
    const priorityEnd = Math.min(PRIORITY_PARTITION_COUNT, total);

    // Uniform integer in [start, end); yields `start` when the range is empty.
    const pick = (start: number, end: number): number =>
      start + Math.floor(Math.random() * (end - start));

    if (listSize === undefined) {
      return pick(0, total);
    }
    if (listSize > LIST_PRIORITIY_CUTOFF) {
      return priorityEnd < total ? pick(priorityEnd, total) : pick(0, total);
    }
    return pick(0, priorityEnd);
  }

  /**
   * Produces `message` (JSON-encoded) to `topic`, waiting for the producer to
   * become ready first. Resolves without doing anything when no producer
   * exists. Unlike `publishMessage`, failures propagate to the caller.
   */
  private async produceOrThrow(
    topic: string,
    message: object,
    listSize?: number
  ): Promise<void> {
    const producer = this._producer;
    if (!producer) {
      return;
    }

    while (!this.producerConnected) {
      console.log("Waiting for producer to connect...");
      await new Promise((resolve) => setTimeout(resolve, 1000));
    }

    const partition = await this.getPartition(topic, listSize);

    await producer.produce(
      topic,
      partition,
      Buffer.from(JSON.stringify(message))
    );
  }

  /**
   * Best-effort publish: produces `message` to `topic` and logs (never
   * throws) any failure.
   *
   * @param listSize Optional list size used to choose between priority and
   *                 non-priority partitions.
   */
  public async publishMessage(
    topic: string,
    message: object,
    listSize?: number
  ): Promise<void> {
    try {
      await this.produceOrThrow(topic, message, listSize);
    } catch (error) {
      console.error(error);
    }
  }

  /**
   * Runs the bookkeeping side effects for an evaluation and, unless it is
   * already queued, publishes it and flags it as queued in the database.
   *
   * The `queued` flag is only written when producing did not fail, so a
   * failed publish leaves the evaluation eligible for a later retry.
   * Publish and status-update failures are logged, not thrown.
   */
  public async publishMessageWithSideEffect(
    prisma: PrismaTXN,
    topic: string,
    message: {
      evaluationId: string;
      teamAccountId: string;
      listId: string;
      type: string;
      contactId: string;
      queued: boolean | null;
    }
  ): Promise<void> {
    if (message.evaluationId) {
      // run sideEffects
      await setListContactWaiting(prisma, {
        contactId: message.contactId,
        evaluationId: message.evaluationId,
        type: message.type,
      });

      await createOrUpdateTeamEvals(prisma, {
        teamAccountId: message.teamAccountId,
        listId: message.listId,
      });
    }

    if (message.queued) {
      return;
    }

    // Only needed for partition selection, so skip the query when not publishing.
    const contactCount = await prisma.githubUrl.count({
      where: {
        listId: message.listId,
      },
    });

    try {
      await this.produceOrThrow(topic, message, contactCount);
    } catch (err) {
      console.error(
        `Error publishing evaluation ${message.evaluationId} to topic: ${err}`
      );
      return;
    }

    try {
      console.info(
        `Successfully published evaluation ${message.evaluationId}. Updating status...`
      );
      await prisma.evaluation.update({
        where: { id: message.evaluationId },
        data: { queued: true },
      });
      console.info(
        `Evaluation ${message.evaluationId} status updated to queued.`
      );
    } catch (err) {
      console.error(
        `Error updating queued status for evaluation ${message.evaluationId}: ${err}`
      );
    }
  }
}
