
    &`i              
       X   d dl Z d dlZd dlZd dlZd dlZd dlZd dlZd dlZd dlZd dl	Z	d dl
mZ d dlmZmZmZmZmZmZmZmZ d dlZd dlmZ d dlmZ 	 d dlmZ d dlmZmZ n# e$ r	 dZdZdZY nw xY w ej         e!          Z"dZ#d	ed
eeej$        f         dej$        fdZ%d	edeeeej$        f                  dej$        fdZ&	 	 d%d	edeeeeej$        f                           d
eeeej$        f                  dej$        fdZ'd	edefdZ(e G d de          Z)ndZ)d Z* G d de+          Z, G d dej-                  Z. G d d          Z/ G d d          Z0 G d de0          Z1 G d de0          Z2 ej3        d             G d! d"                      Z4 G d# d$          Z5dS )&    N)TimeoutError)AnyCallableDictHashableIterableListOptionalTuple)	usage_lib)log_once)SafeFunction)BatchedCallsparallel_backendRAY_ADDRESSobjregistry_hashablereturnc                 R    | |vrt          j        |           }||| <   n||          }|S N)rayput)r   r   rets      q/home/jaya/work/projects/VOICE-AGENT/VIET/agent-env/lib/python3.11/site-packages/ray/util/multiprocessing/pool.py_put_in_dict_registryr       s:     ###gcll!$#$J    registryc                      	 t           fd|D                       }n;# t          $ r. t          j                   }|                     |f           Y nw xY w|S )Nc              3   *   K   | ]\  }}|u 	|V  d S r    ).0orefr   s      r   	<genexpr>z(_put_in_list_registry.<locals>.<genexpr>/   s+      ;;FAs!s((C((((;;r   )nextStopIterationr   r   append)r   r   r   s   `  r   _put_in_list_registryr(   +   sw    $;;;;h;;;<< $ $ $gcllc
#####$ Js    5AAc                    t          | t          j                  st          j        |           dk     r| S | }|4	 t          | |          }n4# t          $ r |t          | |          }Y nw xY w|t          | |          }|S )zzray.put obj in object store if it's not an ObjRef and bigger than 100 bytes,
    with support for list and dict registriesd   )
isinstancer   	ObjectRefsys	getsizeofr   	TypeErrorr(   )r   r   r   r   s       r   ray_put_if_neededr0   6   s     #s}%% s););c)A)A

C$	;'->??CC 	; 	; 	;#+C::	; 
	#C22Js   A A*)A*c                 b    t          | t          j                  rt          j        |           S | S )z8If obj is an ObjectRef, do ray.get, otherwise return obj)r+   r   r,   get)r   s    r   ray_get_if_neededr3   K   s)    #s}%% ws||Jr   c                       e Zd ZdZ	 	 ddeeeeej	        f                           dee
eej	        f                  fdZd Zd ZdS )	RayBatchedCallszJoblib's BatchedCalls with basic Ray object store management

        This functionality is provided through the put_items_in_object_store,
        which uses external registries (list and dict) containing objects
        and their ObjectRefs.Nr   r   c                     g }| j         D ]N\  }}}fd|D             }fd|                                 D             }|                    |||f           O|| _         dS )aq  Puts all applicable (kw)args in self.items in object store

            Takes two registries - list for unhashable objects and dict
            for hashable objects. The registries are a part of a Pool object.
            The method iterates through all entries in items list (usually,
            there will be only one, but the number depends on joblib Parallel
            settings) and puts all of the args and kwargs into the object
            store, updating the registries.
            If an arg or kwarg is already in a registry, it will not be
            put again, and instead, the cached object ref will be used.c                 2    g | ]}t          |          S r    r0   )r!   argr   r   s     r   
<listcomp>z=RayBatchedCalls.put_items_in_object_store.<locals>.<listcomp>l   s4       LO%c85FGG  r   c                 :    i | ]\  }}|t          |          S r    r8   )r!   kvr   r   s      r   
<dictcomp>z=RayBatchedCalls.put_items_in_object_store.<locals>.<dictcomp>o   s=       1 (H6GHH  r   N)itemsr'   )selfr   r   	new_itemsfuncargskwargss    ``    r   put_items_in_object_storez)RayBatchedCalls.put_items_in_object_store[   s     I&*j 7 7"dF    SW       &     $f!56666"DJJJr   c                     t          | j        | j                  5  d | j        D             cd d d            S # 1 swxY w Y   d S )N)n_jobsc           	      r    g | ]4\  }}} |d  |D             i d |                                 D             5S )c                 ,    g | ]}t          |          S r    r3   )r!   r9   s     r   r:   z7RayBatchedCalls.__call__.<locals>.<listcomp>.<listcomp>   s!    AAAS+C00AAAr   c                 4    i | ]\  }}|t          |          S r    rJ   )r!   r<   r=   s      r   r>   z7RayBatchedCalls.__call__.<locals>.<listcomp>.<dictcomp>   s'    NNNtq!1/22NNNr   )r?   )r!   rB   rC   rD   s       r   r:   z,RayBatchedCalls.__call__.<locals>.<listcomp>   sk       
 +dF	 DAADAAANNv||~~NNN   r   )r   _backend_n_jobsr?   r@   s    r   __call__zRayBatchedCalls.__call__v   s     "$-EEE   
 /3j                   s   :>>c                 ~    | j         |                                   t          | j        | j        | j        fd | j        ffS r   )_reducer_callbackr5   r?   rL   rM   _pickle_cacherN   s    r   
__reduce__zRayBatchedCalls.__reduce__   sH     %1&&(((  dmT\:D$BTU r   NN)__name__
__module____qualname____doc__r
   r	   r   r   r   r,   r   r   rE   rO   rS   r    r   r   r5   r5   T   s        	! 	! CGIM	# 	#tE#s}*<$=>?	#  (Xs}-D(EF	# 	# 	# 	#6	 	 	"
	 
	 
	 
	 
	r   r5   c                     |  |z   S r   r    )abs     r   div_round_upr\      s    R1W:r   c                       e Zd Zd ZdS )PoolTaskErrorc                     || _         d S r   )
underlying)r@   r`   s     r   __init__zPoolTaskError.__init__   s    $r   N)rU   rV   rW   ra   r    r   r   r^   r^      s#        % % % % %r   r^   c                   t    e Zd ZdZdZ	 	 	 	 ddededededee	         f
d	Z
d
 Zd Zd Zd Zd Zd ZddZdS )ResultThreada  Thread that collects results from distributed actors.

    It winds down when either:
        - A pre-specified number of objects has been processed
        - When the END_SENTINEL (submitted through self.add_object_ref())
            has been received and all objects received before that have been
            processed.

    Initialize the thread with total_object_refs = float('inf') to wait for the
    END_SENTINEL.

    Args:
        object_refs (List[RayActorObjectRefs]): ObjectRefs to Ray Actor calls.
            Thread tracks whether they are ready. More ObjectRefs may be added
            with add_object_ref (or _add_object_ref internally) until the object
            count reaches total_object_refs.
        single_result: Should be True if the thread is managing function
            with a single result (like apply_async). False if the thread is managing
            a function with a List of results.
        callback: called only once at the end of the thread
            if no results were errors. If single_result=True, and result is
            not an error, callback is invoked with the result as the only
            argument. If single_result=False, callback is invoked with
            a list of all the results as the only argument.
        error_callback: called only once on the first result
            that errors. Should take an Exception as the only argument.
            If no result errors, this callback is not called.
        total_object_refs: Number of ObjectRefs that this thread
            expects to be ready. May be more than len(object_refs) since
            more ObjectRefs can be submitted after the thread starts.
            If None, defaults to len(object_refs). If float("inf"), thread runs
            until END_SENTINEL (submitted through self.add_object_ref())
            has been received and all objects received before that have
            been processed.
    NFobject_refssingle_resultcallbackerror_callbacktotal_object_refsc                 x   t           j                            | d           d| _        g | _        d| _        g | _        t          j                    | _	        || _
        || _        || _        |pt          |          | _        i | _        t          j                    | _        |D ]}|                     |           d S )NT)daemonFr   )	threadingThreadra   
_got_error_object_refs
_num_ready_resultsqueueQueue_ready_index_queue_single_result	_callback_error_callbacklen_total_object_refs_indices_new_object_refs_add_object_ref)r@   rd   re   rf   rg   rh   
object_refs          r   ra   zResultThread.__init__   s     	!!$t!444"'+--+!-"3"Gs;7G7G !&% 	- 	-J  ,,,,	- 	-r   c                     t          | j                  | j        |<   | j                            |           | j                            d            d S r   )rw   rn   ry   r'   rp   r@   r|   s     r   r{   zResultThread._add_object_ref   sL    $'(9$:$:j!  ,,,T"""""r   c                 :    | j                             |           d S r   )rz   r   r~   s     r   add_object_refzResultThread.add_object_ref   s    !!*-----r   c                 n   t          j         | j                  }g }| j        | j        k     rd }|	 t	          |          dk    }| j                            |          }|| j        u rt	          | j                  | _        n*|                     |           |	                    |           n# t          j        $ r Y nw xY wt          j        |dd          \  }}t	          |          dk    r|d         }|	 t          j        |          }n$# t          j        j        $ r}|g}Y d }~nd }~ww xY w| j        sT|D ]Q}	t#          |	t$                    r%d| _        | j        |                     |	            n|	                    |	           R| xj        dz  c_        || j        | j        |         <   | j                            | j        |                    | j        | j        k     | j        sB| j        =| j        s|                     |           d S |                     |d                    d S d S d S )Nr   )block   g?num_returnstimeoutT)copyrn   ro   rx   rw   rz   r2   END_SENTINELr{   r'   rq   Emptyr   wait
exceptionsRayErrorrm   r+   	Exceptionrv   rp   ry   rs   r   ru   rt   )
r@   unreadyaggregated_batch_resultsready_idr   new_object_refreadybatcheresults
             r   runzResultThread.run   s   )D-..#% 
 o 777 H"LLA-E%)%:%>%>U%>%K%KN%)::: 36d6G2H2H//,,^<<<~666{   D "%'q#!N!N!Nwu::>>$QxH/ "2))>*   
 ? @# @ @F!&)44 @*./; 00888077????OOq OO5:DM$-12#''h(?@@@a o 777r  	<4>#=& <788888
 7:;;;;;	< 	<#=#=s*   A;B. .C ?C ;D D1$D,,D1c                     | j         S r   )rm   rN   s    r   	got_errorzResultThread.got_error2  s
    r   c                     | j         |         S r   rp   )r@   indexs     r   r   zResultThread.result6  s    }U##r   c                     | j         S r   r   rN   s    r   resultszResultThread.results:  s
    }r   c                 n    	 | j                             |          S # t          j        $ r t          w xY w)Nr   )rs   r2   rq   r   r   r@   r   s     r   next_ready_indexzResultThread.next_ready_index>  sD    	*..w.???{ 	 	 		s    4)FNNNr   )rU   rV   rW   rX   r   listboolcallabler
   intra   r{   r   r   r   r   r   r   r    r   r   rc   rc      s        " "H L
 $!#'+/- -- - 	-
 !- $C=- - - -2# # #
. . .G< G< G<R  $ $ $       r   rc   c                   8    e Zd ZdZ	 d	dZd
dZd
dZd Zd ZdS )AsyncResultz]An asynchronous interface to task results.

    This should not be constructed directly.
    NFc                 t    || _         t          ||||          | _        | j                                         d S r   )rt   rc   _result_threadstart)r@   chunk_object_refsrf   rg   re   s        r   ra   zAsyncResult.__init__L  sD     ,*}h
 
 	!!#####r   c                 :    | j                             |           dS )z
        Returns once the result is ready or the timeout expires (does not
        raise TimeoutError).

        Args:
            timeout: timeout in milliseconds.
        N)r   joinr   s     r   r   zAsyncResult.waitU  s!     	  )))))r   c                 d   |                      |           | j                                        rt          g }| j                                        D ]O}|D ]5}t          |t                    r|j        t          |t                    r|6|	                    |           P| j
        r|d         S |S Nr   )r   r   is_aliver   r   r+   r^   r`   r   extendrt   )r@   r   r   r   r   s        r   r2   zAsyncResult.get`  s    		''')) 	(0022 	" 	"E ! !fm44 ! ++	22 ! L!NN5!!!! 	1:r   c                 6    | j                                          S )zi
        Returns true if the result is ready, else false if the tasks are still
        running.
        )r   r   rN   s    r   r   zAsyncResult.readys  s     &//1111r   c                     |                                  st          | d          | j                                         S )z
        Returns true if none of the submitted tasks errored, else false. Should
        only be called once the result is ready (can be checked using `ready`).
        z
 not ready)r   
ValueErrorr   r   rN   s    r   
successfulzAsyncResult.successful{  sC     zz|| 	4222333&002222r   )NNFr   )	rU   rV   rW   rX   ra   r   r2   r   r   r    r   r   r   r   F  s          TY$ $ $ $	* 	* 	* 	*   &2 2 23 3 3 3 3r   r   c                   2    e Zd ZdZddZd Zd Zd Zd ZdS )	IMapIteratorz=Base class for OrderedIMapIterator and UnorderedIMapIterator.Nc                 ^   || _         || _        d| _        d| _        g | _        t          j                    | _        t          |          | _	        t          |t
          j        j                  r|pd| _        t          d          }n9|p|                    |          | _        t!          t#          |          |          }t%          g |          | _        | j                                         t+          t#          | j         j                            D ]}|                                  d S )Nr   Fr   inf)rh   )_pool_func_next_chunk_index_finished_iterating_submitted_chunkscollectionsdeque_ready_objectsiter	_iteratorr+   abcIterator
_chunksizefloat_calculate_chunksizer\   rw   rc   r   r   range_actor_pool_submit_next_chunk)r@   poolrB   iterable	chunksizeresult_list_size_s          r   ra   zIMapIterator.__init__  s   

!"#(  "$)/11hh 899 	F (n1DO$U||'N4+D+DX+N+NDO+CMM9EE*2AQRRR!!###s4:12233 	& 	&A##%%%%	& 	&r   c                 f   | j         rd S t          | j                  t          | j        j                  z  }t          j        | j        | j                  }t          |          }t          |          | j        k     rd| _         t          |          dk    rd S t          |          }| j                            | j        || j        |          }| j                            d           | j                            |           | j         r&| j                            t           j                   d S d S )NTr   F)r   rw   r   r   r   	itertoolsislicer   r   r   r   _submit_chunkr   r'   r   r   rc   r   )r@   actor_indexchunk_iterator
chunk_listnew_chunk_ids        r   r   zIMapIterator._submit_next_chunk  s#   # 	F$011C
8N4O4OO")$.$/JJ .))
z??T_,,'+D$:!##j))z//J
 
 	%%e,,,**<888# 	J..|/HIIIII	J 	Jr   c                     | S r   r    rN   s    r   __iter__zIMapIterator.__iter__  s    r   c                 *    |                                  S r   )r%   rN   s    r   __next__zIMapIterator.__next__  s    yy{{r   c                     t           r   )NotImplementedErrorrN   s    r   r%   zIMapIterator.next  s    !!r   r   )	rU   rV   rW   rX   ra   r   r   r   r%   r    r   r   r   r     sm        GG& & & &4J J J:    " " " " "r   r   c                       e Zd ZdZddZdS )OrderedIMapIteratoraO  Iterator to the results of tasks submitted using `imap`.

    The results are returned in the same order that they were submitted, even
    if they don't finish in that order. Only one batch of tasks per actor
    process is submitted at a time - the rest are submitted as results come in.

    Should not be constructed directly.
    Nc                 *   t          | j                  dk    rb| j        r$| j        t          | j                  k    rt
          d}|| j        k    rt          j                    }| j                            |          }| 	                                 d| j        |<   |'t          d|t          j                    |z
  z
            }|| j        k    | j        t          | j                  k     r| j        | j                 r{| j                            | j                  D ]}| j                            |           | xj        dz  c_        | j        t          | j                  k     r| j        | j                 {| j                                        S )Nr   r   Tr   )rw   r   r   r   r   r&   timer   r   r   maxr   r'   popleft)r@   r   r   r   r   s        r   r%   zOrderedIMapIterator.next  s   t"##q((' $&#d.D*E*EEE $# E4111	+<<W<MM'')))04&u-&!!W	e0C%DEEG 4111 &T-C)D)DDD*4+AB E #1889OPP 7 7F'..v6666&&!+&& &T-C)D)DDD*4+AB E "**,,,r   r   rU   rV   rW   rX   r%   r    r   r   r   r     s2         - - - - - -r   r   c                       e Zd ZdZddZdS )UnorderedIMapIteratora  Iterator to the results of tasks submitted using `imap`.

    The results are returned in the order that they finish. Only one batch of
    tasks per actor process is submitted at a time - the rest are submitted as
    results come in.

    Should not be constructed directly.
    Nc                    t          | j                  dk    r| j        r$| j        t          | j                  k    rt
          | j                            |          }|                                  | j        	                    |          D ]}| j        
                    |           | xj        dz  c_        | j                                        S )Nr   r   r   )rw   r   r   r   r   r&   r   r   r   r   r'   r   )r@   r   r   r   s       r   r%   zUnorderedIMapIterator.next  s    t"##q((' $&#d.D*E*EEE $#'888IIE##%%%-44U;; 3 3#**62222""a'"""**,,,r   r   r   r    r   r   r   r     s2         - - - - - -r   r   num_cpusc                   &    e Zd ZdZddZd Zd ZdS )	PoolActorz0Actor used to process tasks submitted to a Pool.Nc                      |r|pd} ||  d S d S Nr    r    )r@   initializerinitargss      r   ra   zPoolActor.__init__  s0     	#~2HK""""	# 	#r   c                     d S r   r    rN   s    r   pingzPoolActor.ping  s    r   c                     g }|D ]b\  }}|pd}|pi }	 |                      ||i |           *# t          $ r,}|                     t          |                     Y d }~[d }~ww xY w|S r   )r'   r   r^   )r@   rB   r   r   rC   rD   r   s          r   	run_batchzPoolActor.run_batch  s    ! 	1 	1LD&:2D\rF1ttT4V445555 1 1 1}Q//000000001s   /
A%"A  A%rT   )rU   rV   rW   rX   ra   r   r   r    r   r   r   r     sL        ::# # # #
  	 	 	 	 	r   r   c                      e Zd ZdZ	 	 	 	 	 	 	 d/dee         dee         dee         dee         dedee	         d	ee
e	ef                  fd
Zd0dZd Zd1dZd Zd Zd Zd Z	 	 d0dedee         dee
         fdZ	 	 	 	 d2dedee         dee
         deegdf         deegdf         f
dZdedefdZd Zd3dZd4dZ	 	 	 	 d5dZd1ded ed!ee         fd"Z	 	 	 d6ded ed!ee         deegdf         deegdf         f
d#Zd1d$Z	 	 d0ded edeegdf         deegdf         fd%Z d7ded ed!ee         fd'Z!	 d7ded ed!ee         fd(Z"d) Z#d* Z$d+ Z%d, Z&d- Z'd. Z(dS )8PoolaA  A pool of actor processes that is used to process tasks in parallel.

    Args:
        processes: number of actor processes to start in the pool. Defaults to
            the number of cores in the Ray cluster if one is already running,
            otherwise the number of cores on this machine.
        initializer: function to be run in each actor when it starts up.
        initargs: iterable of arguments to the initializer function.
        maxtasksperchild: maximum number of tasks to run in each actor process.
            After a process has executed this many tasks, it will be killed and
            replaced with a new one.
        ray_address: address of the Ray cluster to run on. If None, a new local
            Ray cluster will be started on this machine. Otherwise, this will
            be passed to `ray.init()` to connect to a running cluster. This may
            also be specified using the `RAY_ADDRESS` environment variable.
        ray_remote_args: arguments used to configure the Ray Actors making up
            the pool. See :func:`ray.remote` for details.
    N	processesr   r   maxtasksperchildcontextray_addressray_remote_argsc                 n   t          j        d           d| _        || _        || _        |pd| _        g | _        g | _        i | _        d| _	        |pi | _
        d | _        |r)t          d          rt                              d           |                     ||          }|                     |           d S )Nzutil.multiprocessing.PoolFr   r   context_argument_warningz{The 'context' argument is not supported using ray. Please refer to the documentation for how to control ray initialization.)r   record_library_usage_closed_initializer	_initargs_maxtasksperchild_actor_deletion_ids	_registry_registry_hashable_current_index_ray_remote_args_pool_actorr   loggerwarning	_init_ray_start_actor_pool)r@   r   r   r   r   r   r   r   s           r   ra   zPool.__init__?  s     	&'BCCC'!!1!7R#% :<AC / 52 	x :;; 	NN1   NN9k::	y)))))r   c                 ~   t          j                    s|wt          t          j        v s#t           j        j                                        Ai }t          j                            t                    dk    r||d<   t          j	        d	i | n8|!i }|dk    r||d<   t          j	        d	d|i| nt          j	        |           t          t           j        j                                        d                   }||}|dk    rt          d          ||k     r#t          d                    ||                    |S )
Nlocalr   addressr   CPUr   z!Processes in the pool must be >0.zrTried to start a pool with {} processes on an existing ray cluster, but there are only {} CPUs in the ray cluster.r    )r   is_initializedRAY_ADDRESS_ENVosenviron_privateutilsread_ray_addressr2   initr   statecluster_resourcesr   format)r@   r   r   init_kwargsray_cpuss        r   r  zPool._init_ray`  sR    !## 	-"2:--<%6688D :>>/22g==.7K
+'';''''( ')).7K
+<<<<<<< ),,,,s|);;==eDEE I>>@AAAi++16)X+F+F   r   c                      d  _          fdt          |          D              _        t          j        d  j        D                        d S )Nc                 8    g | ]}                                 S r    )_new_actor_entry)r!   r   r@   s     r   r:   z*Pool._start_actor_pool.<locals>.<listcomp>  s%    NNND1133NNNr   c                 F    g | ]\  }}|j                                         S r    )r   remote)r!   actorr   s      r   r:   z*Pool._start_actor_pool.<locals>.<listcomp>  s*    FFF""$$FFFr   )r	  r   r   r   r2   )r@   r   s   ` r   r  zPool._start_actor_pool  sW    NNNNU9=M=MNNNFFT5EFFFGGGGGr   c                     t          | j                  dk    rd S |t          |          }t          j        | j        t          | j                  |          \  }}|| _        d S )Nr   r   )rw   r  r   r   r   )r@   r   r   deletings       r   _wait_for_stopping_actorszPool._wait_for_stopping_actors  so    t'((A--FGnnGh$D455
 
 
8
 $,   r   c                     |                      d           | j                            |j                                                   d S )Ng        r   )r'  r  r'   __ray_terminate__r#  )r@   r$  s     r   _stop_actorzPool._stop_actor  sG    &&s&333 	 ''(?(F(F(H(HIIIIIr   c                     | j         st          j        di | j        | _         | j                             | j        | j                  dfS )Nr   r    )r	  r   optionsr  r#  r  r  rN   s    r   r!  zPool._new_actor_entry  sO    
  	J(0II43HIID ''(94>JJANNr   c                     | j         t          | j                  dz
  k    rd| _         n| xj         dz  c_         | j         S )Nr   r   )r  rw   r   rN   s    r   _next_actor_indexzPool._next_actor_index  sH    #d&6"7"7!";;;"#D1$""r   c                    | j         |         \  }}|j                            ||          }|dz  }| j        dk    s|| j        k    sJ || j        k    r,|                     |           |                                 \  }}||f| j         |<   |S )Nr   r   )r   r   r#  r  r*  r!  )r@   r   rB   r   r$  countr|   s          r   
_run_batchzPool._run_batch  s    '4u_++D%88

%++u8N/N/N/NND***U###0022LE5).%r   rB   rC   rD   c                 T    |                      |||                                          S )a8  Run the given function on a random actor process and return the
        result synchronously.

        Args:
            func: function to run.
            args: optional arguments to the function.
            kwargs: optional keyword arguments to the function.

        Returns:
            The result.
        )apply_asyncr2   )r@   rB   rC   rD   s       r   applyz
Pool.apply  s(    $ dF3377999r   rf   rg   c                     |                                   |                     |          }|                     |                                 |||fg          }t	          |g||d          S )a  Run the given function on a random actor process and return an
        asynchronous interface to the result.

        Args:
            func: function to run.
            args: optional arguments to the function.
            kwargs: optional keyword arguments to the function.
            callback: callback to be executed on the result once it is finished
                only if it succeeds.
            error_callback: callback to be executed the result once it is
                finished only if the task errors. The exception raised by the
                task will be passed as the only argument to the callback.

        Returns:
            AsyncResult containing the result.
        T)re   )_check_running'_convert_to_ray_batched_calls_if_neededr1  r.  r   )r@   rB   rC   rD   rf   rg   r|   s          r   r3  zPool.apply_async  sk    2 	;;DAA__T%;%;%=%=ttVnEUVV
J<>QUVVVVr   r   c                    t           |S |}t          |t                    r|j        }t          |t                    rNt          |j        |j        |j        f|j        |j	                  }|
                    | j        | j                   n|}|S )a  Convert joblib's BatchedCalls to RayBatchedCalls for ObjectRef caching.

        This converts joblib's BatchedCalls callable, which is a collection of
        functions with their args and kwargs to be ran sequentially in an
        Actor, to a RayBatchedCalls callable, which provides identical
        functionality in addition to a method which ensures that common
        args and kwargs are put into the object store just once, saving time
        and memory. That method is then ran.

        If func is not a BatchedCalls instance, it is returned without changes.

        The ObjectRefs are cached inside two registries (_registry and
        _registry_hashable), which are common for the entire Pool and are
        cleaned on close.)r5   r+   r   rB   r   r?   rL   rM   rQ   rR   rE   r  r  )r@   rB   orginal_funcs      r   r7  z,Pool._convert_to_ray_batched_calls_if_needed  s     "K dL)) 	9DdL)) 	 "
-&"	 D **4>4;RSSSSDr   c                 ~    t          t          |          t          | j                  dz            \  }}|r|dz  }|S )N   r   )divmodrw   r   )r@   r   r   extras       r   r   zPool._calculate_chunksize  sC    !#h--T5E1F1F1JKK	5 	NIr   Fc                 $   g }t          |          |k     rP	 t          |          }|s|f}|                    |i f           n# t          $ r Y nw xY wt          |          |k     Pt          |          dk    sJ |                     |||          S r   )rw   r%   r'   r&   r1  )r@   rB   iteratorr   r   unpack_argschunkrC   s           r   r   zPool._submit_chunk  s    %jj9$$H~~" # 7DdBZ((((     %jj9$$ 5zzA~~~~{D%888s   +A 
AAc           	         t          |d          st          |          }||                     |          }t          |          }g }t	          |          |z  t	          |          k     rtt	          |          t	          | j                  z  }|                    |                     |||||                     t	          |          |z  t	          |          k     t|S )N__len__)r@  )hasattrr   r   r   rw   r   r'   r   )r@   rB   r   r   r@  r?  r   r   s           r   _chunk_and_runzPool._chunk_and_run&  s    x++ 	&H~~H11(;;I>>#$$y03x==@@/003t7G3H3HHK$$""(I{ #     #$$y03x==@@ ! r   c                 ~    |                                   |                     ||||          }t          |||          S )Nr   r@  )r6  rE  r   )r@   rB   r   r   r@  rf   rg   rd   s           r   
_map_asynczPool._map_async9  sM     	))(i[ * 
 
 ;.AAAr   r   r   c                 X    |                      |||d                                          S )a  Run the given function on each element in the iterable round-robin
        on the actor processes and return the results synchronously.

        Args:
            func: function to run.
            iterable: iterable of objects to be passed as the sole argument to
                func.
            chunksize: number of tasks to submit as a batch to each actor
                process. If unspecified, a suitable chunksize will be chosen.

        Returns:
            A list of results.
        FrG  rH  r2   r@   rB   r   r   s       r   mapzPool.mapH  s1     (iU  
 

#%%	r   c                 8    |                      |||d||          S )a  Run the given function on each element in the iterable round-robin
        on the actor processes and return an asynchronous interface to the
        results.

        Args:
            func: function to run.
            iterable: iterable of objects to be passed as the only argument to
                func.
            chunksize: number of tasks to submit as a batch to each actor
                process. If unspecified, a suitable chunksize will be chosen.
            callback: Will only be called if none of the results were errors,
                and will only be called once after all results are finished.
                A Python List of all the finished results will be passed as the
                only argument to the callback.
            error_callback: callback executed on the first errored result.
                The Exception raised by the task will be passed as the only
                argument to the callback.

        Returns:
            AsyncResult
        F)r   r@  rf   rg   rH  )r@   rB   r   r   rf   rg   s         r   	map_asynczPool.map_async[  s1    : )  
 
 	
r   c                 X    |                      |||d                                          S )zSame as `map`, but unpacks each element of the iterable as the
        arguments to func like: [func(*args) for args in iterable].
        TrG  rJ  rK  s       r   starmapzPool.starmap  s1    
 (iT  
 

#%%	r   c                 6    |                      ||d||          S )zSame as `map_async`, but unpacks each element of the iterable as the
        arguments to func like: [func(*args) for args in iterable].
        T)r@  rf   rg   rN  )r@   rB   r   rf   rg   s        r   starmap_asynczPool.starmap_async  s.     )  
 
 	
r   r   c                 P    |                                   t          | |||          S )a  Same as `map`, but only submits one batch of tasks to each actor
        process at a time.

        This can be useful if the iterable of arguments is very large or each
        task's arguments consumes a large amount of resources.

        The results are returned in the order corresponding to their arguments
        in the iterable.

        Returns:
            OrderedIMapIterator
        r   )r6  r   rK  s       r   imapz	Pool.imap  s-     	"4x9MMMMr   c                 P    |                                   t          | |||          S )ag  Same as `map`, but only submits one batch of tasks to each actor
        process at a time.

        This can be useful if the iterable of arguments is very large or each
        task's arguments consumes a large amount of resources.

        The results are returned in the order that they finish.

        Returns:
            UnorderedIMapIterator
        rU  )r6  r   rK  s       r   imap_unorderedzPool.imap_unordered  s-     	$T4YOOOOr   c                 2    | j         rt          d          d S )NzPool not running)r   r   rN   s    r   r6  zPool._check_running  s%    < 	1/000	1 	1r   c                 .    |                                   | S r   )r6  rN   s    r   	__enter__zPool.__enter__  s    r   c                 .    |                                   d S r   )	terminate)r@   exc_typeexc_valexc_tbs       r   __exit__zPool.__exit__  s    r   c                     | j                                          | j                                         | j        D ]\  }}|                     |           d| _        t          j                     dS )zClose the pool.

        Prevents any more tasks from being submitted on the pool but allows
        outstanding work to finish.
        TN)r  clearr  r   r*  r   gccollectr@   r$  r   s      r   closez
Pool.close  sp     	%%'''( 	$ 	$HE1U####

r   c                 ~    | j         s|                                  | j        D ]\  }}t          j        |           dS )z~Close the pool.

        Prevents any more tasks from being submitted on the pool and stops
        outstanding work.
        N)r   rg  r   r   killrf  s      r   r]  zPool.terminate  sJ     | 	JJLLL( 	 	HE1HUOOOO	 	r   c                 Z    | j         st          d          |                                  dS )zWait for the actors in a closed pool to exit.

        If the pool was closed using `close`, this will return once all
        outstanding work is completed.

        If the pool was closed using `terminate`, this will return quickly.
        zPool is still runningN)r   r   r'  rN   s    r   r   z	Pool.join  s5     | 	64555&&(((((r   )NNNNNNNrT   r   )NNNN)F)NF)NFNN)NNN)r   ))rU   rV   rW   rX   r
   r   r   r   r   strr   ra   r  r  r'  r*  r!  r.  r1  r   r4  r   r3  r7  r   r   rE  rH  rL  r	   rO  rQ  rS  rV  rX  r6  r[  ra  rg  r]  r   r    r   r   r   r   +  sd        * $(*.'+*.%)48* *C=* h'* 8$	*
 #3-* * c]* "$sCx.1* * * *B# # # #JH H H
, , , ,J J JO O O# # #	 	 	 !%!%	: :: uo: 	: : : :. !%!%*.6:W WW uoW 	W
 C5$;'W !)d!23W W W W<"H " " " " "H  9 9 9 9 ! ! ! !. B B B B  H #    . $(+/6:$
 $
$
 $
 C=	$

 D64<($
 !)d!23$
 $
 $
 $
L    ,06:
 

 
 D64<(	

 !)d!23
 
 
 
&N N NX N(3- N N N N$ NOP PP(0P=Ec]P P P P$1 1 1      
 
 
) ) ) ) )r   r   rT   )6r   r   rd  r   loggingr  rq   r-   rk   r   multiprocessingr   typingr   r   r   r   r   r	   r
   r   r   ray._common.usager   ray.utilr   joblib._parallel_backendsr   joblib.parallelr   r   ImportError	getLoggerrU   r
  r  r,   r   r(   r0   r3   r5   r\   r   r^   rl   rc   r   r   r   r   r#  r   r   r    r   r   <module>ru     sC        				      				  



      ( ( ( ( ( ( Q Q Q Q Q Q Q Q Q Q Q Q Q Q Q Q Q Q Q Q 



 ' ' ' ' ' '      666666>>>>>>>>>   LLLL 
	8	$	$	!%h&=!>]   	U3#567]    ;?AE 	tE#s}"4567  Xs}%< => 	]	   *3 3     = = = = =, = = = =@ O  % % % % %I % % %
b b b b b9# b b bJ=3 =3 =3 =3 =3 =3 =3 =3@B" B" B" B" B" B" B" B"J&- &- &- &- &-, &- &- &-R- - - - -L - - -: Q       2E) E) E) E) E) E) E) E) E) E)s   A# #A10A1