package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;

import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.metrics2.sink.ganglia.AbstractGangliaSink;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRejectedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;

@InterfaceAudience.LimitedPrivate({MRConfig.YARN_FRAMEWORK_NAME})
@InterfaceStability.Unstable
/* loaded from: input_file:lib/hadoop-yarn-server-resourcemanager-2.2.0.jar:org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.class */
public class FairScheduler implements ResourceScheduler {
    private boolean initialized;
    private FairSchedulerConfiguration conf;
    private RMContext rmContext;
    private Resource minimumAllocation;
    private Resource maximumAllocation;
    private Resource incrAllocation;
    private boolean usePortForNodeName;
    FSQueueMetrics rootMetrics;
    protected long lastPreemptionUpdateTime;
    private long lastPreemptCheckTime;
    protected long preemptionInterval;
    protected long waitTimeBeforeKill;
    protected boolean preemptionEnabled;
    protected boolean sizeBasedWeight;
    protected WeightAdjuster weightAdjuster;
    protected double nodeLocalityThreshold;
    protected double rackLocalityThreshold;
    private FairSchedulerEventLog eventLog;
    protected boolean assignMultiple;
    protected int maxAssign;
    private static final Log LOG = LogFactory.getLog(FairScheduler.class);
    private static final ResourceCalculator RESOURCE_CALCULATOR = new DefaultResourceCalculator();
    public static final Resource CONTAINER_RESERVED = Resources.createResource(-1);
    private static final List<Container> EMPTY_CONTAINER_LIST = new ArrayList();
    private static final Allocation EMPTY_ALLOCATION = new Allocation(EMPTY_CONTAINER_LIST, Resources.createResource(0));
    protected long UPDATE_INTERVAL = 500;
    private volatile boolean userAsDefaultQueue = false;

    @VisibleForTesting
    protected Map<ApplicationAttemptId, FSSchedulerApp> applications = new ConcurrentHashMap();
    private Map<NodeId, FSSchedulerNode> nodes = new ConcurrentHashMap();
    private Resource clusterCapacity = (Resource) RecordFactoryProvider.getRecordFactory(null).newRecordInstance(Resource.class);
    private List<RMContainer> warnedContainers = new ArrayList();
    private Clock clock = new SystemClock();
    private QueueManager queueMgr = new QueueManager(this);

    /* loaded from: input_file:lib/hadoop-yarn-server-resourcemanager-2.2.0.jar:org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler$UpdateThread.class */
    private class UpdateThread implements Runnable {
        private UpdateThread() {
        }

        @Override // java.lang.Runnable
        public void run() {
            while (true) {
                try {
                    Thread.sleep(FairScheduler.this.UPDATE_INTERVAL);
                    FairScheduler.this.update();
                    FairScheduler.this.preemptTasksIfNecessary();
                } catch (Exception e) {
                    FairScheduler.LOG.error("Exception in fair scheduler UpdateThread", e);
                }
            }
        }
    }

    private void validateConf(Configuration configuration) {
        int i = configuration.getInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 1024);
        int i2 = configuration.getInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 8192);
        if (i < 0 || i > i2) {
            throw new YarnRuntimeException("Invalid resource scheduler memory allocation configuration, yarn.scheduler.minimum-allocation-mb=" + i + ", " + YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB + AbstractGangliaSink.EQUAL + i2 + ", min should equal greater than 0, max should be no smaller than min.");
        }
        int i3 = configuration.getInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES, 1);
        int i4 = configuration.getInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES, 4);
        if (i3 < 0 || i3 > i4) {
            throw new YarnRuntimeException("Invalid resource scheduler vcores allocation configuration, yarn.scheduler.minimum-allocation-vcores=" + i3 + ", " + YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES + AbstractGangliaSink.EQUAL + i4 + ", min should equal greater than 0, max should be no smaller than min.");
        }
    }

    public FairSchedulerConfiguration getConf() {
        return this.conf;
    }

    public QueueManager getQueueManager() {
        return this.queueMgr;
    }

    private RMContainer getRMContainer(ContainerId containerId) {
        FSSchedulerApp fSSchedulerApp = this.applications.get(containerId.getApplicationAttemptId());
        if (fSSchedulerApp == null) {
            return null;
        }
        return fSSchedulerApp.getRMContainer(containerId);
    }

    protected synchronized void update() {
        this.queueMgr.reloadAllocsIfNecessary();
        updateRunnability();
        updatePreemptionVariables();
        FSParentQueue rootQueue = this.queueMgr.getRootQueue();
        rootQueue.updateDemand();
        rootQueue.setFairShare(this.clusterCapacity);
        rootQueue.recomputeShares();
    }

    private void updatePreemptionVariables() {
        long time = this.clock.getTime();
        this.lastPreemptionUpdateTime = time;
        for (FSLeafQueue fSLeafQueue : this.queueMgr.getLeafQueues()) {
            if (!isStarvedForMinShare(fSLeafQueue)) {
                fSLeafQueue.setLastTimeAtMinShare(time);
            }
            if (!isStarvedForFairShare(fSLeafQueue)) {
                fSLeafQueue.setLastTimeAtHalfFairShare(time);
            }
        }
    }

    boolean isStarvedForMinShare(FSLeafQueue fSLeafQueue) {
        return Resources.lessThan(RESOURCE_CALCULATOR, this.clusterCapacity, fSLeafQueue.getResourceUsage(), Resources.min(RESOURCE_CALCULATOR, this.clusterCapacity, fSLeafQueue.getMinShare(), fSLeafQueue.getDemand()));
    }

    boolean isStarvedForFairShare(FSLeafQueue fSLeafQueue) {
        return Resources.lessThan(RESOURCE_CALCULATOR, this.clusterCapacity, fSLeafQueue.getResourceUsage(), Resources.min(RESOURCE_CALCULATOR, this.clusterCapacity, Resources.multiply(fSLeafQueue.getFairShare(), 0.5d), fSLeafQueue.getDemand()));
    }

    protected synchronized void preemptTasksIfNecessary() {
        if (this.preemptionEnabled) {
            long time = this.clock.getTime();
            if (time - this.lastPreemptCheckTime < this.preemptionInterval) {
                return;
            }
            this.lastPreemptCheckTime = time;
            Resource none = Resources.none();
            Iterator<FSLeafQueue> it = this.queueMgr.getLeafQueues().iterator();
            while (it.hasNext()) {
                none = Resources.add(none, resToPreempt(it.next(), time));
            }
            if (Resources.greaterThan(RESOURCE_CALCULATOR, this.clusterCapacity, none, Resources.none())) {
                preemptResources(this.queueMgr.getLeafQueues(), none);
            }
        }
    }

    protected void preemptResources(Collection<FSLeafQueue> collection, Resource resource) {
        if (collection.isEmpty() || Resources.equals(resource, Resources.none())) {
            return;
        }
        HashMap hashMap = new HashMap();
        HashMap hashMap2 = new HashMap();
        ArrayList arrayList = new ArrayList();
        for (FSLeafQueue fSLeafQueue : collection) {
            if (Resources.greaterThan(RESOURCE_CALCULATOR, this.clusterCapacity, fSLeafQueue.getResourceUsage(), fSLeafQueue.getFairShare())) {
                for (AppSchedulable appSchedulable : fSLeafQueue.getAppSchedulables()) {
                    for (RMContainer rMContainer : appSchedulable.getApp().getLiveContainers()) {
                        arrayList.add(rMContainer);
                        hashMap.put(rMContainer, appSchedulable.getApp());
                        hashMap2.put(rMContainer, fSLeafQueue);
                    }
                }
            }
        }
        Collections.sort(arrayList, new Comparator<RMContainer>() { // from class: org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler.1
            @Override // java.util.Comparator
            public int compare(RMContainer rMContainer2, RMContainer rMContainer3) {
                int compareTo = rMContainer2.getContainer().getPriority().compareTo(rMContainer3.getContainer().getPriority());
                return compareTo == 0 ? rMContainer3.getContainerId().compareTo(rMContainer2.getContainerId()) : compareTo;
            }
        });
        Iterator<RMContainer> it = this.warnedContainers.iterator();
        HashSet hashSet = new HashSet();
        while (it.hasNext()) {
            RMContainer next = it.next();
            if (next.getState() == RMContainerState.RUNNING && Resources.greaterThan(RESOURCE_CALCULATOR, this.clusterCapacity, resource, Resources.none())) {
                warnOrKillContainer(next, (FSSchedulerApp) hashMap.get(next), (FSLeafQueue) hashMap2.get(next));
                hashSet.add(next);
                Resources.subtractFrom(resource, next.getContainer().getResource());
            } else {
                it.remove();
            }
        }
        Iterator it2 = arrayList.iterator();
        while (it2.hasNext() && Resources.greaterThan(RESOURCE_CALCULATOR, this.clusterCapacity, resource, Resources.none())) {
            RMContainer rMContainer2 = (RMContainer) it2.next();
            FSLeafQueue fSLeafQueue2 = (FSLeafQueue) hashMap2.get(rMContainer2);
            if (!hashSet.contains(rMContainer2) && Resources.greaterThan(RESOURCE_CALCULATOR, this.clusterCapacity, fSLeafQueue2.getResourceUsage(), fSLeafQueue2.getFairShare())) {
                warnOrKillContainer(rMContainer2, (FSSchedulerApp) hashMap.get(rMContainer2), fSLeafQueue2);
                this.warnedContainers.add(rMContainer2);
                Resources.subtractFrom(resource, rMContainer2.getContainer().getResource());
            }
        }
    }

    private void warnOrKillContainer(RMContainer rMContainer, FSSchedulerApp fSSchedulerApp, FSLeafQueue fSLeafQueue) {
        LOG.info("Preempting container (prio=" + rMContainer.getContainer().getPriority() + "res=" + rMContainer.getContainer().getResource() + ") from queue " + fSLeafQueue.getName());
        Long containerPreemptionTime = fSSchedulerApp.getContainerPreemptionTime(rMContainer);
        if (containerPreemptionTime == null) {
            fSSchedulerApp.addPreemption(rMContainer, this.clock.getTime());
        } else if (containerPreemptionTime.longValue() + this.waitTimeBeforeKill < this.clock.getTime()) {
            completedContainer(rMContainer, SchedulerUtils.createPreemptedContainerStatus(rMContainer.getContainerId(), SchedulerUtils.PREEMPTED_CONTAINER), RMContainerEventType.KILL);
            LOG.info("Killing container" + rMContainer + " (after waiting for premption for " + (this.clock.getTime() - containerPreemptionTime.longValue()) + "ms)");
        }
    }

    protected Resource resToPreempt(FSLeafQueue fSLeafQueue, long j) {
        long minSharePreemptionTimeout = this.queueMgr.getMinSharePreemptionTimeout(fSLeafQueue.getName());
        long fairSharePreemptionTimeout = this.queueMgr.getFairSharePreemptionTimeout();
        Resource none = Resources.none();
        Resource none2 = Resources.none();
        if (j - fSLeafQueue.getLastTimeAtMinShare() > minSharePreemptionTimeout) {
            none = Resources.max(RESOURCE_CALCULATOR, this.clusterCapacity, Resources.none(), Resources.subtract(Resources.min(RESOURCE_CALCULATOR, this.clusterCapacity, fSLeafQueue.getMinShare(), fSLeafQueue.getDemand()), fSLeafQueue.getResourceUsage()));
        }
        if (j - fSLeafQueue.getLastTimeAtHalfFairShare() > fairSharePreemptionTimeout) {
            none2 = Resources.max(RESOURCE_CALCULATOR, this.clusterCapacity, Resources.none(), Resources.subtract(Resources.min(RESOURCE_CALCULATOR, this.clusterCapacity, fSLeafQueue.getFairShare(), fSLeafQueue.getDemand()), fSLeafQueue.getResourceUsage()));
        }
        Resource max = Resources.max(RESOURCE_CALCULATOR, this.clusterCapacity, none, none2);
        if (Resources.greaterThan(RESOURCE_CALCULATOR, this.clusterCapacity, max, Resources.none())) {
            LOG.info("Should preempt " + max + " res for queue " + fSLeafQueue.getName() + ": resDueToMinShare = " + none + ", resDueToFairShare = " + none2);
        }
        return max;
    }

    private void updateRunnability() {
        ArrayList<AppSchedulable> arrayList = new ArrayList();
        Iterator<FSLeafQueue> it = this.queueMgr.getLeafQueues().iterator();
        while (it.hasNext()) {
            for (AppSchedulable appSchedulable : it.next().getAppSchedulables()) {
                appSchedulable.setRunnable(false);
                arrayList.add(appSchedulable);
            }
        }
        Collections.sort(arrayList, new FifoAppComparator());
        HashMap hashMap = new HashMap();
        HashMap hashMap2 = new HashMap();
        for (AppSchedulable appSchedulable2 : arrayList) {
            String user = appSchedulable2.getApp().getUser();
            String queueName = appSchedulable2.getApp().getQueueName();
            int intValue = hashMap.containsKey(user) ? ((Integer) hashMap.get(user)).intValue() : 0;
            int intValue2 = hashMap2.containsKey(queueName) ? ((Integer) hashMap2.get(queueName)).intValue() : 0;
            if (intValue < this.queueMgr.getUserMaxApps(user) && intValue2 < this.queueMgr.getQueueMaxApps(queueName)) {
                hashMap.put(user, Integer.valueOf(intValue + 1));
                hashMap2.put(queueName, Integer.valueOf(intValue2 + 1));
                appSchedulable2.setRunnable(true);
            }
        }
    }

    public RMContainerTokenSecretManager getContainerTokenSecretManager() {
        return this.rmContext.getContainerTokenSecretManager();
    }

    public synchronized ResourceWeights getAppWeight(AppSchedulable appSchedulable) {
        if (!appSchedulable.getRunnable()) {
            return ResourceWeights.NEUTRAL;
        }
        double d = 1.0d;
        if (this.sizeBasedWeight) {
            d = Math.log1p(appSchedulable.getDemand().getMemory()) / Math.log(2.0d);
        }
        double priority = d * appSchedulable.getPriority().getPriority();
        if (this.weightAdjuster != null) {
            priority = this.weightAdjuster.adjustWeight(appSchedulable, priority);
        }
        return new ResourceWeights((float) priority);
    }

    @Override // org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler, org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerContext
    public Resource getMinimumResourceCapability() {
        return this.minimumAllocation;
    }

    public Resource getIncrementResourceCapability() {
        return this.incrAllocation;
    }

    @Override // org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler, org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerContext
    public Resource getMaximumResourceCapability() {
        return this.maximumAllocation;
    }

    public double getNodeLocalityThreshold() {
        return this.nodeLocalityThreshold;
    }

    public double getRackLocalityThreshold() {
        return this.rackLocalityThreshold;
    }

    public Resource getClusterCapacity() {
        return this.clusterCapacity;
    }

    public synchronized Clock getClock() {
        return this.clock;
    }

    protected synchronized void setClock(Clock clock) {
        this.clock = clock;
    }

    public FairSchedulerEventLog getEventLog() {
        return this.eventLog;
    }

    protected synchronized void addApplication(ApplicationAttemptId applicationAttemptId, String str, String str2) {
        FSLeafQueue assignToQueue = assignToQueue(this.rmContext.getRMApps().get(applicationAttemptId.getApplicationId()), str, str2);
        FSSchedulerApp fSSchedulerApp = new FSSchedulerApp(applicationAttemptId, str2, assignToQueue, new ActiveUsersManager(getRootQueueMetrics()), this.rmContext);
        UserGroupInformation createRemoteUser = UserGroupInformation.createRemoteUser(str2);
        if (!assignToQueue.hasAccess(QueueACL.SUBMIT_APPLICATIONS, createRemoteUser) && !assignToQueue.hasAccess(QueueACL.ADMINISTER_QUEUE, createRemoteUser)) {
            String str3 = "User " + createRemoteUser.getUserName() + " cannot submit applications to queue " + assignToQueue.getName();
            LOG.info(str3);
            this.rmContext.getDispatcher().getEventHandler().handle(new RMAppAttemptRejectedEvent(applicationAttemptId, str3));
        } else {
            assignToQueue.addApp(fSSchedulerApp);
            assignToQueue.getMetrics().submitApp(str2, applicationAttemptId.getAttemptId());
            this.applications.put(applicationAttemptId, fSSchedulerApp);
            LOG.info("Application Submission: " + applicationAttemptId + ", user: " + str2 + ", currently active: " + this.applications.size());
            this.rmContext.getDispatcher().getEventHandler().handle(new RMAppAttemptEvent(applicationAttemptId, RMAppAttemptEventType.APP_ACCEPTED));
        }
    }

    @VisibleForTesting
    FSLeafQueue assignToQueue(RMApp rMApp, String str, String str2) {
        if (str.equals("default") && this.userAsDefaultQueue) {
            str = str2;
        }
        FSLeafQueue leafQueue = this.queueMgr.getLeafQueue(str, this.conf.getAllowUndeclaredPools());
        if (leafQueue == null) {
            leafQueue = this.queueMgr.getLeafQueue("default", false);
        }
        if (rMApp != null) {
            rMApp.setQueue(leafQueue.getName());
        } else {
            LOG.warn("Couldn't find RM app to set queue name on");
        }
        return leafQueue;
    }

    private synchronized void removeApplication(ApplicationAttemptId applicationAttemptId, RMAppAttemptState rMAppAttemptState) {
        LOG.info("Application " + applicationAttemptId + " is done. finalState=" + rMAppAttemptState);
        FSSchedulerApp fSSchedulerApp = this.applications.get(applicationAttemptId);
        if (fSSchedulerApp == null) {
            LOG.info("Unknown application " + applicationAttemptId + " has completed!");
            return;
        }
        for (RMContainer rMContainer : fSSchedulerApp.getLiveContainers()) {
            completedContainer(rMContainer, SchedulerUtils.createAbnormalContainerStatus(rMContainer.getContainerId(), SchedulerUtils.COMPLETED_APPLICATION), RMContainerEventType.KILL);
        }
        for (RMContainer rMContainer2 : fSSchedulerApp.getReservedContainers()) {
            completedContainer(rMContainer2, SchedulerUtils.createAbnormalContainerStatus(rMContainer2.getContainerId(), "Application Complete"), RMContainerEventType.KILL);
        }
        fSSchedulerApp.stop(rMAppAttemptState);
        this.queueMgr.getLeafQueue(fSSchedulerApp.getQueue().getQueueName(), false).removeApp(fSSchedulerApp);
        this.applications.remove(applicationAttemptId);
    }

    private synchronized void completedContainer(RMContainer rMContainer, ContainerStatus containerStatus, RMContainerEventType rMContainerEventType) {
        if (rMContainer == null) {
            LOG.info("Null container completed...");
            return;
        }
        Container container = rMContainer.getContainer();
        ApplicationAttemptId applicationAttemptId = container.getId().getApplicationAttemptId();
        FSSchedulerApp fSSchedulerApp = this.applications.get(applicationAttemptId);
        if (fSSchedulerApp == null) {
            LOG.info("Container " + container + " of unknown application " + applicationAttemptId + " completed with event " + rMContainerEventType);
            return;
        }
        FSSchedulerNode fSSchedulerNode = this.nodes.get(container.getNodeId());
        if (rMContainer.getState() == RMContainerState.RESERVED) {
            fSSchedulerApp.unreserve(fSSchedulerNode, rMContainer.getReservedPriority());
            fSSchedulerNode.unreserveResource(fSSchedulerApp);
        } else {
            fSSchedulerApp.containerCompleted(rMContainer, containerStatus, rMContainerEventType);
            fSSchedulerNode.releaseContainer(container);
            updateRootQueueMetrics();
        }
        LOG.info("Application " + applicationAttemptId + " released container " + container.getId() + " on node: " + fSSchedulerNode + " with event: " + rMContainerEventType);
    }

    private synchronized void addNode(RMNode rMNode) {
        this.nodes.put(rMNode.getNodeID(), new FSSchedulerNode(rMNode, this.usePortForNodeName));
        Resources.addTo(this.clusterCapacity, rMNode.getTotalCapability());
        updateRootQueueMetrics();
        LOG.info("Added node " + rMNode.getNodeAddress() + " cluster capacity: " + this.clusterCapacity);
    }

    private synchronized void removeNode(RMNode rMNode) {
        FSSchedulerNode fSSchedulerNode = this.nodes.get(rMNode.getNodeID());
        Resources.subtractFrom(this.clusterCapacity, rMNode.getTotalCapability());
        updateRootQueueMetrics();
        for (RMContainer rMContainer : fSSchedulerNode.getRunningContainers()) {
            completedContainer(rMContainer, SchedulerUtils.createAbnormalContainerStatus(rMContainer.getContainerId(), SchedulerUtils.LOST_CONTAINER), RMContainerEventType.KILL);
        }
        RMContainer reservedContainer = fSSchedulerNode.getReservedContainer();
        if (reservedContainer != null) {
            completedContainer(reservedContainer, SchedulerUtils.createAbnormalContainerStatus(reservedContainer.getContainerId(), SchedulerUtils.LOST_CONTAINER), RMContainerEventType.KILL);
        }
        this.nodes.remove(rMNode.getNodeID());
        LOG.info("Removed node " + rMNode.getNodeAddress() + " cluster capacity: " + this.clusterCapacity);
    }

    @Override // org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler
    public Allocation allocate(ApplicationAttemptId applicationAttemptId, List<ResourceRequest> list, List<ContainerId> list2, List<String> list3, List<String> list4) {
        Allocation allocation;
        FSSchedulerApp fSSchedulerApp = this.applications.get(applicationAttemptId);
        if (fSSchedulerApp == null) {
            LOG.info("Calling allocate on removed or non existant application " + applicationAttemptId);
            return EMPTY_ALLOCATION;
        }
        SchedulerUtils.normalizeRequests(list, new DominantResourceCalculator(), this.clusterCapacity, this.minimumAllocation, this.maximumAllocation, this.incrAllocation);
        for (ContainerId containerId : list2) {
            RMContainer rMContainer = getRMContainer(containerId);
            if (rMContainer == null) {
                RMAuditLogger.logFailure(fSSchedulerApp.getUser(), RMAuditLogger.AuditConstants.RELEASE_CONTAINER, "Unauthorized access or invalid container", "FairScheduler", "Trying to release container not owned by app or with invalid id", fSSchedulerApp.getApplicationId(), containerId);
            }
            completedContainer(rMContainer, SchedulerUtils.createAbnormalContainerStatus(containerId, SchedulerUtils.RELEASED_CONTAINER), RMContainerEventType.RELEASED);
        }
        synchronized (fSSchedulerApp) {
            if (!list.isEmpty()) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("allocate: pre-update applicationAttemptId=" + applicationAttemptId + " application=" + fSSchedulerApp.getApplicationId());
                }
                fSSchedulerApp.showRequests();
                fSSchedulerApp.updateResourceRequests(list);
                LOG.debug("allocate: post-update");
                fSSchedulerApp.showRequests();
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug("allocate: applicationAttemptId=" + applicationAttemptId + " #ask=" + list.size());
                LOG.debug("Preempting " + fSSchedulerApp.getPreemptionContainers().size() + " container(s)");
            }
            HashSet hashSet = new HashSet();
            Iterator<RMContainer> it = fSSchedulerApp.getPreemptionContainers().iterator();
            while (it.hasNext()) {
                hashSet.add(it.next().getContainerId());
            }
            allocation = new Allocation(fSSchedulerApp.pullNewlyAllocatedContainers(), fSSchedulerApp.getHeadroom(), hashSet);
        }
        return allocation;
    }

    private void containerLaunchedOnNode(ContainerId containerId, FSSchedulerNode fSSchedulerNode) {
        ApplicationAttemptId applicationAttemptId = containerId.getApplicationAttemptId();
        FSSchedulerApp fSSchedulerApp = this.applications.get(applicationAttemptId);
        if (fSSchedulerApp == null) {
            LOG.info("Unknown application: " + applicationAttemptId + " launched container " + containerId + " on node: " + fSSchedulerNode);
        } else {
            fSSchedulerApp.containerLaunchedOnNode(containerId, fSSchedulerNode.getNodeID());
        }
    }

    private synchronized void nodeUpdate(RMNode rMNode) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("nodeUpdate: " + rMNode + " cluster capacity: " + this.clusterCapacity);
        }
        this.eventLog.log("HEARTBEAT", rMNode.getHostName());
        FSSchedulerNode fSSchedulerNode = this.nodes.get(rMNode.getNodeID());
        List<UpdatedContainerInfo> pullContainerUpdates = rMNode.pullContainerUpdates();
        ArrayList arrayList = new ArrayList();
        ArrayList<ContainerStatus> arrayList2 = new ArrayList();
        for (UpdatedContainerInfo updatedContainerInfo : pullContainerUpdates) {
            arrayList.addAll(updatedContainerInfo.getNewlyLaunchedContainers());
            arrayList2.addAll(updatedContainerInfo.getCompletedContainers());
        }
        Iterator it = arrayList.iterator();
        while (it.hasNext()) {
            containerLaunchedOnNode(((ContainerStatus) it.next()).getContainerId(), fSSchedulerNode);
        }
        for (ContainerStatus containerStatus : arrayList2) {
            ContainerId containerId = containerStatus.getContainerId();
            LOG.debug("Container FINISHED: " + containerId);
            completedContainer(getRMContainer(containerId), containerStatus, RMContainerEventType.FINISHED);
        }
        AppSchedulable reservedAppSchedulable = fSSchedulerNode.getReservedAppSchedulable();
        if (reservedAppSchedulable != null) {
            Priority reservedPriority = fSSchedulerNode.getReservedContainer().getReservedPriority();
            if (reservedAppSchedulable == null || reservedAppSchedulable.hasContainerForNode(reservedPriority, fSSchedulerNode)) {
                LOG.info("Trying to fulfill reservation for application " + reservedAppSchedulable.getApp().getApplicationAttemptId() + " on node: " + rMNode);
                fSSchedulerNode.getReservedAppSchedulable().assignReservedContainer(fSSchedulerNode);
            } else {
                LOG.info("Releasing reservation that cannot be satisfied for application " + reservedAppSchedulable.getApp().getApplicationAttemptId() + " on node " + rMNode);
                reservedAppSchedulable.unreserve(reservedPriority, fSSchedulerNode);
                reservedAppSchedulable = null;
            }
        }
        if (reservedAppSchedulable == null) {
            int i = 0;
            while (fSSchedulerNode.getReservedContainer() == null) {
                boolean z = false;
                if (Resources.greaterThan(RESOURCE_CALCULATOR, this.clusterCapacity, this.queueMgr.getRootQueue().assignContainer(fSSchedulerNode), Resources.none())) {
                    i++;
                    z = true;
                }
                if (!z || !this.assignMultiple || (i >= this.maxAssign && this.maxAssign > 0)) {
                    break;
                }
            }
        }
        updateRootQueueMetrics();
    }

    @Override // org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler
    public SchedulerNodeReport getNodeReport(NodeId nodeId) {
        FSSchedulerNode fSSchedulerNode = this.nodes.get(nodeId);
        if (fSSchedulerNode == null) {
            return null;
        }
        return new SchedulerNodeReport(fSSchedulerNode);
    }

    public FSSchedulerApp getSchedulerApp(ApplicationAttemptId applicationAttemptId) {
        return this.applications.get(applicationAttemptId);
    }

    @Override // org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler
    public SchedulerAppReport getSchedulerAppInfo(ApplicationAttemptId applicationAttemptId) {
        if (this.applications.containsKey(applicationAttemptId)) {
            return new SchedulerAppReport(this.applications.get(applicationAttemptId));
        }
        LOG.error("Request for appInfo of unknown attempt" + applicationAttemptId);
        return null;
    }

    private void updateRootQueueMetrics() {
        this.rootMetrics.setAvailableResourcesToQueue(Resources.subtract(this.clusterCapacity, this.rootMetrics.getAllocatedResources()));
    }

    @Override // org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler
    public QueueMetrics getRootQueueMetrics() {
        return this.rootMetrics;
    }

    @Override // org.apache.hadoop.yarn.event.EventHandler
    public void handle(SchedulerEvent schedulerEvent) {
        switch (schedulerEvent.getType()) {
            case NODE_ADDED:
                if (!(schedulerEvent instanceof NodeAddedSchedulerEvent)) {
                    throw new RuntimeException("Unexpected event type: " + schedulerEvent);
                }
                addNode(((NodeAddedSchedulerEvent) schedulerEvent).getAddedRMNode());
                return;
            case NODE_REMOVED:
                if (!(schedulerEvent instanceof NodeRemovedSchedulerEvent)) {
                    throw new RuntimeException("Unexpected event type: " + schedulerEvent);
                }
                removeNode(((NodeRemovedSchedulerEvent) schedulerEvent).getRemovedRMNode());
                return;
            case NODE_UPDATE:
                if (!(schedulerEvent instanceof NodeUpdateSchedulerEvent)) {
                    throw new RuntimeException("Unexpected event type: " + schedulerEvent);
                }
                nodeUpdate(((NodeUpdateSchedulerEvent) schedulerEvent).getRMNode());
                return;
            case APP_ADDED:
                if (!(schedulerEvent instanceof AppAddedSchedulerEvent)) {
                    throw new RuntimeException("Unexpected event type: " + schedulerEvent);
                }
                AppAddedSchedulerEvent appAddedSchedulerEvent = (AppAddedSchedulerEvent) schedulerEvent;
                addApplication(appAddedSchedulerEvent.getApplicationAttemptId(), appAddedSchedulerEvent.getQueue(), appAddedSchedulerEvent.getUser());
                return;
            case APP_REMOVED:
                if (!(schedulerEvent instanceof AppRemovedSchedulerEvent)) {
                    throw new RuntimeException("Unexpected event type: " + schedulerEvent);
                }
                AppRemovedSchedulerEvent appRemovedSchedulerEvent = (AppRemovedSchedulerEvent) schedulerEvent;
                removeApplication(appRemovedSchedulerEvent.getApplicationAttemptID(), appRemovedSchedulerEvent.getFinalAttemptState());
                return;
            case CONTAINER_EXPIRED:
                if (!(schedulerEvent instanceof ContainerExpiredSchedulerEvent)) {
                    throw new RuntimeException("Unexpected event type: " + schedulerEvent);
                }
                ContainerId containerId = ((ContainerExpiredSchedulerEvent) schedulerEvent).getContainerId();
                completedContainer(getRMContainer(containerId), SchedulerUtils.createAbnormalContainerStatus(containerId, SchedulerUtils.EXPIRED_CONTAINER), RMContainerEventType.EXPIRE);
                return;
            default:
                LOG.error("Unknown event arrived at FairScheduler: " + schedulerEvent.toString());
                return;
        }
    }

    @Override // org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable
    public void recover(RMStateStore.RMState rMState) throws Exception {
    }

    @Override // org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler
    public synchronized void reinitialize(Configuration configuration, RMContext rMContext) throws IOException {
        this.conf = new FairSchedulerConfiguration(configuration);
        validateConf(this.conf);
        this.minimumAllocation = this.conf.getMinimumAllocation();
        this.maximumAllocation = this.conf.getMaximumAllocation();
        this.incrAllocation = this.conf.getIncrementAllocation();
        this.userAsDefaultQueue = this.conf.getUserAsDefaultQueue();
        this.nodeLocalityThreshold = this.conf.getLocalityThresholdNode();
        this.rackLocalityThreshold = this.conf.getLocalityThresholdRack();
        this.preemptionEnabled = this.conf.getPreemptionEnabled();
        this.assignMultiple = this.conf.getAssignMultiple();
        this.maxAssign = this.conf.getMaxAssign();
        this.sizeBasedWeight = this.conf.getSizeBasedWeight();
        this.preemptionInterval = this.conf.getPreemptionInterval();
        this.waitTimeBeforeKill = this.conf.getWaitTimeBeforeKill();
        this.usePortForNodeName = this.conf.getUsePortForNodeName();
        if (this.initialized) {
            try {
                this.queueMgr.reloadAllocs();
                return;
            } catch (Exception e) {
                throw new IOException("Failed to initialize FairScheduler", e);
            }
        }
        this.rootMetrics = FSQueueMetrics.forQueue("root", (Queue) null, true, configuration);
        this.rmContext = rMContext;
        this.eventLog = new FairSchedulerEventLog();
        this.eventLog.init(this.conf);
        this.initialized = true;
        try {
            this.queueMgr.initialize();
            Thread thread = new Thread(new UpdateThread());
            thread.setName("FairSchedulerUpdateThread");
            thread.setDaemon(true);
            thread.start();
        } catch (Exception e2) {
            throw new IOException("Failed to start FairScheduler", e2);
        }
    }

    @Override // org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler
    public QueueInfo getQueueInfo(String str, boolean z, boolean z2) throws IOException {
        if (this.queueMgr.exists(str)) {
            return this.queueMgr.getQueue(str).getQueueInfo(z, z2);
        }
        throw new IOException("queue " + str + " does not exist");
    }

    @Override // org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler
    public List<QueueUserACLInfo> getQueueUserAclInfo() {
        try {
            return this.queueMgr.getRootQueue().getQueueUserAclInfo(UserGroupInformation.getCurrentUser());
        } catch (IOException e) {
            return new ArrayList();
        }
    }

    @Override // org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler, org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerContext
    public int getNumClusterNodes() {
        return this.nodes.size();
    }

    @Override // org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler
    public synchronized boolean checkAccess(UserGroupInformation userGroupInformation, QueueACL queueACL, String str) {
        FSQueue queue = getQueueManager().getQueue(str);
        if (queue != null) {
            return queue.hasAccess(queueACL, userGroupInformation);
        }
        if (!LOG.isDebugEnabled()) {
            return false;
        }
        LOG.debug("ACL not found for queue access-type " + queueACL + " for queue " + str);
        return false;
    }
}
