
    &`i.                     \   d dl Z d dlZd dlZd dlZd dlmZ d dlmZ d dlm	Z	m
Z
mZmZ d dlmZ d dlmZmZmZmZmZmZmZ  ej        e          Z ej        d          Ze                    ej                    G d d	          Z G d
 de          Z G d de          ZdeddfdZ dS )    N)RLock)FileLock)LOCAL_CLUSTER_NODE_TYPEbootstrap_localget_lock_pathget_state_path)NodeProvider)NODE_KIND_HEADNODE_KIND_WORKERSTATUS_UP_TO_DATETAG_RAY_NODE_KINDTAG_RAY_NODE_NAMETAG_RAY_NODE_STATUSTAG_RAY_USER_NODE_TYPEfilelockc                        e Zd Zd Zd Zd ZdS )ClusterStatec           	         t                      | _        t          j        t          j                            |          d           t          |          | _        || _        | j        5  | j        5  t          j        	                    | j                  rt          j        t          | j                                                            }|                    |d                   }|r7|                    di                               t                    t           k    ri }t"                              d           ni }t"                              d                    t)          |                               |d         D ]?}||vrt          t*          idd	||<   ||         d         t                   t*          k    sJ @|d         |vrt          t           idd	||d         <   n*||d                  d         t                   t           k    sJ t)          |d                   }|                    |d                    t)          |          D ]	}||vr||= 
|                    d
          }|r||d                  }	||	d<   t/          |          t/          |d                   dz   k    sJ t          | j        d          5 }
t"                              d                    |                     |
                    t          j        |                     d d d            n# 1 swxY w Y   d d d            n# 1 swxY w Y   d d d            d S # 1 swxY w Y   d S )NT)exist_okhead_iptagsz%Head IP changed - recreating cluster.z&ClusterState: Loaded cluster state: {}
worker_ips
terminatedr   stateexternal_head_ipexternal_ip   w'ClusterState: Writing cluster state: {})r   lockosmakedirspathdirnamer   	file_lock	save_pathexistsjsonloadsopenreadgetr   r
   loggerinfoformatlistr   appendlendebugwritedumps)self	lock_pathr'   provider_configworkershead_config	worker_iplist_of_node_ipsr   headfs              /home/jaya/work/projects/VOICE-AGENT/VIET/agent-env/lib/python3.11/site-packages/ray/autoscaler/_private/local/node_provider.py__init__zClusterState.__init__!   s!   GG	
BGOOI..>>>>!),,"Y ;	1 ;	1 :1 :17>>$.11 !"jdn)=)=)B)B)D)DEEG")++oi.H"I"IK'M&??6266::;LMM)* * #%$KLLL G<CCDMMRR   "1!> 
 
I //%68H$I%1. .	** $I.v67HI/0 0 0 0 0 #9-W<<!2N C!-; ;GOI677  	 :;FCDUV)* * * *
 $((E#F#F  ''	(BCCC!%g / /I (888#I.
 $3#6#67I#J#J # ;"?9#=>D*:D'7||s?<+H'I'IA'MMMMM$.#.. 1!LLAHHQQ   GGDJw//000	1 1 1 1 1 1 1 1 1 1 1 1 1 1 1m:1 :1 :1 :1 :1 :1 :1 :1 :1 :1 :1 :1 :1 :1 :1;	1 ;	1 ;	1 ;	1 ;	1 ;	1 ;	1 ;	1 ;	1 ;	1 ;	1 ;	1 ;	1 ;	1 ;	1 ;	1 ;	1 ;	1s\   )M1IL=AL&L=&L**L=-L*.L=1M=M	MM	MMMc                    | j         5  | j        5  t          j        t	          | j                                                            }|cd d d            cd d d            S # 1 swxY w Y   	 d d d            d S # 1 swxY w Y   d S N)r!   r&   r)   r*   r+   r'   r,   )r7   r:   s     r@   r-   zClusterState.getd   s    Y 	 	  *T$.%9%9%>%>%@%@AA      	 	 	 	 	 	 	 	        	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	s4   A<:A#
A<#A'	'A<*A'	+A<<B B c           
         d|v sJ d|v sJ | j         5  | j        5  |                                 }|||<   t          | j        d          5 }t
                              d                    t          |                               |	                    t          j        |                     d d d            n# 1 swxY w Y   d d d            n# 1 swxY w Y   d d d            d S # 1 swxY w Y   d S )Nr   r   r   r    )r!   r&   r-   r+   r'   r.   r/   r0   r1   r5   r)   r6   )r7   	worker_idr/   r:   r?   s        r@   putzClusterState.putj   s   ~~~~$Y 		1 		1 1 1((**%)	"$.#.. 1!KK44:F4==4I4I   GGDJw//0001 1 1 1 1 1 1 1 1 1 1 1 1 1 11 1 1 1 1 1 1 1 1 1 1 1 1 1 1		1 		1 		1 		1 		1 		1 		1 		1 		1 		1 		1 		1 		1 		1 		1 		1 		1 		1sY   C(/CA"B9-C9B==C B=CC(C	C(C	C((C,/C,N)__name__
__module____qualname__rA   r-   rF        r@   r   r       sE        A1 A1 A1F  1 1 1 1 1rK   r   c                       e Zd ZdZd ZdS )OnPremCoordinatorStatea[  Generates & updates the state file of CoordinatorSenderNodeProvider.

    Unlike ClusterState, which generates a cluster specific file with
    predefined head and worker ips, OnPremCoordinatorState overwrites
    ClusterState's __init__ function to generate and manage a unified
    file of the status of all the nodes for multiple clusters.
    c                    t                      | _        t          |          | _        || _        | j        5  | j        5  t
          j                            | j                  r9t          j	        t          | j                                                            }ni }t                              d                    |                     t          |          D ]	}||vr||= 
|D ]}||vri dd||<   t!          |          t!          |          k    sJ t          | j        d          5 }t                              d                    |                     |                    t          j        |                     d d d            n# 1 swxY w Y   d d d            n# 1 swxY w Y   d d d            d S # 1 swxY w Y   d S )Nz<OnPremCoordinatorState: Loaded on prem coordinator state: {}r   r   r   z=OnPremCoordinatorState: Writing on prem coordinator state: {})r   r!   r   r&   r'   r"   r$   r(   r)   r*   r+   r,   r.   r/   r0   r1   r3   r5   r6   )r7   r8   r'   r=   nodesnode_ipr?   s          r@   rA   zOnPremCoordinatorState.__init__   s}   GG	!),,"Y 	/ 	/ / /7>>$.11  JtDN';';'@'@'B'BCCEEE;;A6%==    $E{{ + +G&666!'N/  Ge++$&%1* *g 5zzS)9%:%:::::$.#.. /!KK@@Fu   GGDJu--.../ / / / / / / / / / / / / / /-/ / / / / / / / / / / / / / /	/ 	/ 	/ 	/ 	/ 	/ 	/ 	/ 	/ 	/ 	/ 	/ 	/ 	/ 	/ 	/ 	/ 	/sZ   F<C.F$,AFF$FF$FF$F<$F(	(F<+F(	,F<<G G N)rG   rH   rI   __doc__rA   rJ   rK   r@   rM   rM   y   s-         !/ !/ !/ !/ !/rK   rM   c                   d    e Zd ZdZd Zd Zd Zd Zd Zd Z	d Z
d	 Zd
 Zd Zed             ZdS )LocalNodeProvidera  NodeProvider for private/local clusters.

    `node_id` is overloaded to also be `node_ip` in this class.

    When `cluster_name` is provided, it manages a single cluster in a cluster
    specific state file. But when `cluster_name` is None, it manages multiple
    clusters in a unified state file that requires each node to be tagged with
    TAG_RAY_CLUSTER_NAME in create and non_terminated_nodes function calls to
    associate each node with the right cluster.

    The current use case of managing multiple clusters is by
    OnPremCoordinatorServer which receives node provider HTTP requests
    from CoordinatorSenderNodeProvider and uses LocalNodeProvider to get
    the responses.
    c                     t          j        | ||           |r=t          |          }t          |          }t	          |||          | _        d| _        d S t          dd|d                   | _        d| _        d S )NFz/tmp/coordinator.lockz/tmp/coordinator.stater=   T)r	   rA   r   r   r   r   use_coordinatorrM   )r7   r9   cluster_namer8   
state_paths        r@   rA   zLocalNodeProvider.__init__   s    dO\BBB 	(%l33I'55J% DJ
 $)D    0'( 23 DJ
 $(D   rK   c                 6   | j                                         }g }|                                D ]h\  }}|d         dk    rd}|                                D ](\  }}|d                             |          |k    rd} n)|r|                    |           i|S )Nr   r   Tr   F)r   r-   itemsr2   )	r7   tag_filtersr:   matching_ipsr<   r/   okkvs	            r@   non_terminated_nodesz&LocalNodeProvider.non_terminated_nodes   s    *..""&}} 		/ 		/OItG},,B#))++  1<##A&&!++BE ,  /##I...rK   c                 T    | j                                         |         d         dk    S )Nr   runningr   r-   r7   node_ids     r@   
is_runningzLocalNodeProvider.is_running   s#    z~~(1Y>>rK   c                 .    |                      |           S rC   )re   rc   s     r@   is_terminatedzLocalNodeProvider.is_terminated   s    ??7++++rK   c                 L    | j                                         |         d         S Nr   rb   rc   s     r@   	node_tagszLocalNodeProvider.node_tags   s    z~~(00rK   c                     | j                                         |         }|                    d          }|r|S t          j        |          S )a  Returns an external ip if the user has supplied one.
        Otherwise, use the same logic as internal_ip below.

        This can be used to call ray up from outside the network, for example
        if the Ray cluster exists in an AWS VPC and we're interacting with
        the cluster from a laptop (where using an internal_ip will not work).

        Useful for debugging the local node provider with cloud VMs.r   )r   r-   socketgethostbyname)r7   rd   
node_stateext_ips       r@   r   zLocalNodeProvider.external_ip   sI     Z^^%%g.
.. 	1M'000rK   c                 *    t          j        |          S rC   )rl   rm   rc   s     r@   internal_ipzLocalNodeProvider.internal_ip   s    #G,,,rK   c                 B   | j         j        5  | j         j        5  | j                                         |         }|d                             |           | j                             ||           d d d            n# 1 swxY w Y   d d d            d S # 1 swxY w Y   d S ri   )r   r!   r&   r-   updaterF   )r7   rd   r   r/   s       r@   set_node_tagszLocalNodeProvider.set_node_tags   s   Z_ 	. 	.% . .z~~''0V##D)))
w---. . . . . . . . . . . . . . .	. 	. 	. 	. 	. 	. 	. 	. 	. 	. 	. 	. 	. 	. 	. 	. 	. 	.s5   BAA<0B<B 	 BB 	BBBc                     |t                    }| j        j        5  | j        j        5  | j                                        }|                                D ]x\  }}|d         dk    rg| j        s|d         t                    |k    rI||d<   d|d<   | j                            ||           |dz
  }|dk    r ddd           ddd           dS y	 ddd           n# 1 swxY w Y   ddd           dS # 1 swxY w Y   dS )z.Creates min(count, currently available) nodes.r   r   r   ra   r   r   N)r   r   r!   r&   r-   rY   rU   rF   )r7   node_configr   count	node_typer:   rd   r/   s           r@   create_nodezLocalNodeProvider.create_node   s   *+	Z_ 	# 	#% # #*..**%,]]__ 
# 
#MGTG}44, 5<(9:iGG'+V(1W
w555 %	 A::"# # # # # #	# 	# 	# 	# 	# 	# 	# 	#
## # # # # # # # # # # # # # #	# 	# 	# 	# 	# 	# 	# 	# 	# 	# 	# 	# 	# 	# 	# 	# 	# 	#sA   C3BC5C3CC3C	C3"C	#C33C7:C7c                     | j                                         }||         }d|d<   | j                             ||           d S )Nr   r   )r   r-   rF   )r7   rd   r:   r/   s       r@   terminate_nodez LocalNodeProvider.terminate_node  sB    *..""w$W
w%%%%%rK   c                      t          |           S rC   )r   )cluster_configs    r@   bootstrap_configz"LocalNodeProvider.bootstrap_config  s    ~...rK   N)rG   rH   rI   rQ   rA   r_   re   rg   rj   r   rq   rt   ry   r{   staticmethodr~   rJ   rK   r@   rS   rS      s          ( ( (*  ? ? ?, , ,1 1 11 1 1"- - -. . .# # #$& & & / / \/ / /rK   rS   local_providerreturnc                 B   | j         d         }| j        }||                     i           vrqt          t          t
          t          t          d                    |          t          t          i}|                     i |d           ||                     i           v sJ dS dS )aB  This function is called on the Ray head from StandardAutoscaler.reset
    to record the head node's own existence in the cluster state file.

    This is necessary because `provider.create_node` in
    `commands.get_or_create_head_node` records the head state on the
    cluster-launching machine but not on the head.
    r   zray-{}-headr   )rv   r   rw   N)r9   rV   r_   r   r
   r   r   r   r0   r   r   ry   )r   r   rV   	head_tagss       r@   !record_local_head_state_if_neededr     s     ,Y7G!.Ln99"==== ~"$;}33LAA!2	
	 	""r	"KKK.==bAAAAAA >= BArK   )!r)   loggingr"   rl   	threadingr   r   r   $ray.autoscaler._private.local.configr   r   r   r   ray.autoscaler.node_providerr	   ray.autoscaler.tagsr
   r   r   r   r   r   r   	getLoggerrG   r.   filelock_loggersetLevelWARNINGr   rM   rS   r   rJ   rK   r@   <module>r      s     				                         6 5 5 5 5 5                  
	8	$	$#'#J//    ) ) )V1 V1 V1 V1 V1 V1 V1 V1r*/ */ */ */ */\ */ */ */Zs/ s/ s/ s/ s/ s/ s/ s/lB6G BD B B B B B BrK   