package org.xtreemfs.babudb.replication;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import org.xtreemfs.babudb.BabuDBRequest;
import org.xtreemfs.babudb.clients.SlaveClient;
import org.xtreemfs.babudb.config.ReplicationConfig;
import org.xtreemfs.babudb.log.LogEntry;
import org.xtreemfs.babudb.lsmdb.LSN;
import org.xtreemfs.babudb.replication.RequestDispatcher;
import org.xtreemfs.babudb.replication.SlavesStates;
import org.xtreemfs.babudb.replication.operations.ChunkOperation;
import org.xtreemfs.babudb.replication.operations.HeartbeatOperation;
import org.xtreemfs.babudb.replication.operations.LoadOperation;
import org.xtreemfs.babudb.replication.operations.ReplicaOperation;
import org.xtreemfs.babudb.replication.operations.StateOperation;
import org.xtreemfs.babudb.replication.stages.StageRequest;
import org.xtreemfs.include.common.buffer.ReusableBuffer;
import org.xtreemfs.include.common.logging.Logging;
import org.xtreemfs.include.foundation.oncrpc.client.RPCResponse;
import org.xtreemfs.include.foundation.oncrpc.client.RPCResponseAvailableListener;

/* loaded from: input_file:org/xtreemfs/babudb/conversion/jars/3.jar:org/xtreemfs/babudb/replication/MasterRequestDispatcher.class */
public class MasterRequestDispatcher extends RequestDispatcher {
    private final SlavesStates states;
    private final int syncN;
    public final int chunkSize;
    public final LSN lastOnView;
    static final /* synthetic */ boolean $assertionsDisabled;

    static {
        $assertionsDisabled = !MasterRequestDispatcher.class.desiredAssertionStatus();
    }

    public MasterRequestDispatcher(RequestDispatcher requestDispatcher, InetSocketAddress inetSocketAddress) throws IOException {
        super("Master", requestDispatcher);
        if (!$assertionsDisabled && !requestDispatcher.isPaused()) {
            throw new AssertionError();
        }
        RequestDispatcher.DispatcherState state = requestDispatcher.getState();
        if (state.requestQueue != null) {
            Iterator it = state.requestQueue.iterator();
            while (it.hasNext()) {
                ((StageRequest) it.next()).free();
            }
        }
        this.chunkSize = requestDispatcher.configuration.getChunkSize();
        this.syncN = requestDispatcher.configuration.getSyncN();
        this.states = new SlavesStates(((ReplicationConfig) this.dbs.getConfig()).getSyncN(), new LinkedList(requestDispatcher.configuration.getParticipants()), this.rpcClient);
        LSN lsn = requestDispatcher.getState().latest;
        this.lastOnView = lsn.getSequenceNo() == 0 ? new LSN(0, 0L) : lsn;
    }

    @Override // org.xtreemfs.babudb.replication.RequestDispatcher
    protected void initializeOperations() {
        StateOperation stateOperation = new StateOperation(this);
        this.operations.put(Integer.valueOf(stateOperation.getProcedureId()), stateOperation);
        ReplicaOperation replicaOperation = new ReplicaOperation(this);
        this.operations.put(Integer.valueOf(replicaOperation.getProcedureId()), replicaOperation);
        LoadOperation loadOperation = new LoadOperation(this);
        this.operations.put(Integer.valueOf(loadOperation.getProcedureId()), loadOperation);
        ChunkOperation chunkOperation = new ChunkOperation();
        this.operations.put(Integer.valueOf(chunkOperation.getProcedureId()), chunkOperation);
        HeartbeatOperation heartbeatOperation = new HeartbeatOperation(this);
        this.operations.put(Integer.valueOf(heartbeatOperation.getProcedureId()), heartbeatOperation);
    }

    public List<SlaveClient> getSlavesForBroadCast() throws SlavesStates.NotEnoughAvailableSlavesException, InterruptedException {
        return this.states.getAvailableSlaves();
    }

    public void markSlaveAsDead(SlaveClient slaveClient) {
        Logging.logMessage(7, this, "Slave was marked as dead: ", slaveClient.getDefaultServerAddress().toString());
        this.states.markAsDead(slaveClient);
    }

    public void markSlaveAsFinished(SlaveClient slaveClient) {
        this.states.requestFinished(slaveClient);
    }

    public int getSyncN() {
        return this.syncN;
    }

    public void heartbeat(SocketAddress socketAddress, LSN lsn, long j) throws SlavesStates.UnknownParticipantException {
        if (!$assertionsDisabled && !(socketAddress instanceof InetSocketAddress)) {
            throw new AssertionError();
        }
        this.states.update(((InetSocketAddress) socketAddress).getAddress(), lsn, j);
    }

    @Override // org.xtreemfs.babudb.replication.RequestDispatcher
    public void subscribeListener(LatestLSNUpdateListener latestLSNUpdateListener) {
        this.states.subscribeListener(latestLSNUpdateListener);
    }

    @Override // org.xtreemfs.babudb.replication.RequestDispatcher
    protected ReplicateResponse _replicate(LogEntry logEntry, ReusableBuffer reusableBuffer) throws SlavesStates.NotEnoughAvailableSlavesException, InterruptedException {
        List<SlaveClient> slavesForBroadCast = getSlavesForBroadCast();
        final ReplicateResponse replicateResponse = new ReplicateResponse(logEntry, slavesForBroadCast.size() - getSyncN());
        if (slavesForBroadCast.size() == 0) {
            Logging.logMessage(7, this, "There are no slaves available anymore! BabuDB runs if it would be in non-replicated mode.", new Object[0]);
        } else {
            for (final SlaveClient slaveClient : slavesForBroadCast) {
                slaveClient.replicate(logEntry.getLSN(), reusableBuffer.createViewBuffer()).registerListener(new RPCResponseAvailableListener<Object>() { // from class: org.xtreemfs.babudb.replication.MasterRequestDispatcher.1
                    @Override // org.xtreemfs.include.foundation.oncrpc.client.RPCResponseAvailableListener
                    public void responseAvailable(RPCResponse<Object> rPCResponse) {
                        try {
                            try {
                                rPCResponse.get();
                                MasterRequestDispatcher.this.markSlaveAsFinished(slaveClient);
                                if (rPCResponse != null) {
                                    rPCResponse.freeBuffers();
                                }
                            } catch (Exception e) {
                                MasterRequestDispatcher.this.markSlaveAsDead(slaveClient);
                                replicateResponse.decrementPermittedFailures();
                                Logging.logMessage(6, this, "'%s' was marked as dead, because %s", slaveClient.getDefaultServerAddress().toString(), e.getMessage());
                                if (e.getMessage() == null) {
                                    Logging.logError(7, this, e);
                                }
                                if (rPCResponse != null) {
                                    rPCResponse.freeBuffers();
                                }
                            }
                        } catch (Throwable th) {
                            if (rPCResponse != null) {
                                rPCResponse.freeBuffers();
                            }
                            throw th;
                        }
                    }
                });
            }
        }
        return replicateResponse;
    }

    @Override // org.xtreemfs.babudb.replication.RequestDispatcher
    public RequestDispatcher.DispatcherState getState() {
        return new RequestDispatcher.DispatcherState(this.dbs.getLogger().getLatestLSN());
    }

    @Override // org.xtreemfs.babudb.replication.RequestDispatcher
    public void pauses(BabuDBRequest<Object> babuDBRequest) {
        super.pauses(babuDBRequest);
        this.states.clearListeners();
    }
}
