package de.rtb.pcon.core.notification;

import de.rtb.pcon.core.notification.NotificationMessage;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.concurrent.BlockingQueue;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:BOOT-INF/classes/de/rtb/pcon/core/notification/NotificationSenderThread.class */
class NotificationSenderThread<T extends NotificationMessage> extends Thread {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) NotificationSenderThread.class);
    private static final Duration RECOVERY_TIME_OUT = Duration.ofMinutes(1);
    private Throwable lastException;
    private LocalDateTime inRecoveryFrom;
    private BlockingQueue<T> queue;
    private Consumer<T> consumer;

    public NotificationSenderThread(String str, BlockingQueue<T> blockingQueue, Consumer<T> consumer) {
        super(str);
        setDaemon(true);
        this.queue = blockingQueue;
        this.consumer = consumer;
    }

    @Override // java.lang.Thread, java.lang.Runnable
    public void run() {
        boolean z = true;
        T t = null;
        while (z) {
            try {
                this.lastException = null;
                this.inRecoveryFrom = null;
                t = this.queue.take();
                if (t.getType() != NotificationMessage.RecordTarget.SHUTDOWN_MARKER) {
                    this.consumer.accept(t);
                } else {
                    z = false;
                }
            } catch (InterruptedException | RuntimeException e) {
                if (!this.queue.offer(t)) {
                    log.error("Message for '{}' is lost. It cannot be put back to the queue. Queue is full {}.", t.getReceiver(), Integer.valueOf(this.queue.size()));
                }
                log.error("Email was not sent. Next try in " + RECOVERY_TIME_OUT.toSeconds() + " seconds.", e);
                this.lastException = e;
                this.inRecoveryFrom = LocalDateTime.now();
                try {
                    sleep(RECOVERY_TIME_OUT.toMillis());
                } catch (InterruptedException e2) {
                    log.error("Mail thread can't sleep", (Throwable) e2);
                }
            } catch (Exception e3) {
                this.lastException = e3;
                log.error("Sender thread failed permanently. There will be no resume.", (Throwable) e3);
                throw e3;
            }
        }
    }

    public void shutDown() {
        this.queue.add(NotificationMessage.shutDown());
    }

    public Throwable getLastEcException() {
        return this.lastException;
    }

    public Duration resumedIn() {
        return this.inRecoveryFrom == null ? Duration.ZERO : Duration.between(LocalDateTime.now(), this.inRecoveryFrom).plus(RECOVERY_TIME_OUT);
    }

    public boolean isOk() {
        return isAlive() && getLastEcException() == null;
    }

    public int numberOfMessagesInQueue() {
        return this.queue.size();
    }
}
