package lsr.paxos.network;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger;
import lsr.common.KillOnExceptionHandler;
import lsr.common.PID;
import lsr.common.ProcessDescriptor;
import lsr.paxos.messages.Message;
import lsr.paxos.messages.MessageFactory;

/* loaded from: input_file:lsr/paxos/network/TcpConnection.class */
public class TcpConnection {
    private Socket socket;
    private DataInputStream input;
    private DataOutputStream output;
    private final PID replica;
    private final boolean active;
    private final TcpNetwork network;
    private final Thread senderThread;
    private final Thread receiverThread;
    private static final Logger logger;
    static final /* synthetic */ boolean $assertionsDisabled;
    private boolean connected = false;
    private final ArrayBlockingQueue<byte[]> sendQueue = new ArrayBlockingQueue<>(128);

    /* loaded from: input_file:lsr/paxos/network/TcpConnection$ReceiverThread.class */
    final class ReceiverThread implements Runnable {
        ReceiverThread() {
        }

        @Override // java.lang.Runnable
        public void run() {
            while (true) {
                TcpConnection.logger.info("Waiting for tcp connection to " + TcpConnection.this.replica.getId());
                try {
                    TcpConnection.this.connect();
                    TcpConnection.logger.info("Tcp connected " + TcpConnection.this.replica.getId());
                    while (!Thread.interrupted()) {
                        try {
                            Message create = MessageFactory.create(TcpConnection.this.input);
                            if (TcpConnection.logger.isLoggable(Level.FINE)) {
                                TcpConnection.logger.fine("Received [" + TcpConnection.this.replica.getId() + "] " + create + " size: " + create.byteSize());
                            }
                            TcpConnection.this.network.fireReceiveMessage(create, TcpConnection.this.replica.getId());
                        } catch (IllegalArgumentException e) {
                            TcpConnection.logger.log(Level.SEVERE, "Error deserializing msg", (Throwable) e);
                            TcpConnection.this.close();
                        }
                    }
                    TcpConnection.logger.severe("Receiver thread has been interupted.");
                    TcpConnection.this.close();
                    return;
                } catch (InterruptedException e2) {
                    TcpConnection.logger.severe("Receiver thread has been interupted.");
                    return;
                }
            }
        }
    }

    /* loaded from: input_file:lsr/paxos/network/TcpConnection$Sender.class */
    final class Sender implements Runnable {
        Sender() {
        }

        @Override // java.lang.Runnable
        public void run() {
            TcpConnection.logger.info("Sender thread started.");
            while (true) {
                try {
                    byte[] bArr = (byte[]) TcpConnection.this.sendQueue.take();
                    synchronized (TcpConnection.this) {
                        if (TcpConnection.this.connected) {
                            try {
                                TcpConnection.this.output.write(bArr);
                                TcpConnection.this.output.flush();
                            } catch (IOException e) {
                                TcpConnection.logger.log(Level.WARNING, "Error sending message", (Throwable) e);
                            }
                        }
                    }
                } catch (InterruptedException e2) {
                    TcpConnection.logger.severe("Sender thread has been interupted and stopped.");
                    return;
                }
            }
        }
    }

    public TcpConnection(TcpNetwork tcpNetwork, PID pid, boolean z) {
        this.network = tcpNetwork;
        this.replica = pid;
        this.active = z;
        logger.info("Creating connection: " + pid + " - " + z);
        this.receiverThread = new Thread(new ReceiverThread(), "TcpReceiver" + this.replica.getId());
        this.senderThread = new Thread(new Sender(), "TcpSender" + this.replica.getId());
        this.receiverThread.setUncaughtExceptionHandler(new KillOnExceptionHandler());
        this.senderThread.setUncaughtExceptionHandler(new KillOnExceptionHandler());
    }

    public synchronized void start() {
        this.receiverThread.start();
        this.senderThread.start();
    }

    public boolean send(byte[] bArr) {
        try {
            this.sendQueue.put(bArr);
            return true;
        } catch (InterruptedException e) {
            throw new RuntimeException("Thread interrupted", e);
        }
    }

    public synchronized void setConnection(Socket socket, DataInputStream dataInputStream, DataOutputStream dataOutputStream) {
        if (!$assertionsDisabled && !socket.isConnected()) {
            throw new AssertionError("Invalid socket state");
        }
        close();
        this.socket = socket;
        this.input = dataInputStream;
        this.output = dataOutputStream;
        this.connected = true;
        notifyAll();
    }

    public void stop() throws InterruptedException {
        close();
        this.receiverThread.interrupt();
        this.senderThread.interrupt();
        this.receiverThread.join();
        this.senderThread.join();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void connect() throws InterruptedException {
        if (!this.active) {
            synchronized (this) {
                while (!this.connected) {
                    wait();
                }
            }
            return;
        }
        while (true) {
            try {
                this.socket = new Socket();
                this.socket.setReceiveBufferSize(131072);
                this.socket.setSendBufferSize(131072);
                logger.fine("RcvdBuffer: " + this.socket.getReceiveBufferSize() + ", SendBuffer: " + this.socket.getSendBufferSize());
                this.socket.setTcpNoDelay(true);
                logger.info("Connecting to: " + this.replica);
                try {
                    this.socket.connect(new InetSocketAddress(this.replica.getHostname(), this.replica.getReplicaPort()));
                    this.input = new DataInputStream(new BufferedInputStream(this.socket.getInputStream()));
                    this.output = new DataOutputStream(new BufferedOutputStream(this.socket.getOutputStream()));
                    this.output.writeInt(ProcessDescriptor.getInstance().localId);
                    this.output.flush();
                    synchronized (this) {
                        this.connected = true;
                        notifyAll();
                    }
                    return;
                } catch (ConnectException e) {
                    logger.warning("TCP connection with replica " + this.replica.getId() + " failed");
                    Thread.sleep(ProcessDescriptor.getInstance().tcpReconnectTimeout);
                }
            } catch (IOException e2) {
                logger.log(Level.WARNING, "Error connecting to " + this.replica, (Throwable) e2);
                Thread.sleep(ProcessDescriptor.getInstance().tcpReconnectTimeout);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public synchronized void close() {
        if (this.socket != null && this.socket.isConnected()) {
            logger.info("Closing socket ...");
            try {
                this.socket.shutdownOutput();
                this.socket.close();
                this.socket = null;
                logger.info("Socket closed.");
            } catch (IOException e) {
                logger.warning("Error closing socket: " + e.getMessage());
            }
        }
        this.connected = false;
    }

    static {
        $assertionsDisabled = !TcpConnection.class.desiredAssertionStatus();
        logger = Logger.getLogger(TcpConnection.class.getCanonicalName());
    }
}
