
    &`i                       d dl Z d dlZd dlZd dlZd dlmZ d dlmZmZm	Z	m
Z
mZ d dlmZ d dlmZ d dlmZmZmZ d dlmZ d dlmZmZmZmZmZmZ d d	lmZ d d
lm Z  d dl!m"Z" d dl#m$Z$ d dl%m&Z&m'Z' d dl(m)Z)m*Z* d dl+m,Z, d dl-m.Z.m/Z/m0Z0m1Z1m2Z2m3Z3m4Z4 d dl5m6Z6m7Z8m9Z:m;Z;m<Z<m=Z=  e j>        e?          Z@ G d d          ZAdS )    N)defaultdict)DictListOptionalSetTuple)binary_to_hex)InstanceUtil)AutoscalingConfigInstanceReconcileConfigProvider)InstanceManager)CloudInstanceCloudInstanceIdCloudInstanceProviderErrorICloudInstanceProviderLaunchNodeErrorTerminateNodeError)CloudResourceMonitor)RayStopError)RayInstallError)AutoscalerMetricsReporter)IResourceSchedulerSchedulingRequest)AutoscalerInstanceNodeType)is_head_node)AutoscalingStateClusterResourceStateFailedInstanceRequest	NodeState
NodeStatusPendingInstancePendingInstanceRequest)GetInstanceManagerStateRequestInstanceInstanceUpdateEventNodeKind
StatusCode!UpdateInstanceManagerStateRequestc                      e Zd ZdZe	 	 	 	 	 dFdedededede	de
eef         d	ed
eee                  deee                  deee                  dee         deej                 defd            Zededee         de
eef         d
ee         dee         dee         d	efd            Ze	 dGdededededede	de
eef         d	edeej                 fd            Zedede
eef         d
ee         fd            Zedede
eee         f         de
eef         dee         fd            Z ededee         dee         fd            Z!ededee         fd            Z"edede
eef         fd            Z#eded
ee         fd            Z$edede%ee         e&f         fd            Z'edede&d e
eef         ddfd!            Z(ededee         d	efd"            Z)ed#e*d$ej+        dd%fd&            Z,eded	efd'            Z-ed(ee         d)ee         d*ee         d+e.d,e&de
e/ee         f         fd-            Z0eded.e1dej        fd/            Z2ed0ee         d1ej+        d2e&d3ej        fd4            Z3edede4fd5            Z5edededed6e	ded	eddfd7            Z6edefd8            Z7edede
eef         ddfd9            Z8edededdfd:            Z9ed;ed<e&d=e&dee         fd>            Z:ed;ed<e&d?ej+        d@e
dee         f
dA            Z;edede
eef         dee         fdB            Z<edede
eef         fdC            Z=ededee         fdD            Z>e	 dGded	edee         fdE            Z?dS )H
Reconcilerzl
    A singleton class that reconciles the instance states of the instance manager
    for autoscaler.

    Ninstance_manager	schedulercloud_providercloud_resource_monitorray_cluster_resource_statenon_terminated_cloud_instancesautoscaling_configcloud_provider_errorsray_install_errorsray_stop_errorsmetrics_reporter_loggerreturnc                    |pg }|pg }|	pg }	t                      }|j        |_        t                              | |j        ||||	|           t                              || |||||||	  	         t                              | ||
           |S )a.  
        The reconcile method computes InstanceUpdateEvents for the instance manager
        by:

        1. Reconciling the instance manager's instances with external states like
        the cloud provider's, the ray cluster's states, the ray installer's results.
        It performs "passive" status transitions for the instances (where the status
        transition should only be reflecting the external states of the cloud provider
        and the ray cluster, and should not be actively changing them)

        2. Stepping the instances to the active states by computing instance status
        transitions that are needed and updating the instance manager's state.
        These transitions should be "active" where the transitions have side effects
        (through InstanceStatusSubscriber) to the cloud provider and the ray cluster.

        Args:
            instance_manager: The instance manager to reconcile.
            cloud_resource_monitor: The cloud resource monitor for monitoring resource
                availability of all node types.
            ray_cluster_resource_state: The ray cluster's resource state.
            non_terminated_cloud_instances: The non-terminated cloud instances from
                the cloud provider.
            cloud_provider_errors: The errors from the cloud provider.
            ray_install_errors: The errors from RayInstaller.
            ray_stop_errors: The errors from RayStopper.
            metrics_reporter: The metric reporter to report the autoscaler metrics.
            _logger: The logger (for testing).

        r-   	ray_nodesr2   r4   r5   r6   r3   	autoscaling_stater-   r.   r/   r0   r1   r2   r3   r8   )r-   r3   r7   )r   cluster_resource_state_version(last_seen_cluster_resource_state_versionr,   
_sync_fromnode_states
_step_next_report_metrics)r-   r.   r/   r0   r1   r2   r3   r4   r5   r6   r7   r8   r>   s                /home/jaya/work/projects/VOICE-AGENT/VIET/agent-env/lib/python3.11/site-packages/ray/autoscaler/v2/instance_manager/reconciler.py	reconcilezReconciler.reconcile?   s    X !6 ;/52)/R,..&E 	B 	-0<+I"71+1 	 	
 	
 	
 	/-)#9'A+I1 	 
	
 
	
 
	
 	""-1- 	# 	
 	
 	
 !     r<   c                    t                               | ||           t                               | |           t                               | |           t                               | ||           t                               | ||           t                               | |           t                               | ||           dS )a
  
        Reconcile the instance states of the instance manager from external states like
        the cloud provider's, the ray cluster's states, the ray installer's results,
        etc.

        For each instance, we try to figure out if we need to transition the instance
        status to a new status, and if so, what the new status should be.

        These transitions should be purely "passive", meaning they should only be
        reflecting the external states of the cloud provider and the ray cluster,
        and should not be actively changing the states of the cloud provider or the ray
        cluster.

        More specifically, we will reconcile status transitions for:
            1.  QUEUED/REQUESTED -> ALLOCATED:
                When an instance with launch request id (indicating a previous launch
                request was made) could be assigned to an unassigned cloud instance
                of the same instance type.
            2.  REQUESTED -> ALLOCATION_FAILED:
                When there's an error from the cloud provider for launch failure so
                that the instance becomes ALLOCATION_FAILED.
            3. ALLOCATED -> ALLOCATION_TIMEOUT:
                When an instance has been allocated to a cloud instance, but is stuck in
                this state. For example, a kubernetes pod remains pending due to
                insufficient resources.
            4.  * -> RAY_RUNNING:
                When a ray node on a cloud instance joins the ray cluster, we will
                transition the instance to RAY_RUNNING.
            5.  * -> TERMINATED:
                When the cloud instance is already terminated, we will transition the
                instance to TERMINATED.
            6.  TERMINATING -> TERMINATION_FAILED:
                When there's an error from the cloud provider for termination failure.
            7.  * -> RAY_STOPPED:
                When ray was stopped on the cloud instance, we will transition the
                instance to RAY_STOPPED.
            8.  * -> RAY_INSTALL_FAILED:
                When there's an error from RayInstaller.
            9. RAY_STOP_REQUESTED -> RAY_RUNNING:
                When requested to stop ray, but failed to stop/drain the ray node
                (e.g. idle termination drain rejected by the node).

        Args:
            instance_manager: The instance manager to reconcile.
            ray_nodes: The ray cluster's states of ray nodes.
            non_terminated_cloud_instances: The non-terminated cloud instances from
                the cloud provider.
            cloud_provider_errors: The errors from the cloud provider.
            ray_install_errors: The errors from RayInstaller.
            ray_stop_errors: The errors from RayStopper.

        N)r,   !_handle_cloud_instance_allocation!_handle_cloud_instance_terminated)_handle_cloud_instance_termination_errors_handle_extra_cloud_instances_handle_ray_status_transition_handle_ray_install_failed_handle_ray_stop_failedr;   s          rE   rA   zReconciler._sync_from   s    @ 	44*!	
 	
 	

 	44<	
 	
 	
 	<<3	
 	
 	
 	00<i	
 	
 	
 	00i);	
 	
 	
 	--.>@RSSS**+;_iXXXXXrG   r>   c	                    t                               ||                                |pt                     t                               | |||||           t                               ||           t                               |           |                                st                               ||           t           	                    ||            dS )ax  
        Step the reconciler to the next state by computing instance status transitions
        that are needed and updating the instance manager's state.

        Specifically, we will:
            1. Shut down leak cloud instances
                Leaked cloud instances that are not managed by the instance manager.
            2. Terminating instances with ray stopped or ray install failure.
            3. Scale down the cluster:
              (* -> RAY_STOP_REQUESTED/TERMINATING)
                b. Extra cloud due to max nodes config.
                c. Cloud instances with outdated configs.
            4. Scale up the cluster:
              (new QUEUED)
                Create new instances based on the IResourceScheduler's decision for
                scaling up.
            5. Request cloud provider to launch new instances.
              (QUEUED -> REQUESTED)
            6. Install ray
              (ALLOCATED -> RAY_INSTALLING)
                When ray could be installed and launched.
            7. Handle any stuck instances with timeouts.

        Args:
            instance_manager: The instance manager to reconcile.
            scheduler: The resource scheduler to make scaling decisions.
            cloud_resource_monitor: The cloud resource monitor for monitoring resource
                availability of all node types.
            ray_cluster_resource_state: The ray cluster's resource state.
            non_terminated_cloud_instances: The non-terminated cloud instances from
                the cloud provider.
            autoscaling_config: The autoscaling config.
            _logger: The logger (for testing).

        )r-   reconcile_configr8   )r>   r-   r0   	ray_stater.   r3   )r-   r3   )r-   )r-   r2   )r-   r>   N)
r,   _handle_stuck_instancesget_instance_reconcile_configlogger_scale_cluster_handle_instances_launch_terminate_instancesdisable_node_updaters_install_ray_fill_autoscaling_stater=   s	            rE   rC   zReconciler._step_next   s   ` 	**-/MMOO%v 	+ 	
 	
 	
 	!!/-#901 	" 	
 	
 	
 	++-BT 	, 	
 	
 	
 	''9I'JJJ!7799 	##!1/M $   
 	**-AR 	+ 	
 	
 	
 	
 	
rG   c                 `   t                               |           \  }}i }g }|D ]>}|j        t          j        k    r|j        s
J d            |                    |           ?d |D             }d |D             }	t          t                    }
|	                                D ])\  }}||vr |
|j
                                     |           *|                    d            |D ]+}t                               ||
|	          }|s!|||j        <   ,t                               | ||           d S )Nz?Instance in REQUESTED status should have launch_request_id set.c                 h    h | ]/}|j         r&|j        t          j        t          j        fv(|j         0S  cloud_instance_idstatus
IMInstance
TERMINATEDALLOCATION_FAILED.0instances     rE   	<setcomp>z?Reconciler._handle_cloud_instance_allocation.<locals>.<setcomp>R  sZ     =
 =
 =
)=
 ):+GHI I &I I IrG   c                 H    i | ]}t          |t                    |j        | S r^   )
isinstancer   
request_idrf   errors     rE   
<dictcomp>z@Reconciler._handle_cloud_instance_allocation.<locals>.<dictcomp>Y  s=     5
 5
 5
%115
e5
 5
 5
rG   c                 @    t          j        | t          j                  S N)r
   get_status_transition_times_nsrb   	REQUESTED)rg   s    rE   <lambda>z>Reconciler._handle_cloud_instance_allocation.<locals>.<lambda>j  s    !L*." " rG   key)r,   _get_im_instancesra   rb   rr   launch_request_idappendr   listitems	node_typesort_try_resolve_pending_allocationinstance_id_update_instance_manager)r-   r2   r4   im_instancesversionupdatesinstances_with_launch_requestsrg   assigned_cloud_instance_idslaunch_errors"unassigned_cloud_instances_by_typer`   cloud_instanceupdate_events                 rE   rI   z,Reconciler._handle_cloud_instance_allocation=  s    !+ < <=M N Ng <>&$ 	< 	<H*"666 *Q QPQ Q**11(;;;;=
 =
(=
 =
 =
#5
 5
.5
 5
 5
  	+ 2P1U1U1W1W 	 	-~ (CCC2>3KLSS"  
 	'++  	, 	
 	
 	
 7 	9 	9H%EE<m L   ,8GH()) 	++,<gwOOOOOrG   im_instancer   r   c           	         d}t          |                    | j        g                     r|| j                                                 }|r;t	          | j        t          j        |j        |j	        |j
        d|j                   S |                    | j                  }|rA|j
        | j        k    r1t	          | j        t          j        dt          |                     S dS )ae  
        Allocate, or fail the cloud instance allocation for the instance.

        Args:
            im_instance: The instance to allocate or fail.
            unassigned_cloud_instances_by_type: The unassigned cloud instances by type.
            launch_errors: The launch errors from the cloud provider.

        Returns:
            Instance update to ALLOCATED: if there's a matching unassigned cloud
                instance with the same type.
            Instance update to ALLOCATION_FAILED: if the instance allocation failed
                with errors.
            None: if there's no update.

        Nz$allocated unassigned cloud instance )r~   new_instance_statusr`   	node_kindinstance_typedetailszlaunch failed with r~   r   r   )lengetr   popIMInstanceUpdateEventr~   rb   	ALLOCATEDr`   r   r{   rw   rd   str)r   r   r   unassigned_cloud_instancelaunch_errors        rE   r}   z*Reconciler._try_resolve_pending_allocation}  s   , %)! 155k6OQSTTUU 	(J))cee & % 	('3$.$8";"M3=7AE0BE E
 
 
 
 %(()FGG 	L2k6OOO('3$.$@Ac,.?.?AA    trG   c                    t                               |           \  }}i }d |D             }d |D             }d |D             }|                                D ]\  }	}
|                    |	          }|s|
j        sJ |                    |
j                  }||j        t          j        t          j        fv s
J d            t          |	t          j        d|
j                  ||	<   t                               | ||           dS )aq  
        The instance requested to stop ray, but failed to stop/drain the ray node.
        E.g. connection errors, idle termination drain rejected by the node.

        We will transition the instance back to RAY_RUNNING.

        Args:
            instance_manager: The instance manager to reconcile.
            ray_stop_errors: The errors from RayStopper.

        c                     i | ]
}|j         |S r^   im_instance_idrl   s     rE   rn   z6Reconciler._handle_ray_stop_failed.<locals>.<dictcomp>  s*     *
 *
 *
,1E %*
 *
 *
rG   c                 8    i | ]}t          |j                  |S r^   r	   node_id)rf   ns     rE   rn   z6Reconciler._handle_ray_stop_failed.<locals>.<dictcomp>  s$    #S#S#SAM!)$<$<a#S#S#SrG   c                 H    i | ]}|j         t          j        k    |j        | S r^   )ra   rb   RAY_STOP_REQUESTEDr~   re   s     rE   rn   z6Reconciler._handle_ray_stop_failed.<locals>.<dictcomp>  s7     (
 (
 (
*"???  (???rG   NzOThere should be a running ray node for instance with ray stop requested failed.zfailed to stop/drain ray)r~   r   r   ray_node_id)r,   rv   rz   r   r   ra   r"   RUNNINGIDLEr   rb   RAY_RUNNINGr   )r-   r6   r<   	instancesr   r   ray_stop_errors_by_instance_idray_nodes_by_ray_node_idray_stop_requested_instancesr~   rg   
stop_errorray_nodes                rE   rO   z"Reconciler._handle_ray_stop_failed  sl   " (99:JKK	7*
 *
5D*
 *
 *
& $T#S#S#S#S (
 (
%(
 (
 (
$ &B%G%G%I%I 	 	!K7;;KHHJ ####/33H4DEEH'HO"@ - - -$	- -  $9'$.$:2$,	$ $ $GK   	++,<gwOOOOOrG   c                 \   t                               |           \  }}i }d |D             }d |D             }|                                D ]C\  }}|                    |          }	|	r't	          |t
          j        d|	j                   ||<   Dt                               | ||           d S )Nc                 H    i | ]}|j         t          j        k    |j        | S r^   )ra   rb   RAY_INSTALLINGr~   re   s     rE   rn   z9Reconciler._handle_ray_install_failed.<locals>.<dictcomp>  s7     )
 )
 )
*";;;  (;;;rG   c                     i | ]
}|j         |S r^   r   rl   s     rE   rn   z9Reconciler._handle_ray_install_failed.<locals>.<dictcomp>  s    VVV%%.VVVrG   z#failed to install ray with errors: r   )	r,   rv   rz   r   r   rb   RAY_INSTALL_FAILEDr   r   )
r-   r5   r   r   r   instances_with_ray_installinginstall_errorsr~   rg   install_errors
             rE   rN   z%Reconciler._handle_ray_install_failed  s    
 (99:JKK	7)
 )
%)
 )
 )
% WVCUVVV &C%H%H%J%J 		 		!K*..{;;M '< +(2(EUm>SUU	( ( ($ 	++,<gwOOOOOrG   c                 P   i }t                               |           \  }}d |D             }|                                D ]I\  }}||                                v rt	          |j        t          j        d| d          ||j        <   Jt                               | ||           dS )ax  
        For any IM (instance manager) instance with a cloud node id, if the mapped
        cloud instance is no longer running, transition the instance to TERMINATED.

        Args:
            instance_manager: The instance manager to reconcile.
            non_terminated_cloud_instances: The non-terminated cloud instances from
                the cloud provider.
        c                 V    i | ]&}|j         r|j        t          j        k    |j         |'S r^   )r`   ra   rb   rc   re   s     rE   rn   z@Reconciler._handle_cloud_instance_terminated.<locals>.<dictcomp>   sH     A
 A
 A
)A
 /7oAV.V.V &.V.V.VrG   zcloud instance z no longer foundr   N)	r,   rv   rz   keysr   r~   rb   rc   r   )r-   r2   r   r   r   5non_terminated_instances_with_cloud_instance_assignedr`   rg   s           rE   rJ   z,Reconciler._handle_cloud_instance_terminated  s     '99:JKK	7A
 A
%A
 A
 A
= CHHJJ	 	 
 $B$G$G$I$III -B$0$.$9M*;MMM- - -GH()) 	++,<gwOOOOOrG   c           	         t                               |           \  }}i }d |D             }d |D             }|                                D ]V\  }}|                    |          }	|	st	          |	j        t          j        dt          |                     ||	j        <   Wt           	                    | ||           dS )a  
        If any TERMINATING instances have termination errors, transition the instance to
        TERMINATION_FAILED.

        We will retry the termination for the TERMINATION_FAILED instances in the next
        reconciler step.

        Args:
            instance_manager: The instance manager to reconcile.
            cloud_provider_errors: The errors from the cloud provider.

        c                 H    i | ]}t          |t                    |j        | S r^   )rj   r   r`   rl   s     rE   rn   zHReconciler._handle_cloud_instance_termination_errors.<locals>.<dictcomp>K  s>     
 
 
%!344
#U
 
 
rG   c                 H    i | ]}|j         t          j        k    |j        | S r^   )ra   rb   TERMINATINGr`   re   s     rE   rn   zHReconciler._handle_cloud_instance_termination_errors.<locals>.<dictcomp>Q  s7     6
 6
 6
*"888 &888rG   ztermination failed: r   N)
r,   rv   rz   r   r   r~   rb   TERMINATION_FAILEDr   r   )
r-   r4   r   r   r   termination_errors*terminating_instances_by_cloud_instance_idr`   failurerg   s
             rE   rK   z4Reconciler._handle_cloud_instance_termination_errors7  s    " (99:JKK	7
 
.
 
 
6
 6
%6
 6
 6
2 +=*B*B*D*D 
	 
	&wAEEFWXXH ,A$0$.$A=s7||==- - -GH()) 	++,<gwOOOOOrG   c                     |                      t                                }|j        j        t          j        k    sJ |j        }|j        |j        fS )Nrequest)	get_instance_manager_stater%   ra   coder)   OKstater   r   )r-   replyim_states      rE   rv   zReconciler._get_im_instancese  sY     !;;244 < 
 
 | JM1111;!8#333rG   r   r   c                     |sd S t          |                                          pg }|                     t          ||                    }|j        j        t          j        k    sJ d|             d S )N)expected_versionr   r   z#Failed to update instance manager: )ry   valuesupdate_instance_manager_stater*   ra   r   r)   r   )r-   r   r   r   s       rE   r   z#Reconciler._update_instance_managerp  s      	Fw~~''((.B >>5!(   ? 
 
  L...888 /....rG   c                    t                               |           \  }}i }d |D             }d |D             }|D ]}d}	t          |j                  }
|
|v r	||
         }	nR|j        t
          j        k    r	||
         }	n4|j        r||j                 }	nt          	                    d|
 d           w|	J d|
 d|j         d            t           
                    |j        |	j                  }||	j        k    r>t          |	j        |d|
 d	t          j        |j                   |
|	j        
          ||
<   t                               | ||           dS )a  
        Handle the ray status transition for the instance manager.

        If a new ray node running on the instance, transition it to RAY_RUNNING.
        If a ray node stopped, transition it to RAY_STOPPED.
        If a ray node is draining, transition it to RAY_STOPPING.

        Args:
            instance_manager: The instance manager to reconcile.
            ray_nodes: The ray cluster's states of ray nodes.
        c                 j    i | ]0}|j         r'|j        t          j        t          j        fv(|j         |1S r^   r_   re   s     rE   rn   z<Reconciler._handle_ray_status_transition.<locals>.<dictcomp>  s\     -
 -
 -
)-
 ):+GHI I &I I IrG   c                 ,    i | ]}|j         	|j         |S r^   r   re   s     rE   rn   z<Reconciler._handle_ray_status_transition.<locals>.<dictcomp>  s5     '
 '
 '
+3HDT'
h'
 '
 '
rG   Nz	Ray node z has no instance id. This only happens to a ray node not managed by autoscaler. If not, please file a bug at https://github.com/ray-project/rayz1 has no matching instance with cloud instance id=z. We should not see a ray node with cloud instance id not found in IM since we have reconciled all cloud instances, and ray nodes by now.z	ray node z is )r~   r   r   r   r   )r,   rv   r	   r   providerr   	READ_ONLYr~   rU   warning%_reconciled_im_status_from_ray_statusra   r   r"   Namer   r   )r-   r<   r3   r   r   r   !im_instances_by_cloud_instance_idim_instances_by_ray_node_idr   r   r   reconciled_im_statuss               rE   rM   z(Reconciler._handle_ray_status_transition  s   " (99:JKK	7-
 -
%-
 -
 -
)'
 '
7@'
 '
 '
# " -	 -	HK'(899K9999+F%.(2DDD #DK"PKK) "C ,#KK
 NN=K = = =   **PK P P3;3GP P P +** $.#S#S!3$ $  ${'999'< + 7(<>K > >%?8?;;> > !,"-";	( 	( 	($ 	++,<gwOOOOOrG   
ray_statuscur_im_statuszIMInstance.InstanceStatusc                 ,   d}| t           j        t           j        fv rt          j        }nL| t           j        k    rt          j        }n/| t           j        k    rt          j        }nt          d|            ||k    s|t          j        |          v r|S |S )aM  
        Reconcile the instance status from the ray node status.
        Args:
            ray_status: the current ray node status.
            cur_im_status: the current IM instance status.
        Returns:
            The reconciled IM instance status

        Raises:
            ValueError: If the ray status is unknown.
        NzUnknown ray status: )r"   r   r   rb   r   DEADRAY_STOPPEDDRAININGRAY_STOPPING
ValueErrorr
   get_reachable_statuses)r   r   r   s      rE   r   z0Reconciler._reconciled_im_status_from_ray_status  s      $*,jo>>>#-#9  :?**#-#9  :...#-#:  @J@@AAA 11123GHHI I
 ! ##rG   c                 @   t                               |           \  }}g }g }g }|D ]}|j        t          j        k    r|                    |           -|j        t          j        k    r|                    |           X|j        t          j        k    r|                    |           |sd S t                               ||||	                                |
                                          }i }	t          t          j                              }
|                                D ]\\  }}|D ]T}t          |j                  dk    r|
n|j        }t#          |j        t          j        ||d| d|           |	|j        <   U]t                               | ||	           d S )Nr   zrequested to launch z with request id )r~   r   rw   r   r   )r,   rv   ra   rb   QUEUEDrx   rr   r   _compute_to_launchget_upscaling_speedget_max_concurrent_launchesr   uuiduuid4rz   r   rw   r   r~   r   )r-   r3   r   r   queued_instancesrequested_instancesrunning_instancesrg   	to_launchr   new_launch_request_idr   rw   s                rE   rW   z#Reconciler._handle_instances_launch  s   
 (99:JKK	7 ! 	3 	3H*"333 ''1111J$888#**84444J$:::!((222 	F112244::<<
 
	  #DJLL 1 1(1(9(9 	 	$M9%  
 8566!;; *)!3 "
 1F ( 4(2(<&7"//} / /,/ /	1 	1 	1,--& 	++,<gwOOOOOrG   r   r   r   upscaling_speedmax_concurrent_launchesc                    d }dt           dt          t                   fd} ||           } ||          }t          |          }	t	          t
                    }
|                                D ]\  }}|                    |g           }t          dt          j
        |t          t          |          d          z                      }t          ||	z
  |          }t          d|          }t          t          |          |          }t          ||          d |         }|
|                             |           |	|z  }	|
S )Nc                 x    t          t                    }| D ]"}||j                                     |           #|S rp   )r   ry   r   rx   )r   instances_by_typerg   s      rE   _group_by_typez5Reconciler._compute_to_launch.<locals>._group_by_typeC  sF     +D 1 1% K K!("89@@JJJJ$$rG   rg   r9   c                 ^    t          j        | t          j                  }t	          |          S rp   )r
   rq   rb   r   sorted)rg   queue_timess     rE   _sort_by_earliest_queuedz?Reconciler._compute_to_launch.<locals>._sort_by_earliest_queuedJ  s,    &E*+ K +&&&rG      r   rt   )rb   r   intr   r   ry   rz   r   maxmathceilminr   extend)r   r   r   r   r   r   r   queued_instances_by_typerunning_instances_by_typetotal_num_requested_to_launchall_to_launchr   queued_instances_for_typerunning_instances_for_typenum_desired_to_upscalenum_to_launchr   s                    rE   r   zReconciler._compute_to_launch;  s   	% 	% 	%	'z 	'd3i 	' 	' 	' 	' $2>2B#C#C $2N3D$E$E!(+,?(@(@%;Ft;L;L
 &++--	; 	; 
%)B)F)Fr* *&
 &)	/C4N0O0OQR,S,SSTT& &"  '*GG& M  =11M$= > >NNM8>VWWWI -(//	:::)]:))rG   rQ   c                    t                               |           \  }}t          t                    }|D ]"}||j                                     |           #i }|t          j                 D ]4}t                               ||j	        |j
                  }|r
|||j        <   5|t          j                 D ]W}|j        s
J d            t                               ||j        t          j        |j        |j                  }|r
|||j        <   X|t          j                 D ]:}t                               ||j        t          j                  }|r
|||j        <   ;|t          j                 D ]:}t                               ||j        t          j                  }|r
|||j        <   ;|t          j                 D ]@}t                               ||j        t          j        |j                  }|r
|||j        <   At          j        t          j        t          j        t          j        t          j        fD ]+}	t                               ||	         |	|j         |           ,t           !                    | ||           dS )a  
        Handle stuck instances with timeouts.

        Instances could be stuck in the following status and needs to be updated:
            - REQUESTED: cloud provider is slow/fails to launch instances.
            - ALLOCATED: ray fails to be started on the instance.
            - RAY_INSTALLING: ray fails to be installed on the instance.
            - TERMINATING: cloud provider is slow/fails to terminate instances.

        Instances could be in the following status which could be unbounded or
        transient, and we don't have a timeout mechanism to handle them. We would
        warn if they are stuck for too long:
            - RAY_STOPPING: ray taking time to drain.
            - QUEUED: cloud provider is slow to launch instances, resulting in long
                queue.

            Reconciler should handle below statuses, if not, could be slow
                reconcilation loop or a bug:
            - RAY_INSTALL_FAILED
            - RAY_STOPPED
            - TERMINATION_FAILED


        Args:
            instance_manager: The instance manager to reconcile.
            reconcile_config: The instance reconcile config.
            _logger: The logger to log the warning messages. It's used for testing.

        z5cloud instance id should be set on ALLOCATED instance)
new_statusr`   r   )r  )r  r   )ra   warn_interval_srU   N)"r,   rv   r   ry   ra   rx   rb   rr    _handle_stuck_requested_instancerequest_status_timeout_s!max_num_retry_request_to_allocater~   r   r`   _handle_stuck_instanceallocate_status_timeout_sALLOCATION_TIMEOUTr   r   ray_install_status_timeout_sr   r   terminating_status_timeout_sr   r   #ray_stop_requested_status_timeout_sr   r   r   r   r   _warn_stuck_instances transient_status_warn_interval_sr   )
r-   rQ   r8   r   r   instances_by_statusrg   
im_updatesupdatera   s
             rE   rS   z"Reconciler._handle_stuck_instancesw  s   F (99:JKK	7)$//! 	B 	BH077AAAA
 ,J,@A 	: 	:H@@ 9 B F
  :39
8/0
 ,J,@A 	: 	:H*G GFG G*66 :%8"*"<&4 7  F  :39
8/0
 ,J,EF 	: 	:H66 =%8 7  F
  :39
8/0
 ,J,BC 	: 	:H66 =%8 7  F
  :39
8/0 ,J,IJ 	: 	:H66 D%1$,	 7  F  :39
8/0 # )") 
 	 	F ,,#F+ 0 Q	 -     	++,<gzRRRRRrG   r   ra   r	  rU   c           
         | D ]}t          j        ||          }t          |          dk    sJ t          |          d         }t	          j                    |z
  |dz  k    r|                    d                    |j        t          j
                            |j                  t          j
                            |          t	          j                    |z
  dz                       dS )z\Warn if any instance is stuck in a transient/unbounded status for too
        long.
        select_instance_statusr       eAz.Instance {}({}) is stuck in {} for {} seconds.N)r
   rq   r   r   timetime_nsr   formatr~   rb   InstanceStatusr   ra   )r   ra   r	  rU   rg   status_times_nsstatus_time_nss          rE   r  z Reconciler._warn_stuck_instances  s     " 	 	H*I  O ''1,,,,#O44R8N|~~.31FFFDKK ,"166xGG"166v>>.8S@	   	 	rG   c                     t                               |           \  }}|D ]/}|j        t          j        k    r|j        t          j        k    r dS 0dS )a  
        Check if the head node is running and ready.

        If we scale up the cluster before head node is running,
        it would cause issues when launching the worker nodes.

        There are corner cases when the GCS is up (so the ray cluster resource
        state is retrievable from the GCS), but the head node's raylet is not
        running so the head node is missing from the reported nodes. This happens
        when the head node is still starting up, or the raylet is not running
        due to some issues, and this would yield false.

        Args:
            instance_manager: The instance manager to reconcile.

        Returns:
            True if the head node is running and ready, False otherwise.
        TF)r,   rv   r   r(   HEADra   rb   r   )r-   r   _rg   s       rE   _is_head_node_runningz Reconciler._is_head_node_running  sX    * %667GHHa$ 	  	 H!X]22?j&<<<44urG   rR   c                    t                               |          \  }}d |D             }g }	d |j        D             }
|D ]O}|
                    |j                  }|	                    t          |||j        r|j        nd                     Pt          |	                                |
                                |j        |j        |j        |	|                                |                                |                                	  	        }|                    |          }| j                            |j                   | j                            |j                   | j                            |j                   t                               |          sdS |j        t0          j        k    rdS |j        }|j        }i }|D ]}|j        }|j        t<          j        k    r<||         }tA          |t<          j!        |j        |d|j"                   ||j        <   ZtA          |t<          j#        |d|j"         	          ||j        <   |D ][}tI          |j%                  D ]D}tM          j'                    }tA          |t<          j(        |j)        d
d|j)         d          ||<   E\t           *                    |||           dS )a  
        Scale the cluster based on the resource state and the resource scheduler's
        decision:

        - It launches new instances if needed.
        - It terminates extra ray nodes if they should be shut down (preemption
            or idle termination)

        Args:
            autoscaling_state: The autoscaling state to reconcile.
            instance_manager: The instance manager to reconcile.
            cloud_resource_monitor: The cloud resource monitor for monitoring resource
                availability of all node types.
            ray_state: The ray cluster's resource state.
            scheduler: The resource scheduler to make scaling decisions.
            autoscaling_config: The autoscaling config.

        c                 ,    i | ]}|j         	|j         |S r^   )r~   )rf   is     rE   rn   z-Reconciler._scale_cluster.<locals>.<dictcomp>S  s3     '
 '
 '
!"am'
M1'
 '
 '
rG   c                 8    i | ]}t          |j                  |S r^   r   )rf   nodes     rE   rn   z-Reconciler._scale_cluster.<locals>.<dictcomp>W  s1     
 
 
26M$,''
 
 
rG   N)r   r   r`   )	node_type_configsmax_num_nodesresource_requestsgang_resource_requestscluster_resource_constraintscurrent_instancesidle_timeout_sdisable_launch_config_checkcloud_resource_availabilitieszterminating ray: )r~   r   r`   termination_requestr   zdraining ray: )r~   r   r5  r   Tzqueuing new instance of z from scheduler)r~   r   r   upsertr   )+r,   rv   rB   r   r   rx   r   r`   r   get_node_type_configsget_max_num_nodespending_resource_requestspending_gang_resource_requestsr0  get_idle_timeout_sr3  get_resource_availabilitiesscheduleinfeasible_resource_requestsr   !infeasible_gang_resource_requests'infeasible_cluster_resource_constraintsr&  r   r   r   r   to_terminater~   instance_statusrb   r   r   r   r   r   rangecountr
   random_instance_idr   r   r   )r>   r-   r0   rR   r.   r3   r   r   im_instances_by_instance_idautoscaler_instancesray_nodes_by_idr   r   sched_requestr   r   rA  r   terminate_requestr~   im_instance_to_terminatelaunch_requestr%  s                          rE   rV   zReconciler._scale_cluster4  sw   : !+ < <=M N Ng'
 '
&2'
 '
 '
#  "
 
:C:O
 
 
 ( 	 	K&**;+>??H ''"% + '8"55!  
 
 
 
 *0FFHH,>>@@'A#,#K)2)O2-@@BB">>@@ 'BBDD
 
 
" ""=11 	6==.	
 	
 	
 	;BB3	
 	
 	
 	AHH9	
 	
 	
 //0@AA 	 F&(*<<<F O	)!- 	 	+7K 0J4HHH ,G{+S(9N +(2(>&>&P(9K0A0IKK: : :)566 :O +(2(E(9H->-FHH	: : :)566 ( 	 	N>/00  *=??'< +(2(9"0">)>3O ) ) )	( 	( 	($$ 	++,<gwOOOOOrG   c           
         t                               |           \  }}i }|D ]}|j        t          j        t          j        t          j        t          j        fvr8t          |j	        t          j
        |j        dt          j                            |j                             ||j	        <   t                               | ||           dS )aA  
        Terminate instances with the below statuses:
            - RAY_STOPPED: ray was stopped on the cloud instance.
            - ALLOCATION_TIMEOUT: cloud provider timed out to allocate a running cloud instance.
            - RAY_INSTALL_FAILED: ray installation failed on the cloud instance,
                we will not retry.
            - TERMINATION_FAILED: cloud provider failed to terminate the instance
                or timeout for termination happened, we will retry again.

        Args:
            instance_manager: The instance manager to reconcile.
        zterminating instance from )r~   r   r`   r   N)r,   rv   ra   rb   r   r  r   r   r   r~   r   r`   r   r   r   )r-   r   r   r   rg   s        rE   rX   zReconciler._terminate_instances  s     !+ < <=M N Ng$ 	 	H&---	'    -B$0$.$:"*"<E,11(/BBE E	- - -GH()) 	++,<gwOOOOOrG   c                    t                               |           \  }}i }|D ]}|j        t          j        k    r|j        t          j        k    r.|                    |j	                  }|sJ d|j	         d            |j
        set          |j        t          j        d          ||j        <   t                               | ||           dS )a&  
        Install ray on the allocated instances when it's ready (cloud instance
        should be running)

        This is needed if ray installation needs to be performed by
        the instance manager.

        Args:
            instance_manager: The instance manager to reconcile.
        zCloud instance z0 is not found in non_terminated_cloud_instances.zinstalling rayr   N)r,   rv   ra   rb   r   r   r(   r$  r   r`   
is_runningr   r~   r   r   )r-   r2   r   r   r   rg   r   s          rE   rZ   zReconciler._install_ray  s    !+ < <=M N Ng$ 	 	H*"666!X]22;??* N "  5("< 5 5 5 >
 ",  -B$0$.$=(- - -GH()) 	++,<gwOOOOOrG   c                    t                               |           \  }}||_        t          t                    }|D ]"}||j                                     |           #t          t                    }g }|t          j                 |t          j	                 z   D ]?}|j
        r!||j
                                     |           *|                    |           @|                                D ]\  }}t          t                    }	|D ]}|	|j        xx         dz  cc<   t          j        |d         t          j                  }
|
r|
j        nd}|	                                D ]L\  }}|j                            t%          |t          |          t          |dz                                 M|t          j                 |t          j                 z   D ]Y}t+          |j        d d          }|j                            t1          |j        |j        |d         j                             Z|t          j                 D ]}t          j        |t          j                  }t          j        |t          j                  }|r|j        nd}|r|j        nd}|j                            t;          |j        t          |dz            t          |dz            |j        d	                     d S )
Nr   r   r  )ray_node_type_namerD  
request_tsc                     | j         S rp   )timestamp_ns)xs    rE   rs   z4Reconciler._fill_autoscaling_state.<locals>.<lambda>K  s    q~ rG   T)ru   reverse)r~   rQ  r   )rQ  start_ts	failed_tsreasonrD  )r,   rv   autoscaler_state_versionr   ry   ra   rx   rb   rr   r   rw   rz   r   r   r
   get_last_status_transitionrT  pending_instance_requestsr$   r   r   r   status_historypending_instancesr#   r~   r   rd   failed_instance_requestsr    )r-   r>   r   r   r  rg   instances_by_launch_requestr   r%  num_instances_by_typerequest_updaterequest_time_nsr   rD  r]  request_status_updatefailed_status_updatefailed_timerequest_times                      rE   r[   z"Reconciler._fill_autoscaling_state  s~    (99:JKK	75<2 *$//! 	B 	BH077AAAA '2$&7&7#
 45!*"345	2 	2H ) 2+H,FGNNxXXXX ''11117==?? 	 	LAy$/$4$4!% C C%h&<===B==== *D!j2 N >LRn99QRO(=(C(C(E(E  $u!;BB*+8!%jj#&#'=#>#>       
 45!*";<=	 	H
 $'-E-Et  N /66 ( 4'/'=*1-5      ,J,HI 	 	H$0$K*.% %! $0#J*6$ $  6JP$11q  7LR%22QR  6==%'/'= !455!#s*  07  
 
 
 
	 	rG   rg   	timeout_sr  c                 b   t          j        | |          sdS t          t          j        | t          j                            }t          |          |k    r4t          | j        t          j	        dt          |           d|           S t          | j        t          j
        d| d          S )a  
        Fail the cloud instance allocation if it's stuck in the REQUESTED state.

        Args:
            instance: The instance to handle.
            timeout_s: The timeout in seconds.
            max_num_retry_request_to_allocate: The maximum number of times an instance
                could be requested to allocate.

        Returns:
            Instance update to ALLOCATION_FAILED: if the instance allocation failed
                with errors.
            None: if there's no update.

        Nr  z(failed to allocate cloud instance after z. attempts > max_num_retry_request_to_allocate=r   z$queue again to launch after timeout=s)r
   has_timeoutr   rq   rb   rr   r   r   r~   rd   r   )rg   rh  r  all_request_times_nss       rE   r
  z+Reconciler._handle_stuck_requested_instanceo  s    & ')<< 	4%71E   
  
 #$$'HHH($0$.$@]/00] ]9Z] ]	    % , * 1G9GGG
 
 
 	
rG   r  update_kwargsc                     t          j        | |          sdS t          d| j        |d| dt          j                            | j                   d|S )a  
        Fail the instance if it's stuck in the status for too long.

        Args:
            instance: The instance to handle.
            timeout_s: The timeout in seconds.
            new_status: The new status to transition to.
            update_kwargs: The update kwargs for InstanceUpdateEvent

        Returns:
            Instance update to the new status: if the instance is stuck in the status
                for too long.
            None: if there's no update.

        Nztimeout=zs at status r   r^   )r
   rk  r   r~   rb   r   r   ra   )rg   rh  r  rm  s       rE   r  z!Reconciler._handle_stuck_instance  s    , ')<< 	4$ 
 , *Ay A A(--ho>>A A
 

 
 
 	
rG   c                 r    t                               | |           t                               | |           dS )a  
        For extra cloud instances (i.e. cloud instances that are non terminated as
        returned by cloud provider, but not managed by the instance manager), we
        will create new IM instances with ALLOCATED status.

        Such instances could either be:
            1. Leaked instances that are incorrectly started by the cloud instance
            provider, and they would be terminated eventually if they fail to
            transition to RAY_RUNNING by stuck instances reconciliation, or they
            would join the  ray cluster and be terminated when the cluster scales down.
            2. Instances that are started by the cloud instance provider intentionally
            but not yet discovered by the instance manager. This could happen for
               a. Head node that's started before the autoscaler.
               b. Worker nodes that's started by the cloud provider upon users'
               actions: i.e. KubeRay scaling up the cluster with ray cluster config
               change.
            3. Ray nodes with cloud instance id not in the cloud provider. This could
            happen if there's delay in the Ray's state (i.e. cloud instance already
            terminated, but the ray node is still not dead yet).

        Args:
            instance_manager: The instance manager to reconcile.
            non_terminated_cloud_instances: The non-terminated cloud instances from
                the cloud provider.
            ray_nodes: The ray cluster's states of ray nodes.
        N)r,   ,_handle_extra_cloud_instances_from_ray_nodes1_handle_extra_cloud_instances_from_cloud_provider)r-   r2   r<   s      rE   rL   z(Reconciler._handle_extra_cloud_instances  sL    @ 	??i	
 	
 	
 	DD<	
 	
 	
 	
 	
rG   c                    i }t                               |           \  }}d |D             }|                                D ]k\  }}||v r
t          t	          j                    |t          j        |j        |j	        d|j
         dt          j        |j                   dd          ||<   lt                               | ||           dS )a  
        For extra cloud instances that are not managed by the instance manager but
        are running in the cloud provider, we will create new IM instances with
        ALLOCATED status.

        Args:
            instance_manager: The instance manager to reconcile.
            non_terminated_cloud_instances: The non-terminated cloud instances from
                the cloud provider.
        c                 h    h | ]/}|j         r&|j        t          j        t          j        fv(|j         0S r^   r_   re   s     rE   rh   zOReconciler._handle_extra_cloud_instances_from_cloud_provider.<locals>.<setcomp>  sZ     ,
 ,
 ,
),
 ):+GHI I &I I IrG   z$allocated unmanaged cloud instance :z (z) from cloud providerT)r~   r`   r   r   r   r   r6  N)r,   rv   rz   r   r
   rE  rb   r   r   r{   r`   r(   r   r   )r-   r2   r   r   r    cloud_instance_ids_managed_by_imr`   r   s           rE   rq  z<Reconciler._handle_extra_cloud_instances_from_cloud_provider  s    '99:JKK	7,
 ,
%,
 ,
 ,
( 2P1U1U1W1W 	 	-~ $DDD)>(;=="3$.$8(2,6W%7W W n&>??W W W * * *G%&& 	++,<gwOOOOOrG   c                    i }t                               |           \  }}d |D             }d |D             }|D ]}|j        s
t          |j                  }||v r#|j        }	|	|v r/t          |          }
t          t          j                    |	t          j
        |
rt          j        nt          j        ||j        d| d          ||<   t                               | ||           dS )a1  
        For extra cloud instances reported by Ray but not managed by the instance
        manager, we will create new IM instances with ALLOCATED status.

        Args:
            instance_manager: The instance manager to reconcile.
            ray_nodes: The ray cluster's states of ray nodes.
        c                 v    h | ]6}|j         r-|j        s&|j        t          j        t          j        fv/|j         7S r^   )r`   r   ra   rb   rc   rd   re   s     rE   rh   zJReconciler._handle_extra_cloud_instances_from_ray_nodes.<locals>.<setcomp>#  sg     ,
 ,
 ,
),
 $	,

 ):+GHI I	 &I I IrG   c                 *    h | ]}|j         	|j         S r^   r   re   s     rE   rh   zJReconciler._handle_extra_cloud_instances_from_ray_nodes.<locals>.<setcomp>+  s3     &
 &
 &
!)(:J&
&
 &
 &
rG   z9allocated unmanaged worker cloud instance from ray node: T)r~   r`   r   r   r   r   r   r6  N)r,   rv   r~   r	   r   r   r   r
   rE  rb   r   r(   r$  WORKERrQ  r   )r-   r<   r   r   r   rt  ray_node_ids_managed_by_imr   r   r`   is_heads              rE   rp  z7Reconciler._handle_extra_cloud_instances_from_ray_nodes  sE    '99:JKK	7,
 ,
%,
 ,
 ,
(&
 &
-6&
 &
 &
" " 	 	H' '(899K888 ( 4 $DDD"8,,G#8(;=="3$.$8+2G(--'&9%"% % $ $ $GK   	++,<gwOOOOOrG   c                     |sd S t                               |           \  }}|                                }|                    ||           |                    ||           d S rp   )r,   rv   r7  report_instancesreport_resources)r-   r3   r7   r   r%  r,  s         rE   rD   zReconciler._report_metricsL  so       	F!334DEE	1.DDFF)))5FGGG)))5FGGGGGrG   )NNNNNrp   )@__name__
__module____qualname____doc__staticmethodr   r   r   r   r   r   r   r   r   r   r   r   r   r   r   loggingLoggerr   rF   r!   rA   rC   rI   rb   r   r   r   r}   rO   rN   rJ   rK   r   r   rv   r   rM   r"   r   r   rW   floatr   r   r   rS   r  boolr&  rV   rX   rZ   r[   r
  r  rL   rq  rp  rD   r^   rG   rE   r,   r,   8   s	          MQ>B8<@D,0P! P!)P!%P! /P! !5	P!
 %9P! )-_m-K(LP! .P!  (-G(HIP! %T/%:;P! "$|"45P! ##<=P! '.)P! 
P! P! P! \P!d VY)VY	?VY )-_m-K(LVY  $$>?	VY
 !1VY l+VY .VY VY VY \VYp  -1K
 K
+K
)K
 &K
 /	K

 !5K
 %9K
 )-_m-K(LK
 .K
 '.)K
 K
 K
 \K
b =P)=P(,_m-K(L=P  $$>?=P =P =P \=P~ 66,0d=6I1I,J6 C016 
'	(	6 6 6 \6p 5P)5Pl+5P 	?5P 5P 5P \5Pn P)P?CO?TP P P \P@ %P)%P(,_m-K(L%P %P %P \%PN +P)+P#$>?+P +P +P \+PZ 4)4	tJ$	%4 4 4 \4 9)99 c0019 
	9 9 9 \9: MP)MP	?MP .MP MP MP \MP^ !$!$/9/H!$	$!$ !$ !$ \!$F 6P)6P?P6P 6P 6P \6Pp 9z*9!*-9  
+9 	9
 "%9 
hZ((	)9 9 9 \9v BS)BS1BS BS BS BS \BSH 
#)  	   \4  D    \8 IP+IP)IP !5IP (	IP
 &IP .IP 
IP IP IP \IPV "P "P "P "P \"PH ,P),P(,_m-K(L,P 
,P ,P ,P \,P\ X)X+X 
X X X \Xt ,
,
),,
QT,
	'	(,
 ,
 ,
 \,
\ 


 -
 	

 
'	(
 
 
 \
B $
)$
(,_m-K(L$
 	?$
 $
 $
 \$
L *P)*P(,_m-K(L*P *P *P \*PX 5P)5P6:9o5P 5P 5P \5Pn  AEH H)H-H ##<=H H H \H H HrG   r,   )Br  r   r  r   collectionsr   typingr   r   r   r   r   ray._common.utilsr	   )ray.autoscaler.v2.instance_manager.commonr
   )ray.autoscaler.v2.instance_manager.configr   r   r   3ray.autoscaler.v2.instance_manager.instance_managerr   0ray.autoscaler.v2.instance_manager.node_providerr   r   r   r   r   r   Eray.autoscaler.v2.instance_manager.subscribers.cloud_resource_monitorr   :ray.autoscaler.v2.instance_manager.subscribers.ray_stopperr   Eray.autoscaler.v2.instance_manager.subscribers.threaded_ray_installerr   "ray.autoscaler.v2.metrics_reporterr   ray.autoscaler.v2.schedulerr   r   ray.autoscaler.v2.schemar   r   ray.autoscaler.v2.utilsr   !ray.core.generated.autoscaler_pb2r   r   r    r!   r"   r#   r$   'ray.core.generated.instance_manager_pb2r%   r&   rb   r'   r   r(   r)   r*   	getLoggerr~  rU   r,   r^   rG   rE   <module>r     s       # # # # # # 3 3 3 3 3 3 3 3 3 3 3 3 3 3 + + + + + + B B B B B B         
 P O O O O O                     T S S S S S      I H H H H H M M M M M M M M A A A A A A A A 0 0 0 0 0 0                                 
	8	$	$aH aH aH aH aH aH aH aH aH aHrG   