
    &`iT-                     T   d Z ddlZddlZddlmZmZ ddlZddlmZ 	 ddlm	Z	m
Z
 n$# e$ r  ej        dej         d           Y nw xY wddlmZmZ dd	lmZmZmZ dd
lmZ ej        dk    rd Znd ZddZd ZddZdZdefdZd Zd Zd Z ddZ!d Z"dede e!de"e"fdZ#ddZ$dS ) zu
The following is adapted from Dask release 2021.03.1:
    https://github.com/dask/dask/blob/2021.03.1/dask/local.py
    N)EmptyQueue)config)DataNodeDependenciesMappingzEDask on Ray is available only on dask>=2024.11.0, you are on version .)local_callbacksunpack_callbacks)flattenget_dependenciesreverse_dict)orderntc                 V    	 	 |                      dd          S # t          $ r Y nw xY w))NTg?)blocktimeout)getr   qs    q/home/jaya/work/projects/VOICE-AGENT/VIET/agent-env/lib/python3.11/site-packages/ray/util/dask/scheduler_utils.py	queue_getr      sG    	uu4u555   	s    
''c                 *    |                                  S N)r   r   s    r   r   r   &   s    uuww    c           
         |t          |           j        }|t          j        dd          }|t                      }t	                      |                                 D ]<\  }}t          |t                    r" |            ||<                       |           =| 	                                }|
                    |           t          |           }fd|                                D             }t          |          }|D ]6}	|                    |	d          D ]}
||
                             |	           7d |                                D             }d |                                D             }t          ||d          }d	 |                                D             }||||||t	                      t	                      t	                      d
	}|S )a  Start state from a dask
    Examples
    --------
    >>> dsk = {
        'x': 1,
        'y': 2,
        'z': (inc, 'x'),
        'w': (add, 'z', 'y')}  # doctest: +SKIP
    >>> from pprint import pprint  # doctest: +SKIP
    >>> pprint(start_state_from_dask(dsk))  # doctest: +SKIP
    {'cache': {'x': 1, 'y': 2},
     'dependencies': {'w': {'z', 'y'}, 'x': set(), 'y': set(), 'z': {'x'}},
     'dependents': {'w': set(), 'x': {'z'}, 'y': {'w'}, 'z': {'w'}},
     'finished': set(),
     'ready': ['z'],
     'released': set(),
     'running': set(),
     'waiting': {'w': {'z'}},
     'waiting_data': {'x': {'z'}, 'y': {'w'}, 'z': {'w'}}}
    Ncachec                 >    i | ]\  }}|v	|t          |          S  )set).0kv	data_keyss      r   
<dictcomp>z)start_state_from_dask.<locals>.<dictcomp>P   s/    PPPTQQi=O=Oq#a&&=O=O=Or   r   c                 B    i | ]\  }}|||                                 S r   )copyr    r!   r"   s      r   r$   z)start_state_from_dask.<locals>.<dictcomp>V   s+    DDDDAq!DAqvvxxDDDr   c                     h | ]	\  }}||
S r   r   r'   s      r   	<setcomp>z(start_state_from_dask.<locals>.<setcomp>X   s!    888tq!a8888r   Tkeyreversec                     i | ]
\  }}|||S r   r   r'   s      r   r$   z)start_state_from_dask.<locals>.<dictcomp>Z   s#    555115q!555r   )	dependencies
dependentswaitingwaiting_datar   readyrunningfinishedreleased)r   r   r   dictr   items
isinstancer   addr&   updater   r   removesorted)dskr   sortkeyr!   r"   dsk2r.   r0   r/   abr1   	ready_setr2   stater#   s                  @r   start_state_from_daskrD   *   s   * **.}
7D))}I		  1a"" 	qssE!HMM!88::DKK&s++LPPPP\%7%7%9%9PPPGl++J ! !2&& 	! 	!AAJa    	!DDJ,<,<,>,>DDDL88w}}888I9'4888E55555G % $55EEEE
 
E Lr   c                     	  ||          \  }} ||          } |            }	 |||	f          }d}
n%# t           $ r} |||          }d}
Y d}~nd}~ww xY w| ||
fS )zx
    Compute task and handle all administration
    See Also
    --------
    _execute_task : actually execute task
    FTN)BaseException)r+   	task_infodumpsloadsget_idpack_exceptiontaskdataresultidfailedes               r   execute_taskrR   k   s    U9%%
ddVXX|$$   5)) s   25 
AAATc                     | |d         v r|d         |          rJ |d         | = |d                              |            |r|d         | = dS dS )zQRemove data from temporary storage
    See Also
    --------
    finish_task
    r1   r5   r   N)r9   )r+   rC   deletes      r   release_datarU   ~   st     eN###(----.!#&	*#  'N3   r   Fc                    t          |d         |         |d          D ]K}|d         |         }|                    |           |s$|d         |= |d                             |           L|d         |         D ]}||d         v r|d         |         }|                    |           |sj||vrft          rQdd	lm}	 t          d
||t          t          |	|d         	                                          dz            fz              ||||           |r||vr ||||           |d         
                    |           |d                             |           |S )zn
    Update execution state after a task finishes
    Mutates.  This should run atomically (with a lock).
    r/   Tr*   r0   r2   r.   r1   r   )nbytesz&Key: %s	Dep: %s	 NBytes: %.2f	 Releaser   g    .A)rT   r4   r3   )r<   r;   appendDEBUG
chest.corerW   printsummapvaluesr9   )
r=   r+   rC   resultsr>   rT   rU   depsrW   s
             r   finish_taskrb      s    eL)#.GTJJJ ' ')S!	 	'i %'N!!#&&&^$S) 4 4%'''n%c*AHHSMMM 8G++ 111111CSVU7^5J5J5L5L)M)MPS)S%T%TUV   S%7777 	47**LeF3333	*#	)C   Lr   c                 t    t          | t                    rt          fd| D                       S |          S )zGet nested index from collection
    Examples
    --------
    >>> nested_get(1, 'abc')
    'b'
    >>> nested_get([1, 0], 'abc')
    ('b', 'a')
    >>> nested_get([[1, 0], [0, 1]], 'abc')
    (('b', 'a'), ('a', 'b'))
    c              3   8   K   | ]}t          |          V  d S r   )
nested_get)r    icolls     r   	<genexpr>znested_get.<locals>.<genexpr>   s-      66QZ4((666666r   )r8   listtuple)indrg   s    `r   re   re      sD     #t 6666#666666Cyr   c                      dS )zDefault get_idNr   r   r   r   default_get_idrm      s    4r   c                       r   r   )rQ   rH   s     r   default_pack_exceptionro      s    	r   c                 B    | j         |ur|                     |          | r   )__traceback__with_traceback)exctbs     r   reraiseru      s)    
""  $$$
Ir   c                     | S )z<Identity function. Returns x.
    >>> identity(3)
    3
    r   )xs    r   identityrx      s	    
 Hr   c           
      R   
"#$ t                      #t          |t                    rt          t	          |                    }n|h}t          |          }t                    t          |	          5 }	t          |	          \  }}"}}g }d}i $	 |	D ]0}|d         r |d                    |                    |           1t                    }t          ||j                  $|	D ]\  }}}}}|r |$           |t          j        dd          }$d         r$d         st          d           
"#$f	d	}$d         rDt          $d
                   |k     r+ |             $d         rt          $d
                   |k     +$d         s$d         s	$d
         rt          #          \  }}}|rL |          \  }}|r0$fdt!          |          D             }|         } ||           n |||            |          \  }}|$d         |<   t#          |$||j                   |D ]}  | ||$|           $d         rDt          $d
                   |k     r+ |             $d         rt          $d
                   |k     +$d         $d         $d
         d}|D ]\  }}}}}!|!r |!$|            n # |D ]\  }}}}}!|!r |!$|            w xY w	 ddd           n# 1 swxY w Y   t%          |$d                   S )a  Asynchronous get function
    This is a general version of various asynchronous schedulers for dask.  It
    takes a an apply_async function as found on Pool objects to form a more
    specific ``get`` method that walks through the dask array with parallel
    workers, avoiding repeat computation and minimizing memory use.
    Parameters
    ----------
    apply_async : function
        Asynchronous apply function as found on Pool or ThreadPool
    num_workers : int
        The number of active tasks we should have at any one time
    dsk : dict
        A dask dictionary specifying a workflow
    result : key or list of keys
        Keys corresponding to desired data
    cache : dict-like, optional
        Temporary storage of results
    get_id : callable, optional
        Function to return the worker id, takes no arguments. Examples are
        `threading.current_thread` and `multiprocessing.current_process`.
    rerun_exceptions_locally : bool, optional
        Whether to rerun failing tasks in local process to enable debugging
        (False by default)
    pack_exception : callable, optional
        Function to take an exception and ``dumps`` method, and return a
        serialized tuple of ``(exception, traceback)`` to send back to the
        scheduler. Default is to just raise the exception.
    raise_exception : callable, optional
        Function that takes an exception and a traceback, and raises an error.
    dumps: callable, optional
        Function to serialize task data and results to communicate between
        worker and parent.  Defaults to identity.
    loads: callable, optional
        Inverse function of `dumps`.  Defaults to identity.
    callbacks : tuple or list of tuples, optional
        Callbacks are passed in as tuples of length 5. Multiple sets of
        callbacks may be passed in as a list of tuples. For more information,
        see the dask.diagnostics documentation.
    See Also
    --------
    threaded.get
    Fr   )r   r>   Nrerun_exceptions_locallyr0   r2   z Found no accessible jobs in daskc            	      ,  	 d                                          } d                             |            	D ]} ||            fdt          |           D             } t          |  |          |f          f
j                   dS )z"Fire off a task to the thread poolr2   r3   c                 .    i | ]}|d          |         S r   r   r    r`   rC   s     r   r$   z0get_async.<locals>.fire_task.<locals>.<dictcomp>C  s$    WWWSU7^C0WWWr   )argscallbackN)popr9   r   rR   put)r+   frM   apply_asyncr=   rH   rJ   rI   rK   pretask_cbsqueuerC   s      r   	fire_taskzget_async.<locals>.fire_task:  s     Gn((**i $$S)))$ ' 'AAc3&&&& XWWW<LSRU<V<VWWW s3x.//& #Y     r   r3   c                 .    i | ]}|d          |         S r}   r   r~   s     r   r$   zget_async.<locals>.<dictcomp>\  s4           #  w!4     r   r   T)r   r8   ri   r   r   r6   r	   r
   rX   r   rD   r   r   
ValueErrorlenr   r   rb   re   )%r   num_workersr=   rN   r   rJ   rz   rK   raise_exception	callbacksrH   rI   kwargsresult_flatr_   _posttask_cbsstarted_cbs	succeededcbkeyorderstart_stater   r+   res_inforP   rs   rt   rM   rL   res	worker_idr   finishr   r   rC   s%   ` `  ` `  ``                      @@@r   	get_asyncr      s   r GGE&$ '&//**h+G
s))C		#	# S6y-=i-H-H*1k<	 L	6 ' 'a5 BqE#JJJ""2&&&&SzzH)#UHLQQQE+4 , ,';1a ,KU+++'/+1:6PRW+X+X(Y Eg E !CDDD            2 . Sy)9%:%:[%H%H	 . Sy)9%:%:[%H%H 	"  eGn  i8H  (1%(8(8%Xv 
1#eHooGC/ 1       '7S'A'A       #3xT



'R000!&xY&)gs#CeWhlCCC% 7 7AAc3UI6666Gn  U9-=)>)>)L)LIKKK Gn  U9-=)>)>)L)L' 	"  eGn  i8H  , I '2 6 6"1aF 6F39}5556k 6 6"1aF 6F39}5556 6cS6 S6 S6 S6 S6 S6 S6 S6 S6 S6 S6 S6 S6 S6 S6j feGn---s+   :LH(K?LK88LLLr   c                 <    |i } | |i |}| ||           dS dS )z*A naive synchronous version of apply_asyncNr   )funcr   kwdsr   r   s        r   
apply_syncr   w  s?    |
$


C r   )NN)Tr   )r   NN)%__doc__oswarningsr   r   r   daskr   dask._task_specr   r   ImportErrorwarn__version__dask.callbacksr	   r
   	dask.corer   r   r   
dask.orderr   namer   rD   rR   rU   rY   rb   re   rm   ro   ru   rx   r   r   r   r   r   <module>r      s?   
 
			                =========   HM	2".	2 	2 	2    
 = < < < < < < < = = = = = = = = = =      7d??
     > > > >B  &         	 /3! ! ! !H  "  

 
 
      !)

W. W. W. W.t     s   ' AA