
    &`iuv                     D   d dl Z d dlZd dlmZmZmZmZmZmZm	Z	m
Z
mZ d dlZd dlmZ d dlmZ d dlmZmZmZmZ d dlmZ d dlmZ d dlmZmZ d d	lmZ d d
lm Z  d dl!m"Z" erd dl#m$Z$  ej%        e&          Z' e
d          Z(e" G d de j)                              Z*dS )    N)	TYPE_CHECKINGAnyCallableDictListOptionalTupleTypeVarUnion)ActorHandle)RayActorError)!COMPONENT_ENV_TO_MODULE_CONNECTORCOMPONENT_LEARNER!COMPONENT_MODULE_TO_ENV_CONNECTORCOMPONENT_RL_MODULE)LearnerGroup)FaultTolerantActorManager)NUM_ENV_STEPS_SAMPLED_LIFETIMEWEIGHTS_SEQ_NO)Runner)PolicyID)DeveloperAPI)AlgorithmConfigTc                   D   e Zd Z	 	 	 	 	 dFdddee         dee         d	ee         d
ededeeef         ddfdZ	dddddded         dedee         dee         deeef         ddfdZ
dGdededdfdZdefdZdddedededddef
dZdddddddddddee         dee         deeeeef                           deeeef                  deee                  fdZ	 	 	 	 	 dHd!eee                  d"eeed#f                  d$eee                  d%ee         d&ee         ddfd'Zd(ee         ddfd)ZdId*Zdddddddd+d,eeegef         eeegef                  eee         f         ded-ed.ee         d%ee         d/ed0edee         fd1Zddd2d,eeegef         eeegef                  eee         f         d-ed.ee         defd3Zd ddd4d%ee         d/ed0edeeeef                  fd5Zdee         fd6Ze e!j"        d7                         Z#e e!j"        defd8                        Z$e dJd9            Z%e defd:            Z&e dee         fd;            Z'e e!j"        defd<                        Z(e defd=            Z)e defd>            Z*e defd?            Z+e defd@            Z,e defdA            Z-e e!j"        dB                         Z.e e!j"        dC                         Z/e e!j"        dD                         Z0e e!j"        dE                         Z1dS )KRunnerGroupFNr   Tconfigr   local_runnerlogdirtune_trial_id	pg_offset_setupkwargsreturnc                    t          |t                    rt          j        |          n|pt                      | _        || _        t          j        | j                  | _        || _	        || _
        || _        t          | j        d          | _        |rL	  | j        d|| j        |d| d S # t"          $ r&}|j        r|j        d         j        d         |d }~ww xY wd S )N   )'max_remote_requests_in_flight_per_actorinit_id)r   num_runnersr   r       )
isinstancedictr   	from_dictr   _remote_configrayput_remote_config_obj_ref_tune_trial_id
_pg_offset_logdirr   "_max_requests_in_flight_per_runner_worker_managerr"   r)   r   actor_init_failedargs)	selfr   r   r   r    r!   r"   r#   es	            x/home/jaya/work/projects/VOICE-AGENT/VIET/agent-env/lib/python3.11/site-packages/ray/rllib/utils/runners/runner_group.py__init__zRunnerGroup.__init__*   s3   $ &$''/O%f----O-- 	 %&)gd.A&B&B#+#8484[ 
  
  

  	 ! $ 0!-  	     !   
 &  &).++ G!	 	s   B/ /
C9!CC)r   r)   r   validater)   r>   c                    d | _         |dk    rd}|| _         | j        |fd||n| j        i| |r | j        dd|| j        d|| _         d S d S )Nr   Tr>   runner_indexr)   r   r+   )_local_runner_RunnerGroup__local_configadd_runners$_validate_runners_after_construction_make_runner_local_config)r:   r   r)   r   r>   r#   s         r<   r"   zRunnerGroup._setupj   s     "!L$ 		
 	
 # X:		

 	
 	
 	
  	!2!2 "')" " 	" "D	 	    c                       j                                          fdt                    D             } j                             |           |r                                  dS dS )z?Creates and adds a number of remote runners to this runner set.c           	      N    g | ]!} j         d|z   d z   z   j        d"S )r&   r@   r+   )rF   r2   ).0ir#   r)   old_num_runnersr:   s     r<   
<listcomp>z+RunnerGroup.add_runners.<locals>.<listcomp>   sk     
 
 
  D ,q014+k9 2   
 
 
rH   N)r7   
num_actorsrange
add_actorsr>   )r:   r)   r>   r#   new_runnersrM   s   `` ` @r<   rD   zRunnerGroup.add_runners   s     .99;;
 
 
 
 
 
 
 ;''
 
 
 	''444  	MMOOOOO	 	rH   c           	          | j                             d           D ]^}|j        sU|                                }| j        r8t
                              d| j        j         dt          |                      \|_d S )Nc                 *    |                                  S N)assert_healthyws    r<   <lambda>z&RunnerGroup.validate.<locals>.<lambda>   s    1CSCSCUCU rH   zValidation of z failed! Error=)
r7   foreach_actorokget_ignore_ray_errors_on_runnersloggererror
runner_cls__name__str)r:   resultr;   s      r<   r>   zRunnerGroup.validate   s    *889U9UVV 
	 
	F 9 JJLL5 LLZ)AZZRUVWRXRXZZ    G
	 
	rH   )recreated_runnerrA   rd   c          
      <   t          d||||| j        | j        d|}|dk    r | j        di |S t          j                                        dn	| j        |z   }  t	          j        di | j	        | j                  
                    |          j        di |S )N)r   worker_indexnum_workersrecreated_workerlog_dirr    r   )placement_group_bundle_indexr+   )r-   r5   r3   r`   r0   utilget_current_placement_groupr4   remote_remote_argsoptions)r:   rA   r)   rd   r   r#   pg_bundle_idxs          r<   rF   zRunnerGroup._make_runner   s      
%#-L-
 
 
 
 1"4?,,V,,,
 x3355= B</ 	+CJ++*++DO<<W-W@@  	
rH   )from_runnerenv_steps_sampledconnector_statesrl_module_staterunner_indices_to_updateenv_to_modulemodule_to_envrr   rs   rt   ru   rv   c                   |p| j         }|j        p|j        dk    o|j        }
|j        }| j        dk    r2| j         r+| j                             i |	t          |ini |pi            |
s|sdS |
r|g k    ri }nG||                     d d|j                  }d |D             }d |D             }| j         Jt          | j         d	          r5t          | j         d
          r |J | j         j
        }|J | j         j        }i }|r/|                    t          |                    |          i           |r/|                    t          |                    |          i           nY|5t          |                                t          |                                i}n"|                    t          t          g          }|||j        pdz  |t          <   |s| j         | j                             |           n\|                    |                    t                    i            |                    |                    t                    i            |                    t          d           |                    t          d           |rd| j         |r| j                             |           |r|                    |           |                     dt)          |          |dd           dS dS )z>Synchronizes the connectors of this `RunnerGroup`'s `Runner`s.training_onlyr   Nc                 F    |                      t          t          g          S )N
components)	get_stater   r   rW   s    r<   rY   z0RunnerGroup.sync_runner_states.<locals>.<lambda>  s$    !++ A A( #. # # rH   F)r   timeout_secondsc                 :    g | ]}t           |v |t                    S r+   )r   rK   ss     r<   rN   z2RunnerGroup.sync_runner_states.<locals>.<listcomp>  3     ( ( (8A== 78===rH   c                 :    g | ]}t           |v |t                    S r+   )r   r   s     r<   rN   z2RunnerGroup.sync_runner_states.<locals>.<listcomp>  r   rH   _env_to_module_module_to_envr|   r&   	set_statestate        )r#   remote_worker_idsr   r   )r   merge_runner_statesin_evaluationbroadcast_runner_statesnum_healthy_remote_runnersr   r   foreach_runner)sync_filters_on_rollout_workers_timeout_shasattrr   r   updater   merge_statesr   r~   r)   r\   popr-   )r:   r   rr   rs   rt   ru   rv   rw   rx   r#   merge	broadcastrunner_statesenv_to_module_statesmodule_to_env_statess                  r<   sync_runner_stateszRunnerGroup.sync_runner_states   s     "6T%6* 
&/9Rf>R 	 2	
 *a//D4E/'' -8 89JKK	 ',"	 	 	  	Y 	F  E	2%% "#+'+':':  &+"L (; ( ($( (-( ( ($
( (-( ( ($ %1 13CDD 2 13CDD 2 )000$($5$DM(000$($5$DM "' !((= - : :;O P P   ( !((= - : :;O P P   "5}7N7N7P7P5}7N7N7P7P!
 !, 5 599  !6 ! ! (<M"'a=M89  	G ,!++M::::''!%%&GHH"   ''!%%&GHH"   ?FFF?FFF  	  ,,!++M:::
  6$$_555 -000":" #       	 	rH   r   policiesfrom_worker_or_learner_groupr   to_worker_indicesr   inference_onlyc                    | j         |t          d          d}| j        s|)||n| j         }|t          d          |d |D             nt          g}	t          |t                    r-|                    d |	D             |          t                   }n[t          |t                    r/t          j        |j                            |	|                    }n|                    |	|          }d |                                D             }t          j        |          }
|                     dt!          |
	          d
||           | j         || j                             |           dS dS dS )a  Syncs model weights from the given weight source to all remote workers.

        Weight source can be either a (local) rollout worker or a learner_group. It
        should just implement a `get_weights` method.

        Args:
            policies: Optional list of PolicyIDs to sync weights for.
                If None (default), sync weights to/from all policies.
            from_worker_or_learner_group: Optional (local) `Runner` instance or
                LearnerGroup instance to sync from. If None (default),
                sync from this `Runner`Group's local worker.
            to_worker_indices: Optional list of worker indices to sync the
                weights to. If None (default), sync to all remote workers.
            global_vars: An optional global vars dict to set this
                worker to. If None, do not update the global_vars.
            timeout_seconds: Timeout in seconds to wait for the sync weights
                calls to complete. Default is 0.0 (fire-and-forget, do not wait
                for any sync calls to finish). Setting this to 0.0 might significantly
                improve algorithm performance, depending on the algo's `training_step`
                logic.
            inference_only: Sync weights with workers that keep inference-only
                modules. This is needed for algorithms in the new stack that
                use inference-only modules. In this case only a part of the
                parameters are synced to the workers. Default is False.
        NzhNo `local_runner` in `RunnerGroup`! Must provide `from_worker_or_learner_group` arg in `sync_weights()`!z}`from_worker_or_trainer` is None. In this case, `RunnerGroup`^ should have `local_runner`. But `local_runner` is also `None`.c                 (    g | ]}t           d z   |z   S /)r   )rK   ps     r<   rN   z,RunnerGroup.sync_weights.<locals>.<listcomp>  s#    AAA1$s*Q.AAArH   c                 (    g | ]}t           d z   |z   S r   )r   )rK   ms     r<   rN   z,RunnerGroup.sync_weights.<locals>.<listcomp>  s#    MMM 1C 7! ;MMMrH   )r}   r   c                 :    i | ]\  }}|t           t          fv ||S r+   )r   r   )rK   kvs      r<   
<dictcomp>z,RunnerGroup.sync_weights.<locals>.<dictcomp>  s9       Aq,n=== 1===rH   r   r   F)funcr#   r   r   r   )r   	TypeErrornum_remote_runners
ValueErrorr   r,   r   r~   r   r   r0   r\   rn   itemsr1   r   r-   r   )r:   r   r   r   r   r   r#   ru   weights_srcmodulesrl_module_state_refs              r<   sync_weightszRunnerGroup.sync_weightsz  s   D $)E)MJ   " B	&B&N 0; -,&  " U   ' BAAAAA)*  +|44 "-"7"7MMWMMM#1 #8 # # $#% k;77 &)g#-44'.+9 5  ' 'OO '2&;&;#*'5 '< ' 'O +1133  O #&'/":":  "5666""3 /      (+7!++O<<<<< )(77rH   new_remote_runnersc                 l    | j                                          | j                             |           dS )zHard overrides the remote `Runner`s in this set with the provided ones.

        Args:
            new_remote_workers: A list of new `Runner`s (as `ActorHandles`) to use as
                new remote workers.
        N)r7   clearrQ   )r:   r   s     r<   resetzRunnerGroup.reset  s7     	""$$$''(:;;;;;rH   c                     	 |                      d dd           n*# t          $ r t                              d           Y nw xY w| j                                         dS # | j                                         w xY w)z8Calls `stop` on all `Runner`s (including the local one).c                 *    |                                  S rU   )stoprW   s    r<   rY   z"RunnerGroup.stop.<locals>.<lambda>  s    !&&(( rH   FT)healthy_onlyr   zFailed to stop workers!N)r   	Exceptionr^   	exceptionr7   r   r:   s    r<   r   zRunnerGroup.stop  s    
	) ""T        	8 	8 	8677777	8  &&(((((D &&((((s&    A! $A A! AA! !A<)r#   r   r   r   r   return_obj_refsmark_healthyr   r   r   r   r   c          	         |r|r
J d            g }	|ra| j         Z|r|d         }
|dd         }ni }
|}t          |t                    r t          | j         |          di |
g}	n || j         fi |
g}	| j                                        s|	S | j                            |||||||          }t          j        || j	                   d |
                                D             }|	|z   S )	a  Calls the given function with each `Runner` as its argument.

        Args:
            func: The function to call for each `Runner`s. The only call argument is
                the respective `Runner` instance.
            local_env_runner: Whether to apply `func` to local `Runner`, too.
                Default is True.
            healthy_only: Apply `func` on known-to-be healthy `Runner`s only.
            remote_worker_ids: Apply `func` on a selected set of remote `Runner`s.
                Use None (default) for all remote `Runner`s.
            timeout_seconds: Time to wait (in seconds) for results. Set this to 0.0 for
                fire-and-forget. Set this to None (default) to wait infinitely (i.e. for
                synchronous execution).
            return_obj_refs: Whether to return `ObjectRef` instead of actual results.
                Note, for fault tolerance reasons, these returned ObjectRefs should
                never be resolved with ray.get() outside of this `RunnerGroup`.
            mark_healthy: Whether to mark all those `Runner`s healthy again that are
                currently marked unhealthy AND that returned results from the remote
                call (within the given `timeout_seconds`).
                Note that `Runner`s are NOT set unhealthy, if they simply time out
                (only if they return a `RayActorError`).
                Also note that this setting is ignored if `healthy_only=True` (b/c
                `mark_healthy` only affects `Runner`s that are currently tagged as
                unhealthy).

        Returns:
             The list of return values of all calls to `func([worker])`.
        z-Can not return `ObjectRef` from local worker.Nr   r&   )r#   r   remote_actor_idsr   r   r   ignore_ray_errorsc                 6    g | ]}|                                 S r+   )r\   rK   rs     r<   rN   z.RunnerGroup.foreach_runner.<locals>.<listcomp>Q  s     JJJa!%%''JJJrH   r+   )r   r,   rb   getattrr7   	actor_idsrZ   r    handle_remote_call_result_errorsr]   ignore_errors)r:   r   r#   r   r   r   r   r   r   local_resultlocal_kwargsremote_resultss               r<   r   zRunnerGroup.foreach_runner  sc   R  	;'3	; 	;:	; 	;3  
	ID-9  %ay!$$$ I @(94 @ @ P P< P PQ $T%6 G G, G GH#--// 	 -;;%.++% < 
 
 	"Bd.P	
 	
 	
 	

 KJ>+G+G+I+IJJJn,,rH   )r   r   c                <    | j                             |||          S )a  Calls the given function asynchronously with each `Runner` as the argument.

        Does not return results directly. Instead, `fetch_ready_async_reqs()` can be
        used to pull results in an async manner whenever they are available.

        Args:
            func: The function to call for each `Runner`s. The only call argument is
                the respective `Runner` instance.
            healthy_only: Apply `func` on known-to-be healthy `Runner`s only.
            remote_worker_ids: Apply `func` on a selected set of remote `Runner`s.

        Returns:
             The number of async requests that have actually been made. This is the
             length of `remote_worker_ids` (or self.num_remote_workers()` if
             `remote_worker_ids` is None) minus the number of requests that were NOT
             made b/c a remote `Runner` already had its
             `max_remote_requests_in_flight_per_actor` counter reached.
        )r   r   )r7   foreach_actor_async)r:   r   r   r   s       r<   foreach_runner_asyncz RunnerGroup.foreach_runner_asyncU  s-    2 #77%. 8 
 
 	
rH   r   r   r   c                    | j                             |||          }t          j        || j                   d |                                D             S )a  Get esults from outstanding asynchronous requests that are ready.

        Args:
            timeout_seconds: Time to wait for results. Default is 0, meaning
                those requests that are already ready.
            return_obj_refs: Whether to return ObjectRef instead of actual results.
            mark_healthy: Whether to mark all those workers healthy again that are
                currently marked unhealthy AND that returned results from the remote
                call (within the given `timeout_seconds`).
                Note that workers are NOT set unhealthy, if they simply time out
                (only if they return a RayActorError).
                Also note that this setting is ignored if `healthy_only=True` (b/c
                `mark_healthy` only affects workers that are currently tagged as
                unhealthy).

        Returns:
            A list of results successfully returned from outstanding remote calls,
            paired with the indices of the callee workers.
        r   r   c                 D    g | ]}|j         |                                fS r+   )actor_idr\   r   s     r<   rN   z6RunnerGroup.fetch_ready_async_reqs.<locals>.<listcomp>  s'    NNN!QUUWW%NNNrH   )r7   fetch_ready_async_reqsr   r   r]   r   )r:   r   r   r   r   s        r<   r   z"RunnerGroup.fetch_ready_async_reqst  ss    4 -DD++% E 
 
 	"B"@	
 	
 	
 	

 ON~/K/K/M/MNNNNrH   c                 D    | j                             | j        d          S )zChecks for unhealthy workers and tries restoring their states.

        Returns:
            List of IDs of the workers that were restored.
        T)r   r   )r7   probe_unhealthy_actorsrunner_health_probe_timeout_sr   s    r<   probe_unhealthy_runnersz#RunnerGroup.probe_unhealthy_runners  s-     #:: > ; 
 
 	
rH   c                     dS )z>Number of seconds to wait for health probe calls to `Runner`s.Nr+   r   s    r<   r   z)RunnerGroup.runner_health_probe_timeout_s        rH   c                     dS )zClass for each runner.Nr+   r   s    r<   r`   zRunnerGroup.runner_cls  r   rH   c                     | j         S )z(Returns the config for a local `Runner`.)rC   r   s    r<   rG   zRunnerGroup._local_config  s     ""rH   c                     | j         S )zReturns the local `Runner`.)rB   r   s    r<   r   zRunnerGroup.local_runner  s     !!rH   c                 4    | j                                         S )z(Returns the list of remote `Runner` IDs.)r7   healthy_actor_idsr   s    r<   healthy_runner_idszRunnerGroup.healthy_runner_ids  s     #55777rH   c                     dS )z)Number of runners to schedule and manage.Nr+   r   s    r<   r)   zRunnerGroup.num_runners  r   rH   c                 4    | j                                         S )zNumber of remote `Runner`s.)r7   rO   r   s    r<   r   zRunnerGroup.num_remote_runners  s     #..000rH   c                 4    | j                                         S )z/Returns the number of healthy remote `Runner`s.)r7   num_healthy_actorsr   s    r<   r   z&RunnerGroup.num_healthy_remote_runners       #66888rH   c                 n    t          t          | j                            |                                 z   S )z(Returns the number of healthy `Runner`s.)intboolrB   r   r   s    r<   num_healthy_runnerszRunnerGroup.num_healthy_runners  s.     4*++,,t/N/N/P/PPPrH   c                 4    | j                                         S )z/Returns the number of in-flight async requests.)r7   num_outstanding_async_reqsr   s    r<   num_in_flight_async_reqsz$RunnerGroup.num_in_flight_async_reqs  s     #>>@@@rH   c                 4    | j                                         S )zIReturns the number of times managed remote `Runner`s have been restarted.)r7   total_num_restartsr   s    r<   num_remote_runner_restartsz&RunnerGroup.num_remote_runner_restarts  r   rH   c                     dS )z!Remote arguments for each runner.Nr+   r   s    r<   ro   zRunnerGroup._remote_args  r   rH   c                     dS )z'If errors in runners should be ignored.Nr+   r   s    r<   r]   z)RunnerGroup._ignore_ray_errors_on_runners  r   rH   c                     dS )z&Maximum requests in flight per runner.Nr+   r   s    r<   r6   z.RunnerGroup._max_requests_in_flight_per_runner  r   rH   c                     dS )z.If runners should validated after constructed.Nr+   r   s    r<   rE   z0RunnerGroup._validate_runners_after_construction  r   rH   )FNNr   T)F)NNNr   F)r$   N)r$   r   )2ra   
__module____qualname__r   r   rb   r   r   r   r=   r"   rD   r   r>   r   rF   r   r   r   r   r   floatr   r   r   r   r   r   r   r	   r   r   propertyabcabstractmethodr   r`   rG   r   r   r)   r   r   r   r   r   ro   r]   r6   rE   r+   rH   r<   r   r   (   s        (- $'+> >!> tn> >  }> > > sCx.> 
> > > >F /3',#'      *+  	 
 tn  4.  sCx.  
       D s d QU    0)    $ "'#
 #
 #
 #
 	#

 #
 "#
 
#
 #
 #
 #
R )-+/;?488<b b b "b f%	b
 $C=b #4S#X#78b "$sCx.1b #+49"5b b b bL .2PT15+.).r= r=4>*r= '/uV^5K/L&Mr= $DI.	r=
 "%r= !r= 
r= r= r= r=h<[(9 <d < < < <) ) ) )$ !!'++/ %"M- M- M-HfXq[)4&10E+FTRUYVWM-
 M- M-  9M- "%M- M- M- 
aM- M- M- M-f "'+
 
 
HfXq[)4&10E+FTRUYVW
 	

  9
 

 
 
 
D ,/ %"%O %O %O "%%O 	%O
 %O 
eCFm	%O %O %O %ON	
c 	
 	
 	
 	
 M M  XM %H % % %  X% # # # X# "f " " " X" 8DI 8 8 8 X8 8S 8 8 8  X8 1C 1 1 1 X1 9C 9 9 9 X9 QS Q Q Q XQ A# A A A XA 9C 9 9 9 X9 0 0  X0 6 6  X6 5 5  X5 = =  X= = =rH   r   )	metaclass)+r   loggingtypingr   r   r   r   r   r   r	   r
   r   r0   	ray.actorr   ray.exceptionsr   ray.rllib.corer   r   r   r   $ray.rllib.core.learner.learner_groupr   ray.rllib.utils.actor_managerr   ray.rllib.utils.metricsr   r   ray.rllib.utils.runners.runnerr   ray.rllib.utils.typingr   ray.util.annotationsr   %ray.rllib.algorithms.algorithm_configr   	getLoggerra   r^   r   ABCMetar   r+   rH   r<   <module>r     s   



 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 


 ! ! ! ! ! ! ( ( ( ( ( (            > = = = = = C C C C C C R R R R R R R R 1 1 1 1 1 1 + + + + + + - - - - - - FEEEEEE		8	$	$ GCLL F= F= F= F= F=CK F= F= F= F= F= F=rH   