import inspect
import logging
from types import FunctionType
from typing import Any, Dict, Union

import ray
from ray._common.pydantic_compat import is_subclass_of_base_model
from ray._common.usage import usage_lib
from ray.actor import ActorHandle
from ray.serve._private.client import ServeControllerClient
from ray.serve._private.constants import (
    HTTP_PROXY_TIMEOUT,
    SERVE_LOGGER_NAME,
    SERVE_NAMESPACE,
)
from ray.serve._private.default_impl import get_controller_impl
from ray.serve.config import HTTPOptions, gRPCOptions
from ray.serve.context import _get_global_client, _set_global_client
from ray.serve.deployment import Application
from ray.serve.exceptions import RayServeException
from ray.serve.schema import LoggingConfig

logger = logging.getLogger(SERVE_LOGGER_NAME)


def _check_http_options(
    client: ServeControllerClient, http_options: Union[dict, HTTPOptions]
) -> None:
    """Warn if the requested HTTP options differ from the client's current ones.

    Nothing is changed on the client: the function only compares the two
    configurations field by field and logs a warning naming every field whose
    value differs.

    Args:
        client: Client whose ``http_config`` is treated as the existing config.
        http_options: Requested options, either an ``HTTPOptions`` instance or
            a dict that is parsed into one. A falsy value skips the check.
    """
    if not http_options:
        return

    current = client.http_config
    if isinstance(http_options, HTTPOptions):
        requested = http_options
    else:
        requested = HTTPOptions.parse_obj(http_options)

    # Compare every attribute stored on the requested options object.
    mismatched = [
        name
        for name in requested.__dict__
        if getattr(requested, name) != getattr(current, name)
    ]
    if not mismatched:
        return

    logger.warning(
        "The new client HTTP config differs from the existing one "
        f"in the following fields: {mismatched}. "
        "The new HTTP config is ignored."
    )


def _start_controller(
    http_options: Union[None, dict, HTTPOptions] = None,
    grpc_options: Union[None, dict, gRPCOptions] = None,
    global_logging_config: Union[None, dict, LoggingConfig] = None,
    **kwargs,
) -> ActorHandle:
    """Start Ray Serve controller.

    The function makes sure controller is ready to start deploying apps
    after it returns: when the controller reports any proxies, it blocks until
    each of them answers ``ready`` or ``HTTP_PROXY_TIMEOUT`` seconds elapse.

    Parameters are same as ray.serve._private.api.serve_start(). Dict inputs
    are converted to their config classes; a missing ``http_options`` or
    ``global_logging_config`` falls back to the class defaults, while a missing
    ``grpc_options`` is forwarded as ``None``. Extra keyword arguments are only
    inspected for the removed legacy HTTP arguments.

    Returns: controller actor handle.

    Raises:
        ValueError: If ``http_host``, ``http_port`` or ``http_middlewares`` is
            passed as a keyword argument.
        TimeoutError: If the proxies are not ready within
            ``HTTP_PROXY_TIMEOUT`` seconds.
    """

    # Initialize ray if needed.
    ray._private.worker.global_worker._filter_logs_by_job = False
    if not ray.is_initialized():
        ray.init(namespace=SERVE_NAMESPACE)

    # Legacy http proxy actor check. Each removed keyword argument is mapped to
    # the HTTPOptions field that replaces it, so the error message suggests a
    # key that http_options actually accepts.
    http_deprecated_args = {
        "http_host": "host",
        "http_port": "port",
        "http_middlewares": "middlewares",
    }
    for key, option_name in http_deprecated_args.items():
        if key in kwargs:
            raise ValueError(
                f"{key} is deprecated, please use serve.start(http_options="
                f'{{"{option_name}": {kwargs[key]!r}}}) instead.'
            )

    if isinstance(http_options, dict):
        http_options = HTTPOptions.parse_obj(http_options)
    if http_options is None:
        http_options = HTTPOptions()

    if isinstance(grpc_options, dict):
        grpc_options = gRPCOptions(**grpc_options)

    if global_logging_config is None:
        global_logging_config = LoggingConfig()
    elif isinstance(global_logging_config, dict):
        global_logging_config = LoggingConfig(**global_logging_config)

    controller_impl = get_controller_impl()
    controller = controller_impl.remote(
        http_options=http_options,
        grpc_options=grpc_options,
        global_logging_config=global_logging_config,
    )

    # Block until every proxy the controller knows about reports ready.
    proxy_handles = ray.get(controller.get_proxies.remote())
    if proxy_handles:
        try:
            ray.get(
                [handle.ready.remote() for handle in proxy_handles.values()],
                timeout=HTTP_PROXY_TIMEOUT,
            )
        except ray.exceptions.GetTimeoutError as err:
            # Surface a builtin TimeoutError while keeping the Ray error as
            # the explicit cause.
            raise TimeoutError(
                f"HTTP proxies not available after {HTTP_PROXY_TIMEOUT}s."
            ) from err
    return controller


async def serve_start_async(
    http_options: Union[None, dict, HTTPOptions] = None,
    grpc_options: Union[None, dict, gRPCOptions] = None,
    global_logging_config: Union[None, dict, LoggingConfig] = None,
    **kwargs,
) -> ServeControllerClient:
    """Initialize a serve instance asynchronously.

    If a global client can be obtained (the lookup is made with the controller
    health check enabled), that client is returned and the new HTTP options are
    only compared against the existing ones. Otherwise ``_start_controller`` is
    launched as a Ray task with ``num_cpus=0``, its result is awaited, and a
    new client wrapping the controller is registered as the global client.

    This function is not thread-safe. The caller should maintain the async lock in order
    to start the serve instance asynchronously.

    This function has the same functionality as ray.serve._private.api.serve_start().

    Parameters & Returns are same as ray.serve._private.api.serve_start().
    """

    usage_lib.record_library_usage("serve")

    try:
        existing_client = _get_global_client(_health_check_controller=True)
        connect_message = (
            f'Connecting to existing Serve app in namespace "{SERVE_NAMESPACE}".'
            " New http options will not be applied."
        )
        logger.info(connect_message)
        if http_options:
            _check_http_options(existing_client, http_options)
        return existing_client
    except RayServeException:
        # No usable client; fall through and start a new controller.
        pass

    # Run the controller start-up inside a Ray task so it can be awaited.
    start_controller_task = ray.remote(_start_controller).options(num_cpus=0)
    controller = await start_controller_task.remote(
        http_options, grpc_options, global_logging_config, **kwargs
    )

    new_client = ServeControllerClient(controller)
    _set_global_client(new_client)
    logger.info(f'Started Serve in namespace "{SERVE_NAMESPACE}".')
    return new_client


def serve_start(
    http_options: Union[None, dict, HTTPOptions] = None,
    grpc_options: Union[None, dict, gRPCOptions] = None,
    global_logging_config: Union[None, dict, LoggingConfig] = None,
    **kwargs,
) -> ServeControllerClient:
    """Initialize a serve instance.

    If a global client can be obtained (the lookup is made with the controller
    health check enabled), it is returned as is and the new HTTP options are
    only compared against the existing ones. Otherwise a controller is started
    and a new client for it is registered as the global client.

    By default, the instance will be scoped to the lifetime of the returned
    Client object (or when the script exits). This is
    only relevant if connecting to a long-running Ray cluster (e.g., with
    ray.init(address="auto") or ray.init("ray://<remote_addr>")).

    Args:
        http_options (Optional[Dict, serve.HTTPOptions]): Configuration options
          for HTTP proxy. You can pass in a dictionary or HTTPOptions object
          with fields:

            - host(str, None): Host for HTTP servers to listen on. Defaults to
              "127.0.0.1". To expose Serve publicly, you probably want to set
              this to "0.0.0.0".
            - port(int): Port for HTTP server. Defaults to 8000.
            - root_path(str): Root path to mount the serve application
              (for example, "/serve"). All deployment routes will be prefixed
              with this path. Defaults to "".
            - middlewares(list): A list of Starlette middlewares that will be
              applied to the HTTP servers in the cluster. Defaults to [].
            - location(str, serve.config.DeploymentMode): The deployment
              location of HTTP servers:

                - "HeadOnly": start one HTTP server on the head node. Serve
                  assumes the head node is the node you executed serve.start
                  on. This is the default.
                - "EveryNode": start one HTTP server per node.
                - "NoServer" or None: disable HTTP server.
            - num_cpus (int): The number of CPU cores to reserve for each
              internal Serve HTTP proxy actor.  Defaults to 0.
        grpc_options: [Experimental] Configuration options for gRPC proxy.
          You can pass in a gRPCOptions object with fields:

            - port(int): Port for gRPC server. Defaults to 9000.
            - grpc_servicer_functions(list): List of import paths for gRPC
                `add_servicer_to_server` functions to add to Serve's gRPC proxy.
                Default empty list, meaning not to start the gRPC server.
        global_logging_config: Logging configuration, given as a dictionary or
          LoggingConfig object. It is forwarded to the controller start-up
          unchanged.
        **kwargs: Forwarded to the controller start-up.

    Returns:
        The existing global client when one is available, otherwise a new
        client for the controller that was just started.
    """

    usage_lib.record_library_usage("serve")

    try:
        existing_client = _get_global_client(_health_check_controller=True)
        connect_message = (
            f'Connecting to existing Serve app in namespace "{SERVE_NAMESPACE}".'
            " New http options will not be applied."
        )
        logger.info(connect_message)
        if http_options:
            _check_http_options(existing_client, http_options)
        return existing_client
    except RayServeException:
        # No usable client; fall through and start a new controller.
        pass

    controller = _start_controller(
        http_options=http_options,
        grpc_options=grpc_options,
        global_logging_config=global_logging_config,
        **kwargs,
    )

    new_client = ServeControllerClient(controller)
    _set_global_client(new_client)
    logger.info(f'Started Serve in namespace "{SERVE_NAMESPACE}".')
    return new_client


def call_user_app_builder_with_args_if_necessary(
    builder: Union[Application, FunctionType],
    args: Dict[str, Any],
) -> Application:
    """Calls a user-provided function that returns Serve application.

    If an Application object is passed, this is a no-op (and no arguments may
    be supplied).

    Else, we validate the signature of the function, convert the args dictionary to
    the user-annotated Pydantic model if provided, and call the function.

    Args:
        builder: An already built Application, or a plain function taking
            exactly one parameter and returning an Application.
        args: Arguments for the builder function. Must be empty when
            ``builder`` is already an Application.

    Returns:
        The output of the function (must be an Application), or ``builder``
        itself when it is already an Application.

    Raises:
        ValueError: If non-empty ``args`` are passed together with an already
            built Application.
        TypeError: If ``builder`` is neither an Application nor a function, if
            the function does not take exactly one parameter, or if it does
            not return an Application.
    """
    if isinstance(builder, Application):
        if args:
            raise ValueError(
                "Arguments can only be passed to an application builder function, "
                "not an already built application."
            )
        return builder
    elif not isinstance(builder, FunctionType):
        raise TypeError(
            "Expected a built Serve application or an application builder function "
            f"but got: {type(builder)}."
        )

    # Check that the builder only takes a single argument.
    # TODO(edoakes): we may want to loosen this to allow optional kwargs in the future.
    signature = inspect.signature(builder)
    if len(signature.parameters) != 1:
        raise TypeError(
            "Application builder functions should take exactly one parameter, "
            "a dictionary containing the passed arguments."
        )

    # If the sole argument to the builder is a pydantic model, convert the args dict to
    # that model. This will perform standard pydantic validation (e.g., raise an
    # exception if required fields are missing).
    param = next(iter(signature.parameters.values()))
    if inspect.isclass(param.annotation) and is_subclass_of_base_model(
        param.annotation
    ):
        args = param.annotation.parse_obj(args)

    app = builder(args)
    if not isinstance(app, Application):
        raise TypeError(
            "Application builder functions must return an `Application` returned "
            f"from `Deployment.bind()`, but got: {type(app)}."
        )

    return app
