
import inspect
import os
import warnings
from concurrent.futures import Future
from dataclasses import dataclass
from enum import Enum
from typing import cast, Optional, Union

from typing_extensions import deprecated

import torch
import torch.distributed as dist
from torch.distributed._state_dict_utils import STATE_DICT_TYPE
from torch.distributed.checkpoint._async_executor import _AsyncCheckpointExecutor
from torch.distributed.checkpoint._async_process_executor import (
    _ProcessBasedAsyncCheckpointExecutor,
)
from torch.distributed.checkpoint._async_thread_executor import (
    _ThreadBasedAsyncCheckpointExecutor,
)
from torch.distributed.checkpoint._storage_utils import _storage_setup
from torch.distributed.checkpoint.default_planner import DefaultSavePlanner
from torch.distributed.checkpoint.logger import _dcp_method_logger
from torch.distributed.checkpoint.metadata import Metadata
from torch.distributed.checkpoint.planner import SavePlan, SavePlanner
from torch.distributed.checkpoint.staging import (
    AsyncStager,
    DefaultStager,
    StagingOptions,
)
from torch.distributed.checkpoint.stateful import Stateful
from torch.distributed.checkpoint.storage import StorageWriter, WriteResult
from torch.distributed.distributed_c10d import _get_default_group

from .utils import _api_bc_check, _DistWrapper, _profile


__all__ = [
    "save_state_dict",
    "save",
    "async_save",
    "AsyncCheckpointerType",
    "AsyncSaveResponse",
]


class AsyncCheckpointerType(Enum):
    """Enum for async checkpointer type."""

    THREAD = "thread"
    PROCESS = "process"


@deprecated(
    "`save_state_dict` is deprecated and will be removed in future versions. "
    "Please use `save` instead.",
    category=FutureWarning,
)
def save_state_dict(
    state_dict: STATE_DICT_TYPE,
    storage_writer: StorageWriter,
    process_group: Optional[dist.ProcessGroup] = None,
    coordinator_rank: int = 0,
    no_dist: bool = False,
    planner: Optional[SavePlanner] = None,
) -> Metadata:
    """This method is deprecated. Please switch to 'save'."""
    storage_writer.reset()

    with _profile():
        return _save_state_dict(
            state_dict,
            storage_writer,
            process_group,
            coordinator_rank,
            no_dist,
            planner,
        )


@_dcp_method_logger(log_exceptions=True)
@_api_bc_check
def save(
    state_dict: STATE_DICT_TYPE,
    *,
    checkpoint_id: Union[str, os.PathLike, None] = None,
    storage_writer: Optional[StorageWriter] = None,
    planner: Optional[SavePlanner] = None,
    process_group: Optional[dist.ProcessGroup] = None,
    no_dist: bool = False,
    use_collectives: bool = True,
) -> Metadata:
    """
    Save a distributed model in SPMD style.

    This function is different from ``torch.save()`` as it handles
    ``ShardedTensor`` and ``DTensor`` by having each rank only save their local shards.

    For each ``Stateful`` object (having both a ``state_dict`` and a ``load_state_dict``),
    save will call ``state_dict`` before serialization.

    .. warning::
        There are no guarantees of backwards compatibility across PyTorch versions
        for saved state_dicts.

    .. warning::
        If using the `process_group` argument, make sure that only its ranks
        call `save_state_dict` and that all data in state_dict belong to it.

    .. note::
        When saving a checkpoint for FSDP's `ShardingStrategy.HYBRID_SHARD`, only one of
        the shard_groups should be calling `save_state_dict` and the corresponding process
        group needs to be passed in, as sketched below.
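
        A sketch of that case (``shard_group`` and ``replica_index`` are
        hypothetical names for the shard process group and this rank's
        replica position):

        >>> # xdoctest: +SKIP
        >>> if replica_index == 0:
        ...     torch.distributed.checkpoint.save(
        ...         state_dict=state_dict,
        ...         storage_writer=fs_storage_writer,
        ...         process_group=shard_group,
        ...     )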

    .. note::
        If no process group is available, this function assumes the intention is to
        save the state_dict in the local process.

    .. note::
        Rank 0 is assumed to be the coordinator rank.


    Args:
        state_dict (Dict[str, Any]): The state_dict to save.
        checkpoint_id (Union[str, os.PathLike, None]):
            The ID of this checkpoint instance. The meaning of the checkpoint_id
            depends on the storage. It can be a path to a folder or to a file.
            It can also be a key if the storage is a key-value store.
            (Default: ``None``)
        storage_writer (Optional[StorageWriter]):
            Instance of StorageWriter used to perform writes. If this is not
            specified, DCP will automatically infer the writer based on the
            checkpoint_id. If checkpoint_id is also None, an exception will
            be raised. (Default: ``None``)
        planner (Optional[SavePlanner]):
            Instance of SavePlanner. If this is not specified, the default
            planner will be used. (Default: ``None``)
        process_group (Optional[ProcessGroup]):
            ProcessGroup to be used for cross-rank synchronization.
            (Default: ``None``)
        no_dist (bool):
            If ``True``, this function will assume the intent is to load
            a checkpoint on a single rank/process.
            (Default: ``False``)
        use_collectives (bool): If ``False``, this function will assume the intent is to save
            a checkpoint without using cross-rank synchronization.
            (Default: ``True``)
            This configuration is experimental and should be used with caution.
            It will change the format of the saved checkpoint and may not be backward compatible.

    Returns:
        Metadata: Metadata object for the saved checkpoint.

    Example:
        >>> # xdoctest: +SKIP
        >>> my_model = MyModule()

        >>> state_dict = {"model": my_model}

        >>> fs_storage_writer = torch.distributed.checkpoint.FileSystemWriter(
        ...     "/checkpoint/1"
        ... )
        >>> torch.distributed.checkpoint.save(
        ...     state_dict=state_dict,
        ...     storage_writer=fs_storage_writer,
        ... )
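
        The storage writer can also be inferred from ``checkpoint_id`` alone;
        a sketch of the equivalent call (assuming ``/checkpoint/1`` is a
        filesystem path):

        >>> torch.distributed.checkpoint.save(
        ...     state_dict=state_dict,
        ...     checkpoint_id="/checkpoint/1",
        ... )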

    .. note::
        ``save`` uses collectives to coordinate writes across ranks.
        For NCCL-based process groups, internal tensor representations of
        objects must be moved to the GPU device before communication takes place.
        In this case, the device used is given by ``torch.cuda.current_device()``
        and it is the user's responsibility to ensure that this is set so that
        each rank has an individual GPU, via ``torch.cuda.set_device()``.
    """
    torch._C._log_api_usage_once("torch.distributed.checkpoint.save")

    no_dist = no_dist or (not dist.is_available()) or (not dist.is_initialized())
    if no_dist:
        warnings.warn(
            "torch.distributed is disabled, unavailable or uninitialized, "
            "assuming the intent is to save in a single process."
        )

    with _profile():
        storage_writer = cast(
            StorageWriter, _storage_setup(storage_writer, checkpoint_id, reader=False)
        )

        return _save_state_dict(
            state_dict=_stateful_to_state_dict(state_dict),
            storage_writer=storage_writer,
            process_group=process_group,
            no_dist=no_dist,
            planner=planner,
            use_collectives=use_collectives,
        )


@dataclass
class AsyncSaveResponse:
    """This class contains futures for staging and upload completion.
    It is returned by async_save().
    staging_completion is a future that indicates when the local copy of the
    state_dict is complete.
    upload_completion is a future that indicates when the checkpoint has
    finished saving.
    """

    staging_completion: Future[None]
    upload_completion: Future[None]


@_dcp_method_logger(log_exceptions=True)
def async_save(
    state_dict: STATE_DICT_TYPE,
    *,
    checkpoint_id: Union[str, os.PathLike, None] = None,
    storage_writer: Optional[StorageWriter] = None,
    planner: Optional[SavePlanner] = None,
    process_group: Optional[dist.ProcessGroup] = None,
    async_checkpointer_type: AsyncCheckpointerType = AsyncCheckpointerType.THREAD,
    async_stager: Optional[AsyncStager] = None,
    no_dist: bool = False,
    use_collectives: bool = True,
) -> Union[Future, AsyncSaveResponse]:
    """Asynchronous version of ``save``. This code first copies the state_dict to the
    staging storage (defaults to CPU memory), and then calls ``save`` in a separate thread.

    .. warning::
        This feature is experimental and subject to change.
        MUST CALL CLOSE AFTER LAST CHECKPOINT IS SAVED

    Args:
        state_dict (Dict[str, Any]): The state_dict to save.
        checkpoint_id (Union[str, os.PathLike, None]):
            The ID of this checkpoint instance. The meaning of the checkpoint_id
            depends on the storage. It can be a path to a folder or to a file.
            It can also be a key if the storage is a key-value store.
            (Default: ``None``)
        storage_writer (Optional[StorageWriter]):
            Instance of StorageWriter used to perform 'stage' and 'save'. If
            this is not specified, DCP will automatically infer the writer based on the
            checkpoint_id. If checkpoint_id is also None, an exception will
            be raised. (Default: ``None``)
        planner (Optional[SavePlanner]):
            Instance of SavePlanner. If this is not specified, the default
            planner will be used. (Default: ``None``)
        process_group (Optional[ProcessGroup]):
            ProcessGroup to be used for cross-rank synchronization.
            (Default: ``None``)
        async_checkpointer_type (AsyncCheckpointerType):
            Whether to run the checkpoint save in a separate thread or process.
            (Default: ``AsyncCheckpointerType.THREAD``)
        async_stager (AsyncStager):
            Provides the staging implementation. If ``storage_writer`` implements
            ``AsyncStager`` and ``async_stager`` is also provided, ``async_stager``
            will be used for staging.
        no_dist (bool):
            If ``True``, this function will assume the intent is to save
            a checkpoint on a single rank/process.
            (Default: ``False``)
        use_collectives (bool): If ``False``, save the checkpoint without rank coordination. (Default: ``True``)
            This configuration is experimental and should be used with caution.
            It will change the format of the saved checkpoint and may not be backward compatible.

    Returns:
        Future: A future holding the resultant Metadata object from `save`. If
            staging completes asynchronously, an ``AsyncSaveResponse`` holding
            separate staging and upload futures is returned instead.

    Example:
        >>> # xdoctest: +SKIP
        >>> my_model = MyModule()

        >>> state_dict = {"model": my_model}

        >>> fs_storage_writer = torch.distributed.checkpoint.FileSystemWriter(
        ...     "/checkpoint/1"
        ... )
        >>> checkpoint_future = torch.distributed.checkpoint.async_save(
        ...     state_dict=state_dict,
        ...     storage_writer=fs_storage_writer,
        ... )
        >>>
        >>> # ... do some work ...
        >>>
        >>> checkpoint_future.result()
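
        When staging completes asynchronously, ``async_save`` returns an
        ``AsyncSaveResponse`` instead of a plain future; a sketch of consuming
        it (same names as above):

        >>> from torch.distributed.checkpoint.state_dict_saver import (
        ...     AsyncSaveResponse,
        ... )
        >>> response = torch.distributed.checkpoint.async_save(
        ...     state_dict=state_dict,
        ...     storage_writer=fs_storage_writer,
        ... )
        >>> if isinstance(response, AsyncSaveResponse):
        ...     response.staging_completion.result()  # local copy is done
        ...     response.upload_completion.result()  # checkpoint is persisted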

    """
    torch._C._log_api_usage_once("torch.distributed.checkpoint.async_save")

    if dist.is_available() and dist.is_initialized():
        pg = process_group or _get_default_group()
        assert (
            torch.device("cpu") in pg._device_types  # type: ignore[attr-defined]
        ), (
            "A CPU backend must be enabled for async save; "
            "try initializing process group with 'cpu:gloo,cuda:nccl'"
        )

    if async_stager is None:
        if storage_writer is not None and isinstance(storage_writer, AsyncStager):
            # bwc: a storage writer that implements AsyncStager doubles as the stager
            async_stager = storage_writer
        else:
            async_stager = DefaultStager(StagingOptions(False, False, False, False))

    storage_writer = cast(
        StorageWriter, _storage_setup(storage_writer, checkpoint_id, reader=False)
    )

    state_dict = _stateful_to_state_dict(state_dict)

    @_dcp_method_logger(log_exceptions=True)
    def stage_state_dict() -> Union[Future[STATE_DICT_TYPE], STATE_DICT_TYPE]:
        return async_stager.stage(state_dict)

    staging_future_or_state_dict = stage_state_dict()

    upload_executor: _AsyncCheckpointExecutor = (
        _ProcessBasedAsyncCheckpointExecutor()
        if async_checkpointer_type == AsyncCheckpointerType.PROCESS
        else _ThreadBasedAsyncCheckpointExecutor()
    )
    upload_future = upload_executor.execute_save(
        staging_future_or_state_dict,
        checkpoint_id=checkpoint_id,
        storage_writer=storage_writer,
        planner=planner,
        process_group=process_group,
        no_dist=no_dist,
        use_collectives=use_collectives,
    )

    if isinstance(staging_future_or_state_dict, Future):
        # Staging is asynchronous: wrap the stager's future so callers get a
        # Future[None] that completes (or raises) when the local copy is done.
        staging_future = staging_future_or_state_dict
        return_staging_future: Future[None] = Future()

        def callback(
            original_staging_future: Future[STATE_DICT_TYPE],
            return_staging_future: Future[None] = return_staging_future,
        ) -> None:
            try:
                original_staging_future.result()
                return_staging_future.set_result(None)
            except Exception as e:
                return_staging_future.set_exception(e)

        if not staging_future.done():
            staging_future.add_done_callback(callback)  # type: ignore[arg-type]
        else:
            return_staging_future.set_result(None)

        return AsyncSaveResponse(
            staging_completion=return_staging_future,
            upload_completion=upload_future,
        )
    else:

        @_dcp_method_logger(log_exceptions=True)
        def maybe_synchronize_staging() -> None:
            if async_stager.should_synchronize_after_execute:
                async_stager.synchronize_staging()

        maybe_synchronize_staging()
        return upload_future


def _stateful_to_state_dict(state_dict: STATE_DICT_TYPE) -> STATE_DICT_TYPE:
    """Creates a shallow copy of `state_dict` where `state_dict` is called for each Stateful object."""
    stateful_state_dict = {}
    for key, elem in state_dict.items():
        stateful_state_dict[key] = (
            elem.state_dict() if isinstance(elem, Stateful) else elem
        )
    return stateful_state_dict


def _save_state_dict(
    state_dict: STATE_DICT_TYPE,
    storage_writer: StorageWriter,
    process_group: Optional[dist.ProcessGroup] = None,
    coordinator_rank: int = 0,
    no_dist: bool = False,
    planner: Optional[SavePlanner] = None,
    use_collectives: bool = True,
) -> Metadata:
    torch._C._log_api_usage_once("torch.distributed.checkpoint.save_state_dict")

    distW = _DistWrapper(process_group, not no_dist, coordinator_rank)
    if planner is None:
        planner = DefaultSavePlanner()
    assert planner is not None

    global_metadata: Optional[Metadata] = None

    ckpt_kwargs = {}
    if (ckpt_id := getattr(storage_writer, "checkpoint_id", None)) is not None:
        ckpt_kwargs["checkpoint_id"] = ckpt_id
        ckpt_kwargs["process_group"] = distW.group

    @_dcp_method_logger(**ckpt_kwargs)
    def local_step() -> SavePlan:
        assert planner is not None
        storage_meta = storage_writer.storage_meta()
        if "storage_meta" not in inspect.signature(planner.set_up_planner).parameters:
            warnings.warn(
                "The function definition for SavePlanner.set_up_planner has been updated"
                " to include the storage_meta argument. Please update your implementation"
                " to include this parameter."
            )
            planner.set_up_planner(state_dict, distW.is_coordinator)  # type: ignore[call-arg, arg-type]
        else:
            planner.set_up_planner(
                state_dict=state_dict,
                storage_meta=storage_meta,
                is_coordinator=distW.is_coordinator,
            )
        storage_writer.set_up_storage_writer(distW.is_coordinator)

        local_plan = planner.create_local_plan()
        local_plan = storage_writer.prepare_local_plan(local_plan)
        return local_plan

    @_dcp_method_logger(**ckpt_kwargs)
    def global_step(all_local_plans: list[SavePlan]) -> list[SavePlan]:
        nonlocal global_metadata

        assert planner is not None
        all_local_plans, global_metadata = planner.create_global_plan(all_local_plans)
        all_local_plans = storage_writer.prepare_global_plan(all_local_plans)
        return all_local_plans

    # With collectives, plans are exchanged across ranks; without them, each
    # rank plans and finalizes its own checkpoint shard independently.
    if use_collectives:
        central_plan: SavePlan = distW.reduce_scatter("plan", local_step, global_step)
    else:
        local_plan = local_step()
        global_plan = global_step([local_plan])
        central_plan = global_plan[0]

    @_dcp_method_logger(**ckpt_kwargs)
    def write_data() -> list[WriteResult]:
        assert planner is not None
        final_local_plan = planner.finish_plan(central_plan)
        all_writes = storage_writer.write_data(final_local_plan, planner)

        all_writes.wait()
        return all_writes.value()

    @_dcp_method_logger(**ckpt_kwargs)
    def finish_checkpoint(all_results: list[list[WriteResult]]) -> Metadata:
        assert global_metadata is not None
        storage_writer.finish(metadata=global_metadata, results=all_results)
        return global_metadata

    if use_collectives:
        all_results = distW.all_reduce("write", write_data, finish_checkpoint)
    else:
        write_results = write_data()
        all_results = finish_checkpoint([write_results])

    return all_results