
    Pi3                         d dl Z d dlmZ d dlmZmZmZmZ d dlZd dl	m
Z
 d dlmZmZmZ d dlmZmZmZ d dlmZ d dlmZ  ej        d	          Ze G d
 d                      Z G d d          ZdS )    N)	dataclass)AnyDictOptionalUnion)
DictConfig)_init_optim_stateset_model_state_dictset_state_dict)configtrainingutils)DistributedCheckpointer)OptimizerInBackwardWrapperDEBUGc                       e Zd ZU dZeed<   eed<   eed<   eed<   dZeee	e
f                  ed<   dee	ef         fd	ZdS )
TrainingProgressz-
    This is training progress metadata.
    seed
epochs_runtotal_epochsmax_steps_per_epochNdataloader_state_dictreturnc           
          t           j        | j        t           j        | j        t           j        | j        t           j        | j        t           j	        | j
        iS )N)r   SEED_KEYr   
EPOCHS_KEYr   TOTAL_EPOCHS_KEYr   MAX_STEPS_KEYr   DATALOADER_KEYr   selfs    /home/jaya/work/projects/VOICE-AGENT/VIET/agent-env/lib/python3.11/site-packages/torchtune/training/checkpointing/_checkpoint_client.py
state_dictzTrainingProgress.state_dict&   sA    ty%t'8"D$<#T%?
 	
    )__name__
__module____qualname____doc__int__annotations__r   r   r   strr   objectr#    r$   r"   r   r      s           IIIOOO6:8DcN3:::
Df- 
 
 
 
 
 
r$   r   c            
          e Zd ZdZdeddfdZd Zd Zdej	        j
        d	eej        j        ef         d
ededdf
dZdej	        j
        d	eej        j        ef         d
ededdf
dZdej	        j
        d	eej        j        ef         d
ededdf
dZdeeef         fdZdej	        j
        d	eej        j        ef         deeef         fdZdS )CheckpointClientaK  
    Stateful checkpointing client for TorchTune recipes. This class is responsible for
    saving and loading checkpoints using the user configured checkpointers or distributed
    checkpointer if asynchronous checkpointing is enabled.

    Args:
        cfg (DictConfig): Configuration object used to instantiate the recipe.
    cfgr   Nc                    || _         d | _        d | _        | j                             dd          | _        | j                             dd          | _        | j                             dd          | _        t          j        | j         j	                  | _
        t          j                    \  }| _        | j        dk    | _        d S )Nresume_from_checkpointFenable_async_checkpointingoptimizer_in_bwddevicer   )_cfg_checkpointer_dcp_checkpointerget_resume_from_checkpoint_enable_async_checkpointing_optimizer_in_bwdr   
get_devicer6   _deviceget_world_size_and_rank_rank_is_rank_zero)r!   r0   _s      r"   __init__zCheckpointClient.__init__:   s     	 " "&'+y}}5Mu'U'U$+/9==(%,
 ,
( "&/A5!I!I'ty/?@@@5774:!Z1_r$   c                     | j         s5| j        rdn| j        }t          j        | j        j        |          | _         | j         S )zF
        Builds and returns the user configured Checkpointer.
        F)should_load_recipe_state)r8   r<   r;   r   instantiater7   checkpointer)r!   rF   s     r"   _get_checkpointerz"CheckpointClient._get_checkpointerP   s]     ! 		 321 %
 "(!3	&)A" " "D !!r$   c                     | j         s4|                                 }t          |j        |j                  | _         | j         S )z
        Builds and returns the DistributedCheckpointer.
        DistributedCheckpointer is used for asynchronous checkpointing, if enabled.
        Uses the user configured checkpointer directory and outout directories.
        )checkpoint_dir
output_dir)r9   rI   r   _checkpoint_dir_output_dir)r!   rH   s     r"   _get_dcp_checkpointerz&CheckpointClient._get_dcp_checkpointer`   sN     % 	1133L%<+;'3& & &D"
 %%r$   model	optimizertraining_progressepochc                    | j         r-t                              d           t          j                    }i }|                    |                                           |                                |t          j        <   |                                |t          j	        <   | 
                                }|                    ||d           | j         r5t                              dt          j                    |z
  dd           dS dS )a0  
        Checkpoint the training state asynchronously as a distributed checkpoint. Saving
        asnchronously unblocks the training sooner to continue for the next epoch.
        The constructed checkpoint state dict contains the following information:
        - Model weights with key training.MODEL_KEY
        - Relevant recipe state, including optimizer, if training is not complete

        To correctly resume training from a distributed checkpoint, user needs to have both
        resume_from_checkpoint and enable_async_checkpointing flags set to True in the config.
        User does not need to provide any paths to checkpoint or recipe files. Latest intermediate
        and valid checkpoint will be loaded from the output directory and training progress will be
        restored automatically.
        z?Saving checkpoint asynchronously. Retrieving full state dict...T)rS   
save_asyncz$Saving asynchronous checkpoint took .2f secsN)rB   loginfotimeperf_counterupdater#   r   	MODEL_KEYOPT_KEYrO   save_checkpoint)r!   rP   rQ   rR   rS   cp_start	ckpt_dict	dcp_savers           r"   _save_checkpoint_asyncz'CheckpointClient._save_checkpoint_asyncp   s   *  	+HHVWWW(**H 	*5577888(-(8(8(:(:	($%&/&:&:&<&<	("#..00	!! 	" 	
 	
 	
  	HH`t7H7J7JX7U````    	 	r$   c                 `    dz   j         k                                      }t          |t                     }i  j        r-t
                              d           t          j                    i i |r\t          j
        | j         j                   j        r3t
                              dt          j                    z
  dd           n|                                r j        r-t
                              d           t          j                    }|ro j        s#t          j        || j         j                  nY|j                                        D ]*\  }}	t          j        ||	 j         j                  |<   +n|                                 j        r3t
                              dt          j                    |z
  dd           nd	 fd
}
|r1 j        r
 |
             t"          j                                         d	S  |
             d	S )a  
        Checkpoint the training state synchronously.
        The constructed checkpoint state dict contains the following information:
        - Model weights with key training.MODEL_KEY
        - Relevant recipe state, including optimizer, if training is not complete

        To correctly resume training from this checkpoint, user needs to have both
        resume_from_checkpoint flag set to True and recipe file paths set in the config.
           zOSaving checkpoint. This may take some time. Retrieving full model state dict...r5   z#Getting full model state dict took rV   rW   zGetting optimizer state dict...z"Getting optimizer state dict took Nc                                           t          j        i           rH                      t          j        i                                                                                                                                            j        r5t          	                    dt          j                    z
  dd           d S d S )N)rS   intermediate_checkpointzSaving checkpoint took rV   rW   )r\   r   r]   r^   r#   rI   r_   rB   rX   rY   rZ   r[   )checkpoint_dictr`   rS   rg   model_state_dictoptim_state_dictr!   rR   s   r"   _save_checkpoint_helperzGCheckpointClient._save_checkpoint_sync.<locals>._save_checkpoint_helper   s    ""H$68H#IJJJ ' G&&(8:J'KLLL&&'8'C'C'E'EFFF""$$44(? 5    ! Wd.?.A.AH.LWWWW     r$   )r   rI   
isinstancer   rB   rX   rY   rZ   r[   r   gather_cpu_state_dictr?   r#   r=   get_full_optimizer_state_dict	optim_mapitemstorchdistributedbarrier)r!   rP   rQ   rR   rS   rH   no_distoptim_startparamoptrk   rh   r`   rg   ri   rj   s   `  ``      @@@@@r"   _save_checkpoint_syncz&CheckpointClient._save_checkpoint_sync   s   " #(!).?.L"L--// /FGGG  	+HHa   (**H 	2  (="|      ! c$:K:M:MPX:Xcccc    %//11" 	$! 2:;;;"/11 :- '/'M!*#|	( ( ($$ '0&9&?&?&A&A  
s %B!3(:4<   )!  $-#7#7#9#9 ! e9J9L9L{9Zeeee    $	 	 	 	 	 	 	 	 	 	 	 	*  	&! *'')))%%'''''##%%%%%r$   c                     |dz   |j         k     }|r!| j        r|                     ||||           dS |                     ||||           dS )a  
        Checkpoint the training state.
        The constructed checkpoint state dict contains the following information:
        - Model weights with key training.MODEL_KEY
        - Relevant recipe state, including optimizer state, if training is not complete

        If asynchronous checkpointing is enabled, the checkpoint will be saved asynchronously
        as a distributed checkpoint.
        Otherwise, the checkpoint will be saved synchronously with the
        checkpointer user has configured.
        re   N)r   r<   rc   rx   )r!   rP   rQ   rR   rS   rg   s         r"   r_   z CheckpointClient.save_checkpoint  sm    $ #(!).?.L"L" 	St'G 	S''y:KUSSSSS&&ui9JERRRRRr$   c                 N    |                                                                  S )zp
        This method is used to load the base model from the checkpoint
        configured by the user.
        )rI   load_checkpointr    s    r"   load_base_checkpointz%CheckpointClient.load_base_checkpoint!  s"    
 %%''77999r$   c                    | j         rt          j                    }| j        st	          |           i }|                                }|                                }|                    t          j        |t          j	        |t          j
        dt          j        dt          j        dt          j        di           |                                                     |          }| j        sdt          j	        |v r4t!          |||t          j                 |t          j	                            nvt#          ||t          j                            nTt#          ||t          j                            t          j	        |v r%|                    |t          j	                            | j         r3t&                              dt          j                    |z
  dd           |S )z
        This method is used to resume training from a distributed checkpoint state.
        Due to being distributed, this method is called on every rank.
        r   )ri   rj   )rP   ri   z1DistributedCheckpointer loaded the checkpoint in rV   z	 seconds.)rB   rZ   r[   r=   r	   r#   r\   r   r]   r^   r   r   r   r   rO   r{   r   r
   load_state_dictrX   rY   )r!   rP   rQ   dcp_load_startrh   ri   rj   s          r"   load_distributed_checkpointz,CheckpointClient.load_distributed_checkpoint(  s     	1!.00N% 	)i((( *, ++--$//11"$4 "2!1#Q)1&		
 		
 		
 4466FFWW % 	M?22%4X5G%H%4X5E%F	     %%4X5G%H    
 !!01C!D   
 ?22))/(:J*KLLL 	HHwDDUDWDWZhDhwwww   r$   )r%   r&   r'   r(   r   rD   rI   rO   rq   nnModuler   optim	Optimizerr   r   r)   rc   rx   r_   r   r+   r   r|   r   r-   r$   r"   r/   r/   0   s        -- 
- - - -," " " & & & *x* .0JJK* ,	*
 * 
* * * *Xj&xj& .0JJKj& ,	j&
 j& 
j& j& j& j&XSxS .0JJKS ,	S
 S 
S S S S2:d38n : : : :=x= .0JJK= 
c3h	= = = = = =r$   r/   )rZ   dataclassesr   typingr   r   r   r   rq   	omegaconfr   'torch.distributed.checkpoint.state_dictr	   r
   r   	torchtuner   r   r   .torchtune.training.checkpointing._checkpointerr   torchtune.training.memoryr   
get_loggerrX   r   r/   r-   r$   r"   <module>r      sY    ! ! ! ! ! ! - - - - - - - - - - - -                      
 . - - - - - - - - - R R R R R R @ @ @ @ @ @ew 
 
 
 
 
 
 
 
*u u u u u u u u u ur$   