
    &`i                          U d dl Z d dlZd dlZd dlmZmZ d dlZd dlmZ d dl	m
Z
 dZedz  ZdZ ej        d dd	           G d
 d                      Z ej                    Zej        ed<   d ZdS )    N)DictList)DataContext)NodeAffinitySchedulingStrategy<      g333333?)num_cpusmax_restartsmax_task_retriesc                   ^    e Zd ZdZd Zd Zdee         defdZ	d Z
dee         fd	Zd
 ZdS )AutoscalingRequestera  Actor to make resource requests to autoscaler for the datasets.

    The resource requests are set to timeout after RESOURCE_REQUEST_TIMEOUT seconds.
    For those live requests, we keep track of the last request made for each execution,
    which overrides all previous requests it made; then sum the requested amounts
    across all executions as the final request to the autoscaler.
    c                      i  _         t           _        t          j                    j         _         fd}t          j        |d           _	         j	        
                                 d S )Nc                      	 t          j        t                     t          j         j        j                                                   JN)timesleepPURGE_INTERVALrayget_self_handlepurge_expired_requestsremoteselfs   /home/jaya/work/projects/VOICE-AGENT/VIET/agent-env/lib/python3.11/site-packages/ray/data/_internal/execution/autoscaling_requester.pypurge_thread_runz7AutoscalingRequester.__init__.<locals>.purge_thread_run)   sC    K
>*** )@GGIIJJJ	K    T)targetdaemon)_resource_requestsRESOURCE_REQUEST_TIMEOUT_timeoutr   get_runtime_contextcurrent_actorr   	threadingThread_purge_threadstart)r   r   s   ` r   __init__zAutoscalingRequester.__init__    sz    "$0355C	K 	K 	K 	K 	K '-5EdSSS  """""r   c                     |                                   t          j        j                            |                                            d S N)bundles)_purger   
autoscalersdkrequest_resources_aggregate_requestsr   s    r   r   z+AutoscalingRequester.purge_expired_requests3   s=    ,,T5M5M5O5O,PPPPPr   reqexecution_idc                     |                                   |t          j                    | j        z   f| j        |<   t          j        j                            |                                            d S r,   )	r.   r   r#   r!   r   r/   r0   r1   r2   )r   r3   r4   s      r   r1   z&AutoscalingRequester.request_resources7   sc     IKK$-'1
- 	,,T5M5M5O5O,PPPPPr   c                     t          j                     }t          | j                                                  D ](\  }\  }}||k     r| j                            |           )d S r   )r   listr!   itemspop)r   nowk_ts        r   r.   zAutoscalingRequester._purgeD   sf    ikkd5;;==>> 	/ 	/IAv13ww'++A...	/ 	/r   returnc                 f   g }| j                                         D ]\  }\  }}|                    |           d } ||          }|dk    rct          j                    }d|v rL||d         k    r@t          j        t          |d         z            |z
  }|                    ddig|z             |S )Nc                 2    d}| D ]}d|v r||d         z  }|S )Nr   CPU )r3   r
   rs      r   get_cpusz:AutoscalingRequester._aggregate_requests.<locals>.get_cpusP   s3    H ) )A::%(HOr   r   rA      )r!   r8   extendr   cluster_resourcesmathceilARTIFICIAL_CPU_SCALING_FACTOR)r   r3   r<   rC   rD   r
   totaldeltas           r   r2   z(AutoscalingRequester._aggregate_requestsK   s    06688 	 	IAv1JJqMMMM	 	 	 8C==a<<)++E~~(eEl":":I;eElJKKhV  

UAJ<%/000
r   c                     || _         dS )z&Set the timeout. This is for test onlyN)r#   )r   ttls     r   _test_set_timeoutz&AutoscalingRequester._test_set_timeouth   s    r   N)__name__
__module____qualname____doc__r*   r   r   r   strr1   r.   r2   rO   rB   r   r   r   r      s         # # #&Q Q QQT$Z Qs Q Q Q Q/ / /T$Z    :    r   r   _autoscaling_requester_lockc                  B   t          j                    } | j        }t          t	          j                                                    d          }t          5  t          	                    dddd|          
                                cd d d            S # 1 swxY w Y   d S )NF)softr   Tdetached)name	namespaceget_if_existslifetimescheduling_strategy)r   get_currentr]   r   r   r$   get_node_idrU   r   optionsr   )ctxr]   s     r   )get_or_create_autoscaling_requester_actorrb   r   s    

!
#
#C1 9!!--//   
%  #++', 3 , 
 
 &((                 s   1BBB)rH   r&   r   typingr   r   r   ray.data.contextr   ray.util.scheduling_strategiesr   r"   r   rJ   r   r   RLockrU   __annotations__rb   rB   r   r   <module>rh      s                 



 ( ( ( ( ( ( I I I I I I  )A- !$  QR"===S S S S S S S >=Sp 0?y/@/@ Y_ @ @ @    r   