
    `in                        d dl mZ d dlmZ d dlmZ d dlmZmZ d dl	Z	e	j
        j        ZdZdZdZd	Zd
Zg dZedefd            ZdededefdZ	 ddedededededee         fdZdedededeegef         dedeee                  fdZdededefdZ	 	 	 	 d dedededee         deeegef                  deddfdZdS )!    )Iterable)contextmanager)	timedelta)CallableOptionalNz/num_membersz/last_memberz/TRACEz/TRACING_GATE   )store_timeoutget_allsynchronizebarriertimeoutc              #      K   | j         }|                     t          |                     dV  |                     |           dS )z
    This sets the timeout and then restores the old timeout when the context
    manager exits.

    Args:
        store: the store to set the timeout on
        timeout: the timeout to set
    secondsN)r   set_timeoutr   )storer   old_timeouts      y/home/jaya/work/projects/VOICE-AGENT/VIET/agent-env/lib/python3.11/site-packages/torch/distributed/elastic/utils/store.pyr	   r	      sQ       -K	i000111	EEE	k"""""    rankprefix
world_sizec                     |                      fdt          |          D                       }t          | | d          }|dk    r|                     |g           |S )ad  
    Given a store and a prefix, the method goes through the array of keys
    of the following format: ``{prefix}{idx}``, where idx is in a range
    from 0 to size, and tries to retrieve the data.

    The Rank0 process waits at the end to make sure all other processes
    finished the procedure before exiting.

    Usage

    ::

     values = get_all(store, "torchelastic/data", 3)
     value1 = values[0]  # retrieves the data for key torchelastic/data0
     value2 = values[1]  # retrieves the data for key torchelastic/data1
     value3 = values[2]  # retrieves the data for key torchelastic/data2

    c                     g | ]} | 	S  r   ).0idxr   s     r   
<listcomp>zget_all.<locals>.<listcomp>B   s$    NNNS6 03 0 0NNNr   z	/finishedr   r   
key_prefixr   )	multi_getrange_barrier_nonblockingwait)r   r   r   r   data_arrbarrier_keys     `   r   r
   r
   /   s    & NNNNE*<M<MNNNOOH&'''  K
 qyy 	

K=!!!Or   ,  datar    returnc                     t          | |          5  |                     | | |           t          | |||          }|cddd           S # 1 swxY w Y   dS )aT  
    Synchronizes ``world_size`` agents between each other using the underlying c10d store.
    The ``data`` will be available on each of the agents.

    Note: The data on the path is not deleted, as a result there can be stale data if
        you use the same key_prefix twice.

    Time complexity: O(N) per worker, O(N^2) globally.
    N)r	   setr
   )r   r(   r   r   r    r   
agent_datas          r   r   r   R   s    " 
ug	&	&  		Z'''...UD*jAA
                 s   .AAArank_decodertrace_timeoutc                                             | t           d            fd} fd}|dk    r+ |            }                       t           d           |S  |            S )N<val_ignored>c                     t                      } d}t          d          D ]}|t          k    r n	 |dk    r1                     | t           gt                               n0                     | t           gt          d                     x# t          $ r& |dz  }|                      |                     Y w xY w| S )Nr      r   )milliseconds)r+   r"   _MAX_TRACE_MISSING_RANKSr$   _TRACEr   DistStoreErroradd)missing_rank_inforanks_missingir    r-   r   r.   r   s      r   _find_missing_ranksz9_try_detecting_missing_ranks.<locals>._find_missing_rankss   s   EEq*%% 	7 	7A  888
7 A%%JJ&336334i6V6V6V   
 JJ: :q :& : :;YTU=V=V=VWWW! 7 7 7"!%%ll1oo666667 ! s   A'B-C
	C
c                      	                        t           g           d d           dgS # t          $ r Y d S w xY w)Nz[<check rank 0 (r   z) for missing rank info>])r$   _TRACING_GATEr6   )r    r-   r   s   r   _checkinz._try_detecting_missing_ranks.<locals>._checkin   sf    	JJ:6}667888Q||AQQQRR 	 	 	44	s   .2 
A A r   )r+   r5   r=   )	r   r   r    r   r-   r.   r;   r>   r8   s	   ``` ``   r   _try_detecting_missing_ranksr?   i   s     
II+T+6++_===! ! ! ! ! ! ! ! !*       qyy//11		Z000/BBB  xzzr   c                     |t           z   }|t          z   }|                     |d          }||k    r|                     |d           |S )zq
    Does all the non-blocking operations for a barrier and returns the final key
    that can be waited on.
    r2   r0   )_NUM_MEMBERS_LAST_MEMBER_CHECKINr7   r+   )r   r   r    num_members_keylast_member_keyr   s         r   r#   r#      sQ    
 !</O #77O
))OQ
'
'C
j		/?333r   
   barrier_timeoutrank_tracing_decoderc                    ||
J d            t          | |          5  t          | ||          }	 |                     |g           no# t          $ rb}||t	          | ||||pd |          }	|	>t          d                    |||dd                    |	           d|                    d|d}~ww xY w	 ddd           dS # 1 swxY w Y   dS )	at  
    A global lock between agents. This will pause all workers until at least
    ``world_size`` workers respond.

    This uses a fast incrementing index to assign waiting ranks and a success
    flag set by the last worker.

    Time complexity: O(1) per worker, O(N) globally.

    Optionally, passing rank will enable tracing of missing ranks on timeouts.
    `rank_tracing_decoder` lambda arg can be used to convert rank data
    into a more meaningful information at an app level (e.g. hostname).

    Note: Since the data is not removed from the store, the barrier can be used
        once per unique ``key_prefix``.
    Nz!Tracing requires rank informationr   c                      t          |           S )N)str)xs    r   <lambda>zbarrier.<locals>.<lambda>   s    s1vv r   ziTimed out waiting on barrier on rank {}, for key prefix: {} (world_size={}, missing_ranks={}, timeout={})[z, ])r	   r#   r$   r6   r?   formatjoin)
r   r   r    rF   r   rG   r.   rD   emissing_rankss
             r   r   r      s   4 |#++-P+++	uo	.	.  .J:
 
 
	JJ()))) 	 	 	| <(>-=-=!! ! !,(ddjdj &&;		- 8 8;;;+e e	 	  	  G1	 *                 s5   CA
	C

B6AB11B66CCC)r'   )r'   NNrE   )collections.abcr   
contextlibr   datetimer   typingr   r   torch_C_DistStoreErrorr6   rA   rB   r5   r=   r4   __all__floatr	   intrJ   r
   byteslistr   r?   r#   r   r   r   r   <module>r_      sU   % $ $ $ $ $ % % % % % %       % % % % % % % %  )% 	  A
@
@ #% # # # #    c  s        R  
  	
   
%[   .,, , 	,
 C5#:&, , hsm, , , ,^C S S    & !;?; ;; ; 	;
 3-; #8SE3J#78; ; 
; ; ; ; ; ;r   