
    &`i&                         d dl Z d dlmZmZ d dlmZmZ d dlmZ d dl	m
Z
 d dlmZmZmZmZmZmZmZmZ  e j        e          Z G d de          Z G d	 d
          ZdS )    N)ABCabstractmethod)ListOptional)InstanceUtil)InstanceStorage)GetInstanceManagerStateReplyGetInstanceManagerStateRequestInstanceInstanceUpdateEventNodeKind
StatusCodeUpdateInstanceManagerStateReply!UpdateInstanceManagerStateRequestc                   >    e Zd ZdZedee         ddfd            ZdS )InstanceUpdatedSubscriberz'Subscribers to instance status changes.eventsreturnNc                     d S N )selfr   s     /home/jaya/work/projects/VOICE-AGENT/VIET/agent-env/lib/python3.11/site-packages/ray/autoscaler/v2/instance_manager/instance_manager.pynotifyz InstanceUpdatedSubscriber.notify   s        )__name__
__module____qualname____doc__r   r   r   r   r   r   r   r   r      sL        11T"56 4    ^  r   r   c            
           e Zd ZdZdedeee                  fdZde	de
fdZdedefdZe	 dd
ededede
fd            Zededefd            Zededefd            Zedededefd            ZdS )InstanceManagera  
    See `InstanceManagerService` in instance_manager.proto

    This handles updates to an instance, or inserts a new instance if
    it's an insert update. We should only be inserting new instances
    of the below statuses:
        1. ALLOCATED: For unmanaged instance not initialized by InstanceManager,
            e.g. head node
        2. QUEUED: For new instance being queued to launch.
        3. TERMINATING: For leaked cloud instance that needs to be terminated.

    For full status transitions, see:
    https://docs.google.com/document/d/1NzQjA8Mh-oMc-QxXOa529oneWCoA8sDiVoNkBqqDb4U/edit#heading=h.k9a1sp4qpqj4

    Not thread safe, should be used as a singleton.
    instance_storage"instance_status_update_subscribersc                 &    || _         |pg | _        d S r   )_instance_storage_status_update_subscribers)r   r"   r#   s      r   __init__zInstanceManager.__init__/   s     
 "2*L*RPR'''r   requestr   c                    d |j         D             }| j                            |                                          \  }}|j        dk    rS|j        |k    rHd|j         d| }t
                              |           |                     t          j	        ||          S g }|
                                D ]P\  }}||v r|                     ||         |          }	n|                     |          }	|                    |	           Q| j                            ||          }
|
j        s|
j        |k    rMd| d|
j         }t
                              |           |                     t          j	        |
j        |          S d}t
                              |           |                     t          j        |
j        |          S | j        D ]}|                    |j                    |                     t          j        |
j                  S )ao  
        Updates the instance manager state.

        If there's any failure, no updates would be made and the reply
        would contain the latest version of the instance manager state,
        and the error info.

        Args:
            request: The request to update the instance manager state.

        Returns:
            The reply to the request.
        c                     i | ]
}|j         |S r   )instance_id).0updates     r   
<dictcomp>zAInstanceManager.update_instance_manager_state.<locals>.<dictcomp>I   s    SSS&,fSSSr   )instance_idsr   zVersion mismatch: expected: z
, actual: )updatesexpected_storage_versionz"Failed to update instance storage.)r0   r%   get_instanceskeysexpected_versionloggerwarning_get_update_im_state_replyr   VERSION_MISMATCHitems_update_instance_create_instanceappendbatch_upsert_instancessuccessversionerrorUNKNOWN_ERRORSr&   r   OK)r   r(   ids_to_updatesto_update_instancesr?   err_strto_upsert_instancesr+   r-   instanceresult
subscribers               r   update_instance_manager_statez-InstanceManager.update_instance_manager_state7   s]   $ TS7?SSS'+'='K'K',,.. (L (
 (
$W #q((W-E-P-P%w/G % %"% %  NN7###22+   !#1#7#7#9#9 	1 	1K11100'4f   0088&&x0000 '>>'%, ? 
 

 ~ 	~((V7VVfnVV  w'''66/   ?W%%%66-v~w  
 9 	/ 	/Jgo......z}fnMMMr   c                     t                      }| j                                        \  }}|j        j                            |                                           ||j        _        t          j	        |j
        _        |S )z
        Gets the instance manager state.

        Args:
            request: The request to get the instance manager state.

        Returns:
            The reply to the request.
        )r	   r%   r2   state	instancesextendvaluesr?   r   rB   statuscode)r   r(   replyrM   r?   s        r   get_instance_manager_statez*InstanceManager.get_instance_manager_state   sh     -..!3AACC	7$$Y%5%5%7%7888%&Mr    status_coder?   error_messagec                 d    t                      }| |j        _        ||_        |r||j        _        |S )a$  
        Returns a UpdateInstanceManagerStateReply with the given status code and
        version.

        Args:
            status_code: The status code.
            version: The version.
            error_message: The error message if any.

        Returns:
            The reply.
        )r   rP   rQ   r?   message)rU   r?   rV   rR   s       r   r7   z*InstanceManager._get_update_im_state_reply   s7      011' 	1#0EL r   rG   r-   c                    |j         t          j        k    r|j        s
J d            |j        t
          j        t
          j        fv s
J d            |j        s
J d            |j        s
J d            |j        | _        |j        | _        |j        | _        |j	        | _
        dS |j         t          j        k    r|j	        s
J d            |j	        | _
        dS |j         t          j        k    r<|j        s
J d            |j        s
J d            |j        | _        |j        | _        dS |j         t          j        k    r|j        sJ d            dS dS )	z
        Apply status specific update to the instance.

        Args:
            instance: The instance to update.
            update: The update to apply.
        z,ALLOCATED update must have cloud_instance_idz6ALLOCATED update must have node_kind as WORKER or HEADz(ALLOCATED update must have instance_typez(RAY_RUNNING update must have ray_node_idz,REQUESTED update must have launch_request_idz(REQUESTED update must have instance_typez.TERMINATING update must have cloud instance idN)new_instance_statusr   	ALLOCATEDcloud_instance_id	node_kindr   WORKERHEADinstance_typeray_node_idnode_idRAY_RUNNING	REQUESTEDlaunch_request_idTERMINATINGrG   r-   s     r   _apply_updatezInstanceManager._apply_update   s    %);;;(> >=> >(#(    H   'SS)SSS'(> >=> >()/)AH&!'!1H%+%9H"%1H'8+???%QQ'QQQ%%1H'8+===(> >=> >('SS)SSS')/)AH&%+%9H"""'8+???(@ @?@ @( @?@ @r   c                 x   | j         s
J d            | j        t          j        t          j        t          j        fv s
J d            t          j        | j        | j	        | j        | j
                  }t                              t          j        ||                      t                              ||            |S )z>
        Create a new instance from the given update.
        z.upsert must be true for creating new instance.zPInvalid status for new instance, must be one of [ALLOCATED, QUEUED, TERMINATING])r+   r`   rP   details)upsertrZ   r   r[   QUEUEDrf   r   new_instancer+   r`   rj   r5   infoget_log_str_for_updater!   rh   )r-   rG   s     r   r;   z InstanceManager._create_instance   s     }NNNNN}) O .
 
 
 
/
 
 
  ,* .-N	
 
 
 	L7&IIJJJ%%h777r   c                 r   t                               t          j        | |                     t          j        | |j                  sTJ dt          j                            | j	                   dt          j                            |j                               t                              | |           | S )z
        Update the instance with the given update.

        Args:
            instance: The instance to update.
            update: The update to apply.

        Returns:
            The updated instance.
        zInvalid status transition from z to )r5   rn   r   ro   
set_statusrZ   r   InstanceStatusNamerP   r!   rh   rg   s     r   r:   z InstanceManager._update_instance   s     	L7&IIJJJ&x1KLL 	
 	
J&++HO<<J J&++F,FGGJ J	
 	
L
 	%%h777r   N)rT   )r   r   r   r   r   r   r   r   r'   r   r   rJ   r
   r	   rS   staticmethodr   intstrr7   r   r   rh   r;   r:   r   r   r   r!   r!      s        "S)S -5T:S5T,US S S SIN8IN	(IN IN IN INV5	%   0 DF *->A	(   \, %@ %@2E %@ %@ %@ \%@N !4     \B 8 5H X    \  r   r!   )loggingabcr   r   typingr   r   )ray.autoscaler.v2.instance_manager.commonr   3ray.autoscaler.v2.instance_manager.instance_storager   'ray.core.generated.instance_manager_pb2r	   r
   r   r   r   r   r   r   	getLoggerr   r5   r   r!   r   r   r   <module>r~      s@    # # # # # # # # ! ! ! ! ! ! ! ! B B B B B B O O O O O O	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 
	8	$	$       r r r r r r r r r rr   