package fun.common.net.output;

import fun.common.net.output.StreamProtocol;
import fun.common.util.NetUtils;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Iterator;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import java.util.logging.Logger;

/* loaded from: input_file:fun/common/net/output/PacketReceiver.class */
public class PacketReceiver implements InputProcessor {
    private static final Logger logger = Logger.getLogger(PacketReceiver.class.getName());

    /** Prefix prepended to every log message of this receiver. */
    protected String tag;

    /** Local address, used in the log tag and when reporting this node as failed. */
    protected final InetSocketAddress localAddress;

    /** Remote address of the peer whose packets are received. */
    protected final InetSocketAddress client;

    /** Stream the incoming packets are read from. */
    protected final DataInputStream packetIn;

    /** Stream the ACKs are written to by the ACK write thread. */
    protected final DataOutputStream ackOut;

    /** Configuration this receiver was created with. */
    protected final StreamConfig config;

    /** Optional listener for open/received/close events; may be {@code null}. */
    protected final InputEventHandler inputEventHandler;

    /** Value returned by the handler's {@code onOpen()}, passed back on later handler calls. */
    private Object streamID;

    /**
     * First failure recorded for this receiver. Written by the ACK write thread and read by the
     * processing thread, hence {@code volatile}.
     */
    private volatile IOException lastException;

    /**
     * Set once the receiver is closed. Written by the processing thread and polled by the ACK
     * write thread, hence {@code volatile}.
     */
    private volatile boolean closed;

    /** ACKs waiting to be written by the ACK write thread. */
    private final LinkedBlockingQueue<StreamProtocol.PacketACK> ackQueue;

    /** Background thread that drains {@link #ackQueue}. */
    private final AckWriteThread ackWriteThread;

    /** Lock guarding {@link #ackFinishedCondition}. */
    private final Lock ackWriteLock;

    /** Signalled by the ACK write thread when it exits. */
    private final Condition ackFinishedCondition;

    /**
     * Daemon thread that drains the ACK queue of the enclosing receiver and writes every queued
     * ACK to the ACK output stream.
     *
     * <p>The thread runs until the receiver is closed and the queue is empty, or until writing
     * fails. On every exit path it logs once (at FINER) and signals the "ACK finished" condition
     * exactly once while holding the ACK write lock.
     */
    private class AckWriteThread extends Thread {
        public AckWriteThread() throws IOException {
            setName("ACK write thread - " + NetUtils.desc(PacketReceiver.this.client));
            setDaemon(true);
        }

        /**
         * Polls the queue (50 ms at a time so the closed flag is re-checked regularly), writes all
         * currently queued ACKs as one batch and flushes once per batch. Any failure is recorded
         * through {@code setLastException}; non-I/O failures are wrapped in an {@link IOException}.
         */
        @Override
        public void run() {
            final PacketReceiver owner = PacketReceiver.this;
            try {
                while (!(owner.closed && owner.ackQueue.isEmpty())) {
                    StreamProtocol.PacketACK ack = owner.ackQueue.poll(50L, TimeUnit.MILLISECONDS);
                    if (ack == null) {
                        continue;
                    }
                    do {
                        owner.onWriteACK(ack);
                        if (PacketReceiver.logger.isLoggable(Level.FINER)) {
                            PacketReceiver.logger.finer(owner.tag + "Write " + ack);
                        }
                        StreamProtocol.ACK.write(owner.ackOut, ack);
                        ack = owner.ackQueue.poll();
                    } while (ack != null);
                    owner.ackOut.flush();
                }
            } catch (IOException e) {
                owner.setLastException(e);
            } catch (Throwable t) {
                owner.setLastException(new IOException(t));
            } finally {
                logExit(owner);
                signalFinished(owner);
            }
        }

        /** Logs the exit state of the thread at FINER level. */
        private void logExit(PacketReceiver owner) {
            if (PacketReceiver.logger.isLoggable(Level.FINER)) {
                PacketReceiver.logger.finer(owner.tag + "ACK write thread exit, closed=" + owner.closed + ", notAckedMesssagesTotal=" + owner.ackQueue.size() + (owner.lastException == null ? "" : " for exception " + owner.lastException.getMessage()));
            }
        }

        /**
         * Wakes up a thread waiting in {@code close()}. The condition is signalled only while the
         * lock is held; signalling or unlocking without ownership throws
         * {@link IllegalMonitorStateException}.
         */
        private void signalFinished(PacketReceiver owner) {
            owner.ackWriteLock.lock();
            try {
                owner.ackFinishedCondition.signal();
            } finally {
                owner.ackWriteLock.unlock();
            }
        }
    }

    public PacketReceiver(NetSocket netSocket, StreamConfig streamConfig) throws IOException {
        if (streamConfig.getNetSocketFactory() == null) {
            throw new NullPointerException("The config should set a NetSocketFactory properly");
        }
        this.localAddress = streamConfig.getNetSocketFactory().getLocalAddress();
        this.client = netSocket.getRemoteAddress();
        this.tag = "{" + NetUtils.desc(this.localAddress) + "}";
        this.ackQueue = new LinkedBlockingQueue<>();
        this.config = streamConfig;
        this.inputEventHandler = streamConfig.getInputEventHandler();
        this.packetIn = netSocket.getInputStream();
        this.ackOut = netSocket.getOutputStream();
        this.ackWriteLock = new ReentrantLock(false);
        this.ackFinishedCondition = this.ackWriteLock.newCondition();
        this.ackWriteThread = new AckWriteThread();
        this.ackWriteThread.start();
        onOpen();
    }

    /**
     * Closes the receiver: marks it closed, calls {@link #onClose()} and, if ACKs are still
     * queued, waits until the ACK write thread signals that it has finished.
     *
     * <p>Calling it again after the receiver is closed only rethrows a recorded failure, if any.
     *
     * @throws IOException the recorded failure of this receiver, or a failure of {@link #onClose()}
     */
    private void close() throws IOException {
        if (logger.isLoggable(Level.FINER)) {
            logger.finer(this.tag + "Close");
        }
        if (this.closed) {
            if (this.lastException != null) {
                throw this.lastException;
            }
            return;
        }
        this.closed = true;
        onClose();
        if (this.lastException != null) {
            throw this.lastException;
        }
        if (!this.ackQueue.isEmpty()) {
            // Acquire the lock before entering try: if lock() itself failed, the finally block
            // must not attempt to unlock a lock that is not held.
            this.ackWriteLock.lock();
            try {
                // Condition waits may wake up spuriously, so the predicate is re-checked in a
                // loop. The isAlive() guard avoids waiting for a signal that can never arrive
                // when the writer thread has already terminated.
                while (!this.ackQueue.isEmpty() && this.lastException == null && this.ackWriteThread.isAlive()) {
                    this.ackFinishedCondition.awaitUninterruptibly();
                }
            } finally {
                this.ackWriteLock.unlock();
            }
        }
        if (this.lastException != null) {
            throw this.lastException;
        }
    }

    /**
     * Receives packets until the stream ends, a packet is flagged as failed, or an error occurs.
     *
     * <p>For every packet an ACK is created and {@link #onReceivedPacket} is invoked; if the
     * handler throws, the local node is added to the ACK as a failed node. ACKs of non-end packets
     * are queued immediately. The ACK of the end packet is queued only after {@code close()} has
     * run, so a failure while closing can still be reported in it. Finally the method waits, for
     * at most the configured basic timeout, for the ACK write thread to exit.
     *
     * @throws IOException if receiving a packet or queueing an ACK fails
     */
    @Override // fun.common.net.output.InputProcessor
    public void process() throws IOException {
        StreamProtocol.StreamPacket packet = null;
        StreamProtocol.PacketACK ack = null;
        try {
            do {
                packet = receivePacket();
                if (packet == null) {
                    break;
                }
                ack = new StreamProtocol.PacketACK(packet.getSeqNo());
                try {
                    onReceivedPacket(packet, ack);
                } catch (Throwable handlerFailure) {
                    if (logger.isLoggable(Level.WARNING)) {
                        // Log the throwable that was actually caught.
                        logger.log(Level.WARNING, this.tag + "Found exception", handlerFailure);
                    }
                    ack.addFailedNode(new StreamProtocol.NodeInfo(this.localAddress, StreamProtocol.NodeStatus.FAIL));
                }
                if (packet.isEnd()) {
                    break;
                }
                addAck(ack);
            } while (ack.getFailedNodes() == null);
        } finally {
            try {
                closeAndAckEnd(packet, ack);
            } finally {
                // Joined once, after the loop and after close(): the writer thread only exits
                // once the receiver is closed, so joining earlier would just wait out the timeout.
                awaitAckWriter();
            }
        }
    }

    /**
     * Closes the receiver and, if the last packet was the end packet, queues its ACK. A failure
     * of {@code close()} is not propagated; it is recorded in the end ACK as a failed local node.
     */
    private void closeAndAckEnd(StreamProtocol.StreamPacket lastPacket, StreamProtocol.PacketACK lastAck) throws IOException {
        // lastAck can be null if creating the ACK for the end packet failed.
        final boolean endReceived = lastPacket != null && lastAck != null && lastPacket.isEnd();
        try {
            close();
        } catch (Throwable closeFailure) {
            // A failure equal to lastException was already logged when it was recorded.
            if (closeFailure != this.lastException && logger.isLoggable(Level.WARNING)) {
                logger.log(Level.WARNING, this.tag + "Found exception", closeFailure);
            }
            if (endReceived) {
                lastAck.addFailedNode(new StreamProtocol.NodeInfo(this.localAddress, StreamProtocol.NodeStatus.FAIL));
            }
        } finally {
            if (endReceived) {
                addAck(lastAck);
            }
        }
    }

    /** Waits up to the configured basic timeout for the ACK write thread to terminate. */
    private void awaitAckWriter() {
        try {
            this.ackWriteThread.join(this.config.getBasicTimeout());
        } catch (InterruptedException e) {
            // Preserve the interrupt for the caller instead of swallowing it.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Notifies the input event handler, if one is configured, that the stream was opened and
     * remembers the stream id it returns.
     *
     * @throws IOException if the handler fails
     */
    protected void onOpen() throws IOException {
        if (this.inputEventHandler == null) {
            return;
        }
        this.streamID = this.inputEventHandler.onOpen();
    }

    /**
     * Notifies the input event handler, if one is configured, that the stream identified by the
     * stored stream id was closed.
     *
     * @throws IOException if the handler fails
     */
    protected void onClose() throws IOException {
        if (this.inputEventHandler == null) {
            return;
        }
        this.inputEventHandler.onClose(this.streamID);
    }

    /**
     * Forwards the payload of a received packet to the input event handler. Nothing happens when
     * no handler is configured or the payload is empty.
     *
     * @param streamPacket the packet that was received
     * @param packetACK    the ACK prepared for the packet; not used by this implementation
     * @throws IOException if the handler fails
     */
    public void onReceivedPacket(StreamProtocol.StreamPacket streamPacket, StreamProtocol.PacketACK packetACK) throws IOException {
        if (this.inputEventHandler != null && streamPacket.getData().length > 0) {
            this.inputEventHandler.onReceived(this.streamID, streamPacket.getData());
        }
    }

    /**
     * Hook invoked by the ACK write thread for each ACK right before it is written to the ACK
     * output stream. This implementation does nothing.
     *
     * @param packetACK the ACK about to be written
     * @throws IOException declared so an overriding implementation can report a failure; the ACK
     *                     write thread records such a failure and exits
     */
    protected void onWriteACK(StreamProtocol.PacketACK packetACK) throws IOException {
    }

    /**
     * Reads the next packet from the packet input stream.
     *
     * @return the packet produced by {@code StreamProtocol.PACKET.read}
     * @throws IOException the recorded failure of this receiver, if any, or a read failure
     */
    private StreamProtocol.StreamPacket receivePacket() throws IOException {
        final IOException failure = this.lastException;
        if (failure != null) {
            throw failure;
        }
        final StreamProtocol.StreamPacket packet = StreamProtocol.PACKET.read(this.packetIn);
        if (logger.isLoggable(Level.FINER)) {
            logger.finer(this.tag + "Received " + packet);
        }
        return packet;
    }

    /**
     * Queues an ACK for the ACK write thread unless an ACK with the same sequence number is
     * already waiting in the queue.
     *
     * @param packetACK the ACK to queue
     * @throws IOException the recorded failure of this receiver, if any
     */
    private void addAck(StreamProtocol.PacketACK packetACK) throws IOException {
        final IOException failure = this.lastException;
        if (failure != null) {
            throw failure;
        }
        for (StreamProtocol.PacketACK queued : this.ackQueue) {
            if (queued.getSeqNo() == packetACK.getSeqNo()) {
                // Same sequence number is already pending; do not queue it twice.
                return;
            }
        }
        this.ackQueue.add(packetACK);
    }

    /**
     * Records the first failure of this receiver and logs it at WARNING level. Later failures are
     * ignored so the original cause is kept.
     *
     * @param iOException the failure to record
     */
    public void setLastException(IOException iOException) {
        if (this.lastException != null) {
            return;
        }
        if (logger.isLoggable(Level.WARNING)) {
            logger.log(Level.WARNING, this.tag + "Found exception", (Throwable) iOException);
        }
        this.lastException = iOException;
    }
}
