import base64
import collections
import errno
import io
import json
import logging
import mmap
import multiprocessing
import os
import shutil
import signal
import socket
import subprocess
import sys
import time
from pathlib import Path
from typing import IO, AnyStr, List, Optional

from filelock import FileLock

# Ray modules
import ray
import ray._private.ray_constants as ray_constants
from ray._common.network_utils import (
    build_address,
    get_localhost_ip,
    is_ipv6,
    node_ip_address_from_perspective,
    parse_address,
)
from ray._private.ray_constants import RAY_NODE_IP_FILENAME
from ray._private.resource_isolation_config import ResourceIsolationConfig
from ray._raylet import GcsClient, GcsClientOptions
from ray.core.generated.common_pb2 import Language

# Import psutil after ray so the packaged version is used.
import psutil

# Placeholder binding; it stays None throughout the visible part of this module.
resource = None

# Default timeout in seconds; Windows is allowed twice as long as other platforms.
_timeout = 60 if sys.platform == "win32" else 30

# Executables carry an ".exe" suffix on Windows and no suffix elsewhere.
if sys.platform == "win32":
    EXE_SUFFIX = ".exe"
else:
    EXE_SUFFIX = ""

# True if processes are run in the valgrind profiler.
RUN_RAYLET_PROFILER = False

# Directory layout, derived from the location of this file.
# RAY_PATH is the absolute path of the parent of the directory containing this
# file; RAY_HOME is two levels above that same directory and is left
# un-normalized (it keeps the literal ".." components).
# NOTE(review): RAY_PATH is presumably the installed `ray` package root and
# RAY_HOME the repository root -- confirm.
RAY_HOME = os.path.join(os.path.dirname(os.path.dirname(__file__)), "..", "..")
RAY_PATH = os.path.abspath(os.path.dirname(os.path.dirname(__file__)))
# Relative sub-directory names (not joined with RAY_PATH here).
RAY_PRIVATE_DIR = "_private"
AUTOSCALER_PRIVATE_DIR = os.path.join("autoscaler", "_private")
AUTOSCALER_V2_DIR = os.path.join("autoscaler", "v2")

# Locations of the raylet and GCS server executables under RAY_PATH.
RAYLET_EXECUTABLE = os.path.join(
    RAY_PATH, "core", "src", "ray", "raylet", "raylet" + EXE_SUFFIX
)
GCS_SERVER_EXECUTABLE = os.path.join(
    RAY_PATH, "core", "src", "ray", "gcs", "gcs_server" + EXE_SUFFIX
)

# Path of the jemalloc shared library expected under RAY_PATH ...
JEMALLOC_SO = os.path.join(RAY_PATH, "core", "libjemalloc.so")

# ... replaced by None when that file does not exist at import time.
JEMALLOC_SO = JEMALLOC_SO if os.path.exists(JEMALLOC_SO) else None

# Location of the cpp default worker executables.
DEFAULT_WORKER_EXECUTABLE = os.path.join(RAY_PATH, "cpp", "default_worker" + EXE_SUFFIX)

# Location of the native libraries.
DEFAULT_NATIVE_LIBRARY_PATH = os.path.join(RAY_PATH, "cpp", "lib")

# Message telling the user how to install the optional dashboard dependencies.
DASHBOARD_DEPENDENCY_ERROR_MESSAGE = (
    "Not all Ray Dashboard dependencies were "
    "found. To use the dashboard please "
    "install Ray using `pip install "
    "ray[default]`."
)

# Names of the jemalloc-related environment variables.
# NOTE(review): these are only the variable names; where they are read is not
# visible in this part of the file -- confirm usage before relying on them.
RAY_JEMALLOC_LIB_PATH = "RAY_JEMALLOC_LIB_PATH"
RAY_JEMALLOC_CONF = "RAY_JEMALLOC_CONF"
RAY_JEMALLOC_PROFILE = "RAY_JEMALLOC_PROFILE"

# Name of the environment variable holding a comma-separated list of the
# components that should run under the memory profiler.
# Ray uses `memray` to memory profile internal components.
# The name of each component must be one of ray_constants.PROCESS_TYPE*.
RAY_MEMRAY_PROFILE_COMPONENT_ENV = "RAY_INTERNAL_MEM_PROFILE_COMPONENTS"
# Name of the environment variable holding comma-separated options for the
# `memray run` command. See `memray run --help` for more details.
# Example:
# RAY_INTERNAL_MEM_PROFILE_OPTIONS="--live,--live-port,3456,-q,"
# -> `memray run --live --live-port 3456 -q`
RAY_MEMRAY_PROFILE_OPTIONS_ENV = "RAY_INTERNAL_MEM_PROFILE_OPTIONS"

# Logger for this module. It should be configured at the entry point
# into the program using Ray. Ray provides a default configuration at
# entry/init points.
logger = logging.getLogger(__name__)

# Immutable record with one entry per listed field.
# NOTE(review): the fields share their names with `start_ray_process`
# parameters (stdout_file, stderr_file, use_valgrind, ...), so this presumably
# describes a started process and the options it was launched with -- confirm
# where ProcessInfo instances are constructed.
ProcessInfo = collections.namedtuple(
    "ProcessInfo",
    [
        "process",
        "stdout_file",
        "stderr_file",
        "use_valgrind",
        "use_gdb",
        "use_valgrind_profiler",
        "use_perftools_profiler",
        "use_tmux",
    ],
)


def _site_flags() -> List[str]:
    """Return the site-packages related interpreter flags that are active.

    Passing these flags on to Python workers helps when Ray runs in a hermetic
    build environment.

    Returns:
        A list containing "-S" when `_no_site()` is truthy and "-s" when
        `_no_user_site()` is truthy, in that order.
    """
    # sys.flags is reached through helper functions so unit tests can patch them.
    flag_checks = (("-S", _no_site), ("-s", _no_user_site))
    return [option for option, is_enabled in flag_checks if is_enabled()]


# Kept as a separate function so unit tests can patch it instead of sys.flags.
def _no_site():
    """Return `sys.flags.no_site` for the running interpreter."""
    interpreter_flags = sys.flags
    return interpreter_flags.no_site


# Kept as a separate function so unit tests can patch it instead of sys.flags.
def _no_user_site():
    """Return `sys.flags.no_user_site` for the running interpreter."""
    interpreter_flags = sys.flags
    return interpreter_flags.no_user_site


def _build_python_executable_command_memory_profileable(
    component: str, session_dir: str, unbuffered: bool = True
):
    """Build the Python executable command.

    It runs a memory profiler (`memray run`) if the env var named by
    RAY_MEMRAY_PROFILE_COMPONENT_ENV lists this component.

    Args:
        component: Name of the component. It must be one of
            ray_constants.PROCESS_TYPE*.
        session_dir: The directory name of the Ray session. A `profile`
            sub-directory is created in it when the component is profiled.
        unbuffered: If true, Python executable is started with unbuffered option.
            e.g., `-u`.
            It means the logs are flushed immediately (good when there's a failure),
            but writing to a log file can be slower.

    Returns:
        The command as a list of strings: the interpreter, optionally `-u`,
        and, when the component is profiled, `-m memray run <options>`.

    Raises:
        ImportError: If the component env var is set but `memray` cannot be
            imported. This is raised even when `component` itself is not
            in the list.
    """
    command = [sys.executable]
    if unbuffered:
        command.append("-u")

    raw_components = os.getenv(RAY_MEMRAY_PROFILE_COMPONENT_ENV, "")
    if not raw_components:
        return command

    # Tolerate whitespace and stray commas in the comma-separated list.
    components_to_memory_profile = {
        name.strip() for name in raw_components.split(",") if name.strip()
    }
    try:
        import memray  # noqa: F401
    except ImportError as e:
        raise ImportError(
            "Memray is required to memory profiler on components "
            f"{components_to_memory_profile}. Run `pip install memray`."
        ) from e

    if component in components_to_memory_profile:
        session_dir = Path(session_dir)
        session_name = session_dir.name
        profile_dir = session_dir / "profile"
        profile_dir.mkdir(exist_ok=True)
        output_file_path = profile_dir / f"{session_name}_memory_{component}.bin"

        raw_options = os.getenv(RAY_MEMRAY_PROFILE_OPTIONS_ENV, None)
        # Drop empty tokens: the documented form of this variable ends with a
        # trailing comma, and an empty string would otherwise be handed to
        # `memray run` as a positional argument.
        options = (
            [opt.strip() for opt in raw_options.split(",") if opt.strip()]
            if raw_options
            else []
        )
        # If neither --live nor any output option (-o/--output) is specified,
        # add the default output path.
        if not any(opt in options for opt in ("--live", "-o", "--output")):
            options[0:0] = ["-o", str(output_file_path)]
        command.extend(["-m", "memray", "run", *options])

    return command


def _get_gcs_client_options(gcs_server_address):
    """Create the `GcsClientOptions` for the given GCS server address.

    The second positional argument of `GcsClientOptions.create` is passed as
    None, a nil cluster id is allowed, and fetching the cluster id is disabled.

    Args:
        gcs_server_address: Address of the GCS server to connect to.

    Returns:
        The options object returned by `GcsClientOptions.create`.
    """
    cluster_id_flags = {
        "allow_cluster_id_nil": True,
        "fetch_cluster_id_if_nil": False,
    }
    return GcsClientOptions.create(gcs_server_address, None, **cluster_id_flags)


def serialize_config(config):
    """Serialize `config` to JSON and return it base64-encoded.

    Args:
        config: Any JSON-serializable object.

    Returns:
        The base64 text (as `str`) of the UTF-8 encoded JSON document.
    """
    json_bytes = json.dumps(config).encode("utf-8")
    encoded = base64.b64encode(json_bytes)
    return encoded.decode("utf-8")


def propagate_jemalloc_env_var(
    *,
    jemalloc_path: str,
    jemalloc_conf: str,
    jemalloc_comps: List[str],
    process_type: str,
):
    """Translate the jemalloc settings into the env vars a process needs.

    For example, the jemalloc library path is exposed as `LD_PRELOAD`,
    which is what makes the process load jemalloc as a shared library.

    Args:
        jemalloc_path: The path to the jemalloc shared library. When it is
            empty, no env vars are produced at all.
        jemalloc_conf: `,` separated string of jemalloc config.
        jemalloc_comps: The list of Ray components that should receive
            `jemalloc_conf`.
        process_type: The process type the env vars are built for. It is
            lower-cased before being looked up in `jemalloc_comps`.

    Returns:
        A dict of {env_var: value}. It is empty when `jemalloc_path` is
        empty. Otherwise it always holds `LD_PRELOAD` and
        `RAY_LD_PRELOAD_ON_WORKERS` (taken from the current environment,
        default "0"); `MALLOC_CONF` is added only when `jemalloc_conf` is
        non-empty and the process type is one of `jemalloc_comps`. The
        caller can pass the result to `dict.update`.
    """
    assert isinstance(jemalloc_comps, list)
    assert process_type is not None
    normalized_type = process_type.lower()

    if not jemalloc_path:
        return {}

    preload_on_workers = os.environ.get("RAY_LD_PRELOAD_ON_WORKERS", "0")
    result = {
        "LD_PRELOAD": jemalloc_path,
        "RAY_LD_PRELOAD_ON_WORKERS": preload_on_workers,
    }
    wants_conf = normalized_type in jemalloc_comps and jemalloc_conf
    if wants_conf:
        result["MALLOC_CONF"] = jemalloc_conf
    return result


class ConsolePopen(subprocess.Popen):
    """`subprocess.Popen` with Windows-specific process-group handling.

    On platforms other than Windows nothing is overridden. On Windows the
    child is started in a new process group when fate-sharing support is
    detected, and `terminate` then sends CTRL_BREAK_EVENT instead of
    calling the default `Popen.terminate`.
    """

    if sys.platform == "win32":

        def terminate(self):
            """Close stdin (if it is a stream) and stop the child process."""
            if isinstance(self.stdin, io.IOBase):
                self.stdin.close()
            if self._use_signals:
                self.send_signal(signal.CTRL_BREAK_EVENT)
            else:
                super().terminate()

        def __init__(self, *args, **kwargs):
            """Start the process, adding CREATE_NEW_PROCESS_GROUP when possible.

            Accepts the same arguments as `subprocess.Popen`.
            """
            # CREATE_NEW_PROCESS_GROUP is needed to send console control
            # events to the child on Windows:
            # https://docs.python.org/3/library/subprocess.html#subprocess.Popen.send_signal
            new_pgroup = subprocess.CREATE_NEW_PROCESS_GROUP
            flags_key = "creationflags"
            # A missing or None `creationflags` counts as 0; indexing kwargs
            # directly would raise KeyError when the caller passed no flags
            # and no flag is added below.
            creation_flags = kwargs.get(flags_key) or 0
            if ray._private.utils.detect_fate_sharing_support():
                # Only done with kernel-mode fate-sharing: without it, our
                # children need to stay in our process group for the process
                # reaper to properly terminate them.
                creation_flags |= new_pgroup
                kwargs[flags_key] = creation_flags
            self._use_signals = creation_flags & new_pgroup
            super().__init__(*args, **kwargs)


def _find_address_from_flag(flag: str):
    """
    Attempts to find all valid Ray addresses on this node, specified by the
    flag, by scanning the command lines of running raylet processes.

    Processes that disappear or cannot be inspected while scanning are
    skipped, as are flag occurrences that carry no `=value` part.

    Params:
        flag: `--redis-address` or `--gcs-address`
    Returns:
        Set of detected addresses.
    """
    # Using Redis address `--redis-address` as an example:
    # Currently, this extracts the deprecated --redis-address from the command
    # that launched the raylet running on this node, if any. Anyone looking to
    # edit this function should be warned that these commands look like, for
    # example:
    # /usr/local/lib/python3.8/dist-packages/ray/core/src/ray/raylet/raylet
    # --redis_address=123.456.78.910 --node_ip_address=123.456.78.910
    # --raylet_socket_name=... --store_socket_name=... --object_manager_port=0
    # --min_worker_port=10000 --max_worker_port=19999
    # --node_manager_port=58578 --redis_port=6379
    # --maximum_startup_concurrency=8
    # --static_resource_list=node:123.456.78.910,1.0,object_store_memory,66
    # --config_list=plasma_store_as_thread,True
    # --python_worker_command=/usr/bin/python
    #     /usr/local/lib/python3.8/dist-packages/ray/workers/default_worker.py
    #     --redis-address=123.456.78.910:6379
    #     --node-ip-address=123.456.78.910 --node-manager-port=58578
    #     --object-store-name=... --raylet-name=...
    #     --temp-dir=/tmp/ray
    #     --metrics-agent-port=41856 --redis-password=[MASKED]
    #     --java_worker_command= --cpp_worker_command=
    #     --redis_password=[MASKED] --temp_dir=/tmp/ray --session_dir=...
    #     --metrics-agent-port=41856 --metrics_export_port=64229
    #     --dashboard_agent_command=/usr/bin/python
    #     -u /usr/local/lib/python3.8/dist-packages/ray/dashboard/agent.py
    #         --redis-address=123.456.78.910:6379 --metrics-export-port=64229
    #         --dashboard-agent-port=41856 --node-manager-port=58578
    #         --object-store-name=... --raylet-name=... --temp-dir=/tmp/ray
    #         --log-dir=/tmp/ray/session_2020-11-08_14-29-07_199128_278000/logs
    #         --redis-password=[MASKED] --object_store_memory=5037192806
    #         --plasma_directory=/tmp
    # Longer arguments are elided with ... but all arguments from this instance
    # are included, to provide a sense of what is in these.
    # Indeed, we had to pull --redis-address to the front of each call to make
    # this readable.
    # As you can see, this is very long and complex, which is why we can't
    # simply extract all the arguments using regular expressions and
    # present a dict as if we never lost track of these arguments, for
    # example. Picking out --redis-address below looks like it might grab the
    # wrong thing, but double-checking that we're finding the correct process
    # by checking that the contents look like we expect would probably be prone
    # to choking in unexpected ways.
    # Notice that --redis-address appears twice. This is not a copy-paste
    # error; this is the reason why the for loop below attempts to pick out
    # every appearance of --redis-address.

    # The --redis-address here is what is now called the --address, but it
    # appears in the default_worker.py and agent.py calls as --redis-address.
    addresses = set()
    for proc in psutil.process_iter(["cmdline"]):
        try:
            # HACK: Workaround for UNIX idiosyncrasy
            # Normally, cmdline() is supposed to return the argument list.
            # But it in some cases (such as when setproctitle is called),
            # an arbitrary string resembling a command-line is stored in
            # the first argument.
            # Explanation: https://unix.stackexchange.com/a/432681
            # More info: https://github.com/giampaolo/psutil/issues/1179
            cmdline = proc.info["cmdline"]
            # psutil may report None (or an empty list) when the command line
            # cannot be read; such a process cannot contribute an address.
            if not cmdline:
                continue
            # NOTE(kfstorm): To support Windows, we can't use
            # `os.path.basename(cmdline[0]) == "raylet"` here.
            if not _is_raylet_process(cmdline):
                continue

            for arglist in cmdline:
                # Given we're merely seeking --redis-address, we just split
                # every argument on spaces for now.
                for arg in arglist.split(" "):
                    # TODO(ekl): Find a robust solution for locating Redis.
                    if not arg.startswith(flag):
                        continue
                    pieces = arg.split("=")
                    if len(pieces) < 2:
                        # The flag appears without an `=value` part; there is
                        # no address to extract from this token.
                        continue
                    proc_addr = pieces[1]
                    # TODO(mwtian): remove this workaround after Ray
                    # no longer sets --redis-address to None.
                    if proc_addr != "" and proc_addr != "None":
                        addresses.add(proc_addr)
        except (psutil.AccessDenied, psutil.NoSuchProcess):
            # Best effort: skip processes we may not inspect or that exited.
            pass
    return addresses


def find_gcs_addresses():
    """Return the set of GCS addresses found on local raylet command lines.

    The lookup is delegated to `_find_address_from_flag` with the
    `--gcs-address` flag.
    """
    gcs_flag = "--gcs-address"
    return _find_address_from_flag(gcs_flag)


def find_bootstrap_address(temp_dir: Optional[str]):
    """Return the latest Ray cluster address to connect to, if any.

    This is the GCS address connected to by the last successful `ray start`,
    as read by `ray._private.utils.read_ray_address`.

    Args:
        temp_dir: Passed through to `read_ray_address`.
    """
    read_address = ray._private.utils.read_ray_address
    return read_address(temp_dir)


def get_ray_address_from_environment(addr: str, temp_dir: Optional[str]):
    """Work out which Ray cluster address to use.

    Resolution order:

    1. A non-empty RAY_ADDRESS environment variable replaces `addr`.
    2. If the resulting address is neither None nor "auto", it is returned
       unchanged.
    3. Otherwise the address recorded by the last successful `ray start` is
       looked up, together with the GCS addresses of locally running raylets.
       When several GCS addresses are detected and a recorded address exists,
       a warning is logged and the recorded address is used. Failing that,
       "auto" (only) falls back to one of the detected GCS addresses
       (legacy behavior).

    Args:
        addr: The requested address; may be None or "auto".
        temp_dir: Passed to `find_bootstrap_address`.

    Returns:
        A string to pass into `ray.init(address=...)`, e.g. ip:port, or None
        when `addr` is None and nothing was found (the caller should then
        start a new instance).

    Raises:
        ConnectionError: If `addr` is "auto" and no running instance was found.
    """
    override = os.environ.get(ray_constants.RAY_ADDRESS_ENVIRONMENT_VARIABLE)
    if override:
        addr = override

    needs_autodetect = addr is None or addr == "auto"
    if not needs_autodetect:
        return addr

    # Try to automatically find an active local instance.
    detected_gcs = find_gcs_addresses()
    resolved = find_bootstrap_address(temp_dir)

    if resolved is not None and len(detected_gcs) > 1:
        logger.warning(
            f"Found multiple active Ray instances: {detected_gcs}. "
            f"Connecting to latest cluster at {resolved}. "
            "You can override this by setting the `--address` flag "
            "or `RAY_ADDRESS` environment variable."
        )
    elif addr == "auto" and len(detected_gcs) > 0:
        # Legacy "auto" behavior: connect to any detected cluster, even one
        # that was not started with `ray start`.
        resolved = list(detected_gcs).pop()

    if resolved is not None:
        return resolved
    if addr is None:
        # Caller should start a new instance.
        return None
    raise ConnectionError(
        "Could not find any running Ray instance. "
        "Please specify the one to connect to by setting `--address` flag "
        "or `RAY_ADDRESS` environment variable."
    )


def wait_for_node(
    gcs_address: str,
    node_plasma_store_socket_name: str,
    timeout: int = _timeout,
):
    """Wait until this node has appeared in the client table.
    NOTE: Makes an RPC to the GCS up to every 0.1 seconds to
    get all node info. Use only for testing.

    Args:
        gcs_address: The gcs address
        node_plasma_store_socket_name: The
            plasma_store_socket_name for the given node which we wait for.
        timeout: The amount of time in seconds to wait before raising an
            exception.

    Raises:
        TimeoutError: An exception is raised if the timeout expires before
            the node appears in the client table.
    """
    gcs_options = _get_gcs_client_options(gcs_address)
    global_state = ray._private.state.GlobalState()
    global_state._initialize_global_state(gcs_options)

    # Measure the timeout on the monotonic clock so that adjustments of the
    # system clock can neither shorten nor extend the wait.
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        clients = global_state.node_table()
        if any(
            client["ObjectStoreSocketName"] == node_plasma_store_socket_name
            for client in clients
        ):
            return
        time.sleep(0.1)
    raise TimeoutError(
        f"Timed out after {timeout} seconds while waiting for node to startup. "
        f"Did not find socket name {node_plasma_store_socket_name} in the list "
        "of object store socket names."
    )


def get_node_to_connect_for_driver(gcs_address, node_ip_address):
    """Ask the global state accessor which node a driver should connect to.

    Args:
        gcs_address: Address of the GCS used to initialize the global state.
        node_ip_address: Forwarded to
            `GlobalState.get_node_to_connect_for_driver`.

    Returns:
        Whatever `GlobalState.get_node_to_connect_for_driver` returns.
    """
    state = ray._private.state.GlobalState()
    state._initialize_global_state(_get_gcs_client_options(gcs_address))
    return state.get_node_to_connect_for_driver(node_ip_address)


def get_node(gcs_address, node_id):
    """Get the node information from the global state accessor.

    Args:
        gcs_address: Address of the GCS used to initialize the global state.
        node_id: Forwarded to `GlobalState.get_node`.

    Returns:
        Whatever `GlobalState.get_node` returns for `node_id`.
    """
    state = ray._private.state.GlobalState()
    state._initialize_global_state(_get_gcs_client_options(gcs_address))
    return state.get_node(node_id)


def get_webui_url_from_internal_kv():
    """Read the "webui:url" entry from the dashboard namespace of internal KV.

    The internal KV must already be initialized (checked with an assert).

    Returns:
        The decoded value, or None when the key is not set.
    """
    internal_kv = ray.experimental.internal_kv
    assert internal_kv._internal_kv_initialized()
    raw_url = internal_kv._internal_kv_get(
        "webui:url", namespace=ray_constants.KV_NAMESPACE_DASHBOARD
    )
    if raw_url is None:
        return None
    return ray._common.utils.decode(raw_url)


def remaining_processes_alive():
    """See if the remaining processes are alive or not.

    Note that this ignores processes that have been explicitly killed,
    e.g., via a command like node.kill_raylet().

    Returns:
        True if the remaining processes started by ray.init() are alive and
            False otherwise.

    Raises:
        RuntimeError: If there is no global node in this process, i.e. the
            processes were not started by ray.init().
    """
    node = ray._private.worker._global_node
    if node is not None:
        return node.remaining_processes_alive()
    raise RuntimeError(
        "This process is not in a position to determine "
        "whether all processes are alive or not."
    )


def canonicalize_bootstrap_address(
    addr: str, temp_dir: Optional[str] = None
) -> Optional[str]:
    """Canonicalizes Ray cluster bootstrap address to host:port.
    Reads address from the environment if needed.

    This function should be used to process user supplied Ray cluster address,
    via ray.init() or `--address` flags, before using the address to connect.

    Args:
        addr: The user supplied address. None and "auto" trigger a lookup via
            `get_ray_address_from_environment`.
        temp_dir: Passed to that lookup.

    Returns:
        Ray cluster address string in <host:port> format or None if the caller
        should start a local Ray instance (no address found, or "local").

    Raises:
        ValueError: If the address cannot be parsed.
    """
    wants_autodetect = addr is None or addr == "auto"
    resolved = (
        get_ray_address_from_environment(addr, temp_dir) if wants_autodetect else addr
    )
    if resolved is None or resolved == "local":
        return None

    host_and_port = parse_address(resolved)
    if host_and_port is None:
        raise ValueError(f"Invalid address format: {resolved}")
    host, port = host_and_port

    try:
        reachable_host = resolve_ip_for_localhost(host)
    except Exception:
        # Log with traceback, then let the original error propagate.
        logger.exception(f"Failed to convert {resolved} to host:port")
        raise
    else:
        return build_address(reachable_host, port)


def canonicalize_bootstrap_address_or_die(
    addr: str, temp_dir: Optional[str] = None
) -> str:
    """Canonicalizes Ray cluster bootstrap address to host:port.

    This function should be used when the caller expects there to be an active
    and local Ray instance. If no address is provided or address="auto", this
    will autodetect the latest Ray instance created with `ray start`.

    For convenience, if no address can be autodetected, this function will also
    look for any running local GCS processes by scanning local raylet command
    lines. This is to allow easier use of Ray CLIs when debugging a local Ray
    instance (whose GCS addresses are not recorded).

    Returns:
        Ray cluster address string in <host:port> format.

    Raises:
        ConnectionError: If zero or multiple active Ray instances are
            autodetected.
    """
    canonical = canonicalize_bootstrap_address(addr, temp_dir=temp_dir)
    if canonical is not None:
        return canonical

    candidates = find_gcs_addresses()
    candidate_count = len(candidates)
    if candidate_count == 1:
        return candidates.pop()
    if candidate_count == 0:
        raise ConnectionError(
            "Could not find any running Ray instance. "
            "Please specify the one to connect to by setting the `--address` "
            "flag or `RAY_ADDRESS` environment variable."
        )
    raise ConnectionError(
        f"Found multiple active Ray instances: {candidates}. "
        "Please specify the one to connect to by setting the `--address` "
        "flag or `RAY_ADDRESS` environment variable."
    )


def extract_ip_port(bootstrap_address: str):
    """Split a `<host>:<port>` address into its host and integer port.

    Args:
        bootstrap_address: The address to split, parsed with `parse_address`.

    Returns:
        A `(ip, port)` tuple where `port` is an int.

    Raises:
        ValueError: If the address cannot be parsed, the port is not an
            integer, or the port lies outside 1024-65535 (inclusive).
    """
    parsed = parse_address(bootstrap_address)
    if parsed is None:
        raise ValueError(
            f"Malformed address {bootstrap_address}. Expected '<host>:<port>'."
        )

    ip, raw_port = parsed
    try:
        port = int(raw_port)
    except ValueError:
        raise ValueError(f"Malformed address port {raw_port}. Must be an integer.")

    if not 1024 <= port <= 65535:
        raise ValueError(
            f"Invalid address port {port}. Must be between 1024 "
            "and 65535 (inclusive)."
        )
    return ip, port


def resolve_ip_for_localhost(host: str):
    """Replace a loopback host with this node's IP address.

    "localhost", "127.0.0.1" and "::1" are replaced by the result of
    `get_node_ip_address()`; any other host is returned as is.

    Args:
        host: The hostname or IP address.

    Returns:
        The same host but with the local host replaced by the node IP
            address.

    Raises:
        ValueError: If `host` is empty or None.
    """
    if not host:
        raise ValueError(f"Malformed host: {host}")

    loopback_names = ("127.0.0.1", "::1", "localhost")
    if host not in loopback_names:
        return host
    # Make sure localhost isn't resolved to the loopback ip
    return get_node_ip_address()


# NOTE: Do not use this API to obtain the IP address before ray.init has been
# called: in that situation it cannot find an IP address that was specified
# with `ray start --node-ip-address`. Use get_cached_node_ip_address instead.
def get_node_ip_address(address=None):
    """Return the IP address of the current node.

    The address of the global node is used when one exists. Otherwise the
    localhost IP is returned when `ray_constants.ENABLE_RAY_CLUSTER` is
    falsy, and the IP determined by `node_ip_address_from_perspective`
    is returned when it is truthy.

    Args:
        address: Passed to `node_ip_address_from_perspective`.
    """
    global_node = ray._private.worker._global_node
    if global_node is not None:
        return global_node.node_ip_address

    if ray_constants.ENABLE_RAY_CLUSTER:
        return node_ip_address_from_perspective(address)

    # Use loopback IP as the local IP address to prevent bothersome
    # firewall popups on OSX and Windows.
    # https://github.com/ray-project/ray/issues/18730.
    return get_localhost_ip()


def get_cached_node_ip_address(session_dir: str) -> str:
    """Get a node address cached on this session.

    If a ray instance is started by `ray start --node-ip-address`,
    the node ip address is cached to a file RAY_NODE_IP_FILENAME.
    Otherwise, the file exists, but it holds no address.

    This API is process-safe, meaning the file access is protected by
    a file lock.

    Args:
        session_dir: Path to the Ray session directory.

    Returns:
        The node_ip_address cached on the current node. None if the file
        doesn't exist, meaning a ray instance hasn't been started on the
        current node. If the file holds no node_ip_address, it means
        --node-ip-address was not given, and the IP address is determined
        with `ray.util.get_node_ip_address()` instead.
    """
    ip_file = Path(os.path.join(session_dir, RAY_NODE_IP_FILENAME))
    lock_path = str(ip_file.absolute()) + ".lock"

    with FileLock(lock_path):
        if not ip_file.exists():
            return None

        cached = {}
        with ip_file.open() as f:
            cached.update(json.load(f))

        if "node_ip_address" not in cached:
            return ray.util.get_node_ip_address()
        return cached["node_ip_address"]


def write_node_ip_address(session_dir: str, node_ip_address: Optional[str]) -> None:
    """Write a node ip address of the current session to
    RAY_NODE_IP_FILENAME.

    If a ray instance is started by `ray start --node-ip-address`,
    the node ip address is cached to a file RAY_NODE_IP_FILENAME.

    This API is process-safe, meaning the file access is protected by
    a file lock.

    The file is a JSON object that holds the address under the key
    "node_ip_address". If the key is absent, it means --node-ip-address
    was not given, and Ray resolves the IP address on its own. It assumes
    in a single node, you can have only 1 IP address (which is the
    assumption ray has in general).

    Args:
        session_dir: The path to Ray session directory.
        node_ip_address: The node IP address of the current node.
            If None, it means the node ip address is not given
            by --node-ip-address. In this case no address is stored; the
            file is only created (as an empty JSON object) if it is missing.
    """
    file_path = Path(os.path.join(session_dir, RAY_NODE_IP_FILENAME))
    cached_node_ip_address = {}

    with FileLock(str(file_path.absolute()) + ".lock"):
        # Make sure the file exists so readers can tell "session started,
        # no explicit IP" apart from "no session".
        if not file_path.exists():
            with file_path.open(mode="w") as f:
                json.dump({}, f)

        with file_path.open() as f:
            cached_node_ip_address.update(json.load(f))

        if node_ip_address is None:
            return

        cached_node_ip = cached_node_ip_address.get("node_ip_address")
        if cached_node_ip:
            if cached_node_ip == node_ip_address:
                # Nothing to do.
                return
            logger.warning(
                "The node IP address of the current host recorded "
                f"in {RAY_NODE_IP_FILENAME} ({cached_node_ip}) "
                "is different from the current IP address: "
                f"{node_ip_address}. Ray will use {node_ip_address} "
                "as the current node's IP address. "
                "Creating 2 instances in the same host with different "
                "IP address is not supported. "
                "Please create an enhancement request to "
                "https://github.com/ray-project/ray/issues."
            )

        cached_node_ip_address["node_ip_address"] = node_ip_address
        with file_path.open(mode="w") as f:
            json.dump(cached_node_ip_address, f)


def get_node_instance_id():
    """Get the specified node instance id of the current node.

    Returns:
        The value of the RAY_CLOUD_INSTANCE_ID environment variable, or an
        empty string when it is not set.
    """
    instance_id = os.environ.get("RAY_CLOUD_INSTANCE_ID", "")
    return instance_id


def create_redis_client(redis_address, password=None, username=None):
    """Create a Redis client, retrying until the server answers a ping.

    Clients are cached on the function object, keyed by `redis_address`
    only; a cached client is reused as long as it answers a ping and is
    discarded when the ping fails. The wait between attempts doubles each
    time, capped at one second.

    Args:
        redis_address: The IP address and port of the Redis server.
        password: The password for Redis authentication.
        username: The username for Redis authentication.

    Returns:
        A Redis client.

    Raises:
        RuntimeError: If the server still cannot be pinged on the last of
            `ray_constants.START_REDIS_WAIT_RETRIES` attempts.
    """
    import redis

    if not hasattr(create_redis_client, "instances"):
        create_redis_client.instances = {}
    cache = create_redis_client.instances

    max_attempts = ray_constants.START_REDIS_WAIT_RETRIES
    backoff = 0.001
    for attempt in range(max_attempts):
        client = cache.get(redis_address)
        if client is None:
            canonical_address = canonicalize_bootstrap_address_or_die(redis_address)
            redis_ip_address, redis_port = extract_ip_port(canonical_address)
            client = redis.StrictRedis(
                host=redis_ip_address,
                port=int(redis_port),
                username=username,
                password=password,
            )
            cache[redis_address] = client
        try:
            client.ping()
        except Exception as e:
            # Forget the unresponsive client so the next attempt builds a new one.
            cache.pop(redis_address)
            if attempt >= max_attempts - 1:
                raise RuntimeError(
                    f"Unable to connect to Redis at {redis_address}: {e}"
                )
            # Wait a little bit.
            time.sleep(backoff)
            # Make sure the retry interval doesn't increase too large.
            backoff = min(1, backoff * 2)
        else:
            return client


def start_ray_process(
    command: List[str],
    process_type: str,
    fate_share: bool,
    env_updates: Optional[dict] = None,
    cwd: Optional[str] = None,
    use_valgrind: bool = False,
    use_gdb: bool = False,
    use_valgrind_profiler: bool = False,
    use_perftools_profiler: bool = False,
    use_tmux: bool = False,
    stdout_file: Optional[IO[AnyStr]] = None,
    stderr_file: Optional[IO[AnyStr]] = None,
    pipe_stdin: bool = False,
):
    """Start one of the Ray processes.

    The debugging/profiling flags can be turned on either through the
    arguments or through environment variables of the form
    ``RAY_<PROCESS_TYPE>_VALGRIND``, ``..._VALGRIND_PROFILER``,
    ``..._PERFTOOLS_PROFILER``, ``..._TMUX`` and ``..._GDB`` set to ``"1"``.

    TODO(rkn): We need to figure out how these commands interact. For example,
    it may only make sense to start a process in gdb if we also start it in
    tmux. Similarly, certain combinations probably don't make sense, like
    simultaneously running the process in valgrind and the profiler.

    Args:
        command: The command to use to start the Ray process.
        process_type: The type of the process that is being started
            (e.g., "raylet").
        fate_share: If true, the child will be killed if its parent (us) dies.
            True must only be passed after detection of this functionality.
        env_updates: A dictionary of additional environment variables to
            run the command with (in addition to the caller's environment
            variables).
        cwd: The directory to run the process in.
        use_valgrind: True if we should start the process in valgrind.
        use_gdb: True if we should start the process in gdb.
        use_valgrind_profiler: True if we should start the process in
            the valgrind profiler.
        use_perftools_profiler: True if we should profile the process
            using perftools.
        use_tmux: True if we should start the process in tmux.
        stdout_file: A file handle opened for writing to redirect stdout to. If
            no redirection should happen, then this should be None.
        stderr_file: A file handle opened for writing to redirect stderr to. If
            no redirection should happen, then this should be None.
        pipe_stdin: If true, subprocess.PIPE will be passed to the process as
            stdin.

    Returns:
        Information about the process that was started including a handle to
            the process that was started.

    Raises:
        ValueError: If more than one of the gdb / valgrind / valgrind
            profiler / perftools profiler / jemalloc memory profiler modes
            is active, if ``env_updates`` is not a dict, if ``use_gdb`` is
            set without ``use_tmux``, or (on Windows) if the command is too
            long.
        KeyError: If the perftools profiler is requested but
            ``PERFTOOLS_PATH`` or ``PERFTOOLS_LOGFILE`` is not set in the
            environment.
        AssertionError: If ``fate_share`` is true but
            ``detect_fate_sharing_support()`` reports no support.
    """
    # Detect which flags are set through environment variables.
    valgrind_env_var = f"RAY_{process_type.upper()}_VALGRIND"
    if os.environ.get(valgrind_env_var) == "1":
        logger.info("Detected environment variable '%s'.", valgrind_env_var)
        use_valgrind = True
    valgrind_profiler_env_var = f"RAY_{process_type.upper()}_VALGRIND_PROFILER"
    if os.environ.get(valgrind_profiler_env_var) == "1":
        logger.info("Detected environment variable '%s'.", valgrind_profiler_env_var)
        use_valgrind_profiler = True
    perftools_profiler_env_var = f"RAY_{process_type.upper()}_PERFTOOLS_PROFILER"
    if os.environ.get(perftools_profiler_env_var) == "1":
        logger.info("Detected environment variable '%s'.", perftools_profiler_env_var)
        use_perftools_profiler = True
    tmux_env_var = f"RAY_{process_type.upper()}_TMUX"
    if os.environ.get(tmux_env_var) == "1":
        logger.info("Detected environment variable '%s'.", tmux_env_var)
        use_tmux = True
    gdb_env_var = f"RAY_{process_type.upper()}_GDB"
    if os.environ.get(gdb_env_var) == "1":
        logger.info("Detected environment variable '%s'.", gdb_env_var)
        use_gdb = True
    # Jemalloc memory profiling.
    # Only configured when LD_PRELOAD is not already set in our environment;
    # otherwise no jemalloc variables are injected at all.
    # NOTE(review): RAY_JEMALLOC_* are defined outside this block; presumably
    # they hold the names of the corresponding environment variables.
    if os.environ.get("LD_PRELOAD") is None:
        jemalloc_lib_path = os.environ.get(RAY_JEMALLOC_LIB_PATH, JEMALLOC_SO)
        jemalloc_conf = os.environ.get(RAY_JEMALLOC_CONF)
        jemalloc_comps = os.environ.get(RAY_JEMALLOC_PROFILE)
        jemalloc_comps = [] if not jemalloc_comps else jemalloc_comps.split(",")
        jemalloc_env_vars = propagate_jemalloc_env_var(
            jemalloc_path=jemalloc_lib_path,
            jemalloc_conf=jemalloc_conf,
            jemalloc_comps=jemalloc_comps,
            process_type=process_type,
        )
    else:
        jemalloc_env_vars = {}

    # The jemalloc profiler counts as one of the mutually exclusive modes
    # whenever the helper produced a MALLOC_CONF entry for this process.
    use_jemalloc_mem_profiler = "MALLOC_CONF" in jemalloc_env_vars

    # Booleans sum as 0/1, so this counts how many modes are enabled.
    if (
        sum(
            [
                use_gdb,
                use_valgrind,
                use_valgrind_profiler,
                use_perftools_profiler,
                use_jemalloc_mem_profiler,
            ]
        )
        > 1
    ):
        raise ValueError(
            "At most one of the 'use_gdb', 'use_valgrind', "
            "'use_valgrind_profiler', 'use_perftools_profiler', "
            "and 'use_jemalloc_mem_profiler' flags can "
            "be used at a time."
        )
    if env_updates is None:
        env_updates = {}
    if not isinstance(env_updates, dict):
        raise ValueError("The 'env_updates' argument must be a dictionary.")

    # The child's environment is a copy of ours overlaid with env_updates;
    # os.environ itself is never mutated.
    modified_env = os.environ.copy()
    modified_env.update(env_updates)

    if use_gdb:
        if not use_tmux:
            raise ValueError(
                "If 'use_gdb' is true, then 'use_tmux' must be true as well."
            )

        # Write a gdb init script containing a single `run <args>` command
        # (each argument wrapped in single quotes) and have gdb execute it
        # via `-x`.
        # TODO(suquark): Any better temp file creation here?
        gdb_init_path = os.path.join(
            ray._common.utils.get_ray_temp_dir(),
            f"gdb_init_{process_type}_{time.time()}",
        )
        ray_process_path = command[0]
        ray_process_args = command[1:]
        run_args = " ".join(["'{}'".format(arg) for arg in ray_process_args])
        with open(gdb_init_path, "w") as gdb_init_file:
            gdb_init_file.write(f"run {run_args}")
        command = ["gdb", ray_process_path, "-x", gdb_init_path]

    if use_valgrind:
        command = [
            "valgrind",
            "--track-origins=yes",
            "--leak-check=full",
            "--show-leak-kinds=all",
            "--leak-check-heuristics=stdstring",
            "--error-exitcode=1",
        ] + command

    if use_valgrind_profiler:
        command = ["valgrind", "--tool=callgrind"] + command

    if use_perftools_profiler:
        # Both variables are read with os.environ[...], so a missing one
        # raises KeyError.
        modified_env["LD_PRELOAD"] = os.environ["PERFTOOLS_PATH"]
        modified_env["CPUPROFILE"] = os.environ["PERFTOOLS_LOGFILE"]

    # Applied last, so jemalloc settings win over same-named keys coming from
    # env_updates or the perftools block above.
    modified_env.update(jemalloc_env_vars)

    if use_tmux:
        # The command has to be created exactly as below to ensure that it
        # works on all versions of tmux. (Tested with tmux 1.8-5, travis'
        # version, and tmux 2.1)
        command = ["tmux", "new-session", "-d", f"{' '.join(command)}"]

    if fate_share:
        assert ray._private.utils.detect_fate_sharing_support(), (
            "kernel-level fate-sharing must only be specified if "
            "detect_fate_sharing_support() has returned True"
        )

    # Passed to Popen as preexec_fn on non-Windows platforms only (see the
    # ConsolePopen call below), i.e. it runs in the child before exec.
    def preexec_fn():
        import signal

        # Block SIGINT in the child.
        # NOTE(review): presumably so that Ctrl-C in the launching terminal
        # does not directly interrupt Ray processes -- confirm.
        signal.pthread_sigmask(signal.SIG_BLOCK, {signal.SIGINT})
        if fate_share and sys.platform.startswith("linux"):
            ray._private.utils.set_kill_on_parent_death_linux()

    win32_fate_sharing = fate_share and sys.platform == "win32"
    # With Windows fate-sharing, we need special care:
    # The process must be added to the job before it is allowed to execute.
    # Otherwise, there's a race condition: the process might spawn children
    # before the process itself is assigned to the job.
    # After that point, its children will not be added to the job anymore.
    CREATE_SUSPENDED = 0x00000004  # from Windows headers
    if sys.platform == "win32":
        # CreateProcess, which underlies Popen, is limited to
        # 32,767 characters, including the Unicode terminating null
        # character
        # NOTE(review): the check below uses 31766 and the message says 31767,
        # neither of which matches the 32,767 limit quoted above; it also sums
        # only the argument lengths (separators are not counted). Confirm
        # whether the lower threshold is an intentional safety margin.
        total_chrs = sum([len(x) for x in command])
        if total_chrs > 31766:
            raise ValueError(
                f"command is limited to a total of 31767 characters, "
                f"got {total_chrs}"
            )

    process = ConsolePopen(
        command,
        env=modified_env,
        cwd=cwd,
        stdout=stdout_file,
        stderr=stderr_file,
        stdin=subprocess.PIPE if pipe_stdin else None,
        preexec_fn=preexec_fn if sys.platform != "win32" else None,
        creationflags=CREATE_SUSPENDED if win32_fate_sharing else 0,
    )

    if win32_fate_sharing:
        # The process was created suspended: register it for fate-sharing
        # first, then let it run. If either step fails, kill the suspended
        # child rather than leaving it behind, and propagate the error.
        try:
            ray._private.utils.set_kill_child_on_death_win32(process)
            psutil.Process(process.pid).resume()
        except (psutil.Error, OSError):
            process.kill()
            raise

    # ProcessInfo records a printable name for each redirect target rather
    # than the file object: the stream's `.name` when it has one, otherwise
    # its str() form, or None when there is no redirection.
    def _get_stream_name(stream):
        if stream is not None:
            try:
                return stream.name
            except AttributeError:
                return str(stream)
        return None

    return ProcessInfo(
        process=process,
        stdout_file=_get_stream_name(stdout_file),
        stderr_file=_get_stream_name(stderr_file),
        use_valgrind=use_valgrind,
        use_gdb=use_gdb,
        use_valgrind_profiler=use_valgrind_profiler,
        use_perftools_profiler=use_perftools_profiler,
        use_tmux=use_tmux,
    )


def start_reaper(fate_share=None):
    """Launch the reaper process.

    The reaper is a lightweight process that waits for its parent process to
    die and then terminates its own process group. This ensures that ray
    processes are always terminated properly so long as the reaper itself
    isn't SIGKILLed.

    Args:
        fate_share: Forwarded unchanged to ``start_ray_process``.

    Returns:
        ProcessInfo for the process that was started, or None if this
        process could not be made a process group leader.
    """
    # Become a process group leader so that the reaper can clean up other ray
    # processes without killing the process group of whoever started us.
    # Nothing is attempted on Windows.
    if sys.platform != "win32":
        try:
            os.setpgrp()
        except OSError as e:
            already_group_leader = (
                e.errno == errno.EPERM and os.getpgrp() == os.getpid()
            )
            if not already_group_leader:
                logger.warning(
                    f"setpgrp failed, processes may not be cleaned up properly: {e}."
                )
                # Starting the reaper now could end up killing unrelated
                # user processes, so bail out instead.
                return None
            # Otherwise we already lead our own group; nothing more to do.

    reaper_script = os.path.join(RAY_PATH, RAY_PRIVATE_DIR, "ray_process_reaper.py")
    return start_ray_process(
        [sys.executable, "-u", reaper_script],
        ray_constants.PROCESS_TYPE_REAPER,
        pipe_stdin=True,
        fate_share=fate_share,
    )


def start_log_monitor(
    session_dir: str,
    logs_dir: str,
    gcs_address: str,
    fate_share: Optional[bool] = None,
    max_bytes: int = 0,
    backup_count: int = 0,
    stdout_filepath: Optional[str] = None,
    stderr_filepath: Optional[str] = None,
):
    """Start a log monitor process.

    Args:
        session_dir: The session directory.
        logs_dir: The directory of logging files.
        gcs_address: GCS address for pubsub.
        fate_share: Whether to share fate between log_monitor
            and this process.
        max_bytes: Log rotation parameter. Corresponding to
            RotatingFileHandler's maxBytes.
        backup_count: Log rotation parameter. Corresponding to
            RotatingFileHandler's backupCount.
        stdout_filepath: The file path to dump log monitor stdout.
            If None, stdout is not redirected.
        stderr_filepath: The file path to dump log monitor stderr.
            If None, stderr is not redirected.

    Returns:
        ProcessInfo for the process that was started.
    """
    log_monitor_filepath = os.path.join(RAY_PATH, RAY_PRIVATE_DIR, "log_monitor.py")

    command = [
        sys.executable,
        "-u",
        log_monitor_filepath,
        f"--session-dir={session_dir}",
        f"--logs-dir={logs_dir}",
        f"--gcs-address={gcs_address}",
        f"--logging-rotate-bytes={max_bytes}",
        f"--logging-rotate-backup-count={backup_count}",
    ]

    if stdout_filepath:
        command.append(f"--stdout-filepath={stdout_filepath}")
    if stderr_filepath:
        command.append(f"--stderr-filepath={stderr_filepath}")

    if stdout_filepath is None and stderr_filepath is None:
        # If not redirecting logging to files, unset log filename.
        # This will cause log records to go to stderr.
        command.append("--logging-filename=")
        # Use stderr log format with the component name as a message prefix.
        logging_format = ray_constants.LOGGER_FORMAT_STDERR.format(
            component=ray_constants.PROCESS_TYPE_LOG_MONITOR
        )
        command.append(f"--logging-format={logging_format}")

    # When a redirect path is given, the path itself is handed to the child
    # on the command line and the stream the child inherits from us is
    # pointed at os.devnull. The devnull handles are only needed while the
    # child is being spawned, so they are closed again as soon as
    # start_ray_process returns (or raises) instead of being left to the
    # garbage collector.
    stdout_file = None
    stderr_file = None
    try:
        if stdout_filepath:
            stdout_file = open(os.devnull, "w")
        if stderr_filepath:
            stderr_file = open(os.devnull, "w")

        return start_ray_process(
            command,
            ray_constants.PROCESS_TYPE_LOG_MONITOR,
            stdout_file=stdout_file,
            stderr_file=stderr_file,
            fate_share=fate_share,
        )
    finally:
        for devnull_file in (stdout_file, stderr_file):
            if devnull_file is not None:
                devnull_file.close()


def start_api_server(
    include_dashboard: Optional[bool],
    raise_on_failure: bool,
    host: str,
    gcs_address: str,
    cluster_id_hex: str,
    node_ip_address: str,
    temp_dir: str,
    logdir: str,
    session_dir: str,
    port: Optional[int] = None,
    fate_share: Optional[bool] = None,
    max_bytes: int = 0,
    backup_count: int = 0,
    stdout_filepath: Optional[str] = None,
    stderr_filepath: Optional[str] = None,
):
    """Start an API server process.

    Args:
        include_dashboard: If true, this will load all dashboard-related modules
            when starting the API server, or fail. If None, it will load all
            dashboard-related modules conditioned on dependencies being present.
            Otherwise, it will only start the modules that are not relevant to
            the dashboard.
        raise_on_failure: If true, this will raise an exception
            if we fail to start the API server. Otherwise it will print
            a warning if we fail to start the API server.
        host: The host to bind the dashboard web server to.
        gcs_address: The gcs address the dashboard should connect to
        cluster_id_hex: Cluster ID in hex.
        node_ip_address: The IP address where this is running.
        temp_dir: The temporary directory used for log files and
            information for this Ray session.
        logdir: The log directory used to generate dashboard log.
        session_dir: The session directory under temp_dir.
            It is used as a identifier of individual cluster.
        port: The port to bind the dashboard web server to. If None,
            ``ray_constants.DEFAULT_DASHBOARD_PORT`` is used and the
            dashboard is started with ``--port-retries=50``. If given, the
            port is probed up front and no retries are requested.
        fate_share: Forwarded unchanged to ``start_ray_process``.
        max_bytes: Log rotation parameter. Corresponding to
            RotatingFileHandler's maxBytes.
        backup_count: Log rotation parameter. Corresponding to
            RotatingFileHandler's backupCount.
        stdout_filepath: The file path to dump dashboard stdout.
            If None, stdout is not redirected.
        stderr_filepath: The file path to dump dashboard stderr.
            If None, stderr is not redirected.

    Returns:
        A tuple of :
            - Dashboard URL if dashboard enabled and started. This is an
              empty string when dashboard dependencies are missing or the
              dashboard is not included.
            - ProcessInfo for the process that was started.
        If starting fails and ``raise_on_failure`` is false, the error is
        logged and ``(None, None)`` is returned.

    Raises:
        Exception: Any startup failure, but only when ``raise_on_failure``
            is true.
    """
    try:
        # Make sure port is available.
        if port is None:
            port_retries = 50
            port = ray_constants.DEFAULT_DASHBOARD_PORT
        else:
            port_retries = 0
            # The context manager closes the probe socket on every path,
            # including when bind() fails.
            with socket.socket(
                socket.AF_INET6 if is_ipv6(host) else socket.AF_INET,
                socket.SOCK_STREAM,
            ) as port_test_socket:
                port_test_socket.setsockopt(
                    socket.SOL_SOCKET,
                    socket.SO_REUSEADDR,
                    1,
                )
                try:
                    port_test_socket.bind((host, port))
                except socket.error as e:
                    # 10013 on windows is a bit more broad than just
                    # "address in use": it can also indicate "permission denied".
                    # TODO: improve the error message?
                    if e.errno in {48, 98, 10013}:  # address already in use.
                        raise ValueError(
                            f"Failed to bind to {host}:{port} because it's "
                            "already occupied. You can use `ray start "
                            "--dashboard-port ...` or `ray.init(dashboard_port=..."
                            ")` to select a different port."
                        ) from e
                    raise
        # Make sure the process can start.
        dashboard_dependency_error = ray._private.utils.get_dashboard_dependency_error()

        # Explicitly check here that when the user explicitly specifies
        # dashboard inclusion, the install is not minimal.
        if include_dashboard and dashboard_dependency_error:
            logger.error(
                f"Ray dashboard dependencies failed to install properly: {dashboard_dependency_error}.\n"
                "Potential causes include:\n"
                "1. --include-dashboard is not supported when minimal ray is used. "
                "Download ray[default] to use the dashboard.\n"
                "2. Dashboard dependencies are conflicting with your python environment. "
                "Investigate your python environment and try reinstalling ray[default].\n"
            )
            raise Exception("Cannot include dashboard with missing packages.")

        include_dash: bool = True if include_dashboard is None else include_dashboard

        # Start the dashboard process.
        dashboard_dir = "dashboard"
        dashboard_filepath = os.path.join(RAY_PATH, dashboard_dir, "dashboard.py")

        command = [
            *_build_python_executable_command_memory_profileable(
                ray_constants.PROCESS_TYPE_DASHBOARD,
                session_dir,
                unbuffered=False,
            ),
            dashboard_filepath,
            f"--host={host}",
            f"--port={port}",
            f"--port-retries={port_retries}",
            f"--temp-dir={temp_dir}",
            f"--log-dir={logdir}",
            f"--session-dir={session_dir}",
            f"--logging-rotate-bytes={max_bytes}",
            f"--logging-rotate-backup-count={backup_count}",
            f"--gcs-address={gcs_address}",
            f"--cluster-id-hex={cluster_id_hex}",
            f"--node-ip-address={node_ip_address}",
        ]

        if stdout_filepath:
            command.append(f"--stdout-filepath={stdout_filepath}")
        if stderr_filepath:
            command.append(f"--stderr-filepath={stderr_filepath}")

        if stdout_filepath is None and stderr_filepath is None:
            # If not redirecting logging to files, unset log filename.
            # This will cause log records to go to stderr.
            command.append("--logging-filename=")
            # Use stderr log format with the component name as a message prefix.
            logging_format = ray_constants.LOGGER_FORMAT_STDERR.format(
                component=ray_constants.PROCESS_TYPE_DASHBOARD
            )
            command.append(f"--logging-format={logging_format}")
        if dashboard_dependency_error is not None:
            command.append("--minimal")

        if not include_dash:
            # If dashboard is not included, load modules
            # that are irrelevant to the dashboard.
            # TODO(sang): Modules like job or state APIs should be
            # loaded although dashboard is disabled. Fix it.
            command.append("--modules-to-load=UsageStatsHead")
            command.append("--disable-frontend")

        # When a redirect path is given, the path is handed to the child on
        # the command line and the stream it inherits from us is pointed at
        # os.devnull. The devnull handles are only needed while the child is
        # being spawned, so close them right afterwards.
        stdout_file = None
        stderr_file = None
        try:
            if stdout_filepath:
                stdout_file = open(os.devnull, "w")
            if stderr_filepath:
                stderr_file = open(os.devnull, "w")

            process_info = start_ray_process(
                command,
                ray_constants.PROCESS_TYPE_DASHBOARD,
                stdout_file=stdout_file,
                stderr_file=stderr_file,
                fate_share=fate_share,
            )
        finally:
            for devnull_file in (stdout_file, stderr_file):
                if devnull_file is not None:
                    devnull_file.close()

        # Retrieve the dashboard url
        gcs_client = GcsClient(address=gcs_address, cluster_id=cluster_id_hex)
        ray.experimental.internal_kv._initialize_internal_kv(gcs_client)
        dashboard_url = None
        dashboard_returncode = None
        start_time_s = time.time()
        # Poll until the URL shows up in the internal KV store, the dashboard
        # process exits, or the startup timeout elapses.
        while (
            time.time() - start_time_s < ray_constants.RAY_DASHBOARD_STARTUP_TIMEOUT_S
        ):
            dashboard_url = ray.experimental.internal_kv._internal_kv_get(
                ray_constants.DASHBOARD_ADDRESS,
                namespace=ray_constants.KV_NAMESPACE_DASHBOARD,
            )
            if dashboard_url is not None:
                dashboard_url = dashboard_url.decode("utf-8")
                break
            dashboard_returncode = process_info.process.poll()
            if dashboard_returncode is not None:
                break

            # This is often on the critical path of ray.init() and ray start,
            # so we need to poll often.
            time.sleep(0.1)

        # Dashboard couldn't be started.
        if dashboard_url is None:
            returncode_str = (
                f", return code {dashboard_returncode}"
                if dashboard_returncode is not None
                else ""
            )
            logger.error(f"Failed to start the dashboard{returncode_str}")

            def read_log(filename, lines_to_read):
                """Return up to the last `lines_to_read` lines of a log file.

                The lines are returned newest first, followed by a header
                line as the final element, so that reversing the list yields
                the header and then the log lines in file order.
                """
                dashboard_log = os.path.join(logdir, filename)
                # Read last n lines of dashboard log. The log file may be large.
                lines = []
                with open(dashboard_log, "rb") as f:
                    with mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as mm:
                        end = mm.size()
                        for _ in range(lines_to_read):
                            if end <= 0:
                                break
                            # Search before `end - 1` so that the newline
                            # terminating the current line is skipped.
                            sep = mm.rfind(b"\n", 0, end - 1)
                            # With sep == -1 this slice starts at 0, i.e. it
                            # is the first line of the file.
                            lines.append(
                                mm[sep + 1 : end].decode("utf-8", errors="replace")
                            )
                            if sep == -1:
                                break
                            end = sep
                lines.append(
                    f"The last {lines_to_read} lines of {dashboard_log} "
                    "(it contains the error message from the dashboard): "
                )
                return lines

            if logdir:
                lines_to_read = 20
                logger.error(
                    "Error should be written to 'dashboard.log' or "
                    "'dashboard.err'. We are printing the last "
                    f"{lines_to_read} lines for you. See "
                    "'https://docs.ray.io/en/master/ray-observability/user-guides/configure-logging.html#logging-directory-structure' "  # noqa
                    "to find where the log file is."
                )
                try:
                    lines = read_log("dashboard.log", lines_to_read=lines_to_read)
                except Exception as e:
                    logger.error(
                        f"Couldn't read dashboard.log file. Error: {e}. "
                        "It means the dashboard is broken even before it "
                        "initializes the logger (mostly dependency issues). "
                        "Reading the dashboard.err file which contains stdout/stderr."
                    )
                    # If we cannot read the .log file, we fallback to .err file.
                    # This is the case where dashboard couldn't be started at all
                    # and couldn't even initialize the logger to write logs to .log
                    # file.
                    try:
                        lines = read_log("dashboard.err", lines_to_read=lines_to_read)
                    except Exception as err_file_error:
                        raise Exception(
                            f"Failed to read dashboard.err file: {err_file_error}. "
                            "It is unexpected. Please report an issue to "
                            "Ray github. "
                            "https://github.com/ray-project/ray/issues"
                        ) from err_file_error
                # read_log already caps the number of lines, so use all of
                # them; slicing here would cut off the newest log line once
                # the header has been appended.
                last_log_str = "\n" + "\n".join(reversed(lines))
                raise Exception(last_log_str)
            else:
                # Is it reachable?
                raise Exception("Failed to start a dashboard.")

        if dashboard_dependency_error is not None or not include_dash:
            # If it is the minimal installation, the web url (dashboard url)
            # shouldn't be configured because it doesn't start a server.
            dashboard_url = ""
        return dashboard_url, process_info
    except Exception as e:
        if raise_on_failure:
            # Bare raise keeps the original traceback and cause intact.
            raise
        logger.error(e)
        return None, None


def get_address(redis_address):
    """Split a Redis address into host, port and an SSL flag.

    Accepted forms are ``ip:port``, ``redis://ip:port`` and
    ``rediss://ip:port``; the ``rediss`` scheme requests SSL.

    Args:
        redis_address: The Redis address string to parse.

    Returns:
        A tuple ``(redis_ip_address, redis_port, enable_redis_ssl)``. The
        first two elements are whatever ``parse_address`` yields for the
        host/port part; the last is True only for the ``rediss`` scheme.

    Raises:
        ValueError: If a scheme is present and is neither ``redis`` nor
            ``rediss``.
    """
    scheme, separator, host_port = redis_address.partition("://")
    if not separator:
        # No scheme at all: the whole string is the host/port part.
        redis_ip_address, redis_port = parse_address(redis_address)
        return redis_ip_address, redis_port, False

    # rediss for SSL
    if scheme not in ("redis", "rediss"):
        raise ValueError(
            f"Invalid redis address {redis_address}. "
            "Expected format is ip:port or redis://ip:port, "
            "or rediss://ip:port for SSL."
        )
    redis_ip_address, redis_port = parse_address(host_port)
    return redis_ip_address, redis_port, scheme == "rediss"


def start_gcs_server(
    redis_address: str,
    log_dir: str,
    stdout_filepath: Optional[str],
    stderr_filepath: Optional[str],
    session_name: str,
    redis_username: Optional[str] = None,
    redis_password: Optional[str] = None,
    config: Optional[dict] = None,
    fate_share: Optional[bool] = None,
    gcs_server_port: Optional[int] = None,
    metrics_agent_port: Optional[int] = None,
    node_ip_address: Optional[str] = None,
):
    """Start a gcs server.

    Args:
        redis_address: The address that the Redis server is listening on.
            If empty, no Redis flags are passed to the gcs server.
        log_dir: The path of the dir where gcs log files are created.
        stdout_filepath: The file path to dump gcs server stdout.
            If None, stdout is not redirected.
        stderr_filepath: The file path to dump gcs server stderr.
            If None, stderr is not redirected.
        session_name: The current Ray session name.
        redis_username: The username of the Redis server.
        redis_password: The password of the Redis server.
        config: Optional configuration that will
            override defaults in RayConfig.
        fate_share: Forwarded unchanged to ``start_ray_process``.
        gcs_server_port: Port number of the gcs server. Despite the None
            default, a positive value is required.
        metrics_agent_port: The port where metrics agent is bound to.
        node_ip_address: IP Address of a node where gcs server starts.

    Returns:
        ProcessInfo for the process that was started.

    Raises:
        AssertionError: If ``gcs_server_port`` is not greater than zero.
    """
    assert gcs_server_port > 0

    command = [
        GCS_SERVER_EXECUTABLE,
        f"--log_dir={log_dir}",
        f"--config_list={serialize_config(config)}",
        f"--gcs_server_port={gcs_server_port}",
        f"--metrics-agent-port={metrics_agent_port}",
        f"--node-ip-address={node_ip_address}",
        f"--session-name={session_name}",
        f"--ray-commit={ray.__commit__}",
    ]

    if stdout_filepath:
        command += [f"--stdout_filepath={stdout_filepath}"]
    if stderr_filepath:
        command += [f"--stderr_filepath={stderr_filepath}"]

    if redis_address:
        redis_ip_address, redis_port, enable_redis_ssl = get_address(redis_address)

        command += [
            f"--redis_address={redis_ip_address}",
            f"--redis_port={redis_port}",
            f"--redis_enable_ssl={'true' if enable_redis_ssl else 'false'}",
        ]
    if redis_username:
        command += [f"--redis_username={redis_username}"]
    if redis_password:
        # NOTE(review): the password is passed as a command-line argument;
        # confirm that exposing it in the process argument list is acceptable.
        command += [f"--redis_password={redis_password}"]

    # When a redirect path is given, the path is handed to the gcs server on
    # the command line and the stream it inherits from us is pointed at
    # os.devnull. The devnull handles are only needed while the child is
    # being spawned, so they are closed again as soon as start_ray_process
    # returns (or raises) instead of being left to the garbage collector.
    stdout_file = None
    stderr_file = None
    try:
        if stdout_filepath:
            stdout_file = open(os.devnull, "w")
        if stderr_filepath:
            stderr_file = open(os.devnull, "w")

        return start_ray_process(
            command,
            ray_constants.PROCESS_TYPE_GCS_SERVER,
            stdout_file=stdout_file,
            stderr_file=stderr_file,
            fate_share=fate_share,
        )
    finally:
        for devnull_file in (stdout_file, stderr_file):
            if devnull_file is not None:
                devnull_file.close()


def start_raylet(
    redis_address: str,
    gcs_address: str,
    node_id: str,
    node_ip_address: str,
    node_manager_port: int,
    raylet_name: str,
    plasma_store_name: str,
    cluster_id: str,
    worker_path: str,
    setup_worker_path: str,
    temp_dir: str,
    session_dir: str,
    resource_dir: str,
    log_dir: str,
    resource_and_label_spec,
    plasma_directory: str,
    fallback_directory: str,
    object_store_memory: int,
    session_name: str,
    is_head_node: bool,
    resource_isolation_config: ResourceIsolationConfig,
    min_worker_port: Optional[int] = None,
    max_worker_port: Optional[int] = None,
    worker_port_list: Optional[List[int]] = None,
    object_manager_port: Optional[int] = None,
    redis_username: Optional[str] = None,
    redis_password: Optional[str] = None,
    metrics_agent_port: Optional[int] = None,
    metrics_export_port: Optional[int] = None,
    dashboard_agent_listen_port: Optional[int] = None,
    runtime_env_agent_port: Optional[int] = None,
    use_valgrind: bool = False,
    use_profiler: bool = False,
    raylet_stdout_filepath: Optional[str] = None,
    raylet_stderr_filepath: Optional[str] = None,
    dashboard_agent_stdout_filepath: Optional[str] = None,
    dashboard_agent_stderr_filepath: Optional[str] = None,
    runtime_env_agent_stdout_filepath: Optional[str] = None,
    runtime_env_agent_stderr_filepath: Optional[str] = None,
    huge_pages: bool = False,
    fate_share: Optional[bool] = None,
    socket_to_use: Optional[int] = None,
    max_bytes: int = 0,
    backup_count: int = 0,
    ray_debugger_external: bool = False,
    env_updates: Optional[dict] = None,
    node_name: Optional[str] = None,
    webui: Optional[str] = None,
):
    """Start a raylet, which is a combined local scheduler and object manager.

    Besides the raylet's own flags, the assembled command line embeds the
    command templates the raylet uses to spawn Python/Java/C++ workers, the
    dashboard agent and the runtime env agent.

    Args:
        redis_address: The address of the primary Redis server.
        gcs_address: The address of GCS server.
        node_id: The hex ID of this node.
        node_ip_address: The IP address of this node.
        node_manager_port: The port to use for the node manager. If it's
            0, a random port will be used.
        raylet_name: The name of the raylet socket to create.
        plasma_store_name: The name of the plasma store socket to connect
             to.
        cluster_id: The ID of the cluster. It is forwarded to the raylet and
            to the Python worker command as ``--cluster-id`` and to both
            agents as ``--cluster-id-hex``.
        worker_path: The path of the Python file that new worker
            processes will execute.
        setup_worker_path: The path of the Python file that will set up
            the environment for the worker process.
        temp_dir: The path of the temporary directory Ray will use.
        session_dir: The path of this session.
        resource_dir: The path of resource of this session .
        log_dir: The path of the dir where log files are created.
        resource_and_label_spec: Resources and key-value labels for this raylet.
        plasma_directory: A directory where the Plasma memory mapped files will
            be created.
        fallback_directory: A directory where the Object store fallback files will be created.
        object_store_memory: The amount of memory (in bytes) to start the
            object store with.
        session_name: The current Ray session name.
        resource_isolation_config: Resource isolation configuration for reserving
            memory and cpu resources for ray system processes through cgroupv2
        is_head_node: whether this node is the head node.
        min_worker_port: The lowest port number that workers will bind
            on. If not set, random ports will be chosen.
        max_worker_port: The highest port number that workers will bind
            on. If set, min_worker_port must also be set.
        worker_port_list: An explicit list of ports to be used for
            workers (comma-separated). Overrides min_worker_port and
            max_worker_port.
        object_manager_port: The port to use for the object manager. If this is
            None, then the object manager will choose its own port.
        redis_username: The username to use when connecting to Redis.
        redis_password: The password to use when connecting to Redis.
        metrics_agent_port: The port where metrics agent is bound to.
        metrics_export_port: The port at which metrics are exposed to.
        dashboard_agent_listen_port: The port at which the dashboard agent
            listens to for HTTP.
        runtime_env_agent_port: The port at which the runtime env agent
            listens to for HTTP.
        use_valgrind: True if the raylet should be started inside
            of valgrind. If this is True, use_profiler must be False.
        use_profiler: True if the raylet should be started inside
            a profiler. If this is True, use_valgrind must be False.
        raylet_stdout_filepath: The file path to dump raylet stdout.
            If None, stdout is not redirected.
        raylet_stderr_filepath: The file path to dump raylet stderr.
            If None, stderr is not redirected.
        dashboard_agent_stdout_filepath: The file path to dump
            dashboard agent stdout. If None, stdout is not redirected.
        dashboard_agent_stderr_filepath: The file path to dump
            dashboard agent stderr. If None, stderr is not redirected.
        runtime_env_agent_stdout_filepath: The file path to dump
            runtime env agent stdout. If None, stdout is not redirected.
        runtime_env_agent_stderr_filepath: The file path to dump
            runtime env agent stderr. If None, stderr is not redirected.
        huge_pages: Boolean flag indicating whether to start the Object
            Store with hugetlbfs support. Requires plasma_directory.
        fate_share: Whether to share fate between raylet and this process.
        socket_to_use: If truthy, its ``close()`` method is called before the
            raylet process is launched. NOTE(review): annotated as ``int`` but
            used as a closable object, presumably a socket held to reserve a
            port -- confirm against callers.
        max_bytes: Log rotation parameter. Corresponding to
            RotatingFileHandler's maxBytes.
        backup_count: Log rotation parameter. Corresponding to
            RotatingFileHandler's backupCount.
        ray_debugger_external: True if the Ray debugger should be made
            available externally to this node.
        env_updates: Environment variable overrides.
        node_name: The name of the node.
        webui: The url of the UI.
    Returns:
        ProcessInfo for the process that was started.

    Raises:
        ValueError: If both use_valgrind and use_profiler are set.
    """
    assert node_manager_port is not None and type(node_manager_port) is int

    if use_valgrind and use_profiler:
        raise ValueError("Cannot use valgrind and profiler at the same time.")

    # Get the static resources and labels from the resolved ResourceAndLabelSpec
    static_resources = resource_and_label_spec.to_resource_dict()
    labels = resource_and_label_spec.labels

    # Limit the number of workers that can be started in parallel by the
    # raylet. However, make sure it is at least 1.
    num_cpus_static = static_resources.get("CPU", 0)
    maximum_startup_concurrency = max(
        1, min(multiprocessing.cpu_count(), num_cpus_static)
    )

    # Format the resource argument in a form like 'CPU,1.0,GPU,0,Custom,3'.
    resource_argument = ",".join(
        ["{},{}".format(*kv) for kv in static_resources.items()]
    )

    # Java workers are only offered when a `java` binary is on PATH and the
    # Ray jars are packaged with this installation.
    has_java_command = shutil.which("java") is not None
    try:
        ray_java_installed = os.path.exists(get_ray_jars_dir())
    except Exception:
        # Best effort: a missing jars directory just means Java support is
        # not available, which must not prevent the raylet from starting.
        ray_java_installed = False

    include_java = has_java_command and ray_java_installed
    if include_java:
        java_worker_command = build_java_worker_command(
            gcs_address,
            plasma_store_name,
            raylet_name,
            redis_username,
            redis_password,
            session_dir,
            node_ip_address,
            setup_worker_path,
        )
    else:
        java_worker_command = []

    # C++ workers are only offered when the default worker binary is packaged.
    if os.path.exists(DEFAULT_WORKER_EXECUTABLE):
        cpp_worker_command = build_cpp_worker_command(
            gcs_address,
            plasma_store_name,
            raylet_name,
            redis_username,
            redis_password,
            session_dir,
            log_dir,
            node_ip_address,
            setup_worker_path,
        )
    else:
        cpp_worker_command = []

    # Create the command that the Raylet will use to start workers.
    # TODO(architkulkarni): Pipe in setup worker args separately instead of
    # inserting them into start_worker_command and later erasing them if
    # needed.
    start_worker_command = (
        [
            sys.executable,
            setup_worker_path,
        ]
        + _site_flags()  # Inherit "-S" and "-s" flags from current Python interpreter.
        + [
            worker_path,
            f"--node-ip-address={node_ip_address}",
            "--node-manager-port=RAY_NODE_MANAGER_PORT_PLACEHOLDER",
            f"--object-store-name={plasma_store_name}",
            f"--raylet-name={raylet_name}",
            f"--redis-address={redis_address}",
            f"--metrics-agent-port={metrics_agent_port}",
            f"--logging-rotate-bytes={max_bytes}",
            f"--logging-rotate-backup-count={backup_count}",
            f"--runtime-env-agent-port={runtime_env_agent_port}",
            f"--gcs-address={gcs_address}",
            f"--session-name={session_name}",
            f"--temp-dir={temp_dir}",
            f"--webui={webui}",
            f"--cluster-id={cluster_id}",
        ]
    )

    start_worker_command.append("RAY_WORKER_DYNAMIC_OPTION_PLACEHOLDER")

    if redis_username:
        start_worker_command += [f"--redis-username={redis_username}"]

    if redis_password:
        start_worker_command += [f"--redis-password={redis_password}"]

    # If the object manager port is None, then use 0 to cause the object
    # manager to choose its own port.
    if object_manager_port is None:
        object_manager_port = 0

    if min_worker_port is None:
        min_worker_port = 0

    if max_worker_port is None:
        max_worker_port = 0

    # Labels are passed to the raylet as a JSON object; an empty string is
    # passed when there are none.
    labels_json_str = ""
    if labels:
        labels_json_str = json.dumps(labels)

    dashboard_agent_command = [
        *_build_python_executable_command_memory_profileable(
            ray_constants.PROCESS_TYPE_DASHBOARD_AGENT, session_dir
        ),
        os.path.join(RAY_PATH, "dashboard", "agent.py"),
        f"--node-ip-address={node_ip_address}",
        f"--metrics-export-port={metrics_export_port}",
        f"--grpc-port={metrics_agent_port}",
        f"--listen-port={dashboard_agent_listen_port}",
        "--node-manager-port=RAY_NODE_MANAGER_PORT_PLACEHOLDER",
        f"--object-store-name={plasma_store_name}",
        f"--raylet-name={raylet_name}",
        f"--temp-dir={temp_dir}",
        f"--session-dir={session_dir}",
        f"--log-dir={log_dir}",
        f"--logging-rotate-bytes={max_bytes}",
        f"--logging-rotate-backup-count={backup_count}",
        f"--session-name={session_name}",
        f"--gcs-address={gcs_address}",
        f"--cluster-id-hex={cluster_id}",
    ]
    if dashboard_agent_stdout_filepath:
        dashboard_agent_command.append(
            f"--stdout-filepath={dashboard_agent_stdout_filepath}"
        )
    if dashboard_agent_stderr_filepath:
        dashboard_agent_command.append(
            f"--stderr-filepath={dashboard_agent_stderr_filepath}"
        )
    if (
        dashboard_agent_stdout_filepath is None
        and dashboard_agent_stderr_filepath is None
    ):
        # If not redirecting logging to files, unset log filename.
        # This will cause log records to go to stderr.
        dashboard_agent_command.append("--logging-filename=")
        # Use stderr log format with the component name as a message prefix.
        logging_format = ray_constants.LOGGER_FORMAT_STDERR.format(
            component=ray_constants.PROCESS_TYPE_DASHBOARD_AGENT
        )
        dashboard_agent_command.append(f"--logging-format={logging_format}")

    if ray._private.utils.get_dashboard_dependency_error() is not None:
        # If dependencies are not installed, it is the minimally packaged
        # ray. We should restrict the features within dashboard agent
        # that requires additional dependencies to be downloaded.
        dashboard_agent_command.append("--minimal")

    runtime_env_agent_command = [
        *_build_python_executable_command_memory_profileable(
            ray_constants.PROCESS_TYPE_RUNTIME_ENV_AGENT, session_dir
        ),
        os.path.join(RAY_PATH, "_private", "runtime_env", "agent", "main.py"),
        f"--node-ip-address={node_ip_address}",
        f"--runtime-env-agent-port={runtime_env_agent_port}",
        f"--gcs-address={gcs_address}",
        f"--cluster-id-hex={cluster_id}",
        f"--runtime-env-dir={resource_dir}",
        f"--logging-rotate-bytes={max_bytes}",
        f"--logging-rotate-backup-count={backup_count}",
        f"--log-dir={log_dir}",
        f"--temp-dir={temp_dir}",
    ]
    if runtime_env_agent_stdout_filepath:
        runtime_env_agent_command.append(
            f"--stdout-filepath={runtime_env_agent_stdout_filepath}"
        )
    if runtime_env_agent_stderr_filepath:
        runtime_env_agent_command.append(
            f"--stderr-filepath={runtime_env_agent_stderr_filepath}"
        )
    if (
        runtime_env_agent_stdout_filepath is None
        and runtime_env_agent_stderr_filepath is None
    ):
        # If not redirecting logging to files, unset log filename.
        # This will cause log records to go to stderr.
        runtime_env_agent_command.append("--logging-filename=")
        # Use stderr log format with the component name as a message prefix.
        logging_format = ray_constants.LOGGER_FORMAT_STDERR.format(
            component=ray_constants.PROCESS_TYPE_RUNTIME_ENV_AGENT
        )
        runtime_env_agent_command.append(f"--logging-format={logging_format}")

    # The worker command templates are flattened into single strings; the
    # placeholders they contain are left as-is in the raylet command line.
    command = [
        RAYLET_EXECUTABLE,
        f"--raylet_socket_name={raylet_name}",
        f"--store_socket_name={plasma_store_name}",
        f"--object_manager_port={object_manager_port}",
        f"--min_worker_port={min_worker_port}",
        f"--max_worker_port={max_worker_port}",
        f"--node_manager_port={node_manager_port}",
        f"--node_id={node_id}",
        f"--node_ip_address={node_ip_address}",
        f"--maximum_startup_concurrency={maximum_startup_concurrency}",
        f"--static_resource_list={resource_argument}",
        f"--python_worker_command={subprocess.list2cmdline(start_worker_command)}",  # noqa
        f"--java_worker_command={subprocess.list2cmdline(java_worker_command)}",  # noqa
        f"--cpp_worker_command={subprocess.list2cmdline(cpp_worker_command)}",  # noqa
        f"--native_library_path={DEFAULT_NATIVE_LIBRARY_PATH}",
        f"--temp_dir={temp_dir}",
        f"--session_dir={session_dir}",
        f"--log_dir={log_dir}",
        f"--resource_dir={resource_dir}",
        f"--metrics-agent-port={metrics_agent_port}",
        f"--metrics_export_port={metrics_export_port}",
        f"--runtime_env_agent_port={runtime_env_agent_port}",
        f"--object_store_memory={object_store_memory}",
        f"--plasma_directory={plasma_directory}",
        f"--fallback_directory={fallback_directory}",
        f"--ray-debugger-external={1 if ray_debugger_external else 0}",
        f"--gcs-address={gcs_address}",
        f"--session-name={session_name}",
        f"--labels={labels_json_str}",
        f"--cluster-id={cluster_id}",
    ]

    if resource_isolation_config.is_enabled():
        # Use the module logger (not the root logger) so the record follows
        # this module's logging configuration like every other message here.
        logger.info(
            "Resource isolation enabled with cgroup_path=%s, "
            "system_reserved_cpu=%s "
            "system_reserved_memory=%s.",
            resource_isolation_config.cgroup_path,
            resource_isolation_config.system_reserved_cpu_weight,
            resource_isolation_config.system_reserved_memory,
        )
        command.append("--enable-resource-isolation")
        command.append(f"--cgroup-path={resource_isolation_config.cgroup_path}")
        command.append(
            f"--system-reserved-cpu-weight={resource_isolation_config.system_reserved_cpu_weight}"
        )
        command.append(
            f"--system-reserved-memory-bytes={resource_isolation_config.system_reserved_memory}"
        )
        command.append(f"--system-pids={resource_isolation_config.system_pids}")

    if raylet_stdout_filepath:
        command.append(f"--stdout_filepath={raylet_stdout_filepath}")
    if raylet_stderr_filepath:
        command.append(f"--stderr_filepath={raylet_stderr_filepath}")

    if is_head_node:
        command.append("--head")

    if worker_port_list is not None:
        command.append(f"--worker_port_list={worker_port_list}")
    command.append(
        "--num_prestart_python_workers={}".format(int(resource_and_label_spec.num_cpus))
    )
    command.append(
        "--dashboard_agent_command={}".format(
            subprocess.list2cmdline(dashboard_agent_command)
        )
    )
    command.append(
        "--runtime_env_agent_command={}".format(
            subprocess.list2cmdline(runtime_env_agent_command)
        )
    )
    if huge_pages:
        command.append("--huge_pages")
    if socket_to_use:
        # Close the caller-provided handle before the raylet is launched.
        socket_to_use.close()
    if node_name is not None:
        command.append(
            f"--node-name={node_name}",
        )

    # When the raylet is given a file path it redirects its own output, so the
    # stream handed to the child process is pointed at the null device.
    stdout_file = None
    if raylet_stdout_filepath:
        stdout_file = open(os.devnull, "w")

    stderr_file = None
    if raylet_stderr_filepath:
        stderr_file = open(os.devnull, "w")

    process_info = start_ray_process(
        command,
        ray_constants.PROCESS_TYPE_RAYLET,
        use_valgrind=use_valgrind,
        use_gdb=False,
        use_valgrind_profiler=use_profiler,
        use_perftools_profiler=("RAYLET_PERFTOOLS_PATH" in os.environ),
        stdout_file=stdout_file,
        stderr_file=stderr_file,
        fate_share=fate_share,
        env_updates=env_updates,
    )
    return process_info


def get_ray_jars_dir():
    """Locate the directory holding the Ray jars and their dependencies.

    Returns:
        The absolute path of the ``jars`` directory under ``RAY_PATH``.

    Raises:
        RuntimeError: If that directory does not exist.
    """
    jars_path = os.path.abspath(os.path.join(RAY_PATH, "jars"))
    if os.path.exists(jars_path):
        return jars_path
    raise RuntimeError(
        "Ray jars is not packaged into ray. "
        "Please build ray with java enabled "
        "(set env var RAY_INSTALL_JAVA=1)"
    )


def build_java_worker_command(
    bootstrap_address: str,
    plasma_store_name: str,
    raylet_name: str,
    redis_username: str,
    redis_password: str,
    session_dir: str,
    node_ip_address: str,
    setup_worker_path: str,
):
    """Assemble the argument list used to launch a Java worker.

    Every setting is passed as a ``-D<name>=<value>`` system property.
    Settings whose argument is None are left out; the node manager port
    placeholder, ``ray.home``, the log directory and the session directory
    are always present.

    Args:
        bootstrap_address: Bootstrap address of ray cluster.
        plasma_store_name: The name of the plasma store socket to connect
           to.
        raylet_name: The name of the raylet socket to create.
        redis_username: The username to connect to Redis.
        redis_password: The password to connect to Redis.
        session_dir: The path of this session.
        node_ip_address: The IP address for this node.
        setup_worker_path: The path of the Python file that will set up
            the environment for the worker process.
    Returns:
        The command, as a list of arguments, for starting a Java worker.
    """
    # Rows are (property name, value, always emitted?), in command-line order.
    system_properties = [
        ("ray.address", bootstrap_address, False),
        ("ray.raylet.node-manager-port", "RAY_NODE_MANAGER_PORT_PLACEHOLDER", True),
        ("ray.object-store.socket-name", plasma_store_name, False),
        ("ray.raylet.socket-name", raylet_name, False),
        ("ray.redis.username", redis_username, False),
        ("ray.redis.password", redis_password, False),
        ("ray.node-ip", node_ip_address, False),
        ("ray.home", RAY_HOME, True),
        ("ray.logging.dir", os.path.join(session_dir, "logs"), True),
        ("ray.session-dir", session_dir, True),
    ]
    jvm_flags = [
        "-D{}={}".format(name, value)
        for name, value, always_emitted in system_properties
        if always_emitted or value is not None
    ]

    return [
        sys.executable,
        setup_worker_path,
        *jvm_flags,
        "RAY_WORKER_DYNAMIC_OPTION_PLACEHOLDER",
        "io.ray.runtime.runner.worker.DefaultWorker",
    ]


def build_cpp_worker_command(
    bootstrap_address: str,
    plasma_store_name: str,
    raylet_name: str,
    redis_username: str,
    redis_password: str,
    session_dir: str,
    log_dir: str,
    node_ip_address: str,
    setup_worker_path: str,
):
    """Assemble the argument list used to launch a C++ worker.

    The worker binary is started through the setup-worker script. Every
    argument is interpolated as-is, so a None value ends up as the literal
    text ``None`` in the corresponding flag.

    Args:
        bootstrap_address: The bootstrap address of the cluster.
        plasma_store_name: The name of the plasma store socket to connect
           to.
        raylet_name: The name of the raylet socket to create.
        redis_username: The username to connect to Redis.
        redis_password: The password to connect to Redis.
        session_dir: The path of this session.
        log_dir: The path of logs.
        node_ip_address: The ip address for this node.
        setup_worker_path: The path of the Python file that will set up
            the environment for the worker process.
    Returns:
        The command, as a list of arguments, for starting a C++ worker.
    """
    # Interpreter + setup script + the worker binary it should run.
    launcher = [sys.executable, setup_worker_path, DEFAULT_WORKER_EXECUTABLE]

    worker_flags = [
        f"--ray_plasma_store_socket_name={plasma_store_name}",
        f"--ray_raylet_socket_name={raylet_name}",
        "--ray_node_manager_port=RAY_NODE_MANAGER_PORT_PLACEHOLDER",
        f"--ray_address={bootstrap_address}",
        f"--ray_redis_username={redis_username}",
        f"--ray_redis_password={redis_password}",
        f"--ray_session_dir={session_dir}",
        f"--ray_logs_dir={log_dir}",
        f"--ray_node_ip_address={node_ip_address}",
    ]

    return launcher + worker_flags + ["RAY_WORKER_DYNAMIC_OPTION_PLACEHOLDER"]


def determine_plasma_store_config(
    object_store_memory: int,
    temp_dir: str,
    plasma_directory: Optional[str] = None,
    fallback_directory: Optional[str] = None,
    huge_pages: bool = False,
):
    """Figure out how to configure the plasma object store.

    This will determine:
    1. which directory to use for the plasma store. On Linux,
    we will try to use /dev/shm unless the shared memory file system is too
    small, in which case we will fall back to /tmp. If any of the object store
    memory or plasma directory parameters are specified by the user, then those
    values will be preserved.
    2. which directory to use for the fallback files. It will default to the temp_dir
    if it is not extracted from the object_spilling_config.

    Args:
        object_store_memory: The object store memory to use. Non-int values
            are converted with ``int()``.
        temp_dir: The temporary directory used as the fallback directory when
            fallback_directory is not given.
        plasma_directory: The user-specified plasma directory parameter.
        fallback_directory: The path extracted from the object_spilling_config when the
                            object spilling config is set and the spilling type is to
                            filesystem.
        huge_pages: The user-specified huge pages parameter.

    Returns:
        A tuple of plasma directory to use, the fallback directory to use, and the
        object store memory to use. If it is specified by the user, then that value will
        be preserved.

    Raises:
        ValueError: If huge_pages is requested on a non-Linux platform or
            without a plasma_directory, if the requested size does not fit
            /dev/shm (and slow storage is not allowed) or system memory, if
            the size is below the minimum or above the macOS limit, or if
            the plasma/fallback directory is not an existing directory.
    """
    if not isinstance(object_store_memory, int):
        object_store_memory = int(object_store_memory)

    on_linux = sys.platform == "linux" or sys.platform == "linux2"

    if huge_pages and not on_linux:
        raise ValueError("The huge_pages argument is only supported on Linux.")

    # This has to be checked against the caller's value, i.e. before a
    # default plasma directory is filled in below; afterwards
    # plasma_directory is never None and the check could not fire.
    if huge_pages and plasma_directory is None:
        raise ValueError(
            "If huge_pages is True, then the "
            "plasma_directory argument must be provided."
        )

    system_memory = ray._common.utils.get_system_memory()

    # Determine which directory to use. By default, use /tmp on MacOS and
    # /dev/shm on Linux, unless the shared-memory file system is too small,
    # in which case we default to /tmp on Linux.
    if plasma_directory is None:
        if on_linux:
            shm_avail = ray._private.utils.get_shared_memory_bytes()
            # Compare the requested memory size to the memory available in
            # /dev/shm.
            if shm_avail >= object_store_memory:
                plasma_directory = "/dev/shm"
            elif (
                not os.environ.get("RAY_OBJECT_STORE_ALLOW_SLOW_STORAGE")
                and object_store_memory > ray_constants.REQUIRE_SHM_SIZE_THRESHOLD
            ):
                raise ValueError(
                    "The configured object store size ({} GB) exceeds "
                    "/dev/shm size ({} GB). This will harm performance. "
                    "Consider deleting files in /dev/shm or increasing its "
                    "size with "
                    "--shm-size in Docker. To ignore this warning, "
                    "set RAY_OBJECT_STORE_ALLOW_SLOW_STORAGE=1.".format(
                        object_store_memory / 1e9, shm_avail / 1e9
                    )
                )
            else:
                # /dev/shm is too small but slow storage is tolerated (either
                # explicitly allowed or the store is below the threshold).
                plasma_directory = ray._common.utils.get_user_temp_dir()
                logger.warning(
                    "WARNING: The object store is using {} instead of "
                    "/dev/shm because /dev/shm has only {} bytes available. "
                    "This will harm performance! You may be able to free up "
                    "space by deleting files in /dev/shm. If you are inside a "
                    "Docker container, you can increase /dev/shm size by "
                    "passing '--shm-size={:.2f}gb' to 'docker run' (or add it "
                    "to the run_options list in a Ray cluster config). Make "
                    "sure to set this to more than 30% of available RAM.".format(
                        plasma_directory,
                        shm_avail,
                        object_store_memory * (1.1) / (2**30),
                    )
                )
        else:
            plasma_directory = ray._common.utils.get_user_temp_dir()

        # Do some sanity checks.
        if object_store_memory > system_memory:
            raise ValueError(
                "The requested object store memory size is greater "
                "than the total available memory."
            )
    else:
        plasma_directory = os.path.abspath(plasma_directory)
        logger.info("object_store_memory is not verified when plasma_directory is set.")

    if not os.path.isdir(plasma_directory):
        raise ValueError(
            f"The plasma directory file {plasma_directory} does not exist or is not a directory."
        )

    if object_store_memory < ray_constants.OBJECT_STORE_MINIMUM_MEMORY_BYTES:
        raise ValueError(
            "Attempting to cap object store memory usage at {} "
            "bytes, but the minimum allowed is {} bytes.".format(
                object_store_memory, ray_constants.OBJECT_STORE_MINIMUM_MEMORY_BYTES
            )
        )

    if (
        sys.platform == "darwin"
        and object_store_memory > ray_constants.MAC_DEGRADED_PERF_MMAP_SIZE_LIMIT
        and os.environ.get("RAY_ENABLE_MAC_LARGE_OBJECT_STORE") != "1"
    ):
        # All three figures are expressed in GiB (bytes / 2**30).
        raise ValueError(
            "The configured object store size ({:.4}GiB) exceeds "
            "the optimal size on Mac ({:.4}GiB). "
            "This will harm performance! There is a known issue where "
            "Ray's performance degrades with object store size greater"
            " than {:.4}GiB on a Mac. "
            "To reduce the object store capacity, specify "
            "`object_store_memory` when calling ray.init() or ray start. "
            "To ignore this warning, "
            "set RAY_ENABLE_MAC_LARGE_OBJECT_STORE=1.".format(
                object_store_memory / 2**30,
                ray_constants.MAC_DEGRADED_PERF_MMAP_SIZE_LIMIT / 2**30,
                ray_constants.MAC_DEGRADED_PERF_MMAP_SIZE_LIMIT / 2**30,
            )
        )

    if fallback_directory is None:
        fallback_directory = temp_dir
    else:
        fallback_directory = os.path.abspath(fallback_directory)

    if not os.path.isdir(fallback_directory):
        raise ValueError(
            f"The fallback directory file {fallback_directory} does not exist or is not a directory."
        )

    # Print the object store memory using two decimal places.
    logger.debug(
        "Determine to start the Plasma object store with {} GB memory "
        "using {} and fallback to {}".format(
            round(object_store_memory / 10**9, 2),
            plasma_directory,
            fallback_directory,
        )
    )

    return plasma_directory, fallback_directory, object_store_memory


def start_monitor(
    gcs_address: str,
    logs_dir: str,
    stdout_filepath: Optional[str] = None,
    stderr_filepath: Optional[str] = None,
    autoscaling_config: Optional[str] = None,
    fate_share: Optional[bool] = None,
    max_bytes: int = 0,
    backup_count: int = 0,
    monitor_ip: Optional[str] = None,
    autoscaler_v2: bool = False,
):
    """Launch the monitor process that watches the other processes.

    Args:
        gcs_address: The address of GCS server. Must not be None.
        logs_dir: The path to the log directory.
        stdout_filepath: The file path to dump monitor stdout.
            If None, stdout is not redirected.
        stderr_filepath: The file path to dump monitor stderr.
            If None, stderr is not redirected.
        autoscaling_config: path to autoscaling config file.
        fate_share: Forwarded unchanged to ``start_ray_process``.
        max_bytes: Log rotation parameter. Corresponding to
            RotatingFileHandler's maxBytes.
        backup_count: Log rotation parameter. Corresponding to
            RotatingFileHandler's backupCount.
        monitor_ip: IP address of the machine that the monitor will be
            run on. Can be excluded, but required for autoscaler metrics.
        autoscaler_v2: If True, run ``monitor.py`` from the autoscaler v2
            directory instead of the one in the private autoscaler directory.
    Returns:
        ProcessInfo for the process that was started.
    """
    monitor_package_dir = AUTOSCALER_V2_DIR if autoscaler_v2 else AUTOSCALER_PRIVATE_DIR
    entrypoint = os.path.join(RAY_PATH, monitor_package_dir, "monitor.py")

    # "-u" runs the interpreter with unbuffered stdout/stderr.
    command = [sys.executable, "-u", entrypoint]
    command.extend(
        [
            f"--logs-dir={logs_dir}",
            f"--logging-rotate-bytes={max_bytes}",
            f"--logging-rotate-backup-count={backup_count}",
        ]
    )
    assert gcs_address is not None
    command.extend([f"--gcs-address={gcs_address}"])

    if stdout_filepath:
        command.extend([f"--stdout-filepath={stdout_filepath}"])
    if stderr_filepath:
        command.extend([f"--stderr-filepath={stderr_filepath}"])

    if all(path is None for path in (stdout_filepath, stderr_filepath)):
        # Nothing is redirected to files: clear the log filename so records
        # go to stderr, prefixed with the component name.
        stderr_format = ray_constants.LOGGER_FORMAT_STDERR.format(
            component=ray_constants.PROCESS_TYPE_MONITOR
        )
        command.extend(["--logging-filename=", f"--logging-format={stderr_format}"])
    if autoscaling_config:
        command.extend(["--autoscaling-config=" + str(autoscaling_config)])
    if monitor_ip:
        command.extend(["--monitor-ip=" + monitor_ip])

    # A stream that the monitor redirects to a file by itself is handed to
    # the child process as the null device.
    stdout_file = open(os.devnull, "w") if stdout_filepath else None
    stderr_file = open(os.devnull, "w") if stderr_filepath else None

    return start_ray_process(
        command,
        ray_constants.PROCESS_TYPE_MONITOR,
        stdout_file=stdout_file,
        stderr_file=stderr_file,
        fate_share=fate_share,
    )


def start_ray_client_server(
    address: str,
    ray_client_server_ip: str,
    ray_client_server_port: int,
    stdout_file: Optional[int] = None,
    stderr_file: Optional[int] = None,
    redis_username: Optional[str] = None,
    redis_password: Optional[str] = None,
    fate_share: Optional[bool] = None,
    runtime_env_agent_address: Optional[str] = None,
    server_type: str = "proxy",
    serialized_runtime_env_context: Optional[str] = None,
):
    """Run the server process of the Ray client.

    Args:
        address: The address of the cluster.
        ray_client_server_ip: Host IP the Ray client server listens on.
        ray_client_server_port: Port the Ray client server listens on.
        stdout_file: A file handle opened for writing to redirect stdout to. If
            no redirection should happen, then this should be None.
        stderr_file: A file handle opened for writing to redirect stderr to. If
            no redirection should happen, then this should be None.
        redis_username: The username of the Redis server.
        redis_password: The password of the Redis server.
        fate_share: Forwarded unchanged to start_ray_process as its
            fate_share argument.
        runtime_env_agent_address: Address to the Runtime Env Agent listens on via HTTP.
            Required (non-empty) when server_type == "proxy"; when given for
            any server type it is passed on the command line.
        server_type: Whether to start the proxy version of Ray Client.
        serialized_runtime_env_context (str|None): If specified, the serialized
            runtime_env_context to start the client server in.

    Returns:
        ProcessInfo for the process that was started.

    Raises:
        AssertionError: If server_type == "proxy" and
            runtime_env_agent_address is None or empty.
    """
    # Validate up front so a misconfigured call fails before any command is
    # built. The truthiness check also covers None (the parameter's default),
    # which a len()-based check would turn into an opaque TypeError.
    if server_type == "proxy":
        assert runtime_env_agent_address, (
            "runtime_env_agent_address must be a non-empty string when "
            'server_type == "proxy".'
        )

    # The setup-worker script lives under <ray package root>/_private/workers.
    root_ray_dir = Path(__file__).resolve().parents[1]
    setup_worker_path = os.path.join(
        root_ray_dir, "_private", "workers", ray_constants.SETUP_WORKER_FILENAME
    )

    command = [
        sys.executable,
        setup_worker_path,
        "-m",
        "ray.util.client.server",
        f"--address={address}",
        f"--host={ray_client_server_ip}",
        f"--port={ray_client_server_port}",
        f"--mode={server_type}",
        f"--language={Language.Name(Language.PYTHON)}",
    ]
    # Optional flags are only emitted when a non-empty value was supplied.
    if redis_username:
        command.append(f"--redis-username={redis_username}")
    if redis_password:
        command.append(f"--redis-password={redis_password}")
    if serialized_runtime_env_context:
        command.append(
            f"--serialized-runtime-env-context={serialized_runtime_env_context}"  # noqa: E501
        )
    if runtime_env_agent_address:
        command.append(f"--runtime-env-agent-address={runtime_env_agent_address}")

    process_info = start_ray_process(
        command,
        ray_constants.PROCESS_TYPE_RAY_CLIENT_SERVER,
        stdout_file=stdout_file,
        stderr_file=stderr_file,
        fate_share=fate_share,
    )
    return process_info


def _is_raylet_process(cmdline: Optional[List[str]]) -> bool:
    """Tell whether a process command line looks like a raylet's.

    Args:
        cmdline: The process's argument list, or None if it is unavailable.

    Returns:
        bool: True when the base name of the first argument contains
            "raylet"; False for a missing or empty command line.
    """
    # Nothing to inspect when the command line is missing or empty.
    if not cmdline:
        return False

    # Only the file name of the executable matters, not its directory.
    program_path = cmdline[0]
    return "raylet" in os.path.basename(program_path)
