
    &`i                        d dl Zd dlZd dlZd dlmZmZmZmZm	Z	m
Z
mZmZmZmZmZ d dlZd dlZd dlmZmZ d dlmZ d dlmZ d dlmZmZmZmZ d dlm Z  d dl!m"Z" d d	l#m$Z$ d d
l%m&Z& d dl'm(Z( d dl)m*Z* d dl+m,Z, d dl-m.Z. d dl/m0Z0m1Z1 d dl2m3Z3 d dl4m5Z5 d dl6m7Z7 d dl8m9Z9m:Z: d dl;m<Z<m=Z=m>Z>m?Z?m@Z@mAZAmBZBmCZC d dlDmEZE erd dlFmGZG  e7            \  ZHZIZJ ejK        eL          ZM ed          ZNeE G d d                      ZOdS )    N)TYPE_CHECKINGAnyCallable
CollectionDictListOptionalTupleTypeTypeVarUnion)DEPRECATED_VALUEdeprecation_warning)ActorHandle)RayActorError)!COMPONENT_ENV_TO_MODULE_CONNECTORCOMPONENT_LEARNER!COMPONENT_MODULE_TO_ENV_CONNECTORCOMPONENT_RL_MODULE)LearnerGroup)validate_module_id)RLModuleSpec)BaseEnv)
EnvContext)	EnvRunner)RolloutWorker)get_dataset_and_shards)PolicyPolicyState)FaultTolerantActorManager)OldAPIStack)try_import_tf)NUM_ENV_STEPS_SAMPLED_LIFETIMEWEIGHTS_SEQ_NO)AgentID
EnvCreatorEnvType	EpisodeIDPartialAlgorithmConfigDictPolicyIDSampleBatchType
TensorType)DeveloperAPIAlgorithmConfigTc                   
   e Zd ZdZddddddddddeeddee         deeegdf                  dee	e
                  d	ed
         dedee         dedee         dedee         fdZddddddeeegdf                  d	ed
         dedefdZd Zedefd            Zdee         fdZdee         fdZdefdZdefdZdefdZdefdZdefdZdefdZdadee         defd Zdefd!Zdddddddd"d	d
d#ee         d$ee         d%eeeee f                           d&eeee f                  d'eee                  ddfd(Z!	 	 	 	 	 	 	 	 dbd+e"eege#f         eeege#f                  eee         f         d,eeee f                  dee         d-ee$         d.ed/ed0ed1ee         d2edee"e%ee#f         e#f                  fd3Z&	 	 	 	 	 	 dcd4eee'                  d5ee"ed6f                  d7eee                  d8eeee(f                  d-ee$         d9ee         ddfd:Z)ddd;ed<eddfd=Z*d>ee+         ddfd?Z,ded@Z-dddddd*d*dAd+e"eege#f         eeege#f                  eee         f         ded0ed1ee         d-ee$         d.ed/edee#         fdBZ.	 daddddCd+e"eege#f         eeege#f                  eee         f         dee         d0ed1ee         def
dDZ/dd)d*d*dEdFee"eee         e%e         f                  d-ee$         d.ed/edee%ee#f                  f
dGZ0e1d+eegee#         f         deee#                  fdH            Z2e1d+ee3e4gee#         f         deee#                  fdI            Z5dee         fdJZ6e1d+ee
e'ge#f         dee#         fdK            Z7e1d+ee
e'ge#f         dee#         fdL            Z8e1	 dadMe'dNee9         defdO            Z:e1	 	 dfdddddddedPdMe'dQee	e
                  dRee
         dSee;j<        j=                 dTee;j<        j=                 d	ee"d
e>f                  dUee?         dVeee@eAge'f                  dWee"eBe'         ee'ee9         gef         f                  dXeeC         dYeee"ee+f                           ddfdZ            ZDd*dd[dedeeegdf                  d\ed;ed]ed	d
d^eee'e%e;j<        j=        e;j<        j=        f         f                  de"ee+f         fd_ZEeFd`             ZGdS )gEnvRunnerGroupzcSet of EnvRunners with n @ray.remote workers and zero or one local worker.

    Where: n >= 0.
    NTr   )env_creatorvalidate_envdefault_policy_classconfiglocal_env_runnerlogdir_setuptune_trial_id	pg_offsetnum_env_runnersnum_workerslocal_workerr3   r4   r5   r6   r/   r7   r8   r9   r:   r;   r<   c                   |t           k    s|t           k    rt          ddd           ddlm} |s |            }n%t	          |t
                    r |j        |          }|| _        || _        || _	        t          j        | j	                  | _        | j	        j        | j	        j        | j	        j        |j        r|j        ndd| _        || _        |	| _        |j        | _        | j        a|j        rN|j        r$|j        rt3          d	          dd
lm} || _        n/|j        rddlm} || _        nddlm} || _        nt@          | _        || _!        |j"        p|j        | _#        tI          |j%        d          | _&        |rX	 | '                    |||
|
n|j(        |           dS # tR          $ r&}|j*        r|j+        d         j+        d         |d}~ww xY wdS )a  Initializes a EnvRunnerGroup instance.

        Args:
            env_creator: Function that returns env given env config.
            validate_env: Optional callable to validate the generated
                environment (only on worker=0). This callable should raise
                an exception if the environment is invalid.
            default_policy_class: An optional default Policy class to use inside
                the (multi-agent) `policies` dict. In case the PolicySpecs in there
                have no class defined, use this `default_policy_class`.
                If None, PolicySpecs will be using the Algorithm's default Policy
                class.
            config: Optional AlgorithmConfig (or config dict).
            local_env_runner: Whether to create a local (non @ray.remote) EnvRunner
                in the returned set as well (default: True). If `num_env_runners`
                is 0, always create a local EnvRunner.
            logdir: Optional logging directory for workers.
            _setup: Whether to actually set up workers. This is only for testing.
            tune_trial_id: The Ray Tune trial ID, if this EnvRunnerGroup is part of
                an Algorithm run as a Tune trial. None, otherwise.
        z,WorkerSet(num_workers=..., local_worker=...)z9EnvRunnerGroup(num_env_runners=..., local_env_runner=...)T)oldnewerrorr   r.   )num_cpusnum_gpus	resourcesmax_restartsNz,Multi-agent recording is not supported, yet.)OfflineSingleAgentEnvRunner)MultiAgentEnvRunner)SingleAgentEnvRunner   )'max_remote_requests_in_flight_per_actorinit_idr4   r6   r<   r7      ),r   r   %ray.rllib.algorithms.algorithm_configr/   
isinstancedict	from_dict_env_creator_policy_class_remote_configrayput_remote_config_obj_refnum_cpus_per_env_runnernum_gpus_per_env_runnercustom_resources_per_env_runnerrestart_failed_env_runnersmax_num_env_runner_restarts_remote_args_tune_trial_id
_pg_offsetenv_runner_cls"enable_env_runner_and_connector_v2outputis_multi_agent
ValueError$ray.rllib.offline.offline_env_runnerrG   $ray.rllib.env.multi_agent_env_runnerrH   %ray.rllib.env.single_agent_env_runnerrI   r   _logdirignore_env_runner_failures!_ignore_ray_errors_on_env_runnersr    %max_requests_in_flight_per_env_runner_worker_managerr9   r<   r   actor_init_failedargs)selfr3   r4   r5   r6   r7   r8   r9   r:   r;   r<   r=   r>   r/   rG   rH   rI   es                     r/home/jaya/work/projects/VOICE-AGENT/VIET/agent-env/lib/python3.11/site-packages/ray/rllib/env/env_runner_group.py__init__zEnvRunnerGroup.__init__L   s   L ***l>N.N.NBO    	JIIIII  	7$_&&FF%% 	7._.v66F'1$&)gd.A&B&B#+C+C,L 422	
 	
 ,# %3&8 4 = C, 	J()WXXX      /J++, C      /B++      /C++&3#-R1R 	.  9<	 
  
  
  	!-! +6 (#3%5  	 	 	 	 	 !   
 &  &).++ G!	 	s   "F* *
G4!GGrM   c                   d| _         |dk    rd}|j                                        }|                    |j                   |                    d                              |          | _        |j        dk    rt          ||          \  | _	        | _
        nd| _	        d| _
        |                     ||j                   |rG| j                                        dk    r*|j        s#|j        r|j        s|                                 }nd}|r,|                     | j        |d|| j        |	          | _         dS dS )
aA  Sets up an EnvRunnerGroup instance.
        Args:
            validate_env: Optional callable to validate the generated
                environment (only on worker=0).
            config: Optional dict that extends the common config of
                the Algorithm class.
            num_env_runners: Number of remote EnvRunner workers to create.
            local_env_runner: Whether to create a local (non @ray.remote) EnvRunner
                in the returned set as well (default: True). If `num_env_runners`
                is 0, always create a local EnvRunner.
        Nr   TF)copy_frozen)tf_session_argsdataset)validate)r3   r4   worker_indexr=   r6   spaces)_local_env_runnerrv   copyupdatelocal_tf_session_args	framework_local_configinput_r   _ds
_ds_shardsadd_workers'validate_env_runners_after_constructionrm   
num_actorscreate_env_on_local_workerobservation_spaceaction_space
get_spaces_make_workerrS   )rp   r4   r6   r<   r7   r~   rz   s          rr   r9   zEnvRunnerGroup._setup   s   * "&a#
 !' 6 ; ; = =$$V%ABBB#[[U[;;EE1 F 
 
 =I%% )?v(W(W%DHdooDH"DO 	C 	 	
 	
 	
 	$//11A555 6- 65;5H 6 __&&FFF  	%)%6%6 -)+) &7 & &D"""	 	    c                     | j                                         r | j                                         d         gng }|                     d ||           d         }t                              d|            |S )zInfer observation and action spaces from one (local or remote) EnvRunner.

        Returns:
            A dict mapping from ModuleID to a 2-tuple containing obs- and action-space.
        r   c                 *    |                                  S N)r   )
env_runners    rr   <lambda>z+EnvRunnerGroup.get_spaces.<locals>.<lambda>:  s    z4466 r   )remote_worker_idsr7   zQInferred observation/action spaces from remote worker (local worker has no env): )rm   	actor_idsforeach_env_runnerloggerinfo)rp   r   rz   s      rr   r   zEnvRunnerGroup.get_spaces,  s     #--//T!++--a011 	 ((66/!22 ) 
 
 	 	:17: :	
 	
 	

 r   returnc                     | j         S )zReturns the local EnvRunner.)r{   rp   s    rr   r7   zEnvRunnerGroup.local_env_runnerF  s     %%r   c                 4    | j                                         S z&Returns the list of remote worker IDs.)rm   healthy_actor_idsr   s    rr   healthy_env_runner_idsz%EnvRunnerGroup.healthy_env_runner_idsK  s    #55777r   c                 *    |                                  S r   )r   r   s    rr   healthy_worker_idsz!EnvRunnerGroup.healthy_worker_idsO      **,,,r   c                 4    | j                                         S z(Returns the number of remote EnvRunners.)rm   r   r   s    rr   num_remote_env_runnersz%EnvRunnerGroup.num_remote_env_runnersS  s    #..000r   c                 *    |                                  S r   )r   r   s    rr   num_remote_workersz!EnvRunnerGroup.num_remote_workersW  r   r   c                 4    | j                                         S z-Returns the number of healthy remote workers.)rm   num_healthy_actorsr   s    rr   num_healthy_remote_env_runnersz-EnvRunnerGroup.num_healthy_remote_env_runners[      #66888r   c                 *    |                                  S r   )r   r   s    rr   num_healthy_remote_workersz)EnvRunnerGroup.num_healthy_remote_workers_  s    22444r   c                 n    t          t          | j                            |                                 z   S zFReturns the number of all healthy workers, including the local worker.)intboolr{   r   r   s    rr   num_healthy_env_runnersz&EnvRunnerGroup.num_healthy_env_runnersc  s,    4.//0043R3R3T3TTTr   c                 *    |                                  S r   )r   r   s    rr   num_healthy_workersz"EnvRunnerGroup.num_healthy_workersg  s    ++---r   tagc                 8    | j                             |          S )z/Returns the number of in-flight async requests.)r   )rm   num_outstanding_async_reqs)rp   r   s     rr   num_in_flight_async_reqsz'EnvRunnerGroup.num_in_flight_async_reqsk  s    #>>3>GGGr   c                 4    | j                                         S )zATotal number of times managed remote workers have been restarted.)rm   total_num_restartsr   s    rr   num_remote_worker_restartsz)EnvRunnerGroup.num_remote_worker_restartso  r   r   )from_workerenv_steps_sampledconnector_statesrl_module_stateenv_runner_indices_to_updateenv_to_modulemodule_to_envr   r   r   r   r   c                r   |t          |          }|p| j        }|j         o|j        p"|j        o|j        du p|j        dk    o|j         }	|j         o|j        p|j        o|j        }
|                                 dk    r4| j        r-| j        	                    i |	t          |ini |pi            dS |	s|
sdS |	r|g k    ri }nG||                     d d|j                  }d |D             }d	 |D             }| j        Jt          | j        d
          r5t          | j        d          r |J | j        j        }|J | j        j        }i }|r/|                    t"          |                    |          i           |r/|                    t&          |                    |          i           nY|5t"          |                                t&          |                                i}n"|                    t"          t&          g          }|
||t          <   |
s| j        | j        	                    |           n\|	                    |                    t"                    i            |	                    |                    t&                    i            |                    t"          d           |                    t&          d           |rd| j        |
r| j        	                    |           |r|                    |           |                     ddt1          |          |d           dS dS )a  Synchronizes the connectors of this EnvRunnerGroup's EnvRunners.

        The exact procedure works as follows:
        - If `from_worker` is None, set `from_worker=self.local_env_runner`.
        - If `config.use_worker_filter_stats` is True, gather all remote EnvRunners'
        ConnectorV2 states. Otherwise, only use the ConnectorV2 states of `from_worker`.
        - Merge all gathered states into one resulting state.
        - Broadcast the resulting state back to all remote EnvRunners AND the local
        EnvRunner.

        Args:
            config: The AlgorithmConfig object to use to determine, in which
                direction(s) we need to synch and what the timeouts are.
            from_worker: The EnvRunner from which to synch. If None, will use the local
                worker of this EnvRunnerGroup.
            env_steps_sampled: The total number of env steps taken thus far by all
                workers combined. Used to broadcast this number to all remote workers
                if `update_worker_filter_stats` is True in `config`.
            env_runner_indices_to_update: The indices of those EnvRunners to update
                with the merged state. Use None (default) to update all remote
                EnvRunners.
        NTtraining_onlyr   c                 F    |                      t          t          g          S )N
components)	get_stater   r   ws    rr   r   z7EnvRunnerGroup.sync_env_runner_states.<locals>.<lambda>  s$    !++ A A( #. # # r   F)r7   timeout_secondsc                 :    g | ]}t           |v |t                    S  )r   .0ss     rr   
<listcomp>z9EnvRunnerGroup.sync_env_runner_states.<locals>.<listcomp>  3     ( ( (8A== 78===r   c                 :    g | ]}t           |v |t                    S r   )r   r   s     rr   r   z9EnvRunnerGroup.sync_env_runner_states.<locals>.<listcomp>  r   r   _env_to_module_module_to_envr   	set_statestate        funcr   kwargsr   r   )r   r7   rb   use_worker_filter_statsmerge_env_runner_statesin_evaluationupdate_worker_filter_statsbroadcast_env_runner_statesr   r   r#   r   )sync_filters_on_rollout_workers_timeout_shasattrr   r   r}   r   merge_statesr   r   getpop$foreach_env_runner_async_fetch_readyrQ   )rp   r6   r   r   r   r   r   r   r   merge	broadcastenv_runner_statesenv_to_module_statesmodule_to_env_statess                 rr   sync_env_runner_statesz%EnvRunnerGroup.sync_env_runner_statess  sz   D ( #$5 6 6!:T%: 99 /.
 5 .$6 2oE 1"00 	 99 21
 5 32 	 **,,11d6K1!++ -8 89JKK	 ',"	 	 	 F  	Y 	F  E	2%%$&!!#+'+'>'>  */"L (? ( ($( (-( ( ($
( (-( ( ($ )5 57GHH 6 57GHH 6 )000$($9$HM(000$($9$HM$&!' %,,= - : :;O P P   ( %,,= - : :;O P P   "5}7N7N7P7P5}7N7N7P7P%!!
 %0$9$999  %: % %! (@Q<=
  	K$0%//0ABBBB''%))*KLLb   ''%))*KLLb   !!"CTJJJ!!"CTJJJ  	 $0Y0%//0ABBB
  :!((999
 55 "3444"> # 6     !	 	r   r   Fr   r   r   return_obj_refsmark_healthyhealthy_onlyr   return_actor_idsc
                 T    | j                             ||||||||| j        |	
  
        S )zCalls the given function asynchronously and returns previous results if any.

        This is a convenience function that calls the underlying actor manager's
        `foreach_actor_async_fetch_ready()` method.

        )
r   r   r   r   r   r   r   remote_actor_idsignore_ray_errorsr   )rm   foreach_actor_async_fetch_readyrk   )
rp   r   r   r   r   r   r   r   r   r   s
             rr   r   z3EnvRunnerGroup.foreach_env_runner_async_fetch_ready8  sF    ( #CC++%%."D- D 
 
 	
r   policiesfrom_worker_or_learner_groupr   to_worker_indicesglobal_varsinference_onlyc                 n   | j         |t          d          d}|                                 s|||n| j         }|t          d          |d |D             nt          g}	t          |t                    r-|                    d |	D             |          t                   }n| j	        j
        rft          |t          j        j                  r/t          j        |j                            |	|                    }n/|                    |	|          }n|                    ||          }| j	        j
        r[d |                                D             }t          j        |          |                     d	d	t)          
          ||           n3t          j        |          fd}
|                     |
d||           | j         a|A| j	        j
        r| j                             |           n| j                             |           | j                                        dS dS dS )a  Syncs model weights from the given weight source to all remote workers.

        Weight source can be either a (local) rollout worker or a learner_group. It
        should just implement a `get_weights` method.

        Args:
            policies: Optional list of PolicyIDs to sync weights for.
                If None (default), sync weights to/from all policies.
            from_worker_or_learner_group: Optional (local) EnvRunner instance or
                LearnerGroup instance to sync from. If None (default),
                sync from this EnvRunnerGroup's local worker.
            to_worker_indices: Optional list of worker indices to sync the
                weights to. If None (default), sync to all remote workers.
            global_vars: An optional global vars dict to set this
                worker to. If None, do not update the global_vars.
            timeout_seconds: Timeout in seconds to wait for the sync weights
                calls to complete. Default is 0.0 (fire-and-forget, do not wait
                for any sync calls to finish). Setting this to 0.0 might significantly
                improve algorithm performance, depending on the algo's `training_step`
                logic.
            inference_only: Sync weights with workers that keep inference-only
                modules. This is needed for algorithms in the new stack that
                use inference-only modules. In this case only a part of the
                parameters are synced to the workers. Default is False.
        NzmNo `local_env_runner` in EnvRunnerGroup! Must provide `from_worker_or_learner_group` arg in `sync_weights()`!z`from_worker_or_trainer` is None. In this case, EnvRunnerGroup should have local_env_runner. But local_env_runner is also None.c                 (    g | ]}t           d z   |z   S /)r   )r   ps     rr   r   z/EnvRunnerGroup.sync_weights.<locals>.<listcomp>  s#    AAA1$s*Q.AAAr   c                 (    g | ]}t           d z   |z   S r   )r   )r   ms     rr   r   z/EnvRunnerGroup.sync_weights.<locals>.<listcomp>  s#    MMM 1C 7! ;MMMr   )r   r   )r   r   c                 :    i | ]\  }}|t           t          fv ||S r   )r   r$   )r   kvs      rr   
<dictcomp>z/EnvRunnerGroup.sync_weights.<locals>.<dictcomp>  s9     # # #10.AAA qAAAr   r   r   r   c                 X    |                      t          j                             d S r   )set_weightsrV   r   )r   r   rl_module_state_refs    rr   _set_weightsz1EnvRunnerGroup.sync_weights.<locals>._set_weights  s*    **373F+G+GUUUUUr   F)r   r7   r   r   )r7   	TypeErrorr   re   r   rP   r   r   r   rU   rb   rV   actorr   r   remoteget_weightsitemsrW   r   rQ   r   r   r  set_global_vars)rp   r   r   r   r   r   r   r   weights_srcmodulesr  r  s       `      @rr   sync_weightszEnvRunnerGroup.sync_weightsY  s   D  (-I-QJ   ""$$ Z	(D(P 0; -,*  " W   ' BAAAAA)*  +|44  "-"7"7MMWMMM#1 #8 # # $#%
 $G  k39+@AA &)g#-44'.+9 5  ' 'OO '2&;&;#*'5 '< ' 'OO #."9"9%#1 #: # #
 "E %# # / 5 5 7 7# # # '*go&>&>#
 99$#&9:::&7$3 :     '*go&>&>#V V V V V V ''%%*&7$3	 (     ,+7&I G)33ODDDD)55oFFF &%55kBBBBB -, '&r   r=   rx   c                      j                                          fdt                    D             } j                             |           |rm j                             d           D ]S}|j        sH|                                } j        r+t          	                    dt          |                      O|RdS dS )a  Creates and adds a number of remote workers to this worker set.

        Can be called several times on the same EnvRunnerGroup to add more
        EnvRunners to the set.

        Args:
            num_workers: The number of remote Workers to add to this
                EnvRunnerGroup.
            validate: Whether to validate remote workers after their construction
                process.

        Raises:
            RayError: If any of the constructed remote workers is not up and running
                properly.
        c           	      j    g | ]/}                     j        d |z   dz   z   j                  0S )NrJ   )r3   r4   ry   r=   r6   )r   rS   rX   )r   ir=   old_num_workersrp   s     rr   r   z.EnvRunnerGroup.add_workers.<locals>.<listcomp>  sf     
 
 
   -!,q014+k9 2  	 	
 
 
r   c                 *    |                                  S r   )assert_healthyr   s    rr   r   z,EnvRunnerGroup.add_workers.<locals>.<lambda>  s    !**,, r   z&Validation of EnvRunner failed! Error=N)rm   r   range
add_actorsforeach_actorokr   rk   r   rB   str)rp   r=   rx   new_workersresultrq   r  s   ``    @rr   r   zEnvRunnerGroup.add_workers  s	     .99;;
 
 
 
 
 
 ;''
 
 
 	''444  	 .<<,,  
  
 
 y  

A=  %VcRSff%V%VWWWW 	  	 
  
 r   new_remote_workersc                 l    | j                                          | j                             |           dS )zHard overrides the remote EnvRunners in this set with the provided ones.

        Args:
            new_remote_workers: A list of new EnvRunners (as `ActorHandles`) to use as
                new remote workers.
        N)rm   clearr  )rp   r  s     rr   resetzEnvRunnerGroup.reset  s7     	""$$$''(:;;;;;r   c                     	 |                      d dd           n*# t          $ r t                              d           Y nw xY w| j                                         dS # | j                                         w xY w)z9Calls `stop` on all EnvRunners (including the local one).c                 *    |                                  S r   )stopr   s    rr   r   z%EnvRunnerGroup.stop.<locals>.<lambda>,  s    !&&(( r   FT)r   r7   zFailed to stop workers!N)r   	Exceptionr   	exceptionrm   r!  r   s    rr   r%  zEnvRunnerGroup.stop%  s    
	) ##"" $      	8 	8 	8677777	8  &&(((((D &&((((s&    A! $A A! AA! !A<)r   r7   r   r   r   r   r   c          	         |r|r
J d            g }	|rH| j         A|J t          |t                    rt          | j         |          g}	n || j                   g}	| j                                        s|	S | j                            |||||||          }
t          j        |
| j	                   d |

                                D             }
|	|
z   S )a  Calls the given function with each EnvRunner as its argument.

        Args:
            func: The function to call for each EnvRunners. The only call argument is
                the respective EnvRunner instance.
            local_env_runner: Whether to apply `func` to local EnvRunner, too.
                Default is True.
            healthy_only: Apply `func` on known-to-be healthy EnvRunners only.
            remote_worker_ids: Apply `func` on a selected set of remote EnvRunners.
                Use None (default) for all remote EnvRunners.
            timeout_seconds: Time to wait (in seconds) for results. Set this to 0.0 for
                fire-and-forget. Set this to None (default) to wait infinitely (i.e. for
                synchronous execution).
            return_obj_refs: Whether to return ObjectRef instead of actual results.
                Note, for fault tolerance reasons, these returned ObjectRefs should
                never be resolved with ray.get() outside of this EnvRunnerGroup.
            mark_healthy: Whether to mark all those EnvRunners healthy again that are
                currently marked unhealthy AND that returned results from the remote
                call (within the given `timeout_seconds`).
                Note that EnvRunners are NOT set unhealthy, if they simply time out
                (only if they return a RayActorError).
                Also note that this setting is ignored if `healthy_only=True` (b/c
                `mark_healthy` only affects EnvRunners that are currently tagged as
                unhealthy).

        Returns:
             The list of return values of all calls to `func([worker])`.
        z+Can not return ObjectRef from local worker.N)r   r   r   r   r   r   r   c                 6    g | ]}|                                 S r   )r   r   rs     rr   r   z5EnvRunnerGroup.foreach_env_runner.<locals>.<listcomp>{  s     JJJa!%%''JJJr   )r7   rP   r  getattrrm   r   r  r     handle_remote_call_result_errorsrk   ignore_errors)rp   r   r   r7   r   r   r   r   r   local_resultremote_resultss              rr   r   z!EnvRunnerGroup.foreach_env_runner3  s(   V  	9'7	9 	98	9 	97  	= 5 A>>>$$$ = '(=t D DE $T%: ; ;<#--// 	 -;;%.++% < 
 
 	"Bd.T	
 	
 	
 	

 KJ>+G+G+I+IJJJn,,r   )r   r   r   c                @    | j                             |||||          S )a  Calls the given function asynchronously with each EnvRunner as the argument.

        Does not return results directly. Instead, `fetch_ready_async_reqs()` can be
        used to pull results in an async manner whenever they are available.

        Args:
            func: The function to call for each EnvRunners. The only call argument is
                the respective EnvRunner instance.
            tag: A tag to identify the results from this async call when fetching with
                `fetch_ready_async_reqs()`.
            kwargs: An optional kwargs dict to be passed to the remote function calls.
            healthy_only: Apply `func` on known-to-be healthy EnvRunners only.
            remote_worker_ids: Apply `func` on a selected set of remote EnvRunners.

        Returns:
             The number of async requests that have actually been made. This is the
             length of `remote_worker_ids` (or self.num_remote_workers()` if
             `remote_worker_ids` is None) minus the number of requests that were NOT
             made b/c a remote EnvRunner already had its
             `max_remote_requests_in_flight_per_actor` counter reached for this tag.
        )r   r   r   r   )rm   foreach_actor_async)rp   r   r   r   r   r   s         rr   foreach_env_runner_asyncz'EnvRunnerGroup.foreach_env_runner_async  s4    B #77%. 8 
 
 	
r   tagsr   r   r   r6  c                    | j                             ||||          }t          j        || j                   d |                                D             S )a  Get results from outstanding asynchronous requests that are ready.

        Args:
            tags: Tags to identify the results from a specific async call.
                If None (default), returns results from all ready async requests.
                If a single string, returns results from all ready async requests with that tag.
            timeout_seconds: Time to wait for results. Default is 0, meaning
                those requests that are already ready.
            return_obj_refs: Whether to return ObjectRef instead of actual results.
            mark_healthy: Whether to mark all those workers healthy again that are
                currently marked unhealthy AND that returned results from the remote
                call (within the given `timeout_seconds`).
                Note that workers are NOT set unhealthy, if they simply time out
                (only if they return a RayActorError).
                Also note that this setting is ignored if `healthy_only=True` (b/c
                `mark_healthy` only affects workers that are currently tagged as
                unhealthy).

        Returns:
            A list of results successfully returned from outstanding remote calls,
            paired with the indices of the callee workers.
        r5  r)  c                 D    g | ]}|j         |                                fS r   )actor_idr   r+  s     rr   r   z9EnvRunnerGroup.fetch_ready_async_reqs.<locals>.<listcomp>  s'    NNN!QUUWW%NNNr   )rm   fetch_ready_async_reqsr    r.  rk   r/  )rp   r6  r   r   r   r1  s         rr   r:  z%EnvRunnerGroup.fetch_ready_async_reqs  sv    > -DD++%	 E 
 
 	"B"D	
 	
 	
 	

 ON~/K/K/M/MNNNNr   c                 R    t          |                     fdd                    S )aU  Calls `func` with all workers' sub-environments as args.

        An "underlying sub environment" is a single clone of an env within
        a vectorized environment.
        `func` takes a single underlying sub environment as arg, e.g. a
        gym.Env object.

        Args:
            func: A function - taking an EnvType (normally a gym.Env object)
                as arg and returning a list of lists of return values, one
                value per underlying sub-environment per each worker.

        Returns:
            The list (workers) of lists (sub environments) of results.
        c                 .    |                                S r   )foreach_envr   r   s    rr   r   z,EnvRunnerGroup.foreach_env.<locals>.<lambda>  s    !---- r   Tr7   listr   rp   r   s    `rr   r=  zEnvRunnerGroup.foreach_env  s>    " ##----!% $  
 
 	
r   c                 R    t          |                     fdd                    S )a`  Calls `func` with all workers' sub-environments and env_ctx as args.

        An "underlying sub environment" is a single clone of an env within
        a vectorized environment.
        `func` takes a single underlying sub environment and the env_context
        as args.

        Args:
            func: A function - taking a BaseEnv object and an EnvContext as
                arg - and returning a list of lists of return values over envs
                of the worker.

        Returns:
            The list (1 item per workers) of lists (1 item per sub-environment)
                of results.
        c                 .    |                                S r   )foreach_env_with_contextr>  s    rr   r   z9EnvRunnerGroup.foreach_env_with_context.<locals>.<lambda>  s    !44T:: r   Tr?  r@  rB  s    `rr   rE  z'EnvRunnerGroup.foreach_env_with_context  s>    ( ##::::!% $  
 
 	
r   c                 N    | j                             | j        j        d          S )zChecks for unhealthy workers and tries restoring their states.

        Returns:
            List of IDs of the workers that were restored.
        T)r   r   )rm   probe_unhealthy_actorsrU   !env_runner_health_probe_timeout_sr   s    rr   probe_unhealthy_env_runnersz*EnvRunnerGroup.probe_unhealthy_env_runners  s0     #:: /Q ; 
 
 	
r   c                 p    g }|                      fdd          D ]}|                    |           |S )a  Calls `func` with each worker's (policy, PolicyID) tuple.

        Note that in the multi-agent case, each worker may have more than one
        policy.

        Args:
            func: A function - taking a Policy and its ID - that is
                called on all workers' Policies.

        Returns:
            The list of return values of func over all workers' policies. The
                length of this list is:
                (num_workers + 1 (local-worker)) *
                [num policies in the multi-agent config dict].
                The local workers' results are first, followed by all remote
                workers' results
        c                 .    |                                S r   )foreach_policyr>  s    rr   r   z/EnvRunnerGroup.foreach_policy.<locals>.<lambda>(  s    a&&t,, r   Tr?  r   extendrp   r   resultsr,  s    `  rr   rL  zEnvRunnerGroup.foreach_policy  sX    & ((,,,,t ) 
 
 	 	A NN1r   c                 p    g }|                      fdd          D ]}|                    |           |S )a  Apply `func` to all workers' Policies iff in `policies_to_train`.

        Args:
            func: A function - taking a Policy and its ID - that is
                called on all workers' Policies, for which
                `worker.is_policy_to_train()` returns True.

        Returns:
            List[any]: The list of n return values of all
                `func([trainable policy], [ID])`-calls.
        c                 .    |                                S r   )foreach_policy_to_trainr>  s    rr   r   z8EnvRunnerGroup.foreach_policy_to_train.<locals>.<lambda><  s    a//55 r   Tr?  rM  rO  s    `  rr   rS  z&EnvRunnerGroup.foreach_policy_to_train-  sX     ((5555 ) 
 
 	 	A NN1r   	policy_idbatchc                 p    | j         r)| j         j        dS | j                             ||          S t          )zCWhether given PolicyID (optionally inside some batch) is trainable.NT)r7   is_policy_to_trainNotImplementedError)rp   rT  rU  s      rr   rW  z!EnvRunnerGroup.is_policy_to_trainA  s?    
   	&$7?t(;;IuMMM%%r   )r   r   r6   policy_statepolicy_mapping_fnpolicies_to_trainmodule_specworkers
policy_clspolicyr   r   rY  rZ  r[  r\  r]  c                   | j         rL|| j         j        v r>t          d| dt          | j         j                                                             |t
          urt          ddd           |du |du k    rt          d          t          |d	
           |*t          ||||||||	rt          |	          nd|
	  	        nWt          |t          |          |j        |j        |j        |                                ||	rt          |	          nd|
	  	        fd}| j         4| | j                             ||||	|
           n | j         j        di  |                     |d	           dS )a	  Adds a policy to this EnvRunnerGroup's workers or a specific list of workers.

        Args:
            policy_id: ID of the policy to add.
            policy_cls: The Policy class to use for constructing the new Policy.
                Note: Only one of `policy_cls` or `policy` must be provided.
            policy: The Policy instance to add to this EnvRunnerGroup. If not None, the
                given Policy object will be directly inserted into the
                local worker and clones of that Policy will be created on all remote
                workers.
                Note: Only one of `policy_cls` or `policy` must be provided.
            observation_space: The observation space of the policy to add.
                If None, try to infer this space from the environment.
            action_space: The action space of the policy to add.
                If None, try to infer this space from the environment.
            config: The config object or overrides for the policy to add.
            policy_state: Optional state dict to apply to the new
                policy instance, right after its construction.
            policy_mapping_fn: An optional (updated) policy mapping function
                to use from here on. Note that already ongoing episodes will
                not change their mapping but will use the old mapping till
                the end of the episode.
            policies_to_train: An optional list of policy IDs to be trained
                or a callable taking PolicyID and SampleBatchType and
                returning a bool (trainable or not?).
                If None, will keep the existing setup in place. Policies,
                whose IDs are not in the list (or for which the callable
                returns False) will not be updated.
            module_spec: In the new RLModule API we need to pass in the module_spec for
                the new module that is supposed to be added. Knowing the policy spec is
                not sufficient.
            workers: A list of EnvRunner/ActorHandles (remote
                EnvRunners) to add this policy to. If defined, will only
                add the given policy to these workers.

        Raises:
            KeyError: If the given `policy_id` already exists in this EnvRunnerGroup.
        zPolicy ID 'z' already exists in policy map! Make sure you use a Policy ID that has not been taken yet. Policy IDs that are already in your policy map: z)EnvRunnerGroup.add_policy(.., workers=..)zdThe `workers` argument to `EnvRunnerGroup.add_policy()` is deprecated! Please do not use it anymore.T)r@   helprB   NzeOnly one of `policy_cls` or `policy` must be provided to staticmethod: `EnvRunnerGroup.add_policy()`!F)rB   )	rT  r^  r   r   r6   rY  rZ  r[  r\  c                 "     | j         di  d S )Nr   )
add_policy)workernew_policy_instance_kwargss    rr   _create_new_policy_fnz8EnvRunnerGroup.add_policy.<locals>._create_new_policy_fn  s%     F;; :;;;;;r   )rT  r_  rZ  r[  r\  r?  r   )r7   
policy_mapKeyErrorrA  keysr   r   re   r   rQ   typer   r   r6   r   rc  r   )rp   rT  r^  r_  r   r   r6   rY  rZ  r[  r\  r]  rf  re  s                @rr   rc  zEnvRunnerGroup.add_policyM  s9   x   	Y$2G2R%R%RCi C C -8==??@@C C   ***?@     $FdN33?   	9E2222 !)-#%"3))"3$#$'8"9"9"9'* * *&&" *.#<<"(":#0}#--//"3$#$'8"9"9"9'* * *&	< 	< 	< 	< 	<
  ,!%00'!&7&7 + 1     1%0NN3MNNN 	 5NNNNNr   )recreated_workerrz   ry   rk  rz   c                ^   t          ||| j        ||||| j        || j        | j                  }|dk    r | j        di |S t          j                                        dn	| j	        |z   }	  t          j
        di | j        | j                                      |	          j
        di |S )N)r3   r4   r5   r6   ry   r=   rk  log_dirrz   dataset_shardsr:   r   )placement_group_bundle_indexr   )rQ   rT   ri   r   r_   ra   rV   utilget_current_placement_groupr`   r  r^   options)
rp   r3   r4   ry   r=   rk  r6   rz   r   pg_bundle_idxs
             rr   r   zEnvRunnerGroup._make_worker  s     #%!%!3%#-L?-
 
 
 1&4&00000 x3355= B</ 	+CJ++*++D,?@@W-W@@  	
r   c           	      t   t          | t                    rt          j                            |           sd| v r|                     dd          \  }}	 t          j                            |          }|dS nA# t          t          f$ r-}t                              d| d|  d|            Y d }~nd }~ww xY wdS )N.rJ   Tzmodule z not found using input z with error: F)rP   r  ospathisfilersplit	importlibrq  	find_specModuleNotFoundErrorre   r   warning)
class_pathmodule_path
class_namespecrq   s        rr   _valid_modulezEnvRunnerGroup._valid_module  s     z3''	GNN:..	 z!!&0&7&7Q&?&?#K ~//<<#4 $'4   ^k^^*^^[\^^        us   !A7 7B5#B00B5r   )NNr   FFTNF)NNNNr   F)F)r   N)NN)H__name__
__module____qualname____doc__r   r	   r&   r   r'   r   r   r   r  r   rs   r9   r   propertyr   r7   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r0   floatr
   r   r*   r,   r  r   r   r"  r%  r   r4  r:  r!   r=  r   r   rE  rI  rL  rS  r+   rW  gymrz   Spacer)   r   r%   r(   r   r   rc  r   staticmethodr  r   r   rr   r2   r2   E   s         -1<@7;.2!% $'+)-$%V V V j)V x	489	V
 'tF|4V *+V V V V  }V V "#V V V Vv =A.2 !%F F F x	489F *+	F
 F F F F FP  4 &) & & & X&8S	 8 8 8 8-DI - - - -1 1 1 1 1-C - - - -9 9 9 9 95C 5 5 5 5U U U U U.S . . . .H HHSM HS H H H H9C 9 9 9 9 ,0+/;?48<@C C C "C i(	C
 $C=C #4S#X#78C "$sCx.1C '/tCy&9C 
C C C CT ,0!+. %"!'+!&
 
i[!^$d8YKN+C&Dc4PS9T


 c3h(
 c]
 "%
 
 
 
  9
 
 
eE#q&M1$%	&
 
 
 
F .2SW157;+.).QC QC4>*QC '/uY5N/O&PQC $DI.	QC
 d3
?34QC "%QC !QC 
QC QC QC QCf-  - s - d - t -  -  -  - ^<[(9 <d < < < <) ) ) )( !%!'++/ %"J- J- J-i[!^$d8YKN+C&Dc4PS9T
J- J- J-  9J- "%J- J- J- 
aJ- J- J- J-b "'
 !'+'
 '
 '
i[!^$d8YKN+C&Dc4PS9T
'

 c]'
 '
  9'
 
'
 '
 '
 '
X =A+. %"+O +O +O uS$s)U3Z789+O "%	+O
 +O +O 
eCFm	+O +O +O +OZ 
'DG); < 
d1g 
 
 
 [
. 
gz2DG;<
	d1g
 
 
 [
4	
T#Y 	
 	
 	
 	
 8VX,>,A#B tAw    [2 Hfh5G5J,K PTUVPW    [& FJ	& 	&!	&*2?*C	&		& 	& 	& [	&  .2#'	IO 9=37QU.2PT .2AQ'IO IO IOIO T&\*IO  	IO $CJ$45IO sz/0IO 02LLMNIO {+IO $Hgy-A8-K$LMIO $8$(H_$=>DEG
IO" l+#IO& $uY%;<=>'IO( 
)IO IO IO [IOd "' '
 '
 '
  '
 x	489	'

 '
 '
 '
 "'
 5!13:3C!CDDE
'
 
y+%	&'
 '
 '
 '
R   \  r   r2   )Pimportlib.utilr{  loggingrw  typingr   r   r   r   r   r   r	   r
   r   r   r   	gymnasiumr  rV   ray._common.deprecationr   r   	ray.actorr   ray.exceptionsr   ray.rllib.corer   r   r   r   ray.rllib.core.learnerr   ray.rllib.core.rl_moduler   "ray.rllib.core.rl_module.rl_moduler   ray.rllib.env.base_envr   ray.rllib.env.env_contextr   ray.rllib.env.env_runnerr   #ray.rllib.evaluation.rollout_workerr   ray.rllib.offliner   ray.rllib.policy.policyr   r   ray.rllib.utils.actor_managerr    ray.rllib.utils.annotationsr!   ray.rllib.utils.frameworkr"   ray.rllib.utils.metricsr#   r$   ray.rllib.utils.typingr%   r&   r'   r(   r)   r*   r+   r,   ray.util.annotationsr-   rO   r/   tf1tftfv	getLoggerr  r   r0   r2   r   r   rr   <module>r     sK        				                              



        " ! ! ! ! ! ( ( ( ( ( (            0 / / / / / 7 7 7 7 7 7 ; ; ; ; ; ; * * * * * * 0 0 0 0 0 0 . . . . . . = = = = = = 4 4 4 4 4 4 7 7 7 7 7 7 7 7 C C C C C C 3 3 3 3 3 3 3 3 3 3 3 3 R R R R R R R R	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 . - - - - - FEEEEEE}R		8	$	$ GCLL L L L L L L L L L Lr   