
    &`i              
       ^   d Z ddlZddlmZmZmZ ddlmZ ddlm	Z	m
Z
mZ ddlmZ ddlmZ ddlmZ dd	lmZ dd
lmZ ddlmZmZmZ dZ ej        e          Zde	dedee         fdZde	dedede def
dZ!de	dede dee
ef         fdZ"dedefdZ#dee         defdZ$dededdfdZ%dS )zThis file contains temporary helper functions for legacy plan/executor interaction.

It should be deleted once we fully move to the new executor backend.
    N)IteratorOptionalTuple)	BlockList)ExecutorPhysicalOperator	RefBundle)OutputIterator)Topology)record_operators_usage)ExecutionPlan)DatasetStats)BlockMetadataBlockMetadataWithSchema_take_first_non_empty_schemai executorplanreturnc                     t          | d          \  }}|                     ||          }| j         G fddt                    } ||          S )zExecute a plan with the new executor and return a bundle iterator.

    Args:
        executor: The executor to use.
        plan: The legacy plan to execute.

    Returns:
        The output as a bundle iterator.
    F)preserve_orderinitial_statsc                   T    e Zd ZdZdefdZd
dee         def fdZ	dedefd	Z
dS )@execute_to_legacy_bundle_iterator.<locals>.CacheMetadataIteratora.  Wrapper for `bundle_iterator` above.

        For a given iterator which yields output RefBundles,
        collect the metadata from each output bundle, and yield the
        original RefBundle. Only after the entire iterator is exhausted,
        we cache the resulting metadata to the execution plan.base_iteratorc                 D    || _         t          ddd d           | _        d S )Nr   )num_rows
size_bytesinput_files
exec_stats)_base_iteratorr   _collected_metadata)selfr   s     ~/home/jaya/work/projects/VOICE-AGENT/VIET/agent-env/lib/python3.11/site-packages/ray/data/_internal/execution/legacy_compat.py__init__zIexecute_to_legacy_bundle_iterator.<locals>.CacheMetadataIterator.__init__>   s5     #0D'4 	( ( (D$$$    Noutput_split_idxr   c                 &   	 | j                             |          }|                     |           |S # t          $ rR t	          t                                                              j        }t          | j	        |          }|_
         w xY w)N)metadataschema)r!   get_next_collect_metadataStopIterationnextreversedvalues_schemar   r"   _snapshot_metadata_schema)r#   r'   bundler*   meta_with_schemar   topologys        r$   r+   zIexecute_to_legacy_bundle_iterator.<locals>.CacheMetadataIterator.get_nextJ   s    ,556FGG&&v...    
 hx'8'899::B#:!5!$ $ $  2B.s
   04 ABr3   c                     | j         xj        |                                z  c_        | j         xj        |                                z  c_        |S )zCollect the metadata from each output bundle and accumulate
            results, so we can access important information, such as
            row count, schema, etc., after iteration completes.)r"   r   r   )r#   r3   s     r$   r,   zRexecute_to_legacy_bundle_iterator.<locals>.CacheMetadataIterator._collect_metadata]   sM     $--1B1BB--$//63D3D3F3FF//Mr&   N)__name__
__module____qualname____doc__r
   r%   r   intr	   r+   r,   )r   r5   s   r$   CacheMetadataIteratorr   6   s        	B 	B
	. 
	 
	 
	 
		 	Xc] 	i 	 	 	 	 	 	 	&	I 	) 	 	 	 	 	 	r&   r=   )_get_execution_dagexecute	_topologyr
   )r   r   dagstatsbundle_iterr=   r5   s    `    @r$   !execute_to_legacy_bundle_iteratorrD      s     $  JC ""3e"<<K#-H- - - - - - - - - - -^ ! ---r&   dataset_uuidr   c                     t          | ||          \  }}|                     ||          }t          |          }t          |                                 |           |S )ag  Execute a plan with the new executor and translate it into a legacy block list.

    Args:
        executor: The executor to use.
        plan: The legacy plan to execute.
        dataset_uuid: UUID of the dataset for this execution.
        preserve_order: Whether to preserve order in execution.

    Returns:
        The output as a legacy block list.
    r   )r>   r?   _bundles_to_block_list_set_stats_uuid_recursive	get_stats)r   r   rE   r   rA   rB   bundles
block_lists           r$   execute_to_legacy_block_listrL   h   si    " $ JC
 s%88G'00Jh0022LAAAr&   c                    ddl m} t          |d          r |j        t	          |j        j                    ||j                  j        }t          |          }|s|                                rd| j        _	        ||fS )z+Get the physical operators DAG from a plan.r   )get_execution_plan_logical_planNT)
%ray.data._internal.logical.optimizersrN   hasattrrO   r   rA   _get_initial_stats_from_planrequire_preserve_order_optionsr   )r   r   r   rN   rA   rB   s         r$   r>   r>      s     IHHHHH t_%% 7$*<*Ht15666 
T/
0
0
4C(..E  04466 0+/(:r&   c                 v    | j         | j        S |                                 rt          i d           S | j        S )N)r)   parent)_snapshot_bundle_snapshot_statshas_lazy_inputr   	_in_stats)r   s    r$   rR   rR      sE    (##  R5555~r&   rJ   c                    g g }}d}t          |           }t          d |D                       }|D ]?}|j        sd}|                    |j                   |                    |j                   @t          ||||          S )NTc              3   $   K   | ]}|j         V  d S r7   )r*   ).0
ref_bundles     r$   	<genexpr>z)_bundles_to_block_list.<locals>.<genexpr>   s6       * *(
* * * * * *r&   F)owned_by_consumerr*   )listr   owns_blocksextend
block_refsr)   r   )rJ   blocksr)   rb   bundle_listr*   r^   s          r$   rG   rG      s    2HFKw--K) * *,7* * *  F " - -
% 	 Kj+,,,
+,,,,VXVTTTTr&   rB   c                 Z    | j         s|| _         | j        pg D ]}t          ||           d S r7   )rE   parentsrH   )rB   rE   rV   s      r$   rH   rH      sH     *)-%2 8 8!&,77778 8r&   )&r;   loggingtypingr   r   r   ray.data._internal.block_listr   'ray.data._internal.execution.interfacesr   r   r	   0ray.data._internal.execution.interfaces.executorr
   5ray.data._internal.execution.streaming_executor_stater   ray.data._internal.logical.utilr   ray.data._internal.planr   ray.data._internal.statsr   ray.data.blockr   r   r   TASK_SIZE_WARN_THRESHOLD_BYTES	getLoggerr8   loggerrD   strboolrL   r>   rR   rG   rH    r&   r$   <module>ry      sg     , , , , , , , , , , 3 3 3 3 3 3         
 L K K K K K J J J J J J B B B B B B 1 1 1 1 1 1 1 1 1 1 1 1          "( 		8	$	$F.F.
F. iF. F. F. F.R
  	
    :
  \)*	   0}     UHY$7 UI U U U U"8\ 8 8 8 8 8 8 8 8r&   