/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hbase.ipc;

import java.util.Comparator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.hadoop.hbase.ipc.AdaptiveLifoCoDelCallQueue;
import org.apache.hadoop.hbase.ipc.BalancedQueueRpcExecutor;
import org.apache.hadoop.hbase.ipc.CallRunner;
import org.apache.hadoop.hbase.ipc.FastPathBalancedQueueRpcExecutor;
import org.apache.hadoop.hbase.ipc.PriorityFunction;
import org.apache.hadoop.hbase.ipc.RWQueueRpcExecutor;
import org.apache.hadoop.hbase.ipc.RpcExecutor;
import org.apache.hadoop.hbase.ipc.RpcScheduler;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.util.BoundedPriorityBlockingQueue;

/**
 * An {@link RpcScheduler} that hands each incoming call to one of up to three
 * {@link RpcExecutor}s: a general executor (always present), an optional priority
 * executor and an optional replication executor. The executor is selected from the
 * priority level that the configured {@link PriorityFunction} assigns to the call.
 *
 * <p>The general executor's queue implementation is selected through
 * {@link #CALL_QUEUE_TYPE_CONF_KEY} ("fifo", "deadline" or "codel"). When more than one
 * call queue is configured and {@link #CALL_QUEUE_READ_SHARE_CONF_KEY} is positive, a
 * {@link RWQueueRpcExecutor} is used instead of a balanced executor.
 */
@InterfaceAudience.LimitedPrivate(value={"Coprocesssor", "Phoenix"})
@InterfaceStability.Evolving
public class SimpleRpcScheduler
extends RpcScheduler
implements ConfigurationObserver {
    public static final Log LOG = LogFactory.getLog(SimpleRpcScheduler.class);

    // Configuration keys controlling how the general call queues are split and sized.
    public static final String CALL_QUEUE_READ_SHARE_CONF_KEY = "hbase.ipc.server.callqueue.read.ratio";
    public static final String CALL_QUEUE_SCAN_SHARE_CONF_KEY = "hbase.ipc.server.callqueue.scan.ratio";
    public static final String CALL_QUEUE_HANDLER_FACTOR_CONF_KEY = "hbase.ipc.server.callqueue.handler.factor";

    // Configuration key, accepted values and default for the general call queue type.
    public static final String CALL_QUEUE_TYPE_CONF_KEY = "hbase.ipc.server.callqueue.type";
    public static final String CALL_QUEUE_TYPE_CODEL_CONF_VALUE = "codel";
    public static final String CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE = "deadline";
    public static final String CALL_QUEUE_TYPE_FIFO_CONF_VALUE = "fifo";
    public static final String CALL_QUEUE_TYPE_CONF_DEFAULT = "fifo";

    // Upper bound applied to a call's deadline by the "deadline" queue comparator.
    public static final String QUEUE_MAX_CALL_DELAY_CONF_KEY = "hbase.ipc.server.queue.max.call.delay";

    // Tunables passed to AdaptiveLifoCoDelCallQueue, and their defaults.
    public static final String CALL_QUEUE_CODEL_TARGET_DELAY = "hbase.ipc.server.callqueue.codel.target.delay";
    public static final String CALL_QUEUE_CODEL_INTERVAL = "hbase.ipc.server.callqueue.codel.interval";
    public static final String CALL_QUEUE_CODEL_LIFO_THRESHOLD = "hbase.ipc.server.callqueue.codel.lifo.threshold";
    public static final int CALL_QUEUE_CODEL_DEFAULT_TARGET_DELAY = 100;
    public static final int CALL_QUEUE_CODEL_DEFAULT_INTERVAL = 100;
    public static final double CALL_QUEUE_CODEL_DEFAULT_LIFO_THRESHOLD = 0.8;

    /**
     * Priority level whose calls are routed to the replication executor.
     * NOTE(review): presumably mirrors the project's replication QoS constant — confirm.
     */
    private static final int REPLICATION_PRIORITY_LEVEL = 5;

    // Counters shared with the codel queues (passed to them as constructor arguments).
    private final AtomicLong numGeneralCallsDropped = new AtomicLong();
    private final AtomicLong numLifoModeSwitches = new AtomicLong();

    /** Listener port, captured in {@link #init} and handed to the executors on start. */
    private int port;
    private final PriorityFunction priority;
    private final RpcExecutor callExecutor;
    /** {@code null} when no priority handlers were requested. */
    private final RpcExecutor priorityExecutor;
    /** {@code null} when no replication handlers were requested. */
    private final RpcExecutor replicationExecutor;
    private final int highPriorityLevel;
    private final Abortable abortable;

    /**
     * Applies a changed configuration: asks every executor to resize its queues and,
     * when the configured queue type is "codel", pushes the current codel tunables to
     * each {@link AdaptiveLifoCoDelCallQueue} of the general executor.
     *
     * @param conf the updated configuration
     */
    @Override
    public void onConfigurationChange(Configuration conf) {
        this.callExecutor.resizeQueues(conf);
        if (this.priorityExecutor != null) {
            this.priorityExecutor.resizeQueues(conf);
        }
        if (this.replicationExecutor != null) {
            this.replicationExecutor.resizeQueues(conf);
        }

        // Same default as the constructor; only the codel case needs further work.
        String callQueueType = conf.get(CALL_QUEUE_TYPE_CONF_KEY, CALL_QUEUE_TYPE_CONF_DEFAULT);
        if (!SimpleRpcScheduler.isCodelQueueType(callQueueType)) {
            return;
        }

        int codelTargetDelay = conf.getInt(CALL_QUEUE_CODEL_TARGET_DELAY, CALL_QUEUE_CODEL_DEFAULT_TARGET_DELAY);
        int codelInterval = conf.getInt(CALL_QUEUE_CODEL_INTERVAL, CALL_QUEUE_CODEL_DEFAULT_INTERVAL);
        double codelLifoThreshold = conf.getDouble(CALL_QUEUE_CODEL_LIFO_THRESHOLD, CALL_QUEUE_CODEL_DEFAULT_LIFO_THRESHOLD);
        for (BlockingQueue<CallRunner> queue : this.callExecutor.getQueues()) {
            if (queue instanceof AdaptiveLifoCoDelCallQueue) {
                ((AdaptiveLifoCoDelCallQueue)queue).updateTunables(codelTargetDelay, codelInterval, codelLifoThreshold);
            }
        }
    }

    /**
     * Builds the scheduler and its executors from configuration.
     *
     * @param conf configuration to read queue settings from
     * @param handlerCount number of handlers for the general executor
     * @param priorityHandlerCount number of priority handlers; no priority executor is
     *        created when this is not positive
     * @param replicationHandlerCount number of replication handlers; no replication
     *        executor is created when this is not positive
     * @param priority function used to compute a call's priority level and deadline
     * @param server abortable handed to the executors that accept one; may be {@code null}
     * @param highPriorityLevel calls with a level strictly above this go to the priority
     *        executor (when one exists)
     */
    public SimpleRpcScheduler(Configuration conf, int handlerCount, int priorityHandlerCount, int replicationHandlerCount, PriorityFunction priority, Abortable server, int highPriorityLevel) {
        int maxQueueLength = conf.getInt("hbase.ipc.server.max.callqueue.length", handlerCount * 10);
        int maxPriorityQueueLength = conf.getInt("hbase.ipc.server.priority.max.callqueue.length", maxQueueLength);
        this.priority = priority;
        this.highPriorityLevel = highPriorityLevel;
        this.abortable = server;

        String callQueueType = conf.get(CALL_QUEUE_TYPE_CONF_KEY, CALL_QUEUE_TYPE_CONF_DEFAULT);
        float callqReadShare = conf.getFloat(CALL_QUEUE_READ_SHARE_CONF_KEY, 0.0f);
        float callqScanShare = conf.getFloat(CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0.0f);
        float callQueuesHandlersFactor = conf.getFloat(CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 0.0f);
        // Always at least one queue, even when the factor is left at 0.
        int numCallQueues = Math.max(1, Math.round((float)handlerCount * callQueuesHandlersFactor));
        int codelTargetDelay = conf.getInt(CALL_QUEUE_CODEL_TARGET_DELAY, CALL_QUEUE_CODEL_DEFAULT_TARGET_DELAY);
        int codelInterval = conf.getInt(CALL_QUEUE_CODEL_INTERVAL, CALL_QUEUE_CODEL_DEFAULT_INTERVAL);
        double codelLifoThreshold = conf.getDouble(CALL_QUEUE_CODEL_LIFO_THRESHOLD, CALL_QUEUE_CODEL_DEFAULT_LIFO_THRESHOLD);
        LOG.info("Using " + callQueueType + " as user call queue, count=" + numCallQueues);

        boolean deadlineQueue = SimpleRpcScheduler.isDeadlineQueueType(callQueueType);
        boolean codelQueue = SimpleRpcScheduler.isCodelQueueType(callQueueType);
        if (numCallQueues > 1 && callqReadShare > 0.0f) {
            // Several queues plus a read share: use the read/write-split executor.
            if (deadlineQueue) {
                CallPriorityComparator callPriority = new CallPriorityComparator(conf, this.priority);
                this.callExecutor = new RWQueueRpcExecutor("DeadlineRWQ.default", handlerCount, numCallQueues, callqReadShare, callqScanShare, maxQueueLength, conf, this.abortable, BoundedPriorityBlockingQueue.class, callPriority);
            } else if (codelQueue) {
                Object[] callQueueInitArgs = new Object[]{maxQueueLength, codelTargetDelay, codelInterval, codelLifoThreshold, this.numGeneralCallsDropped, this.numLifoModeSwitches};
                this.callExecutor = new RWQueueRpcExecutor("CodelRWQ.default", handlerCount, numCallQueues, callqReadShare, callqScanShare, AdaptiveLifoCoDelCallQueue.class, callQueueInitArgs, AdaptiveLifoCoDelCallQueue.class, callQueueInitArgs);
            } else {
                this.callExecutor = new RWQueueRpcExecutor("FifoRWQ.default", handlerCount, numCallQueues, callqReadShare, callqScanShare, maxQueueLength, conf, this.abortable);
            }
        } else if (deadlineQueue) {
            CallPriorityComparator callPriority = new CallPriorityComparator(conf, this.priority);
            this.callExecutor = new BalancedQueueRpcExecutor("BQDeadline.default", handlerCount, numCallQueues, conf, this.abortable, BoundedPriorityBlockingQueue.class, maxQueueLength, callPriority);
        } else if (codelQueue) {
            this.callExecutor = new BalancedQueueRpcExecutor("CodelFPBQ.default", handlerCount, numCallQueues, conf, this.abortable, AdaptiveLifoCoDelCallQueue.class, maxQueueLength, codelTargetDelay, codelInterval, codelLifoThreshold, this.numGeneralCallsDropped, this.numLifoModeSwitches);
        } else {
            this.callExecutor = new FastPathBalancedQueueRpcExecutor("FifoWFPBQ.default", handlerCount, numCallQueues, maxQueueLength, conf, this.abortable);
        }

        // The priority and replication executors are optional and always FIFO.
        this.priorityExecutor = priorityHandlerCount > 0 ? new FastPathBalancedQueueRpcExecutor("FifoWFPBQ.priority", priorityHandlerCount, 2, maxPriorityQueueLength, conf, this.abortable) : null;
        this.replicationExecutor = replicationHandlerCount > 0 ? new FastPathBalancedQueueRpcExecutor("FifoWFPBQ.replication", replicationHandlerCount, 1, maxQueueLength, conf, this.abortable) : null;
    }

    /** Returns true when {@code callQueueType} is the "deadline" queue type (false for null). */
    private static boolean isDeadlineQueueType(String callQueueType) {
        return CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE.equals(callQueueType);
    }

    /** Returns true when {@code callQueueType} is the "codel" queue type (false for null). */
    private static boolean isCodelQueueType(String callQueueType) {
        return CALL_QUEUE_TYPE_CODEL_CONF_VALUE.equals(callQueueType);
    }

    /**
     * Same as the seven-argument constructor, with a {@code null} abortable.
     */
    public SimpleRpcScheduler(Configuration conf, int handlerCount, int priorityHandlerCount, int replicationHandlerCount, PriorityFunction priority, int highPriorityLevel) {
        this(conf, handlerCount, priorityHandlerCount, replicationHandlerCount, priority, null, highPriorityLevel);
    }

    /**
     * Records the listener port from {@code context}; it is passed to the executors
     * when {@link #start()} is called.
     */
    @Override
    public void init(RpcScheduler.Context context) {
        this.port = context.getListenerAddress().getPort();
    }

    /** Starts the general executor and, when present, the priority and replication executors. */
    @Override
    public void start() {
        this.callExecutor.start(this.port);
        if (this.priorityExecutor != null) {
            this.priorityExecutor.start(this.port);
        }
        if (this.replicationExecutor != null) {
            this.replicationExecutor.start(this.port);
        }
    }

    /** Stops the general executor and, when present, the priority and replication executors. */
    @Override
    public void stop() {
        this.callExecutor.stop();
        if (this.priorityExecutor != null) {
            this.priorityExecutor.stop();
        }
        if (this.replicationExecutor != null) {
            this.replicationExecutor.stop();
        }
    }

    /**
     * Dispatches {@code callTask} to the executor matching its priority level: the
     * priority executor for levels above {@code highPriorityLevel}, the replication
     * executor for the replication level, and the general executor otherwise. A missing
     * optional executor falls through to the next rule.
     *
     * @param callTask the call to schedule
     * @return whatever the chosen executor's {@code dispatch} returns
     * @throws InterruptedException if the chosen executor's {@code dispatch} throws it
     */
    @Override
    public boolean dispatch(CallRunner callTask) throws InterruptedException {
        RpcServer.Call call = callTask.getCall();
        int level = this.priority.getPriority(call.getHeader(), call.param);
        if (this.priorityExecutor != null && level > this.highPriorityLevel) {
            return this.priorityExecutor.dispatch(callTask);
        }
        if (this.replicationExecutor != null && level == REPLICATION_PRIORITY_LEVEL) {
            return this.replicationExecutor.dispatch(callTask);
        }
        return this.callExecutor.dispatch(callTask);
    }

    /** Returns the queue length reported by the general executor. */
    @Override
    public int getGeneralQueueLength() {
        return this.callExecutor.getQueueLength();
    }

    /** Returns the priority executor's queue length, or 0 when there is no priority executor. */
    @Override
    public int getPriorityQueueLength() {
        return this.priorityExecutor == null ? 0 : this.priorityExecutor.getQueueLength();
    }

    /** Returns the replication executor's queue length, or 0 when there is no replication executor. */
    @Override
    public int getReplicationQueueLength() {
        return this.replicationExecutor == null ? 0 : this.replicationExecutor.getQueueLength();
    }

    /** Returns the sum of active handler counts over all executors that exist. */
    @Override
    public int getActiveRpcHandlerCount() {
        int active = this.callExecutor.getActiveHandlerCount();
        if (this.priorityExecutor != null) {
            active += this.priorityExecutor.getActiveHandlerCount();
        }
        if (this.replicationExecutor != null) {
            active += this.replicationExecutor.getActiveHandlerCount();
        }
        return active;
    }

    /** Returns the current value of the dropped-calls counter shared with the codel queues. */
    @Override
    public long getNumGeneralCallsDropped() {
        return this.numGeneralCallsDropped.get();
    }

    /** Returns the current value of the LIFO-mode-switch counter shared with the codel queues. */
    @Override
    public long getNumLifoModeSwitches() {
        return this.numLifoModeSwitches.get();
    }

    /**
     * Orders calls by effective deadline: the call's {@code timestamp} plus the deadline
     * reported by the {@link PriorityFunction}, with that deadline capped at
     * {@link SimpleRpcScheduler#QUEUE_MAX_CALL_DELAY_CONF_KEY}.
     */
    private static class CallPriorityComparator
    implements Comparator<CallRunner> {
        private static final int DEFAULT_MAX_CALL_DELAY = 5000;
        private final PriorityFunction priority;
        private final int maxDelay;

        public CallPriorityComparator(Configuration conf, PriorityFunction priority) {
            this.priority = priority;
            this.maxDelay = conf.getInt(SimpleRpcScheduler.QUEUE_MAX_CALL_DELAY_CONF_KEY, DEFAULT_MAX_CALL_DELAY);
        }

        /**
         * Compares two calls by effective deadline; the earlier deadline sorts first.
         *
         * @return a negative value, zero, or a positive value as {@code a}'s effective
         *         deadline is before, equal to, or after {@code b}'s
         */
        @Override
        public int compare(CallRunner a, CallRunner b) {
            RpcServer.Call callA = a.getCall();
            RpcServer.Call callB = b.getCall();
            long deadlineA = this.priority.getDeadline(callA.getHeader(), callA.param);
            long deadlineB = this.priority.getDeadline(callB.getHeader(), callB.param);
            deadlineA = callA.timestamp + Math.min(deadlineA, (long)this.maxDelay);
            deadlineB = callB.timestamp + Math.min(deadlineB, (long)this.maxDelay);
            // Do not narrow (deadlineA - deadlineB) to int: a difference outside the int
            // range would wrap and could report the wrong sign or a spurious 0.
            return Long.compare(deadlineA, deadlineB);
        }
    }
}

