package net.sf.xenqtt.test;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:net/sf/xenqtt/test/AbstractBlockingConnection.class */
/**
 * A connection over a {@link SocketChannel} that is serviced by two dedicated daemon threads: one that reads
 * length-prefixed messages from the channel and one that writes queued messages to it.
 *
 * <p>Each inbound message starts with a 2-byte header. The header is interpreted as an unsigned 16-bit integer
 * giving the number of bytes that follow it. The buffer handed to {@link #messageReceived(ByteBuffer)} contains
 * the header followed by those bytes.
 *
 * <p>Subclasses observe traffic by overriding {@link #messageReceived(ByteBuffer)} and
 * {@link #messageSent(ByteBuffer)}; both default to doing nothing.
 */
public abstract class AbstractBlockingConnection {
    /** Capacity of the outbound queue; {@link #send(ByteBuffer)} blocks while the queue is full. */
    private static final int MAX_QUEUED_MSGS_PER_CONNECTION = 1000;

    /** Source of the per-connection numbers used in the thread names. */
    private static final AtomicInteger NEXT_NUM = new AtomicInteger();

    // Must be declared before the two threads: their constructors read it to build the thread names.
    private final int connectionNumber = NEXT_NUM.incrementAndGet();
    private final WriteThread writer = new WriteThread();
    private final ReadThread reader = new ReadThread();
    private final SocketChannel channel;

    /** Daemon thread that reads length-prefixed messages from the channel until end of stream or failure. */
    private final class ReadThread extends Thread {
        private final ByteBuffer header;

        public ReadThread() {
            super("ReadThread-" + AbstractBlockingConnection.this.connectionNumber);
            this.header = ByteBuffer.allocate(2);
            setDaemon(true);
        }

        /**
         * Reads messages until the stream ends or an exception occurs, then closes the connection. Exceptions are
         * reported on {@code System.err} only while the channel is still open.
         */
        @Override
        public void run() {
            try {
                read();
            } catch (Exception e) {
                AbstractBlockingConnection.this.reportFailure(e);
            } finally {
                // Runs on normal end of stream, on failure, and when an Error propagates.
                AbstractBlockingConnection.this.close();
            }
        }

        /**
         * Reads messages in a loop, delivering each complete one to {@code messageReceived}. Returns when the
         * channel reports end of stream, whether between messages or in the middle of one.
         *
         * @throws IOException if reading from the channel fails
         */
        private void read() throws IOException {
            for (;;) {
                // Fill the 2-byte header first.
                while (this.header.hasRemaining()) {
                    if (AbstractBlockingConnection.this.channel.read(this.header) < 0) {
                        return;
                    }
                }
                this.header.flip();

                // The header value is an unsigned 16-bit count of the bytes that follow it.
                int remainingLength = this.header.getShort(0) & 0xFFFF;
                ByteBuffer message = ByteBuffer.allocate(remainingLength + 2);
                message.put(this.header);
                while (message.hasRemaining()) {
                    if (AbstractBlockingConnection.this.channel.read(message) < 0) {
                        return;
                    }
                }

                message.flip();
                AbstractBlockingConnection.this.messageReceived(message);
                this.header.clear();
            }
        }
    }

    /** Daemon thread that takes buffers from the outbound queue and writes each one fully to the channel. */
    public final class WriteThread extends Thread {
        private final BlockingQueue<ByteBuffer> toSend;

        public WriteThread() {
            super("WriteThread-" + AbstractBlockingConnection.this.connectionNumber);
            this.toSend = new ArrayBlockingQueue<ByteBuffer>(MAX_QUEUED_MSGS_PER_CONNECTION);
            setDaemon(true);
        }

        /**
         * Writes queued buffers until interrupted or until an exception occurs, then closes the connection.
         * Exceptions other than interruption are reported on {@code System.err} only while the channel is still
         * open.
         */
        @Override
        public void run() {
            try {
                for (;;) {
                    ByteBuffer message = this.toSend.take();
                    // A single write may be partial; keep writing until the buffer is drained.
                    while (message.hasRemaining()) {
                        AbstractBlockingConnection.this.channel.write(message);
                    }
                    AbstractBlockingConnection.this.messageSent(message);
                }
            } catch (InterruptedException ignored) {
                // Interruption is how close() asks this thread to stop; nothing to report.
            } catch (Exception e) {
                AbstractBlockingConnection.this.reportFailure(e);
            } finally {
                AbstractBlockingConnection.this.close();
            }
        }
    }

    /**
     * Creates a connection over the given channel. No threads are started until {@link #start()} is called.
     *
     * @param socketChannel the channel to read from and write to
     */
    public AbstractBlockingConnection(SocketChannel socketChannel) {
        this.channel = socketChannel;
    }

    /** Starts the writer thread and then the reader thread. */
    public final void start() {
        this.writer.start();
        this.reader.start();
    }

    /**
     * Queues a buffer for the writer thread, blocking while the outbound queue is full.
     *
     * <p>If the calling thread is interrupted while waiting, the buffer is not queued and the thread's interrupt
     * status is restored so the caller can observe the interruption.
     *
     * @param byteBuffer the buffer to write; its remaining bytes are sent
     */
    public final void send(ByteBuffer byteBuffer) {
        try {
            this.writer.toSend.put(byteBuffer);
        } catch (InterruptedException e) {
            // put() clears the flag when it throws; restore it instead of silently losing the interruption.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Called on the reader thread for each complete inbound message. The default implementation does nothing.
     *
     * @param byteBuffer the message, flipped for reading, including its 2-byte header
     */
    void messageReceived(ByteBuffer byteBuffer) {
    }

    /**
     * Called on the writer thread after a buffer has been written completely. The default implementation does
     * nothing.
     *
     * @param byteBuffer the buffer that was written
     */
    void messageSent(ByteBuffer byteBuffer) {
    }

    /**
     * Closes the channel and interrupts the writer thread. An {@link IOException} from closing the channel is
     * ignored, and the writer is interrupted regardless so it never stays parked on its queue.
     */
    public final void close() {
        try {
            this.channel.close();
        } catch (IOException ignored) {
            // Best-effort close: there is nothing useful to do if the channel fails to close.
        } finally {
            this.writer.interrupt();
        }
    }

    /**
     * Prints the current thread's name and the exception's stack trace to {@code System.err}, but only while the
     * channel is still open; failures seen after the channel has been closed are not reported.
     */
    private void reportFailure(Exception e) {
        if (this.channel.isOpen()) {
            System.err.println(Thread.currentThread().getName());
            e.printStackTrace();
        }
    }
}
