
    &`i:                       U d dl Z d dlZd dlZd dlZd dlZd dlZd dlZd dlm	Z	 d dl
mZ d dlmZmZmZmZmZmZmZmZmZmZmZ d dlmZ d dlZd dlmZmZ d dlmZm Z  d dl!m"Z" d d	l#m$Z$ d d
l%m&Z&m'Z' d dl(m)Z) d dl*m+Z+ d dl,m-Z- d dl.m/Z/ d dl0m1Z1m2Z2 d dl3m4Z4 d dl5m6Z6 d dl7m8Z8 d dl9m:Z: d dl;m<Z<m=Z=m>Z>m?Z?m@Z@mAZAmBZBmCZCmDZDmEZEmFZF d dlGmHZHmIZI d dlJmKZK d dlLmMZMmNZNmOZOmPZP d dlQmRZR d dlSmTZT d dlUmVZV d dlWmXZXmYZY d dlZm[Z[m\Z\ d dl]m^Z^m_Z_ d dl`maZambZb d dlcmdZdmeZe d d lfmgZg d d!lhmiZi d d"ljmkZk d d#llmmZm d d$lnmoZpmqZq d d%lrmsZsmtZtmuZumvZvmwZwmxZxmyZymzZzm{Z{m|Z|m}Z} d d&l~mZmZ d d'lmZ d d(lmZmZmZ d d)lmZ erd d*lmZ d d+lmZ  ed            \  ZZZ ee            \  ZZ ej        e          Zdaed,         ed-<   eXd6d/            Zd0eud1ed2ed3efd4ZeX G d5 d,ee+                      ZdS )7    N)defaultdict)FunctionType)TYPE_CHECKINGAnyCallable
CollectionDictListOptionalSetTupleTypeUnion)Space)	ObjectRefcloudpickle)create_connectors_for_policymaybe_get_filters_for_syncing)validate_module_id)RLModuleSpec)BaseEnvconvert_to_base_env)
EnvContext)	EnvRunner)ExternalMultiAgentEnv)MultiAgentEnv)is_atariwrap_deepmind)RolloutMetrics)SyncSampler)ModelCatalog)Preprocessor)
D4RLReaderDatasetReaderDatasetWriterInputReader	IOContext
JsonReader
JsonWriter
MixedInput
NoopOutputOutputWriterShuffledInput)Policy
PolicySpec)	PolicyMap)DEFAULT_POLICY_IDMultiAgentBatchconcat_samples convert_ma_batch_to_sample_batch)TorchPolicy)TorchPolicyV2)
force_list)OldAPIStackoverride)	summarizeupdate_global_seed_if_necessary)ERR_MSG_NO_GPUSHOWTO_CHANGE_CONFIG)FilterNoFilter)try_import_tftry_import_torch)from_config)create_policy_for_framework)do_minibatch_sgd)_TFRunBuilder)get_gpu_devicesget_tf_eager_cls_if_necessary)AgentID
EnvCreatorEnvTypeModelGradientsModelWeightsMultiAgentPolicyConfigDictPartialAlgorithmConfigDictPolicyIDPolicyStateSampleBatchTypeT)registry_contains_inputregistry_get_input)	PublicAPI)disable_log_once_globallyenable_periodic_logginglog_once)ParallelIteratorWorkerAlgorithmConfig)RLlibCallbackRolloutWorker_global_workerreturnc                      t           S )z>Returns a handle to the active rollout worker in this process.)r^        w/home/jaya/work/projects/VOICE-AGENT/VIET/agent-env/lib/python3.11/site-packages/ray/rllib/evaluation/rollout_worker.pyget_global_workerrd   r   s
    
 rb   envseed
worker_idx
vector_idxc                 V   |dS d}||k     s
J d            ||z  |z   |z   }t          | d          s1t          d          r t                              d|  d           dS dS 	 |                     |           dS # t
          $ r" t                              d|  d	           Y dS w xY w)
zzSet a deterministic random seed on environment.

    NOTE: this may not work with remote environments (issue #18154).
    Ni  z3Too many envs per worker. Random seeds may collide.resetenv_has_no_reset_methodzEnv z. doesn't have a `reset()` method. Cannot seed.)rf   z doesn't support setting a seed via its `reset()` method! Implement this method as `reset(self, *, seed=None, options=None)` for it to abide to the correct API. Cannot seed.)hasattrrX   loggerinforj   	Exception)re   rf   rg   rh   max_num_envs_per_env_runnercomputed_seeds         rc   _update_env_seed_if_necessaryrr   z   s    | (,0000< 	100#&AAJNQUUM
 3   -.. 	TKKRsRRRSSSSS	T 	T	II=I))))) 	 	 	KKRs R R R     	s   $A< <(B('B(c                      e Zd ZdZdddddddddd	dedeeeegdf                  ded	         d
e	dee	         de
dee         deeeeeef         f                  deee                  deeej        j                          fdZ ee          d             Z ee          d             Z ee          defd            Z ee          deeeeef         f         fd            Z ej        d          deee	f         fd            ZdedefdZ de	de	dedee         dee!e	f         f
d Z"	 dlded!e
dee#e!f         fd"Z$d#e%e#eee#f         f         ddfd$Z& ee          dee'         fd%            Z(d&eege)f         dee)         fd'Z*d&eeege)f         dee)         fd(Z+e,fd)edee         fd*Z-	 	 dmdddddddd+d)ed,eee                  d-ee         d.ee         d/ee         dee.         d0ee/         d1ee%e0e         eeege
f         f                  d2ee1         defd3Z2e,ddd4d)ed5eee3gef                  d1ee%e0e         eeege
f         f                  ddfd6Z4	 dld5eee3e5gef                  ddfd7Z6d8e%e0e         eeee         ge
f         f         ddfd9Z7 e8d:;          	 dld<ee         de9e         fd=            Z:e,fd&eeee5         ge)f         d)ee         de)fd>Z;d&eeeee5         ge)f         dee)         fd?Z<d&eeeee5         ge)f         dee)         fd@Z=dAe!ddfdBZ>dndCe
defdDZ?de!fdEZ@dFe!ddfdGZA	 	 dodHee0e                  dIe
deeeBf         fdJZC	 	 dmdKeeeBf         dLee         dMee	         ddfdNZDde!fdOZE	 dldLe!dPeee                  ddfdQZF ee          dpdR            ZGdpdSZHdpdTZIdUedVe	dWe	dXeddf
dYZJde!fdZZKdefd[ZLdefd\ZMde	fd]ZNdddd^d_eOd-ee         d`eeee/f                  daee1         ddf
dbZPd_eOdeOfdcZQd_eOdeOfddZRddded_eOd-ee         d`eeee/f                  ddfdfZSd_eOddfdgZTdh ZUdi ZVdj ZWdk ZXdS )qr]   a	  Common experience collection class.

    This class wraps a policy instance and an environment class to
    collect experiences from the environment. You can create many replicas of
    this class as Ray actors to scale RL training.

    This class supports vectorized and multi-agent policy evaluation (e.g.,
    VectorEnv, MultiAgentEnv, etc.)

    .. testcode::
        :skipif: True

        # Create a rollout worker and using it to collect experiences.
        import gymnasium as gym
        from ray.rllib.evaluation.rollout_worker import RolloutWorker
        from ray.rllib.algorithms.ppo.ppo_tf_policy import PPOTF1Policy
        worker = RolloutWorker(
          env_creator=lambda _: gym.make("CartPole-v1"),
          default_policy_class=PPOTF1Policy)
        print(worker.sample())

        # Creating a multi-agent rollout worker
        from gymnasium.spaces import Discrete, Box
        import random
        MultiAgentTrafficGrid = ...
        worker = RolloutWorker(
          env_creator=lambda _: MultiAgentTrafficGrid(num_cars=25),
          config=AlgorithmConfig().multi_agent(
            policies={
              # Use an ensemble of two policies for car agents
              "car_policy1":
                (PGTFPolicy, Box(...), Discrete(...),
                 AlgorithmConfig.overrides(gamma=0.99)),
              "car_policy2":
                (PGTFPolicy, Box(...), Discrete(...),
                 AlgorithmConfig.overrides(gamma=0.95)),
              # Use a single shared policy for all traffic lights
              "traffic_light_policy":
                (PGTFPolicy, Box(...), Discrete(...), {}),
            },
            policy_mapping_fn=(
              lambda agent_id, episode, **kwargs:
              random.choice(["car_policy1", "car_policy2"])
              if agent_id.startswith("car_") else "traffic_light_policy"),
            ),
        )
        print(worker.sample())

    .. testoutput::

        SampleBatch({
            "obs": [[...]], "actions": [[...]], "rewards": [[...]],
            "terminateds": [[...]], "truncateds": [[...]], "new_obs": [[...]]}
        )

        MultiAgentBatch({
            "car_policy1": SampleBatch(...),
            "car_policy2": SampleBatch(...),
            "traffic_light_policy": SampleBatch(...)}
        )

    Nr   F)	validate_envconfigworker_indexnum_workersrecreated_workerlog_dirspacesdefault_policy_classdataset_shardsenv_creatorrt   ru   r[   rv   rw   rx   ry   rz   r{   r|   c       
         @    t                                                       _         j        d=  addlm} |t          |t                    r |                                |pi           }|	                                 |j
        rB|dk    r<|j
                                        D ]!\  }}t          |          t          j        |<   "nH|j        rA|dk    r;|j                                        D ]!\  }}t          |          t          j        |<   " fd}t!          j         |d           t%          j         |           ||n j        j         _        |
 _        | _        t1          j                     _        t6          rD|j        dk    s|j        r2t6                                          st6                                            j        j         r1tC          j"        d	          #                     j        j                     j        d
k    rtI                       n j        j         dk    rtK                       tM           j        j'         j        d j         j        j(        |          }| _)        | _         j        *                                 _+        | _,        d  _-         .                     j        j-                   | _/         j        0                     j                  }| j        j1        z   _2        |j3          _4        d _5        dtm          tn                    d _8         j        j9        dn# j        j9         j        z    j        j:        dz  z    _9         j        dk    rtw           j        j         j9                   dx _<         _=         j        dk    r j        dk    r j        j>        r' |t          j?         j)                             _<         j        j@        } j<        B| | j<         j)                   t           j<        t          t          jC        jD        f          rd }n{t           j<                  rM j        jF        dk    r=d _4         j        j@        d} j        jG        H                    d          du  fd}n j        jF        d _4        d }nd } | j<                   _<        t           j<         j9         j        d            j+        J                      j<         j)                    K                    |||| j9                   _=        | _L        |	 _M         j        N                     j<         jL         jM                  \   _O         _P        d _Q        d _R         j        dk    r j        jS        n j        jT        }t          jU                    rt          jV        jW        X                                t          jV        jW        jY        k    r|jZ        sg } j        j        dv rt                      }nH j        j        dk    r8t          t          t          j_        `                                                    }t          |          |k     r7t          t          jd        t          |          |          t          z             n~t          jU                    rkt          jV        jW        X                                t          jV        jW        jY        k    r0|dk    r* j        jZ        st          g                    d| d           tm          t                     _i        d _j         k                     jO                    jQ        l                                D ]}|jm        s|n                                  j        jo        rV j<        Ot           j<        t          t          t          t          jC        jD        f          st          d j<         d            j        dk    r2t          s                    d!d                     ji                              j<        d _t        n{d" j        v r% j        u                     j<                   _t        nMt           j<         j=         j        j1         j        j(         j        jw          j        jx        #           _t        |} j        jy        d$k    rd}n# j        jy        d%k    sJ t          d&          }d}t          | j         j                    _|        d} j        j}        du r j        dk    s j        d
k    rd} j<        d _~        n\t            jt        || j        j         j+        | j        j         j        j         j        j         j        j        |'           _~                                           j|                   _                                           j|                   _        d _        dS )(a  Initializes a RolloutWorker instance.

        Args:
            env_creator: Function that returns a gym.Env given an EnvContext
                wrapped configuration.
            validate_env: Optional callable to validate the generated
                environment (only on worker=0).
            worker_index: For remote workers, this should be set to a
                non-zero and unique value. This index is passed to created envs
                through EnvContext so that envs can be configured per worker.
            recreated_worker: Whether this worker is a recreated one. Workers are
                recreated by an Algorithm (via EnvRunnerGroup) in case
                `restart_failed_env_runners=True` and one of the original workers (or
                an already recreated one) has failed. They don't differ from original
                workers other than the value of this flag (`self.recreated_worker`).
            log_dir: Directory where logs can be placed.
            spaces: An optional space dict mapping policy IDs
                to (obs_space, action_space)-tuples. This is used in case no
                Env is created on this RolloutWorker.
        selfr   rZ   Nc               3   8   K   	                                   V  N)sampler   s   rc   gen_rolloutsz,RolloutWorker.__init__.<locals>.gen_rollouts  s%      $kkmm###$rb   F)ru   tf2z	ray.rllib   DEBUG)rv   vector_indexrw   remoterx   c                     t           S r   )r1   )agent_idepisodeworkerkws       rc   <lambda>z(RolloutWorker.__init__.<locals>.<lambda>R  s    4E rb   )rv   )timestepnum_grad_updates_per_policyi'  c                     | S r   ra   re   s    rc   wrapz$RolloutWorker.__init__.<locals>.wrap      Jrb   deepmindT
framestackc           	          t          | j        j                            d          j        j                            dd          dk              } | S )Ndim	frameskipr   r   )r   r   noframeskip)r   ru   modelget
env_config)re   r   use_framestacks    rc   r   z$RolloutWorker.__init__.<locals>.wrap  sX    ' K-11%88#1$(K$:$>$>{A$N$NRS$S	  C Jrb   c                     | S r   ra   r   s    rc   r   z$RolloutWorker.__init__.<locals>.wrap  r   rb   c                     | S r   ra   r   s    rc   r   z$RolloutWorker.__init__.<locals>.wrap  r   rb   r   sub_environmentenv_context)re   rz   r{   )r   tftorchz@You are running ray with `local_mode=True`, but have configured zf GPUs to be used! In local mode, Policies are placed on the CPU and the `num_gpus` setting is ignored.)policy_dictz1You are running a multi-agent setup, but the env zT is not a subclass of BaseEnv, MultiAgentEnv, ActorHandle, or ExternalMultiAgentEnv!zBuilt filter map: {}custom_vector_env)make_envnum_envsremote_envsremote_env_batch_wait_msr   restart_failed_sub_environmentstruncate_episodescomplete_episodesinf)r   re   clip_rewardsrollout_fragment_lengthcount_steps_by	callbacksmultiple_episodes_in_batchnormalize_actionsclip_actionsobservation_fnsample_collector_classrender)localscopy_original_kwargsr^   %ray.rllib.algorithms.algorithm_configr[   
isinstancedictupdate_from_dictfreeze extra_python_environs_for_driveritemsstrosenviron extra_python_environs_for_workerrY   __init__r   ru   num_env_runnersrw   
_ds_shardsrv   	threadingLock_locktf1framework_strenable_tf1_exec_eagerlyexecuting_eagerlyenable_eager_execution	log_levellogging	getLoggersetLevelrV   rW   r   r   remote_worker_envsr   callbacks_classr   rx   policy_mapping_fnset_policy_mapping_fnr}   get_rollout_fragment_lengthnum_envs_per_env_runnertotal_rollout_fragment_length_disable_preprocessor_apipreprocessing_enabled
last_batchr   intglobal_varsrf   in_evaluationr;   re   make_sub_env_fncreate_local_env_runnerdeepcopyr   r   rayactorActorHandler   preprocessor_prefr   r   rr   on_sub_environment_created_get_make_sub_env_fnrz   r{   get_multi_agent_setupr   is_policy_to_train
policy_mappreprocessorsnum_gpusnum_gpus_per_env_runneris_initialized_privater   _mode
LOCAL_MODE
_fake_gpusget_tf_gpu_deviceslistranger   cudadevice_countlenRuntimeErrorr<   formatr=   rm   warningr?   filtersmulti_rl_module_spec_update_policy_mapvalues%_model_init_state_automatically_added/_update_model_view_requirements_from_init_stateis_multi_agentr   r   
ValueErrorrn   	async_envr   r   r   r   
batch_modefloatr'   
io_context
render_envsamplerr    r   r   r   r   sample_collector_get_input_creator_from_configinput_reader_get_output_creator_from_configoutput_writerweights_seq_no)r   r}   rt   ru   rv   rw   rx   ry   rz   r{   r|   kwargsr[   keyvaluer   r   "configured_rollout_fragment_lengthr   r   r   devicespol#rollout_fragment_length_for_samplerpackr   r   s   `                         @rc   r   zRolloutWorker.__init__   s
   F '-hhmmoo!&) IIIIII >Z55>$_&&77"EEF 2 	-|q7H7H$EKKMM - -
U"%e**
3-4 	-9I9I$EKKMM - -
U"%e**
3	$ 	$ 	$ 	$ 	$ 	'lEBBB4//// '2KK8S 	
 )!- ^%%
 	)%..&2P. ))++ /
 &&(((;  	Kk**33DK4IJJJq  %''''["g--#%%% K"*(;1-
 
 
 ''-(,(C(C(E(E&6 FE 	 	""4;#@AAA'2-1[-T-T* .U .
 .
* /1TT 	* 06/O+O"59  ,7s+;+;	"
 	"
 {' D! k'%/0 		 q  +DK,EtyQQQ +/.4' "" 1$$K7 % #{4=1A#B#BCCDH{/8 'TXt'7888 $(Wci.C$DEE %    $(## (E(S(S-2* ;+3#'L "&!2!6!6|!D!D!L       .6-2*   
   !%TXDH
 *$(DIt?PRSTTTN55 $ , 6    $(#<#<[,di$ $D  $8!48K4U4U;!%!: 5V 5
 5
1$1 04;?
  A%% K  4 	   	#))++s|/B/MMM% N G{(M99,..*g55uUZ%<%<%>%>??@@7||h&&"#*3w<<AADWW   '   	#))++s|/B/MMM1K*  NN&     0;8/D/D %)!D,<=== ?))++ 	F 	FC< FCCEEE K&	$/	@UV  % *DH * * *   !!KK.55dlCCDDD 8!DNN DK//![::48DDDNN ':-< K:)-)MK?
' 
' 
'DN /Q+;!%888DD ;)-@@@@@27,,/D &/T[$"3T&
 &
 ;!T))!!T%6!%;%;F8DLL&N)(K#{9.+/"&+"?![5#{9'+{'C  DL *O)L)L)N)NO*
 *
 ,R4+O+O+Q+QO,
 ,
 .2rb   c                     t           r   )NotImplementedErrorr   s    rc   r   zRolloutWorker.make_envn  s
    
 "!rb   c                 p    | j         o| j        o| j        }|sJ d|  d| j         d| j         d            d S )NzRolloutWorker z (idx=z; num_workers=z) not healthy!)r   r  r  rv   rw   )r   
is_healthys     rc   assert_healthyzRolloutWorker.assert_healthyu  sq    _Q):Qt?Q
 	
 	
<T < <): < <+< < <	
 	
z 	
 	
rb   r_   c                    | j         j        r| j        | j        S | j        t	          d          t          d          r2t                              d                    | j	                             | j        
                                g}| j         j        dk    r|d         j        n|d                                         }| j         j        dk    r| j         j        s| j         j        }nt#          d          }|| j	        k     rt%          |          |k     rz| j        
                                }|| j         j        dk    r|j        n|                                z  }|                    |           || j	        k     rt%          |          |k     zt)          |          }| j                            | |	           | j                            |           t          d
          r:t                              d                    t3          |                               | j         j        r$|                    | j         j        dk               | j         j        r|| _        |S )a  Returns a batch of experience sampled from this worker.

        This method must be implemented by subclasses.

        Returns:
            A columnar batch of experiences (e.g., tensors) or a MultiAgentBatch.

        .. testcode::
            :skipif: True

            import gymnasium as gym
            from ray.rllib.evaluation.rollout_worker import RolloutWorker
            from ray.rllib.algorithms.ppo.ppo_tf_policy import PPOTF1Policy
            worker = RolloutWorker(
              env_creator=lambda _: gym.make("CartPole-v1"),
              default_policy_class=PPOTF1Policy,
              config=AlgorithmConfig(),
            )
            print(worker.sample())

        .. testoutput::

            SampleBatch({"obs": [...], "action": [...], ...})
        NzzRolloutWorker has no `input_reader` object! Cannot call `sample()`. You can try setting `create_local_env_runner` to True.sample_startz"Generating sample batch of size {}	env_stepsr   r   r   )r   samples
sample_endzCompleted sample batch:

{}
bulk)r(  )ru   fake_samplerr   r  r	  rX   rm   rn   r   r   nextr   countagent_stepsr  offline_samplingr   r  r   appendr3   r   on_sample_endr  writer:   compress_observationscompress)r   r  batchessteps_so_farmax_batchesbatchs         rc   r   zRolloutWorker.sample}  sh   4 ;# 	(C?"&5   N## 	KK4;;6    $))++, {)[88 AJ'')) 	 K"&999K0 : +=KK,,KT???LL;&&%**,,E;-<< &&((L
 NN5!!! T???LL;&& w''$$D%$@@@ 	  '''L!! 	TKK9@@5AQAQRRSSS;, 	MNN AV KNLLL;# 	$#DOrb   c                     |                      d           }d |D             }|                     d           }|rddlm} |d         ||<   |S )Nc                      || j         | j        fS r   observation_spaceaction_space)ppids     rc   r   z*RolloutWorker.get_spaces.<locals>.<lambda>  s    C!4anE rb   c           	      f    i | ].}|d          t          |d         d|d                   |d         f/S )r   r   original_space   )getattr).0es     rc   
<dictcomp>z,RolloutWorker.get_spaces.<locals>.<dictcomp>  s<    VVV!!A$1'71>>!EVVVrb   c                     | j         | j        fS r   r9  r   s    rc   r   z*RolloutWorker.get_spaces.<locals>.<lambda>  s    .0@A rb   r   )INPUT_ENV_SPACES)foreach_policyforeach_envray.rllib.envrF  )r   rz   
env_spacesrF  s       rc   
get_spaceszRolloutWorker.get_spaces  s~    $$EE
 
 WVvVVV%%AA
 

  	5666666'1!}F#$rb   r@  )num_returnsc                 <    |                                  }||j        fS )a  Same as sample() but returns the count as a separate value.

        Returns:
            A columnar batch of experiences (e.g., tensors) and the
                size of the collected batch.

        .. testcode::
            :skipif: True

            import gymnasium as gym
            from ray.rllib.evaluation.rollout_worker import RolloutWorker
            from ray.rllib.algorithms.ppo.ppo_tf_policy import PPOTF1Policy
            worker = RolloutWorker(
              env_creator=lambda _: gym.make("CartPole-v1"),
              default_policy_class=PPOTFPolicy)
            print(worker.sample_with_count())

        .. testoutput::

            (SampleBatch({"obs": [...], "action": [...], ...}), 3)
        )r   r+  r   r6  s     rc   sample_with_countzRolloutWorker.sample_with_count  s    . ek!!rb   r&  c                 ,   t          d          r:t                              d                    t	          |                               i }t          |t                    ri i }|j                                        D ]\  }}| j	        | 	                    ||          s#|
                                 | j        |         }|                                }|rCt          |d          r3t          |d          |<   |                    |         |          ||<   |                    |          ||<   |                    fd|                                D                        na| j	        | 	                    t$          |          r?|                    t$          | j        t$                                       |          i           t          d          r:t                              d                    t	          |                               |S )aH  Update policies based on the given batch.

        This is the equivalent to apply_gradients(compute_gradients(samples)),
        but can be optimized to avoid pulling gradients into CPU memory.

        Args:
            samples: The SampleBatch or MultiAgentBatch to learn on.

        Returns:
            Dictionary of extra metadata from compute_gradients().

        .. testcode::
            :skipif: True

            import gymnasium as gym
            from ray.rllib.evaluation.rollout_worker import RolloutWorker
            from ray.rllib.algorithms.ppo.ppo_tf_policy import PPOTF1Policy
            worker = RolloutWorker(
              env_creator=lambda _: gym.make("CartPole-v1"),
              default_policy_class=PPOTF1Policy)
            batch = worker.sample()
            info = worker.learn_on_batch(samples)
        learn_on_batchz-Training on concatenated sample batches:

{}
N_build_learn_on_batchc                 N    i | ]!\  }}||                              |          "S ra   r   )rB  r=  vbuilderss      rc   rD  z0RolloutWorker.learn_on_batch.<locals>.<dictcomp>2  s1    VVV63S(3-"3"3A"6"6VVVrb   	learn_outzTraining out:

{}
)rX   rm   rn   r   r:   r   r2   policy_batchesr   r   decompress_if_neededr   get_sessionrl   rE   rR  rQ  updater1   debug)	r   r&  info_outto_fetchr=  r6  policy
tf_sessionrV  s	           @rc   rQ  zRolloutWorker.learn_on_batch  s$   0 $%% 	KKBIIg&&    g// 	HH%4::<< A A
U*6t?V?V@ @6 **,,,-#//11
 A'&2I"J"J A$1*>N$O$OHSM$*$@$@#PU$V$VHSMM$*$9$9%$@$@HSMMOOVVVVX^^EUEUVVVWWWW&.$2I2I!73 3. )4?-,(.11   K   	NLL077	(8K8KLLMMMrb   expected_batch_sizenum_sgd_itersgd_minibatch_sizestandardize_fieldsc                    |                                  }|j        |k    sJ d|d|j        f            t                              d                    |j        |                     t          || j        | |||          }||j        fS )aA  Sample and batch and learn on it.

        This is typically used in combination with distributed allreduce.

        Args:
            expected_batch_size: Expected number of samples to learn on.
            num_sgd_iter: Number of SGD iterations.
            sgd_minibatch_size: SGD minibatch size.
            standardize_fields: List of sample fields to normalize.

        Returns:
            A tuple consisting of a dictionary of extra metadata returned from
                the policies' `learn_on_batch()` and the number of samples
                learned on.
        z:Batch size possibly out of sync between workers, expected:zgot:zIExecuting distributed minibatch SGD with epoch size {}, minibatch size {})r   r+  rm   rn   r   rD   r   )r   ra  rb  rc  rd  r6  rn   s          rc   sample_and_learnzRolloutWorker.sample_and_learnB  s    , {1111HK	4
111 	44:F/5 5	
 	
 	
  O
 
 U[  rb   single_agentc                    t          d          r:t                              d                    t	          |                               |du rEt          |          }| j        t                                       |          \  }}|j	        |d<   ||fS |
                                }i i }}| j        j        dk    r|j                                        D ]q\  }}| j        |                     ||          s#| j        |         }t!          |                                d          |                    |          \  ||<   ||<   rfd|                                D             }fd|                                D             }nf|j                                        D ]L\  }}| j        |                     ||          s#| j        |                             |          \  ||<   ||<   M|j	        |d<   t          d	          r:t                              d
                    t	          |                               ||fS )aF  Returns a gradient computed w.r.t the specified samples.

        Uses the Policy's/ies' compute_gradients method(s) to perform the
        calculations. Skips policies that are not trainable as per
        `self.is_policy_to_train()`.

        Args:
            samples: The SampleBatch or MultiAgentBatch to compute gradients
                for using this worker's trainable policies.

        Returns:
            In the single-agent case, a tuple consisting of ModelGradients and
            info dict of the worker's policy.
            In the multi-agent case, a tuple consisting of a dict mapping
            PolicyID to ModelGradients and a dict mapping PolicyID to extra
            metadata info.
            Note that the first return value (grads) can be applied as is to a
            compatible worker using the worker's `apply_gradients()` method.

        .. testcode::
            :skipif: True

            import gymnasium as gym
            from ray.rllib.evaluation.rollout_worker import RolloutWorker
            from ray.rllib.algorithms.ppo.ppo_tf_policy import PPOTF1Policy
            worker = RolloutWorker(
              env_creator=lambda _: gym.make("CartPole-v1"),
              default_policy_class=PPOTF1Policy)
            batch = worker.sample()
            grads, info = worker.compute_gradients(samples)
        compute_gradientszCompute gradients on:

{}
Tbatch_countr   Nc                 B    i | ]\  }}|                     |          S ra   rT  rB  krU  builders      rc   rD  z3RolloutWorker.compute_gradients.<locals>.<dictcomp>  )    GGGda7;;q>>GGGrb   c                 B    i | ]\  }}|                     |          S ra   rT  rl  s      rc   rD  z3RolloutWorker.compute_gradients.<locals>.<dictcomp>  ro  rb   grad_outzCompute grad info:

{}
)rX   rm   rn   r   r:   r4   r   r1   ri  r+  as_multi_agentru   r   rX  r   r   rE   rZ  _build_compute_gradients)	r   r&  rg  rq  r]  r=  r6  r_  rn  s	           @rc   ri  zRolloutWorker.compute_gradientso  sz   H '(( 	TKK7>>y?Q?QRRSSS46w??G!%1B!C!U!U" "Hh '.mH]#X%% ((**  (;$,,%4::<< 	 	
U*6t?V?V@ @6 -'(:(:(<(<>QRR/5/N/NU0 0,x}} HGGGhnn6F6FGGGHGGGGhnn6F6FGGGHH%4::<<  
U*6t?V?V@ @6 /3s/C/U/U0 0,x}} #*-J 	RKK4;;Ih<O<OPPQQQ!!rb   gradsc                    t          d          r:t                              d                    t	          |                               t          |t                    rY|                                D ]B\  }}| j        |                     |d          r | j	        |         
                    |           CdS | j        |                     t          d          r'| j	        t                   
                    |           dS dS )aW  Applies the given gradients to this worker's models.

        Uses the Policy's/ies' apply_gradients method(s) to perform the
        operations.

        Args:
            grads: Single ModelGradients (single-agent case) or a dict
                mapping PolicyIDs to the respective model gradients
                structs.

        .. testcode::
            :skipif: True

            import gymnasium as gym
            from ray.rllib.evaluation.rollout_worker import RolloutWorker
            from ray.rllib.algorithms.ppo.ppo_tf_policy import PPOTF1Policy
            worker = RolloutWorker(
              env_creator=lambda _: gym.make("CartPole-v1"),
              default_policy_class=PPOTF1Policy)
            samples = worker.sample()
            grads, info = worker.compute_gradients(samples)
            worker.apply_gradients(grads)
        apply_gradientszApply gradients:

{}
N)rX   rm   rn   r   r:   r   r   r   r   r   rv  r1   )r   rt  r=  gs       rc   rv  zRolloutWorker.apply_gradients  s   6 %&& 	MKK299)E:J:JKKLLL eT"" 
	F++-- < <Q*2d6M6M7 72 OC(88;;;	< < $,0G0Gt1
 1
, O-.>>uEEEEE -,rb   c                 L    | j         | j                                         }ng }|S )zReturns the thus-far collected metrics from this worker's rollouts.

        Returns:
             List of RolloutMetrics collected thus-far.
        )r  get_metrics)r   outs     rc   ry  zRolloutWorker.get_metrics  s+     <#,**,,CCC
rb   funcc                     | j         g S | j                                         }|s | j                   gS fd|D             S )a  Calls the given function with each sub-environment as arg.

        Args:
            func: The function to call for each underlying
                sub-environment (as only arg).

        Returns:
             The list of return values of all calls to `func([env])`.
        Nc                 &    g | ]} |          S ra   ra   )rB  rC  r{  s     rc   
<listcomp>z-RolloutWorker.foreach_env.<locals>.<listcomp>  s!    ***DDGG***rb   )r
  get_sub_environments)r   r{  envss    ` rc   rH  zRolloutWorker.foreach_env  s`     >!I~2244  	+D(()) +***T****rb   c                    | j         g S | j                                         }|s || j         | j                  gS g }t          |          D ]?\  }}| j                            |          }|                     |||                     @|S )a.  Calls given function with each sub-env plus env_ctx as args.

        Args:
            func: The function to call for each underlying
                sub-environment and its EnvContext (as the args).

        Returns:
             The list of return values of all calls to `func([env, ctx])`.
        Nr   )r
  r  r   	enumeratecopy_with_overridesr.  )r   r{  r  retirC  ctxs          rc   foreach_env_with_contextz&RolloutWorker.foreach_env_with_context  s     >!I~2244  	D)9::;; C!$ ) )1&:::JJ

443<<((((Jrb   	policy_idc                 6    | j                             |          S )a  Return policy for the specified id, or None.

        Args:
            policy_id: ID of the policy to return. None for DEFAULT_POLICY_ID
                (in the single agent case).

        Returns:
            The policy under the given ID (or None if not found).
        )r   r   )r   r  s     rc   
get_policyzRolloutWorker.get_policy.  s     ""9---rb   )r:  r;  ru   policy_stater   policies_to_trainmodule_spec
policy_clsr_  r:  r;  r  r  r  c          	         t          |d           |
t          d          || j        v r9t          d| dt	          | j                                                             |du |du k    rt          d          |C| j                            |t          ||||          i| j	        | j
        | j                  \  }}n0|t          t          |          |j        |j        |j                  i}| j                            |           |                     ||||i|
	           |                     |           |	|                     |	           | j        |         S )
ad  Adds a new policy to this RolloutWorker.

        Args:
            policy_id: ID of the policy to add.
            policy_cls: The Policy class to use for constructing the new Policy.
                Note: Only one of `policy_cls` or `policy` must be provided.
            policy: The Policy instance to add to this algorithm.
                Note: Only one of `policy_cls` or `policy` must be provided.
            observation_space: The observation space of the policy to add.
            action_space: The action space of the policy to add.
            config: The config overrides for the policy to add.
            policy_state: Optional state dict to apply to the new
                policy instance, right after its construction.
            policy_mapping_fn: An optional (updated) policy mapping function
                to use from here on. Note that already ongoing episodes will
                not change their mapping but will use the old mapping till
                the end of the episode.
            policies_to_train: An optional collection of policy IDs to be
                trained or a callable taking PolicyID and - optionally -
                SampleBatchType and returning a bool (trainable or not?).
                If None, will keep the existing setup in place.
                Policies, whose IDs are not in the list (or for which the
                callable returns False) will not be updated.
            module_spec: In the new RLModule API we need to pass in the module_spec for
                the new module that is supposed to be added. Knowing the policy spec is
                not sufficient.

        Returns:
            The newly added policy.

        Raises:
            ValueError: If both `policy_cls` AND `policy` are provided.
            KeyError: If the given `policy_id` already exists in this worker's
                PolicyMap.
        FerrorNzOIf you pass in module_spec to the policy, the RLModule API needs to be enabled.Policy ID 'z' already exists in policy map! Make sure you use a Policy ID that has not been taken yet. Policy IDs that are already in your policy map: zTOnly one of `policy_cls` or `policy` must be provided to RolloutWorker.add_policy()!)policiesre   rz   r{   )r   r_  policy_statessingle_agent_rl_module_spec)r   r	  r   KeyErrorr   keysru   r   r/   re   rz   r{   typer:  r;  r   r[  r  r   set_is_policy_to_train)r   r  r  r_  r:  r;  ru   r  r   r  r  policy_dict_to_add_s                rc   
add_policyzRolloutWorker.add_policy:  s   f 	9E2222"!  
 ''2i 2 2 ,,..//2 2   $FdN33.  
 >$(K$E$Ez"$5|V   
 H{%)%> %F 	% 	%! :LL,'M	 " 	 2333*$l3(3	 	  	
 	
 	
 	""#4555(''(9:::y))rb   )r  r   r  r   c                    || j         vrt          d| d          | j         |= | j        |= |                     |           ||                     |           dS dS )aU  Removes a policy from this RolloutWorker.

        Args:
            policy_id: ID of the policy to be removed. None for
                DEFAULT_POLICY_ID.
            policy_mapping_fn: An optional (updated) policy mapping function
                to use from here on. Note that already ongoing episodes will
                not change their mapping but will use the old mapping till
                the end of the episode.
            policies_to_train: An optional collection of policy IDs to be
                trained or a callable taking PolicyID and - optionally -
                SampleBatchType and returning a bool (trainable or not?).
                If None, will keep the existing setup in place.
                Policies, whose IDs are not in the list (or for which the
                callable returns False) will not be updated.
        r  z' not in policy map!N)r   r	  r   r   r  )r   r  r   r  s       rc   remove_policyzRolloutWorker.remove_policy  s~    2 DO++J9JJJKKKOI&y)""#4555(''(9::::: )(rb   c                 b    |*|| _         t          | j                   st          d          dS dS )zSets `self.policy_mapping_fn` to a new callable (if provided).

        Args:
            policy_mapping_fn: The new mapping function to use. If None,
                will keep the existing mapping function in place.
        Nz'`policy_mapping_fn` must be a callable!)r   callabler	  )r   r   s     rc   r   z#RolloutWorker.set_policy_mapping_fn  sI     (%6D"D233 L !JKKK )(L Lrb   r   c                     t          |          sAt          |t          t          t          f          s
J d            t          |          dfd	}|| _        dS )a  Sets `self.is_policy_to_train()` to a new callable.

        Args:
            is_policy_to_train: A collection of policy IDs to be
                trained or a callable taking PolicyID and - optionally -
                SampleBatchType and returning a bool (trainable or not?).
                If None, will keep the existing setup in place.
                Policies, whose IDs are not in the list (or for which the
                callable returns False) will not be updated.
        zERROR: `is_policy_to_train`must be a [list|set|tuple] or a callable taking PolicyID and SampleBatch and returning True|False (trainable or not?).Nc                     | v S r   ra   )r=  r6  polss     rc   r   z@RolloutWorker.set_is_policy_to_train.<locals>.is_policy_to_train  s    d{"rb   r   )r  r   r   settupler   )r   r   r  s     @rc   r  z$RolloutWorker.set_is_policy_to_train  s    $ *++ 		#04e2DEE  2 E
 )**D# # # # # # #5rb   alpha)	stabilityr6  c                 R      fd j                                         D             S )a  Returns all policies-to-train, given an optional batch.

        Loops through all policies currently in `self.policy_map` and checks
        the return value of `self.is_policy_to_train(pid, batch)`.

        Args:
            batch: An optional SampleBatchType for the
                `self.is_policy_to_train(pid, [batch]?)` check.

        Returns:
            The set of currently trainable policy IDs, given the optional
            `batch`.
        c                 N    h | ]!}j                              |          |"S r   )r   )rB  r=  r6  r   s     rc   	<setcomp>z6RolloutWorker.get_policies_to_train.<locals>.<setcomp>  sA     
 
 
&.$2I2I#u2U2U. ...rb   r   r  rN  s   ``rc   get_policies_to_trainz#RolloutWorker.get_policies_to_train  sB    "
 
 
 
 
++--
 
 
 	
rb   c                 *     || j         |         fi |S )a  Calls the given function with the specified policy as first arg.

        Args:
            func: The function to call with the policy as first arg.
            policy_id: The PolicyID of the policy to call the function with.

        Keyword Args:
            kwargs: Additional kwargs to be passed to the call.

        Returns:
            The return value of the function call.
        )r   )r   r{  r  r  s       rc   
for_policyzRolloutWorker.for_policy  s#    & tDOI.99&999rb   c                 R    fd| j                                         D             S )ar  Calls the given function with each (policy, policy_id) tuple.

        Args:
            func: The function to call with each (policy, policy ID) tuple.

        Keyword Args:
            kwargs: Additional kwargs to be passed to the call.

        Returns:
             The list of return values of all calls to
                `func([policy, pid, **kwargs])`.
        c                 *    g | ]\  }} ||fi S ra   ra   )rB  r=  r_  r{  r  s      rc   r~  z0RolloutWorker.foreach_policy.<locals>.<listcomp>/  s1    WWWVVS++F++WWWrb   )r   r   r   r{  r  s    ``rc   rG  zRolloutWorker.foreach_policy   s2     XWWWWt?T?T?V?VWWWWrb   c                 V      fd j                                         D             S )aK  
        Calls the given function with each (policy, policy_id) tuple.

        Only those policies/IDs will be called on, for which
        `self.is_policy_to_train()` returns True.

        Args:
            func: The function to call with each (policy, policy ID) tuple,
                for only those policies that `self.is_policy_to_train`
                returns True.

        Keyword Args:
            kwargs: Additional kwargs to be passed to the call.

        Returns:
            The list of return values of all calls to
            `func([policy, pid, **kwargs])`.
        c                 t    g | ]4}j                              |d            j        |         |fi 5S r   )r   r   )rB  r=  r{  r  r   s     rc   r~  z9RolloutWorker.foreach_policy_to_train.<locals>.<listcomp>F  s^     

 

 

 &.$2I2I#t2T2T. D%s55f55...rb   r  r  s   ```rc   foreach_policy_to_trainz%RolloutWorker.foreach_policy_to_train1  sJ    *

 

 

 

 

 

 ++--

 

 

 
	
rb   new_filtersc                     t          fd| j        D                       sJ | j        D ](}| j        |                             |                    )dS )zChanges self's filter to given and rebases any accumulated delta.

        Args:
            new_filters: Filters with new state to update local copy.
        c              3       K   | ]}|v V  	d S r   ra   )rB  rm  r  s     rc   	<genexpr>z-RolloutWorker.sync_filters.<locals>.<genexpr>X  s(      ::1#::::::rb   N)allr  sync)r   r  rm  s    ` rc   sync_filterszRolloutWorker.sync_filtersR  sl     ::::T\:::::::: 	1 	1ALO  Q0000	1 	1rb   flush_afterc                     i }| j                                         D ]2\  }}|                                ||<   |r|                                 3|S )zReturns a snapshot of filters.

        Args:
            flush_after: Clears the filter buffer state.

        Returns:
            Dict for serializable filters
        )r  r   as_serializablereset_buffer)r   r  return_filtersrm  fs        rc   get_filterszRolloutWorker.get_filters\  s_     L&&(( 	! 	!DAq ! 1 1 3 3N1 !   rb   c                 j   |                      d          }i }| j                                        D ]L}| j        j        r| j        |                     |          r"| j        |                                         ||<   Mt          | j                                                  || j        | j        |dS )NT)r  )
policy_idsr  r   r   r  )	r  r   r  ru   "checkpoint_trainable_policies_onlyr   	get_stater   r   )r   r  r  r=  s       rc   r  zRolloutWorker.get_statel  s    ""t"44?'')) 	F 	FC KBF*2**3// 3 &*_S%9%C%C%E%Ec" t335566 +!%!7"&"9
 
 	
rb   statec                 8   t          |t                    rt          j        |          }|                     |d                    d|v r|d         n|d         }|                                D ]\  }}t          |d           || j        vr|                    dd           }|"t          
                    d| d| d	           nYt          |t                    rt          j        |          n|}|                     ||j        |j        |j        |j        
           || j        v r | j        |                             |           d|v r|                     |d                    |                    d          |                     |d                    d S d S )Nr  r  r  Fr  policy_specz
PolicyID 'z' was probably added on-the-fly (not part of the static `multagent.policies` config) and no PolicySpec objects found in the pickled policy state. Will not add `z`, but ignore it for now.)r  r  r:  r;  ru   r   r   )r   bytespickleloadsr  r   r   r   r   rm   r  r   r/   deserializer  policy_classr:  r;  ru   	set_stater   r  )r   r  r  r=  r  specr  s          rc   r  zRolloutWorker.set_state  s    eU## 	(L''E 	%	*+++
 '6&>&>E/""E'N 	 "/!4!4!6!6 	= 	=C s%0000$/))#''t<<<NNOS O O 14O O O    9C48N8NX
.t444TX   OO"%#.#;*5*G%0%=*1 $    do%%$..|<<< %''&&u-@'ABBB99)**6''.B(CDDDDD 76rb   r  inference_onlyc                      &t           j                                                  t                     fd j                                        D             S )a  Returns each policies' model weights of this worker.

        Args:
            policies: List of PolicyIDs to get the weights from.
                Use None for all policies.
            inference_only: This argument is only added for interface
                consistency with the new api stack.

        Returns:
            Dict mapping PolicyIDs to ModelWeights.

        .. testcode::
            :skipif: True

            from ray.rllib.evaluation.rollout_worker import RolloutWorker
            # Create a RolloutWorker.
            worker = ...
            weights = worker.get_weights()
            print(weights)

        .. testoutput::

            {"default_policy": {"layer1": array(...), "layer2": ...}}
        Nc                 X    i | ]&}|v |j         |                                         'S ra   )r   get_weights)rB  r=  r  r   s     rc   rD  z-RolloutWorker.get_weights.<locals>.<dictcomp>  sA     

 

 

 h %1133rb   )r   r   r  r7   )r   r  r  s   `` rc   r  zRolloutWorker.get_weights  st    : DO002233Hh''

 

 

 

 

 ++--

 

 

 
	
rb   weightsr   r  c                    ||| j         k    r|rt          t          t          |                                                    t
                    r`t          j        t          |                                                    fdt          |
                                          D             }|                                D ]\\  }}|| j        v r!| j        |                             |           /t          d          rt                              d| d           ]|| _         |r|                     |           dS dS )a  Sets each policies' model weights of this worker.

        Args:
            weights: Dict mapping PolicyIDs to the new weights to be used.
            global_vars: An optional global vars dict to set this
                worker to. If None, do not update the global_vars.
            weights_seq_no: If needed, a sequence number for the weights version
                can be passed into this method. If not None, will store this seq no
                (in self.weights_seq_no) and in future calls - if the seq no did not
                change wrt. the last call - will ignore the call to save on performance.

        .. testcode::
            :skipif: True

            from ray.rllib.evaluation.rollout_worker import RolloutWorker
            # Create a RolloutWorker.
            worker = ...
            weights = worker.get_weights()
            # Set `global_vars` (timestep) as well.
            worker.set_weights(weights, {"timestep": 42})
        Nc                 (    i | ]\  }}||         S ra   ra   )rB  r  r=  actual_weightss      rc   rD  z-RolloutWorker.set_weights.<locals>.<dictcomp>	  s1       /5q#C*  rb   "set_weights_on_non_existent_policyz>`RolloutWorker.set_weights()` used with weights from policyID=z>, but this policy cannot be found on this worker! Skipping ...)r  r   r*  iterr  r   r   r   r   r  r  r   r   set_weightsrX   rm   r  set_global_vars)r   r  r   r  r=  wr  s         @rc   r  zRolloutWorker.set_weights  sa   : !^t7J%J%J :d40@0@+A+A&B&BINN !$gnn.>.>)?)?!@!@   9B7<<>>9R9R   "--//  Q$/))OC(44Q7777BCC NN0$'0 0 0   - 	.  -----	. 	.rb   c                     | j         S )a  Returns the current `self.global_vars` dict of this RolloutWorker.

        Returns:
            The current `self.global_vars` dict of this RolloutWorker.

        .. testcode::
            :skipif: True

            from ray.rllib.evaluation.rollout_worker import RolloutWorker
            # Create a RolloutWorker.
            worker = ...
            global_vars = worker.get_global_vars()
            print(global_vars)

        .. testoutput::

            {"timestep": 424242}
        )r   r   s    rc   get_global_varszRolloutWorker.get_global_vars  s    & rb   r  c                    |                                 }|                    di           }| j        d                             |           ||n| j                                        D ]_}| j        |                     |d          r@| j        |                             t          |fi d|	                    |          i           `| j                            |           dS )a(  Updates this worker's and all its policies' global vars.

        Updates are done using the dict's update method.

        Args:
            global_vars: The global_vars dict to update the `self.global_vars` dict
                from.
            policy_ids: Optional list of Policy IDs to update. If None, will update all
                policies on the to-be-updated workers.

        .. testcode::
            :skipif: True

            worker = ...
            global_vars = worker.set_global_vars(
            ...     {"timestep": 4242})
        r   Nnum_grad_updates)
r   popr   r[  r   r  r   on_global_var_updater   r   )r   r   r  global_vars_copygradient_updates_per_policyr=  s         rc   r  zRolloutWorker.set_global_vars1  s   . '++--&6&:&:)2'
 '
# 	67>>'	
 	
 	
 ",!7::T_=Q=Q=S=S 	 	C&.$2I2I#t2T2T.$99(  ./J/N/Ns/S/ST    	 011111rb   c                     | j         | j                                         | j        j                                        D ],}|                                }||                                 -dS )z2Releases all resources used by this RolloutWorker.N)re   r
  stopr   cacher  rZ  close)r   r_  sesss      rc   r  zRolloutWorker.stop`  sq    
 8N!!! o+2244 	 	F%%''D

		 	rb   c                 8    | j                                          dS )z4Locks this RolloutWorker via its own threading.Lock.N)r   acquirer   s    rc   lockzRolloutWorker.locko      
rb   c                 8    | j                                          dS )z6Unlocks this RolloutWorker via its own threading.Lock.N)r   releaser   s    rc   unlockzRolloutWorker.unlocks  r  rb   url
world_rank
world_sizebackendc                 P   t                               d                    ||||                     t          j                            ||||           | j                                        D ]8\  }}t          |t          t          f          st          d|          ||_        9dS )z/Join a torch process group for distributed SGD.zGJoining process group, url={}, world_rank={}, world_size={}, backend={})r  init_methodrankr  z.This policy does not support torch distributedN)rm   rn   r   r   distributedinit_process_groupr   r   r   r5   r6   r	  distributed_world_size)r   r  r  r  r  r=  r_  s          rc   setup_torch_data_parallelz'RolloutWorker.setup_torch_data_parallelw  s    
 	((.sJ
G(T(T	
 	
 	
 	,,:* 	- 	
 	
 	
  ?0022 	7 	7KCf{M&BCC  Df   -7F))	7 	7rb   c                     | j         S )z3Returns the kwargs dict used to create this worker.)r   r   s    rc   creation_argszRolloutWorker.creation_args  s    $$rb   c                 (    t          j                    S )z;Returns the hostname of the process running this evaluator.)platformnoder   s    rc   get_hostzRolloutWorker.get_host  s    }rb   c                 >    t           j                                        S )z<Returns the IP address of the node that this worker runs on.)r   utilget_node_ip_addressr   s    rc   get_node_ipzRolloutWorker.get_node_ip  s    x++---rb   c                 8    ddl m}  |t          j                  S )z7Finds a free port on the node that this worker runs on.r   )find_free_port)ray._common.network_utilsr	  socketAF_INET)r   r	  s     rc   r	  zRolloutWorker.find_free_port  s&    <<<<<<~fn---rb   )r_  r  r  r   r  r  c                X   |                      |          }|                     |||           |                     |           ||                                  | j        dk    rFt
                              d| j                    t
                              d| j                    dS dS )a  Updates the policy map (and other stuff) on this worker.

        It performs the following:
            1. It updates the observation preprocessors and updates the policy_specs
                with the postprocessed observation_spaces.
            2. It updates the policy_specs with the complete algorithm_config (merged
                with the policy_spec's config).
            3. If needed it will update the self.multi_rl_module_spec on this worker
            3. It updates the policy map with the new policies
            4. It updates the filter dict
            5. It calls the on_create_policy() hook of the callbacks on the newly added
                policies.

        Args:
            policy_dict: The policy dict to update the policy map with.
            policy: The policy to update the policy map with.
            policy_states: The policy states to update the policy map with.
            single_agent_rl_module_spec: The RLModuleSpec to add to the
                MultiRLModuleSpec. If None, the config's
                `get_default_rl_module_spec` method's output will be used to create
                the policy with.
        )r   r_  r  Nr   zBuilt policy map: zBuilt preprocessor map: )	_get_complete_policy_specs_dict_build_policy_map_update_filter_dict _call_callbacks_on_create_policyrv   rm   rn   r   r   )r   r   r_  r  r  updated_policy_dicts         rc   r  z RolloutWorker._update_policy_map  s    B #BB;OO 	+' 	 	
 	
 	
 	  !4555 >11333!!KK>T_>>???KKG43EGGHHHHH "!rb   c                 T   ddl m} t          j        |          }| j        pi | _        t          |                                          D ]\  }}t                              d	                    |                     t          |j        |          r|j        }n7| j                            d          }|                    |j        pi            | j        |_        |j        }d| j        |<   | j        r%t!          j        ||j        d          }||j        }||_        ||_        |S )aN  Processes the policy dict and creates a new copy with the processed attrs.

        This processes the observation_space and prepares them for passing to rl module
        construction. It also merges the policy configs with the algorithm config.
        During this processing, we will also construct the preprocessors dict.
        r   rZ   zCreating policy for {}F)copy_frozenN)include_multi_binary)r   r[   r   r   r   sortedr   rm   r\  r   r   ru   r   rv   r:  r   r!   get_preprocessor_for_spacer   )	r   r   r[   r  namer  merged_conf	obs_spacepreprocessors	            rc   r  z-RolloutWorker._get_complete_policy_specs_dict  sV    	JIIIII"mK88!/52!'(;(A(A(C(C!D!D !	6 !	6D+LL188>>??? +,o>> G)0 261A1Ae1A1T1T,,[-?-E2FFF (,'8K$ $5I'+Dt$) ?  ,F%).       + , >I!,K,5K))""rb   c                 X    |                                 D ]\  }}| j        |j        d<   |S )N__multi_rl_module_spec)r   r  ru   )r   r   r  r  s       rc   (_update_policy_dict_with_multi_rl_modulez6RolloutWorker._update_policy_dict_with_multi_rl_module  s@     "-!2!2!4!4 	U 	UD+;?;TK788rb   )r_  r  c          
         | j         p$t          | j        j        | j        j                  | _         t          |                                          D ]\  }}|Ht          |t          |j	        |j                  |j        |j
        |j        | j        | j                  }n|}|| j         |<   |pi                     |d          }|r|                    |           dS )a  Adds the given policy_dict to `self.policy_map`.

        Args:
            policy_dict: The MultiAgentPolicyConfigDict to be added to this
                worker's PolicyMap.
            policy: If the policy to add already exists, user can provide it here.
            policy_states: Optional dict from PolicyIDs to PolicyStates to
                restore the states of the policies being built.
        )capacitypolicy_states_are_swappableN)r  r  merged_configr:  r;  rv   rf   )r   r0   ru   policy_map_capacityr!  r  r   rC   rG   r  r:  r;  rv   rf   r   r  )r   r   r_  r  r  r  
new_policyrestore_statess           rc   r  zRolloutWorker._build_policy_map  s   $ / 
Y[4(,(O.
 .
 .
 "((9(9(;(;!<!< 	5 	5D+~8"!>#0+2D" " #."4&1&C!,!9!%!2
 
 


 $
$.DOD!+1r66tTBBN 5$$^444-	5 	5rb   c                     t          |                                          D ]E\  }}| j        |         }|j        |j        t          ||j                   t          | |           FdS )z2Updates the filter dict for the given policy_dict.N)r  r   r   agent_connectorsaction_connectorsr   ru   r   )r   r   r  r  r$  s        rc   r  z!RolloutWorker._update_filter_dict>  s~     "((9(9(;(;!<!< 	6 	6D+.J +3/7 -Z9KLLL)$5555#	6 	6rb   c                 |    | j                                         D ]!\  }}| j                            ||           "dS )zFCalls the on_create_policy callback for each policy in the policy map.)r  r_  N)r   r   r   on_create_policy)r   r  r_  s      rc   r  z.RolloutWorker._call_callbacks_on_create_policyT  sO     O1133 	K 	KLD&N++d6+JJJJ	K 	Krb   c                 \    d }t           j        j        t                    r j        j        S  j        j        dk    rd S  j        j        dk    r j        J  fdS t           j        j        t
                    r fdS t           j        j        t                    r2t           j        j                  rt           j        j                  S d j        j        v r* j        j        	                    d          d	         fd
S  | j        j                  r fdS  fdS )Nc                 J   t          | t                    rt          j                            |           snd| v rj|                     dd          \  }}	 t          j                            |          }|dS n,# t          t          f$ r t          d| d|             Y nw xY wdS )N.r   Tzmodule z% not found while trying to get input F)r   r   r   pathisfilersplit	importlibr  	find_specModuleNotFoundErrorr	  print)
class_pathmodule_path
class_namer  s       rc   valid_modulezBRolloutWorker._get_input_creator_from_config.<locals>.valid_moduleZ  s    :s++z22 :%%*4*;*;C*C*C'Z$>33K@@D'#t (+Z8   .+ . .!+. .    
 5s   !A7 7&B B r  c                 *    |                                  S r   )default_sampler_inputioctxs    rc   r   z>RolloutWorker._get_input_creator_from_config.<locals>.<lambda>q  s    !<!<!>!> rb   datasetc                 D    t          j        j                 |           S r   )r$   r   rv   r<  r   s    rc   r   z>RolloutWorker._get_input_creator_from_config.<locals>.<lambda>w  s      12E" " rb   c                 h    t          t          j        j        |           j        j                  S r   )r-   r*   ru   input_shuffle_buffer_sizer?  s    rc   r   z>RolloutWorker._get_input_creator_from_config.<locals>.<lambda>|  *    4;-u55t{7V" " rb   d4rlr-  c                 $    t          |           S r   )r#   )r<  env_names    rc   r   z>RolloutWorker._get_input_creator_from_config.<locals>.<lambda>  s    He!<!< rb   c                 T    t          t          j        j        |                     S )Nr;  )r-   rB   ru   rA  r?  s    rc   r   z>RolloutWorker._get_input_creator_from_config.<locals>.<lambda>  s&    DK.e<<<" " rb   c                 h    t          t          j        j        |           j        j                  S r   )r-   r(   ru   rA  rB  r?  s    rc   r   z>RolloutWorker._get_input_creator_from_config.<locals>.<lambda>  rC  rb   )
r   ru   rA  r   r   r   r   rS   rT   split)r   r8  rG  s   ` @rc   r  z,RolloutWorker._get_input_creator_from_configY  s   	 	 	& dk(,77 $	;%%[9,,>>>[9,,?...     *D11 	     *C00 	5LK6
 6
 	 &dk&8999t{))){)//44R8H<<<<<\$+,-- 	    
    rb   c                      t           j        j        t                    r j        j        S  j        j        d S  j        j        dk    r fdS  j        j        dk    r fdS  fdS )Nc                     t                      S r   )r+   r;  s    rc   r   z?RolloutWorker._get_output_creator_from_config.<locals>.<lambda>  s
     rb   r=  c                 :    t          | j        j                  S )N)compress_columns)r%   ru   output_compress_columnsr?  s    rc   r   z?RolloutWorker._get_output_creator_from_config.<locals>.<lambda>  s     (K" " " rb   logdirc                 \    t          | j        | j        j        j        j                  S N)max_file_sizerN  )r)   ry   ru   output_max_file_sizerO  r?  s    rc   r   z?RolloutWorker._get_output_creator_from_config.<locals>.<lambda>  s.    "k>!%!D	" " " rb   c                 f    t          j        j        | j        j        j        j                  S rR  )r)   ru   outputrT  rO  r?  s    rc   r   z?RolloutWorker._get_output_creator_from_config.<locals>.<lambda>  s1    ""k>!%!D	" " " rb   )r   ru   rV  r   r   s   `rc   r  z-RolloutWorker._get_output_creator_from_config  s    dk(,77 	;%%['---[9,,     [8++        rb   c                 F     fdj         s	 fd}|S S )Nc                                          |           } |          } ||            |          }t          |j        |            |S )Nr  )r  rr   rv   )r   env_ctxre   r   r}   env_wrapperrf   rt   s      rc   _make_sub_env_localz?RolloutWorker._get_make_sub_env_fn.<locals>._make_sub_env_local  sx     "55<5PPG+g&&C'S'***+c""C *T;3\   Jrb   c           	           |           }j                             |                    j        | d                     |S )NF)rv   r   r   r   )r   r   r  rv   )r   sub_envr[  r   r   s     rc   _make_sub_env_remotez@RolloutWorker._get_make_sub_env_fn.<locals>._make_sub_env_remote  sd    --l;;99$+ + ? ?%0%=%1$ !@ ! ! :    rb   )r   )r   r}   r   rt   rZ  rf   r^  r[  s   `````` @rc   r   z"RolloutWorker._get_make_sub_env_fn  sv    	 	 	 	 	 	 	 	 	, ! 	'       (' '&rb   r   )NN)F)NF)r_   N)Y__name__
__module____qualname____doc__rI   r   r   rJ   r   r   boolr   r	   rO   r   r   r   r.   r
   r   dataDatasetr   r9   r   r   r"  rQ   r   rK  methodrO  rQ  r   rf  rK   ri  r   rv  r   ry  rR   rH  r  r1   r  rN   rP   r   r   r  rH   r  r   r   r  rU   r   r  r  rG  r  r  r  r  r  rL   r  r  r  r  r  r  r  r  r  r  r  r	  rM   r  r  r  r  r  r  r  r  r   ra   rb   rc   r]   r]      ss       = =F IM.2%)!&!%@D7;;?N2 N2 N2  N2 x*(=t(CDE	N2
 *+N2 N2 c]N2 N2 #N2 heUl(;;<=N2 'tF|4N2 !ch&6!78N2 N2 N2 N2` Xi" " " Xi
 
 
 XiU/ U U U Un XiDeE5L&9!9:     SZA"5#)=#> " " " "2Ao A$ A A A AF+! +! +!  	+!
 !I+! 
tSy	+! +! +! +!` "O" O" O" O" 
~t#	$	O" O" O" O"b)F^T(N*B%CCD)F 
)F )F )F )FV XiT.1    +'A 6 +47 + + + +.gz2A56	a   : 0A 
. 
.H 
.XfEU 
. 
. 
. 
. .2#'	i* .2(,7;.2 .2i* i* i*i* T&\*i*  	i* $E?i* uoi* 34i* {+i* $*X&(O1Ld1R(SST
i* l+i* 
i* i* i* i*\ 0EI ; ; ; ; $HgY-@$AB	;
 $*X&(O1Ld1R(SST
; 
; ; ; ;F KOL L#Hgs^X-E$FGL 
L L L L5!x (Hh6O+PRV+V"WW
5
 
5 5 5 5> Y!!!15
 
o.
	X
 
 
 "!
2 ):: :.12: H%:
 
: : : :*Xfh>ABX	aX X X X"
fh>AB
	a
 
 
 
B1 1 1 1 1 1 t      
4 
 
 
 
</Et /E /E /E /E /Ef 48$+
 +
:h/0+
 +
 
h$	%	+
 +
 +
 +
` '+(,	2. 2.h,-2. d^2. !	2.
 
2. 2. 2. 2.h         0 04-2 -2-2 T(^,-2 
	-2 -2 -2 -2^ Xi         77$'7587CF7	7 7 7 7(%t % % % %#    .S . . . .. . . . . $(?C>B4I 4I 4I 04I  	4I
  X{%: ;<4I &.l%;4I 
4I 4I 4I 4Il2#52#	#2# 2# 2# 2#h5	#    $(?C.5 .5 .5 0.5  	.5
  X{%: ;<.5 
.5 .5 .5 .5`6/I 6d 6 6 6 6,K K K
8 8 8t  0+' +' +' +' +'rb   )r_   r]   )r   importlib.utilr1  r   r   r  r  r   collectionsr   typesr   typingr   r   r   r   r	   r
   r   r   r   r   r   gymnasium.spacesr   r   r   r   r  ray.rllib.connectors.utilr   r   ray.rllib.core.rl_moduler   "ray.rllib.core.rl_module.rl_moduler   ray.rllib.env.base_envr   r   ray.rllib.env.env_contextr   ray.rllib.env.env_runnerr   &ray.rllib.env.external_multi_agent_envr   ray.rllib.env.multi_agent_envr   %ray.rllib.env.wrappers.atari_wrappersr   r   ray.rllib.evaluation.metricsr   ray.rllib.evaluation.samplerr    ray.rllib.modelsr!   ray.rllib.models.preprocessorsr"   ray.rllib.offliner#   r$   r%   r&   r'   r(   r)   r*   r+   r,   r-   ray.rllib.policy.policyr.   r/   ray.rllib.policy.policy_mapr0   ray.rllib.policy.sample_batchr1   r2   r3   r4   ray.rllib.policy.torch_policyr5    ray.rllib.policy.torch_policy_v2r6   ray.rllib.utilsr7   ray.rllib.utils.annotationsr8   r9   ray.rllib.utils.debugr:   r;   ray.rllib.utils.errorr<   r=   ray.rllib.utils.filterr>   r?   ray.rllib.utils.frameworkr@   rA   ray.rllib.utils.from_configrB   ray.rllib.utils.policyrC   ray.rllib.utils.sgdrD   ray.rllib.utils.tf_run_builderrE   ray.rllib.utils.tf_utilsrF   r   rG   ray.rllib.utils.typingrH   rI   rJ   rK   rL   rM   rN   rO   rP   rQ   rR   ray.tune.registryrS   rT   ray.util.annotationsrU   ray.util.debugrV   rW   rX   ray.util.iterrY   r   r[   ray.rllib.callbacks.callbacksr\   r   r   tfvr   r  r   r_  rm   r^   __annotations__rd   r   rr   r]   ra   rb   rc   <module>r     s8         				       # # # # # #                                # " " " " " 



 0 0 0 0 0 0 0 0        8 7 7 7 7 7 ; ; ; ; ; ; ? ? ? ? ? ? ? ? 0 0 0 0 0 0 . . . . . . H H H H H H 7 7 7 7 7 7 I I I I I I I I 7 7 7 7 7 7 4 4 4 4 4 4 ) ) ) ) ) ) 7 7 7 7 7 7                          7 6 6 6 6 6 6 6 1 1 1 1 1 1            6 5 5 5 5 5 : : : : : : & & & & & & = = = = = = = = L L L L L L L L F F F F F F F F 3 3 3 3 3 3 3 3 E E E E E E E E 3 3 3 3 3 3 > > > > > > 0 0 0 0 0 0 8 8 8 8 8 8                                 J I I I I I I I * * * * * * W W W W W W W W W W 0 0 0 0 0 0 <EEEEEE;;;;;;}Rq		8	$	$
 -1) 0 0 0     	  ), :=       F x' x' x' x' x'*I x' x' x' x' x'rb   