
    &`iw_                     j   d dl Z d dlZd dlZd dlZd dlmZmZ d dlmZ d dl	m
Z
 d dlmZ d dlmZ d dlmZ d dlZd dlmZmZ 	 d d	lmZmZmZmZmZ n$# e$ r  ej        d
ej         d           Y nw xY wd dlmZ d dlm Z m!Z! d dl"Z"d dl#m$Z$m%Z% d dl&m'Z' d dl(m)Z)m*Z*  ej+                    Z,da- ee.          Z/ ej0                    Z1dZ2	 	 d%dee3         dee4         dej5        j6        fdZ7d Z8d Z9d Z:d Z;d Z<e"j=        d             Z>d Z?d&dZ@d  ZAe
 G d! d"                      ZBd# ZCd$ ZDdS )'    N)OrderedDictdefaultdict)Mapping)	dataclass)
ThreadPool)pprint)Optional)
ishashableistask)AliasDataNodeTaskTaskRefconvert_legacy_graphzEDask on Ray is available only on dask>=2024.11.0, you are on version .)	CPU_COUNT)_thread_get_idpack_exception)local_ray_callbacksunpack_ray_callbacks)unpack_object_refs)
apply_sync	get_asynczUse ray_remote_args={"resources": {...}} instead of resources={...} to specify required Ray task resources; see https://docs.ray.io/en/master/ray-core/package-ref.html#ray-remote.tasksTshuffleuse_shuffle_optimizationreturnc                 f    |rddl m} nd}t          j                            t
          | |          S )a  
    Enable Dask-on-Ray scheduler. This helper sets the Dask-on-Ray scheduler
    as the default Dask scheduler in the Dask config. By default, it will also
    cause the task-based shuffle to be used for any Dask shuffle operations
    (required for multi-node Ray clusters, not sharing a filesystem), and will
    enable a Ray-specific shuffle optimization.

    >>> enable_dask_on_ray()
    >>> ddf.compute()  # <-- will use the Dask-on-Ray scheduler.

    If used as a context manager, the Dask-on-Ray scheduler will only be used
    within the context's scope.

    >>> with enable_dask_on_ray():
    ...     ddf.compute()  # <-- will use the Dask-on-Ray scheduler.
    >>> ddf.compute()  # <-- won't use the Dask-on-Ray scheduler.

    Args:
        shuffle: The shuffle method used by Dask, either "tasks" or
            "disk". This should be "tasks" if using a multi-node Ray cluster.
            Defaults to "tasks".
        use_shuffle_optimization: Enable our custom Ray-specific shuffle
            optimization. Defaults to True.
    Returns:
        The Dask config object, which can be used as a context manager to limit
        the scope of the Dask-on-Ray scheduler to the corresponding context.
    r   )dataframe_optimizeN	schedulerr   r   )ray.util.dask.optimizationsr   daskconfigsetray_dask_get)r   r   r   s      k/home/jaya/work/projects/VOICE-AGENT/VIET/agent-env/lib/python3.11/site-packages/ray/util/dask/scheduler.pyenable_dask_on_rayr(   *   sN    >   "BBBBBBB!
 ;??DV        c                  F    t           j                            ddd          S )zH
    Unsets the scheduler, shuffle method, and DataFrame optimizer.
    Nr    )r#   r$   r%    r)   r'   disable_dask_on_rayr,   V   s     ;??T4D?QQQr)   c                 $   |                     dd          }|                     dd          }t          j                    }|t          5  |J|t          u rAt
          2t          t                    at          j	        t
          j
                   t
          }nd|t          v r#|t          |         v rt          |         |         }n8t          |          }t          j	        |j
                   |t          |         |<   ddd           n# 1 swxY w Y   |                     dd          }|                     dd          }|                     dd          }d|v rt          t                    |                     d	i           }	t          j                    }
d|
v rt          t                    t!          | t"                    s,t%          | d
          r| j        } n|                                 } t+          | |
|	          }t-          |          5 }t/          |          \  }}}}}}t1          |           } t3          t5          |j        t8          |||||          t;          |j                  | |ft>          t@          d|}||D ]} |||            ~ |r|}n)d}|rtC          j"        d          }tG          ||          }||D ]} ||           ddd           n# 1 swxY w Y   t          5  tI          t          j%                              }|t          ur^tM          t                    D ]I}||vrCt                               |          '                                D ]}|
                                 Jddd           n# 1 swxY w Y   |S )a  
    A Dask-Ray scheduler. This scheduler will send top-level (non-inlined) Dask
    tasks to a Ray cluster for execution. The scheduler will wait for the
    tasks to finish executing, fetch the results, and repackage them into the
    appropriate Dask collections. This particular scheduler uses a threadpool
    to submit Ray tasks.

    This can be passed directly to `dask.compute()`, as the scheduler:

    >>> dask.compute(obj, scheduler=ray_dask_get)

    You can override the currently active global Dask-Ray callbacks (e.g.
    supplied via a context manager), the number of threads to use when
    submitting the Ray tasks, or the threadpool used to submit Ray tasks:

    >>> dask.compute(
            obj,
            scheduler=ray_dask_get,
            ray_callbacks=some_ray_dask_callbacks,
            num_workers=8,
            pool=some_cool_pool,
        )

    Args:
        dsk: Dask graph, represented as a task DAG dictionary.
        keys (List[str]): List of Dask graph keys whose values we wish to
            compute and return.
        ray_callbacks (Optional[list[callable]]): Dask-Ray callbacks.
        num_workers (Optional[int]): The number of worker threads to use in
            the Ray task submission traversal of the Dask graph.
        pool (Optional[ThreadPool]): A multiprocessing threadpool to use to
            submit Ray tasks.

    Returns:
        Computed values corresponding to the provided keys.
    num_workersNpoolray_callbacksray_persistF_ray_enable_progress_bar	resourcesray_remote_args_optimized_dsk)get_idr   _dask_on_ray_pb)progress_bar_actor)(pop	threadingcurrent_thread
pools_lockmain_threaddefault_poolr   r   atexitregisterclosepools
ValueErrorTOP_LEVEL_RESOURCES_ERR_MSGr#   get_annotations
isinstancer   hasattrr5   __dask_graph__!_build_key_scoped_ray_remote_argsr   r   r   r   _apply_async_wrapperapply_async_rayify_task_wrapperlen_poolr   r   ray	get_actorray_get_unpackr%   	enumeratelistvalues)dskkeyskwargsr.   r/   threadr0   persistenable_progress_barr4   annotationsscoped_ray_remote_argsray_presubmit_cbsray_postsubmit_cbsray_pretask_cbsray_posttask_cbsray_postsubmit_all_cbsray_finish_cbsobject_refscbresultpb_actoractive_threadstps                            r'   r&   r&   ]   s   J **]D11K::fd##D
 %''F| 	2 	2"v'<'<'#-i#8#8LOL$6777#5[E&M%A%AV}[1!+..
+++-1fk*	2 	2 	2 	2 	2 	2 	2 	2 	2 	2 	2 	2 	2 	2 	2 JJ55Mjj..G **%?FF f4555jj!2B77O&((Kk!!4555 c7## '3()) 	'$CC $$&&C>[/  
]	+	+ 0} !//	
" #3''    $!" &  
OO
 ")
 
 
 
" "-, % %;$$$$
  	N FFH" <=):;;#KHMMMF%$  6



a0 0 0 0 0 0 0 0 0 0 0 0 0 0 0f 
 " "Y02233$$%[[ " "N**"YYq\\0022 " "				" " " " " " " " " " " " " " " Ms9   	B1DD
D
CKK#&K#1BNN	N	c                       d fd	}|S )a  
    Wraps the given pool `apply_async` function, hotswapping `real_func` in as
    the function to be applied and adding `extra_args` and `extra_kwargs` to
    `real_func`'s call.

    Args:
        apply_async: The pool function to be wrapped.
        real_func: The real function that we wish the pool apply
            function to execute.
        *extra_args: Extra positional arguments to pass to the `real_func`.
        **extra_kwargs: Extra keyword arguments to pass to the `real_func`.

    Returns:
        A wrapper function that will ignore it's first `func` argument and
        pass `real_func` in its place. To be passed to `dask.local.get_async`.
    r+   Nc           	      F    |si } |z   t          |fi |          S )N)argskwdscallback)dict)funcrl   rm   rn   rK   
extra_argsextra_kwargs	real_funcs       r'   wrapperz%_apply_async_wrapper.<locals>.wrapper   sJ     	D{
"d++l++	
 
 
 	
r)   )r+   NNr+   )rK   rs   rq   rr   rt   s   ```` r'   rJ   rJ      s:    $
 
 
 
 
 
 
 
 
 Nr)   c                     	  ||          \  }}t          || |||||	|
                    | i                     } |            } |||f          }d}n%# t          $ r} |||          }d}Y d}~nd}~ww xY w| ||fS )a  
    The core Ray-Dask task execution wrapper, to be given to the thread pool's
    `apply_async` function. Exactly the same as `execute_task`, except that it
    calls `_rayify_task` on the task instead of `_execute_task`.

    Args:
        key: The Dask graph key whose corresponding task we wish to
            execute.
        task_info: The task to execute and its dependencies.
        dumps: A result serializing function.
        loads: A task_info deserializing function.
        get_id: An ID generating function.
        pack_exception: An exception serializing function.
        ray_presubmit_cbs: Pre-task submission callbacks.
        ray_postsubmit_cbs: Post-task submission callbacks.
        ray_pretask_cbs: Pre-task execution callbacks.
        ray_posttask_cbs: Post-task execution callbacks.
        scoped_ray_remote_args: Ray task options for each key.

    Returns:
        A 3-tuple of the task's key, a literal or a Ray object reference for a
        Ray task's result, and whether the Ray task submission failed.
    FTN)_rayify_taskgetBaseException)key	task_infodumpsloadsr6   r   r]   r^   r_   r`   r\   taskdepsre   idfailedes                    r'   rL   rL     s    HU9%%
d"&&sB//	
 	
 VXX|$$   5)) s   AA 
A6A11A6c           	          t           t                    rfd D             S t                     r fdD             }|D ]}	|	|	c S 	t           t                    r1 j        }
t          |
t
                    r|
j                 S |
         S t           t                    r j        }nt          dt                     z            t                    \  }} t          j        ddt          |t                    sdn|j        dj         |g|R  }D ]} | |           |S t#                     s S  v r          S  S )	a  
    Rayifies the given task, submitting it as a Ray task to the Ray cluster.

    Args:
        task: A Dask graph value, being either a literal, dependency
            key, Dask task, or a list thereof.
        key: The Dask graph key for the given task.
        deps: The dependencies of this task.
        ray_presubmit_cbs: Pre-task submission callbacks.
        ray_postsubmit_cbs: Post-task submission callbacks.
        ray_pretask_cbs: Pre-task execution callbacks.
        ray_posttask_cbs: Post-task execution callbacks.
        ray_remote_args: Ray task options. See :func:`ray.remote` for details.

    Returns:
        A literal, a Ray object reference representing a submitted task, or a
        list thereof.
    c                 <    g | ]}t          |          S r+   )rv   )	.0rh   r~   ry   r^   r`   r]   r_   r4   s	     r'   
<listcomp>z _rayify_task.<locals>.<listcomp>c  sM     
 
 
  !" 	 	
 
 
r)   Nc                 *    g | ]} |          S r+   r+   )r   rd   r~   ry   r}   s     r'   r   z _rayify_task.<locals>.<listcomp>t  s'     Q Q QD#t!4!4 Q Q Qr)   zInvalid task type: %szdask:   )namenum_returnsr+   )rF   rS   r   r   targetr   ry   r   rp   rC   typer   dask_task_wrapperoptionsMultipleReturnFuncr   remoter
   )r}   ry   r~   r]   r^   r_   r`   r4   alternate_returnsalternate_returnr   rp   arg_object_refsrepackrc   rd   s   ````````        r'   rv   rv   D  s;   8 $ H
 
 
 
 
 
 
 
 
 
 
 
 
 	
 
 8 ( Q Q Q Q Q Q?P Q Q Q$5 , , 
 $/++++ 0 dE"" 	C[F&'** $FJ'' F|#d## 	C9DD4tDzzABBB
 #5T":":'/ 
 #D*<==S4CS
 

 
 
 
 
 
 
 )( 1 14dK0000 	Dzr)   c                     |fd|D             } |          \  }fd |           }  | |          }|%t          ||          D ]\  }	}
|	 |	||
           |S )ar  
    A Ray remote function acting as a Dask task wrapper. This function will
    repackage the given `arg_object_refs` into its original `deps` using
    `repack`, and then pass it to the provided Dask Task object , `task`.

    Args:
        task: The Dask Task class object to execute.
        repack: A function that repackages the provided args into
            the original (possibly nested) Python objects.
        key: The Dask key for this task.
        ray_pretask_cbs: Pre-task execution callbacks.
        ray_posttask_cbs: Post-task execution callback.
        *arg_object_refs (ObjectRef): Ray object references representing the dependencies'
            results.

    Returns:
        The output of the Dask task. In the context of Ray, a
        dask_task_wrapper.remote() invocation will return a Ray object
        reference representing the Ray task's result.
    Nc                 0    g | ]}| |          nd S Nr+   )r   rd   r   ry   s     r'   r   z%dask_task_wrapper.<locals>.<listcomp>  s>     
 
 
 )+BBsO$$$D
 
 
r)   c                 P   t          | t                    r | j                  | _        | S t          | t                    r fd|                                 D             S t          | t
                    rt          fd| D                       S t          | t          j                  rt          j        |           S t          | t                    rPt          | j
        t          j                  r/t          j        | j
                  }t          | j        |          S | S | S )Nc                 .    i | ]\  }}| |          S r+   r+   )r   kv_dereference_argss      r'   
<dictcomp>z@dask_task_wrapper.<locals>._dereference_args.<locals>.<dictcomp>  s+    BBB1A((++BBBr)   c              3   .   K   | ]} |          V  d S r   r+   )r   xr   s     r'   	<genexpr>z?dask_task_wrapper.<locals>._dereference_args.<locals>.<genexpr>  s/      99!**1--999999r)   )ry   value)rF   r   rl   r   itemstuplerO   	ObjectRefrw   r   r   ry   )r   r   r   s     r'   r   z,dask_task_wrapper.<locals>._dereference_args  s   a 	&&qv..AFH7## 	BBBB		BBBB5!! 
	9999q9999993=)) 	71::8$$ 	!'3=11 8((AE7777HHr)   )zip)r}   r   ry   r_   r`   r   
pre_statesrepacked_depsre   rd   	pre_stater   s     `  `     @r'   r   r     s    0 "
 
 
 
 
%
 
 

 vo..]    $ T""DT-  F# !1:>> 	+ 	+MB	~3	***Mr)   c                 :   ddl m } t          j        | j                                                  \  }}d} ||d          }|                    d           g }||k     rt          j        | j                                                  \  }}|                    ||z
             |}t          j        |dt          |          d          \  }}	t          |          t          |          k    rnt          j
        d           ||k     |                                 t          j        | j                                                  \  }}||k    rt          d           t          t          j        | j                                                             d S )	Nr   )tqdm)totalposition F)timeoutr   fetch_localg?z)Completed. There was state inconsistency.)r   rO   rw   re   r   set_descriptionupdatewaitrM   timesleeprA   printr   report)
trackerrc   r   r   finishedreported_finished_so_farpb_bar
ready_refs	submitted_s
             r'   render_progress_barr     s    ggn335566OE8 T***F
2J
U

!ggn&;&;&=&=>>	8h!99:::#+ K0@0@e
 
 

A z??c+....
3 U

 LLNNN''."7"7"9"9::IxH9:::
377>((**++,,,,,r)   c                    fd}t          | t                    rt          |           } t          | t                    r;t          d | D                       r"t	          |  \  } } ||           } ||          S  ||           S )am  
    Unpacks object references, gets the object references, and repacks.
    Traverses arbitrary data structures.

    Args:
        object_refs: A (potentially nested) Python object containing Ray object
            references.

    Returns:
        The input Python object with all contained Ray object references
        resolved with their concrete values.
    c                 P    rt          |            t          j        |           S r   )r   rO   rw   )rc   r8   s    r'   
get_resultz"ray_get_unpack.<locals>.get_result  s-     	A 2K@@@w{###r)   c              3   L   K   | ]}t          |t          j                   V   d S r   )rF   rO   r   )r   r   s     r'   r   z!ray_get_unpack.<locals>.<genexpr>  sB       - --.Jq#-(((- - - - - -r)   )rF   r   rS   anyr   )rc   r8   r   r   computed_results    `   r'   rQ   rQ     s    $ $ $ $ $
 +u%% (;''+t$$ ' - -2=- - - * * ' 1+>V$*[11vo&&&z+&&&r)   c                    |                     dd          }|                     dd          }t          |          5 }t          |          \  }}}}}	}
t          |           } t	          t          t          t          ||||          d| |fi |}|	|	D ]} |||            ~ |r|}nt          |          }|
|
D ]} ||           |cddd           S # 1 swxY w Y   dS )a  
    A synchronous Dask-Ray scheduler. This scheduler will send top-level
    (non-inlined) Dask tasks to a Ray cluster for execution. The scheduler will
    wait for the tasks to finish executing, fetch the results, and repackage
    them into the appropriate Dask collections. This particular scheduler
    submits Ray tasks synchronously, which can be useful for debugging.

    This can be passed directly to `dask.compute()`, as the scheduler:

    >>> dask.compute(obj, scheduler=ray_dask_get_sync)

    You can override the currently active global Dask-Ray callbacks (e.g.
    supplied via a context manager):

    >>> dask.compute(
            obj,
            scheduler=ray_dask_get_sync,
            ray_callbacks=some_ray_dask_callbacks,
        )

    Args:
        dsk: Dask graph, represented as a task DAG dictionary.
        keys (List[str]): List of Dask graph keys whose values we wish to
            compute and return.

    Returns:
        Computed values corresponding to the provided keys.
    r0   Nr1   Fr   )	r9   r   r   r   r   rJ   r   rL   rQ   )rU   rV   rW   r0   rY   r]   r^   r_   r`   ra   rb   rc   rd   re   s                 r'   ray_dask_get_syncr   (  s   < JJ55Mjj..G	]	+	+ ,} !//	
" #3''   $!"   
 
 
 
 "-, % %;$$$$
  	1 FF#K00F%$  6



Y, , , , , , , , , , , , , , , , , ,s   BCCCc                   *    e Zd ZU eed<   eed<   d ZdS )r   rp   r   c                      | j         |i |t          t                    st          t                    r(fdt	          t                              D             S )Nc                      g | ]
}|         S r+   r+   )r   r   returnss     r'   r   z/MultipleReturnFunc.__call__.<locals>.<listcomp>  s    ???awqz???r)   )rp   rF   ro   r   rangerM   )selfrl   rW   r   s      @r'   __call__zMultipleReturnFunc.__call__}  sj    $)T,V,,gt$$ 	@
7K(H(H 	@????5W+>+>???Gr)   N)__name__
__module____qualname__callable__annotations__intr   r+   r)   r'   r   r   x  s:         
NNN    r)   r   c                     | |         S r   r+   )multiple_returnsidxs     r'   multiple_return_getr     s    C  r)   c                     t           t          j        j                  s4t          j        j                            t                      d           i } fd                                 D             }|D ]\  }}|j        }||}nd|v rt          t                    |
                                D ]Y}|                                }	|	                    |           |	                    |                    |i                      |	||<   Zi }
|                                D ]G\  }}|                                }|                    |                    di                      ||
|<   H|
S )Nr+   )dependenciesc                 .    g | ]}|j         |         fS r+   )layers)r   r   rU   s     r'   r   z5_build_key_scoped_ray_remote_args.<locals>.<listcomp>  s%    JJJ4tSZ%&JJJr)   r3   r4   )rF   r#   highlevelgraphHighLevelGraphfrom_collectionsr   _toposort_layersr[   rC   rD   get_output_keyscopyr   rw   r   )rU   r[   r4   scoped_annotationsr   id_layerlayer_annotationsry   layer_annotations_for_keyr\   layer_ray_remote_argss   `           r'   rI   rI     s   c4.=>> 
!0AAsGGSr B 
 
 JJJJ33G3G3I3IJJJF @ @
U!-$ +---8999((** 	@ 	@C(3(8(8(:(:%%,,->???%,,-?-C-CC-L-LMMM&?s##	@  .4466 < <[ / 4 4 6 6 	$$[__5F%K%KLLL&;s##!!r)   )r   Tr   )Er?   r:   r   warningscollectionsr   r   collections.abcr   dataclassesr   multiprocessing.poolr   r   typingr	   r#   	dask.corer
   r   dask._task_specr   r   r   r   r   ImportErrorwarn__version__dask.systemr   dask.threadedr   r   rO   ray.util.dask.callbacksr   r   ray.util.dask.commonr   ray.util.dask.scheduler_utilsr   r   r;   r=   r>   ro   rB   Lockr<   rD   strboolr$   r%   r(   r,   r&   rJ   rL   rv   r   r   r   rQ   r   r   r   rI   r+   r)   r'   <module>r      sK          0 0 0 0 0 0 0 0 # # # # # # ! ! ! ! ! ! + + + + + +              ( ( ( ( ( ( ( (TTTTTTTTTTTTTTT   HM	2".	2 	2 	2    
 " ! ! ! ! ! 8 8 8 8 8 8 8 8 



 M M M M M M M M 3 3 3 3 3 3 ? ? ? ? ? ? ? ?&i&((DY^
J  %/3) )c])&tn) 
[_) ) ) )XR R RL L L^  >6 6 6rd d dN 8 8 8v- - -:!' !' !' !'HM M M`        ! ! !" " " " "s   A A43A4