/*
 * Decompiled with CFR 0.152.
 */
package io.ray.runtime.task;

import io.ray.api.ActorHandle;
import io.ray.api.BaseActorHandle;
import io.ray.api.id.ActorId;
import io.ray.api.id.ObjectId;
import io.ray.api.id.PlacementGroupId;
import io.ray.api.id.TaskId;
import io.ray.api.id.UniqueId;
import io.ray.api.options.ActorCreationOptions;
import io.ray.api.options.CallOptions;
import io.ray.api.options.PlacementGroupCreationOptions;
import io.ray.api.placementgroup.PlacementGroup;
import io.ray.runtime.AbstractRayRuntime;
import io.ray.runtime.ConcurrencyGroupImpl;
import io.ray.runtime.actor.LocalModeActorHandle;
import io.ray.runtime.context.LocalModeWorkerContext;
import io.ray.runtime.functionmanager.FunctionDescriptor;
import io.ray.runtime.functionmanager.JavaFunctionDescriptor;
import io.ray.runtime.generated.Common;
import io.ray.runtime.object.LocalModeObjectStore;
import io.ray.runtime.object.NativeRayObject;
import io.ray.runtime.placementgroup.PlacementGroupImpl;
import io.ray.runtime.task.FunctionArg;
import io.ray.runtime.task.LocalModeTaskExecutor;
import io.ray.runtime.task.TaskExecutor;
import io.ray.runtime.task.TaskSubmitter;
import io.ray.runtime.util.IdUtil;
import io.ray.shaded.com.google.common.base.Preconditions;
import io.ray.shaded.com.google.common.collect.ImmutableList;
import io.ray.shaded.com.google.protobuf.ByteString;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LocalModeTaskSubmitter
implements TaskSubmitter {
    private static final Logger LOGGER = LoggerFactory.getLogger(LocalModeTaskSubmitter.class);
    private final Map<ObjectId, Set<Common.TaskSpec>> waitingTasks = new HashMap<ObjectId, Set<Common.TaskSpec>>();
    private final Object taskAndObjectLock = new Object();
    private final AbstractRayRuntime runtime;
    private final TaskExecutor taskExecutor;
    private final LocalModeObjectStore objectStore;
    private final Map<ActorId, Integer> actorMaxConcurrency = new ConcurrentHashMap<ActorId, Integer>();
    private final ExecutorService normalTaskExecutorService;
    private final Map<ActorId, LocalModeActorHandle> actorHandles = new ConcurrentHashMap<ActorId, LocalModeActorHandle>();
    private final Map<String, ActorHandle> namedActors = new ConcurrentHashMap<String, ActorHandle>();
    private final Map<ActorId, TaskExecutor.ActorContext> actorContexts = new ConcurrentHashMap<ActorId, TaskExecutor.ActorContext>();
    private final Map<PlacementGroupId, PlacementGroup> placementGroups = new ConcurrentHashMap<PlacementGroupId, PlacementGroup>();
    private static final String DEFAULT_CONCURRENCY_GROUP_NAME = "DEFAULT_CONCURRENCY_GROUP_NAME";
    private final ActorConcurrencyGroupManager actorConcurrencyGroupManager;

    public LocalModeTaskSubmitter(AbstractRayRuntime runtime, TaskExecutor taskExecutor, LocalModeObjectStore objectStore) {
        this.runtime = runtime;
        this.taskExecutor = taskExecutor;
        this.objectStore = objectStore;
        this.normalTaskExecutorService = Executors.newCachedThreadPool();
        this.actorConcurrencyGroupManager = new ActorConcurrencyGroupManager();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void onObjectPut(ObjectId id) {
        Object object = this.taskAndObjectLock;
        synchronized (object) {
            Set<Common.TaskSpec> tasks = this.waitingTasks.remove(id);
            if (tasks != null) {
                for (Common.TaskSpec task : tasks) {
                    Set<ObjectId> unreadyObjects = this.getUnreadyObjects(task);
                    if (!unreadyObjects.isEmpty()) continue;
                    this.submitTaskSpec(task);
                }
            }
        }
    }

    private Set<ObjectId> getUnreadyObjects(Common.TaskSpec taskSpec) {
        ActorId actorId;
        ObjectId dummyActorCreationObjectId;
        HashSet<ObjectId> unreadyObjects = new HashSet<ObjectId>();
        for (Common.TaskArg arg : taskSpec.getArgsList()) {
            ObjectId id;
            ByteString idByteString = arg.getObjectRef().getObjectId();
            if (idByteString == ByteString.EMPTY || this.objectStore.isObjectReady(id = new ObjectId(idByteString.toByteArray()))) continue;
            unreadyObjects.add(id);
        }
        if (taskSpec.getType() == Common.TaskType.ACTOR_TASK && !this.objectStore.isObjectReady(dummyActorCreationObjectId = IdUtil.getActorCreationDummyObjectId(actorId = ActorId.fromBytes(taskSpec.getActorTaskSpec().getActorId().toByteArray())))) {
            unreadyObjects.add(dummyActorCreationObjectId);
        }
        return unreadyObjects;
    }

    private Common.TaskSpec.Builder getTaskSpecBuilder(Common.TaskType taskType, FunctionDescriptor functionDescriptor, List<FunctionArg> args) {
        byte[] taskIdBytes = new byte[24];
        new Random().nextBytes(taskIdBytes);
        List<String> functionDescriptorList = functionDescriptor.toList();
        Preconditions.checkState(functionDescriptorList.size() >= 3);
        return Common.TaskSpec.newBuilder().setType(taskType).setLanguage(Common.Language.JAVA).setJobId(ByteString.copyFrom(this.runtime.getRayConfig().getJobId().getBytes())).setTaskId(ByteString.copyFrom(taskIdBytes)).setFunctionDescriptor(Common.FunctionDescriptor.newBuilder().setJavaFunctionDescriptor(Common.JavaFunctionDescriptor.newBuilder().setClassName(functionDescriptorList.get(0)).setFunctionName(functionDescriptorList.get(1)).setSignature(functionDescriptorList.get(2)))).addAllArgs(args.stream().map(arg -> arg.id != null ? Common.TaskArg.newBuilder().setObjectRef(Common.ObjectReference.newBuilder().setObjectId(ByteString.copyFrom(arg.id.getBytes()))).build() : Common.TaskArg.newBuilder().setData(ByteString.copyFrom(arg.value.data)).setMetadata(arg.value.metadata != null ? ByteString.copyFrom(arg.value.metadata) : ByteString.EMPTY).build()).collect(Collectors.toList()));
    }

    @Override
    public List<ObjectId> submitTask(FunctionDescriptor functionDescriptor, List<FunctionArg> args, int numReturns, CallOptions options) {
        Preconditions.checkState(numReturns <= 1);
        Common.TaskSpec taskSpec = this.getTaskSpecBuilder(Common.TaskType.NORMAL_TASK, functionDescriptor, args).setNumReturns(numReturns).build();
        this.submitTaskSpec(taskSpec);
        return LocalModeTaskSubmitter.getReturnIds(taskSpec);
    }

    @Override
    public BaseActorHandle createActor(FunctionDescriptor functionDescriptor, List<FunctionArg> args, ActorCreationOptions options) throws IllegalArgumentException {
        if (options != null && options.getGroup() != null) {
            PlacementGroupImpl group = (PlacementGroupImpl)options.getGroup();
            Preconditions.checkArgument(options.getBundleIndex() == -1 || options.getBundleIndex() >= 0 && options.getBundleIndex() < group.getBundles().size(), String.format("Bundle index %s is invalid, the correct bundle index should be either in the range of 0 to the number of bundles or -1 which means put the task to any available bundles.", options.getBundleIndex()));
        }
        ActorId actorId = ActorId.fromRandom();
        Common.ActorCreationTaskSpec.Builder actorCreationTaskSpecBuilder = Common.ActorCreationTaskSpec.newBuilder().setActorId(ByteString.copyFrom(actorId.toByteBuffer())).setMaxConcurrency(options.getMaxConcurrency()).setMaxPendingCalls(options.getMaxPendingCalls());
        LocalModeTaskSubmitter.appendConcurrencyGroupsBuilder(actorCreationTaskSpecBuilder, options);
        Common.TaskSpec taskSpec = this.getTaskSpecBuilder(Common.TaskType.ACTOR_CREATION_TASK, functionDescriptor, args).setNumReturns(1L).setActorCreationTaskSpec(actorCreationTaskSpecBuilder.build()).build();
        this.submitTaskSpec(taskSpec);
        LocalModeActorHandle actorHandle = new LocalModeActorHandle(actorId, LocalModeTaskSubmitter.getReturnIds(taskSpec).get(0));
        this.actorHandles.put(actorId, actorHandle.copy());
        if (StringUtils.isNotBlank(options.getName())) {
            Preconditions.checkArgument(!this.namedActors.containsKey(options.getName()), String.format("Actor of name %s exists", options.getName()));
            this.namedActors.put(options.getName(), actorHandle);
        }
        return actorHandle;
    }

    @Override
    public List<ObjectId> submitActorTask(BaseActorHandle actor, FunctionDescriptor functionDescriptor, List<FunctionArg> args, int numReturns, CallOptions options) {
        Preconditions.checkState(numReturns <= 1);
        Common.TaskSpec.Builder builder = this.getTaskSpecBuilder(Common.TaskType.ACTOR_TASK, functionDescriptor, args);
        List<ObjectId> returnIds = LocalModeTaskSubmitter.getReturnIds(TaskId.fromBytes(builder.getTaskId().toByteArray()), numReturns);
        Common.TaskSpec taskSpec = builder.setNumReturns(numReturns).setActorTaskSpec(Common.ActorTaskSpec.newBuilder().setActorId(ByteString.copyFrom(actor.getId().getBytes())).build()).setConcurrencyGroupName(options.getConcurrencyGroupName()).build();
        this.submitTaskSpec(taskSpec);
        if (numReturns == 0) {
            return ImmutableList.of();
        }
        return ImmutableList.of(returnIds.get(0));
    }

    @Override
    public PlacementGroup createPlacementGroup(PlacementGroupCreationOptions creationOptions) {
        PlacementGroupImpl placementGroup = new PlacementGroupImpl.Builder().setId(PlacementGroupId.fromRandom()).setName(creationOptions.getName()).setBundles(creationOptions.getBundles()).setStrategy(creationOptions.getStrategy()).build();
        this.placementGroups.put(placementGroup.getId(), placementGroup);
        return placementGroup;
    }

    @Override
    public void removePlacementGroup(PlacementGroupId id) {
        this.placementGroups.remove(id);
    }

    @Override
    public boolean waitPlacementGroupReady(PlacementGroupId id, int timeoutSeconds) {
        return true;
    }

    @Override
    public BaseActorHandle getActor(ActorId actorId) {
        return this.actorHandles.get(actorId).copy();
    }

    public Optional<BaseActorHandle> getActor(String name) {
        ActorHandle actorHandle = this.namedActors.get(name);
        if (null == actorHandle) {
            return Optional.empty();
        }
        return Optional.of(actorHandle);
    }

    public void shutdown() {
        this.actorConcurrencyGroupManager.shutdown();
        this.normalTaskExecutorService.shutdown();
    }

    public static ActorId getActorId(Common.TaskSpec taskSpec) {
        ByteString actorId = null;
        if (taskSpec.getType() == Common.TaskType.ACTOR_CREATION_TASK) {
            actorId = taskSpec.getActorCreationTaskSpec().getActorId();
        } else if (taskSpec.getType() == Common.TaskType.ACTOR_TASK) {
            actorId = taskSpec.getActorTaskSpec().getActorId();
        }
        if (actorId == null) {
            return null;
        }
        return ActorId.fromBytes(actorId.toByteArray());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void submitTaskSpec(Common.TaskSpec taskSpec) {
        LOGGER.debug("Submitting task: {}.", (Object)taskSpec);
        Object object = this.taskAndObjectLock;
        synchronized (object) {
            block19: {
                Set<ObjectId> unreadyObjects = this.getUnreadyObjects(taskSpec);
                Runnable runnable = () -> {
                    try {
                        this.executeTask(taskSpec);
                        if (taskSpec.getType() == Common.TaskType.ACTOR_CREATION_TASK) {
                            ObjectId dummy = IdUtil.getActorCreationDummyObjectId(ActorId.fromBytes(taskSpec.getActorCreationTaskSpec().getActorId().toByteArray()));
                            this.objectStore.put(new Object(), dummy);
                        }
                    }
                    catch (Exception ex) {
                        LOGGER.error("Unexpected exception when executing a task.", ex);
                        System.exit(-1);
                    }
                };
                if (taskSpec.getType() == Common.TaskType.ACTOR_CREATION_TASK) {
                    this.actorMaxConcurrency.put(LocalModeTaskSubmitter.getActorId(taskSpec), taskSpec.getActorCreationTaskSpec().getMaxConcurrency());
                }
                if (unreadyObjects.isEmpty()) {
                    ExecutorService executorService;
                    if (taskSpec.getType() == Common.TaskType.ACTOR_CREATION_TASK) {
                        ActorConcurrencyGroupManager actorConcurrencyGroupManager = this.actorConcurrencyGroupManager;
                        synchronized (actorConcurrencyGroupManager) {
                            this.actorConcurrencyGroupManager.registerActor(LocalModeTaskSubmitter.getActorId(taskSpec), taskSpec);
                        }
                        executorService = this.normalTaskExecutorService;
                    } else if (taskSpec.getType() == Common.TaskType.ACTOR_TASK) {
                        ActorConcurrencyGroupManager actorConcurrencyGroupManager = this.actorConcurrencyGroupManager;
                        synchronized (actorConcurrencyGroupManager) {
                            executorService = this.actorConcurrencyGroupManager.getExecutorServiceForConcurrencyGroup(taskSpec);
                        }
                    } else {
                        executorService = this.normalTaskExecutorService;
                    }
                    try {
                        executorService.submit(runnable);
                    }
                    catch (RejectedExecutionException e) {
                        if (!executorService.isShutdown()) break block19;
                        LOGGER.warn("Ignore task submission due to the ExecutorService is shutdown. Task: {}", (Object)taskSpec);
                    }
                } else {
                    for (ObjectId id : unreadyObjects) {
                        this.waitingTasks.computeIfAbsent(id, k -> new HashSet()).add(taskSpec);
                    }
                }
            }
        }
    }

    private void executeTask(Common.TaskSpec taskSpec) {
        UniqueId workerId;
        TaskExecutor.ActorContext actorContext = null;
        if (taskSpec.getType() == Common.TaskType.ACTOR_TASK) {
            actorContext = this.actorContexts.get(LocalModeTaskSubmitter.getActorId(taskSpec));
            Preconditions.checkNotNull(actorContext);
            workerId = ((LocalModeTaskExecutor.LocalActorContext)actorContext).getWorkerId();
        } else {
            workerId = UniqueId.randomId();
        }
        this.taskExecutor.setActorContext(workerId, actorContext);
        List<Object> args = LocalModeTaskSubmitter.getFunctionArgs(taskSpec).stream().map(arg -> arg.id != null ? this.objectStore.getRaw(Collections.singletonList(arg.id), -1L).get(0) : arg.value).collect(Collectors.toList());
        ((LocalModeWorkerContext)this.runtime.getWorkerContext()).setCurrentTask(taskSpec);
        ((LocalModeWorkerContext)this.runtime.getWorkerContext()).setCurrentWorkerId(workerId);
        List<String> rayFunctionInfo = LocalModeTaskSubmitter.getJavaFunctionDescriptor(taskSpec).toList();
        this.taskExecutor.checkByteBufferArguments(rayFunctionInfo);
        List<NativeRayObject> returnObjects = this.taskExecutor.execute(rayFunctionInfo, args);
        if (taskSpec.getType() == Common.TaskType.ACTOR_CREATION_TASK) {
            Object ac = this.taskExecutor.getActorContext();
            Preconditions.checkNotNull(ac);
            this.actorContexts.put(LocalModeTaskSubmitter.getActorId(taskSpec), (TaskExecutor.ActorContext)ac);
        }
        ((LocalModeWorkerContext)this.runtime.getWorkerContext()).setCurrentTask(null);
        List<ObjectId> returnIds = LocalModeTaskSubmitter.getReturnIds(taskSpec);
        for (int i = 0; i < returnIds.size(); ++i) {
            NativeRayObject putObject = i >= returnObjects.size() ? new NativeRayObject(new byte[]{1}, null) : returnObjects.get(i);
            this.objectStore.putRaw(putObject, returnIds.get(i));
        }
    }

    private static JavaFunctionDescriptor getJavaFunctionDescriptor(Common.TaskSpec taskSpec) {
        Common.FunctionDescriptor functionDescriptor = taskSpec.getFunctionDescriptor();
        if (functionDescriptor.getFunctionDescriptorCase() == Common.FunctionDescriptor.FunctionDescriptorCase.JAVA_FUNCTION_DESCRIPTOR) {
            return new JavaFunctionDescriptor(functionDescriptor.getJavaFunctionDescriptor().getClassName(), functionDescriptor.getJavaFunctionDescriptor().getFunctionName(), functionDescriptor.getJavaFunctionDescriptor().getSignature());
        }
        throw new RuntimeException("Can't build non java function descriptor");
    }

    private static List<FunctionArg> getFunctionArgs(Common.TaskSpec taskSpec) {
        ArrayList<FunctionArg> functionArgs = new ArrayList<FunctionArg>();
        for (int i = 0; i < taskSpec.getArgsCount(); ++i) {
            Common.TaskArg arg = taskSpec.getArgs(i);
            if (arg.getObjectRef().getObjectId() != ByteString.EMPTY) {
                functionArgs.add(FunctionArg.passByReference(new ObjectId(arg.getObjectRef().getObjectId().toByteArray()), Common.Address.getDefaultInstance()));
                continue;
            }
            functionArgs.add(FunctionArg.passByValue(new NativeRayObject(arg.getData().toByteArray(), arg.getMetadata().toByteArray())));
        }
        return functionArgs;
    }

    private static List<ObjectId> getReturnIds(Common.TaskSpec taskSpec) {
        return LocalModeTaskSubmitter.getReturnIds(TaskId.fromBytes(taskSpec.getTaskId().toByteArray()), taskSpec.getNumReturns());
    }

    private static List<ObjectId> getReturnIds(TaskId taskId, long numReturns) {
        ArrayList<ObjectId> returnIds = new ArrayList<ObjectId>();
        int i = 0;
        while ((long)i < numReturns) {
            returnIds.add(ObjectId.fromByteBuffer((ByteBuffer)ByteBuffer.allocate(28).put(taskId.getBytes()).putInt(24, i + 1).position(0)));
            ++i;
        }
        return returnIds;
    }

    private boolean isConcurrentActor(Common.TaskSpec taskSpec) {
        ActorId actorId = LocalModeTaskSubmitter.getActorId(taskSpec);
        Preconditions.checkNotNull(actorId);
        return this.actorMaxConcurrency.containsKey(actorId) && this.actorMaxConcurrency.get(actorId) > 1;
    }

    private static void appendConcurrencyGroupsBuilder(Common.ActorCreationTaskSpec.Builder actorCreationTaskSpecBuilder, ActorCreationOptions options) {
        Preconditions.checkNotNull(actorCreationTaskSpecBuilder);
        if (options == null || options.getConcurrencyGroups() == null || options.getConcurrencyGroups().isEmpty()) {
            return;
        }
        options.getConcurrencyGroups().forEach(concurrencyGroup -> {
            Common.ConcurrencyGroup.Builder concurrencyGroupBuilder = Common.ConcurrencyGroup.newBuilder();
            ConcurrencyGroupImpl impl = (ConcurrencyGroupImpl)concurrencyGroup;
            concurrencyGroupBuilder.setMaxConcurrency(impl.getMaxConcurrency()).setName(impl.getName());
            LocalModeTaskSubmitter.appendFunctionDescriptors(concurrencyGroupBuilder, impl.getFunctionDescriptors());
            actorCreationTaskSpecBuilder.addConcurrencyGroups(concurrencyGroupBuilder);
        });
    }

    private static void appendFunctionDescriptors(Common.ConcurrencyGroup.Builder builder, List<FunctionDescriptor> functionDescriptors) {
        Preconditions.checkNotNull(functionDescriptors);
        Preconditions.checkState(!functionDescriptors.isEmpty());
        functionDescriptors.stream().map(functionDescriptor -> (JavaFunctionDescriptor)functionDescriptor).map(javaFunctionDescriptor -> Common.FunctionDescriptor.newBuilder().setJavaFunctionDescriptor(Common.JavaFunctionDescriptor.newBuilder().setClassName(javaFunctionDescriptor.className).setFunctionName(javaFunctionDescriptor.name).setSignature(javaFunctionDescriptor.signature))).forEach(builder::addFunctionDescriptors);
    }

    private static JavaFunctionDescriptor protoFunctionDescriptorToJava(Common.FunctionDescriptor protoFunctionDescriptor) {
        Preconditions.checkNotNull(protoFunctionDescriptor);
        Common.JavaFunctionDescriptor protoJavaFunctionDescriptor = protoFunctionDescriptor.getJavaFunctionDescriptor();
        return new JavaFunctionDescriptor(protoJavaFunctionDescriptor.getClassName(), protoJavaFunctionDescriptor.getFunctionName(), protoJavaFunctionDescriptor.getSignature());
    }

    private static final class ActorConcurrencyGroupManager {
        private Map<ActorId, ActorExecutorService> actorExecutorServices = new ConcurrentHashMap<ActorId, ActorExecutorService>();

        private ActorConcurrencyGroupManager() {
        }

        public synchronized void registerActor(ActorId actorId, Common.TaskSpec taskSpec) {
            Preconditions.checkState(!this.actorExecutorServices.containsKey(actorId));
            ActorExecutorService actorExecutorService = new ActorExecutorService(taskSpec);
            this.actorExecutorServices.put(actorId, actorExecutorService);
        }

        public synchronized ExecutorService getExecutorServiceForConcurrencyGroup(Common.TaskSpec taskSpec) {
            ActorId actorId = LocalModeTaskSubmitter.getActorId(taskSpec);
            Preconditions.checkState(this.actorExecutorServices.containsKey(actorId));
            ActorExecutorService actorExecutorService = this.actorExecutorServices.get(actorId);
            return actorExecutorService.getExecutorService(taskSpec);
        }

        public synchronized void shutdown() {
            this.actorExecutorServices.forEach((actorId, actorExecutorService) -> actorExecutorService.shutdown());
            this.actorExecutorServices.clear();
        }
    }

    private static final class ActorExecutorService {
        private Map<String, ExecutorService> services = new ConcurrentHashMap<String, ExecutorService>();
        private Map<JavaFunctionDescriptor, String> indexFunctionToConcurrencyGroupName = new ConcurrentHashMap<JavaFunctionDescriptor, String>();

        public ActorExecutorService(Common.TaskSpec taskSpec) {
            Common.ActorCreationTaskSpec actorCreationTaskSpec = taskSpec.getActorCreationTaskSpec();
            Preconditions.checkNotNull(actorCreationTaskSpec);
            List<Common.ConcurrencyGroup> concurrencyGroups = actorCreationTaskSpec.getConcurrencyGroupsList();
            concurrencyGroups.forEach(concurrencyGroup -> {
                ExecutorService executorService = Executors.newFixedThreadPool(concurrencyGroup.getMaxConcurrency());
                Preconditions.checkState(!this.services.containsKey(concurrencyGroup.getName()));
                this.services.put(concurrencyGroup.getName(), executorService);
                concurrencyGroup.getFunctionDescriptorsList().forEach(fd -> this.indexFunctionToConcurrencyGroupName.put(LocalModeTaskSubmitter.protoFunctionDescriptorToJava(fd), concurrencyGroup.getName()));
            });
            this.services.put(LocalModeTaskSubmitter.DEFAULT_CONCURRENCY_GROUP_NAME, Executors.newFixedThreadPool(actorCreationTaskSpec.getMaxConcurrency()));
        }

        public synchronized ExecutorService getExecutorService(Common.TaskSpec taskSpec) {
            String concurrencyGroupName = taskSpec.getConcurrencyGroupName();
            Preconditions.checkNotNull(concurrencyGroupName);
            if (!concurrencyGroupName.isEmpty()) {
                Preconditions.checkState(this.services.containsKey(concurrencyGroupName));
                return this.services.get(concurrencyGroupName);
            }
            JavaFunctionDescriptor javaFunctionDescriptor = LocalModeTaskSubmitter.protoFunctionDescriptorToJava(taskSpec.getFunctionDescriptor());
            if (this.indexFunctionToConcurrencyGroupName.containsKey(javaFunctionDescriptor)) {
                concurrencyGroupName = this.indexFunctionToConcurrencyGroupName.get(javaFunctionDescriptor);
                Preconditions.checkState(this.services.containsKey(concurrencyGroupName));
                return this.services.get(concurrencyGroupName);
            }
            return this.services.get(LocalModeTaskSubmitter.DEFAULT_CONCURRENCY_GROUP_NAME);
        }

        public synchronized void shutdown() {
            this.services.forEach((key, service) -> service.shutdown());
            this.services.clear();
        }
    }
}

