
    &`i%                        d dl Z d dlZd dlZd dlmZ d dlmZ d dlmZ d dl	m
Z
 d dlmZ d dlmZmZ d dlmZ d d	lmZ d d
lmZ  e            \  ZZZ e j        e          Ze G d de                      Z G d dej                  ZdS )    N)deprecation_warning)RolloutWorker)LearnerThread)MinibatchBuffer)SampleBatch)OldAPIStackoverride)try_import_tf)LearnerInfoBuilder)_Timerc                        e Zd ZdZ	 	 	 	 	 	 	 	 	 	 dded	ed
ededededededef fdZ ee	          dd            Z
 xZS )MultiGPULearnerThreada   Learner that can use multiple GPUs and parallel loading.

    This class is used for async sampling algorithms.

    Example workflow: 2 GPUs and 3 multi-GPU tower stacks.
    -> On each GPU, there are 3 slots for batches, indexed 0, 1, and 2.

    Workers collect data from env and push it into inqueue:
    Workers -> (data) -> self.inqueue

    We also have two queues, indicating, which stacks are loaded and which
    are not.
    - idle_tower_stacks = [0, 1, 2]  <- all 3 stacks are free at first.
    - ready_tower_stacks = []  <- None of the 3 stacks is loaded with data.

    `ready_tower_stacks` is managed by `ready_tower_stacks_buffer` for
    possible minibatch-SGD iterations per loaded batch (this avoids a reload
    from CPU to GPU for each SGD iter).

    n _MultiGPULoaderThreads: self.inqueue -get()->
    policy.load_batch_into_buffer() -> ready_stacks = [0 ...]

    This thread: self.ready_tower_stacks_buffer -get()->
    policy.learn_on_loaded_batch() -> if SGD-iters done,
    put stack index back in idle_tower_stacks queue.
       N     ,  Flocal_workernum_gpustrain_batch_sizenum_multi_gpu_tower_stacksnum_sgd_iterlearner_queue_sizelearner_queue_timeoutnum_data_load_threads
_fake_gpusc                    |rt          dd           t                                          |d|||           d| _        || _        | j        j        | _        t          t          | j        	                                                    j
        | _
        t                              d                    | j
                             | j        t          | j
                  z  dk    sJ | j        t          | j
                  k    s
J d            t          t!          |                    | _        t%          j                    | _        t%          j                    | _        | j        D ]}| j                            |           t!          |	          D ]5}t/          | |dk    	          | _        | j                                         6t5          | j        |||          | _        dS )
a  Initializes a MultiGPULearnerThread instance.

        Args:
            local_worker: Local RolloutWorker holding
                policies this thread will call `load_batch_into_buffer` and
                `learn_on_loaded_batch` on.
            num_gpus: Number of GPUs to use for data-parallel SGD.
            train_batch_size: Size of batches (minibatches if
                `num_sgd_iter` > 1) to learn on.
            num_multi_gpu_tower_stacks: Number of buffers to parallelly
                load data into on one device. Each buffer is of size of
                `train_batch_size` and hence increases GPU memory usage
                accordingly.
            num_sgd_iter: Number of passes to learn on per train batch
                (minibatch if `num_sgd_iter` > 1).
            learner_queue_size: Max size of queue of inbound
                train batches to this thread.
            num_data_load_threads: Number of threads to use to load
                data into GPU memory in parallel.
        z+MultiGPULearnerThread.minibatch_buffer_sizeT)olderrorr   )r   minibatch_buffer_sizer   r   r   Nz MultiGPULearnerThread devices {}zbatch too small)share_stats)r   super__init__minibatch_bufferr   r   
policy_mapnextitervaluesdevicesloggerinfoformatlenlistrangetower_stack_indicesqueueQueueidle_tower_stacksready_tower_stacksput_MultiGPULoaderThreadloader_threadstartr   ready_tower_stacks_buffer)selfr   r   lrr   r   r   r   r   r   r   r   idxi	__class__s                 /home/jaya/work/projects/VOICE-AGENT/VIET/agent-env/lib/python3.11/site-packages/ray/rllib/execution/multi_gpu_learner_thread.pyr"   zMultiGPULearnerThread.__init__1   s   J ! 	A    	%"#%1"7 	 	
 	
 	
 !% 0+6D!7!7!9!9::;;C6==dlKKLLL$s4<'8'88A====$DL(9(9999;L999#'.H(I(I#J#J 
 "'"'+-- + 	, 	,C"&&s++++ ,-- 	' 	'A!6t!q&!R!R!RD$$&&&&
 *9#&!	*
 *
&&&    returnc                    | j                                         st          d          | j        5  | j                                        \  }}d d d            n# 1 swxY w Y   d}| j        5  t          t          | j	                            }| j
                                        D ]}| j        j        | j                            |          s)| j
        |         }|                    d|          }|                    ||           | j                            |           ||                    |          z  }|                                | _        d d d            n# 1 swxY w Y   |r| j                            |           | j                            ||| j        f           | j                            | j                                                   d S )Nz`The `_MultiGPULoaderThread` has died! Will therefore also terminate the `MultiGPULearnerThread`.r   )num_devices)offsetbuffer_index)	policy_id)r6   is_aliveRuntimeErrorload_wait_timerr8   get
grad_timerr   r,   r(   r$   keysr   is_policy_to_trainlearn_on_loaded_batchadd_learn_on_batch_resultspolicy_ids_updatedappend"get_num_samples_loaded_into_bufferfinalizelearner_infor2   r4   outqueuer   pushinqueueqsize)r9   
buffer_idxreleasedrQ   learner_info_builderpidpolicydefault_policy_resultss           r>   stepzMultiGPULearnerThread.step   s   !**,, 	/  
 ! 	H 	H#'#A#E#E#G#G J	H 	H 	H 	H 	H 	H 	H 	H 	H 	H 	H 	H 	H 	H 	H ./*_ 	@ 	@ $6#dlBSBS#T#T#T ++--   %8D -@@EE E -)/)E)E: *F * *& %??*c @    '..s3332==jII22 !5 = = ? ?D7	@ 	@ 	@ 	@ 	@ 	@ 	@ 	@ 	@ 	@ 	@ 	@ 	@ 	@ 	@:  	3"&&z222 	22!	
 	
 	
 	$$T\%7%7%9%9:::::s$   AA A-C,E%%E),E))
r   Nr   r   r   r   r   r   FNr@   N)__name__
__module____qualname____doc__r   intboolr"   r	   r   r^   __classcell__)r=   s   @r>   r   r      s         <  #*+"$%(%' "Y
 Y
#Y
 Y

 Y
 %(Y
 Y
  Y
  #Y
  #Y
 Y
 Y
 Y
 Y
 Y
 Y
v Xm3; 3; 3; 3; 3; 3; 3; 3;r?   r   c                   .    e Zd ZdedefdZddZddZdS )	r5   multi_gpu_learner_threadr    c                     t           j                            |            || _        d| _        |r|j        | _        |j        | _        d S t                      | _        t                      | _        d S )NT)	threadingThreadr"   rh   daemonqueue_timer
load_timerr   )r9   rh   r    s      r>   r"   z_MultiGPULoaderThread.__init__   sf     	!!$'''(@% 	'7CD6ADOOO%xxD$hhDOOOr?   r@   Nc                 .    	 |                                   )N)_step)r9   s    r>   runz_MultiGPULoaderThread.run   s    	JJLLL	r?   c                 v   | j         }|j        }| j        5  |j                                        }d d d            n# 1 swxY w Y   |j                                        }| j        5  |                                D ]}|j        j	        |j        	                    ||          s*||         }t          |t                    r|                    ||           _||j        v r"|                    |j        |         |           	 d d d            n# 1 swxY w Y   |j                            |           d S )N)batchrD   )rh   r$   rm   rV   rI   r2   rn   rK   r   rL   
isinstancer   load_batch_into_bufferpolicy_batchesr3   r4   )r9   sr$   rs   rX   r[   r\   s          r>   rp   z_MultiGPULoaderThread._step   s   )\
  	$ 	$IMMOOE	$ 	$ 	$ 	$ 	$ 	$ 	$ 	$ 	$ 	$ 	$ 	$ 	$ 	$ 	$ (,,..
 _ 	 	!((  N5AN==c5II B #Ce[11 	11#%/ 2     E00011#237%/ 2   	 	 	 	 	 	 	 	 	 	 	 	 	 	 	( 	
  ,,,,,s"   <A A 'B DDDr_   )r`   ra   rb   r   re   r"   rq   rp    r?   r>   r5   r5      s`        '(='LP' ' ' '    -  -  -  -  -  -r?   r5   )loggingr0   rj   ray._common.deprecationr   #ray.rllib.evaluation.rollout_workerr   "ray.rllib.execution.learner_threadr   $ray.rllib.execution.minibatch_bufferr   ray.rllib.policy.sample_batchr   ray.rllib.utils.annotationsr   r	   ray.rllib.utils.frameworkr
   $ray.rllib.utils.metrics.learner_infor   ray.util.timerr   tf1tftfv	getLoggerr`   r)   r   rk   r5   rx   r?   r>   <module>r      sd         7 7 7 7 7 7 = = = = = = < < < < < < @ @ @ @ @ @ 5 5 5 5 5 5 = = = = = = = = 3 3 3 3 3 3 C C C C C C ! ! ! ! ! !}R		8	$	$ k; k; k; k; k;M k; k; k;\2- 2- 2- 2- 2-I, 2- 2- 2- 2- 2-r?   