
    &`i                        U d dl Z d dlZd dlZd dlmZmZ d dlmZ d dlm	Z	m
Z
mZmZmZmZ d dlmZmZ d dlmZmZ d dlmZmZ d dlmZ d d	lmZ d d
lmZ d dlmZ d dl m!Z! d dl"m#Z# d dl$m%Z% d dl&m'Z' e	rd dl(m)Z)m*Z*  e j+        e,          Z- edd          Z.ee/         e0d<   eefZ1 G d d          Z2ddde3fdZ4dede/fdZ5 G d de          Z6 G d de6          Z7dS )    N)ABCabstractmethod)defaultdict)TYPE_CHECKINGCallableDictIterableListOptional)env_bool	env_float)ExecutionOptionsExecutionResources)PhysicalOperatorReportsExtraResourceUsage)AllToAllOperator)HashShufflingOperatorBase)InputDataBuffer)ZipOperator)memory_string)GiB)DataContextlog_once)OpStateTopologyRAY_DATA_DEBUG_RESOURCE_MANAGER1LOG_DEBUG_TELEMETRY_FOR_RESOURCE_MANAGER_OVERRIDEc                      e Zd ZdZdZ edd          ZdZddded	e	g e
f         d
efdZd ZdddddefdZd Zde
fdZde
fdZde
fdZde
fdZdede
fdZdedefdZdedefdZdededefdZdefdZed)d            Zdedefd Zdede e
         fd!Z!dedefd"Z"de#e         fd#Z$dede%e         fd$Z&dede%e         fd%Z'dedefd&Z(dedefd'Z)d(S )*ResourceManagerz@A class that manages the resource usage of a streaming executor.   +RAY_DATA_OBJECT_STORE_MEMORY_LIMIT_FRACTION      ?g      ?topologyr   optionsget_total_resourcesdata_contextc                    || _         || _        || _        t          j                    | _        d| _        t          j                    | _        t          j                    | _        t          j                    | _	        i | _
        i | _        i | _        t          t                    | _        t          t                    | _        d | _        |j        r5t'          d |D                       }|rt)          | |j                  | _        |j        |j        n!|                                 r| j        n| j        | _        |                                  d S )Nr   c              3   >   K   | ]}|                                 V  d S N)%implements_accurate_memory_accounting).0ops     /home/jaya/work/projects/VOICE-AGENT/VIET/agent-env/lib/python3.11/site-packages/ray/data/_internal/execution/resource_manager.py	<genexpr>z+ResourceManager.__init__.<locals>.<genexpr>`   s?          ?A88::               )	_topology_options_get_total_resourcesr   zero_global_limits_global_limits_last_update_time_global_usage_global_running_usage_global_pending_usage
_op_usages_op_running_usages_op_pending_usagesr   int_mem_op_internal_mem_op_outputs_op_resource_allocatorop_resource_reservation_enabledallReservationOpResourceAllocatorop_resource_reservation_ratio+override_object_store_memory_limit_fractionop_resource_allocator_enabled*DEFAULT_OBJECT_STORE_MEMORY_LIMIT_FRACTION9DEFAULT_OBJECT_STORE_MEMORY_LIMIT_FRACTION_NO_RESERVATION#_object_store_memory_limit_fraction)_warn_about_object_store_memory_if_needed)selfr$   r%   r&   r'   should_enables         r.   __init__zResourceManager.__init__A   sd    "$7!0577/0,/466%7%<%>%>"%7%<%>%>"FHNPNP >I=M=M =H<L<LGK#7 		      EM       M  .L,D/ /+ GS DD 5577T??S 	0 	6688888r0   c           
         ddl }ddlm} ddlm} |                                sdS |                                }|                    dd          }|                    dd          }|dk    rQ||z  }|dk     rH |d          r?t          	                    | d	|d
d|t          z  dd|t          z  dd           dS dS dS dS )zDWarn if object store memory is configured below 50% of total memory.r   N)WARN_PREFIXr   memoryobject_store_memoryr#   $ray_data_object_store_memory_warningz. Ray's object store is configured to use only z.1%z of available memory (.1fzGiB out of a"  GiB total). For optimal Ray Data performance, we recommend setting the object store to at least 50% of available memory. You can do this by setting the 'object_store_memory' parameter when calling ray.init() or by setting the RAY_DEFAULT_OBJECT_STORE_MEMORY_PROPORTION environment variable.)rayray.data.contextrO   ray.util.debugr   is_initializedcluster_resourcesgetloggerwarningr   )rK   rT   rO   r   rX   total_memoryrQ   object_store_fractions           r.   rJ   z9ResourceManager._warn_about_object_store_memory_if_neededt   sK   


000000++++++!!## 	F1133(,,Xq99/334I1MM !$7,$F!$s**xx60 0* " u u,ku uH[^aHaku u*S0du u u      +***r0   r-   r   stater   returnc                    t          |t                    rdS |j        j        pd}||j        j        z  }|                                }|j        D ]}||j        j        |j        j        z   z  }|| j	        |<   || j
        |<   ||z   S )Nr   )
isinstancer   metrics"obj_store_mem_pending_task_outputsobj_store_mem_internal_outqueueoutput_queue_bytesoutput_dependenciesobj_store_mem_internal_inqueue!obj_store_mem_pending_task_inputsr>   r?   )rK   r-   r^   mem_op_internalmem_op_outputsnext_ops         r.   _estimate_object_store_memoryz-ResourceManager._estimate_object_store_memory   s    
 b/** 	1 *GL12:EE 1133- 	 	G>/CDNN
 %4b!#1R //r0   c                    t          ddd          | _        t          ddd          | _        t          ddd          | _        | j                                         | j                                         | j                                         t          | j	        
                                          D ]{\  }}|                                 |                                }|                                }|                                }|j        rJ |j        rJ |j        rJ |                     ||          }|                    |          }|                    |          }t%          |t&                    r'|                    |                                           || j        |<   || j        |<   || j        |<   | j                            |          | _        | j                            |          | _        | j                            |          | _        |j        |j        _        }| j        "| j                            | j                   dS dS )zRecalculate resource usages.r   rQ   N)limits)r   r7   r8   r9   r:   clearr;   r<   reversedr1   itemsupdate_resource_usagecurrent_processor_usagerunning_processor_usagepending_processor_usagerQ   rl   copyra   r   addextra_resource_usage_metricsobj_store_mem_usedr@   update_budgetsr5   )rK   r-   r^   op_usageop_running_usageop_pending_usageused_object_stores          r.   update_usageszResourceManager.update_usages   sn   
 01a88%71a%@%@"%71a%@%@"%%'''%%''' "$."6"6"8"899 &	J &	JIB $$&&&1133H!99;;!99;;3333';;;;';;;; $ B B2u M M}}9J}KKH/44$5  5     "788 8R4466777"*DOB*:D#B'*:D#B' "&!3!7!7!A!AD)-)C)G)G * *D& *.)C)G)G * *D& .6-IBK**&2'66* 7      32r0   c                     | j         S )z5Return the global resource usage at the current time.)r7   rK   s    r.   get_global_usagez ResourceManager.get_global_usage   s    !!r0   c                     | j         S )z=Return the global running resource usage at the current time.)r8   r   s    r.   get_global_running_usagez(ResourceManager.get_global_running_usage       ))r0   c                     | j         S )z=Return the global pending resource usage at the current time.)r9   r   s    r.   get_global_pending_usagez(ResourceManager.get_global_pending_usage   r   r0   c                    t          j                     | j        z
  | j        k     r| j        S t          j                     | _        | j        j        }| j        j        }|                                 }| j        }|	                    |j
        |z            }|                    |                              |          | _        | j        S )a  Return the global resource limits at the current time.

        This method autodetects any unspecified execution resource limits based on the
        current cluster size, refreshing these values periodically to support cluster
        autoscaling.
        rn   )timer6   GLOBAL_LIMITS_UPDATE_INTERVAL_Sr5   r2   resource_limitsexclude_resourcesr3   rI   rw   rQ   minsubtract)rK   default_limitsexcludetotal_resourcesdefault_mem_fractions        r.   get_global_limitsz!ResourceManager.get_global_limits   s     IKK$>>23 3 &&/3y{{,6-13355#G).. / C"!# / 
 
 -00AAJJ7SS""r0   c                     | j         |         S )zDReturn the resource usage of the given operator at the current time.)r:   rK   r-   s     r.   get_op_usagezResourceManager.get_op_usage  s    r""r0   c                     | j         |         S )zFReturn the memory usage of the internal buffers of the given operator.)r>   r   s     r.   get_mem_op_internalz#ResourceManager.get_mem_op_internal  s    $R((r0   c                     | j         |         S )z=Return the memory usage of the outputs of the given operator.)r?   r   s     r.   get_mem_op_outputsz"ResourceManager.get_mem_op_outputs  s    #B''r0   verbosec          	         || j         vrd}nh| j         |         j        dd}| j         |         j        r|d| j         |         j        ddz  }|d| j         |                                          dz  }t          t          }|rH|dt          |                     |                     d	t          |                     |                     d
z  }| j        | j        	                    |          }|r7|d|j        dz  }|d|j        dz  }|d|                                 d
z  }| j        
                    |          }|r|d|j        dz  }|d|j        dz  }|d|                                 z  }t          | j        t                    r6t          | j        j                            |d                    }|d	| d
z  }|S )zbReturn a human-readable string representation of the resource usage of
        the given operator.zn/arS   z CPUz, z GPUz object storeNz (in=z,out=)z, alloc=(cpu=z,gpu=z,obj_store=z, budget=(cpu=r   )r;   cpugpuobject_store_memory_strr   r   r   r   r@   get_allocation
get_budgetra   rC   _output_budgetsrY   )rK   r-   r   	usage_str
allocationbudgetreserved_for_outputs          r.   get_op_usage_strz ResourceManager.get_op_usage_str  s_    T,,,II226:DDDDI&r*. LK$"9""="AKKKKK	bd5b9QQSSbbbbI =HGG 	DEd&>&>r&B&BCC E E$T%<%<R%@%@AAE E EI *6!8GGKK
 W!E!E!E!EEI!=!=!=!==I!Vz/Q/Q/S/S!V!V!VVI4??CC D!B&*!B!B!BBI!9!9!9!99I!Qv/M/M/O/O!Q!QQI "35S  D /< 7GKKBPQRR/ /+ "%C-@%C%C%CC	r0   c                     | j         duS )z.Return whether OpResourceAllocator is enabled.Nr@   r   s    r.   rF   z-ResourceManager.op_resource_allocator_enabledD  s    *$66r0   OpResourceAllocatorc                 "    | j         J | j         S )zReturn the OpResourceAllocator.r   r   s    r.   op_resource_allocatorz%ResourceManager.op_resource_allocatorH  s     *666**r0   c                 P    | j                             || j        | j                  S )N)task_resource_usageoutput_object_store_usage)r@   max_task_output_bytes_to_readr:   r?   r   s     r.   r   z-ResourceManager.max_task_output_bytes_to_readN  s1    *HH $&*&: I 
 
 	
r0   c                 H    | j         dS | j                             |          S )z_Return the budget for the given operator, or None if the operator
        has unlimited budget.N)r@   r   r   s     r.   r   zResourceManager.get_budgetU  s)     &.4*55b999r0   c                 V    |                                  o|                                 S z2Whether the op is eligible for memory reservation.throttling_disabledhas_execution_finishedr   s     r.   is_op_eligiblezResourceManager.is_op_eligible\  s4     &&((( 0 --///		
r0   c                 *      fd j         D             S )Nc                 >    g | ]}                     |          |S  )r   )r,   r-   rK   s     r.   
<listcomp>z4ResourceManager.get_eligible_ops.<locals>.<listcomp>f  s,    GGGrt/B/B2/F/FGGGGr0   )r1   r   s   `r.   get_eligible_opsz ResourceManager.get_eligible_opse  s    GGGGT^GGGGr0   c              #      K   |j         D ]6}|                     |          s|V  |                     |          E d{V  7dS )zGet the downstream ineligible operators of the given operator.

        E.g.,
          - "cur_map->downstream_map" will return an empty list.
          - "cur_map->limit1->limit2->downstream_map" will return [limit1, limit2].
        N)rf   r   get_downstream_ineligible_opsrK   r-   rk   s      r.   r   z-ResourceManager.get_downstream_ineligible_opsh  so       - 	G 	GG&&w// G==gFFFFFFFFF	G 	Gr0   c              #      K   |j         D ]7}|                     |          r|V  |                     |          E d{V  8dS a   Get the downstream eligible operators of the given operator, ignoring
        intermediate ineligible operators.

        E.g.,
          - "cur_map->downstream_map" will return [downstream_map].
          - "cur_map->limit1->limit2->downstream_map" will return [downstream_map].
        N)rf   r   get_downstream_eligible_opsr   s      r.   r   z+ResourceManager.get_downstream_eligible_opsv  ss       - 	E 	EG""7++ E;;GDDDDDDDDDD		E 	Er0   c                       j         |         }|t           fd                     |          D                       z  }|S )zsGet the outputs memory usage of the given operator, including the downstream
        ineligible operators.
        c              3   L   K   | ]}                     |          j        V  d S r*   )r   rQ   )r,   rk   rK   s     r.   r/   zTResourceManager.get_op_outputs_object_store_usage_with_downstream.<locals>.<genexpr>  sH        
  
 g&&: 
  
  
  
  
  
r0   )r?   sumr   )rK   r-   op_outputs_usages   `  r.   1get_op_outputs_object_store_usage_with_downstreamzAResourceManager.get_op_outputs_object_store_usage_with_downstream  sf      /3C  
  
  
  
==bAA 
  
  
 
 
 	
  r0   c                 >    t          d |j        D                       S )z>Check if the operator has a downstream materializing operator.c              3   @   K   | ]}t          |t                    V  d S r*   ra   MATERIALIZING_OPERATORSr,   rk   s     r.   r/   zBResourceManager.has_materializing_downstream_op.<locals>.<genexpr>  sA       
 
 w 788
 
 
 
 
 
r0   )anyrf   r   s     r.   has_materializing_downstream_opz/ResourceManager.has_materializing_downstream_op  s3     
 
1
 
 
 
 
 	
r0   N)r_   r   )*__name__
__module____qualname____doc__r   r   rG   rH   r   r   r   r   rM   rJ   r=   rl   r   r   r   r   r   r   r   r   r   boolstrr   rF   propertyr   r   r   r   r   r
   r   r	   r   r   r   r   r   r0   r.   r    r    /   sR       JJ '(#
 2;5s2 2. AE=1919 "19 &b*<&<=	19
 "19 19 19 19f  :0$0-60	0 0 0 068 8 8t""4 " " " "**< * * * ***< * * * *##5 # # # #2#/ #4F # # # #)&6 )3 ) ) ) )(%5 (# ( ( ( (+#3 + +# + + + +Z7t 7 7 7 7 + + + X+

0@ 
S 
 
 
 
:- :(;M2N : : : :
!1 
d 
 
 
 
H$'7"8 H H H HG"G	"	#G G G GE"E	"	#E E E E  " 	       
2B 
t 
 
 
 
 
 
r0   r    r$   r   r_   c                 ~    t          |           D ],\  }}t          |          r|                                s|c S -dS )N)	enumerate_is_shuffle_op	completed)r$   idxr-   s      r.   _get_first_pending_shuffle_opr     sK    X&&  R" 	bllnn 	JJJ2r0   r-   c                 :    t          | t          t          f          S r*   )ra   r   r   r-   s    r.   r   r     s    b+-FGHHHr0   c            
          e Zd ZdZ G d d          ZddZedefd            Zed	e	d
e
fd            Zed	e	dee	ef         dee	ef         d
ee         fd            Zed	e	d
ee         fd            Zed	e	d
ee         fd            Zed	e	d
ee         fd            Zd
ee	         fdZed	e	d
e
fd            Zd	e	d
ee	         fdZd	e	d
e
fdZdS )r   a  An interface for dynamic operator resource allocation.

    This interface allows dynamically allocating available resources to each operator,
    limiting how many tasks each operator can submit, and how much data each operator
    can read from its running tasks.
    c                   P    e Zd ZdZdZdZdZd ZdefdZ	e
dedefd	            Zd
S ) OpResourceAllocator.IdleDetectora  Utility class for detecting idle operators.

        Note, stalling can happen when there are less resources than Data executor
        expects. E.g., when some resources are preempted by non-Data code, see
        `test_no_deadlock_on_resource_contention` as an example.

        This class is used to detect potential stalling and allow the execution
        to make progress.
        g      $@g      N@Fc                     t          t                    | _        t          d           | _        t          d           | _        d S )Nc                  (    t          j                     S r*   r   r   r0   r.   <lambda>z;OpResourceAllocator.IdleDetector.__init__.<locals>.<lambda>  s    	 r0   c                  (    t          j                     S r*   r   r   r0   r.   r   z;OpResourceAllocator.IdleDetector.__init__.<locals>.<lambda>  s    49;; r0   )r   r=   last_num_outputslast_output_timelast_detection_timer   s    r.   rM   z)OpResourceAllocator.IdleDetector.__init__  s@    $/$4$4D!$/0C0C$D$DD!'23F3F'G'GD$$$r0   r-   c                 6   t          j                     }|| j        |         z
  | j        k    rl|j        j        }|| j        |         k    r|| j        |<   || j        |<   || j        |<   n0|| j        |<   |                     ||| j        |         z
             dS dS NTF)r   r   DETECTION_INTERVAL_Srb   num_task_outputs_generatedr   r   "print_warning_if_idle_for_too_long)rK   r-   cur_timecur_num_outputss       r.   detect_idlez,OpResourceAllocator.IdleDetector.detect_idle  s    y{{H$22669RRR"$*"G"T%:2%>>>0?D)"-08D)"-3;D,R003;D,R0;;Ht'<R'@@    45r0   	idle_timec                     || j         k     s| j        rdS d| _        d| d| d}t                              |           dS )z4Print a warning if an operator is idle for too long.NTz	Operator z# is running but has no outputs for a   seconds. Execution may be slower than expected.
Ignore this warning if your UDF is expected to be slow. Otherwise, this can happen when there are fewer cluster resources available to Ray Data than expected. If you have non-Data tasks or actors running in the cluster, exclude their resources from Ray Data with `DataContext.get_current().execution_options.exclude_resources`. This message will only print once.)WARN_ON_IDLE_TIME_S_warn_printedrZ   r[   )clsr-   r   msgs       r.   r   zCOpResourceAllocator.IdleDetector.print_warning_if_idle_for_too_long  sh    
 3222c6G2 $C6B 6 69 6 6 6  NN3r0   N)r   r   r   r   r   r   r   rM   r   r   classmethodfloatr   r   r0   r.   IdleDetectorr     s        	 	  $"	H 	H 	H	"2 	 	 	 	" 
	 %	 27	  	  	  
	  	  	 r0   r   r$   r   c                 F    || _         |                                 | _        d S r*   )r1   r   _idle_detector)rK   r$   s     r.   rM   zOpResourceAllocator.__init__  s#    !"//11r0   ro   c                    dS )z#Callback to update resource usages.Nr   )rK   ro   s     r.   r|   z"OpResourceAllocator.update_budgets  	     	r0   r-   r_   c                     dS )z8Return whether the given operator can submit a new task.Nr   r   s     r.   can_submit_new_taskz'OpResourceAllocator.can_submit_new_task  s	     	r0   r   r   c                    dS )zqReturn the maximum bytes of pending task outputs can be read for
        the given operator. None means no limit.Nr   )rK   r-   r   r   s       r.   r   z1OpResourceAllocator.max_task_output_bytes_to_read  s	     	r0   c                     dS )zReturns the budget for the given operator or `None` if the operator
        has unlimited budget. Operator's budget is defined as:

            Budget = Allocation - Usage
        Nr   r   s     r.   r   zOpResourceAllocator.get_budget  r  r0   c                     dS )zqReturns the budget for operator's outputs (in object store bytes) or
        `None` if there's no limit.
        Nr   r   s     r.   get_output_budgetz%OpResourceAllocator.get_output_budget  s	    
 	r0   c                     dS )zbReturns allocation for the given operator or `None` if operator's
        allocation is unlimited.Nr   r   s     r.   r   z"OpResourceAllocator.get_allocation   s	     	r0   c                 p     t           j                   fdt           j                  D             S )Nc                 \    g | ](\  }}                     |          rd k    s|k    &|)S )r   )_is_op_eligible)r,   r   r-   first_pending_shuffle_op_idxrK   s      r.   r   z9OpResourceAllocator._get_eligible_ops.<locals>.<listcomp>(  sX     
 
 
R##B''

 -22666 
 766r0   )r   r1   r   )rK   r  s   `@r.   _get_eligible_opsz%OpResourceAllocator._get_eligible_ops&  sN    'DT^'T'T$
 
 
 
 
$T^44
 
 
 	
r0   c                 V    |                                   o|                                  S r   r   r   s    r.   r  z#OpResourceAllocator._is_op_eligible2  s4     &&((( 0 --///		
r0   c              #      K   |j         D ]7}|                     |          r|V  |                     |          E d{V  8dS r   )rf   r  _get_downstream_eligible_opsr   s      r.   r  z0OpResourceAllocator._get_downstream_eligible_ops<  ss       - 	F 	FG##G,, F<<WEEEEEEEEEE		F 	Fr0   c                     |j         sdS |                     |          D ]7}|                     |          s dS | j                            |          r dS 8dS r   )rf   r  r  r   r   )rK   r-   downstream_ops      r.   -_should_unblock_streaming_output_backpressurezAOpResourceAllocator._should_unblock_streaming_output_backpressureL  s    
 % 	4 ">>rBB 	 	M++M::  tt "..}== tt ur0   N)r$   r   )r   r   r   r   r   rM   r   r   r|   r   r   r  r   r=   r   r   r   r  r   r
   r  staticmethodr  r	   r  r  r   r0   r.   r   r     sV        ?  ?  ?  ?  ?  ?  ?  ? B2 2 2 2  #   ^ &6 4    ^ 		 ""24F"FG		
 $((8#(=#>	 
#	 	 	 ^	 - (;M2N    ^ $4 #    ^ !1 h?Q6R    ^


4(8#9 

 

 

 

 
, 
 
 
 
 \
F"F	"	#F F F F "	     r0   r   c            	           e Zd ZdZdedef fdZdee         fdZ	de
fdZd	edefd
Zd	edee
         fdZd	edee         fdZd	edee
         fdZd	edeee
f         deeef         dee         fdZde
fdZ xZS )rC   a  An OpResourceAllocator implementation that reserves resources for each operator.

    This class reserves memory and CPU resources for eligible operators, and considers
    runtime resource usages to limit the resources that each operator can use.

    It works in the following way:
    1. An operator is eligible for resource reservation, if it has enabled throttling
       and hasn't completed. Ineligible operators are not throttled, but
       their usage will be accounted for their upstream eligible operators. E.g., for
       such a dataset "map1->limit->map2->streaming_split", we'll treat "map1->limit" as
       a group and "map2->streaming_split" as another group.
    2. For each eligible operator, we reserve `reservation_ratio * global_resources /
        num_eligible_ops` resources, half of which is reserved only for the operator
        outputs, excluding pending task outputs.
    3. Non-reserved resources are shared among all operators.
    4. In each scheduling iteration, each eligible operator will get "remaining of their
       own reserved resources" + "remaining of shared resources / num_eligible_ops"
       resources.

    The `reservation_ratio` is set to 50% by default. Users can tune this value to
    adjust how aggressive or conservative the resource allocation is. A higher value
    will make the resource allocation more even, but may lead to underutilization and
    worse performance. And vice versa.
    resource_managerreservation_ratioc                 D   t                                          |j                   || _        || _        d| j        cxk    rdk    sn J i | _        i | _        t          j                    | _	        i | _
        i | _        i | _        |                                 | _        d S )Ng              ?)superrM   r1   _resource_manager_reservation_ratio_op_reserved_reserved_for_op_outputsr   r4   _total_shared_op_budgetsr   _reserved_min_resourcesr   r   )rK   r  r  	__class__s      r.   rM   z'ReservationOpResourceAllocator.__init__  s    )3444!1"3d-4444444444HJ HJ%/466GI>@ FH$"//11r0   r_   c                    g }g }| j         D ]I}|                                s3|j        D ]+}|                                r|                    |           ,J|D ]Q}|                    t          | j                            |                               |                    |           Rt          t          |                    S )a  
        Resource reservation is based on the number of eligible operators.
        However, there might be completed operators that still have blocks in their output queue, which we need to exclude them from the reservation.
        And we also need to exclude the downstream ineligible operators.

        E.g., for the following pipeline:
        ```
        map1 (completed, but still has blocks in its output queue) -> limit1 (ineligible, not completed) -> map2 (not completed) -> limit2 -> map3
        ```

        The reservation is based on the number of eligible operators (map2 and map3), but we need to exclude map1 and limit1 from the reservation.
        )	r1   r   input_dependenciesappendextendlistr  r   set)rK   last_completed_opsops_to_exclude_from_reservationr-   deps        r.   _get_ineligible_ops_with_usagez=ReservationOpResourceAllocator._get_ineligible_ops_with_usage  s      *,'. 	7 	7B,,.. 70 7 7C1133 7*11#666 % 	7 	7B+22T+II"MMNN   ,2226666C788999r0   ro   c           	         | j                                         }| j                                         | j                                         | j                                         t          |          dk    rd S |                                }|                    | j	        t          |          z            }t          |          D ]\  }}t          ddt          |j        dz  d                    }|                    |          }|                                \  }	}
|	|                    |	          }|
|                    |
          }|                    |                              |d          rd| j        |<   ncd| j        |<   t          dd|	j                  }|dk    r=t'          dt)          |                      rt*                              d| d	           || j        |<   |j        | j        |<   |                    |          }|                    |          }|                    t          j                              }|| _        d S )
Nr      r!   T)ignore_object_store_memoryFlow_resource_warning_z6Cluster resources are not enough to run any task from z8. The job may hang forever unless the cluster scales up.)r  r   r  rp   r  r!  lenrw   scaler  r   r   maxrQ   r   min_max_resource_requirementsr   rx   satisfies_limitr   idrZ   r[   r4   r  )rK   ro   eligible_ops	remainingdefault_reservedindexr-   reserved_for_outputsreserved_for_tasksmin_resource_usagemax_resource_usageop_total_reserveds               r.   _update_reservationz2ReservationOpResourceAllocator._update_reservation  s   ->>@@!!!%++---$**,,,|!!FKKMM	 "<<(?3|CTCT(UVV"<00 4	A 4	AIE2 $61c*>BAFF$ $  "2!:!:;O!P!P575U5U5W5W2 2!-%7%;%;<N%O%O"!-%7%;%;<N%O%O" "%%&:;;KKd L    48,R00 49,R0%7q,@& &"
 A::(+M2d88+M+M"N"N: NNRQS R R R  
 %7Db!0D0XD)"- 2 6 67K L L!**+<==I!&8&=&?&?@@II&r0   r-   c                     |                      |          }|dS |                                                    |          S )zHReturn whether the given operator can submit a new task based on budget.NT)r   incremental_resource_usager5  )rK   r-   r   s      r.   r  z2ReservationOpResourceAllocator.can_submit_new_task  s=    $$>4,,..>>vFFFr0   c                 6    | j                             |          S r*   )r   rY   r   s     r.   r   z)ReservationOpResourceAllocator.get_budget  s    ##B'''r0   c                 6    | j                             |          S r*   )r   rY   r   s     r.   r  z0ReservationOpResourceAllocator.get_output_budget  s    #''+++r0   c                 (    t          j                    S r*   )r   r4   r   s     r.   r   z-ReservationOpResourceAllocator.get_allocation  s    !&(((r0   r   r   c                r   || j         vrd S | j         |         j        }| j                            |          }|t	          | j        |         |z
  d          z  }t          j        |          r|| j        |<   d S t          |          }|dk    sJ |dk    r| 
                    |          rd}|| j        |<   |S )Nr   r!   )r   rQ   r  r   r3  r  mathisinfr   r=   r  )rK   r-   r   r   resr   s         r.   r   z<ReservationOpResourceAllocator.max_task_output_bytes_to_read  s     T%%%4r"6 "TTUWXX 	 	s4047GGKKK:c?? 	'*D $4#hhaxxxx!88JJ2NN8C#&R 
r0   c                   |                                  }|D ]W}| j                            |          }|                    |          }|                    t          j                              }X|                     |          }| j        	                                 | j        
                                }t          |          dk    rd S | j        }|D ])}d}|| j                            |          z  }| j                            |          }	|t	          |	| j        |         z
  d          z  }| j                            |                              |          }
| j        |         }|                    |
                              t          j                              }|| j        |<   |
                    |                              t          j                              }|                    |          }+|                    t          j                              }t%          t'          |                    D ]\  }}|                    dt          |          |z
  z            }|                                                    | j        |                             |                                        t          j                              }|                                s=|                    |                              |          r|                    |          }|                    |          }|                                sJ ||||f            |                                d         j        dk    r6t	          |j        | j                            |          j        z
  d          }nd}| j        |                             |                              |          | j        |<   |D ]V}t9          d |j        D                       r6| j        |                             t=          d                    | j        |<   Wd S )Nr   rn   r  r!   )r   c              3   @   K   | ]}t          |t                    V  d S r*   r   r   s     r.   r/   z@ReservationOpResourceAllocator.update_budgets.<locals>.<genexpr>  sA         7$;<<     r0   inf)r,  r  r   r   r3  r   r4   r@  r   rp   r   r1  r  r   r   r  rw   r  r   rq   r2  rB  rx   is_zeror5  is_non_negativer4  r   r   rf   r   )rK   ro   op_to_exclude_from_reservationcompleted_opcompleted_op_usageremaining_sharedr7  r-   op_mem_usager   r}   op_reservedop_reserved_remainingop_reserved_exceededi	op_shared	to_borrowtarget_num_gpus                     r.   r|   z-ReservationOpResourceAllocator.update_budgets3  sz   
 *.)L)L)N)N&: 	; 	;L!%!7!D!D\!R!R__%788FZZ 2 7 9 9::FF  33F;;   ->>@@|!!F  - 	O 	OBL D2FFrJJJL  $5gg    C 043PQS3T TVWXXXL-::2>>CC$0 D  H +B/K$/$8$8$B$B$F$F"'))% %! $9DR  $,#4#4[#A#A#E#E"'))$ $   0889MNN+//0B0G0I0IJJ x5566 )	 )	EAr(..sc,6G6G!6K/LMMI --//$*2.229==>>',..// 
 $$&& 59==+C+C+S+S , , 5 &MM)44	/88CC#3355   	8  5 //11!481<< "%J!7!D!DR!H!H!LL" "
 "#  $((3388^8LL R    	 	B  !5      (,'7';'@'@(-e (A ( ( $	 	r0   )r   r   r   r   r    r   rM   r
   r   r,  r   r@  r   r  r   r   r=   r  r   r   r   r|   __classcell__)r"  s   @r.   rC   rC   h  s        22 2U 2 2 2 2 2 2>:5E0F : : : :>E'*< E' E' E' E'NG&6 G4 G G G G(- ((;M2N ( ( ( (,$4 ,# , , , ,)!1 )h?Q6R ) ) ) ) ""24F"FG	
 $((8#(=#> 
#   6n #n n n n n n n nr0   rC   )8loggingrG  r   abcr   r   collectionsr   typingr   r   r   r	   r
   r   ray._private.ray_constantsr   r   9ray.data._internal.execution.interfaces.execution_optionsr   r   9ray.data._internal.execution.interfaces.physical_operatorr   r   =ray.data._internal.execution.operators.base_physical_operatorr   3ray.data._internal.execution.operators.hash_shuffler   8ray.data._internal.execution.operators.input_data_bufferr   3ray.data._internal.execution.operators.zip_operatorr   !ray.data._internal.execution.utilr   ray.data._internal.utilr   rU   r   rV   r   5ray.data._internal.execution.streaming_executor_stater   r   	getLoggerr   rZ   r   r   __annotations__r   r    r=   r   r   r   rC   r   r0   r.   <module>rl     s      # # # # # # # # # # # # # # J J J J J J J J J J J J J J J J : : : : : : : :                         U T T T T T K K K K K K ; ; ; ; ; ; ' ' ' ' ' ' ( ( ( ( ( ( # # # # # # XWWWWWWWW 
	8	$	$ EMH%tE E 18D>    ,[9 k
 k
 k
 k
 k
 k
 k
 k
\J 3    I' ID I I I I| | | | |# | | |~y y y y y%8 y y y y yr0   