/*
 * Decompiled with CFR 0.152.
 */
package io.lettuce.core.protocol;

import io.lettuce.core.ClientOptions;
import io.lettuce.core.ConnectionBuilder;
import io.lettuce.core.ConnectionEvents;
import io.lettuce.core.event.EventBus;
import io.lettuce.core.event.connection.ReconnectAttemptEvent;
import io.lettuce.core.event.connection.ReconnectFailedEvent;
import io.lettuce.core.internal.LettuceAssert;
import io.lettuce.core.protocol.ChannelLogDescriptor;
import io.lettuce.core.protocol.CommandHandler;
import io.lettuce.core.protocol.ConnectionFacade;
import io.lettuce.core.protocol.Endpoint;
import io.lettuce.core.protocol.ReconnectionHandler;
import io.lettuce.core.protocol.ReconnectionListener;
import io.lettuce.core.resource.Delay;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.local.LocalAddress;
import io.netty.util.Timeout;
import io.netty.util.Timer;
import io.netty.util.concurrent.EventExecutorGroup;
import io.netty.util.internal.logging.InternalLogLevel;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;
import java.net.SocketAddress;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;

/**
 * Channel handler that watches a connection and schedules reconnect attempts after the channel becomes inactive.
 * <p>
 * The watchdog only reacts to {@link #channelInactive(ChannelHandlerContext)} once it was {@link #arm() armed}. Reconnect
 * attempts are delayed according to the configured {@link Delay}, triggered through the {@link Timer} and executed on the
 * {@code reconnectWorkers}. Each attempt is announced to the {@link ReconnectionListener} and published on the
 * {@link EventBus}; failed attempts publish a {@link ReconnectFailedEvent} and schedule the next attempt.
 */
@ChannelHandler.Sharable
public class ConnectionWatchdog
extends ChannelInboundHandlerAdapter {
    /** Minimum time (5 seconds, in milliseconds) between reconnect log statements emitted at INFO/WARN level. */
    private static final long LOGGING_QUIET_TIME_MS = TimeUnit.MILLISECONDS.convert(5L, TimeUnit.SECONDS);
    private static final InternalLogger logger = InternalLoggerFactory.getInstance(ConnectionWatchdog.class);
    private final Delay reconnectDelay;
    private final Bootstrap bootstrap;
    private final EventExecutorGroup reconnectWorkers;
    private final ReconnectionHandler reconnectionHandler;
    private final ReconnectionListener reconnectionListener;
    private final Timer timer;
    private final EventBus eventBus;
    private final String redisUri;
    private final String epid;
    private Channel channel;
    private SocketAddress remoteAddress;
    /** Timestamp (epoch millis) of the last reconnect attempt logged at INFO/WARN level; {@code -1} if none yet. */
    private long lastReconnectionLogging = -1L;
    /** Cached log prefix; reset to {@code null} whenever a channel becomes active. */
    private String logPrefix;
    /** Guards against scheduling more than one reconnect at a time ({@code true} while one is pending). */
    private final AtomicBoolean reconnectSchedulerSync;
    private volatile int attempts;
    private volatile boolean armed;
    private volatile boolean listenOnChannelInactive;
    private volatile Timeout reconnectScheduleTimeout;

    /**
     * Creates a new watchdog. All arguments are required.
     *
     * @param reconnectDelay delay function used to compute the wait time before each reconnect attempt
     * @param clientOptions client options handed to the {@link ReconnectionHandler}
     * @param bootstrap bootstrap whose event loop group and {@link ConnectionBuilder#REDIS_URI} attribute are used
     * @param timer timer used to delay reconnect attempts
     * @param reconnectWorkers executor group on which reconnect attempts run
     * @param socketAddressSupplier source of the address to reconnect to
     * @param reconnectionListener listener notified about each reconnect attempt
     * @param connectionFacade connection facade handed to the {@link ReconnectionHandler}
     * @param eventBus bus on which reconnect attempt/failure events are published
     * @param endpoint endpoint whose id is reported in published events
     */
    public ConnectionWatchdog(Delay reconnectDelay, ClientOptions clientOptions, Bootstrap bootstrap, Timer timer, EventExecutorGroup reconnectWorkers, Mono<SocketAddress> socketAddressSupplier, ReconnectionListener reconnectionListener, ConnectionFacade connectionFacade, EventBus eventBus, Endpoint endpoint) {
        LettuceAssert.notNull(reconnectDelay, "Delay must not be null");
        LettuceAssert.notNull(clientOptions, "ClientOptions must not be null");
        LettuceAssert.notNull(bootstrap, "Bootstrap must not be null");
        LettuceAssert.notNull(timer, "Timer must not be null");
        LettuceAssert.notNull(reconnectWorkers, "ReconnectWorkers must not be null");
        LettuceAssert.notNull(socketAddressSupplier, "SocketAddressSupplier must not be null");
        LettuceAssert.notNull(reconnectionListener, "ReconnectionListener must not be null");
        LettuceAssert.notNull(connectionFacade, "ConnectionFacade must not be null");
        LettuceAssert.notNull(eventBus, "EventBus must not be null");
        LettuceAssert.notNull(endpoint, "Endpoint must not be null");
        this.reconnectDelay = reconnectDelay;
        this.bootstrap = bootstrap;
        this.timer = timer;
        this.reconnectWorkers = reconnectWorkers;
        this.reconnectionListener = reconnectionListener;
        this.reconnectSchedulerSync = new AtomicBoolean(false);
        this.eventBus = eventBus;
        this.redisUri = (String) bootstrap.config().attrs().get(ConnectionBuilder.REDIS_URI);
        this.epid = endpoint.getId();
        // Remember every successfully resolved address; on lookup failure fall back to the last remembered one.
        Mono<SocketAddress> wrappedSocketAddressSupplier = socketAddressSupplier.doOnNext(addr -> {
            this.remoteAddress = addr;
        }).onErrorResume(t -> {
            // The stack trace is only attached when debug logging is enabled.
            if (logger.isDebugEnabled()) {
                logger.warn("Cannot retrieve current address from socketAddressSupplier: " + t.toString() + ", reusing cached address " + this.remoteAddress, t);
            } else {
                logger.warn("Cannot retrieve current address from socketAddressSupplier: " + t.toString() + ", reusing cached address " + this.remoteAddress);
            }
            // NOTE(review): remoteAddress is null until an address was resolved or a channel became active;
            // Mono.just presumably rejects null -- confirm this path cannot be hit before then.
            return Mono.just(this.remoteAddress);
        });
        this.reconnectionHandler = new ReconnectionHandler(clientOptions, bootstrap, wrappedSocketAddressSupplier, timer, reconnectWorkers, connectionFacade);
        this.resetReconnectDelay();
    }

    /**
     * Prepares this watchdog for connection shutdown: stops listening for inactive channels, suspends reconnects,
     * cancels a pending reconnect timeout (if any) and notifies the {@link ReconnectionHandler}.
     */
    void prepareClose() {
        this.setListenOnChannelInactive(false);
        this.setReconnectSuspended(true);
        // Read the volatile field once so the null check and the cancel act on the same Timeout.
        Timeout reconnectScheduleTimeout = this.reconnectScheduleTimeout;
        if (reconnectScheduleTimeout != null && !reconnectScheduleTimeout.isCancelled()) {
            reconnectScheduleTimeout.cancel();
        }
        this.reconnectionHandler.prepareClose();
    }

    /**
     * Records the newly active channel and its remote address and resets all reconnect state (pending-schedule flag,
     * timeout, attempt counter, stateful delay, cached log prefix) before passing the event on.
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        this.reconnectSchedulerSync.set(false);
        this.channel = ctx.channel();
        this.reconnectScheduleTimeout = null;
        this.remoteAddress = this.channel.remoteAddress();
        this.attempts = 0;
        this.resetReconnectDelay();
        // Drop the cached prefix so it is rebuilt from the new channel and address.
        this.logPrefix = null;
        logger.debug("{} channelActive()", this.logPrefix());
        super.channelActive(ctx);
    }

    /**
     * Schedules a reconnect when the channel becomes inactive, provided the watchdog is armed, listening is enabled
     * and reconnects are not suspended.
     * <p>
     * When the watchdog is not armed the method returns early and the event is not passed on to
     * {@code super.channelInactive(ctx)}.
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        logger.debug("{} channelInactive()", this.logPrefix());
        if (!this.armed) {
            logger.debug("{} ConnectionWatchdog not armed", this.logPrefix());
            return;
        }
        this.channel = null;
        if (this.listenOnChannelInactive && !this.reconnectionHandler.isReconnectSuspended()) {
            this.scheduleReconnect();
        } else {
            logger.debug("{} Reconnect scheduling disabled", this.logPrefix(), ctx);
        }
        super.channelInactive(ctx);
    }

    /**
     * Arms the watchdog and enables listening for inactive channels.
     */
    void arm() {
        this.armed = true;
        this.setListenOnChannelInactive(true);
    }

    /**
     * Schedules a reconnect attempt after the delay computed by the configured {@link Delay}.
     * <p>
     * Nothing is scheduled if the event loop group or the reconnect workers are shutting down, if listening is
     * disabled, if the current channel is still active, or if a reconnect is already pending.
     */
    public void scheduleReconnect() {
        logger.debug("{} scheduleReconnect()", this.logPrefix());
        if (!this.isEventLoopGroupActive()) {
            logger.debug("isEventLoopGroupActive() == false");
            return;
        }
        if (!this.isListenOnChannelInactive()) {
            logger.debug("Skip reconnect scheduling, listener disabled");
            return;
        }
        if ((this.channel == null || !this.channel.isActive()) && this.reconnectSchedulerSync.compareAndSet(false, true)) {
            ++this.attempts;
            int attempt = this.attempts;
            Duration delay = this.reconnectDelay.createDelay(attempt);
            // Keep the full long value: narrowing to int would wrap for delays above Integer.MAX_VALUE milliseconds.
            long timeout2 = delay.toMillis();
            logger.debug("{} Reconnect attempt {}, delay {}ms", this.logPrefix(), attempt, timeout2);
            this.reconnectScheduleTimeout = this.timer.newTimeout(it -> {
                this.reconnectScheduleTimeout = null;
                if (!this.isEventLoopGroupActive()) {
                    logger.warn("Cannot execute scheduled reconnect timer, reconnect workers are terminated");
                    return;
                }
                // Submitted as a Callable (hence the explicit return) because run(...) declares a checked exception.
                this.reconnectWorkers.submit(() -> {
                    this.run(attempt, delay);
                    return null;
                });
            }, timeout2, TimeUnit.MILLISECONDS);
            // If the flag was already cleared (e.g. the attempt ran or a channel became active in the meantime),
            // do not keep a reference to the stale timeout.
            if (!this.reconnectSchedulerSync.get()) {
                this.reconnectScheduleTimeout = null;
            }
        } else {
            // Also reached when the channel is inactive but a reconnect is already scheduled.
            logger.debug("{} Skipping scheduleReconnect() because I have an active channel", this.logPrefix());
        }
    }

    /**
     * Runs a reconnect attempt immediately, reporting a delay of {@link Duration#ZERO} in the published attempt event.
     *
     * @param attempt attempt counter reported to the listener and in published events
     * @throws Exception declared for callers; failures while initiating the reconnect are logged and published
     */
    public void run(int attempt) throws Exception {
        this.run(attempt, Duration.ZERO);
    }

    /**
     * Performs a single reconnect attempt: notifies the listener, publishes a {@link ReconnectAttemptEvent} and asks
     * the {@link ReconnectionHandler} to reconnect. On failure a {@link ReconnectFailedEvent} is published and, unless
     * reconnects are suspended, the next attempt is scheduled.
     *
     * @param attempt attempt counter reported to the listener and in published events
     * @param delay delay that preceded this attempt, reported in the attempt event
     */
    private void run(int attempt, Duration delay) throws Exception {
        // Release the scheduling guard first so a failure of this attempt can schedule the next one.
        this.reconnectSchedulerSync.set(false);
        this.reconnectScheduleTimeout = null;
        if (!this.isEventLoopGroupActive()) {
            logger.debug("isEventLoopGroupActive() == false");
            return;
        }
        if (!this.isListenOnChannelInactive()) {
            logger.debug("Skip reconnect scheduling, listener disabled");
            return;
        }
        if (this.isReconnectSuspended()) {
            logger.debug("Skip reconnect scheduling, reconnect is suspended");
            return;
        }
        // Within the quiet period, demote INFO/WARN output to DEBUG to avoid flooding the log.
        boolean shouldLog = this.shouldLog();
        InternalLogLevel infoLevel = InternalLogLevel.INFO;
        InternalLogLevel warnLevel = InternalLogLevel.WARN;
        if (shouldLog) {
            this.lastReconnectionLogging = System.currentTimeMillis();
        } else {
            warnLevel = InternalLogLevel.DEBUG;
            infoLevel = InternalLogLevel.DEBUG;
        }
        InternalLogLevel warnLevelToUse = warnLevel;
        try {
            this.reconnectionListener.onReconnectAttempt(new ConnectionEvents.Reconnect(attempt));
            this.eventBus.publish(new ReconnectAttemptEvent(this.redisUri, this.epid, LocalAddress.ANY, this.remoteAddress, attempt, delay));
            logger.log(infoLevel, "Reconnecting, last destination was {}", this.remoteAddress);
            Tuple2<CompletableFuture<Channel>, CompletableFuture<SocketAddress>> tuple = this.reconnectionHandler.reconnect();
            CompletableFuture<Channel> future = tuple.getT1();
            future.whenComplete((c, t) -> {
                if (c != null && t == null) {
                    return;
                }
                // A completion carrying neither a channel nor an error would otherwise fail with a
                // NullPointerException below; that exception would be lost in the discarded dependent future and
                // no further reconnect would be scheduled.
                Throwable cause = t != null ? t : new IllegalStateException("Reconnect completed without a Channel");
                CompletableFuture<SocketAddress> remoteAddressFuture = tuple.getT2();
                SocketAddress remote = this.remoteAddress;
                // Prefer the address resolved for this attempt, if the lookup completed successfully.
                if (remoteAddressFuture.isDone() && !remoteAddressFuture.isCompletedExceptionally() && !remoteAddressFuture.isCancelled()) {
                    remote = remoteAddressFuture.join();
                }
                String message = String.format("Cannot reconnect to [%s]: %s", remote, cause.getMessage() != null ? cause.getMessage() : cause.toString());
                if (ReconnectionHandler.isExecutionException(cause)) {
                    if (logger.isDebugEnabled()) {
                        logger.debug(message, cause);
                    } else {
                        logger.log(warnLevelToUse, message);
                    }
                } else {
                    logger.log(warnLevelToUse, message, cause);
                }
                this.eventBus.publish(new ReconnectFailedEvent(this.redisUri, this.epid, LocalAddress.ANY, remote, cause, attempt));
                if (!this.isReconnectSuspended()) {
                    this.scheduleReconnect();
                }
            });
        }
        catch (Exception e) {
            logger.log(warnLevel, "Cannot reconnect: {}", e.toString());
            this.eventBus.publish(new ReconnectFailedEvent(this.redisUri, this.epid, LocalAddress.ANY, this.remoteAddress, e, attempt));
        }
    }

    /**
     * @return {@code true} if neither the bootstrap's event loop group nor the reconnect workers are shutting down
     */
    private boolean isEventLoopGroupActive() {
        return ConnectionWatchdog.isEventLoopGroupActive(this.bootstrap.group()) && ConnectionWatchdog.isEventLoopGroupActive(this.reconnectWorkers);
    }

    /**
     * @param executorService executor group to check
     * @return {@code true} if the given executor group is not shutting down
     */
    private static boolean isEventLoopGroupActive(EventExecutorGroup executorService) {
        return !executorService.isShuttingDown();
    }

    /**
     * @return {@code true} if the quiet period since the last INFO/WARN reconnect log statement has elapsed
     */
    private boolean shouldLog() {
        long quietUntil = this.lastReconnectionLogging + LOGGING_QUIET_TIME_MS;
        return quietUntil <= System.currentTimeMillis();
    }

    /**
     * Enables or disables reacting to inactive channels.
     *
     * @param listenOnChannelInactive {@code true} to schedule reconnects when the channel becomes inactive
     */
    public void setListenOnChannelInactive(boolean listenOnChannelInactive) {
        this.listenOnChannelInactive = listenOnChannelInactive;
    }

    /**
     * @return {@code true} if the watchdog reacts to inactive channels
     */
    public boolean isListenOnChannelInactive() {
        return this.listenOnChannelInactive;
    }

    /**
     * Suspends or resumes reconnects; the flag is stored in the {@link ReconnectionHandler}.
     *
     * @param reconnectSuspended {@code true} to suspend reconnects
     */
    public void setReconnectSuspended(boolean reconnectSuspended) {
        this.reconnectionHandler.setReconnectSuspended(reconnectSuspended);
    }

    /**
     * @return {@code true} if reconnects are suspended according to the {@link ReconnectionHandler}
     */
    public boolean isReconnectSuspended() {
        return this.reconnectionHandler.isReconnectSuspended();
    }

    /**
     * @return the {@link ReconnectionHandler} used by this watchdog
     */
    ReconnectionHandler getReconnectionHandler() {
        return this.reconnectionHandler;
    }

    /**
     * Resets the reconnect delay if it is a {@link Delay.StatefulDelay}; other delays are left untouched.
     */
    private void resetReconnectDelay() {
        if (this.reconnectDelay instanceof Delay.StatefulDelay) {
            ((Delay.StatefulDelay) this.reconnectDelay).reset();
        }
    }

    /**
     * Returns the log prefix describing the current channel and last known remote address, building and caching it
     * on first use.
     */
    private String logPrefix() {
        if (this.logPrefix != null) {
            return this.logPrefix;
        }
        // Return the local value rather than re-reading the field, which may be reset concurrently.
        String prefix = "[" + ChannelLogDescriptor.logDescriptor(this.channel) + ", last known addr=" + this.remoteAddress + ']';
        this.logPrefix = prefix;
        return prefix;
    }
}

