
    &`i"                        d dl Z d dlmZ d dlZd dlZd dlmZ d dlmZm	Z	 d dl
mZ d dlmZ d dlmZ d dlmZmZ d d	lmZ d d
lmZ d dlmZmZ  e            \  ZZ ed           G d de                      Zd Zd Zd Zd Z dS )    N)List)AlgorithmConfig)MultiAgentBatchSampleBatch)FaultAwareApply)try_import_torch)MetricsLogger))DEFAULT_HISTOGRAM_BOUNDARIES_SHORT_EVENTSTimerAndPrometheusLogger)EpisodeType)DeveloperAPI)Counter	Histogramalpha)	stabilityc                   F    e Zd ZdZdefdZdeej                 fdZ	d Z
dS )AggregatorActora  Runs episode lists through ConnectorV2 pipeline and creates train batches.

    The actor should be co-located with a Learner worker. Ideally, there should be one
    or two aggregator actors per Learner worker (having even more per Learner probably
    won't help. Then the main process driving the RL algo can perform the following
    execution logic:
    - query n EnvRunners to sample the environment and return n lists of episodes as
    Ray.ObjectRefs.
    - remote call the set of aggregator actors (in round-robin fashion) with these
    list[episodes] refs in async fashion.
    - gather the results asynchronously, as each actor returns refs pointing to
    ready-to-go train batches.
    - as soon as we have at least one train batch per Learner, call the LearnerGroup
    with the (already sharded) refs.
    - an aggregator actor - when receiving p refs to List[EpisodeType] - does:
    -- ray.get() the actual p lists and concatenate the p lists into one
    List[EpisodeType].
    -- pass the lists of episodes through its LearnerConnector pipeline
    -- buffer the output batches of this pipeline until enough batches have been
    collected for creating one train batch (matching the config's
    `train_batch_size_per_learner`).
    -- concatenate q batches into a train batch and return that train batch.
    - the algo main process then passes the ray.ObjectRef to the ready-to-go train batch
    to the LearnerGroup for calling each Learner with one train batch.
    configc                 h   || _         t          j                    | _        t                              d          | _        t          |j        d          | _	        |
                                | _        | j                                        | _        | j                             d d | j                  | _        t          ddt           d          | _        | j                            d	| j        j        i           t+          d
dd          | _        | j                            d	| j        j        i           t+          ddd          | _        | j                            d	| j        j        i           t+          ddd          | _        | j                            d	| j        j        i           d S )NcpuT)stats_cls_lookuproot)input_observation_spaceinput_action_spacedevice+rllib_utils_aggregator_actor_get_batch_timez)Time spent in AggregatorActor.get_batch())rllib)namedescription
boundariestag_keysr   7rllib_utils_aggregator_actor_episode_owner_died_counterz+N times ray.get() on an episode ref failed )r   r   r!   Arllib_utils_aggregator_actor_get_batch_input_episode_refs_counterz7Number of episode refs received as input to get_batch()=rllib_utils_aggregator_actor_get_batch_output_batches_counterz.Number of policy batches output by get_batch())r   platformnode_nodetorchr   _devicer	   r   metricsbuild_moduleas_multi_rl_modulebuild_learner_connector_learner_connectorr   r
   _metrics_get_batch_timeset_default_tags	__class____name__r   _metrics_episode_owner_died%_metrics_get_batch_input_episode_refs!_metrics_get_batch_output_batches)selfr   rl_module_specs      n/home/jaya/work/projects/VOICE-AGENT/VIET/agent-env/lib/python3.11/site-packages/ray/rllib/algorithms/utils.py__init__zAggregatorActor.__init__3   s    ]__
||E**&3#4'
 '
 '
 &++--|6688 #'+"E"E$(#< #F #
 #
 (1>C@	(
 (
 (
$ 	$55dn-.	
 	
 	
 ,3JE,
 ,
 ,
(
 	(99dn-.	
 	
 	
 6=TQ6
 6
 6
2
 	2CCdn-.	
 	
 	
 29PH2
 2
 2
.
 	.??dn-.	
 	
 	
 	
 	
    episode_refsc                 &   t          | j                  5  t          |          dk    r(| j                            t          |                     g }	 t          j        t          j        |                    }n|# t          j	        j
        $ re |D ]_}	 |                    t          j        |                     +# t          j	        j
        $ r | j                            d           Y \w xY wY nw xY wt          d |D                       }|                     || j        | j                  }t#          d |                                D             |          }| j                            d           d d d            n# 1 swxY w Y   |S )Nr   )value   c              3   4   K   | ]}t          |          V  d S N)len).0es     r9   	<genexpr>z,AggregatorActor.get_batch.<locals>.<genexpr>   s(      55qCFF555555r;   )episodes	rl_moduler*   c                 4    i | ]\  }}|t          |          S  )r   )rC   pid	pol_batchs      r9   
<dictcomp>z-AggregatorActor.get_batch.<locals>.<dictcomp>   s3          4BCCY//     r;   )policy_batches	env_steps)r   r0   rB   r5   inctreeflattenrayget
exceptionsOwnerDiedErrorextendr4   sumr/   r,   r*   r   itemsr6   )r7   r<   rF   refrN   batchma_batchs          r9   	get_batchzAggregatorActor.get_batcht   s'   %d&BCC #	@ #	@<  1$$:>>SEVEV>WWW*,HF<(=(=>>>0 F F F' F FCF 5555>8 F F F8<<1<EEEEEFF FF 55H55555I ++!, ,  E '   FKkkmm      $	  H 266Q6???G#	@ #	@ #	@ #	@ #	@ #	@ #	@ #	@ #	@ #	@ #	@ #	@ #	@ #	@ #	@H sZ   >F&A;:F;C4'B<;C4</C.	+C4-C.	.C41F3C44BFF
F
c                 4    | j                                         S rA   )r*   reduce)r7   s    r9   get_metricszAggregatorActor.get_metrics   s    |""$$$r;   N)r3   
__module____qualname____doc__r   r:   r   rR   	ObjectRefr\   r_   rI   r;   r9   r   r      sl         4?
 ?
 ?
 ?
 ?
B%d3=&9 % % % %N% % % % %r;   r   c                 D      fdt           j                  D             S )Nc                 <    g | ]}j         j        d j        S CPUGPU)num_cpus_per_env_runnernum_gpus_per_env_runnercustom_resources_per_env_runnerrC   _r   s     r9   
<listcomp>z+_get_env_runner_bundles.<locals>.<listcomp>   sK        	 11	
 	
 4	
  r;   )rangenum_env_runnersr   s   `r9   _get_env_runner_bundlesrs      s;        v-..   r;   c                 D      fdt           j                  D             S )Nc                 <    g | ]}j         j        d j        S rf   ) num_cpus_per_offline_eval_runner num_gpus_per_offline_eval_runner(custom_resources_per_offline_eval_runnerrm   s     r9   ro   z4_get_offline_eval_runner_bundles.<locals>.<listcomp>   sK        	 ::	
 	
 =	
  r;   )rp   num_offline_eval_runnersrr   s   `r9    _get_offline_eval_runner_bundlesrz      s;        v677   r;   c                    	 ddl m}  ||           S # t          $ r Y nw xY w| j        dk    r| j        dk    r
d| j        igS g S | j        dk    r| j        n| j        dk    rdnd}| j        || j        z   z  | j        | j        z  dg}|S )Nr   )_get_learner_bundlesrh   autor?   rg   )$ray.rllib.extensions.algorithm_utilsr|   	Exceptionnum_learners!num_aggregator_actors_per_learnernum_cpus_per_learnernum_gpus_per_learner)r   funcr   bundless       r9   r|   r|      s    UUUUUUtF||    a3a77FDEFFI &&00 	## &!++ Q  &#f&NNP&)DD	
 	
G Ns    
  c                     | j         dk    r?| j        dk    r| j        n| j        dk    rdnd}t          || j                  | j        d}n
| j        dd}|S )Nr   r}   r?   rg   )r   r   r   maxnum_cpus_for_main_process)r   r   bundles      r9   _get_main_process_bundler      s    a *f44 '' *a//  	 +V-MNN.
 

  9!DDMr;   )!r%   typingr   rP   rR   %ray.rllib.algorithms.algorithm_configr   ray.rllib.policy.sample_batchr   r   ray.rllib.utils.actor_managerr   ray.rllib.utils.frameworkr   &ray.rllib.utils.metrics.metrics_loggerr	   #ray.rllib.utils.metrics.ray_metricsr
   r   ray.rllib.utils.typingr   ray.util.annotationsr   ray.util.metricsr   r   r(   rn   r   rs   rz   r|   r   rI   r;   r9   <module>r      s           



 A A A A A A F F F F F F F F 9 9 9 9 9 9 6 6 6 6 6 6 @ @ @ @ @ @        / . . . . . - - - - - - / / / / / / / /q    D% D% D% D% D%o D% D% ! D%N      B    r;   