import { type ProducerGlobalConfig } from "@confluentinc/kafka-javascript";

import {
  KAFKA_PRODUCER_CLIENT_ID,
  TARGET_BATCH_CREATIONS_TOPIC,
  CAMPAIGN_EVALUATIONS_TOPIC,
  OVERVIEW_EVALUATIONS_TOPIC,
} from "./kafkaConstants";

import { Constants, createOrUpdateTeamEvals } from ".";
import { setTargetContactWaiting } from "./setTargetContactWaiting";
import { PrismaTXN } from "@openqlabs/drm-db";
import { CommonConstructorConfig } from "@confluentinc/kafka-javascript/types/kafkajs";
import {
  type KafkaConsumer as KafkaConsumerType,
  type Producer as ProducerType,
} from "@confluentinc/kafka-javascript";

/**
 * Owns one Kafka producer and a list of consumer references, and exposes
 * helpers that JSON-serialize payloads and produce them to topics.
 *
 * The producer is created and `connect()` is called in the constructor;
 * publishing waits until the producer has emitted `"ready"`.
 */
export class KafkaSingleton {
  /** Set to true once the producer emits `"ready"`. */
  private _producerIsConnected = false;
  /** Consumers registered via {@link storeConsumerReference}; disconnected in {@link disconnect}. */
  private _consumerReferences: KafkaConsumerType[] = [];
  private _producer: ProducerType;

  /** Configuration that is spread into the producer's constructor config. */
  globalConfig: CommonConstructorConfig = {};

  /**
   * @param Producer - Producer constructor to instantiate (injected by the caller).
   * @param clientProps - Client properties. Outside of development they are
   *   spread last, so they override the default `client.id` /
   *   `bootstrap.servers` set here.
   * @param _logLevel - Not read by this constructor.
   * @param kafkaBrokerUrls - Value for `bootstrap.servers` outside of
   *   development; an empty or missing value falls back to `localhost:9092`.
   * @param nodeEnv - When exactly `"development"`, `clientProps` is used as-is.
   */
  constructor(
    Producer: new (config: ProducerGlobalConfig) => ProducerType,
    clientProps: CommonConstructorConfig,
    _logLevel: 1,
    kafkaBrokerUrls?: string,
    nodeEnv?: string
  ) {
    this.globalConfig = clientProps;
    if (nodeEnv !== "development") {
      this.globalConfig = {
        "client.id": KAFKA_PRODUCER_CLIENT_ID,
        // `||` on purpose: an empty string also falls back to the default.
        "bootstrap.servers": kafkaBrokerUrls || "localhost:9092",
        ...clientProps,
      };
    }

    console.log(
      `Booting ${
        nodeEnv === "development" ? "LOCAL" : "REMOTE"
      } Kafka client...`
    );

    const producer = new Producer({
      ...this.globalConfig,
      "metadata.broker.list": this.globalConfig["bootstrap.servers"],
      dr_cb: (err: Error, report: { topic: string }) => {
        if (err) {
          console.error(`Error producing to topic ${report.topic}`);
          console.error(err);
          // Do not fall through to the success log for a failed delivery.
          return;
        }
        console.log(`Successfully produced to topic ${JSON.stringify(report)}`);
      },
    });

    // Register the listener before connecting so a producer that signals
    // readiness during connect() is not missed.
    producer.on("ready", () => {
      this._producerIsConnected = true;
      console.log("producer ready.");
    });
    producer.connect();
    this._producer = producer;
  }

  /**
   * Disconnects the producer and every stored consumer.
   *
   * Each disconnect is attempted independently: an error from one client is
   * logged and does not prevent the remaining clients from being disconnected.
   * Never rejects.
   */
  public async disconnect(): Promise<void> {
    try {
      this._producer.disconnect();
      console.log("Producer disconnected");
    } catch (err) {
      console.error(err);
    }

    for (const consumer of this._consumerReferences) {
      try {
        consumer.disconnect();
        console.log("Consumer disconnected");
      } catch (err) {
        console.error(err);
      }
    }
  }

  /**
   * Remembers a consumer so that {@link disconnect} can disconnect it later.
   * Falsy values are ignored.
   */
  public storeConsumerReference(
    consumer: KafkaConsumerType | void | undefined
  ): void {
    if (consumer) {
      this._consumerReferences.push(consumer);
    }
  }

  /** Publishes the given ids to `TARGET_BATCH_CREATIONS_TOPIC`. */
  public async publishTargetBatchCreation(
    targetBatchCreationId: string,
    teamAccountId: string,
    campaignId: string
  ): Promise<void> {
    await this.publishMessage(TARGET_BATCH_CREATIONS_TOPIC, {
      targetBatchCreationId,
      teamAccountId,
      campaignId,
    });
  }

  /**
   * Publishes an evaluation to the topic that
   * `Constants.getEvaluationTopic(type)` returns.
   *
   * @throws Error when no topic is returned (`null`) for `type`.
   */
  public async publishEvaluation(
    evaluationId: string,
    teamAccountId: string,
    campaignId: string,
    type: string,
    targetContactId: string,
    accountId: string
  ): Promise<void> {
    const topic = Constants.getEvaluationTopic(type);

    if (topic === null) {
      throw new Error(`No topic found for evaluation type: ${type}`);
    }

    await this.publishMessage(topic, {
      evaluationId,
      teamAccountId,
      campaignId,
      type,
      targetContactId,
      accountId,
    });
  }

  /** Publishes the given fields to `CAMPAIGN_EVALUATIONS_TOPIC`. */
  public async publishCampaignEvaluation(
    evaluationId: string,
    teamAccountId: string,
    campaignId: string,
    type: string
  ): Promise<void> {
    await this.publishMessage(CAMPAIGN_EVALUATIONS_TOPIC, {
      evaluationId,
      teamAccountId,
      campaignId,
      type,
    });
  }

  /** Publishes the given fields to `OVERVIEW_EVALUATIONS_TOPIC`. */
  public async publishOverviewEvaluation(
    evaluationId: string,
    teamAccountId: string,
    type: string
  ): Promise<void> {
    await this.publishMessage(OVERVIEW_EVALUATIONS_TOPIC, {
      evaluationId,
      teamAccountId,
      type,
    });
  }

  /**
   * Best-effort publish: serializes `message` as JSON and produces it to
   * `topic`. Errors are logged and swallowed, so this method never rejects.
   * Waits (polling once per second, without a timeout) until the producer is
   * ready.
   */
  public async publishMessage(topic: string, message: object): Promise<void> {
    try {
      await this.produceOrThrow(topic, message);
    } catch (error) {
      console.error(error);
    }
  }

  /**
   * Runs the evaluation side effects and, unless the message is already
   * marked as queued, publishes it and then flags the evaluation as queued.
   *
   * When `message.evaluationId` is truthy, `setTargetContactWaiting` and
   * `createOrUpdateTeamEvals` are awaited first; their errors propagate to the
   * caller. Publish and status-update errors are logged and swallowed. The
   * evaluation is only updated to `queued: true` when producing did not throw.
   */
  public async publishMessageWithSideEffect(
    prisma: PrismaTXN,
    topic: string,
    message: {
      evaluationId: string;
      teamAccountId: string;
      campaignId: string;
      type: string;
      targetContactId: string;
      queued: boolean | null;
    }
  ): Promise<void> {
    if (message.evaluationId) {
      // run sideEffects
      await setTargetContactWaiting(prisma, {
        targetContactId: message.targetContactId,
        evaluationId: message.evaluationId,
        type: message.type,
      });

      await createOrUpdateTeamEvals(prisma, {
        teamAccountId: message.teamAccountId,
        campaignId: message.campaignId,
      });
    }

    if (message.queued) {
      return;
    }

    try {
      // Use the throwing variant: publishMessage() swallows errors, which
      // would let a failed publish be recorded as queued.
      await this.produceOrThrow(topic, message);
      console.info(
        `Successfully published evaluation ${message.evaluationId}. Updating status...`
      );
      await prisma.evaluation.update({
        where: { id: message.evaluationId },
        data: { queued: true },
      });
      console.info(
        `Evaluation ${message.evaluationId} status updated to queued.`
      );
    } catch (err) {
      console.error(
        `Error publishing evaluation ${message.evaluationId} to topic: ${err}`
      );
    }
  }

  /**
   * Waits until the producer is ready, then produces the JSON-serialized
   * message to `topic` with a `null` partition. Unlike
   * {@link publishMessage}, errors thrown while producing propagate to the
   * caller.
   */
  private async produceOrThrow(topic: string, message: object): Promise<void> {
    while (!this._producerIsConnected) {
      console.log("Waiting for producer to connect...");
      await new Promise((resolve) => setTimeout(resolve, 1000));
    }

    await this._producer.produce(
      topic,
      null,
      Buffer.from(JSON.stringify(message))
    );
  }
}
