
    &`i'                     :   d dl Z d dlZd dlmZ d dlmZmZmZmZ d dl	Z	d dl
mZ  ed           G d dej                              Z ed           G d	 d
ej                              Z ed           G d d                      Z G d d          ZdS )    N)Iterable)AnyDictListOptional)	PublicAPIbeta)	stabilityc                       e Zd ZdS )EmptyN__name__
__module____qualname__     b/home/jaya/work/projects/VOICE-AGENT/VIET/agent-env/lib/python3.11/site-packages/ray/util/queue.pyr   r   
           Dr   r   c                       e Zd ZdS )FullNr   r   r   r   r   r      r   r   r   c            	       d   e Zd ZdZd dedee         ddfdZdefdZdefd	Z	defd
Z
defdZdefdZ	 d!dededee         ddfdZ	 d!dededee         ddfdZd!dedee         defdZ	 d!dedee         defdZdeddfdZdeddfdZdefdZdedee         fdZd"dededdfdZdS )#Queuea  A first-in, first-out queue implementation on Ray.

    The behavior and use cases are similar to those of the asyncio.Queue class.

    Features both sync and async put and get methods.  Provides the option to
    block until space is available when calling put on a full queue,
    or to block until items are available when calling get on an empty queue.

    Optionally supports batched put and get operations to minimize
    serialization overhead.

    Args:
        maxsize (optional, int): maximum size of the queue. If zero, size is
            unbounded.
        actor_options (optional, Dict): Dictionary of options to pass into
            the QueueActor during creation. These are directly passed into
            QueueActor.options(...). This could be useful if you
            need to pass in custom resource requirements, for example.

    Examples:
        .. testcode::

            from ray.util.queue import Queue
            q = Queue()
            items = list(range(10))
            for item in items:
                q.put(item)
            for item in items:
                assert item == q.get()
            # Create Queue with the underlying actor reserving 1 CPU.
            q = Queue(actor_options={"num_cpus": 1})
    r   Nmaxsizeactor_optionsreturnc                     ddl m}  |d           |pi }|| _         t          j        t
                    j        di |                    | j                  | _        d S )Nr   )record_library_usagez
util.Queuer   )ray._common.usage.usage_libr   r   rayremote_QueueActoroptionsactor)selfr   r   r   s       r   __init__zQueue.__init__7   sq    DDDDDD\***%++CJ{##+<<m<<CCDLQQ 	


r   c                 *    |                                  S Nsizer$   s    r   __len__zQueue.__len__B   s    yy{{r   c                 b    t          j        | j        j                                                  S zThe size of the queue.)r   getr#   qsizer    r*   s    r   r)   z
Queue.sizeE   #    wtz'..00111r   c                 *    |                                  S r-   r(   r*   s    r   r/   zQueue.qsizeI   s    yy{{r   c                 b    t          j        | j        j                                                  S )zWhether the queue is empty.)r   r.   r#   emptyr    r*   s    r   r3   zQueue.emptyM   r0   r   c                 b    t          j        | j        j                                                  S )zWhether the queue is full.)r   r.   r#   fullr    r*   s    r   r5   z
Queue.fullQ   s"    wtz--//000r   Titemblocktimeoutc                 8   |sN	 t          j        | j        j                            |                     dS # t
          j        $ r t          w xY w||dk     rt          d          t          j        | j        j	                            ||                     dS )a  Adds an item to the queue.

        If block is True and the queue is full, blocks until the queue is no
        longer full or until timeout.

        There is no guarantee of order if multiple producers put to the same
        full queue.

        Raises:
            Full: if the queue is full and blocking is False.
            Full: if the queue is full, blocking is True, and it timed out.
            ValueError: if timeout is negative.
        Nr   ''timeout' must be a non-negative number)
r   r.   r#   
put_nowaitr    asyncio	QueueFullr   
ValueErrorputr$   r6   r7   r8   s       r   r?   z	Queue.putU   s       		>
-44T::;;;;;$   
 "w{{ !JKKK
--dG<<=====s	   17 Ac                   K   |sB	 | j         j                            |           d{V  dS # t          j        $ r t
          w xY w||dk     rt          d          | j         j                            ||           d{V  dS )a  Adds an item to the queue.

        If block is True and the queue is full,
        blocks until the queue is no longer full or until timeout.

        There is no guarantee of order if multiple producers put to the same
        full queue.

        Raises:
            Full: if the queue is full and blocking is False.
            Full: if the queue is full, blocking is True, and it timed out.
            ValueError: if timeout is negative.
        Nr   r:   )r#   r;   r    r<   r=   r   r>   r?   r@   s       r   	put_asynczQueue.put_asyncp   s         		;j+22488888888888$   
 "w{{ !JKKKjn++D':::::::::::s	   %- Ac                 ,   |sK	 t          j        | j        j                                                  S # t
          j        $ r t          w xY w||dk     rt          d          t          j        | j        j                            |                    S )a	  Gets an item from the queue.

        If block is True and the queue is empty, blocks until the queue is no
        longer empty or until timeout.

        There is no guarantee of order if multiple consumers get from the
        same empty queue.

        Returns:
            The next item in the queue.

        Raises:
            Empty: if the queue is empty and blocking is False.
            Empty: if the queue is empty, blocking is True, and it timed out.
            ValueError: if timeout is negative.
        Nr   r:   )	r   r.   r#   
get_nowaitr    r<   
QueueEmptyr   r>   r$   r7   r8   s      r   r.   z	Queue.get   s    "  		?wtz4;;==>>>%    "w{{ !JKKKwtz~44W==>>>s	   /4 Ac                    K   |s?	 | j         j                                         d{V S # t          j        $ r t
          w xY w||dk     rt          d          | j         j                            |           d{V S )a  Gets an item from the queue.

        There is no guarantee of order if multiple consumers get from the
        same empty queue.

        Returns:
            The next item in the queue.
        Raises:
            Empty: if the queue is empty and blocking is False.
            Empty: if the queue is empty, blocking is True, and it timed out.
            ValueError: if timeout is negative.
        Nr   r:   )r#   rD   r    r<   rE   r   r>   r.   rF   s      r   	get_asynczQueue.get_async   s        		<!Z299;;;;;;;;;%    "w{{ !JKKK!Z^227;;;;;;;;;s	   #* Ac                 0    |                      |d          S )zgEquivalent to put(item, block=False).

        Raises:
            Full: if the queue is full.
        Fr7   )r?   r$   r6   s     r   r;   zQueue.put_nowait   s     xxEx***r   itemsc                     t          |t                    st          d          t          j        | j        j                            |                     dS )zTakes in a list of items and puts them into the queue in order.

        Raises:
            Full: if the items will not fit in the queue
        z$Argument 'items' must be an IterableN)
isinstancer   	TypeErrorr   r.   r#   put_nowait_batchr    )r$   rL   s     r   rP   zQueue.put_nowait_batch   sO     %** 	DBCCC
+22599:::::r   c                 .    |                      d          S )zcEquivalent to get(block=False).

        Raises:
            Empty: if the queue is empty.
        FrJ   )r.   r*   s    r   rD   zQueue.get_nowait   s     xxex$$$r   	num_itemsc                     t          |t                    st          d          |dk     rt          d          t	          j        | j        j                            |                    S )zGets items from the queue and returns them in a
        list in order.

        Raises:
            Empty: if the queue does not contain the desired number of items
        z#Argument 'num_items' must be an intr   z'num_items' must be nonnegative)	rN   intrO   r>   r   r.   r#   get_nowait_batchr    r$   rR   s     r   rU   zQueue.get_nowait_batch   sa     )S)) 	CABBBq==>???wtz299)DDEEEr   F   forcegrace_period_sc                    | j         rs|rt          j        | j         d           nU| j         j                                        }t          j        |g|          \  }}|rt          j        | j         d           d| _         dS )a  Terminates the underlying QueueActor.

        All of the resources reserved by the queue will be released.

        Args:
            force: If True, forcefully kill the actor, causing an
                immediate failure. If False, graceful
                actor termination will be attempted first, before falling back
                to a forceful kill.
            grace_period_s: If force is False, how long in seconds to
                wait for graceful termination before falling back to
                forceful kill.
        T)
no_restart)r8   N)r#   r   kill__ray_terminate__r    wait)r$   rX   rY   done_refdonenot_dones         r   shutdownzQueue.shutdown   s     : 	: :55555:7>>@@!$8*n!M!M!Mh :HTZD9999


r   )r   N)TN)FrW   )r   r   r   __doc__rT   r   r   r%   r+   r)   r/   boolr3   r5   r   floatr?   rB   r.   rH   r;   r   rP   rD   r   rU   rb   r   r   r   r   r      s        B	
 	
 	
 	
RV 	
 	
 	
 	
    2c 2 2 2 2s    2t 2 2 2 21d 1 1 1 1
 IM> >> $>6>uo>	> > > >8 IM; ;; $;6>uo;	; ; ; ;6? ? ?x ?# ? ? ? ?: >B< <<+3E?<	< < < <4+s +t + + + +	;h 	;4 	; 	; 	; 	;%C % % % %F# F$s) F F F F d C       r   r   c                   N    e Zd Zd Zd Zd Zd ZddZddZd Z	d	 Z
d
 Zd ZdS )r!   c                 P    || _         t          j        | j                   | _        d S r'   )r   r<   r   queue)r$   r   s     r   r%   z_QueueActor.__init__  s     ]4<00


r   c                 4    | j                                         S r'   )rh   r/   r*   s    r   r/   z_QueueActor.qsize      z!!!r   c                 4    | j                                         S r'   )rh   r3   r*   s    r   r3   z_QueueActor.empty  rj   r   c                 4    | j                                         S r'   )rh   r5   r*   s    r   r5   z_QueueActor.full  s    z   r   Nc                    K   	 t          j        | j                            |          |           d {V  d S # t           j        $ r t
          w xY wr'   )r<   wait_forrh   r?   TimeoutErrorr   )r$   r6   r8   s      r   r?   z_QueueActor.put  sg      	"4:>>$#7#7AAAAAAAAAAA# 	 	 	J	s	   39 Ac                    K   	 t          j        | j                                        |           d {V S # t           j        $ r t
          w xY wr'   )r<   rn   rh   r.   ro   r   )r$   r8   s     r   r.   z_QueueActor.get  s_      	 )$*..*:*:GDDDDDDDDD# 	 	 	K	s	   16 Ac                 :    | j                             |           d S r'   )rh   r;   rK   s     r   r;   z_QueueActor.put_nowait  s    
d#####r   c           	      .   | j         dk    rjt          |          |                                 z   | j         k    r=t          dt          |           d|                                  d| j          d          |D ]}| j                            |           d S )Nr   zCannot add z items to queue of size z and maxsize .)r   lenr/   r   rh   r;   )r$   rL   r6   s      r   rP   z_QueueActor.put_nowait_batch   s    <!E

TZZ\\ 9DL H H>c%jj > >::<<> >.2l> > >    	( 	(DJ!!$''''	( 	(r   c                 4    | j                                         S r'   rh   rD   r*   s    r   rD   z_QueueActor.get_nowait*  s    z$$&&&r   c                      |                                  k    r(t          d| d                                   d           fdt          |          D             S )NzCannot get z items from queue of size rs   c                 B    g | ]}j                                         S r   rv   ).0_r$   s     r   
<listcomp>z0_QueueActor.get_nowait_batch.<locals>.<listcomp>2  s'    BBBA
%%''BBBr   )r/   r   rangerV   s   ` r   rU   z_QueueActor.get_nowait_batch-  sk    tzz||##ViVVtzz||VVV   CBBBy1A1ABBBBr   r'   )r   r   r   r%   r/   r3   r5   r?   r.   r;   rP   rD   rU   r   r   r   r!   r!     s        1 1 1" " "" " "! ! !      $ $ $( ( (' ' 'C C C C Cr   r!   )r<   rh   collections.abcr   typingr   r   r   r   r   ray.util.annotationsr   r   r   r   r!   r   r   r   <module>r      s{     $ $ $ $ $ $ , , , , , , , , , , , , 



 * * * * * * V	 	 	 	 	EK 	 	 	 V	 	 	 	 	5: 	 	 	 Vk k k k k k k k\/C /C /C /C /C /C /C /C /C /Cr   