
    &`iD                         d dl Z d dlZd dlZd dlZd dlZd dlmZmZmZm	Z	m
Z
mZ d dlZd dlmZ d dlmZmZ d dlmZ dZdZeddeedfd	ed
edededede	e         dede	e         dedede
ej        ej        ej        f         e
ej        ddf         f         fd            Z	 	 d-ded
edede	e         dede	ej                 fdZddeedfd	ed
edededede	e         dede	e         dedede
ej        ej        ej        f         f         fdZdedeee
eef         f         fdZ ej         e          Z!	 	 d.dede	e         de	eee
eef         f                  dej"        fdZ#dedefdZ$ej          G d d                      Z%d ej        dee&ddf         fd!Z'd"d#d$ej"        d%ed&eddfd'Z(ej         d(ej        d%eddfd)            Z)dd"d*ded%ede	e         d&eddf
d+Z* ej         e*          Z+defd,Z,dS )/    N)Dict	GeneratorListOptionalTupleUnion)TempFileLock)_force_on_node_get_node_id_from_node_ip)DeveloperAPIi  @   @F	source_ipsource_path	target_iptarget_path	force_allexcludechunk_size_bytesmax_size_bytesreturn_futuresreturnc	                     | |k    rt          | ||||||||	  	        S ||k    rt          | ||||          }	|r|	ddfS |	S dS )aD  Synchronize directory on source node to directory on target node.

    Per default, this function will collect information about already existing
    files in the target directory. Only files that differ in either mtime or
    filesize will be transferred, unless ``force_all=True``.

    If ``source_ip==target_ip``, shutil will be used to copy the directory. Otherwise,
    the directory will be packed and sent through the Ray Object Store to the target
    node.

    Args:
        source_ip: IP of source node.
        source_path: Path to directory on source node.
        target_ip: IP of target node.
        target_path: Path to directory on target node.
        force_all: If True, all files will be transferred (not just differing files).
            Ignored if ``source_ip==target_ip``.
        exclude: Pattern of files to exclude, e.g.
            ``["*/checkpoint_*]`` to exclude trial checkpoints.
        chunk_size_bytes: Chunk size for data transfer. Ignored if
            ``source_ip==target_ip``.
        max_size_bytes: If packed data exceeds this value, raise an error before
            transfer. If ``None``, no limit is enforced. Ignored if
            ``source_ip==target_ip``.
        return_futures: If True, returns a tuple of the unpack future,
            the pack actor, and the files_stats future. If False (default) will
            block until synchronization finished and return None.

    Returns:
        None, or Tuple of unpack future, pack actor, and files_stats future.
        If ``source_ip==target_ip``, pack actor and files_stats future will be None.

    )	r   r   r   r   r   r   r   r   r   )ipr   r   r   r   N)!_sync_dir_between_different_nodes_sync_dir_on_same_node)
r   r   r   r   r   r   r   r   r   rets
             p/home/jaya/work/projects/VOICE-AGENT/VIET/agent-env/lib/python3.11/site-packages/ray/tune/utils/file_transfer.pysync_dir_between_nodesr      s    b I0##-))

 

 

 
	
 
	#	#$##)
 
 
  	#d?"
 
$	#    r   c                     t          |           }t          j        dddit          |          }|                    |||          }|r|S t          j        |          S )aH  Synchronize directory to another directory on the same node.

    Per default, this function will collect information about already existing
    files in the target directory. All files will be copied over.

    Args:
        ip: IP of the node.
        source_path: Path to source directory.
        target_path: Path to target directory.
        exclude: Pattern of files to exclude, e.g.
            ``["*/checkpoint_*]`` to exclude trial checkpoints.
        return_futures: If True, returns a future of the copy task.

    Returns:
        None, or future of the copy task.

    num_cpusr   )
source_dir
target_dirr    )r   _remote_copy_diroptionsr
   remoterayget)r   r   r   r   r   node_idcopy_on_nodecopy_futures           r   r   r   [   su    2 (++G#+RRQR.:Q:QRRL%%; &  K  7;r   c	                    t          |           }	t          |          }
t          j        dddit          |	          }t	          j        dddit          |
          }|rd}n3t          j        dddit          |
                              |          }|                    |||||          }|                    ||          }|r|||fS t          j        |          S )a  Synchronize directory on source node to directory on target node.

    Per default, this function will collect information about already existing
    files in the target directory. Only files that differ in either mtime or
    filesize will be transferred, unless ``force_all=True``.

    Args:
        source_ip: IP of source node.
        source_path: Path to directory on source node.
        target_ip: IP of target node.
        target_path: Path to directory on target node.
        force_all: If True, all files will be transferred (not just differing files).
        exclude: Pattern of files to exclude, e.g.
            ``["*/checkpoint_*]`` to exclude trial checkpoints.
        chunk_size_bytes: Chunk size for data transfer.
        max_size_bytes: If packed data exceeds this value, raise an error before
            transfer. If ``None``, no limit is enforced.
        return_futures: If True, returns a tuple of the unpack future,
            the pack actor, and the files_stats future. If False (default) will
            block until synchronization finished and return None.

    Returns:
        None, or Tuple of unpack future, pack actor, and files_stats future.

    r!   r   N)r"   files_statsr   r   r   r$   )	r   
_PackActorr&   r
   _unpack_from_actor%_remote_get_recursive_files_and_statsr'   r(   r)   )r   r   r   r   r   r   r   r   r   source_node_idtarget_node_idpack_actor_on_source_nodeunpack_on_target_noder.   
pack_actorunpack_futures                   r   r   r      s8   J /y99N.y99N * 2 ! !!$^44! ! /6  $^44   ;C 
 

(88
 

&

 	 +11)% 2  J *00[IIM 6j+557=!!!r   pathc                 v   i }t          j        | d          D ]\  }}}t           j                            ||           }|D ]v}	 t           j                            ||          }t          j        t           j                            | |                    }|j        |j        f||<   g# t          $ r Y sw xY w|S )aT  Return dict of files mapping to stats in ``path``.

    This function scans a directory ``path`` recursively and returns a dict
    mapping each contained file to a tuple of (mtime, filesize).

    mtime and filesize are returned from ``os.lstat`` and are usually a
    floating point number (timestamp) and an int (filesize in bytes).
    Ftopdown)	oswalkr8   relpathjoinlstatst_mtimest_sizeFileNotFoundError)	r8   r.   rootdirsfilesrel_rootfilekeystats	            r   _get_recursive_files_and_statsrK      s     KWT5999 
 
dE7??4.. 	 	Dgll8T22xT3 7 788#'=$,#>C  $    	 s   A#B((
B54B5r"   r.   c           	         dt           dt          ffd}t          j                    }t	          j        |dt          j                  5 }|ss|                    | dd           nm|pi }|                    | dd	           t          j	        | d	
          D ]9\  }}}t          j
                            ||           }	|D ]X}
t          j
                            |	|
          }|                    t          j
                            | |          |d	           Y|D ]}t          j
                            |	|          }t          j        t          j
                            | |                    }|j        |j        f} ||          rn||v r||         |k    r|                    t          j
                            | |          |           ;ddd           n# 1 swxY w Y   |S )a_  Pack whole directory contents into an uncompressed tarfile.

    This function accepts a ``files_stats`` argument. If given, only files
    whose stats differ from these stats will be packed.

    The main use case for this is that we can collect information about files
    already existing in the target directory, and only pack files that have
    been updated. This is similar to how cloud syncing utilities decide
    which files to transfer.

    Args:
        source_dir: Path to local directory to pack into tarfile.
        exclude: Pattern of files to exclude, e.g.
            ``["*/checkpoint_*]`` to exclude trial checkpoints.
        files_stats: Dict of relative filenames mapping to a tuple of
            (mtime, filesize). Only files that differ from these stats
            will be packed.

    Returns:
        Tarfile as a stream object.
    	candidater   c                 J    sdS D ]}t          j         | |          r dS dS )NFT)fnmatch)rM   exclr   s     r   _should_excludez"_pack_dir.<locals>._should_exclude   sD     	5 	 	Dy$// ttur   w)fileobjmodeformat T)arcname	recursiveFr:   )rW   N)strboolioBytesIOtarfileopen
PAX_FORMATaddr<   r=   r8   r>   r?   r@   rA   rB   )r"   r   r.   rQ   streamtarrD   rE   rF   rG   dirrI   rH   rJ   	file_stats    `             r   	_pack_dirre      sO   63 4       Z\\F	f3w7I	J	J	J Hc 	H7 	HGGJdG;;;;%+KGGJeG<<<%'WZ%G%G%G H H!dE7??4<< Y YC',,x55CGGBGLLS993RWGXXXX! H HD',,x66C8BGLLS$A$ABBD $t| ;I&s++ ! k))k#.>).K.K GGBGLLS993GGGGGH!H H H H H H H H H H H H H H H> Ms   
FG""G&)G&	num_bytesc                 .    t          | dz            ddS )Nr   z.2fGiB)float)rf   s    r   _gib_stringrj   *  s!    I	)**33333r   c                       e Zd ZdZddeefdedee         dee	ee
eef         f                  dedee         f
dZd	efd
Zd	eeddf         fdZd	ee         fdZdS )r/   a  Actor wrapping around a packing job.

    This actor is used for chunking the packed data into smaller chunks that
    can be transferred via the object store more efficiently.

    The actor will start packing the directory when initialized, and separate
    chunks can be received by calling the remote ``next()`` task.

    Args:
        source_dir: Path to local directory to pack into tarfile.
        exclude: Pattern of files to exclude, e.g.
            ``["*/checkpoint_*]`` to exclude trial checkpoints.
        files_stats: Dict of relative filenames mapping to a tuple of
            (mtime, filesize). Only files that differ from these stats
            will be packed.
        chunk_size_bytes: Cut bytes stream into chunks of this size in bytes.
        max_size_bytes: If packed data exceeds this value, raise an error before
            transfer. If ``None``, no limit is enforced.
    Nr"   r   r.   r   r   c           
      <   t          |||          | _        | j                            dd           | j                                        }|r9||k    r3t	          d| dt          |           dt          |           d          || _        || _        d | _        d S )N)r"   r   r.   r      zPacked directory z content has a size of z, which exceeds the limit of z. Please check the directory contents. If you want to transfer everything, you can increase or disable the limit by passing the `max_size` argument.)	re   ra   seektellRuntimeErrorrj   
chunk_sizemax_sizeiter)selfr"   r   r.   r   r   	file_sizes          r   __init__z_PackActor.__init__D  s      !7
 
 

 	AK$$&&	 	i.88LJ L Ly))L L!.11L L L   +&			r   r   c                 4    | j                                         S N)ra   getvaluert   s    r   get_full_dataz_PackActor.get_full_data`  s    {##%%%r   c              #      K   | j                             d           | j                             | j                  }|r'|V  | j                             | j                  }|%d S d S )Nr   )ra   rn   readrq   )rt   datas     r   _chunk_generatorz_PackActor._chunk_generatorc  sy      {00 	5JJJ;##DO44D  	5 	5 	5 	5 	5r   c                     | j         s&t          |                                           | _         	 t          | j                   S # t          $ r Y d S w xY wrx   )rs   r   nextStopIterationrz   s    r   r   z_PackActor.nextj  s\    y 	6T224455DI		??" 	 	 	44	s   A 
AA)__name__
__module____qualname____doc___DEFAULT_CHUNK_SIZE_BYTES_DEFAULT_MAX_SIZE_BYTESrY   r   r   r   r   ri   intrv   bytesr{   r   r   r   r$   r   r   r/   r/   .  s         . #'>B 9(?  $ d3eSj(9#9:;	
  !   8&u & & & &5)E4,="> 5 5 5 5huo      r   r/   actorc              #   p   K   	 t          j        | j                                                  }|dS |V  4)z0Iterate over actor task and return as generator.TN)r(   r)   r   r'   )r   buffers     r   _iter_remoter   s  s@      **,,-->F	r   T_retryra   r#   r   c                X   |                      d           t          j                            |          }	 t	          | dd          5  t          j        |           5 }|                    |           ddd           n# 1 swxY w Y   ddd           dS # 1 swxY w Y   dS # t          $ rx t	          | d          5  	 ddd           n# 1 swxY w Y   t          j        	                    |          s*|rt          | |d           Y dS t          d| d	          Y dS w xY w)
z,Unpack tarfile stream into target directory.r   .locktimeout)rS   NFr   Target directory u does not exist and couldn't be recreated. Please raise an issue on GitHub: https://github.com/ray-project/ray/issues)rn   r<   r8   normpathr	   r]   r^   
extractallTimeoutErrorexists_unpack_dirrp   )ra   r#   r   rb   s       r   r   r   |  s)   
KKNNN!!*--J Z...::: 	+ 	+f--- +z***+ + + + + + + + + + + + + + +	+ 	+ 	+ 	+ 	+ 	+ 	+ 	+ 	+ 	+ 	+ 	+ 	+ 	+ 	+ 	+ 	+ 	+    Z...// 	 		 	 	 	 	 	 	 	 	 	 	 	 	 	 	 w~~j)) 		 FJu======="@
 @ @ @  			 		 		s   B' 
B B6BB	B	B	
BB' BB' !B"B' 'D)CD)C	D)C	6D)D)(D)r6   c                     t          j                    }t          |           D ]}|                    |           t	          ||           dS )z8Iterate over chunks received from pack actor and unpack.)r#   N)r[   r\   r   writer   )r6   r#   ra   r   s       r   r0   r0     sT     Z\\Fz**  V:......r   )r   r   c                    t           j                            |          }	 t          | dd          5  t	          |           d}r fd}|}t          j         ||           ddd           dS # 1 swxY w Y   dS # t          $ rx t          | d          5  	 ddd           n# 1 swxY w Y   t           j                            |          s*|rt           |d           Y dS t          d	| d
          Y dS w xY w)z"Copy dir with shutil on the actor.r   r   r   Nc                    t                      }t          j                            |           }|D ]S}t          j                            ||          }D ].}t          j        ||          r|                    |            n/T|S rx   )setr<   r8   r>   r?   rO   r`   )	r8   namesignored_namesrel_pathnamerM   rP   r   r"   s	          r   _ignorez_copy_dir.<locals>._ignore  s    $'EEM!wtZ@@H % & &$&GLL4$@$@	$+ & &D&y$?? & - 1 1$ 7 7 7 %& )(r   )ignoreFr   r   r   )r<   r8   r   r	   _delete_path_unsafeshutilcopytreer   r   	_copy_dirrp   )r"   r#   r   r   _ignore_funcr   s   ` `   r   r   r     s    !!*--J' Z...::: 	I 	I
+++L '	) 	) 	) 	) 	) 	)  'OJ
<HHHH'	I 	I 	I 	I 	I 	I 	I 	I 	I 	I 	I 	I 	I 	I 	I 	I 	I 	I(    Z...// 	 		 	 	 	 	 	 	 	 	 	 	 	 	 	 	 w~~j)) 		 *j???????"@
 @ @ @  			 		 		s^   B 3A7*B 7A;;B >A;?B D B."D.B2	2D5B2	66D/DDc                     t           j                            |           rJt           j                            |           rt	          j        |            nt          j        |            dS dS )z1Delete path (files and directories). No filelock.TF)r<   r8   r   isdirr   rmtreeremove)r   s    r   r   r     s[    	w~~k"" 7==%% 	#M+&&&&Ik"""t5r   )NF)NN)-rO   r[   r<   r   r]   typingr   r   r   r   r   r   r(   ray.air._internal.filelockr	   ray.air.util.noder
   r   ray.util.annotationsr   r   r   rY   rZ   r   	ObjectRefActorIDr   r   r   ri   rK   r'   r1   r\   re   rj   r/   r   r   r   r0   r   r%   r   r$   r   r   <module>r      s    				 				   @ @ @ @ @ @ @ @ @ @ @ @ @ @ @ @ 



 3 3 3 3 3 3 G G G G G G G G - - - - - -- 0   "5$; F FFF F 	F
 F d^F F SMF F 	#-cm
34	#-t
#$&F F F FZ # #  # # #  #  d^	# 
 #  cm#  #  #  # V "5$; B" B"B"B" B" 	B"
 B" d^B" B" SMB" B" 4s}ck3=@AABB" B" B" B"J c5;L6L1M    4 )3
3Q(R(R %
 #:>D DDd^D $sE%*$5567D Z	D D D DN45 4S 4 4 4 4 A A A A A A A AH 	%t2C(D     HL   
   PT    : /3; /C /D / / / / #0 0 000 d^	0
 0 
0 0 0 0h 3:i(( S      r   