
Flink 1.1 – ResourceManager

The role of the Flink resource manager is shown in the figure:

[Figure: the role of the Flink ResourceManager]

 

FlinkResourceManager

/**
 *
 * <h1>Worker allocation steps</h1>
 *
 * <ol>
 *     <li>The resource manager decides to request more workers. This can happen in order
 *         to fill the initial pool, or as a result of the JobManager requesting more workers.</li>
 *
 *     <li>The resource master calls {@link #requestNewWorkers(int)}, which triggers requests
 *         for more containers. After that, the {@link #getNumWorkerRequestsPending()}
 *         should reflect the pending requests.</li>
 *
 *     <li>The concrete framework may acquire containers and then trigger to start TaskManagers
 *         in those containers. That should be reflected in {@link #getNumWorkersPendingRegistration()}.</li>
 *
 *     <li>At some point, the TaskManager processes will have started and send a registration
 *         message to the JobManager. The JobManager will perform
 *         a lookup with the ResourceManager to check if it really started this TaskManager.
 *         The method {@link #workerStarted(ResourceID)} will be called
 *         to inform about a registered worker.</li>
 * </ol>
 *
 */
public abstract class FlinkResourceManager<WorkerType extends ResourceIDRetrievable> extends FlinkUntypedActor {

    /** The service to find the right leader JobManager (to support high availability) */
    private final LeaderRetrievalService leaderRetriever; // used to discover the leader JobManager and to get notified when the leader changes

    /** Map which contains the workers from which we know that they have been successfully started
     * in a container. This notification is sent by the JM when a TM tries to register at it. */
    private final Map<ResourceID, WorkerType> startedWorkers; // workers that have started successfully; the JM sends a notification when a TM registers at it

    /** The JobManager that the framework master manages resources for */
    private ActorRef jobManager;

    /** Our JobManager's leader session */
    private UUID leaderSessionID;

    /** The size of the worker pool that the resource master strives to maintain */
    private int designatedPoolSize; // size of the resource pool

The comments above describe the worker-allocation process quite clearly.
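The four methods referenced in that javadoc are the hooks a concrete framework integration (YARN, standalone, etc.) has to implement. As a rough, self-contained sketch of that contract only (the class name and the String-based resource id below are made up for illustration; the real class uses ResourceID and a generic worker type):

// Hypothetical sketch -- not Flink's actual FlinkResourceManager API.
abstract class WorkerAllocationHooksSketch {

    // Step 2: trigger requests for more containers at the underlying framework.
    protected abstract void requestNewWorkers(int numWorkers);

    // Step 2: how many container requests are still in flight.
    protected abstract int getNumWorkerRequestsPending();

    // Step 3: containers acquired and TaskManagers launched, but not yet registered at the JobManager.
    protected abstract int getNumWorkersPendingRegistration();

    // Step 4: the JobManager asks whether we really started this TaskManager;
    // returning null means "unknown worker".
    protected abstract String workerStarted(String resourceId);
}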

The ResourceManager is an actor, so its main job is handling messages:

    @Override
    protected void handleMessage(Object message) {
        try {
            // --- messages about worker allocation and pool sizes

            if (message instanceof CheckAndAllocateContainers) {
                checkWorkersPool();
            }
            else if (message instanceof SetWorkerPoolSize) {
                SetWorkerPoolSize msg = (SetWorkerPoolSize) message;
                adjustDesignatedNumberOfWorkers(msg.numberOfWorkers());
            }
            else if (message instanceof RemoveResource) {
                RemoveResource msg = (RemoveResource) message;
                removeRegisteredResource(msg.resourceId());
            }

            // --- lookup of registered resources

            else if (message instanceof NotifyResourceStarted) {
                NotifyResourceStarted msg = (NotifyResourceStarted) message;
                handleResourceStarted(sender(), msg.getResourceID());
            }

            // --- messages about JobManager leader status and registration

            else if (message instanceof NewLeaderAvailable) {
                NewLeaderAvailable msg = (NewLeaderAvailable) message;
                newJobManagerLeaderAvailable(msg.leaderAddress(), msg.leaderSessionId());
            }
            else if (message instanceof TriggerRegistrationAtJobManager) {
                TriggerRegistrationAtJobManager msg = (TriggerRegistrationAtJobManager) message;
                triggerConnectingToJobManager(msg.jobManagerAddress());
            }
            else if (message instanceof RegisterResourceManagerSuccessful) {
                RegisterResourceManagerSuccessful msg = (RegisterResourceManagerSuccessful) message;
                jobManagerLeaderConnected(msg.jobManager(), msg.currentlyRegisteredTaskManagers());
            }

 

The key ones are:

checkWorkersPool

/**
 * This method causes the resource framework master to <b>synchronously</b> re-examine
 * the set of available and pending workers containers, and allocate containers
 * if needed.
 *
 * This method does not automatically release workers, because it is not visible to
 * this resource master which workers can be released. Instead, the JobManager must
 * explicitly release individual workers.
 */
private void checkWorkersPool() {
    int numWorkersPending = getNumWorkerRequestsPending();
    int numWorkersPendingRegistration = getNumWorkersPendingRegistration();

    // see how many workers we want, and whether we have enough
    int allAvailableAndPending = startedWorkers.size() +
        numWorkersPending + numWorkersPendingRegistration;

    int missing = designatedPoolSize - allAvailableAndPending;

    if (missing > 0) {
        requestNewWorkers(missing); // if the existing workers are not enough, request new workers
    }
}
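The pool check itself is just arithmetic: everything that is started, requested, or pending registration counts against the designated pool size, and only the shortfall triggers requestNewWorkers. A standalone sketch with assumed example numbers:

public class PoolCheckSketch {
    public static void main(String[] args) {
        // assumed example values, not taken from a real cluster
        int designatedPoolSize = 10;            // SetWorkerPoolSize target
        int startedWorkers = 6;                 // TaskManagers registered at the JobManager
        int numWorkersPending = 2;              // container requests still in flight
        int numWorkersPendingRegistration = 1;  // containers started, TM not yet registered

        int allAvailableAndPending = startedWorkers + numWorkersPending + numWorkersPendingRegistration;
        int missing = designatedPoolSize - allAvailableAndPending;

        if (missing > 0) {
            System.out.println("requestNewWorkers(" + missing + ")"); // here: requestNewWorkers(1)
        }
        // note: workers are never released here -- the JobManager must release them explicitly
    }
}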

After the JobManager receives a TaskManager's registration message, it notifies the ResourceManager, which ends up in handleResourceStarted:

/**
 * Tells the ResourceManager that a TaskManager had been started in a container with the given
 * resource id.
 *
 * @param jobManager The sender (JobManager) of the message
 * @param resourceID The resource id of the started TaskManager
 */
private void handleResourceStarted(ActorRef jobManager, ResourceID resourceID) {
    if (resourceID != null) {
        // check if resourceID is already registered (TaskManager may send duplicate register messages)
        WorkerType oldWorker = startedWorkers.get(resourceID);
        if (oldWorker != null) { // check whether this worker already exists
            LOG.debug("Notification that TaskManager {} had been started was sent before.", resourceID);
        } else {
            WorkerType newWorker = workerStarted(resourceID); // obtain the worker

            if (newWorker != null) {
                startedWorkers.put(resourceID, newWorker); // register the new worker
                LOG.info("TaskManager {} has started.", resourceID);
            } else {
                LOG.info("TaskManager {} has not been started by this resource manager.", resourceID);
            }
        }
    }

    // Acknowledge the resource registration
    jobManager.tell(decorateMessage(Acknowledge.get()), self()); // tell the JobManager that the registration is done
}

 

The job resource allocation process:

In submitJob, the ExecutionGraph is generated, and eventually this is called:

executionGraph.scheduleForExecution(scheduler)

Next, in ExecutionGraph:

public void scheduleForExecution(SlotProvider slotProvider) throws JobException {
    // simply take the vertices without inputs.
    for (ExecutionJobVertex ejv : this.tasks.values()) {
        if (ejv.getJobVertex().isInputVertex()) {
            ejv.scheduleAll(slotProvider, allowQueuedScheduling);
        }
    }

Then, in ExecutionJobVertex:

public void scheduleAll(SlotProvider slotProvider, boolean queued) throws NoResourceAvailableException {

    ExecutionVertex[] vertices = this.taskVertices;

    // kick off the tasks
    for (ExecutionVertex ev : vertices) {
        ev.scheduleForExecution(slotProvider, queued);
    }
}
Then, in ExecutionVertex:
public boolean scheduleForExecution(SlotProvider slotProvider, boolean queued) throws NoResourceAvailableException {
    return this.currentExecution.scheduleForExecution(slotProvider, queued);
}

Finally, in Execution:

public boolean scheduleForExecution(SlotProvider slotProvider, boolean queued) throws NoResourceAvailableException {
    final SlotSharingGroup sharingGroup = vertex.getJobVertex().getSlotSharingGroup();
    final CoLocationConstraint locationConstraint = vertex.getLocationConstraint();

    if (transitionState(CREATED, SCHEDULED)) {

        ScheduledUnit toSchedule = locationConstraint == null ?
            new ScheduledUnit(this, sharingGroup) :
            new ScheduledUnit(this, sharingGroup, locationConstraint);

        // IMPORTANT: To prevent leaks of cluster resources, we need to make sure that slots are returned
        //     in all cases where the deployment failed. we use many try {} finally {} clauses to assure that
        final Future<SimpleSlot> slotAllocationFuture = slotProvider.allocateSlot(toSchedule, queued); // request the slot asynchronously

        // IMPORTANT: We have to use the synchronous handle operation (direct executor) here so
        // that we directly deploy the tasks if the slot allocation future is completed. This is
        // necessary for immediate deployment.
        final Future<Void> deploymentFuture = slotAllocationFuture.handle(new BiFunction<SimpleSlot, Throwable, Void>() {
            @Override
            public Void apply(SimpleSlot simpleSlot, Throwable throwable) {
                if (simpleSlot != null) {
                    try {
                        deployToSlot(simpleSlot); // if a slot was allocated, deploy to it
                    } catch (Throwable t) {
                        try {
                            simpleSlot.releaseSlot();
                        } finally {
                            markFailed(t);
                        }
                    }
                }
                else {
                    markFailed(throwable);
                }
                return null;
            }
        });

        return true;
    }
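The essential pattern here is allocate-then-deploy: allocation returns a future, and deployment (or failure handling plus releasing the slot) happens in the future's completion handler. A minimal, self-contained sketch of the same pattern using java.util.concurrent.CompletableFuture (the Slot, deployToSlot and markFailed names below are invented; Flink 1.1 uses its own Future/BiFunction types):

import java.util.concurrent.CompletableFuture;

public class AllocateThenDeploySketch {

    static class Slot {
        void release() { System.out.println("slot released"); }
    }

    // stands in for slotProvider.allocateSlot(...): may complete immediately or later
    static CompletableFuture<Slot> allocateSlot() {
        return CompletableFuture.completedFuture(new Slot());
    }

    static void deployToSlot(Slot slot) { System.out.println("deployed"); }

    static void markFailed(Throwable t) { System.out.println("failed: " + t); }

    public static void main(String[] args) {
        allocateSlot().handle((slot, failure) -> {
            if (slot != null) {
                try {
                    deployToSlot(slot);            // deploy as soon as the slot arrives
                } catch (Throwable t) {
                    try {
                        slot.release();            // never leak the slot on a failed deployment
                    } finally {
                        markFailed(t);
                    }
                }
            } else {
                markFailed(failure);               // the allocation itself failed
            }
            return null;
        });
    }
}

Flink attaches this handler with a direct (synchronous) executor, so that an already-completed allocation future leads to immediate deployment.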

 

This calls slotProvider.allocateSlot, where the slotProvider is the Scheduler:

@Override
public Future<SimpleSlot> allocateSlot(ScheduledUnit task, boolean allowQueued)
        throws NoResourceAvailableException {

    final Object ret = scheduleTask(task, allowQueued);
    if (ret instanceof SimpleSlot) {
        return FlinkCompletableFuture.completed((SimpleSlot) ret); // a SimpleSlot means the allocation already succeeded, so return an already-completed future
    }
    else if (ret instanceof Future) {
        return (Future) ret; // a Future means there were not enough resources; the request is still pending asynchronously
    }
    else {
        throw new RuntimeException();
    }
}

 

scheduleTask

/**
 * Returns either a {@link SimpleSlot}, or a {@link Future}.
 */
private Object scheduleTask(ScheduledUnit task, boolean queueIfNoResource) throws NoResourceAvailableException {

    final ExecutionVertex vertex = task.getTaskToExecute().getVertex();

    final Iterable<TaskManagerLocation> preferredLocations = vertex.getPreferredLocations();
    final boolean forceExternalLocation = vertex.isScheduleLocalOnly() &&
                                preferredLocations != null && preferredLocations.iterator().hasNext();

    synchronized (globalLock) { // global lock

        SlotSharingGroup sharingUnit = task.getSlotSharingGroup();

        if (sharingUnit != null) { // case 1: the task uses a shared slot

            // 1)  === If the task has a slot sharing group, schedule with shared slots ===

            final SlotSharingGroupAssignment assignment = sharingUnit.getTaskAssignment();
            final CoLocationConstraint constraint = task.getLocationConstraint();

            // get a slot from the group, if the group has one for us (and can fulfill the constraint)
            final SimpleSlot slotFromGroup;
            if (constraint == null) {
                slotFromGroup = assignment.getSlotForTask(vertex); // try to find a suitable slot among the existing ones
            }
            else {
                slotFromGroup = assignment.getSlotForTask(vertex, constraint);
            }

            SimpleSlot newSlot = null;
            SimpleSlot toUse = null;

            // the following needs to make sure any allocated slot is released in case of an error
            try {

                // check whether the slot from the group is already what we want.
                // any slot that is local, or where the assignment was unconstrained is good!
                if (slotFromGroup != null && slotFromGroup.getLocality() != Locality.NON_LOCAL) { // a suitable slot was found
                    updateLocalityCounters(slotFromGroup, vertex);
                    return slotFromGroup; // a suitable slot already exists, return it
                }

                // the group did not have a local slot for us. see if we can get one (or a better one)
                newSlot = getNewSlotForSharingGroup(vertex, locations, assignment, constraint, localOnly); // try to allocate a new slot

                if (slotFromGroup == null || !slotFromGroup.isAlive() || newSlot.getLocality() == Locality.LOCAL) {
                    // if there is no slot from the group, or the new slot is local,
                    // then we use the new slot
                    if (slotFromGroup != null) {
                        slotFromGroup.releaseSlot();
                    }
                    toUse = newSlot;
                }
                else {
                    // both are available and usable. neither is local. in that case, we may
                    // as well use the slot from the sharing group, to minimize the number of
                    // instances that the job occupies
                    newSlot.releaseSlot();
                    toUse = slotFromGroup;
                }

                // if this is the first slot for the co-location constraint, we lock
                // the location, because we are going to use that slot
                if (constraint != null && !constraint.isAssigned()) {
                    constraint.lockLocation();
                }

                updateLocalityCounters(toUse, vertex);
            }

            return toUse; // return the allocated slot
        }
        else { // case 2: not a shared slot -- the simpler path

            // 2) === schedule without hints and sharing ===

            SimpleSlot slot = getFreeSlotForTask(vertex, preferredLocations, forceExternalLocation); // allocate a slot directly
            if (slot != null) {
                updateLocalityCounters(slot, vertex);
                return slot; // if a slot was obtained, return it
            }
            else {
                // no resource available now, so queue the request
                if (queueIfNoResource) { // if queuing is allowed
                    CompletableFuture<SimpleSlot> future = new FlinkCompletableFuture<>();
                    this.taskQueue.add(new QueuedTask(task, future)); // cache the task and return the future, i.e. an asynchronous request
                    return future;
                }
            }
        }
    }
}
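Stripped of locking, co-location constraints, and error handling, the shared-slot branch boils down to: prefer a local (or unconstrained) slot from the sharing group, otherwise allocate a new slot and keep whichever one is local, releasing the loser. A simplified, self-contained sketch of just that decision (all types below are invented stand-ins, not Flink classes):

// Sketch only: Slot, SlotGroup and NewSlotSource stand in for SimpleSlot / SlotSharingGroupAssignment.
public class SharedSlotDecisionSketch {

    enum Locality { LOCAL, NON_LOCAL, UNCONSTRAINED }

    static class Slot {
        final Locality locality;
        final boolean alive;
        Slot(Locality locality, boolean alive) { this.locality = locality; this.alive = alive; }
        void release() { /* hand the slot back */ }
    }

    interface SlotGroup     { Slot getSlotForTask(); }    // may return null
    interface NewSlotSource { Slot allocateNewSlot(); }   // assumed non-null here

    static Slot chooseSlot(SlotGroup group, NewSlotSource provider) {
        Slot slotFromGroup = group.getSlotForTask();

        // 1. a group slot that is local (or unconstrained) is good enough -- use it directly
        if (slotFromGroup != null && slotFromGroup.locality != Locality.NON_LOCAL) {
            return slotFromGroup;
        }

        // 2. the group had no local slot: try to get a new (possibly better) one
        Slot newSlot = provider.allocateNewSlot();
        if (slotFromGroup == null || !slotFromGroup.alive || newSlot.locality == Locality.LOCAL) {
            if (slotFromGroup != null) {
                slotFromGroup.release();
            }
            return newSlot;
        }

        // 3. neither is local: keep the group slot to minimize the number of occupied instances
        newSlot.release();
        return slotFromGroup;
    }
}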

Let's look at the non-shared-slot case directly,

which calls getFreeSlotForTask:

/**
 * Gets a suitable instance to schedule the vertex execution to.
 * <p>
 * NOTE: This method is not thread-safe, it needs to be synchronized by the caller.
 *
 * @param vertex The task to run.
 * @return The instance to run the vertex on, or {@code null}, if no instance is available.
 */
protected SimpleSlot getFreeSlotForTask(ExecutionVertex vertex,
                                        Iterable<TaskManagerLocation> requestedLocations,
                                        boolean localOnly) {
    // we need potentially to loop multiple times, because there may be false positives
    // in the set-with-available-instances
    while (true) {
        Pair<Instance, Locality> instanceLocalityPair = findInstance(requestedLocations, localOnly); // find an instance that has free slots and satisfies the location constraint

        if (instanceLocalityPair == null){
            return null; // no suitable instance, the allocation failed
        }

        Instance instanceToUse = instanceLocalityPair.getLeft();
        Locality locality = instanceLocalityPair.getRight();

        try {
            SimpleSlot slot = instanceToUse.allocateSimpleSlot(vertex.getJobId()); // allocate a slot from the instance

            // if the instance has further available slots, re-add it to the set of available resources.
            if (instanceToUse.hasResourcesAvailable()) { // if this instance still has resources, put it back into instancesWithAvailableResources so it can serve further allocations
                this.instancesWithAvailableResources.put(instanceToUse.getTaskManagerID(), instanceToUse);
            }

            if (slot != null) {
                slot.setLocality(locality);
                return slot; // success, return the slot
            }
        }
        catch (InstanceDiedException e) {
            // the instance died it has not yet been propagated to this scheduler
            // remove the instance from the set of available instances
            removeInstance(instanceToUse);
        }

        // if we failed to get a slot, fall through the loop
    }
}

 

findInstance

/**
 * Tries to find a requested instance. If no such instance is available it will return a
 * non-local instance. If no such instance exists (all slots occupied), then return null.
 *
 * <p><b>NOTE:</b> This method is not thread-safe, it needs to be synchronized by the caller.</p>
 *
 * @param requestedLocations The list of preferred instances. May be null or empty, which indicates that
 *                           no locality preference exists.
 * @param localOnly Flag to indicate whether only one of the exact local instances can be chosen.
 */
private Pair<Instance, Locality> findInstance(Iterable<TaskManagerLocation> requestedLocations, boolean localOnly) {

    // drain the queue of newly available instances
    while (this.newlyAvailableInstances.size() > 0) { // move newlyAvailableInstances into instancesWithAvailableResources
        Instance queuedInstance = this.newlyAvailableInstances.poll();
        if (queuedInstance != null) {
            this.instancesWithAvailableResources.put(queuedInstance.getTaskManagerID(), queuedInstance);
        }
    }

    // if nothing is available at all, return null
    if (this.instancesWithAvailableResources.isEmpty()) { // if there are no instances with available resources, fail immediately
        return null;
    }

    Iterator<TaskManagerLocation> locations = requestedLocations == null ? null : requestedLocations.iterator();

    if (locations != null && locations.hasNext()) { // walk through the instances in locality-preference order
        // we have a locality preference

        while (locations.hasNext()) {
            TaskManagerLocation location = locations.next();
            if (location != null) {
                Instance instance = instancesWithAvailableResources.remove(location.getResourceID());
                if (instance != null) {
                    return new ImmutablePair<Instance, Locality>(instance, Locality.LOCAL);
                }
            }
        }

        // no local instance available
        if (localOnly) {
            return null;
        }
        else {
            // take the first instance from the instances with resources
            Iterator<Instance> instances = instancesWithAvailableResources.values().iterator();
            Instance instanceToUse = instances.next();
            instances.remove();

            return new ImmutablePair<>(instanceToUse, Locality.NON_LOCAL);
        }
    }
    else {
        // no location preference, so use some instance
        Iterator<Instance> instances = instancesWithAvailableResources.values().iterator();
        Instance instanceToUse = instances.next();
        instances.remove();

        return new ImmutablePair<>(instanceToUse, Locality.UNCONSTRAINED);
    }
}
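So the priority order is: an instance at one of the preferred locations (LOCAL), otherwise any instance with free resources (NON_LOCAL), and with no preference at all simply the first available instance (UNCONSTRAINED). A self-contained sketch of that priority order, using plain strings instead of Instance/TaskManagerLocation:

import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class FindInstanceSketch {

    enum Locality { LOCAL, NON_LOCAL, UNCONSTRAINED }

    static class Choice {
        final String instance;
        final Locality locality;
        Choice(String instance, Locality locality) { this.instance = instance; this.locality = locality; }
    }

    // availableByResourceId plays the role of instancesWithAvailableResources;
    // preferredLocations may be null or empty (no locality preference).
    static Choice findInstance(Map<String, String> availableByResourceId,
                               List<String> preferredLocations,
                               boolean localOnly) {
        if (availableByResourceId.isEmpty()) {
            return null;                                        // nothing free at all
        }
        if (preferredLocations != null && !preferredLocations.isEmpty()) {
            for (String location : preferredLocations) {
                String instance = availableByResourceId.remove(location);
                if (instance != null) {
                    return new Choice(instance, Locality.LOCAL); // a preferred host has free resources
                }
            }
            if (localOnly) {
                return null;                                     // local required but not available
            }
            Iterator<String> it = availableByResourceId.values().iterator();
            String instance = it.next();
            it.remove();
            return new Choice(instance, Locality.NON_LOCAL);     // fall back to any free instance
        }
        // no preference: take the first free instance
        Iterator<String> it = availableByResourceId.values().iterator();
        String instance = it.next();
        it.remove();
        return new Choice(instance, Locality.UNCONSTRAINED);
    }

    public static void main(String[] args) {
        Map<String, String> free = new LinkedHashMap<>();
        free.put("res-1", "tm-1");
        free.put("res-2", "tm-2");
        Choice c = findInstance(free, Arrays.asList("res-2"), false);
        System.out.println(c.instance + " / " + c.locality);     // prints: tm-2 / LOCAL
    }
}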

 

Next question: where does newlyAvailableInstances come from?

@Override
public void newInstanceAvailable(Instance instance) {

    // synchronize globally for instance changes
    synchronized (this.globalLock) {

        try {
            // make sure we get notifications about slots becoming available
            instance.setSlotAvailabilityListener(this); // register the Scheduler as the Instance's SlotAvailabilityListener

            // store the instance in the by-host-lookup
            String instanceHostName = instance.getTaskManagerLocation().getHostname();
            Set<Instance> instanceSet = allInstancesByHost.get(instanceHostName);
            if (instanceSet == null) {
                instanceSet = new HashSet<Instance>();
                allInstancesByHost.put(instanceHostName, instanceSet);
            }
            instanceSet.add(instance);

            // add it to the available resources and let potential waiters know
            this.instancesWithAvailableResources.put(instance.getTaskManagerID(), instance); // add it to instancesWithAvailableResources

            // add all slots as available
            for (int i = 0; i < instance.getNumberOfAvailableSlots(); i++) {
                newSlotAvailable(instance);
            }
        }
    }
}

 

    @Override
    public void newSlotAvailable(final Instance instance) {

        // WARNING: The asynchrony here is necessary, because we cannot guarantee the order
        // of lock acquisition (global scheduler, instance) and otherwise lead to potential deadlocks:
        //
        // -> The scheduler needs to grab them (1) global scheduler lock
        //                                     (2) slot/instance lock
        // -> The slot releasing grabs (1) slot/instance (for releasing) and
        //                             (2) scheduler (to check whether to take a new task item
        //
        // that leads with a high probability to deadlocks, when scheduling fast

        this.newlyAvailableInstances.add(instance); // add it to newlyAvailableInstances

        Futures.future(new Callable<Object>() {
            @Override
            public Object call() throws Exception {
                handleNewSlot(); // asynchronously work off the queued tasks: when a new slot shows up, run the tasks waiting in the queue
                return null;
            }
        }, executionContext);
    }

 

newInstanceAvailable in turn is called inside the InstanceManager:

    private void notifyNewInstance(Instance instance) {
        synchronized (this.instanceListeners) {
            for (InstanceListener listener : this.instanceListeners) {
                try {
                    listener.newInstanceAvailable(instance);
                }
                catch (Throwable t) {
                    LOG.error("Notification of new instance availability failed.", t);
                }
            }
        }
    }

notifyNewInstance is called from registerTaskManager,

and registerTaskManager is invoked in the JobManager when a TaskManager registers:

case msg @ RegisterTaskManager(
      resourceId,
      connectionInfo,
      hardwareInformation,
      numberOfSlots) =>

  val taskManager = sender()

  currentResourceManager match {
    case Some(rm) => // if there is a ResourceManager
      val future = (rm ? decorateMessage(new NotifyResourceStarted(msg.resourceId)))(timeout) // notify the ResourceManager that this resource has started successfully
  }

  // ResourceManager is told about the resource, now let's try to register TaskManager
  if (instanceManager.isRegistered(resourceId)) { // already registered
    val instanceID = instanceManager.getRegisteredInstance(resourceId).getId

    taskManager ! decorateMessage(
      AlreadyRegistered(
        instanceID,
        libraryCacheManager.getBlobServerPort))
  } else { // a new resource
    try {
      val actorGateway = new AkkaActorGateway(taskManager, leaderSessionID.orNull)
      val taskManagerGateway = new ActorTaskManagerGateway(actorGateway)

      val instanceID = instanceManager.registerTaskManager( // register this TaskManager with the InstanceManager
        taskManagerGateway,
        connectionInfo,
        hardwareInformation,
        numberOfSlots)

      taskManagerMap.put(taskManager, instanceID) // record this TaskManager in the JobManager

      taskManager ! decorateMessage(
        AcknowledgeRegistration(instanceID, libraryCacheManager.getBlobServerPort)) // tell the TaskManager that registration is complete

      // to be notified when the taskManager is no longer reachable
      context.watch(taskManager)
    }
  }

 

This version does not implement the architecture shown in the figure at the top.

The TaskManager still registers with the JobManager, and the JobManager then notifies the ResourceManager.

For now the ResourceManager only plays a bookkeeping role.

The ResourceManager has not been split out of the JobManager.

So the architecture is still this one:

[Figure: current architecture, with the ResourceManager not yet separated from the JobManager]
