
    `i#                        U d Z ddlZddlZddlmZ ddlmZmZ ddlm	Z	 ddl
mZmZ ddlmZ ddlmc mc mc mZ  ej                    Zi Zeeef         ed<   ded	efd
Ze G d d                      Zded	ed         fdZ G d dej                  Z e G d de                       Z!e G d de                       Z"dS )a/  
Barrier implementations for synchronizing distributed checkpoint operations.

This module provides abstract and concrete barrier implementations that ensure
all ranks in a distributed training environment complete their checkpoint operations
before proceeding, which is essential for data consistency.
    N)Counter)	dataclassfield)	timedelta)AnyOptionalBARRIER_REGISTRYbarrier_classreturnc                 D    t          | d          r| t          | j        <   | S )z0Register a barrier class in the global registry.barrier_type)hasattrr	   r   )r
   s    /home/jaya/work/projects/VOICE-AGENT/VIET/agent-env/lib/python3.11/site-packages/torch/distributed/checkpoint/_experimental/barriers.pyregister_barrierr      s)    }n-- E7D34    c                   `    e Zd ZU dZdZee         ed<    ee	          Z
e	eef         ed<   dS )BarrierConfiga&  
    Configuration for barrier construction.

    This class provides a flexible way to configure different barrier implementations
    with their specific constructor arguments. The barrier type will be looked up
    from a registry and instantiated with rank_info and barrier_args.

    Attributes:
        barrier_type: A string identifying the barrier type (e.g., "tcp_store").
                     If None, no barrier will be used.
        barrier_args: Dictionary of arguments to pass to the barrier constructor.
                     rank_info will be automatically injected as the first argument.

    Examples:
        # No barrier
        BarrierConfig()

        # TCPStore barrier
        BarrierConfig(
            barrier_type="tcp_store",
            barrier_args={
                'timeout_barrier_init_secs': 30,
                'barrier_prefix_list': ['checkpoint'],
                'use_checkpoint_barrier_tcpstore_libuv': False,
                'tcpstore_port': 12345,
                'master_address': 'localhost'
            }
        )
    Nr   )default_factorybarrier_args)__name__
__module____qualname____doc__r   r   str__annotations__r   dictr   r    r   r   r   r   "   sW          < #'L(3-&&&#(5#>#>#>L$sCx.>>>>>r   r   barrier_configBarrierc           	          | j         dS | j         t          vr>t          d| j          dt          t                                                               t          | j                  } |di | j        S )a&  
    Create a barrier instance from BarrierConfig.

    Args:
        barrier_config: Configuration for barrier construction.

    Returns:
        Barrier instance or None if no barrier type is configured.

    Raises:
        ValueError: If the barrier_type is not found in the registry.
    NzUnknown barrier type: z. Available types: r   )r   r	   
ValueErrorlistkeysr   )r   r
   s     r   create_barrier_from_configr$   F   s     "*t"*:::@^%@ @ @ $%5%:%:%<%< = =@ @
 
 	

 %^%@AM=77>6777r   c                   j    e Zd ZdZej        deeef         fd            Z	ej        dd            Z
dS )r   a(  
    Abstract base class for synchronization barriers.

    A barrier ensures that all ranks in a distributed environment reach a certain
    point in execution before any rank proceeds further, which is essential for
    coordinating operations like checkpointing across multiple processes.
    kwargsc                     dS )a+  
        Initialize a barrier.

        Args:
            **kwargs: Keyword arguments for specific barrier implementations.
                     Common arguments may include rank information, barrier prefixes,
                     timeout settings, and other barrier-specific configuration.
        Nr   )selfr&   s     r   __init__zBarrier.__init__k         r   r   Nc                     dS )z
        Execute a synchronization barrier.

        This method uses the barrier_prefix provided during initialization to
        coordinate synchronization across processes.
        Nr   r(   s    r   execute_barrierzBarrier.execute_barrierw   r*   r   r   N)r   r   r   r   abcabstractmethodr   r   r   r)   r-   r   r   r   r   r   b   su          	c3h     	     r   c                   *    e Zd ZdZdZ	 	 ddZddZdS )DistBarriera2  
    A barrier implementation using PyTorch's distributed barrier for synchronization.

    This barrier uses the built-in torch.distributed.barrier() function to coordinate
    synchronization across multiple processes. It's simpler than TCPStoreBarrier but
    requires an initialized process group.
    dist_barrierr   Nc                 @    t          j                    s
J d            dS )a/  
        Initialize a DistBarrier.

        This barrier requires an initialized PyTorch distributed process group.
        No additional arguments are needed as it uses the current process group.

        Raises:
            AssertionError: If the distributed process group is not initialized.
        z2DistBarrier requires an initialized process group.N)distis_initializedr,   s    r   r)   zDistBarrier.__init__   s4     "$$ 	
 	
@	
 	
$ 	
 	
r   c                 ,    t          j                     dS )zd
        Execute a synchronization barrier using the prefix provided during initialization.
        N)r5   barrierr,   s    r   r-   zDistBarrier.execute_barrier   s     	r   r.   )r   r   r   r   r   r)   r-   r   r   r   r2   r2      sU          "L
	
 
 
 
      r   r2   c                   F    e Zd ZdZdZdedededededed	ed
efdZddZ	dS )TCPStoreBarrieraS  
    A barrier implementation using PyTorch's TCPStore for synchronization.

    This barrier uses a TCP-based distributed key-value store to coordinate
    synchronization across multiple processes. It uses a single TCP store
    for all barrier operations, with different prefixes to distinguish between
    different barrier types.
    	tcp_storeglobal_rankglobal_world_sizebarrier_prefixtimeout_barrier_init_secs%use_checkpoint_barrier_tcpstore_libuvtcpstore_portmaster_addresstimeout_secsc	                 6   t                               d||||||||	  	         t                      | _        || _        || _        || _        || _        t          j	        |t          |          | j        t          |          | j        dk              | _        dS )a  
        Initialize a TCPStoreBarrier.

        Args:
            global_rank: The rank of the current process in the distributed environment.
            global_world_size: The total number of processes in the distributed environment.
            barrier_prefix: A string prefix to identify this specific barrier.
            timeout_barrier_init_secs: Timeout in seconds for initializing the TCPStore.
            use_checkpoint_barrier_tcpstore_libuv: Whether to use libuv for the TCPStore.
            tcpstore_port: Port number for the TCPStore.
            master_address: Address of the master node for the TCPStore.
            timeout_secs: Maximum time in seconds to wait for all ranks to reach the barrier.
        zInitializing TCPStore master_address=%s tcpstore_port=%s rank=%s world_size=%s barrier_prefix=%s timeout_barrier_init_secs=%s use_checkpoint_barrier_tcpstore_libuv=%s timeout_secs=%s)secondsr   )
world_sizetimeout	is_masterN)loggerinfor   _tcp_store_barrier_seq_barrier_prefix_global_rank_global_world_size_timeout_secsr5   TCPStoreintr   
_tcp_store)	r(   r<   r=   r>   r?   r@   rA   rB   rC   s	            r   r)   zTCPStoreBarrier.__init__   s    0 	G %1	
 	
 	
 07yy#- ("3) -.&?@@@(A-
 
 
r   r   Nc           	         | j         }t                              d|| j                   dt          dt
          fd}| j                             || j                  t          | j	        |                              t          j        | j        | j        |t          | j	        |                   z              | j	        |xx         dz  cc<   dS )a;  
        Execute a synchronization barrier using the prefix provided during initialization.

        The implementation uses a sequence number that is incremented every time
        a barrier is reached. The sequence number is per barrier prefix to allow
        different barriers to operate concurrently.
        z3Executing barrier barrier_prefix=%s timeout_secs=%srankr   c                     d|  S )NrT   r   )rT   s    r   	_rank_keyz2TCPStoreBarrier.execute_barrier.<locals>._rank_key   s     $== r   )storerF   
key_prefix   N)rL   rI   rJ   rO   rQ   r   rR   setrM   rK   
store_utilr8   rN   )r(   r>   rV   s      r   r-   zTCPStoreBarrier.execute_barrier   s     -A	
 	
 	
	!C 	!C 	! 	! 	! 	! 	Id'((+N;<<	
 	
 	
 	/.T%@%P!Q!QQ		
 	
 	
 	
 	#N333q833333r   r.   )
r   r   r   r   r   rQ   r   boolr)   r-   r   r   r   r:   r:      s          L6
6
 6
 	6

 $'6
 046
 6
 6
 6
 6
 6
 6
p!9 !9 !9 !9 !9 !9r   r:   )#r   r/   loggingcollectionsr   dataclassesr   r   datetimer   typingr   r   torch.distributeddistributedr5   %torch.distributed.elastic.utils.storeelasticutilsrW   r[   	getLoggerrI   r	   r   r   typer   r   r   r$   ABCr   r2   r:   r   r   r   <module>rj      s     


        ( ( ( ( ( ( ( (                                   : : : : : : : : : : : : : : : 
			 %' $sDy/ & & &D T      ?  ?  ?  ?  ?  ?  ?  ?F8!8i8 8 8 88    cg   > ! ! ! ! !' ! ! !H e9 e9 e9 e9 e9g e9 e9 e9 e9 e9r   