
    &`iP$                        d dl Z d dlmZmZmZmZmZmZ d dlm	Z	 d dl
mZmZ d dlmZmZ  e j        e          Z ed          Z ed          Zeeee         e	gee         f         eee         e	egee         f         edee         f         f         Ze G d	 d
                      Ze G d de                      Ze G d de                      Zdeeef         defdZdS )    N)AnyCallableIterableOptionalTypeVarUnion)TaskContext)BlockUserDefinedFunction)DeveloperAPI	PublicAPITU.c                       e Zd ZdS )ComputeStrategyN)__name__
__module____qualname__     n/home/jaya/work/projects/VOICE-AGENT/VIET/agent-env/lib/python3.11/site-packages/ray/data/_internal/compute.pyr   r      s        Dr   r   c                   J    e Zd ZdZ	 d	dee         fdZdedefdZ	de
fdZdS )
TaskPoolStrategya;  Specify the task-based compute strategy for a Dataset transform.

    TaskPoolStrategy executes dataset transformations using Ray tasks that are
    scheduled through a pool. Provide ``size`` to cap the number of concurrent
    tasks; leave it unset to allow Ray Data to scale the task count
    automatically.
    Nsizec                 D    ||dk     rt          d|          || _        dS )zConstruct TaskPoolStrategy for a Dataset transform.

        Args:
            size: Specify the maximum size of the task pool.
        N   z`size` must be >= 1)
ValueErrorr   )selfr   s     r   __init__zTaskPoolStrategy.__init__)   s.     q2D999			r   otherreturnc                 j    t          |t                    r| j        |j        k    p|dk    o| j        d u S )Ntasks)
isinstancer   r   r   r    s     r   __eq__zTaskPoolStrategy.__eq__7   s<    5"233O	UZ8O 
W2d!2	
r   c                     d| j          dS )NzTaskPoolStrategy(size=))r   r   s    r   __repr__zTaskPoolStrategy.__repr__<   s    4	4444r   N)r   r   r   __doc__r   intr   r   boolr&   strr*   r   r   r   r   r      s          # sm   
C 
D 
 
 
 

5# 5 5 5 5 5 5r   r   c                       e Zd ZdZddddddddee         dee         dee         dee         d	ee         d
efdZdedefdZ	de
fdZdS )ActorPoolStrategyae	  Specify the actor-based compute strategy for a Dataset transform.

    ActorPoolStrategy specifies that an autoscaling pool of actors should be used
    for a given Dataset transform. This is useful for stateful setup of callable
    classes.

    For a fixed-sized pool of size ``n``, use ``ActorPoolStrategy(size=n)``.

    To autoscale from ``m`` to ``n`` actors, use
    ``ActorPoolStrategy(min_size=m, max_size=n)``.

    To autoscale from ``m`` to ``n`` actors, with an initial size of ``initial``, use
    ``ActorPoolStrategy(min_size=m, max_size=n, initial_size=initial)``.

    To increase opportunities for pipelining task dependency prefetching with
    computation and avoiding actor startup delays, set max_tasks_in_flight_per_actor
    to 2 or greater; to try to decrease the delay due to queueing of tasks on the worker
    actors, set max_tasks_in_flight_per_actor to 1.

    The `enable_true_multi_threading` argument primarily exists to prevent GPU OOM issues with multi-threaded actors.
    The life cycle of an actor task involves 3 main steps:

        1. Batching Inputs
        2. Running actor UDF
        3. Batching Outputs

    The `enable_true_multi_threading` flag affects step 2. If set to `True`, then the UDF can be run concurrently.
    By default, it is set to `False`, so at most 1 actor UDF is running at a time per actor. The `max_concurrency`
    flag on `ray.remote` affects steps 1 and 3. Below is a matrix summary:

    - [`enable_true_multi_threading=False or True`, `max_concurrency=1`] = 1 actor task running per actor. So at most 1
        of steps 1, 2, or 3 is running at any point in time.
    - [`enable_true_multi_threading=False`, `max_concurrency>1`] = multiple tasks running per actor
      (respecting GIL) but UDF runs 1 at a time. This is useful for doing CPU and GPU work,
      where you want to use a large batch size but want to hide the overhead of *batching*
      the inputs. In this case, CPU *batching* is done concurrently, while GPU *inference*
      is done 1 at a time. Concretely, steps 1 and 3 can have multiple threads, while step 2 is done serially.
    - [`enable_true_multi_threading=True`, `max_concurrency>1`] = multiple tasks running per actor.
      Unlike bullet #3 ^, the UDF runs concurrently (respecting GIL). No restrictions on steps 1, 2, or 3

    NOTE: `enable_true_multi_threading` does not apply to async actors
    NF)r   min_sizemax_sizeinitial_sizemax_tasks_in_flight_per_actorenable_true_multi_threadingr   r2   r3   r4   r5   r6   c                d   |1|dk     rt          d|          |||t          d          |}|}|}||dk     rt          d|          ||d}||k    rt          d||          ||dk     rt          d|          |pd| _        |pt          d          | _        |d|| j        k     rt          d	| d
| j         d          | j        t          d          k    r&|| j        k    rt          d	| d| j         d          |p| j        | _        || _        d| _        d| _        || _        dS )a  Construct ActorPoolStrategy for a Dataset transform.

        Args:
            size: Specify a fixed size actor pool of this size. It is an error to
                specify both `size` and `min_size` or `max_size`.
            min_size: The minimum size of the actor pool.
            max_size: The maximum size of the actor pool.
            initial_size: The initial number of actors to start with. If not specified,
                defaults to min_size. Must be between min_size and max_size.
            max_tasks_in_flight_per_actor: The maximum number of tasks to concurrently
                send to a single actor worker. Increasing this will increase
                opportunities for pipelining task dependency prefetching with
                computation and avoiding actor startup delays, but will also increase
                queueing delay.
            enable_true_multi_threading: If enable_true_multi_threading=True, no more than 1 actor task
                runs per actor. Otherwise, respects the `max_concurrency` argument.
        Nr   zsize must be >= 1zMmin_size, max_size, and initial_size cannot be set at the same time as `size`zmin_size must be >= 1zmin_size must be <= max_sizez1max_tasks_in_flight_per_actor must be >= 1, got: infzinitial_size (z) must be >= min_size (r(   z) must be <= max_size (r   g?)	r   r2   floatr3   r4   r5   num_workersready_to_total_workers_ratior6   )r   r   r2   r3   r4   r5   r6   s          r   r   zActorPoolStrategy.__init__m   s   6 axx !4d;;;#x';|?W c   HHLHqLL4h???("" !?8TTT)5-11C-  
 !A 0E%LL #dm++ Z\ZZ$-ZZZ   }e,,1M1M Z\ZZ$-ZZZ   )9DM-J*,/)+F(((r   r    r!   c                     t          |t                    oO| j        |j        k    o?| j        |j        k    o/| j        |j        k    o| j        |j        k    o| j        |j        k    S r+   )r$   r1   r2   r3   r4   r6   r5   r%   s     r   r&   zActorPoolStrategy.__eq__   sw    %!233 
MU^+ 3/3!U%773 0E4UU3 223	
r   c                 x    d| j          d| j         d| j         d| j         d| j         d| j         d| j         dS )	NzActorPoolStrategy(min_size=z, max_size=z, initial_size=z , max_tasks_in_flight_per_actor=z)num_workers=z, enable_true_multi_threading=z, ready_to_total_workers_ratio=r(   )r2   r3   r4   r5   r:   r6   r;   r)   s    r   r*   zActorPoolStrategy.__repr__   s    Q$- Q QQ Q -Q Q .2-OQ Q  +	Q Q
 ,0+KQ Q -1,MQ Q Q	
r   )r   r   r   r,   r   r-   r.   r   r   r&   r/   r*   r   r   r   r1   r1   @   s        ) )\ #"&"&&*7;,1GG GG GG smGG 3-	GG
 3-GG smGG (0}GG &*GG GG GG GGR
C 
D 
 
 
 
	
# 	
 	
 	
 	
 	
 	
r   r1   compute_specr!   c                     t          | t          t          f          st          d|  d          | r| dk    rt                      S | dk    rt                      S t          | t                    r| S t          d          )NzXIn Ray 2.5, the compute spec must be either TaskPoolStrategy or ActorPoolStrategy, was: .r#   actorsz;compute must be one of [`tasks`, `actors`, ComputeStrategy])r$   r   r1   r   r   )r>   s    r   get_computerB      s    l%57H$IJJ XK;GK K K
 
 	
  X\W44!!!		!	! """	L/	2	2 XVWWWr   )loggingtypingr   r   r   r   r   r   'ray.data._internal.execution.interfacesr	   ray.data.blockr
   r   ray.util.annotationsr   r   	getLoggerr   loggerr   r   BlockTransformr   r   r1   r/   rB   r   r   r   <module>rK      s    D D D D D D D D D D D D D D D D ? ? ? ? ? ? 5 5 5 5 5 5 5 5 8 8 8 8 8 8 8 8		8	$	$GCLLGCLL 
 huo{+Xe_<=huo{,?@(5/QRS(5/!"$ 	 	 	 	 	 	 	 	 5 5 5 5 5 5 5 5@ H
 H
 H
 H
 H
 H
 H
 H
VXeC$89 Xo X X X X X Xr   