
    &`iP?                        d dl Z d dlmZmZ d dlmZ d dlmZmZm	Z	 d dl
mZ d dlmZ d dlmZmZ e G d d	                      Ze G d
 de                      Z G d de          Ze G d d                      Ze G d d                      Ze G d d                      Ze G d d                      ZdS )    N)ListOptional)DataIterator)MultiAgentBatchSampleBatchconcat_samples)unflatten_dict)DeveloperAPI)
DeviceTypeEpisodeTypec                   <    e Zd ZdZdddddededed	ed
eddfdZdS )MiniBatchIteratorBasez+The base class for all minibatch iterators.   Tr   
num_epochsshuffle_batch_per_epochnum_total_minibatchesbatchr   r   minibatch_sizer   returnNc                    dS )a  Initializes a MiniBatchIteratorBase instance.

        Args:
            batch: The input multi-agent batch.
            num_epochs: The number of complete passes over the entire train batch. Each
                pass might be further split into n minibatches (if `minibatch_size`
                provided). The train batch is generated from the given `episodes`
                through the Learner connector pipeline.
            minibatch_size: The size of minibatches to use to further split the train
                batch into per epoch. The train batch is generated from the given
                `episodes` through the Learner connector pipeline.
            num_total_minibatches: The total number of minibatches to loop through
                (over all `num_epochs` epochs). It's only required to set this to != 0
                in multi-agent + multi-GPU situations, in which the MultiAgentEpisodes
                themselves are roughly sharded equally, however, they might contain
                SingleAgentEpisodes with very lopsided length distributions. Thus,
                without this fixed, pre-computed value, one Learner might go through a
                different number of minibatche passes than others causing a deadlock.
        N )selfr   r   r   r   r   s         s/home/jaya/work/projects/VOICE-AGENT/VIET/agent-env/lib/python3.11/site-packages/ray/rllib/utils/minibatch_utils.py__init__zMiniBatchIteratorBase.__init__   s	    8 	    )__name__
__module____qualname____doc__r   intboolr   r   r   r   r   r      s}        55 (,%&   	
 "&   # 
     r   r   c                   L     e Zd ZdZdddddededed	ed
eddf fdZd Z xZ	S )MiniBatchCyclicIteratoraw  This implements a simple multi-agent minibatch iterator.

    This iterator will split the input multi-agent batch into minibatches where the
    size of batch for each module_id (aka policy_id) is equal to minibatch_size. If the
    input batch is smaller than minibatch_size, then the iterator will cycle through
    the batch until it has covered `num_epochs` epochs.
    r   Tr   r   r   r   r   r   r   r   Nc                F   t                                          ||||           || _        || _        || _        || _        d |j                                        D             | _        d |j                                        D             | _	        d| _
        || _        dS )z/Initializes a MiniBatchCyclicIterator instance.)r   r   r   c                     i | ]}|d S r   r   .0mids     r   
<dictcomp>z4MiniBatchCyclicIterator.__init__.<locals>.<dictcomp>O   s    EEE#sAEEEr   c                     i | ]}|d S r'   r   r(   s     r   r+   z4MiniBatchCyclicIterator.__init__.<locals>.<dictcomp>Q   s    #R#R#RsC#R#R#Rr   r   N)superr   _batch_minibatch_size_num_epochs_shuffle_batch_per_epochpolicy_batcheskeys_start_num_covered_epochs_minibatch_count_num_total_minibatches)r   r   r   r   r   r   	__class__s         r   r   z MiniBatchCyclicIterator.__init__8   s     	!)$;	 	 	
 	
 	
 -%(?% FE)=)B)B)D)DEEE#R#Re6J6O6O6Q6Q#R#R#R  !&;###r   c              #   &  K   | j         dk    r/t          | j                                                  | j        k     s| j         dk    rD| j        | j         k     r5i }| j        j                                        D ]\  }}t          |          dk    rt          d| d          | j        |         }| j        }g }|j        r]|                    t          j                  
J d            d }t#           ||          | j        t          |          z  z            }nd }||z    ||          k    r||d          }|                    |            ||          }	|	dk    s
J d            ||	z  }d}| j        |xx         dz  cc<   | j        r|                                 ||z    ||          k    ||z   }
|
|k    r|                    |||
                    t+          |          ||<   |
| j        |<   t-          |t          | j                            }|V  | xj        dz  c_        | j         dk    r0t          | j                                                  | j        k     | j         dk    r| j        | j         k     1d S d S d S d S )	Nr   zThe batch for module_id zt is empty! This will create an infinite loop because we need to cover the same number of samples for each module_id.z}MiniBatchCyclicIterator requires SampleBatch.SEQ_LENSto be present in the batch for slicing a batch in the batch dimension B.c                 @    t          | t          j                           S N)lenr   SEQ_LENSbs    r   get_lenz1MiniBatchCyclicIterator.__iter__.<locals>.get_len   s    "1[%9#:;;;r   c                      t          |           S r;   r<   r>   s    r   r@   z1MiniBatchCyclicIterator.__iter__.<locals>.get_len   s    "1vvr   zLength of a sample must be > 0!r   )r7   minr5   valuesr0   r6   r.   r2   itemsr<   
ValueErrorr4   r/   _slice_seq_lens_in_Bgetr   r=   r!   appendr1   shuffler   r   )r   	minibatch	module_idmodule_batchsn_stepssamples_to_concatr@   sample
len_samplees              r   __iter__z MiniBatchCyclicIterator.__iter__V   s<     
 +q0007799::T=MMM +a//)D,GGG I+/;+E+K+K+M+M F+ F+'	<|$$))$I9 I I I  
 K	* .$&!  4 &'++K,@AAMM' NMM< < < "--/#l2C2CCE GG& & & 'kWW\%:%:::)!""-F%,,V444!(J%>>>+L>>>z)GA,Y7771<777 4 /$,,... 'kWW\%:%::: Kq55%,,\!A#->??? (66G'H'H	)$)*I&&
 (	3t{3C3CDDIOOO!!Q&!!q +q0007799::T=MMM +a//)D,GGGGG 0/GG 0/r   )
r   r   r   r    r   r!   r"   r   rT   __classcell__r8   s   @r   r$   r$   .   s          (,%&< < << 	<
 < "&<  #< 
< < < < < <<]' ]' ]' ]' ]' ]' ]'r   r$   c                   *     e Zd Zdef fdZd Z xZS )MiniBatchDummyIteratorr   c                 J     t                      j        |fi | || _        d S r;   )r-   r   r.   )r   r   kwargsr8   s      r   r   zMiniBatchDummyIterator.__init__   s-    ))&)))r   c              #      K   | j         V  d S r;   )r.   )r   s    r   rT   zMiniBatchDummyIterator.__iter__   s      kr   )r   r   r   r   r   rT   rU   rV   s   @r   rX   rX      sS        o            r   rX   c            	       >    e Zd Zdedededee         fdZdefdZ	dS )	MiniBatchRayDataIteratoriteratordevicer   	num_itersc                    || _         d |                                D             | _         | j         j        d||d| j        | _        t          | j                  | _        || _        d S )Nc                 &    i | ]\  }}|d k    ||S )return_stater   )r)   kvs      r   r+   z5MiniBatchRayDataIterator.__init__.<locals>.<dictcomp>   s(    OOOA1;N;N1;N;N;Nr   )
batch_sizer_   r   )	_iteratorrE   _kwargsiter_torch_batches_batched_iterableiter_epoch_iterator
_num_iters)r   r^   r_   r   r`   rZ   s         r   r   z!MiniBatchRayDataIterator.__init__   s     "OOOOO "C!B "
%"
 "
 l"
 "
  $D$:;;#r   r   c           	   #     K   d}| j         || j         k     r| j        D ]}|dz  }t          |          }t          d |                                D             t          d |                                D                                 }|V  | j         r|| j         k    r n#t          | j                  | _        | j         sd S | j         || j         k     d S d S )Nr   r   c                 4    i | ]\  }}|t          |          S r   )r   )r)   rL   module_datas      r   r+   z5MiniBatchRayDataIterator.__iter__.<locals>.<dictcomp>   s6       2I{ ";{#;#;  r   c           	   3      K   | ]?}t          t          t          |                                                              V  @d S r;   )r<   nextrk   rD   )r)   rp   s     r   	<genexpr>z4MiniBatchRayDataIterator.__iter__.<locals>.<genexpr>   sZ       " "' Dk&8&8&:&:!;!;<<==" " " " " "r   )	env_steps)	rm   rl   r	   r   rE   sumrD   rk   rj   )r   	iterationr   s      r   rT   z!MiniBatchRayDataIterator.__iter__   s!     	o%T_)D)D-  Q	&u--' 6;kkmm   " " "+0<<>>" " "  	 	 	  ? yDO'C'CE (,D,B'C'C$ E9 o%T_)D)D)D)D)D)Dr   N)
r   r   r   r   r   r!   r   r   r   rT   r   r   r   r]   r]      so        $ $ 	$
 $ C=$ $ $ $0/      r   r]   c                   (    e Zd ZdZdedefdZd ZdS )ShardBatchIteratorzIterator for sharding batch into num_shards batches.

    Args:
        batch: The input multi-agent batch.
        num_shards: The number of shards to split the batch into.

    Yields:
        A MultiAgentBatch of size len(batch) / num_shards.
    r   
num_shardsc                 "    || _         || _        d S r;   )r.   _num_shards)r   r   ry   s      r   r   zShardBatchIterator.__init__  s    %r   c              #     K   t          | j                  D ]}i }| j        j                                        D ]z\  }}t          j        t          |          | j        z            }||z  }t          ||z   t          |                    }|t          |          t          |                   ||<   {t          |t          |                    }|V  d S r;   )ranger{   r.   r2   rE   mathceilr<   rC   r!   r   )	r   ibatch_to_sendpid	sub_batchrf   startend	new_batchs	            r   rT   zShardBatchIterator.__iter__
  s      t'(( 	 	A
 M"&+"<"B"B"D"D F FY!Ys9~~8H'HII
"Q%*,c)nn==%.s5zzCHH/D%Ec""'s:GGIOOOO	 	r   N)r   r   r   r    r   r!   r   rT   r   r   r   rx   rx      sO         &o &3 & & & &    r   rx   c                   Z    e Zd ZdZ	 d	dee         dedee         fdZdee         fdZ	dS )
ShardEpisodesIteratorzMIterator for sharding a list of Episodes into `num_shards` lists of Episodes.Nepisodesry   len_lookback_bufferc                 F   t          |t          d          | _        || _        || _        t          d |D                       | _        d t          | j                  D             | _        | j        }t          | j                  D ]}|||z
  z  }|| j        |<   ||z  }dS )a  Initializes a ShardEpisodesIterator instance.

        Args:
            episodes: The input list of Episodes.
            num_shards: The number of shards to split the episodes into.
            len_lookback_buffer: An optional length of a lookback buffer to enforce
                on the returned shards. When spitting an episode, the second piece
                might need a lookback buffer (into the first piece) depending on the
                user's settings.
        T)keyreversec              3   4   K   | ]}t          |          V  d S r;   rB   )r)   rS   s     r   rs   z1ShardEpisodesIterator.__init__.<locals>.<genexpr>2  s(       : :AQ : : : : : :r   c                     g | ]}d S r'   r   r)   _s     r   
<listcomp>z2ShardEpisodesIterator.__init__.<locals>.<listcomp>3  s    CCCaCCCr   N)	sortedr<   	_episodesr{   _len_lookback_bufferru   _total_lengthr}   _target_lengths)r   r   ry   r   remaining_lengthrN   len_s          r   r   zShardEpisodesIterator.__init__  s       c4@@@%$7!  : : : : :::CC51A+B+BCCC-t'(( 	% 	%A#
Q7D&*D #$	% 	%r   r   c              #     K   d t          | j                  D             }d t          | j                  D             }d}|t          | j                  k     rh| j        |         }|                    t          |                    }||         t          |          z   | j        |         k    r>||                             |           ||xx         t          |          z  cc<   |dz  }n| j        |         ||         z
  }|dk    rw|d|         |                    t          |d          | j	                  }}||                             |           ||xx         t          |          z  cc<   || j        |<   n(|dk    sJ ||                             |           |dz  }|t          | j                  k     h|D ]}	|	V  dS )a2  Runs one iteration through this sharder.

        Yields:
            A sub-list of Episodes of size roughly `len(episodes) / num_shards`. The
            yielded sublists might have slightly different total sums of episode
            lengths, in order to not have to drop even a single timestep.
        c                     g | ]}g S r   r   r   s     r   r   z2ShardEpisodesIterator.__iter__.<locals>.<listcomp>B  s    8881B888r   c                     g | ]}d S r'   r   r   s     r   r   z2ShardEpisodesIterator.__iter__.<locals>.<listcomp>C  s    6661666r   r   r   N)r   )
r}   r{   r<   r   indexrC   r   rI   slicer   )
r   sublistslengthsepisode_indexepisode	min_indexr   
slice_partremaining_partsublists
             r   rT   zShardEpisodesIterator.__iter__:  s      98d&6 7 788866eD$455666c$.1111n]3Gc'll33I y!CLL0D4H4SSS#**7333	"""c'll2"""" $(#7	#BWYEW#W #a''   1!1 12  !"2D99040I &   !/J Y'..z:::I&&&#j//9&&&4BDN=11+q0000Y'..w777!Q&M= c$.1111@   	 	GMMMM	 	r   r;   )
r   r   r   r    r   r   r!   r   r   rT   r   r   r   r   r     s}        WW .2	% %{#% % &c]	% % % %6-${+ - - - - - -r   r   c                   $    e Zd ZdZdefdZd ZdS )ShardObjectRefIteratora,  Iterator for sharding a list of ray ObjectRefs into num_shards sub-lists.

    Args:
        object_refs: The input list of ray ObjectRefs.
        num_shards: The number of shards to split the references into.

    Yields:
        A sub-list of ray ObjectRefs with lengths as equal as possible.
    ry   c                 "    || _         || _        d S r;   )_object_refsr{   )r   object_refsry   s      r   r   zShardObjectRefIterator.__init__v  s    '%r   c              #      K   t          | j                  }|| j        z  }|| j        z  }d}t          | j                  D ]%}||z   ||k     rdndz   }| j        ||         V  |}&d S )Nr   r   )r<   r   r{   r}   )r   nsublist_sizeremaining_elementsr   r   r   s          r   rT   zShardObjectRefIterator.__iter__z  s      !""D,,!11t'(( 	 	A,&q3E/E/E!!1MC#E#I....EE	 	r   N)r   r   r   r    r!   r   rT   r   r   r   r   r   j  sH         & & & & &    r   r   )r~   typingr   r   ray.datar   ray.rllib.policy.sample_batchr   r   r   ray.rllib.utilsr	   ray.rllib.utils.annotationsr
   ray.rllib.utils.typingr   r   r   r$   rX   r]   rx   r   r   r   r   r   <module>r      s    ! ! ! ! ! ! ! ! ! ! ! ! ! ! V V V V V V V V V V * * * * * * 4 4 4 4 4 4 : : : : : : : :        D D' D' D' D' D'3 D' D' D'N    2    7 7 7 7 7 7 7 7t        @ K K K K K K K K\          r   