
    &`i                         d dl mZmZmZmZmZmZmZ d dlZd dl	m
Z
 erd dlmZ d dlmZ dZdZdd	d
ddededeeeedf                  df         fdZ e
d          eddd
ddededeeedf                  fd            Z e
d          eedddddee         dee         dededeeedf                  fd            ZdS )    )TYPE_CHECKINGAnyIterableIteratorOptionalSequenceUnionN)	PublicAPI)	ObjectRef)RemoteFunction
   d   F)yield_obj_refsrefszSequence[ObjectRef]
chunk_sizer   returnr   zlist[ObjectRef]c          
          |dk     rt          d          |pi }t          j        | fdt          |t	          |                     i|\  }} |st          j        |          | fS || fS )a  Call ray.wait and explicitly return the ready objects/results
    and remaining Ray remote refs.

    Args:
        refs: A list of Ray object refs.
        chunk_size: The `num_returns` parameter to pass to `ray.wait()`.
        yield_obj_refs: If True, return Ray remote refs instead of results (by calling :meth:`~ray.get`).
        **kwargs: Additional keyword arguments to pass to `ray.wait()`.

    Returns:
        A tuple of two lists, ready and not ready. This is the same as the return value of `ray.wait()`.
       `chunk_size` must be >= 1num_returns)
ValueErrorraywaitminlenget)r   r   r   kwargsreadys        d/home/jaya/work/projects/VOICE-AGENT/VIET/agent-env/lib/python3.11/site-packages/ray/util/helpers.py_wait_and_get_single_batchr       s    ( A~~4555\rF ( 
CII..  KE4  $wu~~t##$;    alpha)	stabilityr   r   c             +      K   |dk     rt          d          d|v rt          d          | rt          | f||d|\  }} |E d{V  | dS dS )a  Given a list of Ray task references, yield results as they become available.

    Unlike calling :meth:`~ray.get` on a list of references (i.e., `ray.get(refs)`) which
    waits for all results to be ready, this function begins to yield result as soon as
    a batch of `chunk_size` results are ready.

    .. note::
        Generally there is no guarantee on the order of results. For example, the first result
        is not necessarily the first one completed, but rather the first one submitted in the
        first available batch (See :meth:`~ray.wait` for more details about
        preservation of submission order).

    .. note::
        Use this function instead of calling :meth:`~ray.get` inside a for loop. See
        https://docs.ray.io/en/latest/ray-core/patterns/ray-get-loop.html for more details.

    Example:
        Suppose we have a function that sleeps for x seconds depending on the input.
        We expect to obtain a partially sorted list of results.

        .. testcode:: python
            import ray
            import time

            @ray.remote
            def f(x):
                time.sleep(x)
                return x

            refs = [f.remote(i) for i in [10, 4, 6, 8, 2]]
            for x in ray.util.as_completed(refs, chunk_size=2):
                print(x)

        .. testoutput::
            :options: +MOCK

            # Output:
            4
            2
            6
            8
            10

    Args:
        refs: A list of Ray object refs.
        chunk_size: The number of tasks to wait for in each iteration (default 10).
            The parameter is passed as `num_returns` to :meth:`~ray.wait` internally.
        yield_obj_refs: If True, return Ray remote refs instead of results (by calling :meth:`~ray.get`).
        **kwargs: Additional keyword arguments to pass to :meth:`~ray.wait`, e.g.,
            `timeout` and `fetch_local`.

    Yields:
        Union[Any, ObjectRef]: The results (or optionally their Ray references) of the Ray tasks as they complete.
    r   r   r   z6Use the `chunksize` argument instead of `num_returns`.r$   N)r   r    )r   r   r   r   resultss        r   as_completedr'   8   s      | A~~4555QRRR
 2
!)
 
 	
 
       r!   )backpressure_sizer   r   fnr   itemsr(   c             +     K   |t          d          }n|dk    rt          d          |dk     rt          d          d|v rt          d          g }|D ]X}|                    |                     |                     t	          |          |k    rt          |f||d	|\  }}|E d{V  Yt          |f||d	|E d{V  dS )
a  Apply a Ray remote function to a list of items and return an iterator that yields
    the completed results as they become available.

    This helper function applies backpressure to control the number of pending tasks, following the
    design pattern described in
    https://docs.ray.io/en/latest/ray-core/patterns/limit-pending-tasks.html.

    .. note::
        There is generally no guarantee on the order of results.

    Example:
        Suppose we have a function that sleeps for x seconds depending on the input.
        We expect to obtain a partially sorted list of results.

        .. testcode:: python

            import ray
            import time

            @ray.remote
            def f(x):
                time.sleep(x)
                return x

            # Example 1: chunk_size=2
            for x in ray.util.map_unordered(f, [10, 4, 6, 8, 2], chunk_size=2):
                print(x)

        .. testoutput::
            :options: +MOCK

            4
            2
            6
            8
            10

        .. testcode:: python

            # Example 2: backpressure_size=2, chunk_size=1
            for x in ray.util.map_unordered(f, [10, 4, 6, 8, 2], backpressure_size=2, chunk_size=1):
                print(x)

        .. testoutput::
            :options: +MOCK

            4
            10
            6
            8
            2

    Args:
        fn: A remote function to apply to the list of items. For more complex use cases, use Ray Data's
            :meth:`~ray.data.Dataset.map` / :meth:`~ray.data.Dataset.map_batches` instead.
        items: An iterable of items to apply the function to.
        backpressure_size: Maximum number of in-flight tasks allowed before
            calling a blocking :meth:`~ray.wait` (default 100). If None, no backpressure is applied.
        chunk_size: The number of tasks to wait for when the number of in-flight tasks exceeds
            `backpressure_size`. The parameter is passed as `num_returns` to :meth:`~ray.wait` internally.
        yield_obj_refs: If True, return Ray remote refs instead of results (by calling :meth:`~ray.get`).
        **kwargs: Additional keyword arguments to pass to :meth:`~ray.wait`, e.g.,
            `timeout` and `fetch_local`.

    Yields:
        Union[Any, ObjectRef]: The results (or optionally their Ray references) of the Ray tasks as they complete.

    .. seealso::

        :meth:`~ray.util.as_completed`
            Call this method for an existing list of Ray object refs.

        :meth:`~ray.data.Dataset.map`
            Use Ray Data APIs (e.g., :meth:`~ray.data.Dataset.map` and :meth:`~ray.data.Dataset.map_batches`)
            for better control and complex use cases, e.g., functions with multiple arguments.

    .. note::

        This is an altenative to `pool.imap_unordered()` in Ray's Actor-based `multiprocessing.Pool`.
        See https://docs.ray.io/en/latest/ray-more-libs/multiprocessing.html for more details.

    Ninfr   z#backpressure_size must be positive.r   r   r   z7Use the `chunk_size` argument instead of `num_returns`.r$   )floatr   appendremoter   r    r'   )	r)   r*   r(   r   r   r   r   itemr&   s	            r   map_unorderedr1      sN     z  #(<<	a		>???A~~4555RSSSD 
 
BIIdOO$$$t99)))6%-  	 MGT 
!)
 
 	
 
 	
 	
 	
 	
 	
 	
 	
 	
 	
r!   )typingr   r   r   r   r   r   r	   r   ray.util.annotationsr
   r   ray.remote_functionr   DEFAULT_CHUNK_SIZEDEFAULT_BACKPRESSURE_SIZEintbooltuplelistr    r'   r1    r!   r   <module>r<      s   T T T T T T T T T T T T T T T T T T 



 * * * * * * 3222222    !	# # #
# # 	# 4c;&'(*;;<# # # #L W ) 	J J J
J J 	J eC$%&J J J JZ W
 (A( y
 y
 y
y
C=y
  }	y

 y
 y
 eC$%&y
 y
 y
 y
 y
 y
r!   