
    &`i                        d dl Z d dlZd dlZd dlZd dlmZ d dlmZmZ d dl	m
Z
mZmZmZmZmZmZmZ d dlZd dlmZ d dlmZmZ d dlmZ d dlmZ  ej        e          Ze G d	 d
                      Zee G d d                                  Z e G d d                      Z!e G d d                      Z"e G d d                      Z#dS )    N)defaultdict)	dataclassfield)AnyCallableDictIteratorListOptionalTupleUnion)ActorHandle)RayErrorRayTaskError)T)DeveloperAPIc                   @    e Zd ZdZddedefdZed             Zd Z	dS )	ResultOrErrorzA wrapper around a result or a RayError thrown during remote task/actor calls.

    This is used to return data from `FaultTolerantActorManager` that allows us to
    distinguish between RayErrors (remote actor related) and valid results.
    Nresulterrorc                 t    || _         t          |t                    r|                                n|| _        dS )a1  One and only one of result or error should be set.

        Args:
            result: The result of the computation. Note that None is a valid result if
                the remote function does not return anything.
            error: Alternatively, the error that occurred during the computation.
        N)_result
isinstancer   as_instanceof_cause_error)selfr   r   s      q/home/jaya/work/projects/VOICE-AGENT/VIET/agent-env/lib/python3.11/site-packages/ray/rllib/utils/actor_manager.py__init__zResultOrError.__init__   s>      %..E%%'''	 	    c                     | j         d u S N)r   r   s    r   okzResultOrError.ok*   s    {d""r   c                 ,    | j         r| j         S | j        S )z Returns the result or the error.)r   r   r"   s    r   getzResultOrError.get.   s    ; 	 ;<r   )NN)
__name__
__module____qualname____doc__r   	Exceptionr   propertyr#   r%    r   r   r   r      so         
 
s 
) 
 
 
 
  # # X#         r   r   c                   N    e Zd ZU dZeed<   eed<   eed<   ed             Z	d Z
dS )
CallResultzRepresents a single result from a call to an actor.

    Each CallResult contains the index of the actor that was called
    plus the result or error from the call.
    actor_idresult_or_errortagc                     | j         j        S )z8Passes through the ok property from the result_or_error.)r0   r#   r"   s    r   r#   zCallResult.okC   s     #&&r   c                 4    | j                                         S )z7Passes through the get method from the result_or_error.)r0   r%   r"   s    r   r%   zCallResult.getH   s    #'')))r   N)r&   r'   r(   r)   int__annotations__r   strr+   r#   r%   r,   r   r   r.   r.   6   sh           MMM""""	HHH' ' X'* * * * *r   r.   c                       e Zd ZdZ G d d          Zd ZdededefdZ	d	e
e         fd
Zd	efdZd	e
e         fdZd	e
e         fdZdS )RemoteCallResultsa  Represents a list of results from calls to a set of actors.

    CallResults provides convenient APIs to iterate over the results
    while skipping errors, etc.

    .. testcode::
        :skipif: True

        manager = FaultTolerantActorManager(
            actors, max_remote_requests_in_flight_per_actor=2,
        )
        results = manager.foreach_actor(lambda w: w.call())

        # Iterate over all results ignoring errors.
        for result in results.ignore_errors():
            print(result.get())
    c                   N    e Zd ZdZdee         fdZdee         fdZdefdZ	dS )RemoteCallResults._Iteratorz.An iterator over the results of a remote call.call_resultsc                     || _         d S r!   )_call_results)r   r;   s     r   r   z$RemoteCallResults._Iterator.__init__d   s    !-Dr   returnc                     | S r!   r,   r"   s    r   __iter__z$RemoteCallResults._Iterator.__iter__g   s    Kr   c                 R    | j         st          | j                             d          S Nr   )r=   StopIterationpopr"   s    r   __next__z$RemoteCallResults._Iterator.__next__j   s*    % $##%))!,,,r   N)
r&   r'   r(   r)   r
   r.   r   r	   r@   rE   r,   r   r   	_Iteratorr:   a   su        <<	.j)9 	. 	. 	. 	.	hz2 	 	 	 		-j 	- 	- 	- 	- 	- 	-r   rF   c                     g | _         d S r!   )result_or_errorsr"   s    r   r   zRemoteCallResults.__init__o   s    24r   r/   r0   r1   c                 X    | j                             t          |||                     dS )zAdd index of a remote actor plus the call result to the list.

        Args:
            actor_id: ID of the remote actor.
            result_or_error: The result or error from the call.
            tag: A description to identify the call.
        N)rH   appendr.   )r   r/   r0   r1   s       r   
add_resultzRemoteCallResults.add_resultr   s-     	$$Z/3%O%OPPPPPr   r>   c                 Z    |                      t          j        | j                            S )z$Return an iterator over the results.)rF   copyrH   r"   s    r   r@   zRemoteCallResults.__iter__|   s#     ~~di(=>>???r   c                 *    t          | j                  S r!   )lenrH   r"   s    r   __len__zRemoteCallResults.__len__   s    4()))r   c                 J    |                      d | j        D                       S )z9Return an iterator over the results, skipping all errors.c                      g | ]}|j         	|S r,   )r#   .0rs     r   
<listcomp>z3RemoteCallResults.ignore_errors.<locals>.<listcomp>   s    HHHQ14HqHHHr   rF   rH   r"   s    r   ignore_errorszRemoteCallResults.ignore_errors   s&    ~~HH$*?HHHIIIr   c                 J    |                      d | j        D                       S )aC  Return an iterator over the results, skipping only Ray errors.

        Similar to ignore_errors, but only skips Errors raised because of
        remote actor problems (often get restored automatcially).
        This is useful for callers that want to handle application errors differently
        from Ray errors.
        c                 `    g | ]+}t          |                                t                    )|,S r,   )r   r%   r   rS   s     r   rV   z7RemoteCallResults.ignore_ray_errors.<locals>.<listcomp>   s/    SSS1Z5R5RSQSSSr   rW   r"   s    r   ignore_ray_errorsz#RemoteCallResults.ignore_ray_errors   s.     ~~SS-SSS
 
 	
r   N)r&   r'   r(   r)   rF   r   r4   r   r6   rK   r	   r@   rP   rX   r[   r,   r   r   r8   r8   M   s
        $- - - - - - - -5 5 5Q3 Q QS Q Q Q Q@(=1 @ @ @ @
* * * * *Jx6 J J J J

8M#: 

 

 

 

 

 

r   r8   c                   x    e Zd Zedefd            Zedeeee         ee         ge	f         de	fd            Z
dS )FaultAwareApplyr>   c                     dS )zuPing the actor. Can be used as a health check.

        Returns:
            "pong" if actor is up and well.
        pongr,   r"   s    r   pingzFaultAwareApply.ping   s	     vr   funcc                    	  || g|R i |S # t           $ ri}| j        j        rPt                              d|            t          j        | j        j                   t          j	        d           n|Y d}~dS d}~ww xY w)a  Calls the given function with this Actor instance.

        A generic interface for applying arbitrary member functions on a
        remote actor.

        Args:
            func: The function to call, with this actor as first
                argument, followed by args, and kwargs.
            args: Optional additional args to pass to the function call.
            kwargs: Optional additional kwargs to pass to the function call.

        Returns:
            The return value of the function call.
        z*Worker exception caught during `apply()`:    N)
r*   configrestart_failed_env_runnerslogger	exceptiontimesleep#delay_between_env_runner_restarts_ssysexit)r   ra   argskwargses        r   applyzFaultAwareApply.apply   s    *	4.t...v... 
	 
	 
	{5   !Qa!Q!QRRR
4;JKKK  
	s    
BAA<<BN)r&   r'   r(   r   r6   r`   r   r   r   r   rp   r,   r   r   r]   r]      s        c    \  Xc]HSM:A=> 
 
      \     r   r]   c                      e Zd ZdZe G d d                      Z	 	 	 d>deee                  de	d	e	fd
Z
edee	         fd            Zedee	         fd            Zedee         fd            Zede	defd            Zede	fd            Zede	fd            Zede	fd            Zed?dee         de	fd            Zede	defd            Zede	deddfd            Zed             Zeddddddddeeegef         eeegef                  eee         f         deeeeef         eeeef                  f                  ded eee	                  d!ee         d"ed#edefd$            Ze	 d?dddd%deeegef         eeegef                  eee         f         dee         deeeeef         eeeef                  f                  ded eee	                  de	fd&            Z ed'd(ddd)d*eeee         e!ed+f         f         d!ee         d"ed#edef
d,            Z"e	 d?dd(ddddddd-deeegef         eeegef                  eee         f         dee         deeeeef         eeeef                  f                  d!ee         d"ed#eded eee	                  d.ed/edeee!e	ef         ef                  fd0            Z#e$d1ed.eddfd2            Z%e	 	 d@d!ee         d#edee	         fd3            Z&ddd4deeegef         eeegef                  eee         f         deeeeef         eeeef                  f                  d ee	         dee'j(                 fd5Z)edddd6d ee	         d7ee'j(                 d*ee         d!ee         d"ed#ede!ee'j(                 ef         fd8            Z*dd9deeegef         eeegef                  f         deeeee         f                  d ee	         fd:Z+	 d?d*eeeee         e!ed+f         f                  de!ee'j(                 ee         ee         f         fd;Z,de	fd<Z-d= Z.dS )AFaultTolerantActorManagera}  A manager that is aware of the healthiness of remote actors.

    .. testcode::

        import time
        import ray
        from ray.rllib.utils.actor_manager import FaultTolerantActorManager

        @ray.remote
        class MyActor:
            def apply(self, func):
                return func(self)

            def do_something(self):
                return True

        actors = [MyActor.remote() for _ in range(3)]
        manager = FaultTolerantActorManager(
            actors, max_remote_requests_in_flight_per_actor=2,
        )

        # Synchronous remote calls.
        results = manager.foreach_actor(lambda actor: actor.do_something())
        # Print results ignoring returned errors.
        print([r.get() for r in results.ignore_errors()])

        # Asynchronous remote calls.
        manager.foreach_actor_async(lambda actor: actor.do_something())
        time.sleep(2)  # Wait for the tasks to finish.
        for r in manager.fetch_ready_async_reqs():
            # Handle result and errors.
            if r.ok:
                print(r.get())
            else:
                print("Error: {}".format(r.get()))
    c                       e Zd ZU dZ ee          Zeee	         e
f         ed<   dZeed<   ddee	         de
fd	Zddee	         ddfd
Zddee	         ddfdZdS )%FaultTolerantActorManager._ActorStatezState of a single actor.)default_factory#num_in_flight_async_requests_by_tagT
is_healthyNr1   r>   c                     |&t          | j                                                  S | j                            |d          S )z@Get number of in-flight requests for a specific tag or all tags.Nr   )sumrv   valuesr%   r   r1   s     r   get_num_in_flight_requestsz@FaultTolerantActorManager._ActorState.get_num_in_flight_requests   s=    {4CJJLLMMM;??QGGGr   c                 V    || j         vr
d| j         |<   | j         |xx         dz  cc<   dS )z4Increment the count of in-flight requests for a tag.r   rc   Nrv   r{   s     r   increment_requestsz8FaultTolerantActorManager._ActorState.increment_requests   sB    $BBB@A8=4S999Q>99999r   c                 |    || j         v r0| j         |xx         dz  cc<   | j         |         dk    r| j         |= dS dS dS )z4Decrement the count of in-flight requests for a tag.rc   r   Nr~   r{   s     r   decrement_requestsz8FaultTolerantActorManager._ActorState.decrement_requests  sa    d>>>8===B===;C@AEE@EEE ?>EEr   r!   )r&   r'   r(   r)   r   dictrv   r   r   r6   r4   r5   rw   boolr|   r   r   r,   r   r   _ActorStatert      s         && IN I
 I
 I
+T(3-2D-E 	
 	
 	
  
D	H 	H(3- 	H3 	H 	H 	H 	H	? 	?(3- 	?4 	? 	? 	? 	?	F 	F(3- 	F4 	F 	F 	F 	F 	F 	Fr   r   N   r   actors'max_remote_requests_in_flight_per_actorinit_idc                     |x| _         | _        i | _        i | _        t	                      | _        |                     |pg            i | _        || _        d| _	        dS )a  Construct a FaultTolerantActorManager.

        Args:
            actors: A list of ray remote actors to manage on. These actors must have an
                ``apply`` method which takes a function with only one parameter (the
                actor instance itself).
            max_remote_requests_in_flight_per_actor: The maximum number of remote
                requests that can be in flight per actor. Any requests made to the pool
                that cannot be scheduled because the limit has been reached will be
                dropped. This only applies to the asynchronous remote call mode.
            init_id: The initial ID to use for the next remote actor. Default is 0.
        r   N)
_next_id_current_actor_id_actors_remote_actor_statesset_restored_actors
add_actors_in_flight_req_to_actor_id(_max_remote_requests_in_flight_per_actor_num_actor_restarts)r   r   r   r   s       r   r   z"FaultTolerantActorManager.__init__	  so    ( 298. 02AC! #"%%% EG' 4 	5
 $%   r   r>   c                 N    t          | j                                                  S )z2Returns a list of all worker IDs (healthy or not).)listr   keysr"   s    r   	actor_idsz#FaultTolerantActorManager.actor_ids0  s      DL%%''(((r   c                 H    d | j                                         D             S )z.Returns a list of worker IDs that are healthy.c                 &    g | ]\  }}|j         |S r,   rw   )rT   kvs      r   rV   z?FaultTolerantActorManager.healthy_actor_ids.<locals>.<listcomp>8  s#    PPPda1<PPPPr   )r   itemsr"   s    r   healthy_actor_idsz+FaultTolerantActorManager.healthy_actor_ids5  s(     QPd7==??PPPPr   c                     |D ]B}|| j         | j        <   |                                 | j        | j        <   | xj        dz  c_        CdS )zAdd a list of actors to the pool.

        Args:
            actors: A list of ray remote actors to be added to the pool.
        rc   N)r   r   r   r   )r   r   actors      r   r   z$FaultTolerantActorManager.add_actors:  sX      	 	E*/DL'7;7G7G7I7ID%dm4MMQMMM	 	r   r/   c                     | j         |         }| j         |= | j        |= | j                            |           |                     |           |S )zRemove an actor from the pool.

        Args:
            actor_id: ID of the actor to remove.

        Returns:
            Handle to the actor that was removed.
        )r   r   r   discard_remove_async_state)r   r/   r   s      r   remove_actorz&FaultTolerantActorManager.remove_actorF  sW     X& L"%h/%%h///  ***r   c                 *    t          | j                  S )z.Return the total number of actors in the pool.)rO   r   r"   s    r   
num_actorsz$FaultTolerantActorManager.num_actorsZ  s     4<   r   c                 b    t          d | j                                        D                       S )z+Return the number of healthy remote actors.c              3   $   K   | ]}|j         V  d S r!   r   )rT   ss     r   	<genexpr>z?FaultTolerantActorManager.num_healthy_actors.<locals>.<genexpr>b  s$      LLA1<LLLLLLr   ry   r   rz   r"   s    r   num_healthy_actorsz,FaultTolerantActorManager.num_healthy_actors_  s0     LL)B)I)I)K)KLLLLLLr   c                     | j         S )z<Return the number of remote actors that have been restarted.)r   r"   s    r   total_num_restartsz,FaultTolerantActorManager.total_num_restartsd  s     ''r   r1   c                 h    t          fd| j                                        D                       S )z0Return the number of outstanding async requests.c              3   B   K   | ]}|                               V  d S r!   )r|   )rT   r   r1   s     r   r   zGFaultTolerantActorManager.num_outstanding_async_reqs.<locals>.<genexpr>l  sE       
 
 ((--
 
 
 
 
 
r   r   r{   s    `r   num_outstanding_async_reqsz4FaultTolerantActorManager.num_outstanding_async_reqsi  sL      
 
 
 
.5577
 
 
 
 
 	
r   c                 \    || j         vrt          d|           | j         |         j        S )zWhether a remote actor is in healthy state.

        Args:
            actor_id: ID of the remote actor.

        Returns:
            True if the actor is healthy, False otherwise.
        Unknown actor id: )r   
ValueErrorrw   )r   r/   s     r   is_actor_healthyz*FaultTolerantActorManager.is_actor_healthyq  s:     4444<(<<===(2==r   healthyc                 0   || j         vrt          d|           | j         |         j        }|s|r| j                            |           n|r|s| j                            |           || j         |         _        |s|                     |           dS dS )zUpdate activate state for a specific remote actor.

        Args:
            actor_id: ID of the remote actor.
            healthy: Whether the remote actor is healthy.
        r   N)r   r   rw   r   addr   r   )r   r/   r   was_healthys       r   set_actor_statez)FaultTolerantActorManager.set_actor_state  s     4444<(<<===/9D 	4w 	4!%%h//// 	4 	4!))(3339@!(+6 	/$$X.....	/ 	/r   c                 .   | j                                         D ]}t          j        |           | j                                          | j                                         | j                                         | j                                         dS )zClean up managed actors.N)r   rz   raykillclearr   r   r   )r   r   s     r   r   zFaultTolerantActorManager.clear  s     \((** 	 	EHUOOOO!'')))##%%%'--/////r   TF)rn   healthy_onlyremote_actor_idstimeout_secondsreturn_obj_refsmark_healthyra   rn   r   r   r   r   r   c                    |p|                                  }|r|                     |||          \  }}}|                     |||          }|                     ||dgt	          |          z  |||          \  }	}
|
S )a
  Calls the given function with each actor instance as arg.

        Automatically marks actors unhealthy if they crash during the remote call.

        Args:
            func: A single Callable applied to all specified remote actors or a list
                of Callables, that get applied on the list of specified remote actors.
                In the latter case, both list of Callables and list of specified actors
                must have the same length. Alternatively, you can use the name of the
                remote method to be called, instead, or a list of remote method names.
            kwargs: An optional single kwargs dict or a list of kwargs dict matching the
                list of provided `func` or `remote_actor_ids`. In the first case (single
                dict), use `kwargs` on all remote calls. The latter case (list of
                dicts) allows you to define individualized kwarg dicts per actor.
            healthy_only: If True, applies `func` only to actors currently tagged
                "healthy", otherwise to all actors. If `healthy_only=False` and
                `mark_healthy=True`, will send `func` to all actors and mark those
                actors "healthy" that respond to the request within `timeout_seconds`
                and are currently tagged as "unhealthy".
            remote_actor_ids: Apply func on a selected set of remote actors. Use None
                (default) for all actors.
            timeout_seconds: Time to wait (in seconds) for results. Set this to 0.0 for
                fire-and-forget. Set this to None (default) to wait infinitely (i.e. for
                synchronous execution).
            return_obj_refs: whether to return ObjectRef instead of actual results.
                Note, for fault tolerance reasons, these returned ObjectRefs should
                never be resolved with ray.get() outside of the context of this manager.
            mark_healthy: Whether to mark all those actors healthy again that are
                currently marked unhealthy AND that returned results from the remote
                call (within the given `timeout_seconds`).
                Note that actors are NOT set unhealthy, if they simply time out
                (only if they return a RayActorError).
                Also not that this setting is ignored if `healthy_only=True` (b/c this
                setting only affects actors that are currently tagged as unhealthy).

        Returns:
            The list of return values of all calls to `func(actor)`. The values may be
            actual data returned or exceptions raised during the remote call in the
            format of RemoteCallResults.
        ra   rn   r   Nr   remote_callstagsr   r   r   )r   _filter_by_healthy_state_call_actors_fetch_resultrO   )r   ra   rn   r   r   r   r   r   r   _remote_resultss              r   foreach_actorz'FaultTolerantActorManager.foreach_actor  s    h ,?t~~/?/? 	-1-J-J&;K .K . .*D&*
 ((- ) 
 
 !..-%#l+++++% / 
 
> r   )rn   r   r   c                    |s                                  }t          |t                    rt          |          n2t          |t                    rt          |          nt          |          }|t          |          k    rM fdt	          |          D             } xj        |z  c_         xj                                         z  c_        |r                     |||          \  }}}t          d           }t          |t                    rt          |          t          |          k    sJ g }g }	g }
t          t          ||                    D ]\  }\  }} j        |                             |          }|||         z    j        k     rp||xx         dz  cc<   t          |t                    r||         n|pi }|                    |           |	                    |           |
                    |           nd|}|}	g }
|D ][} j        |                             |          }|||         z    j        k     r%||xx         dz  cc<   |
                    |           \|
sdS                      ||	|
          }t          |
|          D ]1\  }} j        |                             |           ||f j        |<   2t          |          S )a  Calls given functions against each actors without waiting for results.

        Args:
            func: A single Callable applied to all specified remote actors or a list
                of Callables, that get applied on the list of specified remote actors.
                In the latter case, both list of Callables and list of specified actors
                must have the same length. Alternatively, you can use the name of the
                remote method to be called, instead, or a list of remote method names.
            tag: A tag to identify the results from this async call.
            kwargs: An optional single kwargs dict or a list of kwargs dict matching the
                list of provided `func` or `remote_actor_ids`. In the first case (single
                dict), use `kwargs` on all remote calls. The latter case (list of
                dicts) allows you to define individualized kwarg dicts per actor.
            healthy_only: If True, applies `func` only to actors currently tagged
                "healthy", otherwise to all actors. If `healthy_only=False` and
                later, `self.fetch_ready_async_reqs()` is called with
                `mark_healthy=True`, will send `func` to all actors and mark those
                actors "healthy" that respond to the request within `timeout_seconds`
                and are currently tagged as "unhealthy".
            remote_actor_ids: Apply func on a selected set of remote actors.
                Note, for fault tolerance reasons, these returned ObjectRefs should
                never be resolved with ray.get() outside of the context of this manager.

        Returns:
            The number of async requests that are actually fired.
        c                 N    g | ]!}j         |z                                   z  "S r,   )r   r   rT   ir   s     r   rV   zAFaultTolerantActorManager.foreach_actor_async.<locals>.<listcomp>+  s@           '!+t/@/@@     r   r   c                      dS rB   r,   r,   r   r   <lambda>z?FaultTolerantActorManager.foreach_actor_async.<locals>.<lambda>8  s     r   rc   r   )r   r   r   rO   ranger   r   r   r   	enumeratezipr   r|   r   rJ   r   r   r   )r   ra   r1   rn   r   r   	num_callsnum_calls_to_makelimited_funclimited_kwargslimited_remote_actor_idsr   fraidnum_outstanding_reqs_for_tagr   r   idcalls   `                  r   foreach_actor_asyncz-FaultTolerantActorManager.foreach_actor_async  s   Z   	0#~~// $%%'CIII &$'''V%&& 	 ,----       y))     
 ""i/""""doo&7&77"" 	-1-J-J&;K .K . .*D&* -8		,B,BdD!! 	:t99$4 5 55555LN')$ )#d4D*E*E F F : :9At/3/H0,,S11 - 13DT3JJCD D &d+++q0+++%/%=%=Qq		FLbA ''***"))!,,,,33D999:  L#N')$( 	: 	:/3/H0,,S11 - 13DT3JJCD D &d+++q0+++,33D999' 	1((!5 ) 
 
 4lCC 	> 	>HB%b)<<SAAA58"ID+D11<   r   r,   g        r   r   r   r   r   .c                ,   |                      |          \  }}}|                     ||||||          \  }}	t          ||	          D ]K\  }
}|
| j        v r=| j        |
         \  }}| j        |j                                     |           | j        |
= L|	S )aB  Get results from outstanding async requests that are ready.

        Automatically mark actors unhealthy if they fail to respond.

        Note: If tags is an empty tuple then results from all ready async requests are
        returned.

        Args:
            timeout_seconds: ray.get() timeout. Default is 0, which only fetched those
                results (immediately) that are already ready.
            tags: A tag or a list of tags to identify the results from this async call.
            return_obj_refs: Whether to return ObjectRef instead of actual results.
            mark_healthy: Whether to mark all those actors healthy again that are
                currently marked unhealthy AND that returned results from the remote
                call (within the given `timeout_seconds`).
                Note that actors are NOT set to unhealthy, if they simply time out,
                meaning take a longer time to fulfil the remote request. We only ever
                mark an actor unhealthy, if they raise a RayActorError inside the remote
                request.
                Also note that this settings is ignored if the preceding
                `foreach_actor_async()` call used the `healthy_only=True` argument (b/c
                `mark_healthy` only affects actors that are currently tagged as
                unhealthy).

        Returns:
            A list of return values of all calls to `func(actor)` that are ready.
            The values may be actual data returned or exceptions raised during the
            remote call in the format of RemoteCallResults.
        r   )_filter_calls_by_tagr   r   r   r   r/   r   )r   r   r   r   r   r   r   
valid_tagsreadyr   obj_refr   r1   r/   s                 r   fetch_ready_async_reqsz0FaultTolerantActorManager.fetch_ready_async_reqsk  s    N 6:5N5Nt5T5T2&
 $ 2 2-%++% !3 !
 !
~  #5.99 	= 	=OGV$999 $ ? HX)&/:MMcRRR3G<r   )rn   r   r   r   r   r   r[   return_actor_idsr[   r   c                   |                      ||||          }|                     |||||           t                              ||	           |
rd |                                D             S d |                                D             S )a:  Calls the given function asynchronously and returns previous results if any.

        This is a convenience function that calls `fetch_ready_async_reqs()` to get
        previous results and then `foreach_actor_async()` to start new async calls.

        Args:
            func: A single Callable applied to all specified remote actors or a list
                of Callables, that get applied on the list of specified remote actors.
                In the latter case, both list of Callables and list of specified actors
                must have the same length. Alternatively, you can use the name of the
                remote method to be called, instead, or a list of remote method names.
            tag: A tag to identify the results from this async call.
            kwargs: An optional single kwargs dict or a list of kwargs dict matching the
                list of provided `func` or `remote_actor_ids`. In the first case (single
                dict), use `kwargs` on all remote calls. The latter case (list of
                dicts) allows you to define individualized kwarg dicts per actor.
            timeout_seconds: Time to wait for results from previous calls. Default is 0,
                meaning those requests that are already ready.
            return_obj_refs: Whether to return ObjectRef instead of actual results.
            mark_healthy: Whether to mark all those actors healthy again that are
                currently marked unhealthy AND that returned results from the remote
                call (within the given `timeout_seconds`).
            healthy_only: Apply `func` on known-to-be healthy actors only.
            remote_actor_ids: Apply func on a selected set of remote actors.
            ignore_ray_errors: Whether to ignore RayErrors in results.
            return_actor_ids: Whether to return actor IDs in the results.
                If True, the results will be a list of (actor_id, result) tuples.
                If False, the results will be a list of results.
        Returns:
            The results from previous async requests that were ready.
        r   )r1   rn   r   r   )r[   c                 D    g | ]}|j         |                                fS r,   )r/   r%   rS   s     r   rV   zMFaultTolerantActorManager.foreach_actor_async_fetch_ready.<locals>.<listcomp>  s'    RRRaQZ)RRRr   c                 6    g | ]}|                                 S r,   )r%   rS   s     r   rV   zMFaultTolerantActorManager.foreach_actor_async_fetch_ready.<locals>.<listcomp>  s     DDDAEEGGDDDr   )r   r   rr    handle_remote_call_result_errorsrX   )r   ra   r1   rn   r   r   r   r   r   r[   r   r   s               r   foreach_actor_async_fetch_readyz9FaultTolerantActorManager.foreach_actor_async_fetch_ready  s    ^ 44++%	 5 
 
 	  %- 	! 	
 	
 	
 	"BB/ 	C 	
 	
 	

  	ERR>3O3O3Q3QRRRRDD^%A%A%C%CDDDDr   results_or_errorsc                    | D ]L}|j         r
|r-t                              |                                           9|                                dS )a  Checks given results for application errors and raises them if necessary.

        Args:
            results_or_errors: The results or errors to check.
            ignore_ray_errors: Whether to ignore RayErrors within the elements of
                `results_or_errors`.
        N)r#   rf   rg   r%   )r   r[   r0   s      r   r   z:FaultTolerantActorManager.handle_remote_call_result_errors  sj      1 		, 		,O! ," ,  !4!4!6!67777 &))+++		, 		,r   c                      t           j                  } fd                                 D             }g }|r(                     d |d|d|          }d |D             } j                                         ||z   S )a  Ping all unhealthy actors to try bringing them back.

        Args:
            timeout_seconds: Timeout in seconds (to avoid pinging hanging workers
                indefinitely).
            mark_healthy: Whether to mark all those actors healthy again that are
                currently marked unhealthy AND that respond to the `ping` remote request
                (within the given `timeout_seconds`).
                Note that actors are NOT set to unhealthy, if they simply time out,
                meaning take a longer time to fulfil the remote request. We only ever
                mark and actor unhealthy, if they return a RayActorError from the remote
                request.
                Also note that this settings is ignored if `healthy_only=True` (b/c this
                setting only affects actors that are currently tagged as unhealthy).

        Returns:
            A list of actor IDs that were restored by the `ping.remote()` call PLUS
            those actors that were previously restored via other remote requests.
            The cached set of such previously restored actors will be erased in this
            call.
        c                 >    g | ]}                     |          |S r,   r   )rT   r/   r   s     r   rV   zDFaultTolerantActorManager.probe_unhealthy_actors.<locals>.<listcomp>(  s=     
 
 
((22

 
 
r   c                 *    |                                  S r!   )r`   )r   s    r   r   zBFaultTolerantActorManager.probe_unhealthy_actors.<locals>.<lambda>2  s    5::<< r   F)ra   r   r   r   r   r   c                 *    g | ]}|j         	|j        S r,   )r#   r/   )rT   r   s     r   rV   zDFaultTolerantActorManager.probe_unhealthy_actors.<locals>.<listcomp>9  s1     $ $ $$*	$$ $ $r   )r   r   r   r   r   )r   r   r   already_restored_actorsunhealthy_actor_idsjust_restored_actorsr   s   `      r   probe_unhealthy_actorsz0FaultTolerantActorManager.probe_unhealthy_actors  s    : #'t'<"="=
 
 
 
 NN,,
 
 
  " 	!////!4" / %) 0  N$ $.<$ $ $  	##%%% ')===r   )rn   r   c                   ||                                  }g }t          |t                    rt          |          t          |          k    s
J d            t          |t                    s
J d            t	          t          ||                    D ]\  }\  }}t          |t                    rY|                     t          | j	        |         |          j
        di t          |t                    r||         n|pi            v|                    | j	        |         j        
                    |                     nt          |t                    rnt	          |          D ]]\  }}|                     t          | j	        |         |          j
        di t          |t                    r||         n|pi            ^n9|D ]6}|                     | j	        |         j        j
        dd|i|pi            7|S )a  Apply functions on a list of remote actors.

        Args:
            func: A single Callable applied to all specified remote actors or a list
                of Callables, that get applied on the list of specified remote actors.
                In the latter case, both list of Callables and list of specified actors
                must have the same length. Alternatively, you can use the name of the
                remote method to be called, instead, or a list of remote method names.
            kwargs: An optional single kwargs dict or a list of kwargs dict matching the
                list of provided `func` or `remote_actor_ids`. In the first case (single
                dict), use `kwargs` on all remote calls. The latter case (list of
                dicts) allows you to define individualized kwarg dicts per actor.
            remote_actor_ids: Apply func on this selected set of remote actors.

        Returns:
            A list of ObjectRefs returned from the remote calls.
        Nz>Funcs must have the same number of callables as actor indices.zBIf func is a list of functions, kwargs has to be a list of kwargs.ra   r,   )r   r   r   rO   r   r   r6   rJ   getattrr   remoterp   )r   ra   rn   r   callsr   r   r   s           r   r   z&FaultTolerantActorManager._call_actorsD  su   0 ##~~//dD!! 	Y'((C- -   O     T TST T  !*#.>*E*E F F E E9D!a%% ELL=T 2A66=   $.fd#;#;!4q		&,l	     LLd!3!9!@!@!C!CDDDDE c"" 		Y$%566  4<GDL.55<  (264(@(@T6!99v|QS     ) Y Y<T\$/5<WW$W&,TVWWXXXXr   )r   r   r   r   c          	      (   |t          |          nd}|sg t                      fS t          j        |t	          |          ||           \  }}	t                      }
|D ]}||                    |                   }||                    |                   }|r&|
                    |t          |          |           a	 t          j        |          }|
                    |t          |          |           |rZ| 	                    |          sEt                              d| d           |                     |d           | xj        dz  c_        # t          $ r}|
                    |t          |	          |           | 	                    |          r.t                              d
t!          |           d| d           |                     |d           Y d}~d}~ww xY wt	          |          t	          |
          k    sJ ||
fS )a  Try fetching results from remote actor calls.

        Mark whether an actor is healthy or not accordingly.

        Args:
            remote_actor_ids: IDs of the actors these remote
                calls were fired against.
            remote_calls: List of remote calls to fetch.
            tags: List of tags used for identifying the remote calls.
            timeout_seconds: Timeout (in sec) for the ray.wait() call. Default is None,
                meaning wait indefinitely for all results.
            return_obj_refs: Whether to return ObjectRef instead of actual results.
            mark_healthy: Whether to mark certain actors healthy based on the results
                of these remote calls. Useful, for example, to make sure actors
                do not come back without proper state restoration.

        Returns:
            A list of ready ObjectRefs mapping to the results of those calls.
        N)num_returnstimeoutfetch_local)r   z1Bringing previously unhealthy, now-healthy actor z back into service.T)r   rc   )r   zRay error (z), taking actor z out of service.F)floatr8   r   waitrO   indexrK   r   r%   r   rf   warningr   r   r   r   r6   )r   r   r   r   r   r   r   r  readiesr   r   r   r/   r1   r   ro   s                   r   r   z'FaultTolerantActorManager._fetch_result  s}   B -<,G%(((T  	+(****XL))++
 
 

 +,, *	2 *	2E'(:(:5(A(ABH|))%001C  ))(M4O4O4OQTUUU2( ))(M4P4P4PRUVVV   2(=(=h(G(G 2NN-H - - -   ((4(@@@,,1,,3  
> 
> 
>))(M4J4J4JCPPP ((22 LLXc!ffXXhXXX   $$Xu$========
>8 7||s>222222&&s   E
G+!A?G&&G+)rn   c                    t          |t                    rt          |          t          |          k    s
J d            g }g }g }t          t	          ||                    D ]}\  }\  }}	                     |	          r`t          |t                    r||         n|pi }
|                    |           |                    |
           |                    |	           ~|}|}|}n fd|D             }|||fS )a  Filter out func and remote worker ids by actor state.

        Args:
            func: A single, or a list of Callables.
            kwargs: An optional single kwargs dict or a list of kwargs dicts matching
                the list of provided `func` or `remote_actor_ids`. In case of a single
                dict, uses `kwargs` on all remote calls. In case of a list of dicts,
                the given kwarg dicts are per actor `func` or per `remote_actor_ids`.
            remote_actor_ids: IDs of potential remote workers to apply func on.

        Returns:
            A tuple of (filtered func, filtered remote worker ids).
        z@Func must have the same number of callables as remote actor ids.c                 >    g | ]}                     |          |S r,   r   r   s     r   rV   zFFaultTolerantActorManager._filter_by_healthy_state.<locals>.<listcomp>  s-    XXXat?T?TUV?W?WXXXXr   )r   r   rO   r   r   r   rJ   )r   ra   rn   r   	temp_functemp_remote_actor_idstemp_kwargsr   r   r   r   s   `          r   r   z2FaultTolerantActorManager._filter_by_healthy_state  sB   ( dD!! 	Y'((C- -   Q  
 I$&!K )#d4D*E*E F F 7 79At((.. 7%/%=%=Qq		FLbA$$Q'''&&q))))00666D F4  YXXX+;XXXV---r   c                    |t                      }net          |t                    r|h}nLt          |t          t          f          rt          |          }n t          dt          |           d          g }g }g }| j                                        D ]^\  }\  }}t          |          dk    s||v r?|
                    |           |
                    |           |
                    |           _|||fS )a  Return all the in flight requests that match the given tags, if any.

        Args:
            tags: A str or a list/tuple of str. If tags is empty or None, return all the in
                flight requests.

        Returns:
            A tuple consisting of a list of the remote calls that match the tag(s),
            a list of the corresponding remote actor IDs for these calls (same length),
            and a list of the tags corresponding to these calls (same length).
        Nz6tags must be either a str or a list/tuple of str, got .r   )r   r   r6   r   tupler   typer   r   rO   rJ   )r   r   r   r   r   r   r1   r/   s           r   r   z.FaultTolerantActorManager._filter_calls_by_tag  s    <55DDc"" 	6DDtUm,, 	t99DDVdVVV   
%)%D%J%J%L%L 	' 	'!D/34yyA~~##D))) ''111!!#&&&-z99r   c                     t          | j                                                  D ]\  }\  }}||k    r| j        |= || j        v r&| j        |         j                                         dS dS )zRemove internal async state of for a given actor.

        This is called when an actor is removed from the pool or being marked
        unhealthy.

        Args:
            actor_id: The id of the actor.
        N)r   r   r   r   rv   r   )r   r/   reqr1   r   s        r   r   z-FaultTolerantActorManager._remove_async_state6  s~     #4#B#H#H#J#JKK 	9 	9NC#rX~~3C8 t000%1%%''''' 10r   c                     | j         S r!   )r   r"   s    r   r   z FaultTolerantActorManager.actorsL  s     |r   )Nr   r   r!   )NF)/r&   r'   r(   r)   r   r   r   r
   r   r4   r   r   r   r   r   r   r   r   r   r6   r   r   r   r   r   r   r   r   r   r  r8   r   r   r   r   r   staticmethodr   r   r   	ObjectRefr   r   r   r   r   r   r,   r   r   rr   rr      sJ	       # #J F F F F F F F YF> /378	%% %%k*+%% 25%% 	%% %% %% %%N )49 ) ) ) \) Q49 Q Q Q \Q 	k!2 	 	 	 \	 S [    \& !C ! ! ! \! MC M M M \M (C ( ( ( \( 
 
hsm 
s 
 
 
 \
 > > > > > \> / /d /t / / / \/0 0 0 \0 
 IM!04+/ %"J J JHcUCZ($xs
/C*Dc4PS9TUJ tCH~tDcN/CCDE	J
 J #49-J "%J J J 
J J J \JX  "y!
 IM!04y! y! y!HcUCZ($xs
/C*Dc4PS9TUy! c]y!
 tCH~tDcN/CCDEy! y! #49-y! 
y! y! y! \y!v  8:+. %"8 8 8 CcE#s(O348 "%	8
 8 8 
8 8 8 \8t  "GE
 IM+. %"!04"&!&GE GE GEHcUCZ($xs
/C*Dc4PS9TUGE c]GE
 tCH~tDcN/CCDEGE "%GE GE GE GE #49-GE  GE GE 
eE#s(OS()	*GE GE GE \GER ,,,  , 
	, , , \,.  ,0"9> 9>!%9> 9> 
c	9> 9> 9> \9>~ IM&*= = =HcUCZ($xs
/C*Dc4PS9TU= tCH~tDcN/CCDE	=
 s)= 
cm	= = = =~  ,0 %"`' `' `' s)`' 3=)	`'
 3i`' "%`' `' `' 
tCM"$55	6`' `' `' \`'L 59	*. *. *. HcUCZ($xs
/C*DDE*. tT$Z/01	*.
 s)*. *. *. *.Z HL": ":U3S	5c?#BCD":	tCM"D$5tCy@	A": ": ": ":H:C : : : :,    r   rr   )$rM   loggingrk   rh   collectionsr   dataclassesr   r   typingr   r   r   r	   r
   r   r   r   r   	ray.actorr   ray.exceptionsr   r   ray.rllib.utils.typingr   ray.util.annotationsr   	getLoggerr&   rf   r   r.   r8   r]   rr   r,   r   r   <module>r     s%     



  # # # # # # ( ( ( ( ( ( ( ( N N N N N N N N N N N N N N N N N N N N 



 ! ! ! ! ! ! 1 1 1 1 1 1 1 1 $ $ $ $ $ $ - - - - - -		8	$	$                        F 
* * * * * * *  ** D
 D
 D
 D
 D
 D
 D
 D
N + + + + + + + +\ J J J J J J J J J Jr   