
    &`i                          d dl Z d dlmZ d dlmZmZ d dlmZ d dlm	Z	m
Z
mZ d dlmZ e G d d                      Z G d	 d
          ZdS )    N)	dataclass)AnyOptional)DelegatingBlockBuilder)BlockBlockAccessor	DataBatch)MAX_SAFE_BLOCK_SIZE_FACTORc                       e Zd ZU dZee         ed<   dZee         ed<   dZe	ed<   d Z
e	 	 	 d	dee         dee         de	ded          fd            ZdS )
OutputBlockSizeOptionNtarget_max_block_sizetarget_num_rows_per_blockFdisable_block_shapingc                 V    | j         | j        | j        st          d          d S d S d S )NzOEither `target_max_block_size` or `target_num_rows_per_block` must be specified)r   r   r   
ValueErrorselfs    t/home/jaya/work/projects/VOICE-AGENT/VIET/agent-env/lib/python3.11/site-packages/ray/data/_internal/output_buffer.py__post_init__z#OutputBlockSizeOption.__post_init__   sL    &..6. 7 $  	 /.6666    returnc                 6    |||sd S t          |||          S )N)r   r   r   )r   )clsr   r   r   s       r   ofzOutputBlockSizeOption.of   s?     "))1) 2 4(&;*C&;   r   )NNF)__name__
__module____qualname__r   r   int__annotations__r   r   boolr   classmethodr    r   r   r   r   
   s         +/8C=////3x}333"'4'''	 	 	  0437&+	 '} $,C=  $	
 
)	*   [  r   r   c                       e Zd ZdZdee         fdZdeddfdZde	ddfd	Z
d
eddfdZddZdefdZdefdZdee         fdZdee         fdZdefdZd
edefdZd
edefdZdefdZdS )BlockOutputBuffera$  Generates output blocks of a given size or number of rows given a stream of
    inputs.

    This class is used to turn a stream of items / blocks of arbitrary size
    into a stream of blocks of target max block size or
    target max rows per block. The caller should check ``has_next()`` after each
    ``add()`` call, and call ``next()`` to get the next block when ``has_next()``
    returns True.

    When all items have been added, the caller must call ``finalize()`` and
    then check ``has_next()`` one last time.

    Examples:
        >>> from ray.data._internal.output_buffer import BlockOutputBuffer
        >>> udf = ... # doctest: +SKIP
        >>> generator = ... # doctest: +SKIP
        >>> # Yield a stream of output blocks.
        >>> output_block_size_option = OutputBlockSizeOption(target_max_block_size=500 * 1024 * 1024) # doctest: +SKIP
        >>> output = BlockOutputBuffer(output_block_size_option) # doctest: +SKIP
        >>> for item in generator(): # doctest: +SKIP
        ...     output.add(item) # doctest: +SKIP
        ...     if output.has_next(): # doctest: +SKIP
        ...         yield output.next() # doctest: +SKIP
        >>> output.finalize() # doctest: +SKIP
        >>> if output.has_next() # doctest: +SKIP
        ...     yield output.next() # doctest: +SKIP
    output_block_size_optionc                 V    || _         t                      | _        d| _        d| _        d S NF)_output_block_size_optionr   _buffer
_finalized_has_yielded_blocks)r   r%   s     r   __init__zBlockOutputBuffer.__init__R   s,    )A&-//#(   r   itemr   Nc                 L    | j         rJ | j                            |           dS )z(Add a single item to this output buffer.N)r*   r)   add)r   r-   s     r   r/   zBlockOutputBuffer.addX   s,    ?"""r   batchc                 L    | j         rJ | j                            |           dS )z'Add a data batch to this output buffer.N)r*   r)   	add_batch)r   r0   s     r   r2   zBlockOutputBuffer.add_batch]   ,    ?"""u%%%%%r   blockc                 L    | j         rJ | j                            |           dS )z'Add a data block to this output buffer.N)r*   r)   	add_blockr   r4   s     r   r6   zBlockOutputBuffer.add_blockb   r3   r   c                 &    | j         rJ d| _         dS )z.Must be called once all items have been added.TN)r*   r   s    r   finalizezBlockOutputBuffer.finalizeg   s    ?"""r   c                     | j         j        rdS |                                 d uo.| j                                        |                                 k    S r'   )r(   r   _max_num_rows_per_blockr)   num_rowsr   s    r   _exceeded_buffer_row_limitz,BlockOutputBuffer._exceeded_buffer_row_limitl   sX    )? 	5 ((**$6 I%%''$*F*F*H*HH	
r   c                     | j         j        rdS |                                 d uo.| j                                        |                                 k    S r'   )r(   r   _max_bytes_per_blockr)   get_estimated_memory_usager   s    r   _exceeded_buffer_size_limitz-BlockOutputBuffer._exceeded_buffer_size_limitu   sX    )? 	5 %%''t3 X7799D<U<U<W<WW	
r   c                 H    | j         d S | j         j        rd S | j         j        S N)r(   r   r   r   s    r   r;   z)BlockOutputBuffer._max_num_rows_per_block~   s/    )14)? 	4-GGr   c                 H    | j         d S | j         j        rd S | j         j        S rC   )r(   r   r   r   s    r   r?   z&BlockOutputBuffer._max_bytes_per_block   s/    )14)? 	4-CCr   c                    | j         r%| j         p| j                                        dk    S | j        dS | j        j        r| j                                        dk    S |                                 p|                                 S )z6Returns true when a complete output block is produced.r   NF)r*   r+   r)   r<   r(   r   r=   rA   r   s    r   has_nextzBlockOutputBuffer.has_next   s     ? 
	///N4<3H3H3J3JQ3NN+3
 5+A 	/<((**Q....00VD4T4T4V4VVr   c                     |                                  d uo1|                                t          |                                  z  k    S rC   )r?   
size_bytesr
   r7   s     r    _exceeded_block_size_slice_limitz2BlockOutputBuffer._exceeded_block_size_slice_limit   sL    
 %%''t3 H  "")D,E,E,G,GGH	
r   c                     |                                  d uo)|                                |                                  k    S rC   )r;   r<   r7   s     r   _exceeded_block_row_slice_limitz1BlockOutputBuffer._exceeded_block_row_slice_limit   sA    
 ((**$6 B  4#?#?#A#AA	
r   c                 :   |                                  sJ | j                                        }t          j        |          }d}d}|                     |          r|                                 }n|                     |          r|                                dk    s
J d            |	                                |                                z  }t          dt          j        |                                 |z                      }|Z||                                k     rB|                    d|d          }|                    ||                                d          }t                      | _        || j                            |           d| _        |S )z'Returns the next complete output block.Nr   zBlock may not be empty   F)copyT)rF   r)   buildr   	for_blockrK   r;   rI   r<   rH   maxmathceilr?   slicer   r6   r+   )r   r4   accessorblock_remaindertarget_num_rowsnum_bytes_per_rows         r   nextzBlockOutputBuffer.next   s   }}""$$ *511//99 	"::<<OO228<< 	$$&&***,D*** ( 3 3 5 58I8I8K8K K!49T6688;LLMM O &?X=N=N=P=P+P+PNN1oENBBE&nn!2!2!4!45 -  O .//&L""?333#' r   )r   N)r   r   r   __doc__r   r   r,   r   r/   r	   r2   r   r6   r9   r    r=   rA   r   r;   r?   rF   r   rI   rK   rY   r"   r   r   r$   r$   5   s        8):O1P ) ) ) )     
&y &T & & & &
&u & & & & &
   

D 
 
 
 

T 
 
 
 
H# H H H HDhsm D D D DW$ W W W W$
m 
 
 
 
 

] 
t 
 
 
 
e      r   r$   )rR   dataclassesr   typingr   r   +ray.data._internal.delegating_block_builderr   ray.data.blockr   r   r	   ray.data.contextr
   r   r$   r"   r   r   <module>r`      s     ! ! ! ! ! !                 N N N N N N : : : : : : : : : : 7 7 7 7 7 7 ' ' ' ' ' ' ' 'T_ _ _ _ _ _ _ _ _ _r   