package net.sf.xenqtt.proxy;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import net.sf.xenqtt.Log;
import net.sf.xenqtt.message.ChannelManager;
import net.sf.xenqtt.message.ChannelManagerImpl;
import net.sf.xenqtt.message.ConnAckMessage;
import net.sf.xenqtt.message.ConnectMessage;
import net.sf.xenqtt.message.ConnectReturnCode;
import net.sf.xenqtt.message.DisconnectMessage;
import net.sf.xenqtt.message.IdentifiableMqttMessage;
import net.sf.xenqtt.message.MessageHandler;
import net.sf.xenqtt.message.MqttChannel;
import net.sf.xenqtt.message.MqttMessage;
import net.sf.xenqtt.message.PubAckMessage;
import net.sf.xenqtt.message.PubCompMessage;
import net.sf.xenqtt.message.PubMessage;
import net.sf.xenqtt.message.PubRecMessage;
import net.sf.xenqtt.message.PubRelMessage;
import net.sf.xenqtt.message.SubAckMessage;
import net.sf.xenqtt.message.SubscribeMessage;
import net.sf.xenqtt.message.UnsubAckMessage;
import net.sf.xenqtt.message.UnsubscribeMessage;
import org.apache.log4j.helpers.FileWatchdog;

/* loaded from: input_file:net/sf/xenqtt/proxy/ProxySession.class */
class ProxySession implements MessageHandler {
    /** In-flight limit: client reads are paused when the in-flight map reaches this size; also the highest ID {@code nextIdToBroker()} hands out. */
    private final int maxInFlightBrokerMessages;
    /** Maps the message ID used toward the broker to the client channel and message ID the message originally came with. */
    private final Map<Integer, MessageSource> messageSourceByBrokerMessageId;
    /** Connect messages of channels passed to {@code newConnection} that have not yet been reported through {@code channelAttached}. */
    private final Map<MqttChannel, ConnectMessage> connectMessageByChannelPendingAttach;
    /** Client channels that attached while the broker connection was still PENDING; drained in {@code connAck}. */
    private final Set<MqttChannel> channelsPendingBroker;
    /** Channel manager used to open the broker channel and to attach client channels to this session. */
    private final ChannelManager channelManager;
    /** URI handed to the channel manager when the broker channel is created. */
    private final String brokerUri;
    /** Client ID taken from the original connect message. */
    private final String clientId;
    /** Connect message sent to the broker; later client connect messages are compared against it. */
    private final ConnectMessage originalConnectMessage;
    /** Client channels that have been accepted into this session. */
    private final List<MqttChannel> channelsToClients;
    /** Channel to the broker; assigned in {@code channelOpened}, so it is null until then. */
    private MqttChannel channelToBroker;
    /** Return code of the broker's ConnAck; null until one has been received. */
    private ConnectReturnCode brokerConnectReturnCode;
    /** Current state of the broker connection; starts as PENDING. */
    private ConnectionState brokerConnectionState;
    /** Next candidate message ID to try when forwarding an ackable message to the broker. */
    private int nextIdToBroker;
    /** Index into {@code channelsToClients} at which the next least-busy search starts. */
    private int nextBrokerChannelIndex;
    /** Wall-clock time (ms) after which the "too many in-flight messages" warning may be logged again. */
    private long enablePauseMessageTime;
    /** Set once the session is shut down or the broker channel has closed; volatile because it is read by {@code isClosed()} and {@code newConnection()}. */
    private volatile boolean sessionClosed;
    /** NOTE(review): appears to be the compiler-generated assertion flag surfaced by the decompiler; initialized in the static block. */
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:net/sf/xenqtt/proxy/ProxySession$ConnectionState.class */
    /**
     * State of the session's connection to the broker. The declaration order is
     * kept exactly as in the original so ordinals are unchanged.
     */
    public enum ConnectionState {
        /** Initial state: no ConnAck has been received from the broker yet. */
        PENDING,

        /** The broker accepted the connect request. */
        CONNECTED,

        /** The broker rejected the connect request or the broker channel was closed. */
        DISCONNECTED;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:net/sf/xenqtt/proxy/ProxySession$MessageSource.class */
    /**
     * Remembers where a message forwarded to the broker came from: the client
     * channel it arrived on and the message ID it carried on that channel.
     */
    public static class MessageSource {
        /** Message ID the message had on the originating client channel. */
        private final int sourceMessageId;
        /** Client channel the message was received from. */
        private final MqttChannel sourceChannel;

        /**
         * @param i           the original (client side) message ID
         * @param mqttChannel the client channel the message was received from
         */
        public MessageSource(int i, MqttChannel mqttChannel) {
            this.sourceMessageId = i;
            this.sourceChannel = mqttChannel;
        }
    }

    /**
     * Creates a session that uses its own {@link ChannelManagerImpl}, constructed with the argument {@code 0L}.
     *
     * @param str            URI of the broker this session connects to
     * @param connectMessage the connect message that will be sent to the broker
     * @param i              maximum number of in-flight (unacknowledged) messages to the broker
     */
    public ProxySession(String str, ConnectMessage connectMessage, int i) {
        this(str, connectMessage, new ChannelManagerImpl(0L), i);
    }

    /**
     * Creates a session with an explicitly supplied channel manager.
     *
     * @param str            URI of the broker this session connects to
     * @param connectMessage the connect message that will be sent to the broker; its client ID
     *                       becomes the session's client ID
     * @param channelManager channel manager used for the broker channel and attached client channels
     * @param i              maximum number of in-flight (unacknowledged) messages to the broker
     */
    ProxySession(String str, ConnectMessage connectMessage, ChannelManager channelManager, int i) {
        // Explicit type arguments instead of raw types: avoids unchecked-conversion warnings.
        this.messageSourceByBrokerMessageId = new HashMap<Integer, MessageSource>();
        this.connectMessageByChannelPendingAttach = new ConcurrentHashMap<MqttChannel, ConnectMessage>();
        this.channelsPendingBroker = new HashSet<MqttChannel>();
        this.channelsToClients = new ArrayList<MqttChannel>();
        this.brokerConnectionState = ConnectionState.PENDING;
        // Broker-side message IDs start at 1; nextIdToBroker() wraps back to 1 as well.
        this.nextIdToBroker = 1;
        this.brokerUri = str;
        this.originalConnectMessage = connectMessage;
        this.channelManager = channelManager;
        this.maxInFlightBrokerMessages = i;
        this.clientId = this.originalConnectMessage.getClientId();
    }

    /**
     * Initializes the channel manager and then asks it for a new client channel to
     * {@code brokerUri}, registering this session as that channel's message handler.
     */
    public void init() {
        this.channelManager.init();
        this.channelManager.newClientChannel(this.brokerUri, this);
    }

    /**
     * Shuts down the channel manager and then marks this session as closed.
     */
    public void shutdown() {
        this.channelManager.shutdown();
        this.sessionClosed = true;
    }

    /**
     * @return true once {@link #shutdown()} has completed or the broker channel has closed
     */
    public boolean isClosed() {
        return this.sessionClosed;
    }

    /**
     * @return the client ID taken from the connect message this session was created with
     */
    public String getClientId() {
        return this.clientId;
    }

    /**
     * Offers a new client channel to this session. Unless the session is already closed,
     * the connect message is remembered for the channel and the channel is handed to the
     * channel manager to be attached with this session as its handler.
     *
     * @param mqttChannel    the client channel to add
     * @param connectMessage the connect message received on that channel
     * @return false if the session is closed and the channel was not taken; true otherwise
     */
    public boolean newConnection(MqttChannel mqttChannel, ConnectMessage connectMessage) {
        final boolean sessionOpen = !this.sessionClosed;
        if (sessionOpen) {
            // Stored first so channelAttached() can find the connect message for this channel.
            this.connectMessageByChannelPendingAttach.put(mqttChannel, connectMessage);
            this.channelManager.attachChannel(mqttChannel, this);
        }
        return sessionOpen;
    }

    /**
     * A connect message is never expected on a channel of this session; the event is only
     * logged as a warning, with wording that depends on whether it came from the broker channel.
     *
     * @param mqttChannel    the channel the message arrived on
     * @param connectMessage the unexpected connect message
     */
    @Override // net.sf.xenqtt.message.MessageHandler
    public void connect(MqttChannel mqttChannel, ConnectMessage connectMessage) throws Exception {
        final String format = (mqttChannel == this.channelToBroker)
                ? "Received a %s message from the broker at %s. This should never happen. clientId=%s"
                : "Received a %s message from clustered client %s. This should never happen. clientId=%s";
        Log.warn(format, connectMessage.getMessageType(), mqttChannel.getRemoteAddress(), this.clientId);
    }

    /**
     * Handles the broker's answer to the connect request. The return code is recorded, the
     * broker connection state becomes CONNECTED or DISCONNECTED accordingly, and every client
     * channel that was waiting for this answer is passed to {@code newSessionClient}.
     * A ConnAck arriving on any other channel is only logged.
     *
     * @param mqttChannel    the channel the message arrived on
     * @param connAckMessage the received ConnAck
     */
    @Override // net.sf.xenqtt.message.MessageHandler
    public void connAck(MqttChannel mqttChannel, ConnAckMessage connAckMessage) throws Exception {
        if (mqttChannel != this.channelToBroker) {
            Log.warn("Received a %s message from clustered client %s. This should never happen. clientId=%s", connAckMessage.getMessageType(), mqttChannel.getRemoteAddress(), this.clientId);
            return;
        }

        this.brokerConnectReturnCode = connAckMessage.getReturnCode();
        if (this.brokerConnectReturnCode != ConnectReturnCode.ACCEPTED) {
            Log.info("Broker %s rejected connect attempt with return code %s", this.brokerUri, this.brokerConnectReturnCode);
            this.brokerConnectionState = ConnectionState.DISCONNECTED;
        } else {
            this.brokerConnectionState = ConnectionState.CONNECTED;
        }

        // The broker has answered, so the waiting clients can now be accepted or refused.
        for (MqttChannel waitingClient : this.channelsPendingBroker) {
            newSessionClient(waitingClient);
        }
        this.channelsPendingBroker.clear();
    }

    /**
     * Passes a received publish message to {@code forwardMessage}, which routes it
     * depending on whether it arrived on the broker channel or on a client channel.
     */
    @Override // net.sf.xenqtt.message.MessageHandler
    public void publish(MqttChannel mqttChannel, PubMessage pubMessage) throws Exception {
        forwardMessage(mqttChannel, pubMessage);
    }

    /**
     * Passes a received publish acknowledgement to {@code forwardMessage}, which routes it
     * depending on whether it arrived on the broker channel or on a client channel.
     */
    @Override // net.sf.xenqtt.message.MessageHandler
    public void pubAck(MqttChannel mqttChannel, PubAckMessage pubAckMessage) throws Exception {
        forwardMessage(mqttChannel, pubAckMessage);
    }

    /**
     * No-op: this session takes no action for PubRec messages; they are not forwarded.
     */
    @Override // net.sf.xenqtt.message.MessageHandler
    public void pubRec(MqttChannel mqttChannel, PubRecMessage pubRecMessage) throws Exception {
    }

    /**
     * No-op: this session takes no action for PubRel messages; they are not forwarded.
     */
    @Override // net.sf.xenqtt.message.MessageHandler
    public void pubRel(MqttChannel mqttChannel, PubRelMessage pubRelMessage) throws Exception {
    }

    /**
     * No-op: this session takes no action for PubComp messages; they are not forwarded.
     */
    @Override // net.sf.xenqtt.message.MessageHandler
    public void pubComp(MqttChannel mqttChannel, PubCompMessage pubCompMessage) throws Exception {
    }

    /**
     * Passes a received subscribe message to {@code forwardMessage}, which routes it
     * depending on whether it arrived on the broker channel or on a client channel.
     */
    @Override // net.sf.xenqtt.message.MessageHandler
    public void subscribe(MqttChannel mqttChannel, SubscribeMessage subscribeMessage) throws Exception {
        forwardMessage(mqttChannel, subscribeMessage);
    }

    /**
     * Passes a received subscribe acknowledgement to {@code forwardMessage}, which routes it
     * depending on whether it arrived on the broker channel or on a client channel.
     */
    @Override // net.sf.xenqtt.message.MessageHandler
    public void subAck(MqttChannel mqttChannel, SubAckMessage subAckMessage) throws Exception {
        forwardMessage(mqttChannel, subAckMessage);
    }

    /**
     * Passes a received unsubscribe message to {@code forwardMessage}, which routes it
     * depending on whether it arrived on the broker channel or on a client channel.
     */
    @Override // net.sf.xenqtt.message.MessageHandler
    public void unsubscribe(MqttChannel mqttChannel, UnsubscribeMessage unsubscribeMessage) throws Exception {
        forwardMessage(mqttChannel, unsubscribeMessage);
    }

    /**
     * Passes a received unsubscribe acknowledgement to {@code forwardMessage}, which routes it
     * depending on whether it arrived on the broker channel or on a client channel.
     */
    @Override // net.sf.xenqtt.message.MessageHandler
    public void unsubAck(MqttChannel mqttChannel, UnsubAckMessage unsubAckMessage) throws Exception {
        forwardMessage(mqttChannel, unsubAckMessage);
    }

    /**
     * No-op: a received disconnect message is not forwarded and triggers no action here.
     * Channel teardown is handled in {@code channelClosed}.
     */
    @Override // net.sf.xenqtt.message.MessageHandler
    public void disconnect(MqttChannel mqttChannel, DisconnectMessage disconnectMessage) throws Exception {
    }

    /**
     * Records the opened channel as the channel to the broker and sends the original
     * connect message over it.
     * NOTE(review): assumes only the broker channel is ever reported through this callback
     * (client channels arrive via {@code channelAttached}) - confirm against the channel manager.
     */
    @Override // net.sf.xenqtt.message.MessageHandler
    public void channelOpened(MqttChannel mqttChannel) {
        this.channelToBroker = mqttChannel;
        this.channelToBroker.send(this.originalConnectMessage);
    }

    /**
     * Reacts to a channel of this session being closed.
     * <p>
     * Broker channel closed: the broker connection becomes DISCONNECTED, every accepted client
     * channel is closed, every client still waiting for the broker's ConnAck is refused through
     * {@code newSessionClient}, and the session is marked closed.
     * <p>
     * Client channel closed (while the broker connection is not DISCONNECTED): the channel is
     * removed from both the accepted and the pending client collections, its in-flight entries
     * are dropped, and then either a disconnect is sent to the broker (no clients left at all)
     * or the channel's unsent messages are redistributed to the remaining clients.
     *
     * @param mqttChannel the channel that was closed
     * @param th          the cause of the close, if any; not used here
     */
    @Override // net.sf.xenqtt.message.MessageHandler
    public void channelClosed(MqttChannel mqttChannel, Throwable th) {
        if (mqttChannel == this.channelToBroker) {
            this.brokerConnectionState = ConnectionState.DISCONNECTED;
            for (MqttChannel clientChannel : this.channelsToClients) {
                clientChannel.close();
            }
            this.channelsToClients.clear();
            // Clients still waiting for the broker's ConnAck would otherwise never get an answer.
            // The state is DISCONNECTED now, so newSessionClient() refuses them with a ConnAck.
            for (MqttChannel pendingChannel : this.channelsPendingBroker) {
                newSessionClient(pendingChannel);
            }
            this.channelsPendingBroker.clear();
            this.sessionClosed = true;
            return;
        }

        if (this.brokerConnectionState == ConnectionState.DISCONNECTED) {
            return;
        }

        // A client may close while it is still waiting for the broker; forget it so connAck()
        // does not later accept a dead channel into the session.
        this.channelsPendingBroker.remove(mqttChannel);
        // Removed before the loop below so resumeRead() never touches the channel that just closed.
        this.channelsToClients.remove(mqttChannel);

        Iterator<MessageSource> sources = this.messageSourceByBrokerMessageId.values().iterator();
        while (sources.hasNext()) {
            if (sources.next().sourceChannel == mqttChannel) {
                sources.remove();
                // Dropping below the in-flight limit frees a broker message ID again.
                if (this.messageSourceByBrokerMessageId.size() == this.maxInFlightBrokerMessages - 1) {
                    resumeRead();
                }
            }
        }

        if (!this.channelsToClients.isEmpty()) {
            distributeMessagesForChannel(mqttChannel);
            return;
        }
        // No accepted clients remain. Only disconnect from the broker when nobody is still waiting
        // to join, and only if the broker channel has actually been opened.
        if (this.channelsPendingBroker.isEmpty() && this.channelToBroker != null) {
            this.channelToBroker.send(new DisconnectMessage());
        }
    }

    /**
     * Called when a client channel handed over via {@code newConnection} has been attached.
     * The client's connect message is compared with the session's original connect message;
     * on any mismatch a ConnAck with a failure code is sent and the channel is not accepted.
     * A matching client is accepted right away when the broker connection is no longer
     * PENDING, otherwise it is parked until the broker's ConnAck arrives.
     * <p>
     * Checked in this order: connect message present, clean-session flag not set, protocol
     * version, protocol name, credentials, will-message configuration.
     *
     * @param mqttChannel the client channel that was attached
     */
    @Override // net.sf.xenqtt.message.MessageHandler
    public void channelAttached(MqttChannel mqttChannel) {
        final ConnectMessage original = this.originalConnectMessage;
        final ConnectMessage attached = this.connectMessageByChannelPendingAttach.remove(mqttChannel);

        if (attached == null) {
            Log.warn("Channel attached with no connect message. This is most likely a bug. clientId: %s, channel: %s", this.clientId, mqttChannel);
            mqttChannel.send(new ConnAckMessage(ConnectReturnCode.OTHER));
            return;
        }
        if (attached.isCleanSession()) {
            Log.warn("Proxied connections cannot have the clean session flag set in the connect message; clientId: %s", this.clientId);
            mqttChannel.send(new ConnAckMessage(ConnectReturnCode.OTHER));
            return;
        }
        if (original.getProtocolVersion() != attached.getProtocolVersion()) {
            Log.warn("Connect message protocol version does not match; clientId: %s, expected: %d, actual: %d", this.clientId, Integer.valueOf(original.getProtocolVersion()), Integer.valueOf(attached.getProtocolVersion()));
            mqttChannel.send(new ConnAckMessage(ConnectReturnCode.UNACCEPTABLE_PROTOCOL_VERSION));
            return;
        }
        if (!stringEquqls(original.getProtocolName(), attached.getProtocolName())) {
            Log.warn("Connect message protocol name does not match; clientId: %s, expected: %s, actual: %s", this.clientId, original.getProtocolName(), attached.getProtocolName());
            mqttChannel.send(new ConnAckMessage(ConnectReturnCode.OTHER));
            return;
        }
        if (original.isUserNameFlag() != attached.isUserNameFlag() || original.isPasswordFlag() != attached.isPasswordFlag() || !stringEquqls(original.getUserName(), attached.getUserName()) || !stringEquqls(original.getPassword(), attached.getPassword())) {
            // Passwords must never be written to the log; the user names are enough to trace the mismatch.
            Log.warn("Connect message username/password does not match; clientId: %s, expected user: %s, actual user: %s", this.clientId, original.getUserName(), attached.getUserName());
            mqttChannel.send(new ConnAckMessage(ConnectReturnCode.BAD_CREDENTIALS));
            return;
        }
        if (original.isWillMessageFlag() != attached.isWillMessageFlag() || original.isWillRetain() != attached.isWillRetain() || original.getWillQoSLevel() != attached.getWillQoSLevel() || !stringEquqls(original.getWillTopic(), attached.getWillTopic()) || !stringEquqls(original.getWillMessage(), attached.getWillMessage())) {
            Log.warn("Connect message will message config does not match; clientId: %s, expected topic|qos|retain|message: %s|%s|%s|%s, actual topic|qos|retain|message: %s|%s|%s|%s", attached.getClientId(), original.getWillTopic(), original.getWillQoS(), Boolean.valueOf(original.isWillRetain()), original.getWillMessage(), attached.getWillTopic(), attached.getWillQoS(), Boolean.valueOf(attached.isWillRetain()), attached.getWillMessage());
            mqttChannel.send(new ConnAckMessage(ConnectReturnCode.OTHER));
            return;
        }

        if (this.brokerConnectionState != ConnectionState.PENDING) {
            newSessionClient(mqttChannel);
            return;
        }
        // The broker has not answered yet; connAck() picks the channel up once it does.
        Log.debug("New client connection waiting for broker connection to complete; client ID: %s", this.clientId);
        this.channelsPendingBroker.add(mqttChannel);
    }

    /**
     * No-op: this session takes no action when a channel is detached.
     */
    @Override // net.sf.xenqtt.message.MessageHandler
    public void channelDetached(MqttChannel mqttChannel) {
    }

    /**
     * No-op: this session takes no action when a message has been sent.
     */
    @Override // net.sf.xenqtt.message.MessageHandler
    public void messageSent(MqttChannel mqttChannel, MqttMessage mqttMessage) {
    }

    /**
     * Answers a client channel once the broker connection state is known (it must not be PENDING).
     * <p>
     * With a live broker connection the client receives the broker's own return code, is paused
     * immediately if the in-flight limit is currently reached, and joins {@code channelsToClients}.
     * With a DISCONNECTED broker the client is refused: it gets the broker's failure code if there
     * is one, OTHER if no ConnAck was ever received, or SERVER_UNAVAILABLE if the broker had
     * accepted the connection before going away.
     *
     * @param mqttChannel the client channel to accept or refuse
     */
    private void newSessionClient(MqttChannel mqttChannel) {
        // Decompiled form of an assertion that the broker state is no longer PENDING.
        if (!$assertionsDisabled && this.brokerConnectionState == ConnectionState.PENDING) {
            throw new AssertionError();
        }

        if (this.brokerConnectionState != ConnectionState.DISCONNECTED) {
            Log.info("New client connection accepted into cluster; clientId: %s, address: %s", this.clientId, mqttChannel.getRemoteAddress());
            mqttChannel.send(new ConnAckMessage(this.brokerConnectReturnCode));
            if (this.messageSourceByBrokerMessageId.size() == this.maxInFlightBrokerMessages) {
                mqttChannel.pauseRead();
            }
            this.channelsToClients.add(mqttChannel);
            return;
        }

        Log.warn("Attempting to connect a clustered client to a session with a closed broker connection. clientId: %s, address: %s", this.clientId, mqttChannel.getRemoteAddress());
        final ConnectReturnCode refusalCode;
        if (this.brokerConnectReturnCode == null) {
            refusalCode = ConnectReturnCode.OTHER;
        } else if (this.brokerConnectReturnCode == ConnectReturnCode.ACCEPTED) {
            // The broker did accept once, but the connection is gone now.
            refusalCode = ConnectReturnCode.SERVER_UNAVAILABLE;
        } else {
            refusalCode = this.brokerConnectReturnCode;
        }
        mqttChannel.send(new ConnAckMessage(refusalCode));
    }

    /**
     * Routes a message by its origin: messages that arrived on the broker channel go to
     * {@code forwardToClient}, messages from any other channel go to {@code forwardToBroker}.
     *
     * @param mqttChannel             the channel the message arrived on
     * @param identifiableMqttMessage the message to forward
     */
    private void forwardMessage(MqttChannel mqttChannel, IdentifiableMqttMessage identifiableMqttMessage) {
        if (mqttChannel == this.channelToBroker) {
            forwardToClient(identifiableMqttMessage);
        } else {
            forwardToBroker(mqttChannel, identifiableMqttMessage);
        }
    }

    /**
     * Sends a client's message to the broker. An ackable message is first given a fresh
     * broker-side message ID, and its original ID and source channel are remembered so the
     * broker's acknowledgement can be routed back. Reaching the in-flight limit pauses reads
     * on all client channels.
     *
     * @param mqttChannel             the client channel the message came from
     * @param identifiableMqttMessage the message to send; its message ID may be rewritten
     */
    private void forwardToBroker(MqttChannel mqttChannel, IdentifiableMqttMessage identifiableMqttMessage) {
        if (!identifiableMqttMessage.isAckable()) {
            // Nothing to track: no acknowledgement will come back for this message.
            this.channelToBroker.send(identifiableMqttMessage);
            return;
        }

        final int clientMessageId = identifiableMqttMessage.getMessageId();
        final int brokerMessageId = nextIdToBroker();
        identifiableMqttMessage.setMessageId(brokerMessageId);
        this.messageSourceByBrokerMessageId.put(Integer.valueOf(brokerMessageId), new MessageSource(clientMessageId, mqttChannel));

        final boolean limitReached = this.messageSourceByBrokerMessageId.size() == this.maxInFlightBrokerMessages;
        if (limitReached) {
            pauseRead();
        }
        this.channelToBroker.send(identifiableMqttMessage);
    }

    /**
     * Delivers a message received from the broker to a client.
     * <p>
     * Acknowledgements go back to the client channel that sent the acknowledged message, with
     * the client's original message ID restored; an acknowledgement with no recorded source is
     * dropped. All other messages go to the least busy client channel, or are dropped when
     * there is none.
     *
     * @param identifiableMqttMessage the message received from the broker
     */
    private void forwardToClient(IdentifiableMqttMessage identifiableMqttMessage) {
        if (identifiableMqttMessage.isAck()) {
            final Integer brokerMessageId = Integer.valueOf(identifiableMqttMessage.getMessageId());
            final MessageSource origin = this.messageSourceByBrokerMessageId.remove(brokerMessageId);
            if (origin == null) {
                return;
            }
            identifiableMqttMessage.setMessageId(origin.sourceMessageId);
            origin.sourceChannel.send(identifiableMqttMessage);
            // One slot below the limit means a broker message ID has just become available again.
            if (this.messageSourceByBrokerMessageId.size() == this.maxInFlightBrokerMessages - 1) {
                resumeRead();
            }
            return;
        }

        final MqttChannel target = getLeastBusyChannelToClient();
        if (target != null) {
            target.send(identifiableMqttMessage);
        }
    }

    /**
     * Pauses reading on every accepted client channel because the in-flight limit to the broker
     * has been reached. A warning is logged, but at most once per 60 seconds.
     */
    private void pauseRead() {
        // Quiet period for the warning. Kept as a local value (60 seconds, as the message states)
        // instead of borrowing an unrelated constant from log4j's FileWatchdog.
        final long pauseWarningIntervalMillis = 60000L;

        final long now = System.currentTimeMillis();
        if (now > this.enablePauseMessageTime) {
            this.enablePauseMessageTime = now + pauseWarningIntervalMillis;
            Log.warn("There are too many in-flight (unacknowledged) messages to the broker in cluster with client ID %s. No more message IDs are available. This means the broker is unable to keep up with the rate your clients are publishing messages. The proxy is pausing accepting messages from clients in the cluster until the broker acknowledges an existing in-flight message. This will not cause data loss but will make message publishing slow down. This log message will be disabled for 60 seconds.", this.clientId);
        }
        for (MqttChannel clientChannel : this.channelsToClients) {
            clientChannel.pauseRead();
        }
    }

    /**
     * Resumes reading on every accepted client channel.
     */
    private void resumeRead() {
        for (MqttChannel clientChannel : this.channelsToClients) {
            clientChannel.resumeRead();
        }
    }

    /**
     * Picks the accepted client channel with the smallest load, where load is the channel's
     * in-flight message count plus its send queue depth. The scan starts at
     * {@code nextBrokerChannelIndex} and wraps around; on ties the first channel visited wins.
     * The start index for the next call is set to just after the chosen channel.
     *
     * @return the least busy client channel, or null if there are no client channels
     */
    private MqttChannel getLeastBusyChannelToClient() {
        final int channelCount = this.channelsToClients.size();
        // Captured once: the field is updated inside the loop but must not shift this scan.
        final int startIndex = this.nextBrokerChannelIndex;

        MqttChannel leastBusy = null;
        int lowestLoad = Integer.MAX_VALUE;
        for (int offset = 0; offset < channelCount; offset++) {
            final int index = (startIndex + offset) % channelCount;
            final MqttChannel candidate = this.channelsToClients.get(index);
            final int load = candidate.inFlightMessageCount() + candidate.sendQueueDepth();
            if (load < lowestLoad) {
                lowestLoad = load;
                leastBusy = candidate;
                this.nextBrokerChannelIndex = index + 1;
            }
        }
        return leastBusy;
    }

    /**
     * Finds a broker-side message ID in the range 1..{@code maxInFlightBrokerMessages} that is
     * not currently in flight, continuing from the last ID handed out and wrapping around.
     *
     * @return a free message ID, or 0 (after logging an error) if every ID in the range is in use
     */
    private int nextIdToBroker() {
        int attemptsLeft = this.maxInFlightBrokerMessages;
        while (attemptsLeft > 0) {
            attemptsLeft--;
            if (this.nextIdToBroker > this.maxInFlightBrokerMessages) {
                this.nextIdToBroker = 1;
            }
            final int candidate = this.nextIdToBroker;
            this.nextIdToBroker = candidate + 1;
            if (!this.messageSourceByBrokerMessageId.containsKey(Integer.valueOf(candidate))) {
                return candidate;
            }
        }
        Log.error("Unable to generate message ID to broker. THIS IS A BUG!!", new Object[0]);
        return 0;
    }

    /**
     * Null-safe string comparison.
     *
     * @param str  first string, may be null
     * @param str2 second string, may be null
     * @return true if both are null or both have equal content; false otherwise
     */
    private boolean stringEquqls(String str, String str2) {
        return str == null ? str2 == null : str.equals(str2);
    }

    /**
     * Hands each unsent message of the given channel to whichever client channel is least busy
     * at that moment. A message is skipped when no client channel is available.
     *
     * @param mqttChannel the channel whose unsent messages are redistributed
     */
    private void distributeMessagesForChannel(MqttChannel mqttChannel) {
        for (MqttMessage unsent : mqttChannel.getUnsentMessages()) {
            final MqttChannel target = getLeastBusyChannelToClient();
            if (target == null) {
                continue;
            }
            target.send(unsent);
        }
    }

    // Initializes the assertion flag checked in newSessionClient(): true when assertions are
    // NOT enabled for this class. NOTE(review): looks like the decompiled form of the code the
    // compiler generates for an `assert` statement - confirm before hand-editing.
    static {
        $assertionsDisabled = !ProxySession.class.desiredAssertionStatus();
    }
}
