package fun.common.net.output;

import fun.common.net.output.ReplicaOutputStream;
import fun.common.net.output.StreamProtocol;
import fun.common.util.NetUtils;
import java.io.Closeable;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import java.util.logging.Logger;

/* loaded from: input_file:fun/common/net/output/PacketSender.class */
public class PacketSender implements Closeable {
    private static final Logger logger = Logger.getLogger(PacketSender.class.getName());
    protected String tag;
    private final InetSocketAddress target;
    private final DataOutputStream out;
    private final DataInputStream ackIn;
    private final AckReadThread ackReadThread;
    private final int maxPackets;
    private volatile boolean closed;
    private volatile IOException lastException;
    private final Queue<StreamProtocol.StreamPacket> pendingSendQueue = new LinkedBlockingQueue();
    private final Queue<StreamProtocol.StreamPacket> pendingAckQueue = new LinkedBlockingQueue();
    private final Lock writeLock = new ReentrantLock(false);
    private final Lock readLock = new ReentrantLock(false);
    private final Condition notEmptyCondition = this.writeLock.newCondition();
    private final Condition notFullCondition = this.readLock.newCondition();
    private final PacketWriteThread packetWriteThread = new PacketWriteThread();

    /* loaded from: input_file:fun/common/net/output/PacketSender$AckReadThread.class */
    private class AckReadThread extends Thread {
        public AckReadThread() {
            setName("ACK read thread - " + NetUtils.desc(PacketSender.this.target));
            setDaemon(true);
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            try {
                while (true) {
                    try {
                        if ((!PacketSender.this.closed || !PacketSender.this.pendingSendQueue.isEmpty() || !PacketSender.this.pendingAckQueue.isEmpty()) && PacketSender.this.lastException == null) {
                            StreamProtocol.PacketACK read = StreamProtocol.ACK.read(PacketSender.this.ackIn);
                            if (read.getFailedNodes() != null) {
                                throw new ReplicaOutputStream.TargetFailureException(read.getFailedNodes());
                            }
                            if (PacketSender.logger.isLoggable(Level.FINER)) {
                                PacketSender.logger.finer(PacketSender.this.tag + "Received " + read);
                            }
                            StreamProtocol.StreamPacket streamPacket = (StreamProtocol.StreamPacket) PacketSender.this.pendingAckQueue.peek();
                            if (read.getSeqNo() != streamPacket.getSeqNo()) {
                                throw new IOException("Bad order of response, expect " + streamPacket.getSeqNo() + ", get " + read.getSeqNo());
                            }
                            PacketSender.this.pendingAckQueue.remove();
                            try {
                                PacketSender.this.readLock.lock();
                                PacketSender.this.notFullCondition.signal();
                                PacketSender.this.readLock.unlock();
                            } catch (Throwable th) {
                                PacketSender.this.readLock.unlock();
                                throw th;
                            }
                        }
                    } catch (IOException e) {
                        PacketSender.this.setLastException(e);
                        if (PacketSender.logger.isLoggable(Level.FINER)) {
                            PacketSender.logger.finer(PacketSender.this.tag + "ACK read thread exit, closed=" + PacketSender.this.closed + ", queue=" + (PacketSender.this.pendingAckQueue.size() + PacketSender.this.pendingSendQueue.size()) + (PacketSender.this.lastException == null ? "" : " for exception " + PacketSender.this.lastException));
                            return;
                        }
                        return;
                    } catch (Throwable th2) {
                        PacketSender.this.setLastException(new IOException(th2));
                        if (PacketSender.logger.isLoggable(Level.FINER)) {
                            PacketSender.logger.finer(PacketSender.this.tag + "ACK read thread exit, closed=" + PacketSender.this.closed + ", queue=" + (PacketSender.this.pendingAckQueue.size() + PacketSender.this.pendingSendQueue.size()) + (PacketSender.this.lastException == null ? "" : " for exception " + PacketSender.this.lastException));
                            return;
                        }
                        return;
                    }
                }
                if (PacketSender.logger.isLoggable(Level.FINER)) {
                    PacketSender.logger.finer(PacketSender.this.tag + "ACK read thread exit, closed=" + PacketSender.this.closed + ", queue=" + (PacketSender.this.pendingAckQueue.size() + PacketSender.this.pendingSendQueue.size()) + (PacketSender.this.lastException == null ? "" : " for exception " + PacketSender.this.lastException));
                }
            } catch (Throwable th3) {
                if (PacketSender.logger.isLoggable(Level.FINER)) {
                    PacketSender.logger.finer(PacketSender.this.tag + "ACK read thread exit, closed=" + PacketSender.this.closed + ", queue=" + (PacketSender.this.pendingAckQueue.size() + PacketSender.this.pendingSendQueue.size()) + (PacketSender.this.lastException == null ? "" : " for exception " + PacketSender.this.lastException));
                }
                throw th3;
            }
        }
    }

    /* loaded from: input_file:fun/common/net/output/PacketSender$PacketWriteThread.class */
    private class PacketWriteThread extends Thread {
        public PacketWriteThread() {
            setName("Packet write thread - " + NetUtils.desc(PacketSender.this.target));
            setDaemon(true);
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            try {
                try {
                    PacketSender.this.writeLock.lock();
                    while (true) {
                        if (PacketSender.this.pendingSendQueue.isEmpty() && !PacketSender.this.closed && PacketSender.this.lastException == null) {
                            PacketSender.this.notEmptyCondition.awaitUninterruptibly();
                        }
                        if ((PacketSender.this.closed && PacketSender.this.pendingSendQueue.isEmpty() && PacketSender.this.pendingAckQueue.isEmpty()) || PacketSender.this.lastException != null) {
                            break;
                        }
                        if (!PacketSender.this.pendingSendQueue.isEmpty()) {
                            while (true) {
                                StreamProtocol.StreamPacket streamPacket = (StreamProtocol.StreamPacket) PacketSender.this.pendingSendQueue.peek();
                                if (streamPacket == null) {
                                    break;
                                }
                                PacketSender.this.pendingAckQueue.add(streamPacket);
                                PacketSender.this.pendingSendQueue.remove();
                                if (PacketSender.logger.isLoggable(Level.FINER)) {
                                    PacketSender.logger.finer(PacketSender.this.tag + "Write " + streamPacket);
                                }
                                StreamProtocol.PACKET.write(PacketSender.this.out, streamPacket);
                            }
                            PacketSender.this.out.flush();
                        }
                    }
                    PacketSender.this.writeLock.unlock();
                    if (PacketSender.logger.isLoggable(Level.FINER)) {
                        PacketSender.logger.finer(PacketSender.this.tag + "Packet write thread exit, closed=" + PacketSender.this.closed + ", queue=" + (PacketSender.this.pendingAckQueue.size() + PacketSender.this.pendingSendQueue.size()) + (PacketSender.this.lastException == null ? "" : " for exception " + PacketSender.this.lastException));
                    }
                } catch (IOException e) {
                    PacketSender.this.setLastException(e);
                    PacketSender.this.writeLock.unlock();
                    if (PacketSender.logger.isLoggable(Level.FINER)) {
                        PacketSender.logger.finer(PacketSender.this.tag + "Packet write thread exit, closed=" + PacketSender.this.closed + ", queue=" + (PacketSender.this.pendingAckQueue.size() + PacketSender.this.pendingSendQueue.size()) + (PacketSender.this.lastException == null ? "" : " for exception " + PacketSender.this.lastException));
                    }
                } catch (Throwable th) {
                    PacketSender.this.setLastException(new IOException(th));
                    PacketSender.this.writeLock.unlock();
                    if (PacketSender.logger.isLoggable(Level.FINER)) {
                        PacketSender.logger.finer(PacketSender.this.tag + "Packet write thread exit, closed=" + PacketSender.this.closed + ", queue=" + (PacketSender.this.pendingAckQueue.size() + PacketSender.this.pendingSendQueue.size()) + (PacketSender.this.lastException == null ? "" : " for exception " + PacketSender.this.lastException));
                    }
                }
            } catch (Throwable th2) {
                PacketSender.this.writeLock.unlock();
                if (PacketSender.logger.isLoggable(Level.FINER)) {
                    PacketSender.logger.finer(PacketSender.this.tag + "Packet write thread exit, closed=" + PacketSender.this.closed + ", queue=" + (PacketSender.this.pendingAckQueue.size() + PacketSender.this.pendingSendQueue.size()) + (PacketSender.this.lastException == null ? "" : " for exception " + PacketSender.this.lastException));
                }
                throw th2;
            }
        }
    }

    public PacketSender(InetSocketAddress inetSocketAddress, DataOutputStream dataOutputStream, DataInputStream dataInputStream, StreamConfig streamConfig) {
        this.tag = "{->" + NetUtils.desc(inetSocketAddress) + "}";
        this.target = inetSocketAddress;
        this.out = dataOutputStream;
        this.ackIn = dataInputStream;
        this.maxPackets = streamConfig.getMaxPackets();
        this.packetWriteThread.start();
        this.ackReadThread = new AckReadThread();
        this.ackReadThread.start();
    }

    public InetSocketAddress getTarget() {
        return this.target;
    }

    public void enqueuePacket(StreamProtocol.StreamPacket streamPacket) throws IOException {
        if (this.lastException != null) {
            throw this.lastException;
        }
        if (logger.isLoggable(Level.FINER)) {
            logger.finer(this.tag + "Enqueue " + streamPacket);
        }
        if (this.pendingSendQueue.size() + this.pendingAckQueue.size() > this.maxPackets) {
            try {
                this.readLock.lock();
                if (logger.isLoggable(Level.FINER)) {
                    logger.finer(this.tag + "Queue is full, waiting to enqueue packet " + streamPacket);
                }
                while (this.pendingSendQueue.size() + this.pendingAckQueue.size() > this.maxPackets && this.lastException == null) {
                    this.notFullCondition.awaitUninterruptibly();
                }
            } finally {
                this.readLock.unlock();
            }
        }
        if (this.lastException != null) {
            throw this.lastException;
        }
        try {
            this.writeLock.lock();
            this.pendingSendQueue.add(streamPacket);
            this.notEmptyCondition.signal();
            this.writeLock.unlock();
        } catch (Throwable th) {
            this.writeLock.unlock();
            throw th;
        }
    }

    public Queue<StreamProtocol.StreamPacket> getUnSentPackets() {
        LinkedList linkedList = new LinkedList();
        Iterator<StreamProtocol.StreamPacket> it = this.pendingAckQueue.iterator();
        while (it.hasNext()) {
            linkedList.add(it.next());
        }
        Iterator<StreamProtocol.StreamPacket> it2 = this.pendingSendQueue.iterator();
        while (it2.hasNext()) {
            linkedList.add(it2.next());
        }
        return linkedList;
    }

    public void flush() throws IOException {
        if (logger.isLoggable(Level.FINER)) {
            logger.finer(this.tag + "Flush");
        }
        if (this.lastException != null) {
            throw this.lastException;
        }
        try {
            this.writeLock.lock();
            this.notEmptyCondition.signal();
            this.writeLock.unlock();
            if (!this.pendingSendQueue.isEmpty() || !this.pendingAckQueue.isEmpty()) {
                try {
                    this.readLock.lock();
                    while (true) {
                        if ((!this.pendingSendQueue.isEmpty() || !this.pendingAckQueue.isEmpty()) && this.lastException == null) {
                            this.notFullCondition.awaitUninterruptibly();
                        }
                    }
                } finally {
                    this.readLock.unlock();
                }
            }
            if (this.lastException != null) {
                throw this.lastException;
            }
        } catch (Throwable th) {
            this.writeLock.unlock();
            throw th;
        }
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        if (this.closed) {
            if (this.lastException != null) {
                throw this.lastException;
            }
            return;
        }
        this.closed = true;
        flush();
        if (logger.isLoggable(Level.FINER)) {
            logger.finer(this.tag + "Closed");
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void setTag(String str) {
        this.tag = str;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void setLastException(IOException iOException) {
        if (this.lastException == null) {
            if (logger.isLoggable(Level.WARNING)) {
                logger.log(Level.WARNING, this.tag + "Found exception", (Throwable) iOException);
            }
            this.lastException = iOException;
        }
        try {
            this.writeLock.lock();
            this.notEmptyCondition.signal();
            this.writeLock.unlock();
            try {
                this.readLock.lock();
                this.notFullCondition.signal();
                this.readLock.unlock();
            } catch (Throwable th) {
                this.readLock.unlock();
                throw th;
            }
        } catch (Throwable th2) {
            this.writeLock.unlock();
            throw th2;
        }
    }
}
