
    &`i                         d dl Z d dlmZmZmZmZ d dlmZmZm	Z	 d dl
mZ d dlmZ d dlmZ d dlmZ d dlmZ d d	lmZmZ d d
lmZ 	 	 	 ddedee         dee         deeeef                  dee         defdZdS )    N)AnyDictListOptional)AllToAllTransformFn	RefBundleTaskContext)AllToAllTransformFnResult)MapTransformer)PullBasedShuffleTaskScheduler)PushBasedShuffleTaskScheduler)ShuffleTaskSpec)DataContextShuffleStrategy)	INT32_MAXdata_contextseednum_outputsray_remote_args,_debug_limit_shuffle_execution_to_num_blocksreturnc                      nt          j                    t          z  dt          t                   dt
          dt          f fd}|S )z=Generate function to randomly shuffle each records of blocks.Nrefsctxr   c                    t          d | D                       }j        d }r"                    d            fd}j        
t	          j        pj        d|          }j        t          j	        k    r!	t          d          t          |          }nt          |          }|                    | 	p|

          S )Nc              3   >   K   | ]}t          |j                  V  d S N)lenblocks).0rs     }/home/jaya/work/projects/VOICE-AGENT/VIET/agent-env/lib/python3.11/site-packages/ray/data/_internal/planner/random_shuffle.py	<genexpr>z9generate_random_shuffle_fn.<locals>.fn.<locals>.<genexpr>)   s*      ;;s18}};;;;;;    c                 0                         |           S r   )apply_transform)r   r   map_transformers    r"   upstream_map_fnz?generate_random_shuffle_fn.<locals>.fn.<locals>.upstream_map_fn6   s    &66vsCCCr$   T)target_shuffle_max_block_sizerandom_shufflerandom_seedr(   z:Push-based shuffle doesn't support setting num_blocks yet.)task_ctxmap_ray_remote_argsreduce_ray_remote_args$_debug_limit_execution_to_num_blocks)sumupstream_map_transformeroverride_target_max_block_sizeupstream_map_ray_remote_argsr   target_max_block_size_overridetarget_max_block_sizeshuffle_strategyr   SORT_SHUFFLE_PUSH_BASEDNotImplementedErrorr   r   execute)r   r   num_input_blocksr(   shuffle_spec	schedulerr'   r   r   r   r   r   s    `    @r"   fnz&generate_random_shuffle_fn.<locals>.fn%   s+    ;;d;;;;;
 584P 
	? ::4@@@D D D D D D
 ">O&2Xl6X+
 
 
 (O,SSS&)P   6lCCII5lCCI  ++ /#2< ! 	
 	
 		
r$   )timetime_nsr   r   r   r	   r
   )r   r   r   r   r   r=   s   ````` r"   generate_random_shuffle_fnr@      sz     #44$,..9*DD3
9o3
3
 
#3
 3
 3
 3
 3
 3
 3
 3
 3
 3
j Ir$   )NNN)r>   typingr   r   r   r   'ray.data._internal.execution.interfacesr   r   r	   4ray.data._internal.execution.interfaces.transform_fnr
   6ray.data._internal.execution.operators.map_transformerr   Eray.data._internal.planner.exchange.pull_based_shuffle_task_schedulerr   Eray.data._internal.planner.exchange.push_based_shuffle_task_schedulerr   5ray.data._internal.planner.exchange.shuffle_task_specr   ray.data.contextr   r   ray.util.commonr   intstrr@    r$   r"   <module>rM      s    , , , , , , , , , , , ,         
      R Q Q Q Q Q           R Q Q Q Q Q 9 9 9 9 9 9 9 9 % % % % % % "&04BFB BB
3-B #B d38n-	B
 3;3-B B B B B B Br$   