
    &`i/                        d dl Z d dlZd dlZd dlmZ d dlmZmZmZm	Z	m
Z
mZmZ d dlZd dlmZ d dlmZmZ d dlmZmZmZmZ d dlmZ d dlmZmZmZ d d	lmZ d d
l m!Z! d dl"m#Z#  ej$        e%          Z&de	e!e                  dee'e'e'f         fdZ(	 	 d,dee!e                  dede
e         de
ee'eg e'f         f                  dee         f
dZ)	 	 	 	 	 	 d-dee         de
e         de
e'         de*de
e'         de
e'         de*dee         fdZ+	 d.dee         de
e,         de
e         dee         fdZ-	 d.dee         de
eegef                  de
e         dee         fd Z.	 d.dee         d!eegef         de
e         dee         fd"Z/dee         dee         fd#Z0d$Z1 G d% d&e          Z2 G d' d(e          Z3 ej4        d )           G d* d+                      Z5dS )/    N)nullcontext)AnyCallableIteratorListOptionalTupleUnion)ActorHandle)BatcherShufflingBatcher)BatchBatchMetadataBlockPrefetcherCollatedBatch)DatasetStats)BlockBlockAccessor	DataBatch)DataContext)	ObjectRef)NodeAffinitySchedulingStrategyrefsreturnc                    t          j                                                    t           j        j                                        }|j        rt           j                            |           }d |	                                D             }t          fd|D                       }t          d |D                       }t          |          |z
  |z
  }|||fS dS )aG  Given a list of object references, returns how many are already on the local
    node, how many require fetching from another node, and how many have unknown
    locations. If `DataContext.get_current().enable_get_object_locations_for_metrics` is
    False, this will return `(0, 0, 0)` as getting object locations is disabled.c                     g | ]
}|d          S )node_ids ).0locs     z/home/jaya/work/projects/VOICE-AGENT/VIET/agent-env/lib/python3.11/site-packages/ray/data/_internal/block_batching/util.py
<listcomp>z'_calculate_ref_hits.<locals>.<listcomp>#   s    !K!K!Kc#j/!K!K!K    c              3       K   | ]}|v V  	d S Nr   )r   r   current_node_ids     r!   	<genexpr>z&_calculate_ref_hits.<locals>.<genexpr>$   s(      EE8?h.EEEEEEr#   c              3      K   | ]}|d V  	dS )   Nr   )r   r   s     r!   r'   z&_calculate_ref_hits.<locals>.<genexpr>%   s'      ??Xh?q??????r#   )r   r   r   )rayget_runtime_contextget_node_iddatar   get_current'enable_get_object_locations_for_metricsexperimentalget_object_locationsvaluessumlen)r   ctxlocsnodeshitsunknownsmissesr&   s          @r!   _calculate_ref_hitsr;      s    
 -//;;==O
(

*
*
,
,C
2 &44T::!K!KT[[]]!K!K!KEEEEuEEEEE???????Ud"X-VX%%7r#   block_ref_iterr5   statsmax_get_batch_sizec              #   \  	
K   dd	ddt           ffd}g 
dt          t                   f	
fd}| D ]C}
                    |           t	          
           |            k    r |            D ]}|V  D |            D ]}|V  r_        	_        _        dS dS )a%  Resolves the block references for each logical batch.

    Args:
        block_ref_iter: An iterator over block object references.
        ctx: The ``DataContext`` to use.
        stats: An optional stats object to recording block hits and misses.
        max_get_batch_size: Maximum number of block references to resolve in a
            single ``ray.get()`` call. This can be an integer override or a callable
            that returns the desired batch size dynamically. If ``None``, defaults to
            ``ctx.iter_get_block_batch_size``.
    r   r   c                  r    t                    r             } n} | | nj        }t          d|          S )Nr)   )callableiter_get_block_batch_sizemax)override	candidater5   r>   s     r!   _get_effective_batch_sizez5resolve_block_refs.<locals>._get_effective_batch_sizeA   sJ    &'' 	*))++HH)H ( 4HH#:W	1i   r#   c                  n   sg S t                    \  } }}| |cxk    r|cxk    rdk    r
n ndxxndk    r| z  |z  |z  rj                                        nt                      5  t	          j                  }d d d            n# 1 swxY w Y                                    |S )N)r;   
iter_get_stimerr   r*   getclear)	current_hitcurrent_misscurrent_unknownblocksr8   r:   pendingr=   r9   s	       r!   _resolve_pendingz,resolve_block_refs.<locals>._resolve_pendingM   s'    	I5H5Q5Q2\?,????/????R?????'))D)6HHRZZKDl"F'H).AU##%%%KMM 	& 	&WW%%F	& 	& 	& 	& 	& 	& 	& 	& 	& 	& 	& 	& 	& 	& 	& 	s   5BBBN)intr   r   appendr4   iter_blocks_localiter_blocks_remoteiter_unknown_location)r<   r5   r=   r>   rF   rR   	block_refblockr8   r:   rQ   r9   s    ```    @@@@r!   resolve_block_refsrZ   ,   sA     " DFH!s ! ! ! ! ! ! ! ')Gd5k          ( $  	y!!!w<<446666))++  !!##   /"&#) &.###/ /r#   F
block_iter
batch_size	drop_lastshuffle_buffer_min_sizeshuffle_seedensure_copyc              #     K   |t          |||          }nt          ||          }fd}d}	| D ]}
|                    |
           |                                rp |            5  |                                }ddd           n# 1 swxY w Y   t          t          |	          |          V  |	dz  }	|                                p|                                 |                                rp |            5  |                                }ddd           n# 1 swxY w Y   t          t          |	          |          V  |	dz  }	|                                p|sr|                                r` |            5  |                                }ddd           n# 1 swxY w Y   t          t          |	          |          V  |	dz  }	dS dS dS )	a  Given an iterator over blocks, returns an iterator over blocks
    of the appropriate bacth size.

    If the shuffling configurations are specified, then the
    output blocks contain shuffled data.

    Args:
        block_iter: An iterator over blocks.
        stats: Dataset stats object used to store block batching time.
        batch_size: Record batch size, or None to let the system pick.
        drop_last: Whether to drop the last batch if it's incomplete.
        shuffle_buffer_min_size: If non-None, the data will be randomly shuffled
            using a local in-memory shuffle buffer, and this value will serve as the
            minimum number of rows that must be in the local in-memory shuffle buffer in
            order to yield a batch.
        shuffle_seed: The seed to use for the local random shuffle.
        ensure_copy: Whether batches are always copied from the underlying base
            blocks (not zero-copy views).

    Returns:
        An iterator over blocks of the given size that are potentially shuffled.
    N)r\   r^   r_   )r\   r`   c                  V     r j                                         nt                      S r%   )iter_next_batch_srJ   r   )r=   s   r!   get_iter_next_batch_s_timerz6blocks_to_batches.<locals>.get_iter_next_batch_s_timer   s&    27Ju&,,...[]]Jr#   r   )	batch_idxmetadatar-   r)   )	r   r   add	has_batch
next_batchr   r   done_addinghas_any)r[   r=   r\   r]   r^   r_   r`   batcherrd   global_counterrY   batchs    `          r!   blocks_to_batchesrp   p   s     > *"!$;%
 
 
 Z[IIIK K K K K N    E!! 	 ,,.. - -**,,- - - - - - - - - - - - - - -!H!H!HuUUUUUUaN	 !! 	   



 ((** 	) 	)&&((E	) 	) 	) 	) 	) 	) 	) 	) 	) 	) 	) 	) 	) 	) 	)]^DDD5QQQQQQ!	 



   ** ((** 	) 	)&&((E	) 	) 	) 	) 	) 	) 	) 	) 	) 	) 	) 	) 	) 	) 	)]^DDD5QQQQQQ!	   s6   (B		B	B	D""D&)D&F((F,/F,
batch_iterbatch_formatc              #     K   | D ]}|r|j                                         nt                      5  t          j        |j                                      |          }ddd           n# 1 swxY w Y   t          j        ||          V  dS )aC  Given an iterator of blocks, returns an iterator of formatted batches.

    Args:
        batch_iter: An iterator over batches.
        batch_format: The batch format to use.
        stats: An optional stats object to record formatting times.

    Returns:
        An iterator over batch index and the formatted batch.
    Nr-   )	iter_format_batch_srJ   r   r   	for_blockr-   to_batch_formatdataclassesreplace)rq   rr   r=   ro   formatted_batchs        r!   format_batchesr{      s        ? ?27JU&,,...[]] 	 	+5ejAAQQ O	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 !%o>>>>>>>? ?s   -A))A-	0A-	
collate_fnc              #      K   | D ]k}|r|j                                         nt                      5   ||j                  }ddd           n# 1 swxY w Y   t	          |j        |          V  ldS )a  Returns an iterator with the provided collate_fn applied to items of the batch
    iterator.

    Args:
        batch_iter: An iterator over formatted batches.
        collate_fn: A function to apply to each batch.
        stats: An optional stats object to record formatting times.
    Nrf   )iter_collate_batch_srJ   r   r-   r   rg   )rq   r|   r=   ro   collated_batchs        r!   collater      s        J J38KU'--///kmm 	4 	4'Z
33N	4 	4 	4 	4 	4 	4 	4 	4 	4 	4 	4 	4 	4 	4 	4U^.IIIIIIIJ J   AA	A	finalize_fnc              #      K   | D ]k}|r|j                                         nt                      5   ||j                  }ddd           n# 1 swxY w Y   t	          j        ||          V  ldS )a  Returns an iterator with the provided finalize_fn applied to items of the batch
    iterator.

    This is the same as `collate` except the input batches can be of type Any.

    Args:
        batch_iter: An iterator over processed batches.
        finalize_fn: A function to apply to each batch.
        stats: An optional stats object to record formatting times.

    Returns:
        An iterator over batch index and the finalized batch.
    Nrt   )iter_finalize_batch_srJ   r   r-   rx   ry   )rq   r   r=   ro   finalized_batchs        r!   finalize_batchesr      s      $  ? ?49LU(..000{}} 	6 	6)k%*55O	6 	6 	6 	6 	6 	6 	6 	6 	6 	6 	6 	6 	6 	6 	6!%o>>>>>>>? ?r   c              #   &   K   | D ]}|j         V  d S r%   rt   )rq   ro   s     r!   extract_data_from_batchr      s0        j r#   zray.datasetc                   Z    e Zd ZdZd Zd Zdeee                  fdZ	de
fdZd Zd	 Zd
S )WaitBlockPrefetcherz Block prefetcher using ray.wait.c                     g | _         d| _        d| _        t          j                    | _        t          j        | j        dd          | _        | j        	                                 d S )NFr   
PrefetcherT)targetnamedaemon)
_blocks_stopped_last_prefetch_size	threading	Condition
_conditionThread_run_threadstartselfs    r!   __init__zWaitBlockPrefetcher.__init__  sh    #$ #-// '9
 
 

 	r#   c                    | j         s	 | j        5  t          | j                  dk    r| j                                         | j        d d          g c}| _        d d d            n# 1 swxY w Y   t          |          dk    rt          j        |ddd           n*# t          $ r t                              d           Y nw xY w| j         t          	                    d           d S )Nr   r)   T)num_returnstimeoutfetch_localzError in prefetcher thread.z&Exiting prefetcher's background thread)
r   r   r4   r   waitr*   	Exceptionlogger	exceptiondebug)r   blocks_to_fetchs     r!   r   zWaitBlockPrefetcher._run  se   - 	@@_ H H4<((A-- ,,...48LOR1OT\H H H H H H H H H H H H H H H ''!++H'$% !"$(     @ @ @  !>?????@' - 	@, 	=>>>>>s5   B A	A%B %A))B ,A)-.B $CCrP   c                     | j         5  | j        rt          d          || _        t	          |          | _        | j                                          d d d            d S # 1 swxY w Y   d S )NzPrefetcher is stopped.)r   r   RuntimeErrorr   r4   r   notifyr   rP   s     r!   prefetch_blocksz#WaitBlockPrefetcher.prefetch_blocks)  s    _ 	% 	%} ="#;<<<!DL'*6{{D$O""$$$	% 	% 	% 	% 	% 	% 	% 	% 	% 	% 	% 	% 	% 	% 	% 	% 	% 	%s   AA  A$'A$r   c                 R    | j         5  | j        cd d d            S # 1 swxY w Y   d S r%   )r   r   r   s    r!   num_prefetched_blocksz)WaitBlockPrefetcher.num_prefetched_blocks1  st    _ 	, 	,+	, 	, 	, 	, 	, 	, 	, 	, 	, 	, 	, 	, 	, 	, 	, 	, 	, 	,s     c                     | j         5  | j        r	 d d d            d S d| _        | j                                          d d d            d S # 1 swxY w Y   d S )NT)r   r   r   r   s    r!   stopzWaitBlockPrefetcher.stop5  s    _ 	% 	%} 	% 	% 	% 	% 	% 	% 	% 	% !DMO""$$$		% 	% 	% 	% 	% 	% 	% 	% 	% 	% 	% 	% 	% 	% 	% 	% 	% 	%s   	A AAAc                 .    |                                   d S r%   )r   r   s    r!   __del__zWaitBlockPrefetcher.__del__<  s    		r#   N)__name__
__module____qualname____doc__r   r   r   r   r   r   rS   r   r   r   r   r#   r!   r   r     s        **
 
 
? ? ?2%d9U+;&< % % % %,s , , , ,% % %    r#   r   c                   `    e Zd ZdZd Zed
d            Zdeee	                  fdZ
defdZd	S )ActorBlockPrefetcherz%Block prefetcher using a local actor.c                 F    |                                  | _        d| _        d S )Nr   )_get_or_create_actor_prefetcherprefetch_actorr   r   s    r!   r   zActorBlockPrefetcher.__init__C  s$    "BBDD#$   r#   r   r   c                      t          j                                                    } d|  }t                              t          | d          |t          d                                          S )Nzdataset-block-prefetcher-F)softT)scheduling_strategyr   	namespaceget_if_exists)r*   r+   r,   _BlockPretcheroptionsr   PREFETCHER_ACTOR_NAMESPACEremote)node_id
actor_names     r!   r   z4ActorBlockPrefetcher._get_or_create_actor_prefetcherG  sj    )++7799:::
%% >wU S S S0	 & 
 

 &((	r#   rP   c                 V    t          |          | _         | j        j        j        |  d S r%   )r4   r   r   prefetchr   r   s     r!   r   z$ActorBlockPrefetcher.prefetch_blocksR  s,    #&v;; +$+V4444r#   c                     | j         S r%   )r   r   s    r!   r   z*ActorBlockPrefetcher.num_prefetched_blocksV  s    ''r#   N)r   r   )r   r   r   r   r   staticmethodr   r   r   r   r   rS   r   r   r#   r!   r   r   @  s        //% % %    \5d9U+;&< 5 5 5 5(s ( ( ( ( ( (r#   r   )num_cpusc                       e Zd ZdZddZdS )r   z3Helper actor that prefetches blocks asynchronously.r   Nc                     d S r%   r   r   s     r!   r   z_BlockPretcher.prefetch^  s    r#   )r   N)r   r   r   r   r   r   r#   r!   r   r   Z  s.        ==     r#   r   )NN)NNFNNFr%   )6rx   loggingr   
contextlibr   typingr   r   r   r   r   r	   r
   r*   	ray.actorr   ray.data._internal.batcherr   r   ,ray.data._internal.block_batching.interfacesr   r   r   r   ray.data._internal.statsr   ray.data.blockr   r   r   ray.data.contextr   	ray.typesr   ray.util.scheduling_strategiesr   	getLoggerr   r   rS   r;   rZ   boolrp   strr{   r   r   r   r   r   r   r   r   r   r#   r!   <module>r      s            " " " " " " H H H H H H H H H H H H H H H H H H 



 ! ! ! ! ! ! @ @ @ @ @ @ @ @            2 1 1 1 1 1 : : : : : : : : : : ( ( ( ( ( (       I I I I I I		8	$	$d9S>2 uS#s]7K    , %)BF	A/ A/Yu-.A/	A/ L!A/ !sHRW,='=!>?	A/
 e_A/ A/ A/ A/L %) $-1"&D DDL!D D 	D
 &c]D 3-D D e_D D D DT %)? ??3-? L!? e_	? ? ? ?4 %)J JJ9+s"234J L!J m	J J J J, %)? ?'?3%*%? L!? m	? ? ? ?0 HSM    
 + < < < < </ < < <~( ( ( ( (? ( ( (4 Q         r#   