
    &`i                         d dl mZmZmZ d dlmZmZmZ d dlm	Z	 d dl
mZ d dlmZ d dlmZ d dlmZmZ d dlmZ d d	lmZ d d
lmZmZ 	 ddeeeee         f                  dee         dededee         defdZdS )    )ListOptionalUnion)AllToAllTransformFn	RefBundleTaskContext)AllToAllTransformFnResult)SortAggregateTaskSpec)PullBasedShuffleTaskScheduler)PushBasedShuffleTaskScheduler)SortKeySortTaskSpec)unify_ref_bundles_schema)AggregateFn)DataContextShuffleStrategyNkeyaggsbatch_formatdata_context,_debug_limit_shuffle_execution_to_num_blocksreturnc                      j         t          j        t          j        fv sJ t	                    dk    rt          d          dt          t                   dt          dt          f fd}|S )z[Generate function to aggregate blocks by the specified key column or key
    function.
    r   z+Aggregate requires at least one aggregationrefsctxr   c                    g }g }| D ]6}|                     |j                   |                     |j                   7t          |          dk    r|i fS t	          |           }D ]}|                    |           t          |          }t                    }d}	g }
n0|}	|j        t          j	                 }t          j
        |||	|          }
t          |
|          }j        t          j        k    rt          |          }n=j        t          j        k    rt#          |          }nt%          dj         d          |                    | |	|          S )Nr      )
boundariesr   r   r   zInvalid shuffle strategy '')$_debug_limit_execution_to_num_blocks)extend
block_refsmetadatalenr   	_validater   sub_progress_bar_dictr   !SORT_SAMPLE_SUB_PROGRESS_BAR_NAMEsample_boundariesr
   shuffle_strategyr   SORT_SHUFFLE_PUSH_BASEDr   SORT_SHUFFLE_PULL_BASEDr   
ValueErrorexecute)r   r   blocksr#   
ref_bundleunified_schemaagg_fnnum_mapperssort_keynum_outputsr   
sample_baragg_spec	schedulerr   r   r   r   r   s                 x/home/jaya/work/projects/VOICE-AGENT/VIET/agent-env/lib/python3.11/site-packages/ray/data/_internal/planner/aggregate.pyfnz!generate_aggregate_fn.<locals>.fn,   s     	1 	1JMM*/000OOJ/0000v;;!B<1$77 	- 	-F^,,,,&kk3<<;KJJ &K2>J &7+z J )!%	
 
 
 (O,SSS5h??II*o.UUU5h??IIM\-JMMM     < ! 
 
 	
    )
r)   r   r+   r*   r$   r,   r   r   r   r	   )r   r   r   r   r   r9   s   ````` r8   generate_aggregate_fnr;      s     (//-    
 4yyA~~FGGG9
9o9
9
 
#9
 9
 9
 9
 9
 9
 9
 9
 9
 9
v Ir:   )N)typingr   r   r   'ray.data._internal.execution.interfacesr   r   r   4ray.data._internal.execution.interfaces.transform_fnr	   7ray.data._internal.planner.exchange.aggregate_task_specr
   Eray.data._internal.planner.exchange.pull_based_shuffle_task_schedulerr   Eray.data._internal.planner.exchange.push_based_shuffle_task_schedulerr   2ray.data._internal.planner.exchange.sort_task_specr   r   ray.data._internal.utilr   ray.data.aggregater   ray.data.contextr   r   strintr;    r:   r8   <module>rI      s   ( ( ( ( ( ( ( ( ( (         
                     U T T T T T T T < < < < < < * * * * * * 9 9 9 9 9 9 9 9 CGM M	%T#Y'	(M
{
M M 	M
 3;3-M M M M M M Mr:   