
    &`i                         d dl Z d dlmZmZmZmZmZmZmZ d dl	Z
d dlmZ d dlmZ d dlmZ d dlmZ d dlmZmZ d dlmZ erd d	lmZ  e j        e          Z G d
 d          Z G d d          ZdS )    N)TYPE_CHECKINGAnyDictListOptionalTupleUnion)BatchFormat)	RefBundle)	StatsDict)#convert_bytes_to_human_readable_str)Block	BlockType)DataContext)BlockMetadataWithSchemac                       e Zd ZdZdZdZddee         dee         fdZe	de
d	ed
e
deeedf                  fd            Ze	dddee         dedeedf         fd            Ze	dedee         fd            ZdS )ExchangeTaskSpeca  
    An interface to specify the exchange map and reduce tasks.

    Subclasses should implement the `map` and `reduce` static methods.
    `map` method is to transform one input block into multiple output blocks.
    `reduce` is to combine multiple map output blocks. Both methods are
    single-task operations. See `ExchangeScheduler` for how to distribute
    the operations across multiple tasks.

    Any custom arguments for `map` and `reduce` methods should be specified by
    setting `map_args` and `reduce_args`.

    The concept here is similar to the exchange operator described in
    "Volcano - An Extensible and Parallel Query Evaluation System"
    (https://dl.acm.org/doi/10.1109/69.273032).
    zShuffle MapzShuffle ReduceNmap_argsreduce_argsc                     |pg | _         |pg | _        t          | j         t                    sJ t          | j        t                    sJ d S N)	_map_args_reduce_args
isinstancelist)selfr   r   s      /home/jaya/work/projects/VOICE-AGENT/VIET/agent-env/lib/python3.11/site-packages/ray/data/_internal/planner/exchange/interfaces.py__init__zExchangeTaskSpec.__init__(   sQ    !R'-2$.$/////$+T2222222    idxblockoutput_num_blocksreturnr   c                     t           )z
        Map function to be run on each input block.

        Returns list of [BlockMetadata, Block1, Block2, ..., BlockN].
        NotImplementedError)r    r!   r"   s      r   mapzExchangeTaskSpec.map.   
     "!r   F)partial_reducemapper_outputsr)   c                     t           )a  
        Reduce function to be run for each output block.

        Args:
            mapper_outputs: List of map output blocks to reduce.
            partial_reduce: Whether should partially or fully reduce.

        Returns:
            The reduced block and its metadata.
        r%   )r)   r*   s     r   reducezExchangeTaskSpec.reduce;   s
     "!r   batch_formatc                 v    | t           j        k    rt          j        S | t           j        k    rt          j        S d S r   )r
   ARROWr   PANDAS)r-   s    r   _derive_target_block_typez*ExchangeTaskSpec._derive_target_block_typeL   s6    ;,,,?"[///## 4r   )NN)__name__
__module____qualname____doc__MAP_SUB_PROGRESS_BAR_NAMEREDUCE_SUB_PROGRESS_BAR_NAMEr   r   r   staticmethodintr   r	   r'   boolr   r,   strr   r   r1    r   r   r   r      s:        " !.#3 3 3c 3S	 3 3 3 3 
"
"
" 
" 
eE445	6	
" 
" 
" \
"   %" " "e"" 
u//	0" " " \"   8K    \  r   r   c                       e Zd ZdZdefdZ	 	 	 ddee         dede	e
eef                  de	e
eef                  d	e	e         d
eee         ef         fdZddZdeded
dfdZdS )ExchangeTaskSchedulerzb
    An interface to schedule exchange tasks (`exchange_spec`) for multi-nodes
    execution.
    exchange_specc                 N    || _         t          j                    j        | _        dS )zc
        Args:
            exchange_spec: The implementation of exchange tasks to execute.
        N)_exchange_specr   get_current!warn_on_driver_memory_usage_bytes)r   r?   s     r   r   zExchangeTaskScheduler.__init__^   s+    
 , #%%G 	...r   Nrefsr"   map_ray_remote_argsreduce_ray_remote_argswarn_on_driver_memory_usager#   c                     t           )z=
        Execute the exchange tasks on input `refs`.
        r%   )r   rD   r"   rE   rF   rG   s         r   executezExchangeTaskScheduler.executek   r(   r   c                     t           j        j        j        j        }|                                }|                     |dt          |           d           d S )Nz
More than a%   of driver memory used to store Ray Data block data and metadata. This job may exit if driver memory is insufficient.

This can happen when many tiny blocks are created. Check the block size using Dataset.stats() and see https://docs.ray.io/en/latest/data/performance-tips.html for mitigation.)ray_privateworkerglobal_workercore_worker!get_local_memory_store_bytes_usedrG   r   )r   ray_core_workerlocal_memory_store_bytes_useds      r   %warn_on_high_local_memory_store_usagez;ExchangeTaskScheduler.warn_on_high_local_memory_store_usagex   sm    ,-;G==?? 	& 	(()23PQQ  
	
 
	
 
	
 
	
 
	
r   memory_usage_byteslog_strc                 z    | j         d S || j         k    r&t                              |           |dz  | _         d S d S )N   )rC   loggerwarning)r   rT   rU   s      r   rG   z1ExchangeTaskScheduler.warn_on_driver_memory_usage   sN     19F FFFNN7###5G!5KD222 GFr   )NNN)r#   N)r2   r3   r4   r5   r   r   r   r   r9   r   r   r;   r   r   r   rI   rS   rG   r<   r   r   r>   r>   X   s        
H&6 H H H H" 9=;?59" "9o" " &d38n5	"
 !)c3h 8" &.c]" 
tI	)	*" " " "
 
 
 
"	L"%	L03	L		L 	L 	L 	L 	L 	Lr   r>   )loggingtypingr   r   r   r   r   r   r	   ray._private.workerrK   "ray.air.util.data_batch_conversionr
   'ray.data._internal.execution.interfacesr   ray.data._internal.statsr   ray.data._internal.utilr   ray.data.blockr   r   ray.data.contextr   r   	getLoggerr2   rX   r   r>   r<   r   r   <module>rd      s[    I I I I I I I I I I I I I I I I I I     : : : : : : = = = = = = . . . . . . G G G G G G + + + + + + + + ( ( ( ( ( ( 7666666		8	$	$B B B B B B B BJ:L :L :L :L :L :L :L :L :L :Lr   