package monasca.persister.consumer;

import com.google.inject.Inject;
import com.google.inject.assistedinject.Assisted;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:monasca/persister/consumer/KafkaConsumer.class */
public class KafkaConsumer<T> {
    private static final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);
    private static final int WAIT_TIME = 10;
    private ExecutorService executorService;
    private final KafkaConsumerRunnableBasic<T> kafkaConsumerRunnableBasic;
    private final String threadId;

    @Inject
    public KafkaConsumer(@Assisted KafkaConsumerRunnableBasic<T> kafkaConsumerRunnableBasic, @Assisted String str) {
        this.kafkaConsumerRunnableBasic = kafkaConsumerRunnableBasic;
        this.threadId = str;
    }

    public void start() {
        logger.info("[{}]: start", this.threadId);
        this.executorService = Executors.newFixedThreadPool(1);
        this.executorService.submit(this.kafkaConsumerRunnableBasic);
    }

    public void stop() {
        logger.info("[{}]: stop", this.threadId);
        this.kafkaConsumerRunnableBasic.stop();
        if (this.executorService != null) {
            logger.info("[{}]: shutting down executor service", this.threadId);
            this.executorService.shutdown();
            try {
                logger.info("[{}]: awaiting termination...", this.threadId);
                if (!this.executorService.awaitTermination(10L, TimeUnit.SECONDS)) {
                    logger.warn("[{}]: did not shut down in {} seconds", this.threadId, Integer.valueOf(WAIT_TIME));
                }
                logger.info("[{}]: terminated", this.threadId);
            } catch (InterruptedException e) {
                logger.info("[{}]: awaitTermination interrupted", this.threadId, e);
            }
        }
    }
}
