
    %`i_                     L   d dl Z d dlZd dlZd dlZd dlmZ d dlmZ d dlm	Z	 d dl
Zd dlmZmZ d dlmZ d dlmZ d dlmZ d d	lmZ d d
lmZmZ d dlmZmZ d dlmZmZmZ d dl m!Z!m"Z" d dl#m$Z$ d dl%m&Z& d dl'm(Z(m)Z)  ej*        e+          Z,dZ-e" G d d                      Z.dS )    N)wraps)Lock)Optional)Languagecross_language)ray_option_utils))_warn_if_using_deprecated_placement_group)pickle_dumps)wrap_auto_init)client_mode_convert_functionclient_mode_should_convert)get_runtime_env_info#parse_runtime_env_for_task_or_actor)STREAMING_GENERATOR_RETURNObjectRefGeneratorPythonFunctionDescriptor)DeveloperAPI	PublicAPI)+_configure_placement_group_based_on_context) PlacementGroupSchedulingStrategy)_inject_tracing_into_function_tracing_task_invocationc                       e Zd ZdZd Zd Zd Zd Zd Ze	e
	 	 	 ddee         fd	                        Zed
             ZdS )RemoteFunctiona
  A remote function.

    This is a decorated function. It can be used to spawn tasks.

    Attributes:
        _language: The target language.
        _function: The original function.
        _function_descriptor: The function descriptor. This is not defined
            until the remote function is first invoked because that is when the
            function is pickled, and the pickled function is used to compute
            the function descriptor.
        _function_name: The module and function name.
        _num_cpus: The default number of CPUs to use for invocations of this
            remote function.
        _num_gpus: The default number of GPUs to use for invocations of this
            remote function.
        _memory: The heap memory request in bytes for this task/actor,
            rounded down to the nearest integer.
        _label_selector: The label requirements on a node for scheduling of the task or actor.
        _fallback_strategy: Soft constraints of a list of decorator options to fall back on when scheduling on a node.
        _resources: The default custom resource requirements for invocations of
            this remote function.
        _num_returns: The default number of return values for invocations
            of this remote function.
        _max_calls: The number of times a worker can execute this function
            before exiting.
        _max_retries: The number of times this task may be retried
            on worker failure.
        _retry_exceptions: Whether application-level errors should be retried.
            This can be a boolean or a list/tuple of exceptions that should be retried.
        _runtime_env: The runtime environment for this task.
        _decorator: An optional decorator that should be applied to the remote
            function invocation (as opposed to the function execution) before
            invoking the function. The decorator must return a function that
            takes in two arguments ("args" and "kwargs"). In most cases, it
            should call the function that was passed into the decorator and
            return the resulting ObjectRefs. For an example, see
            "test_decorated_function" in "python/ray/tests/test_basic.py".
        _function_signature: The function signature.
        _last_export_cluster_and_job: A pair of the last exported cluster
            and job to help us to know whether this function was exported.
            This is an imperfect mechanism used to determine if we need to
            export the remote function again. It is imperfect in the sense that
            the actor class definition could be exported multiple times by
            different workers.
        _scheduling_strategy: Strategy about how to schedule
            this remote function.
    c           	          t          j        |          rt          d          | _         j                            d          pd}|dk    r j                            dd           t           fddD                       r
d j        d<   t          j                                        D ]2\  }}t           d|z   |                    ||j
                             3t           j                   _        d	 j        v r j         j        d	<   d
 _         j        rt           j        dd           _        | _        t          j        |           _        | _        d  _        t)                       _        |j        dz   |j        z    _        | _        |t4          j        k     _        t;          |dd            _        d  _        tA          j!                     _"        tG          |           fd            }| _$        d S )Nz'async def' should not be used for remote tasks. You can wrap the async function with `asyncio.run(f())`. See more at:https://docs.ray.io/en/latest/ray-core/actors/async_api.html num_gpusr   	max_callsc                 L    g | ] }|j                             |          pi v !S  )_default_optionsget).0sselfs     g/home/jaya/work/projects/VOICE-AGENT/VIET/agent-env/lib/python3.11/site-packages/ray/remote_function.py
<listcomp>z+RemoteFunction.__init__.<locals>.<listcomp>s   sD        d+//228b9      )nsightzrocprof-sys   _runtime_env FTis_job_runtime_env	serialize.__ray_invocation_decorator__c                  :     j         dj        | |dj        S )N)serialized_runtime_env_infoargskwargsr   )_remote!_serialized_base_runtime_env_infor    )r4   r5   r$   s     r%   _remote_proxyz.RemoteFunction.__init__.<locals>._remote_proxy   s<    4< ,0,R  '	  r'   )%inspectiscoroutinefunction
ValueErrorr    r!   anyr   task_optionsitemssetattrdefault_valuer   _runtime_envr7   r   	_languageisgeneratorfunction_is_generator	_function_function_signaturer   _inject_lock
__module____name___function_name_function_descriptorr   PYTHON_is_cross_languagegetattr
_decorator_last_export_cluster_and_jobuuiduuid4_uuidr   remote)	r$   languagefunctionfunction_descriptorr=   r   kvr8   s	   `        r%   __init__zRemoteFunction.__init__[   sA    &x00 	P  
 !- (,,Z88=AqLLT266{DIIQ   2  
 
 R 23D!+. %17799 	I 	IDAqD#'<#3#3Aq#G#GHHHH?@QRRD111373DD!-0 24. 	5I!#(6 6 6D2 "$8BB!#'  FF&1C7(:KK$7!"*ho"=!(,JDQQ,0)Z\\
 
x	 	 	 	 
	 $r'   c                 B    t          d| j         d| j         d          )Nz@Remote functions cannot be called directly. Instead of running 'z
()', try 'z.remote()'.)	TypeErrorrJ   )r$   r4   r5   s      r%   __call__zRemoteFunction.__call__   s>    5.5 5'5 5 5
 
 	
r'   c                 >    | j                                         }|d= |S NrG   )__dict__copy)r$   attrss     r%   __getstate__zRemoteFunction.__getstate__   s"    ""$$.!r'   c                 f    | j                             |           t                      | j         d<   d S r_   )r`   updater   )r$   states     r%   __setstate__zRemoteFunction.__setstate__   s.    U###(,n%%%r'   c                 |   | | j                                         }|                    dd           t          j        ||          t          j        d           | j        d|v r8t          d                   d<   d         rt          d         dd           G fdd	          } |            S )
a;  Configures and overrides the task invocation parameters.

        The arguments are the same as those that can be passed to :obj:`ray.remote`.
        Overriding `max_calls` is not supported.

        Args:
            num_returns: It specifies the number of object refs returned by
                the remote function invocation.
            num_cpus: The quantity of CPU cores to reserve
                for this task or for the lifetime of the actor.
            num_gpus: The quantity of GPUs to reserve
                for this task or for the lifetime of the actor.
            resources (Dict[str, float]): The quantity of various custom resources
                to reserve for this task or for the lifetime of the actor.
                This is a dictionary mapping strings (resource names) to floats.
            label_selector (Dict[str, str]): If specified, the labels required for the node on
                which this actor can be scheduled on. The label selector consist of key-value pairs,
                where the keys are label names and the value are expressions consisting of an operator
                with label values or just a value to indicate equality.
            fallback_strategy (List[Dict[str, Any]]): If specified, expresses soft constraints
                through a list of decorator options to fall back on when scheduling on a node.
            accelerator_type: If specified, requires that the task or actor run
                on a node with the specified type of accelerator.
                See :ref:`accelerator types <accelerator_types>`.
            memory: The heap memory request in bytes for this task/actor,
                rounded down to the nearest integer.
            object_store_memory: The object store memory request for actors only.
            max_calls: This specifies the
                maximum number of times that a given worker can execute
                the given remote function before it must exit
                (this can be used to address memory leaks in third-party
                libraries or to reclaim resources that cannot easily be
                released, e.g., GPU memory that was acquired by TensorFlow).
                By default this is infinite for CPU tasks and 1 for GPU tasks
                (to force GPU tasks to release resources after finishing).
            max_retries: This specifies the maximum number of times that the remote
                function should be rerun when the worker process executing it
                crashes unexpectedly. The minimum valid value is 0,
                the default is 3 (default), and a value of -1 indicates
                infinite retries.
            runtime_env (Dict[str, Any]): Specifies the runtime environment for
                this actor or task and its children. See
                :ref:`runtime-environments` for detailed documentation.
            retry_exceptions: This specifies whether application-level errors
                should be retried up to max_retries times.
            scheduling_strategy: Strategy about how to
                schedule a remote function or actor. Possible values are
                None: ray will figure out the scheduling strategy to use, it
                will either be the PlacementGroupSchedulingStrategy using parent's
                placement group if parent has one and has
                placement_group_capture_child_tasks set to true,
                or "DEFAULT";
                "DEFAULT": default hybrid scheduling;
                "SPREAD": best effort spread scheduling;
                `PlacementGroupSchedulingStrategy`:
                placement group based scheduling;
                `NodeAffinitySchedulingStrategy`:
                node id based affinity scheduling.
            enable_task_events: This specifies whether to enable task events for this
                task. If set to True, task events such as (task running, finished)
                are emitted, and available to Ray Dashboard and State API.
                See :ref:`state-api-overview-ref` for more details.
            _labels: The key-value labels of a task.

        Examples:

        .. code-block:: python

            @ray.remote(num_gpus=1, max_calls=1, num_returns=2)
            def f():
               return 1, 2
            # Task g will require 2 gpus instead of 1.
            g = f.options(num_gpus=2)
        r   NT)
in_optionsr+   Fr-   c                   :    e Zd Z fdZe fd            ZdS )+RemoteFunction.options.<locals>.FuncWrapperc                 &     j         d||dS )N)r4   r5   r3   r   )r6   )r$   r4   r5   func_clsr3   updated_optionss      r%   rT   z2RemoteFunction.options.<locals>.FuncWrapper.remote$  s7    'x' !0K  &	  r'   c                 6    ddl m}  |j        ||          S )z
                For Ray DAG building that creates static graph from decorated
                class or functions.
                r   FunctionNode)ray.dag.function_noderq   rE   )r$   r4   r5   rq   rm   rn   s       r%   bindz0RemoteFunction.options.<locals>.FuncWrapper.bind,  s0     ?>>>>>#|H$6foVVVr'   N)rI   rH   __qualname__rT   r   rs   )rm   r3   rn   s   r%   FuncWrapperrk   #  sj               W W W W W \W W Wr'   ru   )	r    ra   popr   update_optionsvalidate_task_optionsr7   r   r   )r$   r=   default_optionsru   rm   r3   rn   s       @@@r%   optionszRemoteFunction.options   s   X  /4466 	K...*9/<XX.4PPPP '+&L#L((-P.. .OM* }- .B#M2',"/ / /+	W 	W 	W 	W 	W 	W 	W 	W 	W 	W 	W 	W& {}}r'   Nr3   c                     |                     dd           t                      rt           ||fi |S t          j        j        j                                         j        t          j        j        j	        k    rddl
m} |                    d            j        5   j        Gt           j                   _        t          j        j                             j                   _        ddd           n# 1 swxY w Y    j        s j        j        k    rqt/          j         j         j                   _        t7           j        d j        j                    _        j         _        j                                        |i n|}|g n|}t@          j!        "                                D ]l\  }}|dk    rCtF          j$        %                    d|j&                  |_&        tO          |j&                  |_&        |%                    ||j&                  ||<   m|                     dd           |d	         |d
         }|d         }	|d         }
|d         |d          j(        rdnddk    rdndk    rt          j)        j*        |d         d|d         |d         tW          tX          tZ          f          rt[                    dndtW          t\                    st_          |d           t          j        j0        1                    |          tW          t\                    rmtW          t\                    rj2        }j3        }	j4        }
|
j5        }
tm          |
|	i  j        j7        |          }|j8        st]          ||	|
          ndtr          rts           j                   |%                    d          |%                    d          |%                    d          |%                    d           fd} j:         :                    |          } |||          S )z)Submit the remote function for execution.r   Nr   )	usage_libcorez!Could not serialize the function max_retriesRAY_TASK_MAX_RETRIESnameplacement_groupplacement_group_bundle_index#placement_group_capture_child_tasksscheduling_strategynum_returns	streamingr)   dynamic#_generator_backpressure_num_objectsretry_exceptionsT   )r   DEFAULTenable_task_events_labelslabel_selectorfallback_strategyc                 r   j         rt          j        | |          }n9| s|s
j        sg }n+t          j        j                            j        | |          }j        t          j	        j
        j        k    rj         r
J d            j                            j        j        |nd
j        pd	          }d_        t"          k    r-t%          |          dk    sJ |d         }t'          |          S t%          |          dk    r|d         S t%          |          dk    r|S d S )Nz:Cross language remote function cannot be executed locally.r,   z{}r'   r)   r   )rM   r   _format_argsrF   ray_common	signatureflatten_argsmode_privateworker
LOCAL_MODEcore_workersubmit_taskrB   rK   debugger_breakpointr   lenr   )r4   r5   	list_argsobject_refsgenerator_refr   r   "generator_backpressure_num_objectsr   labelsr~   r   r   	resourcesretry_exception_allowlistr   r   r$   r3   r   s        r%   
invocationz*RemoteFunction._remote.<locals>.invocation  s   & *7fMM		 & 1I 		K1>>,dF 	 {cl1<<</P POP P/ ,88)(b )#*+3t2"!# K* *-F&888 ;''1,,,, +A)-@@@;1$$"1~%[!!A%%"" &%r'   );rv   r   r   r   r   r   global_workercheck_connectedr   WORKER_MODEray._common.usager|   record_library_usagerG   rF   r   rE   r   r   extract_signaturerM   rP   current_cluster_and_jobr   from_functionrS   rK   r
   repr_pickled_functionfunction_actor_managerexportr   r=   r>   osenvironr!   r@   intrD   _rayletr   
isinstancelisttupler   r	   utilsresources_from_ray_optionsr   r   r   -should_capture_child_tasks_in_placement_groupr   function_nameis_empty_task_launch_hookrO   )r$   r4   r5   r3   r=   r|   rX   rY   r   r   r   r   r   r   r   r   r   r~   r   r   r   r   r   r   r   s   `  `        @@@@@@@@@@@@@r%   r6   zRemoteFunction._remote8  s    	d+++%'' 	T/dFSSlSSS$2   ;#,-999 433333**6222  	 	'/!>t~!N!N+.;+@+R+RN, ,(	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 '	71V5SSS(@(N
) )D% &2TD4M4RTT& &D"
 170ND-)00666~6\rrt %17799 		C 		CDAqM!! #%*..*AO# # #&ao"6"6*..q!/BBLOO 	d+++ F#&'89'34R'S$.:1/
+ ++@A"=1!  ))##KKK'' +@K-91.
* .513."=1'(:;&u66 	-(-.>(?(?%#(,%&j!A/
 /
& 6lAFFFK%@@NN	&*!A+
 +
& -/OPP "5"E'D - (K 4 3:H 4 J3,)7 /  O #+ 0&F#07' '## '0# 	Yd7DWXXX *--.BCC!!),,%))*:;;(,,-@AA-	# -	# -	# -	# -	# -	# -	# -	# -	# -	# -	# -	# -	# -	# -	# -	# -	# -	# -	#^ ?&44Jz$'''s   .AD		DDc                 >    ddl m}  || j        ||| j                  S )zk
        For Ray DAG building that creates static graph from decorated
        class or functions.
        r   rp   )rr   rq   rE   r    )r$   r4   r5   rq   s       r%   rs   zRemoteFunction.bind  s1     	766666|DND&$:OPPPr'   )NNN)rI   rH   rt   __doc__rZ   r]   rc   rg   rz   r   r   r   strr6   r   rs   r   r'   r%   r   r   (   s        / /bP$ P$ P$d
 
 
  
/ / /x x xt  59	U( U( &.c]	U( U( U(  ^U(n Q Q \Q Q Qr'   r   )/r9   loggingr   rQ   	functoolsr   	threadingr   typingr   ray._common.signaturer   r   r   ray._commonr   ray._common.ray_option_utilsr	   ray._common.serializationr
   ray._private.auto_init_hookr   ray._private.client_mode_hookr   r   ray._private.utilsr   r   ray._rayletr   r   r   ray.util.annotationsr   r   ray.util.placement_groupr   ray.util.scheduling_strategiesr   ray.util.tracing.tracing_helperr   r   	getLoggerrI   loggerr   r   r   r'   r%   <module>r      s     				                        ( ( ( ( ( ( ( ( ( ( ( ( ( ( R R R R R R 2 2 2 2 2 2 6 6 6 6 6 6        Y X X X X X X X         
 9 8 8 8 8 8 8 8 P P P P P P K K K K K K       
 
	8	$	$   qQ qQ qQ qQ qQ qQ qQ qQ qQ qQr'   