
    Pib                     l   U d dl Z d dlZd dlmZ d dlmZmZmZmZm	Z	m
Z
mZ d dlZd dlmZ d dlmZ d dlmZmZ d dlmZmZ d dlmZmZ d dlmZmZmZmZmZ d d	l m!Z! d d
l"m#Z# d dl$m%Z% d dl&m'Z' d dl(m)Z)m*Z* d dl+m,Z, d dl-m.Z. d dl/m0Z0m1Z1 d dl2m3Z3 d dl4m5Z5m6Z6 d dl7m8Z8  e6            Z9e j:        e;d<   ej<        Z=dZ>de?de#fdZ@deAfdZBdCdejC        deDdejC        fdZEdDde?deAde?fdZF e8d !          d"ee?ef         deAfd#            ZGdEd$ZH e8d%!          deeDeDf         fd&            ZId'ejJ        ddfd(ZK	 	 dFd'd)d*ee?ef         d+ejL        d,eAd-eAde%fd.ZMd/ejN        dejN        fd0ZO	 	 dGd'd)d1eAd+e
ejL                 d2eAdee?ef         f
d3ZP	 dHd'd)d4e'd1eAd+e
ejL                 dee?ef         f
d5ZQd'd)d4e'd*ee?ef         d+ejL        ddf
d6ZR	 dHd7e?d8ejJ        d9e
e	e?                  deAfd:ZSd;dd<d'e,d=e	ee?ejJ        geAf                  d-eAd>eAd?e
e!         ddfd@ZTd'ejJ        dAe!dejJ        fdBZUdS )I    N)chain)AnyCallablecastDictListOptionalTuple)nn)CPUOffloadPolicyfully_shard)distribute_tensorDTensor)DTensorSpec
TensorMeta)_init_optim_stateget_optimizer_state_dictset_model_state_dictset_optimizer_state_dictStateDictOptions)
DeviceMesh)ShardingStrategy)_IncompatibleKeys)	Optimizer)	NF4Tensorto_nf4)TransformerDecoder)MultiHeadAttention)DeepFusionModelEarlyFusionModel)get_adapter_state_dict)
get_device
get_logger)
deprecated_logFstrategyreturnc                 ,    t          t          |           S )zNHelper function to convert sharding strategy strings to ShardingStrategy enum.)getattrr   )r&   s    s/home/jaya/work/projects/VOICE-AGENT/VIET/agent-env/lib/python3.11/site-packages/torchtune/training/_distributed.py_get_sharding_strategyr+   2   s    #X...    c                     t           j                            dd          } t           j                            dd          }t          t           j                            dd                    }t          t           j                            dd                    }t	          j                    }t          | o|o|dk    o|dk    o|          S )	a"  Check if all environment variables required to initialize torch.distributed are set
    and distributed is properly installed. This indicates a distributed run.
    https://pytorch.org/docs/stable/distributed.html#environment-variable-initialization

    Checks the following conditions:

    * torch.distributed is available
    * master port and master address environment variables are set
    * world size is >1
    * rank environment variable is set

    Returns:
        bool: True if all of the above conditions hold, False otherwise.
    MASTER_PORT MASTER_ADDR
WORLD_SIZE   RANKr   )osenvirongetintdistis_availablebool)portaddrsizerankavlbs        r*   is_distributedrA   7   s     :>>-,,D:>>-,,Drz~~lA..//Drz~~fb))**DDBB$!)B	BdCCCr,   tensorsrcc                 *   t          j                    rt          j                    rl| j        }t          j                    dk    r"|                     t          d                    } t          j        | |d           |                     |          S | S )zBroadcasts a tensor from a source to all other processes.

    Args:
        tensor (torch.Tensor): torch.Tensor to broadcast.
        src (int, optional): Source rank. Defaults to 0.

    Returns:
        torch.Tensor: Broadcasted tensor.
    ncclcudaN)rC   group)r9   r:   is_initializeddeviceget_backendtor"   	broadcast)rB   rC   rI   s      r*   _broadcast_tensorrM   N   s      t244 ''YYz&1122Fv3d3333yy   r,   device_typeoffload_ops_to_cpuc                 X    t           j        j        }d}| |v r||          }|r|  d| d}|S )a  Gets the PyTorch Distributed backend based on device type.

    Args:
        device_type (str): Device type to get backend for.
        offload_ops_to_cpu (bool, optional): Flag to check if any operations should be offloaded to CPU.
            Examples of these kinds of operations are CPU offload for FSDP and asynchronous save for distributed
            checkpointing. Defaults to False.

    Example:
        >>> get_distributed_backend("cuda")
        'nccl'
        >>> get_distributed_backend("cpu")
        'gloo'
        >>> get_distributed_backend("cuda", offload_ops_to_cpu=True)
        'cuda:nccl,cpu:gloo'

    Returns:
        str: Distributed backend for use in ``torch.distributed.init_process_group``.
    rE   :z	,cpu:gloo)r9   Backenddefault_device_backend_map)rN   rO   rS   backends       r*   get_distributed_backendrU   b   sM    ( "&!HG000,[9 6 557555Nr,   z^The functionality of `init_distributed` is covered by `torch.distributed.init_process_group`. )msgkwargsc                      t                      r5t          j                    rt          d          t          j        di |  dS dS )aK  Initialize process group required for ``torch.distributed``.

    Args:
        **kwargs (Dict[str, Any]): Additional arguments to pass to torch.distributed.init_process_group.

    Returns:
        bool: True if torch.distributed is initialized.

    Raises:
        RuntimeError: If torch.distributed is already initialized.
    z&torch.distributed already initialized.TFN )rA   r9   rH   RuntimeErrorinit_process_group)rW   s    r*   init_distributedr\      sT         	IGHHH))&)))tur,   c                     t          j                    t          j                                        rt          j                                        ndz  } t          j        |            t                              d|             dS )a  
    Sets the number of threads used by torch to utilize all physical CPU
    cores for intra-op parallelism. Currently, this function sets num_threads
    to be the number of physical CPU cores divided by the number of GPUs as we
    use one process per GPU, and this avoids CPU oversubscription. Note that this is
    currently a rough approximation, and doesn't take into account environments where
    things like CPU affinity is set.
    r2   z+Set intra op parallelism no. of threads to N)	r5   	cpu_counttorchrF   r:   device_countset_num_threadsr%   info)num_threadss    r*   set_torch_num_threadsrd      sq     ,..%*Z%<%<%>%>E
!!!AK 
+&&&IIIKIIJJJJJr,   z`get_world_size_and_rank` will move to `torchtune.utils._device` in future releases. Please use `torchtune.utils.get_world_size_and_rank` instead.c                      t          j                    rOt          j                    r<t          j                                        t          j                                        fS dS )zFunction that gets the current world size (aka total number
    of ranks) and rank number of the current process in the default process group.

    Returns:
        Tuple[int, int]: world size, rank
    )r2   r   )r9   r:   rH   r_   distributedget_world_sizeget_rankrY   r,   r*   get_world_size_and_rankri      sS      t244  //1153D3M3M3O3OOOtr,   modelc                     t          |                                 |                                           D ]\  }}|j        rt	          d| d           dS )aA  
    Utility to validate that model has no params or buffers on meta device.
    If a meta param or buffer is found, an error indicating the param name will
    be raised.

    Args:
        model (nn.Module): model to check for meta params

    Raises:
        RuntimeError: If meta params or buffers exist in model
    zUnexpected param or buffer z on meta device.N)r   named_parametersnamed_buffersis_metarZ   )rj   nps      r*   !validate_no_params_on_meta_devicerq      sr     e,,..0C0C0E0EFF R R19 	RPQPPPQQQ	RR Rr,   
FSDPModulefull_sdrI   strictcpu_offloadc                    t          d |                                 D                       }|                                 }t          rv|st|                                D ]:}|                    |          }||                             |j                  ||<   ;t          dd||          }	t          | ||	          S i }
|
                                D ]i\  }}|                    |          }|                    |j                                      |          }t          |d          rt          |j        t                    r|j        j        }|j        j        }t#          |||          }|j        }|j        dk    rt)          d|j                  d	}|                    |          }t-          t.          j        j        |                    |                                                    }t9          t/          j        ||d	
                    |         }|                    |                                          }|d|                    d	                                       |           tA          |tC          |j        |j"        tG          |                                |j        |$                                                    |j%                  }n.t          |d          s|}ntM          ||j        |j"                  }|r|'                                }tQ          j)        |          |
|<   k| *                    |
|d          S )a*  
    Converting full state dict into a sharded state dict
    and loading it into FSDP model
    Args:
        model (FSDPModule): Model to generate fully qualified names for cpu_state_dict
        full_sd (Dict[str, Any]): a full state dict to load into the model
        device (torch.device): device used to move full state dict tensors
        strict (bool): flag to check if to load the model in strict mode
        cpu_offload (bool): flag to check if offload to CPU is enabled

    Returns:
        ``NamedTuple`` with ``missing_keys`` and ``unexpected_keys`` fields:
            * **missing_keys** is a list of str containing the missing keys
            * **unexpected_keys** is a list of str containing the unexpected keys

    Raises:
        NotImplementedError: If got FSDP with more than 1D.
    c              3   j   K   | ].}t          |d           ot          |j        t                    V  /dS )_local_tensorN)hasattr
isinstancerx   r   ).0params     r*   	<genexpr>z2load_from_full_model_state_dict.<locals>.<genexpr>   sR         	''VJu7JI,V,V     r,   T)full_state_dictbroadcast_from_rank0rt   ru   )rj   model_state_dictoptionsrx   )
block_sizescaler_block_sizer2   z'only support 1D FSDP but got mesh.ndim=r   )dimN)shapedtypestride)mesh
placementstensor_meta)local_tensorspecrequires_graddevice_mesh)rt   assign)+any
parameters
state_dict(_DISTRIBUTED_STATE_DICT_API_IS_AVAILABLEkeysr7   rK   r   r   r   itemsry   rz   rx   r   r   r   r   r   ndimNotImplementedErrorr>   r   r_   rf   ProcessGroup	get_groupr?   listchunk	new_zeroscopy_r   r   r   r   r   r   r   cpur   	Parameterload_state_dict)rj   rs   rI   rt   ru   has_nf4meta_sharded_sd
param_namesharded_meta_paramr   
sharded_sdfull_tensorr   r   r   shard_mesh_dimshard_world_size
shard_rankr   sharded_paramsharded_tensors                        r*   load_from_full_model_state_dictr      s   >   %%''    G &&((O 0 LM LM!,,.. 	S 	SJ!0!4!4Z!@!@")*"5"8"89K9Q"R"RGJ" !%#	
 
 
 $'7
 
 
 	
 
'.}} ;	B ;	B#J!0!4!4Z!@!@%..);)ABBEEfMMK)?;; 5
"0)A A 5 0=H
$6$D$V!$)&7   *59q==-D	DD   "##'99^#<#< !%2DNN>4R4R $&&  U[6FANNNOO !, 5 5ejjll C Co

1o.44U;;; ")!.$/;#5#@$."4"9"9";";"4":#5#<#<#>#>% % %   #5"B" " " /?? !,!2&2&1" "
  6!/!3!3!5!5%'\.%A%AJz""$$Zt$LLLr,   r   c                    | j         }| j        }|                    |          \  }}g }|D ]}|j        ^}}||                                                                z  g|R }	t          j        |	|j        |j	                  }
t          j        |
||                                d           |                    |
           |                    |||j	                  \  }}|S )zR
    Manually gather NF4Tensor parameter since it does not support all_gather
    )rI   r   F)async_op)r   rx   fsdp_pre_all_gatherr   r   r>   r_   emptyrI   r   r9   all_gather_into_tensorappendfsdp_post_all_gather)r   r   
nf4_tensorquant_paramsmetadatafull_quant_paramsquant_paramd0dnr   full_quant_param
full_param_s                r*   _gather_nf4_tensorr   >  s    $D,J';;DAAL(# 	3 	3#Rdnn&&++---333 ;+,K4E
 
 
 	#k4>>+;+;e	
 	
 	
 	
 	  !12222338Z%5 MJ r,   is_rank_zeroadapter_weights_onlyc                 ,   i }|                                  }|                                D ]\  }}|j        r|                    |          }t	          |d          r>t          |j        t                    rt          |          }n|	                                }t          |t                    r|                    |j
                  }|r|                                ||<   t          j                                         |rt          |d          }|S )aJ  
    Converting sharded state dict into a full state dict on CPU
    Returning non-empty result only on rank0 to avoid peaking CPU memory
    Currenltly we can used distributed state dict API to process model without NF4Tensor. Otherwise, we need to
    manually gather any NF4 tensors until all-gather is supported in the NF4Tensor subclass
    TODO: add support for NF4Tensor at distributed state dict API

    Args:
        model (FSDPModule): Model to generate fully qualified names for cpu_state_dict
        is_rank_zero (bool): flag to check if the process is on rank 0
        device (Optional[torch.device]): device to use for sharded tensors. Default: None
        adapter_weights_only (bool): flag to check if only trainable parameters should be returned. Default: False

    Returns:
        Dict[str, Any]: State dict on CPU
    rx   N)rI   )r   r   is_cpurK   ry   rz   rx   r   r   r   r   r   r_   rf   barrierr!   )rj   r   rI   r   cpu_state_dictr   r   r|   s           r*   gather_cpu_state_dictr   V  s   . N!!##J'--// $ $
E< 	%HHV$$E5/** 	,%-y99 ,*511 ))++eY'' 	*HHU[))E 	5).N:&!!#### M/tLLLr,   optc                 V    t          ddd          }t          | ||          }|r|S i S )z
    Converting optimizer state from sharded to full
    For example, "exp_avg" in AdamW is `DTensor`,
    "exp_avg.full_tensor()" converts it to plain tensor on rank 0
    Returning non-empty cpu state dict on rank 0
    Tr~   r   ru   )rj   
optimizersr   )r   r   )rj   r   r   rI   r   r~   s         r*   get_full_optimizer_state_dictr     sQ     4T  G /W  O  	r,   c                     t           r;t          dd|t          j        d          u           }t	          | |||           d	S d}t          |           |                                d         }|                                d         }|d         }|d         }	t          ||          D ]\  }
}|                                D ]\  }}||k    r||
|<   t          |
|         ||                   D ]u\  }}||vr
||         }|	|         }|                                D ]F\  }}||         }t          |t                    rt          ||j        |j                  ||<   A|||<   Gv|                    ||d           d	S )
za
    Converting full optimizer state to sharded state dict
    and loading it into optimizer
    Tr   r   )rj   r   optim_state_dictr   paramsparam_groupsstate)r   r   N)r   r   r_   rI   r   r   r   zipr   rz   r   r   r   r   r   )rj   r   rs   rI   r   PARAMSr   r   full_param_groups
full_stateparam_groupfull_param_groupkeyvaluepidfull_pidparam_statefull_param_stateattrr   r   s                        r*   #load_from_full_optimizer_state_dictr     s    0 -
" !%%,u"5"55
 
 

 	!C'7	
 	
 	
 	
 	
 	
 #~~''7  )#N3W%
-0?P-Q-Q 	8 	8)K).4466 ) )
U&==#(C  !$[%8:J6:R!S!S 8 8Xe###Cj#-h#7 )9)?)?)A)A 8 8%D+%0%6N!.':: 	8,='*6*5- -D)) -8D))88" 	 , 	
 	
 	
 	
 	
r,   namemodulenames_to_matchc                     |r| |v rdS |                      d          }t          |          dk    r,|d         dk    ot                              |d                   S dS )a  
    Returs True for layers named {}.layers.i or layers that exactly match names_to_match, otherwise,
    returns False. This is a helper function for sharding a model with FSDP.
    In :func:`~torchtune.training.shard_model`, we iterate over the model's named modules
    and apply fully_shard using this condition.

    As part of our sharding strategy, we want each layer to be sharded separately, as this is
    generally efficient. We may also want to shard certain modules that are not layers, such as
    the embedding module.

    #TODO: a more robust way would be to shard on the module type, not the name.

    Args:
        name (str): Name of the module.
        module (nn.Module): Module to be sharded.
        names_to_match (Optional[List[str]]): List of names to match, if any.
        *args: Variable length argument list to be passed to the Embedding module.
        **kwargs: Arbitrary keyword arguments to be passed to the Embedding module.

    Returns:
        bool: True if the module name matches the condition, False otherwise.

    Examples:
        >>> names_to_match = ["embedding"]
        >>> layer_names = ["layers.0", "decoder.layers.1", "encoder.layers.2.attention",
            "my_wrapper.layer.1.something", "embedding"]
        >>> matches = []
        >>> for name in layer_names:
        >>>     if shard_condition_is_layer_or_match(name, None): matches.append(name)
        >>> print(matches)
        >>> ["layers.0", "decoder.layers.1", "embedding"]
    T.   layersr4   F)splitlenstrisdigit)r   r   r   argsrW   	name_lists         r*   get_shard_conditionsr     se    N  $.00t

3I
9~~}(GS[[2-G-GG5r,   T)reshard_after_forwarddp_meshshard_conditionsr   r   c                F   ||d}|rt                      |d<   d}t          t          |                                                     D ]3\  t	          fd|D                       rt          fi | |dz  }4|dk    rt          d          t          | fi | dS )a{  
    Utility to shard a model with FSDP using the PyTorch Distributed fully_shard API.

    This method will over the model's named modules from the bottom-up and apply shard modules
    based on whether they meet any of the criteria from shard_conditions.

    Args:
        model (TransformerDecoder): Model to shard with FSDP.
        shard_conditions (List[Callable[[str, nn.Module], bool]]): A list of functions to determine
            which modules to shard with FSDP. Each function should take module name (relative to root)
            and the module itself, returning True if FSDP should shard the module and False otherwise.
            If any of shard_conditions return True for a given module, it will be sharded by FSDP.
        cpu_offload (bool): If set to True, FSDP will offload parameters, gradients, and optimizer
            states to CPU.
        reshard_after_forward (bool): Whether to reshard parameters and buffers after
            the forward pass. Setting this to True corresponds to the FULL_SHARD sharding strategy
            from FSDP1, while setting it to False corresponds to the SHARD_GRAD_OP sharding strategy.
        dp_mesh (Optional[DeviceMesh]): Device mesh to use for FSDP sharding under mutliple parallelism.
            Default to None.

    Raises:
        ValueError: If no layer modules were sharded, indicating that no shard_condition was triggered.
    )r   r   offload_policyr   c                 (    g | ]} |          S rY   rY   )r{   shard_conditionmro   s     r*   
<listcomp>zshard_model.<locals>.<listcomp>.  s%    NNN/1%%NNNr,   r2   zXNo layer modules were sharded. Please check if shard conditions are working as expected.N)r   reversedr   named_modulesr   r   
ValueError)	rj   r   ru   r   r   fsdp_kwargsnum_layers_shardedr   ro   s	          @@r*   shard_modelr     s    > -B7SSK ;(8(:(:$% e11334455 $ $1NNNNN=MNNNOO 	$))[)))!#Qf
 
 	

 %%%%%%%r,   tp_meshc                 P   t          | t          t          f          }|r| j        n| }|                                }t          |                                          D ]}t          |t                    r|j        |z  dk    rt          d|j         d| d          |j
        |z  dk    rt          d|j
         d| d          |j        |z  dk    rt          d|j         d| d          |j        |z  |_        |j
        |z  |_
        |j        |z  |_        |r|| _        | S )aS  
    Utility to scale MultiHeadAttention parameters(num_heads, num_kv_heads, embed_dim) across
    tensor parallel devices. Each device will handle a portion of the attention computations.

    Args:
        model (nn.Module): Model whose attention parameters will be scaled by TP size.
        tp_mesh (DeviceMesh): Tensor parallel device mesh.

    Returns:
        nn.Module: The model with scaled MultiHeadAttention parameters.

    Raises:
        ValueError: If attention heads, kv heads, or embed dimension is not divisible by TP size.

    Examples:
        >>> from torchtune.modules import TransformerDecoder
        >>> from torch.distributed.device_mesh import DeviceMesh
        >>> model = TransformerDecoder(
                num_heads=32,
                num_kv_heads=32,
                embed_dim=4096,
            )
        >>> tp_mesh = DeviceMesh("cuda", torch.arange(2))  # 2 GPUs
        >>> model = prepare_mha_for_tp(model, tp_mesh)
        >>> # Now each GPU has:
        >>> # num_heads = 16 (32/2)
        >>> # num_kv_heads = 16 (32/2)
        >>> # embed_dim = 2048 (4096/2)
    r   zNumber of attention heads (z-) must be divisible by tensor parallel size (z).zNumber of KV heads (zEmbedding dimension ()rz   r   r    decoderr>   r   modulesr   	num_headsr   num_kv_heads	embed_dim)rj   r   is_fusion_modelr   tp_sizer   s         r*   prepare_mha_for_tpr  ;  s   D !:J(KLLO.9emmEGllnnG'//##$$ 1 1a+,, 	1{W$)) 9!+ 9 9-49 9 9   ~'1,, 91> 9 9-49 9 9   {W$)) 9AK 9 9-49 9 9   +0AK^w6AN+0AK  Lr,   )r   )F)r'   N)FF)NF)N)Vloggingr5   	itertoolsr   typingr   r   r   r   r   r	   r
   r_   torch.distributedrf   r9   r   "torch.distributed._composable.fsdpr   r   torch.distributed._tensorr   r   )torch.distributed._tensor.placement_typesr   r   'torch.distributed.checkpoint.state_dictr   r   r   r   r   torch.distributed.device_meshr   torch.distributed.fsdpr   torch.nn.modules.moduler   torch.optimr   torchao.dtypes.nf4tensorr   r   torchtune.modulesr   torchtune.modules.attentionr   torchtune.modules.model_fusionr   r    torchtune.modules.peftr!   torchtune.utilsr"   r#   torchtune.utils._loggingr$   r%   Logger__annotations____version__torch_versionr   r   r+   r;   rA   Tensorr8   rM   rU   r\   rd   ri   Modulerq   rI   r   r   r   r   r   r   r   r   r  rY   r,   r*   <module>r     sI    				       C C C C C C C C C C C C C C C C C C                    L L L L L L L L @ @ @ @ @ @ @ @ M M M M M M M M              5 4 4 4 4 4 3 3 3 3 3 3 5 5 5 5 5 5 ! ! ! ! ! ! 6 6 6 6 6 6 6 6 0 0 0 0 0 0 : : : : : : L L L L L L L L 9 9 9 9 9 9 2 2 2 2 2 2 2 2 / / / / / /!z||gn # # # !
 ,1 (/S /-= / / / /
D D D D D. el  U\    (  $ SV    : h  tCH~ $    *K K K K  	D  
sCx 
 
 
	 
RRY R4 R R R R* rM rMrM#s(^rM LrM 	rM
 rM rM rM rM rMjbl r|    6 &*!&	+ +++ U\"+ 	+
 
#s(^+ + + +d &*	 	  U\"	
 
#s(^   07
7
	7
 #s(^7
 L	7

 
7
 7
 7
 7
z +/. .
.I. T#Y'. 
. . . .l #'$(1& 1& 1&1&8S")$4d$:;<1& 	1&
  1& j!1& 
1& 1& 1& 1&h=9== Y= = = = = =r,   