
    &`i:)                        d dl mZ d dlZd dlZd dlmZmZmZmZ d dl	Z	d dl
mZ d dlmZmZ erd dlmZ e G d d                      Ze G d	 d
                      ZdS )    )annotationsN)TYPE_CHECKINGDequeIteratorOptional)ObjectRefStreamEndOfStreamError)DeveloperAPI	PublicAPI)Workerc                  &    e Zd Zd
dZddZddZd	S )DynamicObjectRefGeneratorrefsDeque['ray.ObjectRef']c                8    t          j        |          | _        d S N)collectionsdeque_refs)selfr   s     u/home/jaya/work/projects/VOICE-AGENT/VIET/agent-env/lib/python3.11/site-packages/ray/_private/object_ref_generator.py__init__z"DynamicObjectRefGenerator.__init__   s     .9->t-D-D


    returnIterator('ray.ObjectRef')c              #  `   K   | j         r$| j                                         V  | j         "d S d S r   )r   popleftr   s    r   __iter__z"DynamicObjectRefGenerator.__iter__   sJ      j 	'*$$&&&&& j 	' 	' 	' 	' 	'r   intc                *    t          | j                  S r   )lenr   r   s    r   __len__z!DynamicObjectRefGenerator.__len__   s    4:r   N)r   r   )r   r   )r   r   )__name__
__module____qualname__r   r   r"    r   r   r   r      sT        E E E E' ' ' '     r   r   c                      e Zd ZdZd"dZd#d	Zd$d
Zd Zd Zd Z	d#dZ
d Zd Zd Zd Zd$dZd%dZd%dZd$dZd&d'dZd(dZd&d)dZd  Zd! ZdS )*ObjectRefGeneratora  A generator to obtain object references from a task in a streaming manner.

    The class is compatible with the Python generator and async generator interfaces.

    The class is not thread-safe.

    Do not initialize the class and create an instance directly.
    The instance should be created by `.remote`.

    .. testcode::

        import ray
        from typing import Generator

        @ray.remote(num_returns="streaming")
        def gen() -> Generator[int, None, None]:
            for i in range(5):
                yield i

        obj_ref_gen: ray.ObjectRefGenerator = gen.remote()
        for obj_ref in obj_ref_gen:
            print("Got:", ray.get(obj_ref))
    generator_ref'ray.ObjectRef'worker'Worker'c                    || _         d| _        || _        | j                                         t	          |d          sJ d S )NFcore_worker)_generator_ref_generator_task_raisedr+   check_connectedhasattr)r   r)   r+   s      r   r   zObjectRefGenerator.__init__9   sI    +&+###%%%v}-------r   r   'ObjectRefGenerator'c                    | S r   r&   r   s    r   r   zObjectRefGenerator.__iter__E       r   c                *    |                                  S )a  Waits until a next ref is available and returns the object ref.

        Raises StopIteration if there's no more objects
        to generate.

        The object ref will contain an exception if the task fails.
        When the generator task returns N objects, it can return
        up to N + 1 objects (if there's a system failure, the
        last object will contain a system level exception).
        )
_next_syncr   s    r   __next__zObjectRefGenerator.__next__H   s        r   c                     t          d          )Nz`gen.send` is not supported.NotImplementedErrorr   values     r   sendzObjectRefGenerator.sendU   s    !"@AAAr   c                     t          d          )Nz`gen.throw` is not supported.r:   r<   s     r   throwzObjectRefGenerator.throwX       !"ABBBr   c                     t          d          )Nz`gen.close` is not supported.r:   r   s    r   closezObjectRefGenerator.close[   rA   r   c                    | S r   r&   r   s    r   	__aiter__zObjectRefGenerator.__aiter__^   r5   r   c                :   K   |                                   d {V S r   )_next_asyncr   s    r   	__anext__zObjectRefGenerator.__anext__a   s*      %%'''''''''r   c                $   K   t          d          )Nz`gen.asend` is not supported.r:   r<   s     r   asendzObjectRefGenerator.asendd   s      !"ABBBr   c                $   K   t          d          )Nz`gen.athrow` is not supported.r:   r<   s     r   athrowzObjectRefGenerator.athrowg         !"BCCCr   c                $   K   t          d          )Nz`gen.aclose` is not supported.r:   r   s    r   aclosezObjectRefGenerator.aclosej   rM   r   c                    | j         S )a   Returns an object ref that is ready when
        a generator task completes.

        If the task is failed unexpectedly (e.g., worker failure),
        the `ray.get(gen.completed())` raises an exception.

        The function returns immediately.
        )r/   r   s    r   	completedzObjectRefGenerator.completedm   s     ""r   boolc                   | j                                          | j         j        }|                                 rdS |                    | j                  \  }}|rdS t          j        |gdd          \  }}t          |          dk    S )a)  If True, it means the output of next(gen) is ready and
        ray.get(next(gen)) returns immediately. False otherwise.

        It returns False when next(gen) raises a StopIteration
        (this condition should be checked using is_finished).

        The function returns immediately.
        FTr   timeoutfetch_local)	r+   r1   r.   is_finishedpeek_object_ref_streamr/   raywaitr!   )r   r.   expected_refis_readyready_s         r   
next_readyzObjectRefGenerator.next_readyx   s     	##%%%k- 	5!,!C!CDDW!X!Xh 	48\NA5IIIq5zzA~r   c                    | j                                          | j         j        }|                    | j                  }|r6| j        rdS 	 t          j        | j                   dS # t          $ r Y dS w xY wdS )zIf True, it means the generator is finished
        and all output is taken. False otherwise.

        When True, if next(gen) is called, it will raise StopIteration
        or StopAsyncIteration

        The function returns immediately.
        TF)	r+   r1   r.   is_object_ref_stream_finishedr/   r0   rY   get	Exception)r   r.   finisheds      r   rW   zObjectRefGenerator.is_finished   s     	##%%%k-<<T=PQQ 	*  t GD/000  4 ! ! ! ! !55! 5s   A' '
A54A5c                    | j                                          | j         j        }|                    | j                  d         S )zReturn the next reference from a generator.

        Note that the ObjectID generated from a generator
        is always deterministic.
        r   )r+   r1   r.   rX   r/   )r   r.   s     r   _get_next_refz ObjectRefGenerator._get_next_ref   s>     	##%%%k-11$2EFFqIIr   N	timeout_sOptional[int | float]c                   | j         j        }|                    | j                  \  }}|sLt	          j        |g|d          \  }}t          |          dk    rt          j                                        S 	 |	                    | j                  }|
                                rJ n_# t          $ rR | j        rt          d	 t	          j        | j                   t          d# t          $ r d| _        | j        cY cY S w xY ww xY w|S )aT  Waits for timeout_s and returns the object ref if available.

        If an object is not available within the given timeout, it
        returns a nil object reference.

        If -1 timeout is provided, it means it waits infinitely.

        Waiting is implemented as busy waiting.

        Raises StopIteration if there's no more objects
        to generate.

        The object ref will contain an exception if the task fails.
        When the generator task returns N objects, it can return
        up to N + 1 objects (if there's a system failure, the
        last object will contain a system level exception).

        Args:
            timeout_s: If the next object is not ready within
                this timeout, it returns the nil object ref.

        Returns:
            ObjectRef corresponding to the next result in the stream.
        FrT   r   NT)r+   r.   rX   r/   rY   rZ   r!   	ObjectRefniltry_read_next_object_ref_streamis_nilr   r0   StopIterationrb   rc   )r   rg   r.   r[   r\   r^   unreadyrefs           r   r7   zObjectRefGenerator._next_sync   s:   2 k- "-!C!CDDW!X!Xh 	+<.)QVWWWJAw7||a}((***	.==d>QRRCzz||####. 	. 	. 	.* .#-
. +,,, $-  + + +.2+******+	.  
s0   90B* *DC%D%D=DDDrp   Nonec                >   K   	 | d {V  d S # t           $ r Y d S w xY wr   )rc   )r   rp   s     r   _suppress_exceptionsz'ObjectRefGenerator._suppress_exceptions   sB      
	IIIIIIIII 	 	 	DD	s    
c                V  K   | j         j        }|                    | j                  \  }}|svt	          j        t	          j        |                     |                    g|           d{V \  }}t          |          dk    rt          j
                                        S 	 |                    | j                  }|                                rJ nS# t          $ rF | j        rt           d	 | j         d{V  t           d# t"          $ r d| _        | j        cY cY S w xY ww xY w|S )z4Same API as _next_sync, but it is for async context.)rU   Nr   T)r+   r.   rX   r/   asynciorZ   create_taskrs   r!   rY   rj   rk   rl   rm   r   r0   StopAsyncIterationrc   )r   rg   r.   rp   r\   r^   ro   s          r   rG   zObjectRefGenerator._next_async   s     k-#::4;NOOX 	+&|$T%>%>s%C%CDDEy           JAw 7||a}((***	3==d>QRRCzz||####. 	3 	3 	3* 3(d2
3 )))))))) )d2  + + +.2+******+	3" 
s0   %0C D&0D=D&D"D&!D""D&c                |    t          | j        d          r&| j        j                            | j                   d S d S )Nr.   )r2   r+   r.   async_delete_object_ref_streamr/   r   s    r   __del__zObjectRefGenerator.__del__!  sG    4;.. 	X
 K#BB4CVWWWWW	X 	Xr   c                     t          d          )NzeYou cannot return or pass a generator to other task. Serializing a ObjectRefGenerator is not allowed.)	TypeErrorr   s    r   __getstate__zObjectRefGenerator.__getstate__)  s    ?
 
 	
r   )r)   r*   r+   r,   )r   r3   )r   r*   )r   rR   r   )rg   rh   r   r*   )rp   r*   r   rq   )rg   rh   )r#   r$   r%   __doc__r   r   r8   r>   r@   rC   rE   rH   rJ   rL   rO   rQ   r_   rW   rf   r7   rs   rG   rz   r}   r&   r   r   r(   r(      s        0. . . .   ! ! ! !B B BC C CC C C   ( ( (C C CD D DD D D	# 	# 	# 	#   .   FJ J J J6 6 6 6 6p   ! ! ! ! !FX X X
 
 
 
 
r   r(   )
__future__r   ru   r   typingr   r   r   r   rY   ray.exceptionsr   ray.util.annotationsr	   r
   ray._private.workerr   r   r(   r&   r   r   <module>r      s   " " " " " "      ; ; ; ; ; ; ; ; ; ; ; ; 



 : : : : : : 8 8 8 8 8 8 8 8 +******         M
 M
 M
 M
 M
 M
 M
 M
 M
 M
r   