
    &`i1=                        d dl Z d dlZd dlZd dlZd dlmZ d dlmZ d dlm	Z	m
Z
mZmZmZmZmZmZ d dlZd dlmZ d dlmZmZ d dlmZ d dlmZ d d	lmZ  ed
          Z ej        e          Z  G d d          Z!e G d d                      Z"e G d d                      Z#ddee         defdZ$de"fdZ% G d de          Z&dS )    N)defaultdict)	dataclass)CallableDictListOptionalTupleTypeTypeVarUnion)ActorHandle)exception_causeskip_exceptions)BaseWorkerGroup)	ObjectRef)PlacementGroupTc                   2    e Zd ZdZdedef         defdZdS )RayTrainWorkerz@A class to execute arbitrary functions. Does not hold any state.func.returnc                 v    	  ||i |S # t           $ r$}t          |          }|t          |          d}~ww xY w)zExecutes the input function and returns the output.

        Args:
            func: The function to execute.
            args, kwargs: The arguments to pass into func.
        N)	Exceptionr   r   )selfr   argskwargseskippeds         t/home/jaya/work/projects/VOICE-AGENT/VIET/agent-env/lib/python3.11/site-packages/ray/train/_internal/worker_group.py	__executezRayTrainWorker.__execute   sX    	84(((( 	8 	8 	8%a((Gw777	8s   
 
838N)__name__
__module____qualname____doc__r   r   _RayTrainWorker__execute     r   r   r      sE        JJ8hsAv. 8A 8 8 8 8 8 8r'   r   c                   b    e Zd ZU dZeed<   eed<   eed<   eeee         f         ed<   eed<   dS )WorkerMetadataa  Metadata for each worker/actor.

    This information is expected to stay the same throughout the lifetime of
    actor.

    Args:
        node_id: ID of the node this worker is on.
        node_ip: IP address of the node this worker is on.
        hostname: Hostname that this worker is on.
        resource_ids: Map of accelerator resources
            ("GPU", "neuron_cores", ..) to their IDs.
        pid: Process ID of this worker.
    node_idnode_iphostnameresource_idspidN)	r!   r"   r#   r$   str__annotations__r   r   intr&   r'   r   r)   r)   &   sY           LLLLLLMMMsDI~&&&&	HHHHHr'   r)   c                   (    e Zd ZU dZeed<   eed<   dS )WorkerzClass representing a Worker.actormetadataN)r!   r"   r#   r$   r   r0   r)   r&   r'   r   r3   r3   =   s0         &&r'   r3   executable_clsr   c                 n    | st           S t          | t                     r| S  G d d| t                     }|S )z5Create the executable class to use as the Ray actors.c                        e Zd Z fdZ xZS )3create_executable_class.<locals>._WrappedExecutablec                 :     t                      j        |i | d S N)super__init__)r   r   r   	__class__s      r   r=   z<create_executable_class.<locals>._WrappedExecutable.__init__N   s%      $1&11111r'   )r!   r"   r#   r=   __classcell__)r>   s   @r   _WrappedExecutabler9   M   s8        2 2 2 2 2 2 2 2 2r'   r@   )r   
issubclass)r6   r@   s     r   create_executable_classrB   E   s\     
"	NN	3	3 "	2 	2 	2 	2 	2 	2 	2 	2 "!r'   c                  F   t          j                                                    } t           j                                        }t          j                    }t          j                                                    }t          j	                    }t          | ||||          S )z]Creates metadata for this worker.

    This function is expected to be run on the actor.
    )r*   r+   r,   r-   r.   )rayget_runtime_contextget_node_idutilget_node_ip_addresssocketgethostnameget_accelerator_idsosgetpidr)   )r*   r+   r,   accelerator_idsr.   s        r   construct_metadatarO   T   s    
 %''3355Gh**,,G!##H-//CCEEO
)++C$   r'   c                      e Zd ZdZ	 	 	 	 	 	 ddedeeeef                  de	dee
         d	ee         d
eeef         fdZd Zd defdZdedef         dee         fdZdedef         dee         fdZdededef         defdZdededef         defdZdee         fdZdefdZd!dee         fdZd ZdefdZdS )"WorkerGroupa:  Group of Ray Actors that can execute arbitrary functions.

    ``WorkerGroup`` launches Ray actors according to the given
    specification. It can then execute arbitrary Python functions in each of
    these workers.

    If not enough resources are available to launch the actors, the Ray
    cluster will automatically scale up if autoscaling is enabled.

    Args:
        num_workers: The number of workers (Ray actors) to launch.
            Defaults to 1.
        resources_per_worker (Optional[Dict[str, float]]):
            Dictionary specifying the resources that will be
            requested for each worker. Defaults to {"CPU": 1}.
        actor_cls (Optional[Type]): If specified use this class as the
            remote actors.
        remote_cls_args, remote_cls_kwargs: If ``remote_cls`` is provided,
            these args will be used for the worker initialization.
        placement_group (PlacementGroup|str): The placement group that workers
            should be created in. Defaults to "default" which will inherit the
            parent placement group (if child tasks should be captured).


    Example:

    .. code_block:: python

        worker_group = WorkerGroup(num_workers=2)
        output = worker_group.execute(lambda: 1)
        assert len(output) == 2
        assert all(o == 1 for o in output)
       Ndefaultnum_workersresources_per_worker	actor_clsactor_cls_argsactor_cls_kwargsplacement_groupc                 6   |ddi}n|                                 }|dk    rt          d| d          t          d |                                D                       rt          d| d          |s|r|st          d	          || _        || _        t          j        |          }|                    dd          | _        |                    d
d          | _	        |                    dd          | _
        g | _        t          |          | _        t          | j        t                    sJ |pg | _        |pi | _        || _         t'          j        | j        | j	        | j
        |          | j                  | _        |                                  d S )NCPUrR   r   zHThe provided `num_workers` must be greater than 0. Received num_workers=z	 instead.c              3   "   K   | ]
}|d k     V  dS )r   Nr&   ).0vs     r   	<genexpr>z'WorkerGroup.__init__.<locals>.<genexpr>   s&      <<q1u<<<<<<r'   zWThe number of resources per worker must not be negative. Received resources_per_worker=.zW`actor_cls_args` or `actor_class_kwargs` are passed in but no `actor_cls` is passed in.GPUmemory)num_cpusnum_gpusrb   	resources)copy
ValueErroranyvaluesrT   rU   deepcopypopnum_cpus_per_workernum_gpus_per_workermemory_per_workerworkersrB   	_base_clsrA   r   _actor_cls_args_actor_cls_kwargs_placement_grouprD   remote_remote_clsstart)r   rT   rU   rV   rW   rX   rY   _resources_per_workers           r   r=   zWorkerGroup.__init__   s     '$)1:  #7#<#<#>#> !0;     <<299;;<<<<< 	I1EI I I  
  	. 		 	=  
 '$8! $.B C C#8#<#<UA#F#F #8#<#<UA#F#F !6!:!:8Q!G!G0;;$..99999-3!1!7R /
3:--)+	
 
 

 .  	

r'   c                 :   | j         r't          | j                   dk    rt          d          t                              d| j         d           |                     | j                   t                              t          | j                    d           dS )z,Starts all the workers in this worker group.r   z`The workers have already been started. Please call `shutdown` first if you want to restart them.z	Starting 	 workers.z# workers have successfully started.N)ro   lenRuntimeErrorloggerdebugrT   add_workersr   s    r   rv   zWorkerGroup.start   s    < 	C--11    	<!1<<<===)***DL))NNNOOOOOr'      
patience_sc                    t                               dt          | j                   d           |dk    r$| j        D ]}t	          j        |j                   nid | j        D             }t	          j        ||          \  }}|r=t                               d           | j        D ]}t	          j        |j                   t                               d           g | _        dS )	au  Shutdown all the workers in this worker group.

        Args:
            patience_s: Attempt a graceful shutdown
                of the workers for this many seconds. Fallback to force kill
                if graceful shutdown is not complete after this time. If
                this is less than or equal to 0, immediately force kill all
                workers.
        zShutting down ry   r   c                 J    g | ] }|j         j                                        !S r&   )r4   __ray_terminate__rt   )r]   ws     r   
<listcomp>z(WorkerGroup.shutdown.<locals>.<listcomp>   s)    RRR299;;RRRr'   )timeoutz8Graceful termination failed. Falling back to force kill.zShutdown successful.N)r|   r}   rz   ro   rD   killr4   wait)r   r   worker	done_refsdonenot_dones         r   shutdownzWorkerGroup.shutdown   s     	Bc$,&7&7BBBCCC??, ' '&&&&' SRT\RRRI XiDDDND( +WXXX"l + +FHV\****+,,,r'   r   .r   c                     t          | j                  dk    rt          d          fd| j        D             S )a  Execute ``func`` on each worker and return the futures.

        Args:
            func: A function to call on each worker.
            args, kwargs: Passed directly into func.

        Returns:
            (List[ObjectRef]) A list of ``ObjectRef`` representing the
                output of ``func`` from each worker. The order is the same
                as ``self.workers``.

        r   zThere are no active workers. This worker group has most likely been shut down. Pleasecreate a new WorkerGroup or restart this one.c                 ~    g | ]9} |j         j                            d j                   j        gR i :S )_RayTrainWorker__execute.name)r4   r%   optionsr!   rt   )r]   r   r   r   r   s     r   r   z-WorkerGroup.execute_async.<locals>.<listcomp>  s}     
 
 
 AG,44@@@ 5  T, , , ,$*, ,
 
 
r'   )rz   ro   r{   r   r   r   r   s    ```r   execute_asynczWorkerGroup.execute_async   sm     t|!!@  
 
 
 
 
 
 \	
 
 
 	
r'   c                 H    t          j         | j        |g|R i |          S )ad  Execute ``func`` on each worker and return the outputs of ``func``.

        Args:
            func: A function to call on each worker.
            args, kwargs: Passed directly into func.

        Returns:
            (List[T]) A list containing the output of ``func`` from each
                worker. The order is the same as ``self.workers``.

        )rD   getr   r   s       r   executezWorkerGroup.execute	  s2     w)t)$@@@@@@AAAr'   worker_indexc                     |t          | j                  k    rt          d| d| j         d           | j        |         j        j                            d|j                   j        |g|R i |S )aW  Execute ``func`` on worker ``worker_index`` and return futures.

        Args:
            worker_index: The index to execute func on.
            func: A function to call on the first worker.
            args, kwargs: Passed directly into func.

        Returns:
            (ObjectRef) An ObjectRef representing the output of func.

        zThe provided worker_index z is not valid for ry   r   r   )	rz   ro   rg   rT   r4   r%   r   r!   rt   r   r   r   r   r   s        r   execute_single_asyncz WorkerGroup.execute_single_async  s     3t|,,,,=\ = =!%!1= = =  
DL&+GG@@@ -4 - - D	+  	+ + + $*	+ +	
r'   c                 J    t          j         | j        ||g|R i |          S )a-  Execute ``func`` on worker with index ``worker_index``.

        Args:
            worker_index: The index to execute func on.
            func: A function to call on the first worker.
            args, kwargs: Passed directly into func.

        Returns:
            (T) The output of func.

        )rD   r   r   r   s        r   execute_singlezWorkerGroup.execute_single4  s4     w0t0tUdUUUfUUVVVr'   worker_indexesc                     g }t          t          | j                            D ]&}||vr |                    | j        |                    '|| _        dS )zRemoves the workers with the specified indexes.

        The removed workers will go out of scope and their actor processes
        will be terminated.

        Args:
            worker_indexes (List[int]): The indexes of the workers to remove.
        N)rangerz   ro   append)r   r   new_workersis       r   remove_workerszWorkerGroup.remove_workersE  s[     s4<(()) 	4 	4A&&""4<?333"r'   c                 "   g }g }t          |          D ]} | j                            | j                  j        | j        i | j        }|                    |           |                    |j                            d                              t                               t          j        |          }t          t          |                    D ]7}| j                            t          ||         ||                              8dS )au  Adds ``num_workers`` to this WorkerGroup.

        Note: Adding workers when the cluster/placement group is at capacity
        may lead to undefined hanging behavior. If you are attempting to
        replace existing workers in the WorkerGroup, remove_workers() should
        be called first.

        Args:
            num_workers: The number of workers to add.
        )rY   z+_RayTrainWorker__execute.construct_metadatar   )r4   r5   N)r   ru   r   rs   rt   rq   rr   r   r%   rO   rD   r   rz   ro   r3   )r   rT   
new_actorsnew_actor_metadata_r4   r5   r   s           r   r~   zWorkerGroup.add_workersT  s7    
{## 		 		AD$,, $ 5 -  d*F.2.DF FE e$$$%%.66F 7  &+,,    7-..s:'' 	S 	SALZ]Xa[ Q Q QRRRR	S 	Sr'   _first_node_idc                 \   t          t                    }|g ||<   | j        D ]'}||j        j                                     |           (dt          fd}|D ]}||                             |           g }|                                D ]}|	                    |           || _        dS )a  Reorder the workers by their node id and the lowest GPU id.

        This is useful for collocating workers on the same node.

        Example:
            Given workers with the following attributes:
                worker_0: node_id=1, gpu_ids=[1]
                worker_1: node_id=0, gpu_ids=[0]
                worker_2: node_id=1, gpu_ids=[0]
                worker_3: node_id=0, gpu_ids=[1]

            The function will perform the following steps:
                1. Group by node ID:
                    node_id=0: worker_1, worker_3
                    node_id=1: worker_0, worker_2

                2. Sort each group by GPU ID:
                    node_id=0: worker_1 (gpu_id=0), worker_3 (gpu_id=1)
                    node_id=1: worker_2 (gpu_id=0), worker_0 (gpu_id=1)

            Resulting in the order: [worker_1, worker_3, worker_2, worker_0]

        Args:
            _first_node_id: The first ID to group by.
                Set this to the node ID of the trainer coordinator to ensure that the
                rank 0 worker is on the same node, allowing additional resources to
                be specified for rank 0 workers via
                `ScalingConfig(trainer_resources=)`.
        Nr   c                     | j         j                            dg           }|sdS 	 t          d |D                       S # t          $ r t          |          cY S w xY w)Nra   r   c              3   4   K   | ]}t          |          V  d S r;   )r1   )r]   gpu_ids     r   r_   z\WorkerGroup.sort_workers_by_node_id_and_gpu_id.<locals>.get_lowest_gpu_id.<locals>.<genexpr>  s(      ==63v;;======r'   )r5   r-   r   minrg   )r   gpu_idss     r   get_lowest_gpu_idzIWorkerGroup.sort_workers_by_node_id_and_gpu_id.<locals>.get_lowest_gpu_id  sx    o266ubAAG q$==W====== $ $ $7||###$s   ? AA)key)
r   listro   r5   r*   r   r1   sortri   extend)r   r   node_id_to_workersr   r   r*   sorted_workersro   s           r   "sort_workers_by_node_id_and_gpu_idz.WorkerGroup.sort_workers_by_node_id_and_gpu_idr  s    < )..%13~.l 	G 	GFv67>>vFFFF	$ 	$ 	$ 	$ 	$ * 	D 	DGw',,1B,CCCC)0022 	+ 	+G!!'****%r'   c                 *    t          | j                  S r;   )rz   ro   r   s    r   __len__zWorkerGroup.__len__  s    4<   r'   c                 4    t          j        | j                  S )z'Get the resources allocated per worker.)rf   rj   rU   r   s    r   get_resources_per_workerz$WorkerGroup.get_resources_per_worker  s    }T6777r'   )rR   NNNNrS   )r   r;   )r!   r"   r#   r$   r1   r   r   r/   floatr
   r	   r   r   r=   rv   r   r   r   r   r   r   r   r   r   r   r~   r   r   dictr   r&   r'   r   rQ   rQ   h   s<          H ;?*.+/6?9 99 'tCJ'789 	9
 !9 #4.9 ~s239 9 9 9vP P P 5    6
(36"2 
Y 
 
 
 
6BHS!V, B$q' B B B B 

'/Q'7
	
 
 
 
6WW'/Q'7W	
W W W W"#T#Y # # # #Ss S S S S<<& <&# <& <& <& <&|! ! !8$ 8 8 8 8 8 8r'   rQ   r;   )'rf   loggingrL   rI   collectionsr   dataclassesr   typingr   r   r   r   r	   r
   r   r   rD   	ray.actorr   ray.air._internal.utilr   r   %ray.train._internal.base_worker_groupr   	ray.typesr   ray.util.placement_groupr   r   	getLoggerr!   r|   r   r)   r3   rB   rO   rQ   r&   r'   r   <module>r      s%     				  # # # # # # ! ! ! ! ! ! N N N N N N N N N N N N N N N N N N N N 



 ! ! ! ! ! ! C C C C C C C C A A A A A A       3 3 3 3 3 3GCLL		8	$	$8 8 8 8 8 8 8 8"        ,        " "HTN "d " " " "N    (M8 M8 M8 M8 M8/ M8 M8 M8 M8 M8r'   