
    &`i<                         d dl Z d dlmZ d dlmZ d dlmZ d dlmZ d dl	m
Z
 d dlmZ d dlmZ d d	lmZmZ d d
lmZ dZ G d d          Z G d de          Z G d de          ZdS )    N)Optional)ArrowBlockAccessor)transform_pyarrow)try_combine_chunked_columns)DelegatingBlockBuilder)memory_string)get_total_obj_store_mem_on_node)BlockBlockAccessor)log_onceg      ?c                   J    e Zd ZdefdZdefdZdefdZdefdZdefdZ	dS )	BatcherInterfaceblockc                     t                      )zmAdd a block to the block buffer.

        Args:
            block: Block to add to the block buffer.
        NotImplementedErrorselfr   s     n/home/jaya/work/projects/VOICE-AGENT/VIET/agent-env/lib/python3.11/site-packages/ray/data/_internal/batcher.pyaddzBatcherInterface.add        "###    returnc                     t                      )zHIndicate to the batcher that no more blocks will be added to the buffer.r   r   s    r   done_addingzBatcherInterface.done_adding       !###r   c                     t                      )*Whether this Batcher has any full batches.r   r   s    r   	has_batchzBatcherInterface.has_batch!   r   r   c                     t                      )"Whether this Batcher has any data.r   r   s    r   has_anyzBatcherInterface.has_any%   r   r   c                     t                      )pGet the next batch from the block buffer.

        Returns:
            A batch represented as a Block.
        r   r   s    r   
next_batchzBatcherInterface.next_batch)   r   r   N)
__name__
__module____qualname__r
   r   boolr   r    r#   r&    r   r   r   r      s        $ $ $ $ $$T $ $ $ $$4 $ $ $ $$ $ $ $ $$E $ $ $ $ $ $r   r   c                   l    e Zd ZdZddee         defdZdefdZ	defd	Z
defd
ZdefdZdefdZdS )BatcherzChunks blocks into batches.F
batch_sizeensure_copyc                 L    || _         g | _        d| _        d| _        || _        dS )a  
        Construct a batcher that yields batches of batch_sizes rows.

        Args:
            batch_size: The size of batches to yield.
            ensure_copy: Whether batches are always copied from the underlying base
                blocks (not zero-copy views).
        r   FN)_batch_size_buffer_buffer_size_done_adding_ensure_copy)r   r.   r/   s      r   __init__zBatcher.__init__<   s1     &!'r   r   c                     t          j        |                                          dk    rP| j                            |           | xj        t          j        |                                          z  c_        dS dS )zAdd a block to the block buffer.

        Note empty block is not added to buffer.

        Args:
            block: Block to add to the block buffer.
        r   N)r   	for_blocknum_rowsr2   appendr3   r   s     r   r   zBatcher.addK   su     "5))2244q88L&&&!8!?!?!H!H!J!JJ 98r   r   c                     d| _         dS )zIIndicate to the batcher that no more blocks will be added to the batcher.TNr4   r   s    r   r   zBatcher.done_addingW   s     r   c                 \    |                                  o| j        du p| j        | j        k    S )r   N)r#   r1   r3   r   s    r   r    zBatcher.has_batch[   s2    ||~~ 
$M(9T=M(M	
r   c                     | j         dk    S )r"   r   )r3   r   s    r   r#   zBatcher.has_anya   s     1$$r   c                    |                                  s| j        r|                                 sJ | j        }| j        wt          | j                  dk    sJ | j        d         }|r>t          j        |          }|	                    d|
                                d          }g | _        d| _        |S t                      }g }| j        }| j        D ]/}t          j        |          }|dk    r|                    |           3|
                                |k    r?|                    |                                           ||
                                z  }t!          |t"                    r&t          j        t%          j        |                    }|                    |	                    d|d                     |                    |	                    ||
                                d                     d}1|| _        | xj        | j        z  c_        |o|                                 }|                                }|r>t          j        |          }|	                    d|
                                d          }|S )r%   N   r   T)copyF)r    r4   r#   r5   r1   lenr2   r   r8   slicer9   r3   r   r:   	add_blockto_block
isinstancer   r   r   will_build_yield_copybuild)r   
needs_copyr   outputleftoverneededaccessorbatchs           r   r&   zBatcher.next_batche   sv    ~~ID$5I$,,..III&
#t|$$))))LOE D%/66Au~~'7'7dCCDL !DL'))!\ 	 	E$.u55H{{ &&&&""$$..  !2!2!4!4555(++---
 h(:;; ,6)EeLL   H
   6!F!FGGGvx7H7H7J7JQV W WXXX  T--F(D(D(F(F$F
 	@!+E22EKK5>>#3#3$K??Er   N)F)r'   r(   r)   __doc__r   intr*   r6   r
   r   r   r    r#   r&   r+   r   r   r-   r-   2   s        %%( (8C= (t ( ( ( (
K 
K 
K 
K 
K!T ! ! ! !
4 
 
 
 
% % % % %8E 8 8 8 8 8 8r   r-   c                       e Zd ZdZ	 ddee         dedee         fdZdefdZe	d	ee         fd
            Z
e	d	ee         fd            Zd	efdZd	efdZd	efdZd	efdZd	efdZd	efdZd	efdZdS )ShufflingBatcherzLChunks blocks into shuffled batches, using a local in-memory shuffle buffer.Nr.   shuffle_buffer_min_sizeshuffle_seedc                 ,   |t          d          || _        || _        ||k     r|}|| _        t	          |t
          z            | _        t                      | _        d| _	        d| _
        d| _        t                      | _        d| _        d| _        dS )a  Constructs a random-shuffling block batcher.

        Args:
            batch_size: Record batch size.
            shuffle_buffer_min_size: Minimum number of rows that must be in the local
                in-memory shuffle buffer in order to yield a batch. When there are no
                more rows to be added to the buffer, the number of rows in the buffer
                *will* decrease below this value while yielding the remaining batches,
                and the final batch may have less than ``batch_size`` rows. Increasing
                this will improve the randomness of the shuffle but may increase the
                latency to the first batch.
            shuffle_seed: The seed to use for the local random shuffle.
        Nz3Must specify a batch_size if using a local shuffle.r   F)
ValueErrorr1   _shuffle_seed_min_rows_to_yield_batchrP   SHUFFLE_BUFFER_COMPACTION_RATIO_min_rows_to_trigger_compactionr   _builder_shuffle_buffer_batch_headr4   r	   _total_object_store_nbytes_total_num_rows_added_total_nbytes_added)r   r.   rS   rT   s       r   r6   zShufflingBatcher.__init__   s    & RSSS%)"Z// '1#(?%/2#&EE0
 0
, /00&*!*I*K*K'%&"#$   r   r   c           	         | j         f| j         | j        k    rVt          d          rGt          j        dt          | j                   dt          | j                    d| j         d           t          j        |          }|	                                dk    r`| j
                            |           | xj        |	                                z  c_        | xj        |                                z  c_        dS dS )zAdd a block to the shuffle buffer.

        Note empty block is not added to buffer.

        Args:
            block: Block to add to the shuffle buffer.
        Nshuffle_buffer_mem_warningz!The node you're iterating on has zA object store memory, but the shuffle buffer is estimated to use z5. If you don't decrease the shuffle buffer size from z$ rows, you might encounter spilling.r   ) _estimated_min_nbytes_in_buffersr^   r   warningswarnr   rX   r   r8   r9   r[   rD   r_   r`   
size_bytes)r   r   block_accessors      r   r   zShufflingBatcher.add   s)    1=58WWW566 X MW !@AAW W !!FGGW W
 0W W W   '077""$$q((M##E***&&.*A*A*C*CC&&$$(A(A(C(CC$$$$ )(r   r   c                 :    | j         dk    r| j        | j         z  ndS )zAReturn the average number of bytes per row added to this batcher.r   N)r_   r`   r   s    r   _average_row_nbytesz$ShufflingBatcher._average_row_nbytes   s-    
 )A-- $(BBB	
r   c                 2    | j         dS | j         | j        z  S )zReturn the estimated minimum number of bytes across all buffers.

        This includes data in both the compacted and uncompacted buffers.
        N)ri   rZ   r   s    r   rc   z1ShufflingBatcher._estimated_min_nbytes_in_buffers  s#     #+4'$*NNNr   c                     d| _         dS )zIndicate to the batcher that no more blocks will be added to the batcher.

        No more blocks should be added to the batcher after calling this.
        TNr<   r   s    r   r   zShufflingBatcher.done_adding  s    
 !r   c                 2    |                                  dk    S )z"Whether this batcher has any data.r   )	_num_rowsr   s    r   r#   zShufflingBatcher.has_any  s    ~~!##r   c                     |                                  }| j        s0|                                 | j        k    p|| j        z
  | j        k    S || j        k    S )z%Whether this batcher has any batches.)rm   r4   _num_compacted_rowsrX   r1   rZ   )r   r9   s     r   r    zShufflingBatcher.has_batch  sc    >>##  	0 ((**d.KK Wd..$2VV
 t///r   c                 T    |                                  |                                 z   S )zReturn the total number of rows that haven't been yielded yet.

        This includes rows in both the compacted and uncompacted buffers.
        )ro   _num_uncompacted_rowsr   s    r   rm   zShufflingBatcher._num_rows)  s'    
 ''))D,F,F,H,HHHr   c                     | j         dS t          dt          j        | j                                                   | j        z
            S )zBReturn number of unyielded rows in the compacted (shuffle) buffer.Nr   )r\   maxr   r8   r9   r]   r   s    r   ro   z$ShufflingBatcher._num_compacted_rows0  sL    '1 #D$899BBDDtGWW
 
 	
r   c                 4    | j                                         S )z:Return number of unyielded rows in the uncompacted buffer.)r[   r9   r   s    r   rq   z&ShufflingBatcher._num_uncompacted_rows<  s    }%%'''r   c                    |                                  s| j        r|                                 sJ |                                 dk    rk| j        s|                                 | j        k    rF| j        u| j        dk    rKt          j	        | j                  }|
                    | j        |                                          | _        | j                            | j                   | j                                        | _        t          j	        | j                                      | j                  | _        | j        | xj        dz  c_        t#          t          j	        | j                  t$                    rt'          | j                  | _        t)                      | _        d| _        | j        J t          j	        | j                                                  }t+          | j        |          }| j        }| xj        |z  c_        t          j	        | j                  
                    || j                  S )z{Get the next shuffled batch from the shuffle buffer.

        Returns:
            A batch represented as a Block.
        r   Nr@   )r    r4   r#   rq   ro   rX   r\   r]   r   r8   rC   r9   r[   rD   rH   random_shufflerW   rF   r   r   r   minr1   )r   r   buffer_sizer.   slice_starts        r   r&   zShufflingBatcher.next_batch@  s    ~~ID$5I$,,..III %%''!++ ,''))T-JJJ#/#a'')3D4HIIE+0;;(%..*:*:, ,D( ''(<===#'=#6#6#8#8D #0#:$$ $nT/00   !-""a'""'(<==?Q  Y (C4CW'X'X$ 344DM D#///#-d.BCCLLNN);77
&J&&t';<<BB)
 
 	
r   )N)r'   r(   r)   rO   r   rP   r6   r
   r   propertyri   rc   r*   r   r#   r    rm   ro   rq   r&   r+   r   r   rR   rR      s       VV. '+	&% &%SM&% "%&% sm	&% &% &% &%PD D D D D@ 
Xc] 
 
 
 X
 O(3- O O O XO!T ! ! ! !$ $ $ $ $04 0 0 0 0I3 I I I I

S 

 

 

 

(s ( ( ( (1
E 1
 1
 1
 1
 1
 1
r   rR   )rd   typingr   ray.data._internal.arrow_blockr   ray.data._internal.arrow_opsr   .ray.data._internal.arrow_ops.transform_pyarrowr   +ray.data._internal.delegating_block_builderr   !ray.data._internal.execution.utilr   ray.data._internal.utilr	   ray.data.blockr
   r   ray.utilr   rY   r   r-   rR   r+   r   r   <module>r      sY          = = = = = = : : : : : : V V V V V V N N N N N N ; ; ; ; ; ; C C C C C C / / / / / / / /       #& $ $ $ $ $ $ $ $<k k k k k k k k\Q
 Q
 Q
 Q
 Q
' Q
 Q
 Q
 Q
 Q
r   