
    &`iR                     4   d dl Z d dlZd dlZd dlZd dlmZmZ d dlmZ d dl	m
Z
mZmZmZmZ d dlZd dlmZ d dlmZ d dlmZmZmZmZmZ d dlmZmZmZ d d	lmZm Z m!Z!m"Z"m#Z# d
Z$dZ%dZ&dZ' ej(        dd          Z) e* ej(        dd                    Z+ ej(        d          Z, ej(        dd          Z- ej(        dd          Z. ee-e.          Z/dZ0 e j1        d          Z2 ej3        e4          Z5dee6e
f         defdZ7dee6e
f         deeef         fdZ8dee6e
f         dee6         fdZ9dee6e
f         defdZ:dee6e
f         defd Z;d!e6d"ee         fd#Z<d!e6d$e*fd%Z=d&e6d'e
dee6e
f         fd(Z>deee6e6f         e6f         fd)Z?e)e/fd*e6d&e6d+e6d,e6de6f
d-Z@d.ee6e
f         d/e6de*fd0ZAd.ee6e
f         d!e*dee*         fd1ZBd.ee6e
f         d!e*fd2ZCd.ee6e
f         d!e*fd3ZD G d4 d5e          ZE G d6 d7eE          ZF G d8 d9e          ZGdS ):    N)ABCabstractmethod)defaultdict)AnyDictListOptionalTuple)build_address)WORKER_LIVENESS_CHECK_KEY)NodeIDNodeIPNodeKind
NodeStatusNodeType)BatchingNodeProviderNodeDataScaleRequest)NODE_KIND_HEADNODE_KIND_WORKERSTATUS_UP_TO_DATESTATUS_UPDATE_FAILEDTAG_RAY_USER_NODE_TYPEzray.io/node-typezray.io/groupheadworkerKUBERAY_CRD_VERv1alpha1KUBERAY_REQUEST_TIMEOUT_S<   RAY_HEAD_POD_NAMEKUBERNETES_SERVICE_HOSTzhttps://kubernetes.defaultKUBERNETES_SERVICE_PORT_HTTPS443replicaIndex   )minutespodreturnc                     t          |           \  }}t          |           }t          |           }t          |           }t	          |||||          S )zpConverts a Ray pod extracted from K8s into Ray NodeData.
    NodeData is processed by BatchingNodeProvider.
    )kindtypereplica_indexstatusip)kind_and_type
status_tagpod_ip_replica_index_labelr   )r'   r*   r+   r-   r.   r,   s         /home/jaya/work/projects/VOICE-AGENT/VIET/agent-env/lib/python3.11/site-packages/ray/autoscaler/_private/kuberay/node_provider.pynode_data_from_podr4   V   sZ     s##JD$__F	B(--MM&R       c                     | d         d         }|t                    t          k    rt          nt          }|t                   }||fS )zmDetermine Ray node kind (head or workers) and node type (worker group name)
    from a Ray pod's labels.
    metadatalabels)KUBERAY_LABEL_KEY_KINDKUBERAY_KIND_HEADr   r   KUBERAY_LABEL_KEY_TYPE)r'   r8   r*   r+   s       r3   r/   r/   c   sL     _X&F ()->>> 	 	
 ()D:r5   c                 T    | d         d         }|                     t          d          S )a  Returns the replicaIndex label for a Pod in a multi-host TPU worker group.
    The replicaIndex label is set by the GKE TPU Ray webhook and is of
    the form {$WORKER_GROUP_NAME-$REPLICA_INDEX} where $REPLICA_INDEX
    is an integer from 0 to Replicas-1.
    r7   r8   N)getREPLICA_INDEX_KEY)r'   r8   s     r3   r2   r2   q   s'     _X&F::'...r5   c                 :    | d                              dd          S )Nr-   podIPzIP not yet assignedr=   )r'   s    r3   r1   r1   {   s    x=W&;<<<r5   c                     d| d         vs| d         d         sdS | d         d         d         d         }d|v rdS d|v rt           S d|v rdS d|v rt          S t          d	          )
zConvert pod state to Ray autoscaler node status.

    See the doc string of the class
    batching_node_provider.NodeData for the semantics of node status.
    containerStatusesr-   pendingr   staterunningwaiting
terminatedzUnexpected container state.)r   r   
ValueError)r'   rE   s     r3   r0   r0      s     	3x=008}01 	1 yM-.q1':EEyE  Eyu##
2
3
33r5   group_indexworkers_to_deletec                 6    d|  d}d|i}t          ||          S )N/spec/workerGroupSpecs/z/scaleStrategyworkersToDeletereplace_patch)rJ   rK   pathvalues       r3   worker_delete_patchrS      s-    @[@@@D 12Eu%%%r5   target_replicasc                 2    d|  d}|}t          ||          S )NrM   z	/replicasrO   )rJ   rT   rQ   rR   s       r3   worker_replica_patchrV      s'    ;[;;;DEu%%%r5   rQ   rR   c                     d| |dS )Nreplace)oprQ   rR    )rQ   rR   s     r3   rP   rP      s    TE:::r5   c                      t          d          5 } |                                 }ddd           n# 1 swxY w Y   dd|z   i}d}||fS )z
    Loads secrets needed to access K8s resources.

    Returns:
        headers: Headers with K8s access token
        verify: Path to certificate
    z3/var/run/secrets/kubernetes.io/serviceaccount/tokenNAuthorizationzBearer z4/var/run/secrets/kubernetes.io/serviceaccount/ca.crt)openread)secrettokenheadersverifys       r3   load_k8s_secretsrc      s     
C	D	D                	U*G DFF?s   155	namespacekuberay_crd_versionkubernetes_hostc                 J   |                     d          rt          d          |                     d          sd|z   }|                     d          rd}n=|                     d          rd|z   }n"t          d                    |                    ||z   d	z   | z   d
z   |z   S )a  Convert resource path to REST URL for Kubernetes API server.

    Args:
        namespace: The K8s namespace of the resource
        path: The part of the resource path that starts with the resource type.
            Supported resource types are "pods" and "rayclusters".
        kuberay_crd_version: The API version of the KubeRay CRD.
            Looks like "v1alpha1", "v1".
        kubernetes_host: The host of the Kubernetes API server.
            Uses $KUBERNETES_SERVICE_HOST and
            $KUBERNETES_SERVICE_PORT to construct the kubernetes_host if not provided.

            When set by Kubernetes,
            $KUBERNETES_SERVICE_HOST could be an IP address. That's why the https
            scheme is added here.

            Defaults to "https://kubernetes.default:443".
    zhttp://z,Kubernetes host must be accessed over HTTPS.zhttps://podsz/api/v1rayclustersz/apis/ray.io/z$Tried to access unknown entity at {}z/namespaces//)
startswithrI   NotImplementedErrorformat)rd   rQ   re   rf   	api_groups        r3   url_from_resourcero      s    0 !!),, IGHHH%%j11 7$6v W				'	' W#&99		!"H"O"OPT"U"UVVVY&7)CcIDPPr5   
raycluster
group_namec                 x    d | d                              dg           D             }|                    |          S )z+Extract worker group index from RayCluster.c                     g | ]
}|d          S )	groupNamerZ   ).0specs     r3   
<listcomp>z'_worker_group_index.<locals>.<listcomp>   s*       "[  r5   rv   workerGroupSpecs)r=   index)rp   rq   group_namess      r3   _worker_group_indexr{      sM     &0&8&<&<=OQS&T&T  K Z(((r5   c                 P    | d         d         |                              d          S )zExtract the maxReplicas of a worker group.

    If maxReplicas is unset, return None, to be interpreted as "no constraint".
    At time of writing, it should be impossible for maxReplicas to be unset, but it's
    better to handle this anyway.
    rv   rx   maxReplicasrA   rp   rJ   s     r3   _worker_group_max_replicasr      s(     f01+>BB=QQQr5   c                 R    | d         d         |                              dd          S )Nrv   rx   replicasr%   rA   r~   s     r3   _worker_group_replicasr      s(    f01+>BB:qQQQr5   c                 R    | d         d         |                              dd          S )Nrv   rx   
numOfHostsr%   rA   r~   s     r3   _worker_group_num_of_hostsr      s)    f01+>BB<QRSSSr5   c            	           e Zd ZdZededeeef         fd            Zedede	eeef                  deeef         fd            Z
dS )IKubernetesHttpApiClientz
    An interface for a Kubernetes HTTP API client.

    This interface could be used to mock the Kubernetes API client in tests.
    rQ   r(   c                     dS )5Wrapper for REST GET of resource with proper headers.NrZ   selfrQ   s     r3   r=   zIKubernetesHttpApiClient.get  	     	r5   payloadc                     dS )7Wrapper for REST PATCH of resource with proper headers.NrZ   r   rQ   r   s      r3   patchzIKubernetesHttpApiClient.patch
  r   r5   N)__name__
__module____qualname____doc__r   strr   r   r=   r   r   rZ   r5   r3   r   r      s           S#X    ^ # T#s(^(< c3h    ^  r5   r   c                       e Zd ZefdedefdZd Zdedeeef         fdZ	dede
eeef                  deeef         fd	Zd
S )KubernetesHttpApiClientrd   re   c                     || _         || _        t          j                                        t          z   | _        d\  | _        | _        d S )N)NN)_kuberay_crd_version
_namespacedatetimenowTOKEN_REFRESH_PERIOD_token_expires_at_headers_verify)r   rd   re   s      r3   __init__z KubernetesHttpApiClient.__init__  s@    $7!#!)!2!6!6!8!8;O!O&0#t|||r5   c                 d   t           j                                         | j        k    s| j        | j        nt
                              d           t                      \  | _        | _        t           j                                         t          z   | _        | j        | j        fS | j        | j        fS )Nz*Refreshing K8s API client token and certs.)	r   r   r   r   r   loggerinforc   r   )r   s    r3   !_get_refreshed_headers_and_verifyz9KubernetesHttpApiClient._get_refreshed_headers_and_verify  s    !!##t'===M!T\%9KKDEEE*:*<*<'DM4<%-%6%:%:%<%<?S%SD"=$,..=$,..r5   rQ   r(   c                    t          | j        || j                  }|                                 \  }}t	          j        ||t          |          }|j        dk    s|                                 |	                                S )a'  Wrapper for REST GET of resource with proper headers.

        Args:
            path: The part of the resource path that starts with the resource type.

        Returns:
            The JSON response of the GET request.

        Raises:
            HTTPError: If the GET request fails.
        rd   rQ   re   ra   timeoutrb      )
ro   r   r   r   requestsr=   r   status_coderaise_for_statusjson)r   rQ   urlra   rb   results         r3   r=   zKubernetesHttpApiClient.get"  s      o $ 9
 
 
 @@BB-	
 
 
 !S((##%%%{{}}r5   r   c                 :   t          | j        || j                  }|                                 \  }}t	          j        |t          j        |          i |ddit          |          }|j	        dk    s|
                                 |                                S )aY  Wrapper for REST PATCH of resource with proper headers

        Args:
            path: The part of the resource path that starts with the resource type.
            payload: The JSON patch payload.

        Returns:
            The JSON response of the PATCH request.

        Raises:
            HTTPError: If the PATCH request fails.
        r   zContent-typezapplication/json-patch+jsonr   r   )ro   r   r   r   r   r   r   dumpsr   r   r   )r   rQ   r   r   ra   rb   r   s          r3   r   zKubernetesHttpApiClient.patch?  s      o $ 9
 
 

 @@BBJwNwN0MNN-
 
 
 !S((##%%%{{}}r5   N)r   r   r   r   r   r   r   r   r   r=   r   r   rZ   r5   r3   r   r     s        BQ 1 1# 1C 1 1 1 1	/ 	/ 	/ S#X    :# T#s(^(< c3h      r5   r   c            	       6   e Zd Zdeeef         defdZdeeef         fdZ	de
fdZdefdZdefd	Zde
d
eeef         deeeef                  fdZdeeeef                  fdZdedeeef         fdZdedeeeef                  deeef         fdZdS )KubeRayNodeProviderprovider_configcluster_namec                     t                               d           |d         | _        || _        t	          | j                  | _        |                    t          d          du sJ dt           d            t          j	        | ||           d S )NzCreating KubeRayNodeProvider.rd   TFz&To use KubeRayNodeProvider, must set `z:False`.)
r   r   rd   r   r   k8s_api_clientr=   r   r   r   )r   r   r   s      r3   r   zKubeRayNodeProvider.__init___  s    
 	3444(5(5dnEE  94@@EIIIW4MWWW JII%dO\JJJJJr5   r(   c           	      6   |                      d| j                   | _        |                                 }|r.t                              d| j         d| j         d| d           t          j        	                    d| j                   }d| }|r|d| d	z   z  }|                      |          }|d
         d         }t                              d| d           i }|d         D ]-}d|d
         v r|d
         d         }t          |          ||<   .|S )zQueries K8s for pods in the RayCluster. Converts that pod data into a
        map of pod name to Ray NodeData, as required by BatchingNodeProvider.
        zrayclusters/zListing pods for RayCluster  in namespace z at pods resource version >= .zray.io/cluster=zpods?labelSelector=z&resourceVersion=z"&resourceVersionMatch=NotOlderThanr7   resourceVersionz%Fetched pod data at resource version itemsdeletionTimestampname)_getr   _raycluster_get_pods_resource_versionr   r   rd   r   utilsquoter4   )	r   resource_versionlabel_selectorresource_pathpod_listfetched_resource_versionnode_data_dictr'   pod_names	            r3   get_node_dataz!KubeRayNodeProvider.get_node_datao  s   
  99%GD4E%G%GHH
  ::<< 	KKDt/@ D D!%D D0@D D D   "--.S@Q.S.STT>n>> 	6$46667M
 99]++#+J#78I#J S8PSSS	
 	
 	

 G$ 		? 		?C
 #c*o55:v.H'9#'>'>N8$$r5   scale_requestc                     |                      || j                  }t                              d| j         d| j         d           t                              |           |                     |           dS )zConverts the scale request generated by BatchingNodeProvider into
        a patch that modifies the RayCluster CR's replicas and/or workersToDelete
        fields. Then submits the patch to the K8s API server.
        z;Autoscaler is submitting the following patch to RayCluster r   r   N)_scale_request_to_patch_payloadr   r   r   r   rd   _submit_raycluster_patch)r   r   patch_payloads      r3   submit_scale_requestz(KubeRayNodeProvider.submit_scale_request  s     <<4+
 

 	B B B04B B B	
 	
 	
 	M"""%%m44444r5   c                    t          | j                                                  }| j        d                             dg           }g }t          |          D ]q\  }}|                    di                               dg           }|r|                    |           |D ](}||v r"t                              d| d             dS )rg }|D ](}t          |g           }	|                    |	           )|rMt          
                    d	           t          
                    d
| d           |                     |           dS )a  Returns False iff non_terminated_nodes contains any pods in the RayCluster's
        workersToDelete lists.

        Explanation:
        If there are any workersToDelete which are non-terminated,
        we should wait for the operator to do its job and delete those
        pods. Therefore, we back off the autoscaler update.

        If, on the other hand, all of the workersToDelete have already been cleaned up,
        then we patch away the workersToDelete lists and return True.
        In the future, we may consider having the operator clean up workersToDelete
        on it own:
        https://github.com/ray-project/kuberay/issues/733

        Note (Dmitri):
        It is stylistically bad that this function has a side effect.
        rv   rx   scaleStrategyrN   z&Waiting for operator to remove worker r   F)rK   zCleaning up workers to delete.zSubmitting patch T)setr   keysr   r=   	enumerateappendr   warningrS   r   r   )
r   node_setworker_groupsnon_empty_worker_group_indicesrJ   worker_grouprN   r   r   r   s
             r3   safe_to_scalez!KubeRayNodeProvider.safe_to_scale  s   & t*//1122(0445GLL *,&)2=)A)A 	! 	!%K*..CCGG!2 O  C.55kBBB) ! !X%% NN#UF#U#U#UVVV 555	 &! 9 	( 	(K'rJJJE  '''' 	9KK8999KK<M<<<===))-888 tr5   c                 j    t           sdS |                     dt                      }|d         d         S )z
        Extract a recent pods resource version by reading the head pod's
        metadata.resourceVersion of the response.
        Nzpods/r7   r   )r    r   )r   pod_resps     r3   r   z.KubeRayNodeProvider._get_pods_resource_version  s=    
 ! 	4998%68899
#$566r5   rp   c                    g }|j                                         D ]\  }}t          ||          }t          ||          }|8||k     r2t                              dd                    |          z              |}|t          ||          k    rtt          ||          }|	                    |           t          t                    }	|j        D ]=}
|                     |
          t                   }|	|         	                    |
           >|	                                D ]:\  }}t          ||          }t          ||          }|	                    |           ;|S )zEConverts autoscaler scale request into a RayCluster CR patch payload.NzAutoscaler attempted to create z&more than maxReplicas pods of type {}.)desired_num_workersr   r{   r   r   r   rm   r   rV   r   r   listrK   	node_tagsr   rS   )r   r   rp   r   	node_typerT   rJ   group_max_replicasr   deletion_groupsr   rK   s               r3   r   z3KubeRayNodeProvider._scale_request_to_patch_payload  s    *7*K*Q*Q*S*S 	( 	(&I-j)DDK!;J!T!T!-2D2V2V5>EEiPPQ   #5"8["Q"QQQ(oFFE  '''' &d++#5 	6 	6Fv../EFII&--f5555,;,A,A,C,C 	( 	((I(-j)DDK'5FGGE  ''''r5   r   c                 f    d                     | j                  }|                     ||           dS )z*Submits a patch to modify a RayCluster CR.zrayclusters/{}N)rm   r   _patch)r   r   rQ   s      r3   r   z,KubeRayNodeProvider._submit_raycluster_patch  s2    &&t'899D-(((((r5   rQ   c                 6    | j                             |          S )r   )r   r=   r   s     r3   r   zKubeRayNodeProvider._get  s    "&&t,,,r5   r   c                 8    | j                             ||          S )r   )r   r   r   s      r3   r   zKubeRayNodeProvider._patch  s    "((w777r5   N)r   r   r   r   r   r   r   r   r   r   r   r   boolr   r   r   r   r   r   r   rZ   r5   r3   r   r   ^  s       Kc3hK K K K K .tFH$45 . . . .`5, 5 5 5 5$2t 2 2 2 2h7C 7 7 7 7#)#7;CH~#	d38n	# # # #J)d4S>6J ) ) ) )
- -c3h - - - -83 8d38n)= 8$sCx. 8 8 8 8 8 8r5   r   )Hr   r   loggingosabcr   r   collectionsr   typingr   r   r   r	   r
   r   ray._common.network_utilsr   !ray.autoscaler._private.constantsr   ray.autoscaler._private.utilr   r   r   r   r   %ray.autoscaler.batching_node_providerr   r   r   ray.autoscaler.tagsr   r   r   r   r   r9   r;   r:   KUBERAY_KIND_WORKERgetenvr   intr   r    r!   KUBERNETES_SERVICE_PORTKUBERNETES_HOSTr>   	timedeltar   	getLoggerr   r   r   r4   r/   r2   r1   r0   rS   rV   rP   rc   ro   r{   r   r   r   r   r   r   rZ   r5   r3   <module>r      su      				 # # # # # # # # # # # # # # 3 3 3 3 3 3 3 3 3 3 3 3 3 3  3 3 3 3 3 3 G G G G G G W W W W W W W W W W W W W W         
              ,  ( 
    ")-z::C		*Er J JKK BI122  $");   $")$CUKK - 79PQQ" )x)!444 0 
	8	$	$
DcN 
x 
 
 
 
tCH~ %(0B*C    /d38n /# / / / /=S#X =6 = = = =4DcN 4z 4 4 4 42&S &T&\ & & & &&c &C & & & &; ;C ;DcN ; ; ; ;%S#X 34    ,  /*	"Q "Q"Q
"Q "Q 	"Q
 	"Q "Q "Q "QJ)DcN ) ) ) ) ) )	RS#X	R-0	Rc]	R 	R 	R 	RRtCH~ RC R R R R
T4S> T T T T T
    s   $K K K K K6 K K K\A8 A8 A8 A8 A8. A8 A8 A8 A8 A8r5   