package com.ibm.ws.microprofile.reactive.messaging.kafka;

import com.ibm.websphere.ras.Tr;
import com.ibm.websphere.ras.TraceComponent;
import com.ibm.websphere.ras.annotation.TraceObjectField;
import com.ibm.websphere.ras.annotation.TraceOptions;
import com.ibm.ws.ffdc.FFDCFilter;
import com.ibm.ws.ffdc.annotation.FFDCIgnore;
import com.ibm.ws.microprofile.reactive.messaging.kafka.adapter.KafkaAdapterException;
import com.ibm.ws.microprofile.reactive.messaging.kafka.adapter.KafkaAdapterFactory;
import com.ibm.ws.microprofile.reactive.messaging.kafka.adapter.KafkaConsumer;
import com.ibm.ws.ras.instrument.annotation.InjectedFFDC;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.enterprise.concurrent.ManagedScheduledExecutorService;
import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.spi.Connector;
import org.eclipse.microprofile.reactive.messaging.spi.IncomingConnectorFactory;
import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder;
import org.osgi.framework.Bundle;
import org.osgi.framework.FrameworkUtil;

/**
 * Reactive Messaging connector factory that builds incoming (consuming) Kafka channels.
 *
 * <p>For every channel requested through {@link #getPublisherBuilder(Config)} a Kafka consumer
 * and a {@code KafkaInput} are created. Each {@code KafkaInput} is remembered so that it can be
 * shut down when this bean is destroyed.
 */
@InjectedFFDC
@TraceObjectField(fieldName = "tc", fieldDesc = "Lcom/ibm/websphere/ras/TraceComponent;")
@ApplicationScoped
@Connector(KafkaConnectorConstants.CONNECTOR_NAME)
@TraceOptions
/* loaded from: input_file:com/ibm/ws/microprofile/reactive/messaging/kafka/KafkaIncomingConnector.class */
public class KafkaIncomingConnector implements IncomingConnectorFactory {
    private static final TraceComponent tc = Tr.register(KafkaIncomingConnector.class, "REACTIVEMESSAGE", "com.ibm.ws.microprofile.reactive.messaging.kafka.resources.ReactiveMessaging");

    /** Fallback used when neither the unacked limit nor the max poll records property is configured. */
    private static final int DEFAULT_MAX_POLL_RECORDS = 500;

    /** Pause between two consecutive attempts to create the Kafka consumer. */
    private static final long RETRY_INTERVAL_MILLIS = 1000L;

    /** Executor handed to every {@code KafkaInput}; looked up from the OSGi service registry in {@link #postConstruct()}. */
    ManagedScheduledExecutorService executor;

    @Inject
    KafkaAdapterFactory kafkaAdapterFactory;

    /** Every input created by this connector, retained so {@link #shutdown()} can stop them. */
    private final List<KafkaInput<?, ?>> kafkaInputs = Collections.synchronizedList(new ArrayList<>());

    // NOTE(review): this class does not implement Serializable; the field is presumably a tooling artifact.
    static final long serialVersionUID = -9192585079099587413L;

    /**
     * Looks up the {@link ManagedScheduledExecutorService} from the OSGi service registry.
     *
     * @throws IllegalStateException if no such service is registered or it cannot be obtained
     */
    @PostConstruct
    private void postConstruct() {
        Bundle bundle = FrameworkUtil.getBundle(KafkaIncomingConnector.class);
        // getServiceReference returns null when the service is not registered; verify it before
        // handing it to getService so the failure is reported with the localized message rather
        // than as a bare NullPointerException.
        this.executor = (ManagedScheduledExecutorService) requireExecutorLookup(
                bundle.getBundleContext().getService(
                        requireExecutorLookup(bundle.getBundleContext().getServiceReference(ManagedScheduledExecutorService.class))));
    }

    /**
     * Returns {@code candidate} unchanged, or fails with the localized "executor not found" error
     * when it is {@code null}.
     *
     * @param candidate an intermediate or final result of the executor service lookup
     * @return {@code candidate}, never {@code null}
     * @throws IllegalStateException if {@code candidate} is {@code null}
     */
    private static <T> T requireExecutorLookup(T candidate) {
        if (candidate == null) {
            throw new IllegalStateException(Tr.formatMessage(tc, "internal.kafka.connector.error.CWMRX1000E", new Object[]{"The Managed Scheduled Executor Service could not be found."}));
        }
        return candidate;
    }

    /**
     * Shuts down every {@code KafkaInput} created by this connector. A failure while stopping one
     * input is recorded through FFDC and does not prevent the remaining inputs from being stopped.
     */
    @PreDestroy
    private void shutdown() {
        // Iterating a Collections.synchronizedList requires holding the list's own monitor.
        synchronized (this.kafkaInputs) {
            for (KafkaInput<?, ?> kafkaInput : this.kafkaInputs) {
                try {
                    kafkaInput.shutdown();
                } catch (Exception e) {
                    FFDCFilter.processException(e, "com.ibm.ws.microprofile.reactive.messaging.kafka.KafkaIncomingConnector", "78", this, new Object[0]);
                }
            }
        }
    }

    /**
     * Creates the publisher for one incoming channel.
     *
     * <p>The group id property is mandatory. The topic defaults to the channel name. The unacked
     * limit defaults to the configured max poll records (or {@value #DEFAULT_MAX_POLL_RECORDS})
     * and is forced to {@code 0} when auto commit is enabled.
     *
     * @param config the channel configuration; must contain {@code channel-name}
     * @return the publisher builder obtained from the newly created {@code KafkaInput}
     * @throws KafkaConnectorException if the channel cannot be created for any reason; the
     *         original failure is attached as the cause
     */
    public PublisherBuilder<Message<Object>> getPublisherBuilder(Config config) {
        String channelName = config.getValue("channel-name", String.class);
        try {
            if (!config.getOptionalValue(KafkaConnectorConstants.GROUP_ID, String.class).isPresent()) {
                throw new IllegalArgumentException(Tr.formatMessage(tc, "kafka.groupid.not.set.CWMRX1005E", new Object[]{"mp.messaging.incoming." + channelName + "." + KafkaConnectorConstants.GROUP_ID}));
            }
            String topic = config.getOptionalValue(KafkaConnectorConstants.TOPIC, String.class).orElse(channelName);
            int unackedLimit = config.getOptionalValue(KafkaConnectorConstants.UNACKED_LIMIT, Integer.class)
                    .orElse(config.getOptionalValue(KafkaConnectorConstants.MAX_POLL_RECORDS, Integer.class).orElse(DEFAULT_MAX_POLL_RECORDS));
            int retrySeconds = config.getOptionalValue(KafkaConnectorConstants.CREATION_RETRY_SECONDS, Integer.class).orElse(0);

            Map<String, Object> consumerConfig = buildConsumerConfig(config);
            boolean autoCommitEnabled = "true".equalsIgnoreCase((String) consumerConfig.get(KafkaConnectorConstants.ENABLE_AUTO_COMMIT));

            // NOTE(review): left as a raw type because the generic signatures of the adapter
            // types are not visible from this file.
            KafkaConsumer kafkaConsumer = getKafkaConsumerWithRetry(consumerConfig, retrySeconds, channelName);

            PartitionTrackerFactory partitionTrackerFactory = new PartitionTrackerFactory();
            partitionTrackerFactory.setExecutor(this.executor);
            partitionTrackerFactory.setAdapterFactory(this.kafkaAdapterFactory);
            partitionTrackerFactory.setAutoCommitEnabled(autoCommitEnabled);
            if (autoCommitEnabled) {
                unackedLimit = 0;
            }

            KafkaInput<?, ?> kafkaInput = new KafkaInput<>(this.kafkaAdapterFactory, partitionTrackerFactory, kafkaConsumer, this.executor, topic, unackedLimit);
            this.kafkaInputs.add(kafkaInput);
            return kafkaInput.getPublisher();
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // The retry loop sleeps; do not lose the interrupt request when wrapping it.
                Thread.currentThread().interrupt();
            }
            FFDCFilter.processException(e, "com.ibm.ws.microprofile.reactive.messaging.kafka.KafkaIncomingConnector", "137", this, new Object[]{config});
            throw new KafkaConnectorException(Tr.formatMessage(tc, "kafka.create.incoming.error.CWMRX1007E", new Object[]{channelName, e.getMessage()}), e);
        }
    }

    /**
     * Assembles the properties passed to the Kafka consumer: connector defaults first, then every
     * channel property that is not listed in {@code KafkaConnectorConstants.NON_KAFKA_PROPS}, so
     * that user configuration overrides the defaults.
     *
     * @param config the channel configuration
     * @return a new mutable map of consumer properties
     */
    private static Map<String, Object> buildConsumerConfig(Config config) {
        Map<String, Object> consumerConfig = new HashMap<>();
        consumerConfig.put(KafkaConnectorConstants.ENABLE_AUTO_COMMIT, "false");
        consumerConfig.put(KafkaConnectorConstants.KEY_DESERIALIZER, KafkaConnectorConstants.STRING_DESERIALIZER);
        consumerConfig.put(KafkaConnectorConstants.VALUE_DESERIALIZER, KafkaConnectorConstants.STRING_DESERIALIZER);

        Map<String, String> userProperties = StreamSupport.stream(config.getPropertyNames().spliterator(), false)
                .filter(name -> !KafkaConnectorConstants.NON_KAFKA_PROPS.contains(name))
                .collect(Collectors.toMap(Function.identity(), name -> config.getValue(name, String.class)));
        consumerConfig.putAll(userProperties);
        return consumerConfig;
    }

    /**
     * Creates a Kafka consumer, retrying on {@link KafkaAdapterException} until the retry window
     * has elapsed. A warning is logged and the thread sleeps for
     * {@value #RETRY_INTERVAL_MILLIS} ms between attempts.
     *
     * @param consumerConfig the properties for the new consumer
     * @param retrySeconds length of the retry window in seconds; zero or less means a single attempt
     * @param channelName channel name, used only in the retry warning
     * @return the newly created consumer
     * @throws KafkaAdapterException if creation still fails once the retry window has elapsed
     * @throws InterruptedException if the thread is interrupted while waiting between attempts
     */
    @FFDCIgnore({KafkaAdapterException.class})
    private <K, V> KafkaConsumer<K, V> getKafkaConsumerWithRetry(Map<String, Object> consumerConfig, int retrySeconds, String channelName) throws InterruptedException {
        if (retrySeconds <= 0) {
            return this.kafkaAdapterFactory.newKafkaConsumer(consumerConfig);
        }
        long retryWindowNanos = Duration.ofSeconds(retrySeconds).toNanos();
        long startNanos = System.nanoTime();
        while (true) {
            try {
                return this.kafkaAdapterFactory.newKafkaConsumer(consumerConfig);
            } catch (KafkaAdapterException e) {
                // Compare elapsed time rather than absolute values: nanoTime may wrap around.
                if (System.nanoTime() - startNanos > retryWindowNanos) {
                    throw e;
                }
                Tr.warning(tc, "kafka.create.incoming.retry.CWMRX1009W", new Object[]{channelName, e.getMessage()});
                Thread.sleep(RETRY_INTERVAL_MILLIS);
            }
        }
    }
}
