
    &`iA                     6   d dl Z d dlmZ d dlmZmZmZmZmZm	Z	m
Z
 d dlmZ d dlmZ d dlmZ ededee         fd	            Zeded
edefd            Z G d d          Z G d de          Z G d de          Z G d d          Zdee         fdZdS )    N)deque)AnyCallableDequeDictListOptionalUnion)Dataset)AggregateFnV2)DeveloperAPIvaluereturnc                 ,    |                      d          S )z*Tokenize a string using a split on spaces. )split)r   s    p/home/jaya/work/projects/VOICE-AGENT/VIET/agent-env/lib/python3.11/site-packages/ray/data/preprocessors/utils.pysimple_split_tokenizerr   
   s     ;;s    num_featuresc                     t          |                                           }t          j        |          }t	          |                                d          }||z  S )z6Deterministically hash a value into the integer space.   )strencodehashlibsha1int	hexdigest)r   r   encoded_valuehashed_valuehashed_value_ints        r   simple_hashr"      sQ     JJ%%''M<..L<1133R88l**r   c            	       P    e Zd ZdZd ddeeef         dedeegef         fdZdS )	BaseStatSpeczEEncapsulates a statistical computation with optional post-processing.c                     | S N xs    r   <lambda>zBaseStatSpec.<lambda>        a r   post_process_fnstat_fnr-   post_key_fnc                0    || _         || _        || _        d S r&   r.   r-   r/   )selfr.   r-   r/   s       r   __init__zBaseStatSpec.__init__   s!     .&r   N)	__name__
__module____qualname____doc__r
   r   r   r   r3   r'   r   r   r$   r$      sr        OO %0K		' 	' 	' }h./	' "		'
 seSj)	' 	' 	' 	' 	' 	'r   r$   c            
       ~     e Zd ZdZd dddeeeegef         f         dedeegef         dee         f fd	Z	 xZ
S )
AggregateStatSpecz5Represents an AggregateFnV2 spec for a single column.c                     | S r&   r'   r(   s    r   r*   zAggregateStatSpec.<lambda>/   r+   r   N)r-   columnaggregator_fnr-   r/   r;   c                ^    t                                          |||           || _        d S Nr1   )superr3   r;   )r2   r<   r-   r/   r;   	__class__s        r   r3   zAggregateStatSpec.__init__+   s<     	!+# 	 	
 	
 	

 r   )r4   r5   r6   r7   r
   r   r   r   r	   r3   __classcell__r@   s   @r   r9   r9   (   s        ?? %0K $   ]HcUM5I,JJK "	
 seSj)          r   r9   c                        e Zd ZdZd ddedeeegef                  deeegef                  dedee         f
 fd	Z xZ	S )
CallableStatSpeczPRepresents a user-defined stat function that operates outside Dataset.aggregate.c                     | S r&   r'   r(   s    r   r*   zCallableStatSpec.<lambda>D   r+   r   r,   r.   stat_key_fnr/   r-   columnsc                l    t                                          |||           || _        || _        d S r>   )r?   r3   rG   rF   )r2   r.   rF   r/   r-   rG   r@   s         r   r3   zCallableStatSpec.__init__>   sD     	_+ 	 	
 	
 	
 &r   )
r4   r5   r6   r7   r   r	   r   r   r3   rA   rB   s   @r   rD   rD   ;   s        ZZ %0K' ' ' ' hucz23	'
 hucz23' "' c' ' ' ' ' ' ' ' ' 'r   rD   c                   j   e Zd ZdZd Zd Zd dddeegef         ded	e	eegef                  d
e
e         ddf
dZd dddeg ef         dedeegef         d	e	eegef                  d
e
e         ddfdZdedeeef         fdZde
e         fdZde
e         fdZde
e         fdZd Zd ZdS )StatComputationPlana\  
    Encapsulates a set of aggregators (AggregateFnV2) and legacy stat functions
    to compute statistics over a Ray dataset.

    Supports two types of aggregations:
    1. AggregateFnV2-based aggregators, which are batch-executed using `Dataset.aggregate(...)`.
    2. Callable-based stat functions, executed sequentially (legacy use case).
    c                 ,    t                      | _        d S r&   )r   _aggregatorsr2   s    r   r3   zStatComputationPlan.__init__X   s    16r   c                 8    | j                                          d S r&   )rL   clearrM   s    r   resetzStatComputationPlan.reset[   s    !!!!!r   c                     | S r&   r'   r(   s    r   r*   zStatComputationPlan.<lambda>b   r+   r   N)r-   r/   r<   r-   r/   rG   r   c          	      |    |D ]8} ||          }| j                             t          ||||                     9dS )a  
        Registers an AggregateFnV2 factory for one or more columns.

        Args:
            aggregator_fn: A callable (typically a lambda or class) that accepts a column name and returns an instance of AggregateFnV2.
            post_process_fn: Function to post-process the aggregated result.
            post_key_fn: Optional key generator to use to save aggregation results after post-processing.
            columns: List of column names to aggregate.
        )r<   r-   r/   r;   N)rL   appendr9   )r2   r<   r-   r/   rG   r;   agg_instances          r   add_aggregatorz"StatComputationPlan.add_aggregator^   sj    "  		 		F(=00L$$!".$3 +!	     		 		r   c                     | S r&   r'   r(   s    r   r*   zStatComputationPlan.<lambda>~   r+   r   r.   rF   c          	      b    | j                             t          |||||p|                     dS )a  
        Registers a custom stat function to be run sequentially.

        This supports legacy use cases where arbitrary callables are needed
        and cannot be run via Dataset.aggregate().

        Args:
            stat_fn: A zero-argument callable that returns the stat.
            post_process_fn: Function to apply to the result.
            stat_key_fn:
            post_key_fn:
            columns:
        )r.   r-   rG   rF   r/   N)rL   rS   rD   )r2   r.   r-   rF   r/   rG   s         r   add_callable_statz%StatComputationPlan.add_callable_statz   sO    , 	   /''6;  	
 	
 	
 	
 	
r   datasetc                    i }|                                  }|rn |j        | }|                                 D ]O}|j        j        }|j        |                    |j                  n|}|                    ||                   ||<   P|                                 D ]n}|                    |j	                  }|j
        D ]J}	|	                    |	          }|                    |	          }|                    ||                   ||<   Ko|S )aq  
        Executes all registered aggregators and stat functions.

        AggregateFnV2-based aggregators are batched and executed via Dataset.aggregate().
        Callable-based stat functions are run sequentially.

        Args:
            dataset: The Ray Dataset to compute statistics on.

        Returns:
            A dictionary of computed statistics.
        )_get_aggregate_fn_list	aggregate_get_aggregate_specsr.   namer/   r;   r-   _get_custom_stat_fn_specsrF   rG   )
r2   rY   statsaggregators
raw_resultspecstat_keypost_keyresultcols
             r   computezStatComputationPlan.compute   s;    1133 		M**K8J1133 M M<, '3 $$T[111! 
 #'"6"6z(7K"L"Lh 2244 	I 	ID\\$"233F| I I++C00++C00"&"6"6vh7G"H"HhI
 r   c                 $    d | j         D             S )Nc                 F    g | ]}t          |t                    |j        S r'   )
isinstancer9   r.   .0rc   s     r   
<listcomp>z>StatComputationPlan._get_aggregate_fn_list.<locals>.<listcomp>   s;     
 
 
$ 122
L
 
 
r   rL   rM   s    r   r[   z*StatComputationPlan._get_aggregate_fn_list   s&    
 
)
 
 
 	
r   c                 $    d | j         D             S )Nc                 <    g | ]}t          |t                    |S r'   )rk   r9   rl   s     r   rn   z<StatComputationPlan._get_aggregate_specs.<locals>.<listcomp>   s8     
 
 
*TCT2U2U

 
 
r   ro   rM   s    r   r]   z(StatComputationPlan._get_aggregate_specs   &    
 
!.
 
 
 	
r   c                 $    d | j         D             S )Nc                 <    g | ]}t          |t                    |S r'   )rk   rD   rl   s     r   rn   zAStatComputationPlan._get_custom_stat_fn_specs.<locals>.<listcomp>   s8     
 
 
*TCS2T2T

 
 
r   ro   rM   s    r   r_   z-StatComputationPlan._get_custom_stat_fn_specs   rr   r   c                 L    t          |                                           dk    S )Nr   )lenr_   rM   s    r   has_custom_stat_fnz&StatComputationPlan.has_custom_stat_fn   s!    4113344q88r   c                 D    t          |                                           S )z4
        Iterates over all AggregatorSpecs.
        )iterr]   rM   s    r   __iter__zStatComputationPlan.__iter__   s     D--//000r   )r4   r5   r6   r7   r3   rP   r   r   r   r	   r   rU   r   rX   r   r   rh   r[   r9   r]   rD   r_   rw   rz   r'   r   r   rJ   rJ   N   s        9 9 9" " " %0K6:    } 45 "	
 hucz23 c 
   @ %0K6:
 
 
 "c'"
 "	

 seSj)
 hucz23
 c
 

 
 
 
@#w #4S> # # # #J
](; 
 
 
 

d+<&= 
 
 
 


40@+A 
 
 
 

9 9 91 1 1 1 1r   rJ   	callbacksc                       fd}|S )z
    Wraps a base post-processing function with a sequence of callback functions.
    Useful when multiple post-processing steps need to be applied in order.
    c                 >     |           }D ]} ||          }|S r&   r'   )rf   	processedcbbase_fnr{   s      r   wrapperz$make_post_processor.<locals>.wrapper   s4    GFOO	 	& 	&B9IIr   r'   )r   r{   r   s   `` r   make_post_processorr      s)          Nr   )r   collectionsr   typingr   r   r   r   r   r	   r
   ray.datar   ray.data.aggregater   ray.util.annotationsr   r   r   objectr   r"   r$   r9   rD   rJ   r   r'   r   r   <module>r      s          D D D D D D D D D D D D D D D D D D       , , , , , , - - - - - - # $s)    
 +v +S +S + + + +' ' ' ' ' ' ' '       &' ' ' ' '| ' ' '&I1 I1 I1 I1 I1 I1 I1 I1XDN      r   