/*
 * Decompiled with CFR 0.152.
 */
package org.pentaho.big.data.kettle.plugins.kafka;

import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.pentaho.big.data.kettle.plugins.kafka.KafkaConsumerField;
import org.pentaho.big.data.kettle.plugins.kafka.KafkaConsumerInputMeta;
import org.pentaho.big.data.kettle.plugins.kafka.KafkaProducerOutputMeta;
import org.pentaho.bigdata.api.jaas.JaasConfigService;

/**
 * Builds Kafka {@link Consumer} and {@link Producer} clients from step metadata.
 *
 * <p>Each client is created from a configuration map assembled in a fixed order:
 * the connection and (de)serializer settings first, then the Kerberos settings
 * (only when the JAAS service reports Kerberos), and finally the user-supplied
 * options from the step metadata. Because the user options are written last,
 * an option with the same key replaces any value set earlier.
 *
 * <p>The functions that actually instantiate the clients are injected, so a
 * caller in this package can substitute its own client constructors.
 */
public class KafkaFactory {
    // The raw Consumer type is kept on purpose: it is part of the public return
    // type of consumer(...), and parameterizing it could break existing callers.
    private final Function<Map<String, Object>, Consumer> consumerFunction;
    private final Function<Map<String, Object>, Producer<Object, Object>> producerFunction;

    /**
     * Creates a factory that instantiates {@code KafkaConsumer} and
     * {@code KafkaProducer} directly from the assembled configuration map.
     *
     * @return a factory backed by the Kafka client constructors
     */
    public static KafkaFactory defaultFactory() {
        return new KafkaFactory(KafkaConsumer::new, KafkaProducer::new);
    }

    /**
     * Creates a factory that delegates client construction to the given functions.
     *
     * @param consumerFunction turns a configuration map into a consumer
     * @param producerFunction turns a configuration map into a producer
     */
    KafkaFactory(Function<Map<String, Object>, Consumer> consumerFunction, Function<Map<String, Object>, Producer<Object, Object>> producerFunction) {
        this.consumerFunction = consumerFunction;
        this.producerFunction = producerFunction;
    }

    /**
     * Creates a consumer that uses the {@code String} field type for both the
     * key and the message deserializer.
     *
     * @param meta              consumer step metadata
     * @param variablesFunction applied to each configured string before it is used
     * @return the consumer produced by the configured consumer function
     */
    public Consumer consumer(KafkaConsumerInputMeta meta, Function<String, String> variablesFunction) {
        return consumer(meta, variablesFunction, KafkaConsumerField.Type.String, KafkaConsumerField.Type.String);
    }

    /**
     * Creates a consumer with explicit key and message deserializer types.
     *
     * @param meta                consumer step metadata (bootstrap servers, consumer group,
     *                            auto-commit flag, optional JAAS service, extra options)
     * @param variablesFunction   applied to each configured string before it is used;
     *                            a {@code null} result is replaced by the empty string
     * @param keyDeserializerType field type whose deserializer class is used for keys
     * @param msgDeserializerType field type whose deserializer class is used for messages
     * @return the consumer produced by the configured consumer function
     */
    public Consumer consumer(KafkaConsumerInputMeta meta, Function<String, String> variablesFunction, KafkaConsumerField.Type keyDeserializerType, KafkaConsumerField.Type msgDeserializerType) {
        Function<String, String> resolve = variablesFunction.andThen(KafkaFactory::nullToEmpty);
        HashMap<String, Object> kafkaConfig = new HashMap<>();
        kafkaConfig.put("bootstrap.servers", resolve.apply(meta.getBootstrapServers()));
        kafkaConfig.put("group.id", resolve.apply(meta.getConsumerGroup()));
        kafkaConfig.put("value.deserializer", msgDeserializerType.getKafkaDeserializerClass());
        kafkaConfig.put("key.deserializer", keyDeserializerType.getKafkaDeserializerClass());
        kafkaConfig.put("enable.auto.commit", meta.isAutoCommit());
        meta.getJaasConfigService().ifPresent(jaasConfigService -> putKerberosConfig(kafkaConfig, (JaasConfigService) jaasConfigService));
        // User options go in last so they take precedence over everything above.
        applyUserOptions(kafkaConfig, meta.getConfig(), resolve);
        return consumerFunction.apply(kafkaConfig);
    }

    /**
     * Adds the SASL settings to {@code kafkaConfig} when the JAAS service reports
     * Kerberos; otherwise leaves the map untouched.
     *
     * <p>The security protocol is always set to {@code SASL_PLAINTEXT} here.
     *
     * @param kafkaConfig       configuration map to update in place
     * @param jaasConfigService source of the Kerberos flag and the JAAS configuration
     */
    public void putKerberosConfig(HashMap<String, Object> kafkaConfig, JaasConfigService jaasConfigService) {
        if (!jaasConfigService.isKerberos()) {
            return;
        }
        kafkaConfig.put("sasl.jaas.config", jaasConfigService.getJaasConfig());
        kafkaConfig.put("security.protocol", "SASL_PLAINTEXT");
    }

    /**
     * Creates a producer that uses the {@code String} field type for both the
     * key and the message serializer.
     *
     * @param meta              producer step metadata
     * @param variablesFunction applied to each configured string before it is used
     * @return the producer produced by the configured producer function
     */
    public Producer<Object, Object> producer(KafkaProducerOutputMeta meta, Function<String, String> variablesFunction) {
        return producer(meta, variablesFunction, KafkaConsumerField.Type.String, KafkaConsumerField.Type.String);
    }

    /**
     * Creates a producer with explicit key and message serializer types.
     *
     * @param meta              producer step metadata (bootstrap servers, client id,
     *                          optional JAAS service, extra options)
     * @param variablesFunction applied to each configured string before it is used;
     *                          a {@code null} result is replaced by the empty string
     * @param keySerializerType field type whose serializer class is used for keys
     * @param msgSerializerType field type whose serializer class is used for messages
     * @return the producer produced by the configured producer function
     */
    public Producer<Object, Object> producer(KafkaProducerOutputMeta meta, Function<String, String> variablesFunction, KafkaConsumerField.Type keySerializerType, KafkaConsumerField.Type msgSerializerType) {
        Function<String, String> resolve = variablesFunction.andThen(KafkaFactory::nullToEmpty);
        HashMap<String, Object> kafkaConfig = new HashMap<>();
        kafkaConfig.put("bootstrap.servers", resolve.apply(meta.getBootstrapServers()));
        kafkaConfig.put("client.id", resolve.apply(meta.getClientId()));
        kafkaConfig.put("value.serializer", msgSerializerType.getKafkaSerializerClass());
        kafkaConfig.put("key.serializer", keySerializerType.getKafkaSerializerClass());
        meta.getJaasConfigService().ifPresent(jaasConfigService -> putKerberosConfig(kafkaConfig, (JaasConfigService) jaasConfigService));
        // User options go in last so they take precedence over everything above.
        applyUserOptions(kafkaConfig, meta.getConfig(), resolve);
        return producerFunction.apply(kafkaConfig);
    }

    /**
     * Copies every user option into {@code kafkaConfig}, passing each value through
     * {@code resolve} first. Existing entries with the same key are replaced.
     * Keys and values are cast to {@code String}, as the inlined code did before.
     */
    private static void applyUserOptions(Map<String, Object> kafkaConfig, Map<?, ?> userOptions, Function<String, String> resolve) {
        for (Map.Entry<?, ?> option : userOptions.entrySet()) {
            kafkaConfig.put((String) option.getKey(), resolve.apply((String) option.getValue()));
        }
    }

    /** Returns {@code value}, or the empty string when {@code value} is {@code null}. */
    private static String nullToEmpty(String value) {
        return value == null ? "" : value;
    }
}

