
    &`id                     R   d dl Z d dlZd dlZd dlmZmZmZmZmZm	Z	m
Z
 d dlZd dlZd dlmZmZ d dlmZ d dlmZ d dlmZ d dlmZ d dlmZ d d	lmZ d d
lmZ d dlmZ d dl m!Z!m"Z" d dl#m$Z$ d dl%m&Z& d dl'm(Z( erd dl)m*Z* d dl+m,Z, dgZ- ej.        e/          Z0 G d d          Z1dS )    N)TYPE_CHECKINGIteratorListOptionalTupleTypeUnion)get_memory_info_replyget_state_from_address)	RefBundle)SourceOperator)LogicalOperator)LogicalPlan)Operator)Read)get_plan_conversion_fns)DatasetStats)BlockMetadataWithSchema_take_first_non_empty_schema)DataContext)omit_traceback_stdout)log_onceStreamingExecutor)Datasetscheduling_strategyc                   
   e Zd ZdZdedefdZdefdZd/dZ	defd	Z
defd
Ze	 	 	 	 d0dededededef
d            Zded         defdZd1dZd2dZd2dZdee         fdZ	 d3dedeedf         fd Zd!eedf         fd"Zdeee                  fd#Zdee         fd$Zede e!e"         eed         f         fd%            Z#e	 d3d&ede"fd'            Z$e%defd(            Z&d4d*Z'defd+Z(defd,Z)defd-Z*defd.Z+d)S )5ExecutionPlana  A lazy execution plan for a Dataset.

    This lazy execution plan builds up a chain of ``List[RefBundle]`` -->
    ``List[RefBundle]`` operators. Prior to execution, we apply a set of logical
    plan optimizations, such as operator fusion, in order to reduce Ray task
    overhead and data copies.

    Internally, the execution plan holds a snapshot of a computed list of
    blocks and their associated metadata under ``self._snapshot_bundle``,
    where this snapshot is the cached output of executing the operator chain.statsdata_contextc                     || _         d| _        d| _        d| _        d| _        d| _        d| _        d| _        d| _        d| _	        || _
        dS )zCreate a plan with no transformation operators.

        Args:
            stats: Stats for the base blocks.
            data_context: :class:`~ray.data.context.DataContext`
                object to use for execution.
        NF)	_in_stats_snapshot_operator_snapshot_stats_snapshot_bundle_snapshot_metadata_schema_schema_dataset_uuid
_run_index_dataset_name_has_started_execution_context)selfr   r    s      k/home/jaya/work/projects/VOICE-AGENT/VIET/agent-env/lib/python3.11/site-packages/ray/data/_internal/plan.py__init__zExecutionPlan.__init__1   se      >B# $ OS& !!&+#$    returnc                 8    | j         pd d| j         d| j         S )ziUnique ID of the dataset, including the dataset name,
        UUID, and current execution index.
        dataset_)r+   r)   r*   r.   s    r/   get_dataset_idzExecutionPlan.get_dataset_id[   s/    
 !.YWW1CWWdoWW	
r1   r   c                 x    ddl m} | xj        dz  c_         || j        |                                           }|S )z!Create an executor for this plan.r   r      )/ray.data._internal.execution.streaming_executorr   r*   r-   r7   )r.   r   executors      r/   create_executorzExecutionPlan.create_executorc   sI    UUUUUU1$$T]D4G4G4I4IJJr1   c                 (    d| j          d| j         dS )NzExecutionPlan(dataset_uuid=z, snapshot_operator=))r)   r$   r6   s    r/   __repr__zExecutionPlan.__repr__k   s1     . !%!8  	
r1   c                 .   d gt                      z   }g d}| j        }g }t          ||          D ]P\  }} ||          }|                     |j        d          \  }}d| d}	|	 | }
|                    |
           Qd                    |          S )z@Return a string representation of the logical and physical plan.c                     | S N )xs    r/   <lambda>z'ExecutionPlan.explain.<locals>.<lambda>v   s     r1   )zLogical PlanzLogical Plan (Optimized)zPhysical PlanzPhysical Plan (Optimized)T)show_op_reprz

-------- z
 --------
 )r   _logical_planzipgenerate_plan_stringdagappendjoin)r.   convert_fnstitlesplansectionstitle
convert_fnplan_strr5   bannersections              r/   explainzExecutionPlan.explains   s     #{m&=&?&??
 
 
 !!$V[!9!9 
	% 
	%E: :d##D 33DH43PPKHa55555F+++GOOG$$$$wwx   r1   rG   r   TFopcurr_strdepthincluding_sourcerF   c                 :   |st          | t                    r||fS |}|rt          |           n| j        }|dk    r	|| dz  }nd|dz
  dz  z  }|| d| dz  }| j        D ]6}t
                              |||dz   ||          \  }}	t          ||	          }7||fS )zXTraverse (DFS) the Plan DAG and
        return a string representation of the operators.r   
 r9      +- )
isinstancer   reprnameinput_dependenciesr   rJ   max)
rX   rY   rZ   r[   rF   curr_max_depthop_strtrailing_spaceinputinput_max_depths
             r/   rJ   z"ExecutionPlan.generate_plan_string   s       	#Jr>$B$B 	#U?" *6brwA::6%HH UQY!O4N>88f8888H* 	B 	BE(5(J(Jx,<l) )%Ho !AANN''r1   dataset_clsr   c                    ddl m} d}d}|                                 ss|                     | j        j        d          \  }}| j        '| j        j        }| j                                        }nP| j	        | j	        j        }| j	        j
        j        }n*d}| j        j        }t          |t                    s=t          |j                  dk    rd}n"|j        d         }t          |t                    =|rd}d}nt          |t                    s
J |            t          t!          i d	          | j                  }	|	                    t'          ||	j                             |	                                }|	                                }n/|                     d
          }| j                                        }|d}
nt          |t*                    rt-          |          }
npg }
t/          |j        |j                  D ]6\  }}t5          |d          r|j        }|
                    | d|            7d                    |
          }
d|
z   dz   }
|d}d}||k    r|                                 }|J | j        d                     | j                  nd}|rd| dnd}d                     |j        ||||
          }d}d}d}||z  }t          |          |k    r:| | d|
 }t          |          |k    rg }
t/          |j        |j                  D ]\  }}t5          |d          r|j        }| |dz   | d| }t          |          |k    r@dt-          |           }tC          |t          |          z
  |          }|d|          | }|
                    |           d                    |
          }
d|
z   d| | z   dz   }
| j        d| | d| j         dnd}|rd| | d| dnd}|j         d | | d| | d!| d| | d|
 d| d"}|dk    r||z  }n|||dz
  z   d#| z  }|S )$zCreate a cosmetic string representation of this execution plan.

        Returns:
            The string representation of this execution plan.
        r   )MaterializedDatasetrG   F)r[   Nr9   Tmetadataparent)fetch_if_missingzUnknown schema__name__z: z, {}?z	name={}, znum_blocks=z{}({}{}num_rows={}, schema={})P   
   z   zschema=   z...: z,
z{
r]   zname=,(z	num_rows=r>   r`   )"ray.data.datasetrm   has_computed_outputrJ   rH   rK   r&   schemanum_rowsr'   ro   ra   r   lenrd   r   r   r-   link_logical_planr   
meta_counttypestrrI   namestypeshasattrrr   rL   rM   initial_num_blocksr+   formatre   )r.   rk   rm   rT   plan_max_depthr}   counthas_n_ary_operatorrK   rP   
schema_strnt
num_blocksname_strnum_blocks_strdataset_strSCHEMA_LINE_CHAR_LIMITMIN_FIELD_LENGTH
INDENT_STRrh   schema_str_on_new_linecol_strshortened_suffixchars_left_for_col_names                            r/   get_plan_as_stringz ExecutionPlan.get_plan_as_string   s    	988888 '')) )	5'+'@'@"& (A ( ($Hn $0.5-6688/;7>6?H &+"(,$S.99 43122Q66-1*03C %S.99 4 & .!F EE%c>::??C??:($b>>> D **;sDM+J+JKKK![[]]F OO--EE [[%[88F)2244E>)JJ%% 		0VJJJFL&,77 / /11j)) #
A!!Q++!++....:..Jz)C/J=E
---0022J))) !- t1222 	
 :DK5z55556== 
 
 "$
#n4{444
 )7%W
%W%W:%W%W")**-CCC  
fl;; / /DAqq*-- 'J!/IaIIIaIIG 7||&<<<+;3q66+;+;( 362S9I5J5JJ,3 3/
  ''?(?'?@TBRTT   %%g...."ZZ
33
J&)Jn)Jj)J)JJSP 
 %1 L^KZKKd6HKKKK  I^IZIIJIIII  ' ' ''!' ' $' &0' ' ;@' ' $	' &0	' ' 9C	' '
 $' ' '  Q#HH:!);<NNNNNHr1   logical_planr   c                 6    || _         | j        | j         _        dS )zLink the logical plan into this execution plan.

        This is used for triggering execution for optimizer code path in this legacy
        execution plan.
        N)rH   r-   )r.   r   s     r/   r   zExecutionPlan.link_logical_planN  s     *&*m###r1   c                     t          | j        | j                  }| j        $| j        |_        | j        |_        | j        |_        | j        |_        |S )zCreate a shallow copy of this execution plan.

        This copy can be executed without mutating the original, but clearing the copy
        will also clear the original.

        Returns:
            A shallow copy of this execution plan.
        r    )r   r#   r-   r&   r$   r%   r+   r.   	plan_copys     r/   copyzExecutionPlan.copyW  s_     "N
 
 
	  ,)-)>I&+/+BI((,(<I%"&"4	r1   c                 ^   t          t          j        | j                  | j                                                  }| j        rZt          j        | j                  |_        t          j        | j                  |_        t          j        | j                  |_        | j        |_        |S )zCreate a deep copy of this execution plan.

        This copy can be executed AND cleared without mutating the original.

        Returns:
            A deep copy of this execution plan.
        r   )r   r   r#   r-   r&   r$   r%   r+   r   s     r/   	deep_copyzExecutionPlan.deep_copyl  s     "Idn%%++--
 
 
	   	H)-43H)I)II&+/9T5L+M+MI((,	$2F(G(GI%"&"4	r1   c                 >    | j         j                                        S )zGet the estimated number of blocks from the logical plan
        after applying execution plan optimizations, but prior to
        fully executing the dataset.)rH   rK   estimated_num_outputsr6   s    r/   r   z ExecutionPlan.initial_num_blocks  s     !%;;===r1   rq   zpyarrow.lib.Schemac                 v   | j         | j         S d}|                                 r| j        j        }nm| j        j                                        }|M|rK|                                 \  }}}|5  t          d |D                       }ddd           n# 1 swxY w Y   | 	                    |           | j         S )aK  Get the schema after applying all execution plan optimizations,
        but prior to fully executing the dataset
        (unless `fetch_if_missing` is set to True).

        Args:
            fetch_if_missing: Whether to execute the plan to fetch the schema.

        Returns:
            The schema of the output dataset.
        Nc              3   $   K   | ]}|j         V  d S rB   r}   .0bundles     r/   	<genexpr>z'ExecutionPlan.schema.<locals>.<genexpr>  s5       : :*0: : : : : :r1   )
r(   r|   r&   r}   rH   rK   infer_schemaexecute_to_iteratorr   cache_schema)r.   rq   r}   iter_ref_bundlesr5   r;   s         r/   r}   zExecutionPlan.schema  s    <#<##%% 	*1FF'+88::F~"2~ 150H0H0J0J- !X  9 : :4D: : :  F               	&!!!|s   .BBBr}   c                     || _         d S rB   )r(   )r.   r}   s     r/   r   zExecutionPlan.cache_schema  s    r1   c                 H    | j         j                                        j        S )z1Get the input files of the dataset, if available.)rH   rK   infer_metadatainput_filesr6   s    r/   r   zExecutionPlan.input_files  s    !%4466BBr1   c                     | j         j        }|                                 r$t          d | j        j        D                       }n5|                                j        |                                j        }nd}|S )zGet the number of rows after applying all plan optimizations, if possible.

        This method will never trigger any computation.

        Returns:
            The number of records of the result Dataset, or None.
        c              3   $   K   | ]}|j         V  d S rB   )r~   )r   ms     r/   r   z+ExecutionPlan.meta_count.<locals>.<genexpr>  s$      NN!1:NNNNNNr1   N)rH   rK   r|   sumr&   ro   r   r~   )r.   rK   r~   s      r/   r   zExecutionPlan.meta_count  s|      $##%% 	NNt/D/MNNNNNHH!!*6))++4HHHr1   c                    d| _         |                                 r,|                                 }t          |g          | j        dfS ddlm} |                                 } |||           }t          |          }	 t          j	        t          |          g|          }n# t          $ r Y nw xY w|                                | _        || j        |fS )a  Execute this plan, returning an iterator.

        This will use streaming execution to generate outputs.

        NOTE: Executor will be shutdown upon either of the 2 following conditions:

            - Iterator is fully exhausted (ie until StopIteration is raised)
            - Executor instances is garbage-collected

        Returns:
            Tuple of iterator over output RefBundles, DatasetStats, and the executor.
        TNr   )!execute_to_legacy_bundle_iterator)r,   r|   executeiterr%   *ray.data._internal.execution.legacy_compatr   r<   	itertoolschainnextStopIteration	get_stats)r.   r   r   r;   bundle_itergens         r/   r   z!ExecutionPlan.execute_to_iterator  s      '+###%% 	>\\^^F>>4#7==	
 	
 	
 	
 	
 	
 ''))77$GG ;	#/499+s;;KK 	 	 	D	'1133D0(::s   >#B" "
B/.B/preserve_orderc                 T   d| _         | j        }t          j                                        d          s)t          d          rt                              d           |                                 sddl	m
}m} t          | j        j        t                    r| j        j                                        x ||           | j        j                                        }t#          d |D                       }t%          d	 |D                       }t'          d
 |D             ||          }n|                                 5 }	 ||	| | j        |          }
t'          t-          |
                                          |
j        |
                                          }ddd           n# 1 swxY w Y   |	                                                                                    d          }|j        rt                              |           	 t?          tA          t          j!                    j"                            }|j#        j$        dk    rtK          |j#        j&                  _'        |j#        j(        dk    rtK          |j#        j)                  _*        n4# tV          $ r'}t          ,                    d|            Y d}~nd}~ww xY wd_-        fd            || _.        | j        j        | _/        | _0        | j        | j0        _1        | j.        S )zExecutes this plan (eagerly).

        Args:
            preserve_order: Whether to preserve order in execution.

        Returns:
            The blocks of the output dataset.
        TCPUcpu_warninga<  Warning: The Ray cluster currently does not have any available CPUs. The Dataset job will hang unless more CPUs are freed up. A common reason is that cluster resources are used by Actors or Tune trials; see the following link for more details: https://docs.ray.io/en/latest/data/data-internals.html#ray-data-and-tuner   )_get_initial_stats_from_planexecute_to_legacy_block_listNc              3   $   K   | ]}|j         V  d S rB   )owns_blocksr   s     r/   r   z(ExecutionPlan.execute.<locals>.<genexpr>  s%      !R!R&"4!R!R!R!R!R!Rr1   c              3   $   K   | ]}|j         V  d S rB   r   r   s     r/   r   z(ExecutionPlan.execute.<locals>.<genexpr>  s5       6 6&,FM6 6 6 6 6 6r1   c                 0    g | ]}|j         D ]	\  }}||f
S rC   )blocks)r   r   blockro   s       r/   
<listcomp>z)ExecutionPlan.execute.<locals>.<listcomp>  sJ       "/5}  ,E8 )   r1   )r   r}   )dataset_uuidr   F)include_parentzLSkipping recording memory spilled and restored statistics due to exception: c                     xj         | j                            dd          z  c_         | j        D ]} |           d S )Nobj_store_mem_spilledr   )dataset_bytes_spilledextra_metricsgetparents)	cur_statsrp   collect_statsr   s     r/   r   z,ExecutionPlan.execute.<locals>.collect_statsK  sb    ++y/F/J/J+Q0 0 ++ (/ * *F!M&))))* *r1   )2r,   r-   rayavailable_resourcesr   r   loggerwarningr|   r   r   r   ra   rH   rK   r   output_dataallr   r   r<   r)   tupleiter_blocks_with_metadata_owned_by_consumer
get_schemar   
to_summary	to_stringenable_auto_log_statsinfor
   r   get_runtime_contextgcs_addressstore_statsspill_time_total_sintspilled_bytes_totalglobal_bytes_spilledrestore_time_total_srestored_bytes_totalglobal_bytes_restored	Exceptiondebugr   r&   r$   r%   r   )r.   r   contextr   r   output_bundlesr   r}   r   r;   r   stats_summary_stringreplyer   r   s                 @@r/   r   zExecutionPlan.execute  s	    '+# -&((,,U33 		&& _   '')) X	C        4-1>BB.6&*6688D
 54T::
 "&!3!7!C!C!E!E!!R!R>!R!R!RRR5 6 60>6 6 6   # &4  
 !,!   ))++ x99 %)%7'5	  F 'f>>@@AA$*$=%0022  F               !**,,','7'7'9'9'C'C#( (D ( ($ 0 6KK 4555-*3+B+D+D+PQQ  $7!;;14)=2 2E. $9A==25)>3 3E/    &"#& &        +,E'* * * * * * M%    %+D!&*&8&<D##(D 040BD -$$s,   AF//F36F3BJ' '
K1KKc                     | j         S )zBReturn ``True`` if this plan has been partially or fully executed.)r,   r6   s    r/   has_started_executionz#ExecutionPlan.has_started_execution\  s     **r1   Nc                 0    d| _         d| _        d| _        dS )z;Clear the snapshot kept in the plan to the beginning state.N)r&   r$   r%   r6   s    r/   clear_snapshotzExecutionPlan.clear_snapshota  s      $"&#r1   c                 @    | j         st          i d          S | j         S )zqReturn stats for this plan.

        If the plan isn't executed, an empty stats object will be returned.
        Nrn   )r%   r   r6   s    r/   r   zExecutionPlan.statsg  s+    
 # 	:D9999##r1   c                 b    t          d | j                                        D                       S )z/Return whether this plan has lazy input blocks.c              3   @   K   | ]}t          |t                    V  d S rB   )ra   r   )r   rX   s     r/   r   z/ExecutionPlan.has_lazy_input.<locals>.<genexpr>r  s,      OOB:b$''OOOOOOr1   )r   rH   sourcesr6   s    r/   has_lazy_inputzExecutionPlan.has_lazy_inputp  s.    OO$2D2L2L2N2NOOOOOOr1   c                 >    | j         duo| j        | j        j        k    S )ztWhether this plan has a computed snapshot for the final operator, i.e. for
        the output of this plan.
        N)r&   r$   rH   rK   r6   s    r/   r|   z!ExecutionPlan.has_computed_outputt  s+    
 !- B'4+=+AA	
r1   c                     ddl m} ddlm} | j        j                                        D ]}t          |||f          r dS dS )z-Whether this plan requires to preserve order.r   )Sort)ZipTF)8ray.data._internal.logical.operators.all_to_all_operatorr  3ray.data._internal.logical.operators.n_ary_operatorr  rH   rK   post_order_iterra   )r.   r  r  rX   s       r/   require_preserve_orderz$ExecutionPlan.require_preserve_order}  sn    QQQQQQKKKKKK$(88:: 	 	B"sDk** ttur1   )r2   r   )rG   r   TF)r   r   )r2   r   )F)r2   N),rr   
__module____qualname____doc__r   r   r0   r   r7   r<   r?   rW   staticmethodr   r   boolrJ   r   r   r   r   r   r   r   r	   r   r}   r   r   r   r   r   r   r   r   r   r   propertyr   r   r   r  r|   r  rC   r1   r/   r   r   %   so       	Q 	Q(%(% "(% (% (% (%T
 
 
 
 
   
# 
 
 
 
! ! ! ! !:  !%"( ((( ( 	(
 ( ( ( \(>\d9o \# \ \ \ \|4 4 4 4   *   (>HSM > > > > (-  $	t))	*   B5/C)C#D    CXd3i0 C C C CHSM    " #;	x	"L(;N2OO	P#; #; #; #;J   %t% t%t% 
t% t% t% t%l +t + + + X+$ $ $ $$| $ $ $ $P P P P P
T 
 
 
 
      r1   r   )2r   r   loggingtypingr   r   r   r   r   r   r	   pyarrowr   ray._private.internal_apir
   r   'ray.data._internal.execution.interfacesr   %ray.data._internal.logical.interfacesr   6ray.data._internal.logical.interfaces.logical_operatorr   2ray.data._internal.logical.interfaces.logical_planr   .ray.data._internal.logical.interfaces.operatorr   2ray.data._internal.logical.operators.read_operatorr   %ray.data._internal.logical.optimizersr   ray.data._internal.statsr   ray.data.blockr   r   ray.data.contextr   ray.data.exceptionsr   ray.util.debugr   r:   r   r{   r   INHERITABLE_REMOTE_ARGS	getLoggerrr   r   r   rC   r1   r/   <module>r$     s         N N N N N N N N N N N N N N N N N N  



 S S S S S S S S = = = = = = @ @ @ @ @ @ R R R R R R J J J J J J C C C C C C C C C C C C I I I I I I 1 1 1 1 1 1 P P P P P P P P ( ( ( ( ( ( 5 5 5 5 5 5 # # # # # # )      )((((( 11  
	8	$	$`	 `	 `	 `	 `	 `	 `	 `	 `	 `	r1   