
    &`i0                        d dl Z d dlZd dlZd dlZd dlmZmZ d dlZd dlm	c m
Z d dlmZ d dlmZmZmZ d dlmZmZmZ 	  ej        e          Zdeeeef         ej        e         f         deeef         fdZ G d	 d
          ZdS )    N)OptionalUnion)aiohttp)SubprocessModuleSubprocessModuleConfig
run_module)ResponseTypeget_http_session_to_modulemodule_logging_filenameheadersreturnc                     h dt          | t          j                  rt          |           } fd|                                 D             }|S )z>
    Filter out hop-by-hop headers from the headers dict.
    >   
keep-alivetransfer-encodingproxy-authenticateproxy-authorizationteupgradetrailers
connectionc                 H    i | ]\  }}|                                 v||S  )lower).0keyvalueHOP_BY_HOP_HEADERSs      u/home/jaya/work/projects/VOICE-AGENT/VIET/agent-env/lib/python3.11/site-packages/ray/dashboard/subprocesses/handle.py
<dictcomp>z-filter_hop_by_hop_headers.<locals>.<dictcomp>0   s=       C99;;000 	U000    )
isinstance	multidictCIMultiDictProxydictitems)r   filtered_headersr   s     @r   filter_hop_by_hop_headersr'      sp    	 	 	 '9566  w--   !--//  
 r    c                      e Zd ZdZ ej        d          Zdej        de	e
         defdZdedee         fd	Zd
 Zd Zd Zd Zdej        j        fdZd Zd Zej        fdej        j        dedej        j        fdZdej        j        dej        j        fdZdej        j        dej        j        fdZ dej        j        dej        j        fdZ!dS )SubprocessModuleHandlea_  
    A handle to a module created as a subprocess. Can send messages to the module and
    receive responses. It only acts as a proxy to the aiohttp server running in the
    subprocess. On destruction, the subprocess is terminated.

    Lifecycle:
    1. In SubprocessModuleHandle creation, the subprocess is started and runs an aiohttp
       server.
    2. User must call start_module() and wait_for_module_ready() first.
    3. SubprocessRouteTable.bind(handle)
    4. app.add_routes(routes=SubprocessRouteTable.bound_routes())
    5. Run the app.

    Health check (_do_periodic_health_check):
    Every 1s, do a health check by _do_once_health_check. If the module is
    unhealthy:
      1. log the exception
      2. log the last N lines of the log file
      3. fail all active requests
      4. restart the module

    TODO(ryw): define policy for health check:
    - check period (Now: 1s)
    - define unhealthy. (Now: process exits. TODO: check_health() for event loop hang)
    - check number of failures in a row before we deem it unhealthy (Now: N/A)
    - "max number of restarts"? (Now: infinite)
    spawnloop
module_clsconfigc                 v    || _         || _        || _        d| _        d | _        d | _        d | _        d | _        d S )Nr   )r+   r,   r-   incarnationparent_connprocesshttp_client_sessionhealth_check_task)selfr+   r,   r-   s       r   __init__zSubprocessModuleHandle.__init__Y   sJ     	$   DH !%r    r/   pidc                 .    d| j         j         d| d| dS )Nz"SubprocessModuleHandle(module_cls=z, incarnation=z, pid=))r,   __name__)r4   r/   r6   s      r   str_for_statez$SubprocessModuleHandle.str_for_statel   s*    uDO4Luu\guuoruuuur    c                 ^    |                      | j        | j        r| j        j        nd           S )N)r:   r/   r1   r6   r4   s    r   __str__zSubprocessModuleHandle.__str__o   s3    !!$,Hdl..D
 
 	
r    c                    | j                                         \  | _        }t          j                            | j        j                  st          j        | j        j                   | j         	                    t          | j        | j        | j        |fd| j        j         d| j                   | _        | j                                         |                                 dS )z;
        Start the module. Should be non-blocking.
        T-)targetargsdaemonnameN)
mp_contextPiper0   ospathexistsr-   
socket_dirmakedirsProcessr   r,   r/   r9   r1   startclose)r4   
child_conns     r   start_modulez#SubprocessModuleHandle.start_modulet   s     (,';';'='=$*w~~dk455 	0K.///.. 	 O,AAt/?AA / 

 

 	r    c                 >   | j                             t          j                  rg	 | j                                          n+# t
          $ r t          d| j        j         d          w xY w| j         	                                 d| _         n*t          d| j        j         dt          j         d          | j        j        }t          || j        j        | j        j                  | _        | j                            |                                           | _        dS )zs
        Wait for the module to be ready. This is called after start_module()
        and can be blocking.
        Module z) failed to start. Received EOF from pipe.Nz  failed to start. Timeout after z	 seconds.)r0   polldashboard_consts$SUBPROCESS_MODULE_WAIT_READY_TIMEOUTrecvEOFErrorRuntimeErrorr,   r9   rM   r
   r-   rI   session_namer2   r+   create_task_do_periodic_health_checkr3   )r4   module_names     r   wait_for_module_readyz,SubprocessModuleHandle.wait_for_module_ready   sF   
   !1!VWW 	 %%''''   ".do6 . . .  
 ""$$$#Db$/2 b b!1!Vb b b  
 o.#=/1I$
 $
  "&!6!6t7U7U7W7W!X!Xs   A   (A(c                   K   | xj         dz  c_         | j        r | j                                         d| _        | j        r9| j                                         | j                                         d| _        | j        r&| j                                         d{V  d| _        | j        r"| j                                         d| _        dS dS )zR
        Destroy the module. This is called when the module is unhealthy.
           N)	r/   r0   rM   r1   killjoinr2   r3   cancelr<   s    r   destroy_modulez%SubprocessModuleHandle.destroy_module   s       	A 	$""$$$#D< 	 LLDL# 	,*00222222222'+D$! 	*"))+++%)D"""	* 	*r    r   c                    K   | j                             d           d{V }t          j                            |j        t          |j                  |                                 d{V           S )aT  
        Do internal health check. The module should respond immediately with a 200 OK.
        This can be used to measure module responsiveness in RTT, it also indicates
        subprocess event loop lag.

        Currently you get a 200 OK with body = b'success'. Later if we want we can add more
        observability payloads.
        zhttp://localhost/api/healthzNstatusr   body)	r2   getr   webResponsere   r'   r   readr4   resps     r   _health_checkz$SubprocessModuleHandle._health_check   s       -112PQQQQQQQQ{##;-dl;;yy{{"""""" $ 
 
 	
r    c                    K   | j         j        t          d| j         j                   |                                  d{V }|j        dk    rt          d|j                   dS )z
        Do a health check once. We check for:
        1. if the process exits, it's considered died.
        2. if the health check endpoint returns non-200, it's considered unhealthy.

        NzProcess exited with code    z$Health check failed: status code is )r1   exitcoderW   rm   re   rk   s     r   _do_once_health_checkz,SubprocessModuleHandle._do_once_health_check   s       < ,R4<;PRRSSS''))))))));#SdkSSTTT r    c           
        K   	 	 |                                   d{V  n# t          $ r t          | j        j        | j        j                  }t                              d| j        j         d| j        j	         d| d           | 
                                 d{V  |                                  |                                  Y dS w xY wt          j        d           d{V  )z
        Every 1s, do a health check. If the module is unhealthy:
        1. log the exception
        2. log the last N lines of the log file
        3. restart the module
        TNrQ   z is unhealthy. Please refer to /z/ for more details. Failing all active requests.r^   )rq   	Exceptionr   r,   r9   r-   logging_filenamelogger	exceptionlog_dirrb   rO   r\   asynciosleep)r4   filenames     r   rZ   z0SubprocessModuleHandle._do_periodic_health_check   s>     	#002222222222   2O,dk.J    'do6 ' '{*' '-5' ' '  
 ))+++++++++!!###**,,, -"""""""""!	#s     B(CCrequest	resp_typec                 ,  K   |t           j        k    r|                     |           d{V S |t           j        k    r|                     |           d{V S |t           j        k    r|                     |           d{V S t          d|           )zQ
        Sends a new request to the subprocess and returns the response.
        NzUnknown response type: )r	   HTTP
proxy_httpSTREAMproxy_stream	WEBSOCKETproxy_websocket
ValueError)r4   r|   r}   s      r   proxy_requestz$SubprocessModuleHandle.proxy_request   s       )))111111111+++**7333333333...--g666666666>9>>???r    c                   K   d|j          }|                                 d{V }| j                            |j        ||t          |j                  d          4 d{V 	 }|                                 d{V }t          j        	                    |j
        t          |j                  |          cddd          d{V  S # 1 d{V swxY w Y   dS )z
        Proxy handler for non-streaming HTTP API
        It forwards the method, query string, headers, and body to the backend.
        http://localhostNF)datar   allow_redirectsrd   )path_qsrj   r2   r|   methodr'   r   r   rh   ri   re   )r4   r|   urlrf   backend_resp	resp_bodys         r   r   z!SubprocessModuleHandle.proxy_http   s     
 322\\^^######+33N-go>>! 4 
 
 	 	 	 	 	 	 	 	 *//11111111I;''#*1,2FGG (  	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	s   $AC


CCc                 p  K   d|j          }|                                 d{V }| j                            |j        ||t          |j                            4 d{V 	 }t          j        	                    |j
        t          |j                            }|                    |           d{V  |j                                        2 3 d{V \  }}|                    |           d{V  &6 |                                 d{V  |cddd          d{V  S # 1 d{V swxY w Y   dS )z~
        Proxy handler for streaming HTTP API.
        It forwards the method, query string, and body to the backend.
        r   N)r   r   )re   r   )r   rj   r2   r|   r   r'   r   r   rh   StreamResponsere   preparecontentiter_chunkswrite	write_eof)r4   r|   r   rf   r   
proxy_respchunk_s           r   r   z#SubprocessModuleHandle.proxy_stream  s      322\\^^######+33N-go>>	 4 
 
 	 	 	 	 	 	 	 	
  33#*1,2FGG 4  J $$W---------"."6"B"B"D"D . . . . . . .heQ &&u---------- #E&&(((((((((	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	s   #A-D%C6<D%%
D/2D/c                 2  K   d|j          }	 | j                            |t          |j                            4 d{V 	 }t
          j                                        }|                    |           d{V  |2 3 d{V }|j	        t
          j
        j        k    r!|                    |j                   d{V  B|j	        t
          j
        j        k    r!|                    |j                   d{V  }t                               d|j	                    6 |                                 d{V  |cddd          d{V  S # 1 d{V swxY w Y   dS # t
          j        $ rO}t                               dt+          |                      |                     |           d{V cY d}~S d}~wt.          $ rO}t                               dt+          |                      t
          j                            d          d}~ww xY w)	a=  
        Proxy handler for WebSocket API
        It establishes a WebSocket connection with the client and simultaneously connects
        to the backend server's WebSocket endpoint. Messages are forwarded in single
        direction from the backend to the client.
        If the backend responds with normal HTTP response, then try to treat it as a normal
        HTTP request and calls proxy_http instead.

        TODO: Support bidirectional communication if needed. We only support one direction
              because it's sufficient for the current use case.
        r   )r   NzUnknown msg type: zWebSocket handshake error: zWebSocket proxy error: zWebSocket proxy error)reason)r   r2   
ws_connectr'   r   r   rh   WebSocketResponser   type	WSMsgTypeTEXTsend_strr   BINARY
send_bytesrv   errorrM   WSServerHandshakeErrorwarningreprr   rt   HTTPInternalServerError)r4   r|   r   ws_to_backendws_from_clientmsges          r   r   z&SubprocessModuleHandle.proxy_websocket0  s      322	V/::6wGG ;   & & & & & & & &!(!>!>!@!@$,,W555555555!. F F F F F F F#x7#4#999,55ch??????????W%6%===,77AAAAAAAAAA%D#(%D%DEEEE "/ %**,,,,,,,,,%& & & & & & & & & & & & & & & & & & & & & & & & & & & & & & - 	2 	2 	2NNBaBBCCC11111111111111 	V 	V 	VLL<477<<===+55=T5UUU	Vs\   4E! <E?DB6E;E! 
EE! EE! !H0AF:4H:HA
HHN)"r9   
__module____qualname____doc__multiprocessingget_contextrD   ry   AbstractEventLoopr   r   r   r5   intr   r:   r=   rO   r\   rb   r   rh   ri   rm   rq   rZ   r	   r   Requestr   r   r   r   r   r   r    r   r)   r)   8   s        < -,W55J&'& )*& '	& & & &&v v8C= v v v v
 
 

  *Y Y Y8* * *.
W[%9 
 
 
 
 U U U# # #4 GSFW@ @{*@7C@		#@ @ @ @(; @T    ,{*		#   4%V{*%V		#%V %V %V %V %V %Vr    r)   )ry   loggingr   rF   typingr   r   r"   ray.dashboard.consts	dashboardconstsrS   ray.dashboard.optional_depsr   !ray.dashboard.subprocesses.moduler   r   r    ray.dashboard.subprocesses.utilsr	   r
   r   	getLoggerr9   rv   r$   strr#   r'   r)   r   r    r   <module>r      s         				 " " " " " " " "     / / / / / / / / / / / / / / /         
         
 
	8	$	$4S>9#=c#BBC	#s(^   4]V ]V ]V ]V ]V ]V ]V ]V ]V ]Vr    