
    &`i'                       d dl Z d dlZd dlZd dlZd dl mZ d dlmZ d dlmZm	Z	 d dl
mZmZmZmZmZmZmZmZ d dlmZ d dlZd dlZd dlmZ d dlmZ d d	lmZ d d
lmZ d dl m!Z!m"Z"m#Z#m$Z$m%Z% d dl&m'Z'm(Z(m)Z) d dl*m+Z+ d dl,m-Z- d dl.m/Z/ d dl0m1Z1 d dl2m3Z3m4Z4m5Z5m6Z6 d dl7m8Z8  ej9        e:          Z;dZ<dZ=dZ>ee?ee-         f         Z@deAde?fdZBd/deCdeCde?fdZD G d d          ZE G d d           ZF ejG        d !           G d" d#                      ZHdeeH         fd$ZI G d% d&          ZJ G d' d(          ZKe1e G d) d*                                  ZLe G d+ d,                      ZMe G d- d.                      ZNdS )0    N)defaultdict)contextmanager)	dataclassfields)AnyDictListMappingOptionalSetTupleUnion)uuid4)ActorHandle)	BlockList)DatasetState)RuntimeMetricsHistogram)NODE_UNKNOWNMetricsGroupMetricsTypeNodeMetricsOpRuntimeMetrics)DatasetMetadataTopologyget_dataset_metadata_exporter)capfirst)
BlockStats)DataContext)DeveloperAPI)CounterGauge	HistogramMetric)NodeAffinitySchedulingStrategydatasets_stats_actor_dataset_stats_actorunknownsecondsreturnc                     | dk    r t          t          | d                    dz   S | dk    r#t          t          | dz  d                    dz   S t          t          | dz  dz  d                    dz   S )N      sgMbP?  msus)strround)r(   s    l/home/jaya/work/projects/VOICE-AGENT/VIET/agent-env/lib/python3.11/site-packages/ray/data/_internal/stats.pyfmtr4   8   sx    {{5!$$%%++	554++,,t3354$.2233d::       lvlspaces_per_indentc                     d|z  | z  S )zReturns a string of spaces which contains `level` indents,
    each indent containing `spaces_per_indent` spaces. For example:
    >>> leveled_indent(2, 3)
    '      '
      )r7   r8   s     r3   leveled_indentr<   A   s     ##s**r5   c                   p    e Zd ZdZd Zedd            ZdeddfdZdefdZ	defd	Z
defd
ZdefdZdS )Timerz8Helper class for tracking accumulated time (in seconds).c                 X    d| _         t          d          | _        d| _        d| _        d S )Nr   inf)_totalfloat_min_max_total_countselfs    r3   __init__zTimer.__init__M   s+     <<		#$r5   r)   Nc              #      K   t          j                    }	 d V  |                     t          j                    |z
             d S # |                     t          j                    |z
             w xY wN)timeperf_counteradd)rG   
time_starts     r3   timerzTimer.timerS   sn      &((
	7EEEHHT&((:566666DHHT&((:56666s   A +A1valuec                     | xj         |z  c_         || j        k     r|| _        || j        k    r|| _        | xj        dz  c_        d S )Nr+   )rA   rC   rD   rE   )rG   rP   s     r3   rM   z	Timer.add[   sT    u49DI49DIQr5   c                     | j         S rJ   )rA   rF   s    r3   getz	Timer.getc   s
    {r5   c                     | j         S rJ   )rC   rF   s    r3   minz	Timer.minf   
    yr5   c                     | j         S rJ   )rD   rF   s    r3   maxz	Timer.maxi   rV   r5   c                 L    | j         r| j        | j         z  nt          d          S )Nr@   )rE   rA   rB   rF   s    r3   avgz	Timer.avgl   s%    262CUt{T...uUr5   r)   N)__name__
__module____qualname____doc__rH   r   rO   rB   rM   rS   rU   rX   rZ   r;   r5   r3   r>   r>   J   s        BB% % % 7 7 7 ^7 4    U    U    U    VU V V V V V Vr5   r>   c                   R    e Zd ZdZdedddee         fdZdeddfd	Z	d
e
ddfdZdS )_DatasetStatsBuilderzHelper class for building dataset stats.

    When this class is created, we record the start time. When build() is
    called with the final blocks of the new dataset, the time delta is
    saved as part of the stats.operator_nameparentDatasetStatsoverride_start_timec                 V    || _         || _        |pt          j                    | _        d S rJ   )rb   rc   rK   rL   
start_time)rG   rb   rc   re   s       r3   rH   z_DatasetStatsBuilder.__init__w   s,     +-D1B1D1Dr5   metadatar)   c                    i }t          |                                          D ]o\  }\  }}t          |          }t          |          dk    r;|dk    r||| j        |z   <   >||| j                            d          d         |z   <   e||| j        <   pt          || j        | j                  }t          j	                    | j
        z
  |_        |S )Nr+   r   z->)rh   rc   	base_name)	enumerateitemsr   lenrb   splitrd   rc   rK   rL   rg   time_total_s)rG   rh   op_metadataikvcapped_kstatss           r3   build_multioperatorz(_DatasetStatsBuilder.build_multioperator   s    "8>>#3#344 	4 	4IAv1{{H8}}q  66ABK 2X =>>QRK 2 8 8 > >r BX MNN23D.// ;(
 
 

 ".004?Br5   final_blocksc                     t          | j        |                                i| j                  }t	          j                    | j        z
  |_        |S )N)rh   rc   )rd   rb   get_metadatarc   rK   rL   rg   rp   )rG   rx   rv   s      r3   buildz_DatasetStatsBuilder.build   sQ    (,*C*C*E*EF;
 
 
 ".004?Br5   N)r\   r]   r^   r_   r1   r   rB   rH   	StatsDictrw   r   r{   r;   r5   r3   ra   ra   p   s        # #EE E &e_	E E E EI .    &)       r5   ra   )num_cpusc                      e Zd ZdZd$dZdedeedf         deee	f         fdZ
deeef         fd	Zdefd
Z	 d%dedeeeeeef         f                  dee         deeef         deeeeeeeef         f         f                  f
dZd&dZ	 	 d'dZdededee         dedef
dZdedeeef         fdZd%dee         fdZdedefdZdedeeef         fd Z	 	 d(ded!ee         d"ee         fd#ZdS ))_StatsActora  Actor holding stats for blocks created by LazyBlockList.

    This actor is shared across all datasets created in the same cluster.
    In order to cap memory usage, we set a max number of stats to keep
    in the actor. When this limit is exceeded, the stats will be garbage
    collected in FIFO order.

    TODO(ekl) we should consider refactoring LazyBlockList so stats can be
    extracted without using an out-of-band actor.r.   c                 	   t          j        t                    | _        i | _        i | _        || _        d| _        i | _        i | _	        t                      | _        i | _        t          j                    | _        d}t          dd|          | _        t          dd|          | _        t          dd	|          | _        t          d
d|          | _        t          dd|          | _        t          dd|          | _        t          dd|          | _        |                     t0          j        |          | _        |                     t0          j        |          | _        |                     t0          j        |          | _        |                     t0          j        |          | _         |                     t0          j!        |          | _"        |                     t0          j#        |          | _$        | %                                | _&        d}t          dd|          | _'        t          dd|          | _(        t          dd|          | _)        t          dd|          | _*        t          dd|          | _+        t          dd|          | _,        t          dd |          | _-        t          d!d"|          | _.        t          d#d$|          | _/        t          d%d&|          | _0        t          d'd(|          | _1        t          d)d*|          | _2        t          d+d,|          | _3        t          d-d.|          | _4        t          d/d0|          | _5        t          d1d2|          | _6        t          d3d4|          | _7        t          d5d6|          | _8        t          d7d8|          | _9        d9}t          d:d;|          | _:        t          d<d=|          | _;        t          d>d?d@<                    dA tz          D                        dB|          | _>        d}t          dCdD|          | _?        t          dEdF|          | _@        t          dGdH|          | _A        t          dIdJd@<                    dK tz          D                        dB|          | _B        d S )LNr   datasetoperatordata_spilled_byteszBytes spilled by dataset operators.
                DataContext.enable_get_object_locations_for_metrics
                must be set to True to report this metricdescriptiontag_keysdata_freed_bytesz Bytes freed by dataset operatorsdata_current_bytesz9Bytes currently in memory store used by dataset operatorsdata_cpu_usage_coresz#CPUs allocated to dataset operatorsdata_gpu_usage_coresz#GPUs allocated to dataset operatorsdata_output_bytesz$Bytes outputted by dataset operatorsdata_output_rowsz#Rows outputted by dataset operators)metrics_groupr   )r   %data_iter_time_to_first_batch_secondszTotal time spent waiting for the first batch after starting iteration. This includes the dataset pipeline warmup time. This metric is accumulated across different epochs. data_iter_block_fetching_secondsz>Seconds taken to fetch (with ray.get) blocks by iter_batches()data_iter_batch_shaping_secondszCSeconds taken to shape batch from incoming blocks by iter_batches()"data_iter_batch_formatting_secondsz1Seconds taken to format batches by iter_batches()!data_iter_batch_collating_secondsz2Seconds taken to collate batches by iter_batches()"data_iter_batch_finalizing_secondsdata_iter_total_blocked_secondsz0Seconds user thread is blocked by iter_batches()data_iter_user_secondszSeconds spent in user codedata_iter_initialize_secondsz-Seconds spent in iterator initialization code!data_iter_get_ref_bundles_secondsz:Seconds spent getting RefBundles from the dataset iteratordata_iter_get_secondsz;Seconds spent in ray.get() while resolving block referencesdata_iter_next_batch_secondsz:Seconds spent getting the next batch from the block bufferdata_iter_format_batch_secondsz"Seconds spent formatting the batchdata_iter_collate_batch_secondsz!Seconds spent collating the batch data_iter_finalize_batch_secondsz"Seconds spent finalizing the batchdata_iter_blocks_localz*Number of blocks already on the local nodedata_iter_blocks_remotez8Number of blocks that require fetching from another nodedata_iter_unknown_locationz,Number of blocks that have unknown locationsdata_iter_prefetched_bytesz2Current bytes of prefetched blocks in the iteratorr   job_idrg   #data_dataset_estimated_total_blocksz&Total work units in blocks for dataset!data_dataset_estimated_total_rowsz$Total work units in rows for datasetdata_dataset_statezState of dataset (z, c                 0    g | ]}|j          d |j         S =rP   name.0r-   s     r3   
<listcomp>z(_StatsActor.__init__.<locals>.<listcomp>  s+    7d7d7dRS178M8MQV8M8M7d7d7dr5   )$data_operator_estimated_total_blocksz'Total work units in blocks for operator"data_operator_estimated_total_rowsz%Total work units in rows for operatordata_operator_queued_blocksz$Number of queued blocks for operatordata_operator_statezState of operator (c                 0    g | ]}|j          d |j         S r   r   r   s     r3   r   z(_StatsActor.__init__.<locals>.<listcomp>  s+    8e8e8eSTAG9N9Naf9N9N8e8e8er5   )Ccollectionsr   dictrh   	last_timerg   	max_statsnext_dataset_iddatasets_ray_nodes_cacher   _metadata_exporterdataset_metadatasdequefinished_datasets_queuer!   spilled_bytesfreed_bytescurrent_bytescpu_usage_coresgpu_usage_coresoutput_bytesoutput_rows0_create_prometheus_metrics_for_execution_metricsr   INPUTSexecution_metrics_inputsOUTPUTSexecution_metrics_outputsTASKSexecution_metrics_tasksOBJECT_STORE_MEMORY"execution_metrics_obj_store_memoryACTORSexecution_metrics_actorsMISCexecution_metrics_misc/_create_prometheus_metrics_for_per_node_metricsper_node_metricstime_to_first_batch_siter_block_fetching_siter_batch_shaping_siter_batch_formatting_siter_batch_collating_siter_batch_finalizing_siter_total_blocked_siter_user_siter_initialize_siter_get_ref_bundles_s
iter_get_siter_next_batch_siter_format_batch_siter_collate_batch_siter_finalize_batch_siter_blocks_localiter_blocks_remoteiter_unknown_locationiter_prefetched_bytesr   r   joinr   r   r   r   r   r   )rG   r   op_tags_keysiter_tag_keysdataset_tagsoperator_tagss         r3   rH   z_StatsActor.__init__   s^   #/55"  !(* 13 #@"A"A=? (3'8':':$
 / # = "
 
 
 !:!
 
 

 # S!
 
 

  %"=! 
  
  

  %"=! 
  
  

 ">!
 
 

 !=!
 
 
 AA*1% B   	% AA*2% B   	& AA*0% B   	$ AA*>% B   	/ AA*1% B   	% AA*/% B   	# !% T T V V$%*3r"	&
 &
 &
" &+.X"&
 &
 &
"
 %*-]"%
 %
 %
!
 (-0K"(
 (
 (
$
 ',/L"'
 '
 '
#
 (-0L"(
 (
 (
$ %*-J"%
 %
 %
!
 !$4"
 
 

 "'*G""
 "
 "

 ',/T"'
 '
 '
#
  #U"
 
 

 "'*T""
 "
 "

 $),<"$
 $
 $
 
 %*-;"%
 %
 %
!
 &+.<"&
 &
 &
"
 "'$D""
 "
 "

 #(%R"#
 #
 #

 &+(F"&
 &
 &
"
 &+(L"&
 &
 &
" ;381@!4
 4
 4
0
 27/>!2
 2
 2
.
 #( hTYY7d7dWc7d7d7d-e-ehhh!#
 #
 #
 0492A"5
 5
 5
1
 380?"3
 3
 3
/
 ,1)>",
 ,
 ,
(
 $)!idii8e8eXd8e8e8e.f.fiii"$
 $
 $
   r5   r   r   .r)   c                    i }t          j                    D ]}|j        |k    sd|j         }|j        }|j        t          j        k    rt          |||          ||j        <   O|j        t          j        k    rt          |f||d|j	        ||j        <   |j        t          j
        k    rt          |||          ||j        <   |S )Ndata_r   )r   get_metricsr   r   r   metrics_typer   r!   r"   metrics_argsr    )rG   r   r   metricsmetricmetric_namemetric_descriptions          r3   r   z<_StatsActor._create_prometheus_metrics_for_execution_metrics  s    &244 	 	F'=88/&+//K!'!3"k&777', 2%( ( ($$
 $(==='0( 2%( ( )	( ($$ $(;;;'. 2%( ( ($
 r5   c                     i }t          t                    D ]'}d|j         d}t          |dd          ||j        <   (|S )Nr   	_per_node )r   node_ipr   )r   r   r   r!   )rG   r   fieldr   s       r3   r   z;_StatsActor._create_prometheus_metrics_for_per_node_metrics  s^    K(( 	 	E7%*777K"'/# # #GEJ
 r5   c                 N    t          | j                  }| xj        dz  c_        |S )z3Generate a unique dataset_id for tracking datasets.r+   )r1   r   )rG   
dataset_ids     r3   gen_dataset_idz_StatsActor.gen_dataset_id  s,    -..
!r5   Ndataset_tag
op_metricsr   stater   c           	         	 ddt           dt          t          t          t          t                   f         dt
          t          t          f         fd}t          ||          D ]\  }}|                     ||          }	| j	        
                    |                    dd          |	           | j        
                    |                    dd          |	           | j        
                    |                    dd          |	           | j        
                    |                    d	d          |	           | j        
                    |                    d
d          |	           | j        
                    |                    dd          |	           | j        
                    |                    dd          |	           | j                                        D ]&\  }
} |||                    |
d          |	           '| j                                        D ]&\  }
} |||                    |
d          |	           '| j                                        D ]&\  }
} |||                    |
d          |	           '| j                                        D ]&\  }
} |||                    |
d          |	           '| j                                        D ]&\  }
} |||                    |
d          |	           '| j                                        D ]&\  }
} |||                    |
d          |	           '||                                D ]\  }}|| j        vr|                                  | j                            |t6                    }|                     ||          }	|                                D ]\  }}| j        |         } ||||	            |                     ||           d S )Nprom_metricrP   tagsc                 B   t          | t                    r|                     ||           d S t          | t                    r|                     ||           d S t          | t
                    r-t          |t                    r|                    | |           d S d S d S rJ   )
isinstancer!   setr    incr"   r   	export_to)r  rP   r  s      r3   _recordz5_StatsActor.update_execution_metrics.<locals>._record  s    
 +u-- 7t,,,,,K11 7t,,,,,K33 7e%<== 7OOK666667 77 7r5   obj_store_mem_spilledr   obj_store_mem_freedobj_store_mem_usedbytes_task_outputs_generatedrow_outputs_taken	cpu_usage	gpu_usage)r   node_ip_tagrJ   )r#   r   intrB   r	   r   r1   zip_create_tagsr   r  rS   r   r   r   r   r   r   r   rm   r   r   r   r   r   r   _rebuild_ray_nodes_cacher   r   update_dataset)rG   r   r   r   r  r   r
  rv   operator_tagr  
field_namer  node_idnode_metricsr   r   metric_values                    r3   update_execution_metricsz$_StatsActor.update_execution_metrics  sI    $(	7 	7	7eT#Y./	7 sCx.	7 	7 	7 	7 $'z=#A#A 	E 	EE<$$[,??D""599-Da#H#H$OOO  +@!!D!DdKKK""599-A1#E#EtLLL!!%)),JA"N"NPTUUU  +>!B!BDIII $$UYY{A%>%>EEE $$UYY{A%>%>EEE+/+H+N+N+P+P E E'
KUYYz1%=%=tDDDD+/+I+O+O+Q+Q E E'
KUYYz1%=%=tDDDD+/+G+M+M+O+O E E'
KUYYz1%=%=tDDDD
 8>>@@E E UYYz1%=%=tDDDD+/+H+N+N+P+P E E'
KUYYz1%=%=tDDDD+/+F+L+L+N+N E E'
KUYYz1%=%=tDDDDE ')9)?)?)A)A = =%$"777 11333/33G\JJ(([g(VV1=1C1C1E1E = =-K"&"7"DKGKt<<<<= 	K/////r5   c                     t          j                    }|D ]<}|                    dd           }|                    dd           }||
|| j        |<   =d S )NNodeIDNodeName)raynodesrS   r   )rG   current_nodesnoder  	node_names        r3   r  z$_StatsActor._rebuild_ray_nodes_cache'  se    	! 	; 	;Dhhx..GT22I"y'<1:%g.		; 	;r5   rv   rd   c                    |                      |          }| j                            |j                                        |           | j                            |j                                        |           | j                            |j                                        |           | j                            |j                                        |           | j                            |j                                        |           | j                            |j                                        |           | j	                            |j	                                        |           | j
                            |j
        |           | j                            |j        |           | j                            |j        |           | j                            |j        |           | j                            |j                                        |           | j                            |j                                        |           | j                            |j                                        |           | j                            |j                                        |           | j                            |j	                                        |           | j                            |j                                        |           | j                            |j                                        |           | j                            |j                                        |           d S rJ   )r  r   r  rS   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   iter_time_to_first_batch_sr   r   )rG   rv   r   r  s       r3   update_iteration_metricsz$_StatsActor.update_iteration_metrics/  s   
   --""5#:#>#>#@#@$GGG#''(D(H(H(J(JDQQQE,0022D999""5#:#>#>#@#@$GGG $$U%>%B%B%D%DdKKK!%%e&@&D&D&F&FMMM"&&u'B'F'F'H'H$OOO""5#:DAAA##E$<dCCC"&&u'BDIII"&&u'BDIII"&&u'7';';'='=tDDD!%%e&=&A&A&C&CTJJJ$(()B)F)F)H)H$OOO#''(B(F(F(H(H$OOO$(()D)H)H)J)JDQQQ"&&u'G'K'K'M'MtTTT!%%e&@&D&D&F&FMMMU.2244d;;;;;r5   r   topologydata_contextc           
      <   t          j                     }|t          j        j        ddd|d d |D             d| j        |<   | j        Ut          |||||d d t          j        j                  | j        |<   | j                            | j        |                    d S d S )Nr   c                 <    i | ]}|t           j        j        d d d dS )r   )r  progresstotalqueued_blocks)r   PENDINGr   )r   r   s     r3   
<dictcomp>z0_StatsActor.register_dataset.<locals>.<dictcomp>^  sF         )16 !%&	   r5   )r   r  r-  r.  
total_rowsrg   end_time	operators)r   r)  r   rg   r*  execution_start_timeexecution_end_timer  )	rK   r   r0  r   r   r   r   r   export_dataset_metadata)rG   r   r   r   r)  r*  rg   s          r3   register_datasetz_StatsActor.register_datasetM  s     Y[[
!).$  !.  &
 &
k"$ ".2A!&%)%)#'"*/	3 	3 	3D";/ #;;&{3     /.r5   c                    | j         |                             |           | j         |         }| j         |                             dd          }t          t	          | j         |                             dd                              }|||d}| j                            |                    dd          |           | j                            |                    dd          |           |                    dt          j	        j
                  }t          j        |          }| j                            |j        |           |                     ||           i }|                    d	i                                           D ]\  }	}
||	d
}| j                            |
                    dd          |           | j                            |
                    dd          |           | j                            |
                    dd          |           |
                    dt          j	        j
                  }t          j        |          }| j                            |j        |           |||	<   |                     ||           |d         t          j        j
        t          j        j
        hv r| j                            |           t3          | j                   | j        k    r~| j        r{| j                                        }| j                             |d            | j                            |d            t3          | j                   | j        k    r| j        sd S d S d S d S d S )Nr   Nonerg   r   r   r.  r2  r  r4  r   r/  )r   updaterS   r1   r  r   r  r   r   UNKNOWNr   from_stringr   rP   update_dataset_metadata_staterm   r   r   r   r   'update_dataset_metadata_operator_statesFINISHEDFAILEDr   appendrn   r   popleftpopr   )rG   r   r  r   rg   r   state_string
state_enumoperator_statesr   op_stater   tag_to_evicts                r3   r  z_StatsActor.update_datasetw  se   k"))%000k*{+//&AAT];7;;L!LLMMNN
 #$
 

 	044IIgq!!<	
 	
 	
 	.22IIlA&&	
 	
 	
 yy,*>*CDD!-l;;
##J$4lCCC**;EEE +-"'))K"<"<"B"B"D"D 	5 	5Hh&$ M 599Wa((-   377\1--}   ,00_a00-  
 $<<1E1JKKL%1,??J$(()9=III(4OH%%44[/RRR >l38,:M:RSSS(//<<<dm$$t~55$:V5#;CCEE!!,555&**<>>> dm$$t~55$:V555 TS555555r5   c                 `    s| j         S fd| j                                         D             S )Nc                 4    i | ]\  }}|d          k    ||S )r   r;   )r   rs   rt   r   s      r3   r1  z,_StatsActor.get_datasets.<locals>.<dictcomp>  s-    PPPA!H+:O:O1:O:O:Or5   )r   rm   )rG   r   s    `r3   get_datasetsz_StatsActor.get_datasets  s;     	!= PPPP!4!4!6!6PPPPr5   r   	new_statec                 
   || j         vrd S t          j                    }| j         |         }|j        |k    rd S t          j        |          }||_        |t
          j        j        k    r||_        nb|t
          j	        j        t
          j
        j        fv r>||_        |j        j        D ]*}|j        t
          j        j        k    r||_        ||_        +|| j         |<   | j        | j                            |dd           d S d S )NFinclude_data_contextinclude_op_args)r   rK   r  copydeepcopyr   RUNNINGr   r5  r@  rA  r6  r)  r4  r   r7  )rG   r   rM  update_timedataset_metadataupdated_dataset_metadatar   s          r3   r>  z)_StatsActor.update_dataset_metadata_state  s&   T333Fikk1*=!Y..F#'=1A#B#B )2 &,111<G$99<05|7J7OPPP:E$74=G > >>\%9%>>>%.HN2=H/-Ez*".#;;(%* % <      /.r5   rG  c                 p   || j         vrd S | j         |         }d}|j        j        D ]%}|j        |v r|j        ||j                 k    rd} n&|sd S t          j        |          }t          j                    }|j        j        D ]}|j        |v rv||j                 }|j        |k    r$||_        |t          j	        j
        k    r||_        H|t          j        j
        t          j        j
        fv r||_        |j        s||_        || j         |<   | j        | j                            |dd           d S d S )NFTrO  )r   r)  r4  idr  rR  rS  rK   r   rT  r   r5  r@  rA  r6  r   r7  )	rG   r   rG  rV  update_neededr   rW  rU  rM  s	            r3   r?  z3_StatsActor.update_dataset_metadata_operator_states  s    T333F1*=(1; 	 	H..Nohk&BBB $ 	F#'=1A#B#B ikk09C 	D 	DH{o--+HK8	>Y..!* 4 9994?H11 ). ',#   3>H/ $8 D8C5-Ez*".#;;(%* % <      /.r5   r  r  c                 *    d|i}|||d<   |||d<   |S )Nr   r   r   r;   )rG   r   r  r  r  s        r3   r  z_StatsActor._create_tags  s2     ;'#+D")DOr5   )r.   rJ   r[   )rv   rd   )NN)r\   r]   r^   r_   rH   r   r   r1   r   r#   r   r!   r   r   r	   r   r  rB   r   r   r  r  r(  r   r   r8  r  rL  r>  r?  r  r;   r5   r3   r   r      s       5 5C
 C
 C
 C
J)5:38_	c6k	   <	c5jAQ 	 	 	 	     OSH0 H0H0 c5e#4456H0 Cy	H0
 CH~H0 #4T#uS%Z7H2H-I(I#JKH0 H0 H0 H0T; ; ; ;<< < < <<(( ( Cy	(
 ( "( ( ( (T7?# 7?d38n 7? 7? 7? 7?rQ Q8C= Q Q Q Q
     6--04S#X- - - -d '+%)	  sm c]	     r5   r   c                     t           j        j        j        t	          d          t           j        j        j        j        } t                              d|             t          t          j	                    
                                d          }t                              t          t          dd|                                          S )	zEach cluster will contain exactly 1 _StatsActor. This function
    returns the current _StatsActor handle, or create a new one if one
    does not exist in the connected cluster. The _StatsActor is pinned on
    on driver process' node.
    NzEGlobal node is not initialized. Driver might be not connected to Ray.z"Stats Actor located on cluster_id=F)softTdetached)r   	namespaceget_if_existslifetimescheduling_strategy)r!  _privateworker_global_nodeRuntimeError
cluster_idloggerdebugr$   get_runtime_contextget_node_idr   optionsSTATS_ACTOR_NAMESTATS_ACTOR_NAMESPACEremote)current_cluster_idrb  s     r3   get_or_create_stats_actorrq    s     |'/S
 
 	
 ,9D
LLJ6HJJKKK 9!!--//  
 '/    fhhr5   c                   :   e Zd ZdZedee         deee	ee	e
eef         f         f                  fd            Zede	dee         dee	         dee	ef         fd            Zed	d
de	fd            Zede	dee	         dedefd            Zede	fd            ZdS )_StatsManagera  A Class containing util functions that manage remote calls to _StatsActor.

    Ray Data updates metrics through the _StatsManager, and direct remote calls
    to the _StatsActor is discouraged. Some functionalities provided by
    _StatsManager:
        - Format and update iteration metrics
        - Format and update execution metrics
        - Aggregate per node metrics
        - Dataset registration
    r   r)   c           
      0   t          j                    j        sdS t          d           }| D ]h}|j                                        D ]L\  }}||         }t          t                    D ]*}||j        xx         t          ||j                  z  cc<   +Mi|S )aR  
        Aggregate per-node metrics from a list of OpRuntimeMetrics objects.

        If per-node metrics are disabled in the current DataContext, returns None.
        Otherwise, it sums up all NodeMetrics fields across the provided metrics and
        returns a nested dictionary mapping each node ID to a dict of field values.
        Nc                  *    t          t                    S rJ   )r   r  r;   r5   r3   <lambda>z;_StatsManager._aggregate_per_node_metrics.<locals>.<lambda>E  s    S1A1A r5   )
r   get_currentenable_per_node_metricsr   _per_node_metricsrm   r   r   r   getattr)r   aggregated_by_noder   r  r  agg_node_metricsfs          r3   _aggregate_per_node_metricsz)_StatsManager._aggregate_per_node_metrics7  s     &((@ 	4()A)ABB! 	N 	NG)0)B)H)H)J)J N N%#5g#> ,, N NA$QV,,,af0M0MM,,,,NN
 "!r5   r   r   r  c                    t                               |          }d |D             }| ||||f}	  t                      j        j        |  d S # t
          $ r*}t                              d| d           Y d }~d S d }~ww xY w)Nc                 6    g | ]}|                                 S r;   )as_dict)r   r   s     r3   r   z:_StatsManager.update_execution_metrics.<locals>.<listcomp>V  s"    FFFFNN,,FFFr5   zKError occurred during update_execution_metrics.remote call to _StatsActor: Texc_info)rs  r~  rq  r  ro  	Exceptionrh  warning)r   r   r   r  r   op_metrics_dictsargses           r3   r  z&_StatsManager.update_execution_metricsN  s     )DDZPPFF:FFF
	G%''@GNNNN 	 	 	NNa^_aa     FFFFF	s   A 
B A;;B rv   rd   c                     | |f}	  t                      j        j        |  d S # t          $ r*}t                              d| d           Y d }~d S d }~ww xY w)NzKError occurred during update_iteration_metrics.remote call to _StatsActor: Tr  )rq  r(  ro  r  rh  r  )rv   r   r  r  s       r3   r(  z&_StatsManager.update_iteration_metricsg  s    {#	G%''@GNNNN 	 	 	NNa^_aa          	s   # 
AAAr)  r*  c                     t                      j                            t          j                                                    | |||           dS )a1  Register a dataset with the stats actor.

        Args:
            dataset_tag: Tag for the dataset
            operator_tags: List of operator tags
            topology: Optional Topology representing the DAG structure to export
            data_context: The DataContext attached to the dataset
        N)rq  r8  ro  r!  rj  
get_job_id)r   r   r)  r*  s       r3   register_dataset_to_stats_actorz-_StatsManager.register_dataset_to_stats_actorr  sS      	"##4;;#%%0022	
 	
 	
 	
 	
r5   c                     	 t                      } t          j        | j                                                  S # t
          $ r:}t                              d|            t                      j	        cY d }~S d }~ww xY w)Nz?Failed to generate dataset_id, falling back to random uuid_v4: )
rq  r!  rS   r   ro  r  rh  r  r   hex)stats_actorr  s     r3   gen_dataset_id_from_stats_actorz-_StatsManager.gen_dataset_id_from_stats_actor  s    
	355K7;5<<>>??? 	 	 	NNURSUU  
 77;	s   8; 
A?/A:4A?:A?N)r\   r]   r^   r_   staticmethodr	   r   r   r
   r1   r   r  rB   r~  r   r   r  r(  r   r   r  r  r;   r5   r3   rs  rs  +  si       	 	 ")*"	'#wsE#u*,='=>>?	@" " " \", )* Cy CH~	   \0  S    \ 

Cy
 
 "	
 
 
 \
. S    \  r5   rs  c                       e Zd ZdZdddedeed          ed          f         defdZ	e
d             Z	 dd	ed
ee         defdZddZdefdZdS )rd   zHolds the execution times for a given Dataset.

    This object contains a reference to the parent Dataset's stats as well,
    but not the Dataset object itself, to allow its blocks to be dropped from
    memory.N)rk   rh   rc   rk   c                f   || _         |t          |t                    s|g}|pg | _        | j        sdn t	          d | j        D                       dz   | _        || _        d| _        d| _        t                      | _
        t                      | _        t                      | _        t                      | _        t                      | _        t                      | _        t                      | _        t                      | _        t                      | _        t                      | _        t                      | _        t                      | _        t                      | _        i | _        d| _        d| _        d| _        d| _        d| _        d| _        d| _        t                      | _        dS )a  Create dataset stats.

        Args:
            metadata: Dict of operators used to create this Dataset from the
                previous one. Typically one entry, e.g., {"map": [...]}.
            parent: Reference to parent Dataset's stats, or a list of parents
                if there are multiple.
            base_name: The name of the base operation for a multi-operator operation.
        Nr   c              3   $   K   | ]}|j         V  d S rJ   )numberr   ps     r3   	<genexpr>z(DatasetStats.__init__.<locals>.<genexpr>  s$      *J*J18*J*J*J*J*J*Jr5   r+   unknown_uuid) rh   r  listparentsrX   r  rk   dataset_uuidrp   r>   streaming_exec_schedule_siter_wait_sr   r   r   r   r   r   r'  r   r   r   iter_total_sextra_metricsr   r   r   r   global_bytes_spilledglobal_bytes_restoreddataset_bytes_spilledstreaming_split_coordinator_s)rG   rh   rc   rk   s       r3   rH   zDatasetStats.__init__  sw   " $,j&>&>XF-3\r\NAAs*J*JT\*J*J*J'J'JQ'N 	 # "0#$ 16& #(''-2WW#!&(-*/'' +077!,1GG"16'+077!"'''(-#(77 '('(*+"*+" *+!*+"*+" 5:GG***r5   c                     t                      S rJ   )rq  rF   s    r3   r  zDatasetStats.stats_actor  s    (***r5   r   re   r)   c                 $    t          || |          S )z>Start recording stats for an op of the given name (e.g., map).)ra   )rG   r   re   s      r3   child_builderzDatasetStats.child_builder  s     $D$0CDDDr5   DatasetStatsSummaryc                 n   g }t          | j                  dk    t          | j        | j        | j        | j        | j        | j        | j	        | j
        | j        | j        | j        | j        | j        | j        | j        | j        | j                  }g }| j        d | j        D             }d}t+          |          D ]\  }}|j        ru|j        d         }t/          |j        t2                    r|j                            dd          nd}t6                              d|dz    d|j         d	| d
           ||z  }fd| j                                        D             }	t+          |	          D ]a\  }}
r>|dk    r||
_        n7|	|dz
           }|j        rd|j        v r|j        d         nd|
_        n||
_        |                     |
           b| j!        r| j!                                        nd}tE          |||| j#        | j$        | j%        | j&        | j'        | j(        | j)        | j*        |          S )zGenerate a `DatasetStatsSummary` object from the given `DatasetStats`
        object, which can be used to generate a summary string.r+   Nc                 6    g | ]}|                                 S r;   )
to_summaryr  s     r3   r   z+DatasetStats.to_summary.<locals>.<listcomp>	  s     $J$J$JQ\\^^$J$J$Jr5   r   rj   sumzParent z (operator: z) contributes z rows to inputc                 P    g | ]"\  }}t                               ||           #S ))is_sub_operator)OperatorStatsSummaryfrom_block_metadata)r   r   rv   r  s      r3   r   z+DatasetStats.to_summary.<locals>.<listcomp>  sK     
 
 
 e !44e_ 5  
 
 
r5   )+rn   rh   IterStatsSummaryr  r   r   r   r   r   r   r'  r   r   r   r  r  r   r   r   r   r  rl   operators_statsr  output_num_rowsr   rS   rh  ri  rb   rm   total_input_num_rowsrB  r  r  r  r  rp   rk   r  r  r  r  )rG   r  
iter_statsstats_summary_parentsparent_total_outputrr   parent_summarylast_parent_op	op_outputop_statsop_statprev_opr  r  s                @r3   r  zDatasetStats.to_summary  s    dm,,q0%'O"$%&+%"."#&&#
 

( !#<#$J$JT\$J$J$J!  !*+@!A!A 	1 	1A~- 1!/!?!C "."@$GGN266ua@@@ 
 va!evv1Mvv]fvvv   $y0#
 
 
 
  $}2244	
 
 
 $H-- 	, 	,JAw C663FG00 'q1uoG $38=AX8X8X  /66  00 0C,""7++++ -D*..000 	"
 #!KN%&&%
 
 	
r5   c                 N    |                                                                  S )ax  Generate a string representing the runtime metrics of a Dataset. This is
        a high level summary of the time spent in Ray Data code broken down by operator.
        It also includes the time spent in the scheduler. Times are shown as the total
        time for each operator and percentages of time are shown as a fraction of the
        total time for the whole dataset.)r  runtime_metricsrF   s    r3   r  zDatasetStats.runtime_metricsL  s        00222r5   rJ   )r)   r  )r\   r]   r^   r_   r|   r   r   r	   r1   rH   propertyr  rB   ra   r  r  r  r;   r5   r3   rd   rd     s         A< A< A< A< h~.^0DDE	A<
 A< A< A< A<F + + X+ AEE EE.6uoE	E E E E]
 ]
 ]
 ]
~3 3 3 3 3 3 3r5   rd   c                      e Zd ZU ed         ed<   ded<   ed          ed<   eed<   eed<   eed<   eed	<   eee	f         ed
<   eed<   eed<   eed<   eed<   	 	 	 d!de
ee                  dedefdZedefd            Zedd ded          fd            Zedd deeef         fd            ZdefdZd"defdZdefdZdefdZdefdZdefd ZdS )#r  r  r  r  r  r  r  r  rp   rk   r  r  r  r  r  NTalready_printedinclude_parentr)   c                    |t                      }d}| j        r/|r-| j        D ]%}|                    |d          }|r
||z  }|dz  }&d}t          | j                  dk    rn| j        d         }|j        }| j        |z   }	|d                    | j        |          z  }|	|v r|d	z  }n|	                    |	           |t          |          z  }nt          | j                  dk    rt          | j        d
          }
|
dk    rd}
|d                    | j        | j        |
          z  }t          | j                  D ]e\  }}|j        }| j        |z   }	|dz  }|d                    ||          z  }|	|v r|dz  }>|	                    |	           |t          |          z  }ft          j                    j        }|r6| j        r/|r	|j        rdnd}||z  }|dt          | j                  z   dz   z  }|t          | j                  z  }t          | j                  dk    r|rt          | j        dz            }t          | j        dz            }|s|r5|dz  }|d                    |          z  }|d                    |          z  }t          | j        dz            }|r|dz  }|d                    |          z  }| j        r|dz  }|dz  }|d| j         dz  }|r|r|d|                                 z   z  }|S )a  Return a human-readable summary of this Dataset's stats.

        Args:
            already_printed: Set of operator IDs that have already had its stats printed
               out.
            include_parent: If true, also include parent stats summary; otherwise, only
               log stats of the latest operator.
            add_global_stats: If true, includes global stats to this summary.
        Returns:
            String with summary statistics for executing the Dataset.
        Nr   F)add_global_stats
r+   r   zOperator {} {}: z[execution cached]
r,   z Operator {} {}: executed in {}s
z	Suboperator {} {}: z	[execution cached]
	z* Extra metrics:     .Az
Cluster memory:
z* Spilled to disk: {}MB
z* Restored from disk: {}MB
z
Dataset memory:
zDataset throughput:
	* Ray Data throughput:  rows/s
)r  r  	to_stringrn   r  rb   r  formatr  rM   r1   r2   rp   rk   rl   r   rw  verbose_stats_logsr  r  r  r  r  r  num_rows_per_sr  )rG   r  r  r  outr  
parent_sumoperators_stats_summaryrb   operator_uuidrounded_totalnr  indent
mb_spilledmb_restoreddataset_mb_spilleds                    r3   r  zDatasetStatsSummary.to_stringe  s   " "!eeO< 	 N 	 \    [[5[QQ
  :%C4KC"&t#$$))&*&:1&=#3AM -=M%,,T[-HHHC//--##M222s2333%&&**!$"3Q77M!! !6==T^]  C /88L.M.M 	8 	8** 7 E $ 1M At.55aGGG O3333CC#''66636777CC(466I 	H$"4 	H +/F/V 
 6MC&T-?)@)@@4GGCs4?###t#$$q((-=(t83>??J :S @AAK J[ J,,299*EEE5<<[III!&t'AC'G!H!H! N,,299:LMMM" Rt..Q43FQQQQ 	1"2 	14$..0000C
r5   c                     | j         r| j         d         j        nd}|r|d         nd}|                                 }|r|sdS ||z  S )zDCalculates the throughput in rows per second for the entire dataset.rj   r   r          )r  r  get_total_wall_time)rG   r  total_num_out_rows	wall_times       r3   r  z"DatasetStatsSummary.num_rows_per_s  sm     9=8LSD $44RS 	 8GM_U33A,,..	! 	 	3!I--r5   currc                     g }| j         D ]8}|r4|j         r-|                    t                              |                     9|| gz   S rJ   )r  extendr   _collect_dataset_stats_summaries)r  summsr  s      r3   r  z4DatasetStatsSummary._collect_dataset_stats_summaries  s^      	V 	VA VQY V0QQRSTTUUUv~r5   summc                     t          d | j        D                       }t          d | j        D                       }||fS )Nc              3   $   K   | ]}|j         V  d S rJ   )earliest_start_timer   opss     r3   r  z:DatasetStatsSummary._find_start_and_end.<locals>.<genexpr>  s%      UUS4UUUUUUr5   c              3   $   K   | ]}|j         V  d S rJ   )latest_end_timer  s     r3   r  z:DatasetStatsSummary._find_start_and_end.<locals>.<genexpr>  s%      MM,MMMMMMr5   )rU   r  rX   )r  earliest_start
latest_ends      r3   _find_start_and_endz'DatasetStatsSummary._find_start_and_end  sK    UU@TUUUUUMM8LMMMMM
z))r5   c                    |                                  dt          dt          dt          ffd}t                              |           }d}|D ]P}t          |j                  dk    r6t                              |          \  }}||z
  }| ||j        |          z  }Q| |d| j	                  z  }| |d          z  }|S )	Nr   rK   r)   c                 X    dk    r|z  nd}d|  dt          |           d|dz  ddS )Nr   z* : z (d   z.3fz%)
r4   )r   rK   fractiontotal_wall_times      r3   fmt_linez5DatasetStatsSummary.runtime_metrics.<locals>.fmt_line  sJ    1@11D1Dto--!HEEED		EEX^EEEEEr5   zRuntime Metrics:
r   
SchedulingTotal)
r  r1   rB   r  r  rn   r  r  rk   r  )	rG   r  	summariesr  r  r  r  op_total_timer  s	           @r3   r  z#DatasetStatsSummary.runtime_metrics  s   2244	F3 	Fe 	F 	F 	F 	F 	F 	F 	F (HHNN	" 	? 	?D4'((1,,-@-T-T. .*
 !+^ ;xx>>>xxd&DEEExx111
r5   r   c                    t                    }d                    fd| j        D                       }d                    fd| j        D                       }d                    fd| j                                        D                       }|r	d| d| dnd}|r	d| d| dnd}|r	d| d| dnd}d                    g | d| d	| j         d| d
| j         d| d| j         d| d| d| d| d| d| j	        
                    dz              d| d| j        dz   d| d| j        dz   d| d| j        dz   d| d| d| d          S )Nr  c                 @    g | ]}|                     d z             S r,   __repr__)r   sslevels     r3   r   z0DatasetStatsSummary.__repr__.<locals>.<listcomp>  s)    CCCR[[##CCCr5   c                 @    g | ]}|                     d z             S r  r  )r   psr   s     r3   r   z0DatasetStatsSummary.__repr__.<locals>.<listcomp>  s)    !P!P!PR"++eai"8"8!P!P!Pr5   c              3   R   K   | ]!\  }}t          d z              | d| dV  "dS )r,   r  ,N)r<   )r   rs   rt   r   s      r3   r  z/DatasetStatsSummary.__repr__.<locals>.<genexpr>  s]       "
 "
1 eai((3!33q333"
 "
 "
 "
 "
 "
r5   ,
z   r   zDatasetStatsSummary(
z   dataset_uuid=z   base_name=z
   number=z   extra_metrics={z},
z   operators_stats=[z],
z   iter_stats=r+   z   global_bytes_spilled=r  zMB,
z   global_bytes_restored=z   dataset_bytes_spilled=z   parents=[r   )r<   r   r  r  r  rm   r  rk   r  r  r  r  r  r  )rG   r   r  r  parent_statsr  s    `    r3   r  zDatasetStatsSummary.__repr__  s   &&))CCCCd.BCCC
 
 yy!P!P!P!P4<!P!P!PQQ		 "
 "
 "
 "
*0022"
 "
 "
 
 
 5DK000V0000 	 =IP8L88V8888b=JR9]99f9999PR   v       '+'8      $(N        "&    	    +8	   
    
 ,;        &*_%=%=eAg%F%F        04/H3/N        150JS0P        150JS0P        $0         	
r5   c                     d t                               |           D             }t          |          dk    rdS t          d |D                       }t	          d |D                       }||z
  S )zCalculate the total wall time for the dataset, this is done by finding
        the earliest start time and latest end time for any block in any operator.
        The wall time is the difference of these two times.
        c                 r    g | ]4}t          |j                  d k    t                              |          5S r   )rn   r  r  r  r   r  s     r3   r   z;DatasetStatsSummary.get_total_wall_time.<locals>.<listcomp>  sF     
 
 
4'((1,,  33D99,,,r5   r   c              3   &   K   | ]}|d          V  dS )r   Nr;   r   	start_ends     r3   r  z:DatasetStatsSummary.get_total_wall_time.<locals>.<genexpr>"  s&       J J)1 J J J J J Jr5   c              3   &   K   | ]}|d          V  dS )r+   Nr;   r  s     r3   r  z:DatasetStatsSummary.get_total_wall_time.<locals>.<genexpr>#  s&      FFiYq\FFFFFFr5   )r  r  rn   rU   rX   )rG   
start_endsr  r  s       r3   r  z'DatasetStatsSummary.get_total_wall_time  s    

 
+LLTRR
 
 


 z??a1  J Jz J J JJJNFF:FFFFFJ..r5   c                 h    t                               |           }t          d |D                       S )zGCalculate the sum of the wall times across all blocks of all operators.c              3   R   K   | ]"}t          d  |j        D                       V  #dS )c              3   ^   K   | ](}|j         r|j                             d d          ndV  )dS r  r   N)r  rS   r  s     r3   r  zJDatasetStatsSummary.get_total_time_all_blocks.<locals>.<genexpr>.<genexpr>+  sT         47=GCM%%eQ///a     r5   N)r  r  r
  s     r3   r  z@DatasetStatsSummary.get_total_time_all_blocks.<locals>.<genexpr>)  sa       
 
    #3    
 
 
 
 
 
r5   )r  r  r  )rG   r  s     r3   get_total_time_all_blocksz-DatasetStatsSummary.get_total_time_all_blocks&  sE    'HHNN	 
 
 "
 
 
 
 
 	
r5   c                     t          d | j        D                       }|t          d | j        D                       z   S )Nc              3   >   K   | ]}|                                 V  d S rJ   )get_total_cpu_timer  s     r3   r  z9DatasetStatsSummary.get_total_cpu_time.<locals>.<genexpr>4  s.      FFA--//FFFFFFr5   c              3   L   K   | ]}|j                             d d          V   dS r  )cpu_timerS   r   r  s     r3   r  z9DatasetStatsSummary.get_total_cpu_time.<locals>.<genexpr>5  sC        
  
*,BKOOE1%% 
  
  
  
  
  
r5   )r  r  r  )rG   r  s     r3   r  z&DatasetStatsSummary.get_total_cpu_time3  sX    FFFFFFF
C  
  
040D 
  
  
 
 
 
 	
r5   c                     d | j         D             }|rt          |          nd}| j        s|S t          |gd | j        D             R  S )Nc                 6    g | ]}|                                 S r;   )get_max_heap_memoryr  s     r3   r   z;DatasetStatsSummary.get_max_heap_memory.<locals>.<listcomp>:  s$    GGGQ..00GGGr5   r   c                 D    g | ]}|j                             d d          S )rX   r   )memoryrS   r  s     r3   r   z;DatasetStatsSummary.get_max_heap_memory.<locals>.<listcomp>A  s(    EEE"bimmE1%%EEEr5   )r  rX   r  )rG   parent_memory
parent_maxs      r3   r  z'DatasetStatsSummary.get_max_heap_memory9  sr    GG$,GGG+8?S'''a
# 	
EE0DEEE
 
 
 	
r5   )NTTr	  )r\   r]   r^   r	   __annotations__r  r1   rB   r   r   r   r   boolr  r  r  r  r  r   r  r  r  r  r  r  r  r;   r5   r3   r  r  U  sE         01111""""'((((KKKNNNS>!!!$$$$ /3#	W W!#c(+W W
 
W W W Wr . . . . X." #	#	$   \ *"7 *E%,<O * * * \*
    (
 
3 
 
 
 
@/U / / / /"
5 
 
 
 

E 
 
 
 
	
U 	
 	
 	
 	
 	
 	
r5   r  c            	       <   e Zd ZU eed<   eed<   eed<   eed<   eed<   eed<   dZee	eef                  ed<   dZ
ee	eef                  ed	<   dZee	eef                  ed
<   dZee	eef                  ed<   dZee         ed<   dZee	eef                  ed<   dZee	eef                  ed<   dZee	eef                  ed<   dZee	eef                  ed<   edefd            Zedefd            Zededee         dedd fd            ZdefdZddefdZdS )r  rb   r  rp   r  r  block_execution_summary_strNr  r  udf_timer  r  r  output_size_bytes
node_count	task_rowsr)   c                 L    | j         r| j        sdS | j         d         | j        z  S )Nr  r  )r  rp   rF   s    r3   r  z#OperatorStatsSummary.num_rows_per_sd  s4    
 # 	4+< 	3#E*T->>>r5   c                 r    | j         r| j        r| j        d         sdS | j         d         | j        d         z  S )zCCalculates the estimated single-task throughput in rows per second.r  r  )r  r  rF   s    r3   num_rows_per_task_sz(OperatorStatsSummary.num_rows_per_task_sm  sF     # 	4> 	PUAV 	3#E*T^E-BBBr5   block_statsc                 P
   d |D             }d}d}d\  }}|r7t          d |D                       }t          d |D                       }||z
  }|r#d                    t          |                    }	nE|r<t	          |d          }|dk    rd}d                    t          |          |          }	nd	}	|	d
z  }	t          j        t                    }
|D ]/}|j        &|j	        |
|j	        j
        xx         |j        z  cc<   0d}t          |
          dk    rt          |
                                          t          |
                                          t          t          j        t          |
                                                              t          |
          d}d                    t          |
          |	          }	d\  }}}}|rt          d |D                       t          d |D                       t          j        d |D                       t          d |D                       d}t          d |D                       t          d |D                       t          j        d |D                       t          d |D                       d}d |D             }t          |          t          |          t          t          j        |                    d}t          d |D                       t          d |D                       t          j        d |D                       t          d |D                       d}d}d |D             }|rMt          |          t          |          t          t          j        |                    t          |          d}d}d |D             }|rMt          |          t          |          t          t          j        |                    t          |          d}d}|rt          j        t                     }|D ]'}||j                                     |j
                   (d  |                                D             }t          |                                          t          |                                          t          t          j        t          |                                                              t          |          d}d}t)          ||||||	|||||||||!          S )"a  Calculate the stats for a operator from a given list of blocks,
        and generates a `OperatorStatsSummary` object with the results.

        Args:
            block_stats: List of `BlockStats` to calculate stats of
            operator_name: Name of operator associated with `blocks`
            is_sub_operator: Whether this set of blocks belongs to a sub operator.
        Returns:
            A `OperatorStatsSummary` object initialized with the calculated statistics
        c                 *    g | ]}|j         	|j         S rJ   )
exec_statsr   ms     r3   r   z<OperatorStatsSummary.from_block_metadata.<locals>.<listcomp>  s!    TTTq1<;Sal;S;S;Sr5   r   )r   r   c              3   $   K   | ]}|j         V  d S rJ   )start_time_sr   s     r3   r  z;OperatorStatsSummary.from_block_metadata.<locals>.<genexpr>  s$      %I%Ian%I%I%I%I%I%Ir5   c              3   $   K   | ]}|j         V  d S rJ   )
end_time_sr   s     r3   r  z;OperatorStatsSummary.from_block_metadata.<locals>.<genexpr>  s$      !C!C1!,!C!C!C!C!C!Cr5   z{} blocks produced
r,   z{} blocks produced in {}sr   r  N)rU   rX   meancountz{} tasks executed, {})NNNNc                     g | ]	}|j         
S r;   wall_time_sr   r  s     r3   r   z<OperatorStatsSummary.from_block_metadata.<locals>.<listcomp>      >>>aAM>>>r5   c                     g | ]	}|j         
S r;   r:  r<  s     r3   r   z<OperatorStatsSummary.from_block_metadata.<locals>.<listcomp>  r=  r5   c                     g | ]	}|j         
S r;   r:  r<  s     r3   r   z<OperatorStatsSummary.from_block_metadata.<locals>.<listcomp>  s     C C C1 C C Cr5   c                     g | ]	}|j         
S r;   r:  r<  s     r3   r   z<OperatorStatsSummary.from_block_metadata.<locals>.<listcomp>  r=  r5   )rU   rX   r7  r  c                     g | ]	}|j         
S r;   
cpu_time_sr<  s     r3   r   z<OperatorStatsSummary.from_block_metadata.<locals>.<listcomp>      ===QAL===r5   c                     g | ]	}|j         
S r;   rB  r<  s     r3   r   z<OperatorStatsSummary.from_block_metadata.<locals>.<listcomp>  rD  r5   c                     g | ]	}|j         
S r;   rB  r<  s     r3   r   z<OperatorStatsSummary.from_block_metadata.<locals>.<listcomp>       B B B! B B Br5   c                     g | ]	}|j         
S r;   rB  r<  s     r3   r   z<OperatorStatsSummary.from_block_metadata.<locals>.<listcomp>  rD  r5   c                 B    g | ]}t          |j        pd dz  d          S )r   i   r,   )r2   max_uss_bytesr<  s     r3   r   z<OperatorStatsSummary.from_block_metadata.<locals>.<listcomp>  s;       EFq+!<a@@  r5   )rU   rX   r7  c                     g | ]	}|j         
S r;   
udf_time_sr<  s     r3   r   z<OperatorStatsSummary.from_block_metadata.<locals>.<listcomp>  rD  r5   c                     g | ]	}|j         
S r;   rL  r<  s     r3   r   z<OperatorStatsSummary.from_block_metadata.<locals>.<listcomp>  rD  r5   c                     g | ]	}|j         
S r;   rL  r<  s     r3   r   z<OperatorStatsSummary.from_block_metadata.<locals>.<listcomp>  rG  r5   c                     g | ]	}|j         
S r;   rL  r<  s     r3   r   z<OperatorStatsSummary.from_block_metadata.<locals>.<listcomp>  rD  r5   c                 *    g | ]}|j         	|j         S rJ   )num_rowsr1  s     r3   r   z<OperatorStatsSummary.from_block_metadata.<locals>.<listcomp>  s!    UUU!aj>T1:>T>T>Tr5   c                 *    g | ]}|j         	|j         S rJ   )
size_bytesr1  s     r3   r   z<OperatorStatsSummary.from_block_metadata.<locals>.<listcomp>  s(     
 
 
1IAL1I1I1Ir5   c                 4    i | ]\  }}|t          |          S r;   )rn   )r   r$  taskss      r3   r1  z<OperatorStatsSummary.from_block_metadata.<locals>.<dictcomp>  s$    RRRe4URRRr5   )rb   r  rp   r  r  r%  r  r  r&  r  r  r  r'  r(  r)  )rU   rX   r  rn   r2   r   r   r  rR  r0  task_idxvaluesnpr7  r  r  r  r  rM   rm   r  )clsrb   r-  r  r0  r  rp   r  r  exec_summary_strr)  metatask_rows_statswall_time_stats	cpu_statsmemory_stats	udf_statsmemory_stats_mboutput_num_rows_statsr  output_size_bytes_statsr'  node_counts_stats
node_tasksr-   node_countsr  s                              r3   r  z(OperatorStatsSummary.from_block_metadatax  s   " UTKTTT
/3,_ 	A #&%I%Ij%I%I%I"I"I!!C!C
!C!C!CCCO*-@@L 	%5<<S__MM 	& %lA 6 6 A%%$%M#>#E#E
OO]$ $   $& $+C00	 	E 	ED}(T_-H$/2333t}D333y>>A9++--..9++--..BGD)9)9););$<$<==>>Y	 O  7==I 0    ?U;L) 	>>:>>>??>>:>>>?? C C
 C C CDD>>:>>>??	 O ==*===>>==*===>> B Bz B B BCC==*===>>	 I JT  O ?++?++BGO4455 L ==*===>>==*===>> B Bz B B BCC==*===>>	 I !%UU{UUU 	?++?++BGO4455?++	% %! #'
 
"-
 
 
  	,--,--BG$56677,--	' '# ! 	$055J 6 619%))!*5555RRz?O?O?Q?QRRRK;--//00;--//00BGD););)=)=$>$>??@@[))	! !  $#'+% 3+(8%!515(%
 
 
 	
r5   c           
      8   | j         rdnd}| j        }| j        }|rl||z  }|d                    t	          |d                   t	          |d                   t	          |d                   t	          |d                             z  }| j        }|rl||z  }|d                    t	          |d                   t	          |d                   t	          |d                   t	          |d                             z  }| j        }|rl||z  }|d	                    t	          |d                   t	          |d                   t	          |d                   t	          |d                             z  }| j        }|r1||z  }|d
                    |d         |d         |d                   z  }| j        }|r8||z  }|d                    |d         |d         |d         |d                   z  }| j	        }|r8||z  }|d                    |d         |d         |d         |d                   z  }| j
        }	|	r8||z  }|d                    |	d         |	d         |	d         |	d                   z  }| j        }
|
r8||z  }|d                    |
d         |
d         |
d         |
d                   z  }| j        rc| j        r\| j        r| j        nd}|d         }||z  }|dz  }||d| dz   z  }||d| dz   z  }||d| j         dz   z  }||d| j         dz   z  }|S )E  For a given (pre-calculated) `OperatorStatsSummary` object (e.g. generated from
        `OperatorStatsSummary.from_block_metadata()`), returns a human-friendly string
        that summarizes operator execution statistics.

        Returns:
            String with summary statistics for executing the given operator.
        r  r   z6* Remote wall time: {} min, {} max, {} mean, {} total
rU   rX   r7  r  z5* Remote cpu time: {} min, {} max, {} mean, {} total
z.* UDF time: {} min, {} max, {} mean, {} total
z8* Peak heap memory usage (MiB): {} min, {} max, {} mean
z?* Output num rows per block: {} min, {} max, {} mean, {} total
zA* Output size bytes per block: {} min, {} max, {} mean, {} total
z?* Output rows per task: {} min, {} max, {} mean, {} tasks used
r8  z9* Tasks per node: {} min, {} max, {} mean; {} nodes used
r   z* Operator throughput:
z	* Total input num rows: z rows
z	* Total output num rows: r  r  z%	* Estimated single task throughput: )r  r%  r  r  r4   r  r&  r  r  r'  r)  r(  r  r,  r  )rG   r  r  r^  r_  ra  r`  rc  rd  r)  node_count_statstotal_num_in_rowsr  s                r3   __str__zOperatorStatsSummary.__str__  s     -52.. 	6MCLSSOE*++OE*++OF+,,OE*++	  C M	 	6MCKRRIe$%%Ie$%%If%&&Ie$%%	  C M	 	6MCDKKIe$%%Ie$%%If%&&Ie$%%	  C { 	6MCNUUU#U#V$  C !% 4  		6MCRf%e,%e,%f-%e,	 C #'"8" 		6MCTf'.'.'/'.	 C N	 		6MCRf% % &!'"	 C  ? 	6MCOVV ' ' ( )	  C  	4#; 	-1-FM))A  "7u!=6MC--CV9JVVVVC X:LXXXXC  '   C
  ,   C
 
r5   r   c                 r   t          |          }|| j        rt          d          ndz  }d | j        pi                                 D             }d | j        pi                                 D             }d | j        pi                                 D             }d | j        pi                                 D             }d | j        pi                                 D             }d | j        pi                                 D             }d	                    g | d	| d
| j
         d| d| j         d| dt          | j                   d| d| j         | d|pd d| d|pd d| d|pd d| d|pd d| d|pd d| d|pd d| d          }	|	S )ri  r+   r   c                 4    i | ]\  }}|t          |          S r;   r  r   rs   rt   s      r3   r1  z1OperatorStatsSummary.__repr__.<locals>.<dictcomp>  s$    PPPA1c!ffPPPr5   c                 4    i | ]\  }}|t          |          S r;   r  ro  s      r3   r1  z1OperatorStatsSummary.__repr__.<locals>.<dictcomp>  s$    III41aQAIIIr5   c                 4    i | ]\  }}|t          |          S r;   r  ro  s      r3   r1  z1OperatorStatsSummary.__repr__.<locals>.<dictcomp>  s$    JJJda3q66JJJr5   c                 4    i | ]\  }}|t          |          S r;   r  ro  s      r3   r1  z1OperatorStatsSummary.__repr__.<locals>.<dictcomp>  s1     !
 !
 !
!QAs1vv!
 !
 !
r5   c                 4    i | ]\  }}|t          |          S r;   r  ro  s      r3   r1  z1OperatorStatsSummary.__repr__.<locals>.<dictcomp>  s1     #
 #
 #
!QAs1vv#
 #
 #
r5   c                 4    i | ]\  }}|t          |          S r;   r  ro  s      r3   r1  z1OperatorStatsSummary.__repr__.<locals>.<dictcomp>  s$    RRR$!QAs1vvRRRr5   zOperatorStatsSummary(
z   operator_name='z',
z   is_suboperator=r  z   time_total_s=z   block_execution_summary_str=z   wall_time=Nz   cpu_time=z
   memory=z   output_num_rows=z   output_size_bytes=z   node_count=r   )r<   r  r  rm   r  r  r  r'  r(  r   rb   r4   rp   r%  )
rG   r   r  r^  r_  r`  rc  rd  node_conut_statsr  s
             r3   r  zOperatorStatsSummary.__repr__  s     &&t';C.###CPP$.2FB1M1M1O1OPPPIIDM,?R+F+F+H+HIII	JJt{/@b.G.G.I.IJJJ!
 !
#'#7#=2"D"D"F"F!
 !
 !
#
 #
#'#9#?R"F"F"H"H#
 #
 #
 SR4?3Hb2O2O2Q2QRRR   v       )-);      )-)=        (+4+<'='=   
    
 7;6V      %4$;t        $-#4        ".!5        +@*G4        -D,Kt        &6%=          	 
r5   r	  )r\   r]   r^   r1   r"  r#  rB   r  r   r   r  r&  r  r  r  r  r'  r(  r)  r  r  r,  classmethodr	   r   r  rl  r  r;   r5   r3   r  r  E  sP         
 !$$$$ -1IxS%Z()000+/HhtCJ'(///+/HhtCJ'(///)-FHT#u*%&---*.(3-...26OXd3:./66648xS%Z 01888-1Jc5j)*111,0IxS%Z()000? ? ? ? X? CU C C C XC R
R
 *%R
 	R

 
 R
 R
 R
 [R
hv v v v vp$ $3 $ $ $ $ $ $r5   r  c                       e Zd ZU eed<   eed<   eed<   eed<   eed<   eed<   eed<   eed<   eed	<   eed
<   eed<   eed<   eed<   eed<   eed<   eed<   eed<   defdZdefdZddefdZ	dS )r  	wait_timeget_ref_bundles_timeget_time	next_timeformat_timecollate_timefinalize_batch_timetime_to_first_batch
block_time	user_timeinitialize_time
total_timestreaming_split_coord_timer   r   r   r   r)   c                 *    |                                  S rJ   )r  rF   s    r3   rl  zIterStatsSummary.__str__  s    ~~r5   c           
      <   d}| j                                         s| j                                        s| j                                        s| j                                        s~| j                                        se| j                                        sL| j                                        s3| j                                        s| j	                                        r7|dz  }| j                                        r<|d
                    t          | j                                                            z  }| j                                        r<|d
                    t          | j                                                            z  }| j                                         r<|d
                    t          | j                                                             z  }| j                                        r<|d
                    t          | j                                                            z  }| j                                        r<|d
                    t          | j                                                            z  }|dz  }| j                                        r|d	
                    t          | j                                                  t          | j                                                  t          | j                                                  t          | j                                                            z  }| j                                        r|d

                    t          | j                                                  t          | j                                                  t          | j                                                  t          | j                                                            z  }| j                                        rd}||
                    t          | j                                                  t          | j                                                  t          | j                                                  t          | j                                                            z  }| j                                        rd}||
                    t          | j                                                  t          | j                                                  t          | j                                                  t          | j                                                            z  }| j                                        r|d
                    t          | j                                                  t          | j                                                  t          | j                                                  t          | j                                                            z  }| j	                                        rd}||
                    t          | j	                                                  t          | j	                                                  t          | j	                                                  t          | j	                                                            z  }t#          j                    j        r\|dz  }|d
                    | j                  z  }|d
                    | j                  z  }|d
                    | j                  z  }| j        r|d
                    | j                  z  }| j                                        dk    r1|dz  }|t          | j                                                   dz  }|S )Nr   z"
Dataset iterator time breakdown:
z* Total time overall: {}
z>    * Total time in Ray Data iterator initialization code: {}
zE    * Total time user thread is blocked by Ray Data iter_batches: {}
zP    * Total time spent waiting for the first batch after starting iteration: {}
z/    * Total execution time for user thread: {}
zC* Batch iteration time breakdown (summed across prefetch threads):
z:    * In get RefBundles: {} min, {} max, {} avg, {} total
z5    * In ray.get(): {} min, {} max, {} avg, {} total
z:    * In batch creation: {} min, {} max, {} avg, {} total
z<    * In batch formatting: {} min, {} max, {} avg, {} total
z6    * In collate_fn: {} min, {} max, {} avg, {} total
zA    * In host->device transfer: {} min, {} max, {} avg, {} total
zBlock locations:
z    * Num blocks local: {}
z    * Num blocks remote: {}
z&    * Num blocks unknown location: {}
z    * Prefetched bytes: {}
r   z+Streaming split coordinator overhead time: r  )r  rS   r  r  ry  rz  r{  r|  r}  r~  r  r4   r  r  rU   rX   rZ   r   rw  'enable_get_object_locations_for_metricsr   r   r   r   r  )rG   r  batch_creation_str
format_strs       r3   r  zIterStatsSummary.to_string  sB   O!!b	I'++--b	I ""$$b	I (,,..	b	I
 }  ""b	I ~!!##b	I ##%%b	I  $$&&b	I '++--b	I 99C""$$ W3::3t?R?R?T?T;U;UVVV#'')) !6#d&:&>&>&@&@"A"ABB ""$$ !6#do&9&9&;&;"<"<== '++-- !6#d&>&B&B&D&D"E"EFF ~!!## IPP**,,--   VC (,,.. T[[1557788155778815577881557788	   }  "" OVV))++,,))++,,))++,,))++,,	   ~!!## 	Q # )00**,,--**,,--**,,--**,,--	   ##%% 	S  z(((,,..//(,,..//(,,..//(,,..//	    $$&& PWW)--//00)--//00)--//00)--//00	   '++-- 	X  z((0446677044667704466770446677	   &((P ++5<<T=STTT6==d>UVVV@GG.   ) Y5<<T=WXXX.224499DD#d=AACCDDHHHH
r5   r   c                    t          |          }d                    g d| dt          | j                                                  pd  d| dt          | j                                                  pd  d| dt          | j                                                  pd  d| d| j        pd  d| d| j        pd  d| d	| j	        pd  d| d
| j
        pd  d| dt          | j                                                  pd  d| dt          | j                                                  pd  d| dt          | j                                                  pd  d| dt          | j                                                  pd  d| d          S )Nr   zIterStatsSummary(
z   wait_time=r  z   get_ref_bundles_time=z   get_time=z   iter_blocks_local=z   iter_blocks_remote=z   iter_unknown_location=z   iter_prefetched_bytes=z   next_time=z   format_time=z   user_time=z   total_time=r   )r<   r   r4   rx  rS   ry  rz  r   r   r   r   r{  r|  r  r  )rG   r   r  s      r3   r  zIterStatsSummary.__repr__;  sS   &&        $'(:(:(<(<$=$=$E      /243L3P3P3R3R/S/S/[W[        $'t}'8'8':':#;#;#Ct    	    -1,B,Jd	   
    
 .2-D-L        150J0Rd        150J0Rd        %((:(:(<(<$=$=$E        '*$*:*>*>*@*@&A&A&IT        %((:(:(<(<$=$=$E        &))<)<)>)>%?%?%G4         	
r5   Nr	  )
r\   r]   r^   r>   r"  r  r1   rl  r  r  r;   r5   r3   r  r    s=         OOO %%%%         f3 f f f fP
 
3 
 
 
 
 
 
r5   r  )r   r6   )Or   rR  loggingrK   r   
contextlibr   dataclassesr   r   typingr   r   r	   r
   r   r   r   r   uuidr   numpyrY  r!  	ray.actorr   ray.data._internal.block_listr   *ray.data._internal.execution.dataset_stater   .ray.data._internal.execution.interfaces.commonr   :ray.data._internal.execution.interfaces.op_runtime_metricsr   r   r   r   r   $ray.data._internal.metadata_exporterr   r   r   ray.data._internal.utilr   ray.data.blockr   ray.data.contextr   ray.util.annotationsr   ray.util.metricsr    r!   r"   r#   ray.util.scheduling_strategiesr$   	getLoggerr\   rh  rm  rn  r<  r1   r|   rB   r4   r  r<   r>   ra   ro  r   rq  rs  rd   r  r  r  r;   r5   r3   <module>r     s          # # # # # # % % % % % % ) ) ) ) ) ) ) )	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	           



 ! ! ! ! ! ! 3 3 3 3 3 3 C C C C C C R R R R R R                      
 - , , , , , % % % % % % ( ( ( ( ( ( - - - - - - > > > > > > > > > > > > I I I I I I		8	$	$) . 
 d:&&'	; ;3 ; ; ; ;+ + +C + + + + +#V #V #V #V #V #V #V #VL* * * * * * * *Z Ql	 l	 l	 l	 l	 l	 l	 l	^;{#;    <k k k k k k k k\y3 y3 y3 y3 y3 y3 y3 y3x 
k
 k
 k
 k
 k
 k
 k
  k
\ c c c c c c c cL ^
 ^
 ^
 ^
 ^
 ^
 ^
 ^
 ^
 ^
r5   