
    &`i&                        d dl Z d dlZd dlmZmZ d dlmZmZmZm	Z	m
Z
mZmZ d dlZd dlmZmZ  e j        e          Z G d d          Z	 	 dd	e
eeeeef                  d
e	e         dedeeee         f         fdZ	 	 ddee         dee         dede	ee                  d
e	e         dee         fdZ	 ddee         d
e	e         deee         ee         f         fdZdee         dee         fdZdS )    N)defaultdictdeque)AnyDictListOptionalSequenceTupleType)
ActorClassActorHandlec                   J    e Zd ZdZd Zd ZddZddZd Ze	d	             Z
d
S )TaskPoolzCHelper class for tracking the status of many in-flight actor tasks.c                 H    i | _         i | _        t                      | _        d S N)_tasks_objectsr   	_fetchingselfs    j/home/jaya/work/projects/VOICE-AGENT/VIET/agent-env/lib/python3.11/site-packages/ray/rllib/utils/actors.py__init__zTaskPool.__init__   s        c                 n    t          |t                    r	|d         }n|}|| j        |<   || j        |<   d S )Nr   )
isinstancelistr   r   )r   workerall_obj_refsobj_refs       r   addzTaskPool.add   sB    lD)) 	#"1oGG"G%G!-gr   Fc              #   :  K   t          | j                  }|rt          j        |t	          |          d          \  }}|s|rt          j        |dd          \  }}|D ]:}| j                            |          | j                            |          fV  9d S d S )Nr   )num_returnstimeout   g      $@)r   r   raywaitlenpopr   )r   blocking_waitpendingready_r   s         r   	completedzTaskPool.completed   s      t{## 	MxS\\1MMMHE1 J] J8GDIIIq  M M{w//1B1B71K1KLLLLL	M 	MM Mr     c              #      K   |                      |          D ]!\  }}| j                            ||f           "t          |          D ]'}| j        s dS | j                                        V  (dS )zhSimilar to completed but only returns once the object is local.

        Assumes obj_ref only is one id.)r)   N)r-   r   appendrangepopleft)r   r)   	max_yieldr   r   r,   s         r   completed_prefetchzTaskPool.completed_prefetch%   s      
  $~~M~JJ 	5 	5OFGN!!67"34444y!! 	+ 	+A> .((******		+ 	+r   c                 P   | j                                                                         D ]\  }}||vr| j         |= | j        |= t	          t          | j                            D ]>}| j                                        \  }}||v r| j                            ||f           ?dS )z(Notify that some workers may be removed.N)	r   copyitemsr   r1   r'   r   r2   r0   )r   workersr   evr,   s        r   reset_workerszTaskPool.reset_workers3   s    ;++--3355 	+ 	+KGR  K(M'* s4>**++ 	5 	5A.0022KBW}}%%r7m444		5 	5r   c                 *    t          | j                  S r   )r'   r   r   s    r   countzTaskPool.countB   s    4;r   N)F)Fr.   )__name__
__module____qualname____doc__r   r    r-   r4   r:   propertyr<    r   r   r   r      s        MM! ! !
. . .M M M M+ + + +5 5 5     X     r   r   	localhost
   actor_specsnodemax_attemptsreturnc           
         |dk    rt          j                    }d t          t          |                     D             }t          |          D ]#}d}t	          |           D ]\  }\  }}}	}
|pg }|	pi }	t          ||                   |
k     rt          |||	|
|dz   z  |          }|1t          j        |d         j        	                                          }||         
                    |           t          ||                   |
k     rd}t          ||                   |
k    r?||         |
d         D ]}|j        	                                 ||         d|
         ||<   	|r|c S %t          d	          )
aO  Create co-located actors of any type(s) on any node.

    Args:
        actor_specs: Tuple/list with tuples consisting of: 1) The
            (already @ray.remote) class(es) to construct, 2) c'tor args,
            3) c'tor kwargs, and 4) the number of actors of that class with
            given args/kwargs to construct.
        node: The node to co-locate the actors on. By default ("localhost"),
            place the actors on the node the caller of this function is
            located on. Use None for indicating that any (resource fulfilling)
            node in the cluster may be used.
        max_attempts: The maximum number of co-location attempts to
            perform before throwing an error.

    Returns:
        A dict mapping the created types to the list of n ActorHandles
        created (and co-located) for that type.
    rC   c                     g | ]}g S rB   rB   ).0r,   s     r   
<listcomp>z+create_colocated_actors.<locals>.<listcomp>b   s    	.	.	."	.	.	.r   Tr$   )clsargskwargsr<   rF   Nr   Fz5Unable to create enough colocated actors -> aborting.)platformrF   r1   r'   	enumeratetry_create_colocatedr%   getget_hostremoteextend__ray_terminate__	Exception)rE   rF   rG   okattemptall_goodityprN   rO   r<   
co_locatedas                r   create_colocated_actorsr`   G   s   . {} 
/	.eC,,--	.	.	.B && & & -6{-C-C 	& 	&)A)T65:2D\rF 2a5zzE!!1!7Q;/  
 <7:a=#9#@#@#B#BCCD1Z((( r!u::%%$H 2a5zzE!!Auvv 1 1A'..00001fuf1  	III	 K
L
LLr   rM   rN   r<   rO   c                 Z    |dk    rt          j                    }pi  fdt          |          D             }t          ||          \  }}t                              d                    t          |          |                     |D ]}|j        	                                 |S )a  Tries to co-locate (same node) a set of Actors of the same type.

    Returns a list of successfully co-located actors. All actors that could
    not be co-located (with the others on the given node) will not be in this
    list.

    Creates each actor via it's remote() constructor and then checks, whether
    it has been co-located (on the same node) with the other (already created)
    ones. If not, terminates the just created actor.

    Args:
        cls: The Actor class to use (already @ray.remote "converted").
        args: List of args to pass to the Actor's constructor. One item
            per to-be-created actor (`count`).
        count: Number of actors of the given `cls` to construct.
        kwargs: Optional list of kwargs to pass to the Actor's constructor.
            One item per to-be-created actor (`count`).
        node: The node to co-locate the actors on. By default ("localhost"),
            place the actors on the node the caller of this function is
            located on. If None, will try to co-locate all actors on
            any available node.

    Returns:
        List containing all successfully co-located actor handles.
    rC   c                 *    g | ]} j         i S rB   )rU   )rK   r,   rN   rM   rO   s     r   rL   z(try_create_colocated.<locals>.<listcomp>   s*    @@@ajcj$)&))@@@r   )rF   zGot {} colocated actors of {})
rP   rF   r1   split_colocatedloggerinfoformatr'   rW   rU   )	rM   rN   r<   rO   rF   actorsr^   non_co_locatedr_   s	   `` `     r   rR   rR      s    @ {}\rF@@@@@@5<<@@@F!0d!C!C!CJ
KK/66s:NNOOO % %	""$$$$r   rg   c                    |dk    rt          j                    }t          j        d | D                       }|t	          t
                    }t          ||           D ] \  }}||                             |           !d}d}|D ]2}|t          ||                   k     rt          ||                   }|}3g }|D ]0}||k    r(|	                    t          ||                              1t          ||                   |fS g }	g }t          ||           D ]6\  }}
||k    r|	                    |
           !|                    |
           7|	|fS )aM  Splits up given actors into colocated (on same node) and non colocated.

    The co-location criterion depends on the `node` given:
    If given (or default: platform.node()): Consider all actors that are on
    that node "colocated".
    If None: Consider the largest sub-set of actors that are all located on
    the same node (whatever that node is) as "colocated".

    Args:
        actors: The list of actor handles to split into "colocated" and
            "non colocated".
        node: The node defining "colocation" criterion. If provided, consider
            thos actors "colocated" that sit on this node. If None, use the
            largest subset within `actors` that are sitting on the same
            (any) node.

    Returns:
        Tuple of two lists: 1) Co-located ActorHandles, 2) non co-located
        ActorHandles.
    rC   c                 @    g | ]}|j                                         S rB   )rT   rU   )rK   r_   s     r   rL   z#split_colocated.<locals>.<listcomp>   s&    999QQZ&&((999r   N)rP   rF   r%   rS   r   setzipr    r'   rV   r   r0   )rg   rF   hostsnode_groupshostactormax_largest_grouprh   r^   r_   s              r   rc   rc      s   0 {} G99&999::E |!#&&uf-- 	) 	)KD%!!%(((( 	% 	%Dc+d+,,,,;t,-- $ 	? 	?D}$$%%d;t+<&=&=>>>K.//??
 
5&)) 	) 	)GD!t||!!!$$$$ %%a((((>))r   c                 f    t          |           \  }}|D ]}|j                                         |S r   )rc   rW   rU   )rg   	colocatednon_colocatedr_   s       r   drop_colocatedrw      sA    .v66I} % %	""$$$$r   )rC   rD   )NrC   )rC   )loggingrP   collectionsr   r   typingr   r   r   r   r	   r
   r   r%   	ray.actorr   r   	getLoggerr=   rd   r   intstrr`   rR   rc   rw   rB   r   r   <module>r      s@     * * * * * * * * C C C C C C C C C C C C C C C C C C 



 - - - - - - - -		8	$	$8  8  8  8  8  8  8  8 z &IM IM%c3 345IM
3-IM IM 
$[!
!"	IM IM IM IM` #'%) )	j	)
s)) ) T#Y	)
 3-) 
+) ) ) )\ &<* <*<*
3-<* 4d;//0<* <* <* <*~4, k1B      r   