package fun.common.net.output;

import fun.common.net.output.StreamProtocol;
import fun.common.util.NetUtils;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Iterator;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;

/* loaded from: input_file:fun/common/net/output/ChainInputProcessor.class */
/**
 * A {@link PacketReceiver} for one node of a stream chain.
 *
 * <p>On construction the processor looks at the targets listed in the stream header. With no
 * targets it is the last node and immediately acknowledges the connect request. Otherwise it
 * opens a connection to the first target, forwards a derived chain header to it, and relays the
 * connect ACK it gets back. Afterwards every received packet is forwarded to the next node, and
 * the next node's ACK is merged into the local ACK before it is written.
 *
 * <p>{@code tag}, {@code localAddress}, {@code client}, {@code config} and {@code ackOut} are not
 * declared here; they are inherited members (presumably from {@link PacketReceiver}).
 *
 * <p>NOTE(review): this class never closes {@code outSocket}; confirm that the owner or the
 * superclass releases it.
 */
public class ChainInputProcessor extends PacketReceiver {
    private static final Logger logger = Logger.getLogger(ChainInputProcessor.class.getName());

    /** Sequence number used for the ACK that answers the connect handshake. */
    private static final int CONNECT_ACK_SEQ = -1;

    /** Header of the stream this processor belongs to. */
    protected final StreamProtocol.StreamHeader streamHeader;

    /** Whether the stream header listed no further targets. */
    private final boolean isLastNode;

    /** First target of the stream header; {@code null} on the last node. */
    private InetSocketAddress nextNode;

    /** Connection to {@link #nextNode}; {@code null} on the last node or if connecting failed. */
    private NetSocket outSocket;

    /** Stream used to forward the connect header and packets to the next node. */
    private DataOutputStream outStream;

    /** Stream the next node's ACKs are read from. */
    private DataInputStream ackInStream;

    /** Sequence number of the last packet ACK accepted from the next node; -1 before the first. */
    private long nextAckedSeq = -1L;

    /**
     * Sets up this node's link in the chain and answers the connect request.
     *
     * <p>Any {@link IOException} raised while reaching the next node (obtaining the socket,
     * obtaining its streams, or performing the connect handshake) is logged and reported upstream
     * as a connect ACK that lists the next node as failed, instead of aborting construction. This
     * lets the upstream peer attribute the failure to the next node rather than to this one.
     *
     * @param streamHeader header describing the stream (id, targets, timeout)
     * @param netSocket socket passed to the superclass
     * @param streamConfig configuration passed to the superclass; also supplies the socket factory
     * @throws IOException if the superclass constructor fails or the connect ACK cannot be written
     */
    public ChainInputProcessor(StreamProtocol.StreamHeader streamHeader, NetSocket netSocket, StreamConfig streamConfig) throws IOException {
        super(netSocket, streamConfig);
        this.streamHeader = streamHeader;
        this.tag = "{" + Long.toHexString(this.streamHeader.getStreamID()).toUpperCase() + "-" + NetUtils.desc(this.localAddress) + "}";

        InetSocketAddress[] targets = this.streamHeader.getTargets();
        this.isLastNode = targets == null || targets.length == 0;
        if (this.isLastNode) {
            if (logger.isLoggable(Level.FINER)) {
                logger.finer(this.tag + "Creating chain stream reached end, writing ack to " + NetUtils.desc(this.client));
            }
            ackConnect(new StreamProtocol.PacketACK(CONNECT_ACK_SEQ));
            return;
        }

        this.nextNode = targets[0];
        if (logger.isLoggable(Level.FINER)) {
            int timeout = this.streamHeader.getTimeout();
            logger.finer(this.tag + "Creating chain stream " + NetUtils.desc(this.localAddress) + "->" + NetUtils.desc(this.nextNode) + ", timeout " + timeout + "ms");
        }

        StreamProtocol.PacketACK connectAck;
        try {
            // Opening the socket is part of "connecting to the next node": keep it inside the
            // try so an unreachable node is reported the same way as a failed handshake.
            this.outSocket = streamConfig.getNetSocketFactory().getNetSocket(this.nextNode);
            this.outStream = this.outSocket.getOutputStream();
            this.ackInStream = this.outSocket.getInputStream();
            connectAck = connectNext();
        } catch (IOException e) {
            if (logger.isLoggable(Level.WARNING)) {
                logger.log(Level.WARNING, this.tag + "Connect " + this.nextNode + " failed", e);
            }
            connectAck = new StreamProtocol.PacketACK(CONNECT_ACK_SEQ);
            connectAck.addFailedNode(new StreamProtocol.NodeInfo(this.nextNode, StreamProtocol.NodeStatus.FAIL));
        }
        ackConnect(connectAck);
    }

    /**
     * Forwards the packet to the next node (when an output stream exists) and then delegates to
     * the superclass.
     *
     * <p>A failure while forwarding is logged and recorded on {@code packetACK} as a failed next
     * node; it does not prevent the superclass from handling the packet.
     *
     * <p>Visibility is {@code public} as emitted by the decompiler (originally {@code protected}).
     *
     * @param streamPacket the packet that was received
     * @param packetACK the ACK being built for this packet; may gain a failed-node entry
     * @throws IOException if the superclass handling fails
     */
    @Override // fun.common.net.output.PacketReceiver
    public void onReceivedPacket(StreamProtocol.StreamPacket streamPacket, StreamProtocol.PacketACK packetACK) throws IOException {
        if (this.outStream != null) {
            try {
                StreamProtocol.PACKET.write(this.outStream, streamPacket);
                this.outStream.flush();
            } catch (Exception e) {
                if (logger.isLoggable(Level.WARNING)) {
                    logger.log(Level.WARNING, this.tag + "Failed to write packet to " + NetUtils.desc(this.nextNode), e);
                }
                packetACK.addFailedNode(new StreamProtocol.NodeInfo(this.nextNode, StreamProtocol.NodeStatus.FAIL));
            }
        }
        super.onReceivedPacket(streamPacket, packetACK);
    }

    /**
     * Merges the next node's ACK into {@code packetACK}.
     *
     * <p>Nothing is awaited on the last node, or when {@code packetACK} already lists failed
     * nodes. Presumably invoked by the superclass before the ACK is written upstream.
     *
     * @param packetACK the ACK about to be written
     */
    @Override // fun.common.net.output.PacketReceiver
    protected void onWriteACK(StreamProtocol.PacketACK packetACK) {
        List<StreamProtocol.NodeInfo> failedNodes = packetACK.getFailedNodes();
        if (this.isLastNode) {
            return;
        }
        if (failedNodes == null || failedNodes.isEmpty()) {
            waitNextAck(packetACK);
        }
    }

    /**
     * Reads one ACK from the next node and folds its failed nodes into {@code packetACK}.
     *
     * <p>The local ACK must carry the sequence number following the last accepted one, and the
     * next node's ACK must carry the same sequence number. Any exception, including a sequence
     * mismatch, is logged and recorded on {@code packetACK} as a failed next node; in that case
     * {@link #nextAckedSeq} is not advanced.
     *
     * @param packetACK the local ACK to validate and extend
     */
    private void waitNextAck(StreamProtocol.PacketACK packetACK) {
        try {
            long expectedSeq = this.nextAckedSeq + 1;
            if (packetACK.getSeqNo() != expectedSeq) {
                throw new IOException("Bad sequence from local, expected " + expectedSeq + ", while get " + packetACK.getSeqNo());
            }
            StreamProtocol.PacketACK nextAck = StreamProtocol.ACK.read(this.ackInStream);
            if (logger.isLoggable(Level.FINER)) {
                logger.finer(this.tag + "Received ack from next " + NetUtils.desc(this.nextNode) + ", next " + nextAck + ", current " + packetACK);
            }
            if (packetACK.getSeqNo() != nextAck.getSeqNo()) {
                throw new IOException("Bad sequence from next " + NetUtils.desc(this.nextNode) + ", expected " + packetACK.getSeqNo() + ", while get " + nextAck.getSeqNo());
            }
            List<StreamProtocol.NodeInfo> downstreamFailures = nextAck.getFailedNodes();
            if (downstreamFailures != null) {
                for (StreamProtocol.NodeInfo failed : downstreamFailures) {
                    packetACK.addFailedNode(failed);
                }
            }
            this.nextAckedSeq++;
        } catch (Exception e) {
            if (logger.isLoggable(Level.WARNING)) {
                logger.log(Level.WARNING, this.tag + "Failed to read ack from " + NetUtils.desc(this.nextNode), e);
            }
            packetACK.addFailedNode(new StreamProtocol.NodeInfo(this.nextNode, StreamProtocol.NodeStatus.FAIL));
        }
    }

    /**
     * Sends the chain header for the next node and reads its connect ACK.
     *
     * <p>The forwarded header is derived via {@code nextChainHeader} from this stream's timeout
     * minus the configured basic timeout.
     * NOTE(review): the difference is not checked for being non-positive; confirm the callee
     * tolerates that.
     *
     * @return the ACK read from the next node
     * @throws IOException if writing the header or reading the ACK fails
     */
    private StreamProtocol.PacketACK connectNext() throws IOException {
        int nextTimeout = this.streamHeader.getTimeout() - this.config.getBasicTimeout();
        StreamProtocol.CONNECT.write(this.outStream, this.streamHeader.nextChainHeader(nextTimeout));
        return StreamProtocol.ACK.read(this.ackInStream);
    }

    /**
     * Writes the connect ACK to the inherited {@code ackOut} stream.
     *
     * @param packetACK the ACK to send
     * @throws IOException if the write fails
     */
    private void ackConnect(StreamProtocol.PacketACK packetACK) throws IOException {
        StreamProtocol.ACK.write(this.ackOut, packetACK);
    }
}
