
    &`i                     \    d dl mZ d dlmZmZmZ d dlmZ d dlm	Z	 	  G d de	          Z
dS )    )deque)DequeListTuple)	RefBundle)BaseRefBundlerc                       e Zd ZdZdefdZddefdZdefdZ	d	efd
Z
d	eee         ef         fdZd Zd Zd	efdZdS )StreamingRepartitionRefBundlerzNIncrementally builds task inputs to produce multiples of target-sized outputs.target_num_rows_per_blockc                     |dk    s
J d            || _         t                      | _        t                      | _        g | _        d| _        d S )Nr   zEtarget_num_rows_per_block must be positive for streaming repartition.)_target_num_rowsr   _pending_bundles_ready_bundles_consumed_input_bundles_total_pending_rows)selfr   s     |/home/jaya/work/projects/VOICE-AGENT/VIET/agent-env/lib/python3.11/site-packages/ray/data/_internal/streaming_repartition.py__init__z'StreamingRepartitionRefBundler.__init__"   sT    %)))R *)) 927''058:$#$       Fflush_remainingc                    | j         | j        k    rT| j        d                                         | j         | j        z  z
  }|dk    sJ t	          | j                  }d }|dk    r_||d                                         k     rA|                                }|                    |          \  }}|                    |           | j                            t          j
        |                     | j                                         d| _         |rT|                                dk    r<| j                            |           | xj         |                                z  c_         |r^| j         dk    rU| j                            t          j
        | j                             | j                                         d| _         d S d S d S )Nr   )r   r   r   num_rowslistpopsliceappendr   r   merge_ref_bundlesclear)r   r   rows_needed_from_last_bundlepending_bundlesremaining_bundlelast_bundlesliced_bundles          r   _try_build_ready_bundlez6StreamingRepartitionRefBundler._try_build_ready_bundle,   s   #t'<<<%b)2244*T-BBC ) 014444"4#899O#,q000?23F3O3O3Q3QQQ-11332=2C2C03 3//  &&}555&&y'B?'S'STTT!'')))'(D$ H$4$=$=$?$?!$C$C%,,-=>>>((,<,E,E,G,GG(( 	)t7!;;&&+D,ABB   !'')))'(D$$$	) 	);;r   
ref_bundlec                     | xj         |                                z  c_         | j                            |           |                                  | j                            |           d S N)r   r   r   r   r%   r   )r   r&   s     r   
add_bundlez)StreamingRepartitionRefBundler.add_bundleK   sh      J$7$7$9$99  $$Z000$$&&&$++J77777r   returnc                 2    t          | j                  dk    S )Nr   )lenr   r   s    r   
has_bundlez)StreamingRepartitionRefBundler.has_bundleQ   s    4&''!++r   c                 T    | j         }g | _         || j                                        fS r(   )r   r   popleft)r   consumed_input_bundless     r   get_next_bundlez.StreamingRepartitionRefBundler.get_next_bundleT   s/     "&!=')$%t':'B'B'D'DDDr   c                 f    t          | j                  dk    r|                     d           d S d S )Nr   T)r   )r,   r   r%   r-   s    r   done_adding_bundlesz2StreamingRepartitionRefBundler.done_adding_bundles[   s=    t$%%))(((>>>>> *)r   c                 |    t          d | j        D                       t          d | j        D                       z   S )Nc              3   4   K   | ]}t          |          V  d S r(   r,   .0bundles     r   	<genexpr>z<StreamingRepartitionRefBundler.num_blocks.<locals>.<genexpr>`   s(      CC63v;;CCCCCCr   c              3   4   K   | ]}t          |          V  d S r(   r7   r8   s     r   r;   z<StreamingRepartitionRefBundler.num_blocks.<locals>.<genexpr>`   sA       J
 J
"CKKJ
 J
 J
 J
 J
 J
r   sumr   r   r-   s    r   
num_blocksz)StreamingRepartitionRefBundler.num_blocks_   s[    CCT-BCCCCCc J
 J
&*&9J
 J
 J
 G
 G
 
 	
r   c                 |    t          d | j        D                       t          d | j        D                       z   S )Nc              3   >   K   | ]}|                                 V  d S r(   
size_bytesr8   s     r   r;   z<StreamingRepartitionRefBundler.size_bytes.<locals>.<genexpr>e   s.      KK66$$&&KKKKKKr   c              3   >   K   | ]}|                                 V  d S r(   rB   r8   s     r   r;   z<StreamingRepartitionRefBundler.size_bytes.<locals>.<genexpr>e   sG       R
 R
$*FR
 R
 R
 R
 R
 R
r   r=   r-   s    r   rC   z)StreamingRepartitionRefBundler.size_bytesd   s[    KKT5JKKKKKc R
 R
.2.AR
 R
 R
 O
 O
 
 	
r   N)F)__name__
__module____qualname____doc__intr   boolr%   r   r)   r.   r   r   r2   r4   r?   rC    r   r   r
   r
      s        XX%# % % % %) )t ) ) ) )>8Y 8 8 8 8,D , , , ,E	tI	)	*E E E E? ? ?
 
 


C 
 
 
 
 
 
r   r
   N)collectionsr   typingr   r   r   'ray.data._internal.execution.interfacesr   3ray.data._internal.execution.operators.map_operatorr   r
   rK   r   r   <module>rP      s          % % % % % % % % % % = = = = = = N N N N N N0H
 H
 H
 H
 H
^ H
 H
 H
 H
 H
r   