
    &`iz2                        d dl Z d dlZd dlZd dlmZmZmZmZ d dlm	Z	 d dl
mZmZ d dlmZ d dlmZmZ d dlmZmZmZmZ d dlmZ  e j        e          Zd	Zd
ZdZdZdZeeeeegZ  ed           G d de                      Z!dS )    N)AnyDictListOptional)Celery)task_failuretask_unknown)get_replica_context)DEFAULT_CONSUMER_CONCURRENCYSERVE_LOGGER_NAME)CeleryAdapterConfigTaskProcessorAdapterTaskProcessorConfig
TaskResult)	PublicAPIworker_poolworker_concurrencytask_ignore_resulttask_acks_latetask_reject_on_worker_lostalpha)	stabilityc                   v    e Zd ZU dZeed<   eed<   dZee	j
                 ed<   dZee         ed<   eZeed<   def fd	Zefd
efdZd&dZ	 d'defdZdefdZd Zd(defdZd Zd Zdeeef         fdZdee         fdZ	 	 	 	 	 d)dededededef
dZ 	 	 	 	 	 d)dededed ed!ef
d"Z!d#ed$ede"fd%Z# xZ$S )*CeleryTaskProcessorAdapterz
    Celery-based task processor adapter.
    This adapter does NOT support any async operations.
    All operations must be performed synchronously.
    _app_configN_worker_thread_worker_hostname_worker_concurrencyconfigc                 v    t                      j        |i | t          |j        t                    st          d          |j        j        rbt          |j        j                                                  t          t                    z  }|r t          dt          |           d          || _        d S )NzMTaskProcessorConfig.adapter_config must be an instance of CeleryAdapterConfigzJThe following configuration keys cannot be changed via app_custom_config: zA. These are managed internally by the CeleryTaskProcessorAdapter.)super__init__
isinstanceadapter_configr   	TypeErrorapp_custom_configsetkeysCELERY_DEFAULT_APP_CONFIG
ValueErrorsortedr   )selfr    argskwargsconflicting_keys	__class__s        l/home/jaya/work/projects/VOICE-AGENT/VIET/agent-env/lib/python3.11/site-packages/ray/serve/task_processor.pyr#   z#CeleryTaskProcessorAdapter.__init__6   s    $)&)))&/1DEE 	_  
  2 	"%7<<>>   -.. /    Waghxayay W W W  
     consumer_concurrencyc           
         t          | j        j        | j        j        j        | j        j        j                  | _        t          dt          |t          dt          dt          di}| j        j        j        r$|                    | j        j        j                   | j        j                            |           | j        j        | j        j        d| j        j        di}| j        j        r'| j        j        d| j        j        d|| j        j        <   | j        j        r'| j        j        d| j        j        d|| j        j        <   | j        j                            |dd| j        j        ii	           | j        j        j        /| j        j                            | j        j        j        
           | j        j        rt%          j        | j                   | j        j        rt+          j        | j                   d S d S )N)backendbrokerthreadsFTdirect)exchangeexchange_typerouting_key*queue)task_queuestask_routes)broker_transport_options)r   r   
queue_namer%   backend_url
broker_urlr   CELERY_WORKER_POOLCELERY_WORKER_CONCURRENCYCELERY_TASK_IGNORE_RESULTCELERY_TASK_ACKS_LATE!CELERY_TASK_REJECT_ON_WORKER_LOSTr'   updateconffailed_task_queue_nameunprocessable_task_queue_namerA   r   connect_handle_task_failurer	   _handle_unknown_task)r-   r4   app_configurationqueue_configs       r2   
initializez%CeleryTaskProcessorAdapter.initializeN   s   L#L/;<.9
 
 
	 	%';%u!4-t
 <&8 	T$$T\%@%RSSS	/000 L# L3!)#|6& &
 <. 	 L?!)#|BA AL<= <5 	 LF!)#|IH HLCD 		$ gt|67 	 	
 	
 	
 <&?KIN!!)-)D)] "    <. 	< !:;;;<5 	< !:;;;;;	< 	<r3   c                    t           fd| j        j        idddd}| j        j        j        r$|                    | j        j        j                   |r  | j        j        dd|i||           d S   | j        j        di ||           d S )Nmax_retriesT<   F)autoretry_forretry_kwargsretry_backoffretry_backoff_maxretry_jittername )	Exceptionr   rU   r%   task_custom_configrJ   r   task)r-   funcr\   task_optionss       r2   register_task_handlez/CeleryTaskProcessorAdapter.register_task_handle   s    '\*DL,DE!!#!
 
 <&9 	P ; NOOO 	15NDIN55555d;;;;;*NDIN**\**400000r3   returnc                      | j         j        |f||| j        j        d|}t	          |j        |j        t          j                    |j                  S )N)r.   r/   r>   )idstatus
created_atresult)	r   	send_taskr   rB   r   rf   rg   timeri   )r-   	task_namer.   r/   optionstask_responses         r2   enqueue_task_syncz,CeleryTaskProcessorAdapter.enqueue_task_sync   ss     ,	+
,)	
 

 
 
  'y{{ '	
 
 
 	
r3   c                 x    | j                             |          }t          |j        |j        |j                  S )N)rf   ri   rg   )r   AsyncResultr   rf   ri   rg   )r-   task_idtask_detailss      r2   get_task_status_syncz/CeleryTaskProcessorAdapter.get_task_status_sync   s?    y,,W55&&
 
 
 	
r3   c                    | j         5| j                                         rt                              d           dS t	                      j        }| j        j         d| | _        dd| j         d| j	        j
        g}t          j        | j        j        |f          | _         | j                                          t                              d| j                    dS )	z Starts the Celery worker thread.Nz(Celery worker thread is already running._workerz--hostname=z-Q)targetr.   z,Celery worker thread started with hostname: )r   is_aliveloggerinfor
   replica_tagr   mainr   r   rB   	threadingThreadworker_mainstart)r-   r/   	unique_idworker_argss       r2   start_consumerz)CeleryTaskProcessorAdapter.start_consumer   s    *t/B/K/K/M/M*KKBCCCF'))5	#'9> ? ?I ? ? 1$/11L#	
 (.9(
 
 
 	!!###R4;PRR	
 	
 	
 	
 	
r3         $@timeoutc                    | j         | j                                         st                              d           dS t                              d           | j        j                            dd| j         g           | j                             |           | j                                         rt          	                    d| d	           nt                              d
           d| _         dS )zESignals the Celery worker to shut down and waits for it to terminate.Nz$Celery worker thread is not running.z+Sending shutdown signal to Celery worker...shutdownzcelery@)destination)r   z&Worker thread did not terminate after z	 seconds.z!Celery worker thread has stopped.)
r   ry   rz   r{   r   control	broadcastr   joinwarning)r-   r   s     r2   stop_consumerz(CeleryTaskProcessorAdapter.stop_consumer   s    &d.A.J.J.L.L&KK>???FABBB 		##%Ft/D%F%F$G 	$ 	
 	
 	
 	   111'')) 	=NNVGVVVWWWWKK;<<<"r3   c                     t                               d           | j        j                                         t                               d           d S )NzShutting down Celery worker...z"Celery worker shutdown complete...)rz   r{   r   r   r   r-   s    r2   r   z#CeleryTaskProcessorAdapter.shutdown   sE    4555	""$$$899999r3   c                 D    | j         j                            |           dS )z
        Cancels a task synchronously. Only supported for Redis and RabbitMQ brokers by Celery.
        More details can be found here: https://docs.celeryq.dev/en/stable/userguide/workers.html#revoke-revoking-tasks
        N)r   r   revoke)r-   rr   s     r2   cancel_task_syncz+CeleryTaskProcessorAdapter.cancel_task_sync   s#    
 		  )))))r3   c                 b    | j         j                                                                        S )z
        Returns the metrics of the Celery worker synchronously.
        More details can be found here: https://docs.celeryq.dev/en/stable/reference/celery.app.control.html#celery.app.control.Inspect.stats
        )r   r   inspectstatsr   s    r2   get_metrics_syncz+CeleryTaskProcessorAdapter.get_metrics_sync   s'    
 y ((**00222r3   c                 >    | j         j                                        S )a  
        Checks the health of the Celery worker synchronously.
        Returns a list of dictionaries, each containing the worker name and a dictionary with the health status.
        Example: [{'celery@192.168.1.100': {'ok': 'pong'}}]
        More details can be found here: https://docs.celeryq.dev/en/stable/reference/celery.app.control.html#celery.app.control.Control.ping
        )r   r   pingr   s    r2   health_check_syncz,CeleryTaskProcessorAdapter.health_check_sync   s     y %%'''r3   senderrr   r.   r/   einfoc           	         t                               d| dt          |                      |t          |j                  t          |          t          |          t          |          g}| j        j        rV|                     | j        j        |j        |           t                               d| d| d| j        j         d           dS dS )ar  Handle task failures and route them to appropriate dead letter queues.

        This method is called when a task fails after all retry attempts have been
        exhausted. It logs the failure and moves the task to failed_task_queue

        Args:
            sender: The task object that failed
            task_id: Unique identifier of the failed task
            args: Positional arguments passed to the task
            kwargs: Keyword arguments passed to the task
            einfo: Exception info object containing exception details and traceback
            **kw: Additional keyword arguments passed by Celery
        z#Task failure detected for task_id: z	, einfo: zTask z& failed after max retries. Exception: z. Moved it to the z queue.N)	rz   r{   str	exceptionr   rL   _move_task_to_queuer\   error)r-   r   rr   r.   r/   r   kwdlq_argss           r2   rO   z/CeleryTaskProcessorAdapter._handle_task_failure   s   , 	P'PPCJJPP	
 	
 	

   IIKKJJ
 <. 		$$3   LL M  M  Mu  M  M`d`l  aD  M  M  M    		 		r3   r\   rf   messageexcc                    t                               d| d| dt          |                      | j        j        rO|                     | j        j        |||t          |          t          |          t          |          g           dS dS )a  Handle unknown or unregistered tasks received by Celery.

        This method is called when Celery receives a task that it doesn't recognize
        (i.e., a task that hasn't been registered with the Celery app). These tasks
        are moved to the unprocessable task queue if configured.

        Args:
            sender: The Celery app or worker that detected the unknown task
            name: Name of the unknown task
            id: Task ID of the unknown task
            message: The raw message received for the unknown task
            exc: The exception raised when trying to process the unknown task
            **kwargs: Additional context information from Celery
        z'Unknown task detected by Celery. Name: z, ID: z, Exc: N)rz   r{   r   r   rM   r   )r-   r   r\   rf   r   r   r/   s          r2   rP   z/CeleryTaskProcessorAdapter._handle_unknown_task-  s    . 	WdWW"WWSQTXXWW	
 	
 	
 <5 	$$:LLHHKK
 
 
 
 
	 	r3   rB   rl   c           	          	 t                               d| d| d|            | j                            |||           dS # t          $ r*}t                               d| d| d|            |d}~ww xY w)z4Helper function to move a task to a specified queue.zMoving task: z to queue: z, args: )r\   r>   r.   zFailed to move task: z	, error: N)rz   r{   r   rj   r^   r   )r-   rB   rl   r.   es        r2   r   z.CeleryTaskProcessorAdapter._move_task_to_queueU  s    	KKP	PPjPP$PP   I        
  	 	 	LLV	VVjVVSTVV   G		s   A A 
A8%A33A8)N)NN)r   )NNNNN)%__name__
__module____qualname____doc__r   __annotations__r   r   r   r~   r   r   r   r   r   intr#   rS   rc   r   ro   rt   r   floatr   r   r   r   r   r   r   r   rO   rP   listr   __classcell__)r1   s   @r2   r   r   (   s          LLL    15NHY-.555&*hsm***;;;;2      0 6R ;< ;<s ;< ;< ;< ;<z1 1 1 1" ,0
 
	
 
 
 
$
z 
 
 
 

 
 
4# #U # # # #*: : :
* * *3$sCx. 3 3 3 3(4: ( ( ( ( + ++ + 	+
 + + + + +^ & && & 	&
 & & & & &Pc c         r3   r   )"loggingr~   rk   typingr   r   r   r   celeryr   celery.signalsr   r	   	ray.server
   ray.serve._private.constantsr   r   ray.serve.schemar   r   r   r   ray.util.annotationsr   	getLoggerrz   rE   rF   rG   rH   rI   r*   r   r]   r3   r2   <module>r      s         , , , , , , , , , , , ,       5 5 5 5 5 5 5 5 ) ) ) ) ) )                   + * * * * *		,	-	- # 0 0 ( $@ ! %  W{ { { { {!5 { { { { {r3   