
    &`ip.                        d Z ddlZddlmZmZmZmZmZmZ ddl	Z	ddl	m
Z
 ddlmZ eZeZeZ G d d          Z G d d	e          Zd
ee         dedeeeef                  fdZe	j         G d d                      Zd Zeeddddeegee         f         dededeeeee
ef                  gef         deee         egee         f         dedededee         fdZd Z	 	 	 	 	 	 	 	 	 d%d"Zd# Zed$k    r e             dS dS )&a  A simple distributed shuffle implementation in Ray.

This utility provides a `simple_shuffle` function that can be used to
redistribute M input partitions into N output partitions. It does this with
a single wave of shuffle map tasks followed by a single wave of shuffle reduce
tasks. Each shuffle map task generates O(N) output objects, and each shuffle
reduce task consumes O(M) input objects, for a total of O(N*M) objects.

To try an example 10GB shuffle, run:

    $ python -m ray.experimental.shuffle         --num-partitions=50 --partition-size=200e6         --object-store-memory=1e9

This will print out some statistics on the shuffle execution such as:

    --- Aggregate object store stats across all nodes ---
    Plasma memory usage 0 MiB, 0 objects, 0.0% full
    Spilled 9487 MiB, 2487 objects, avg write throughput 1023 MiB/s
    Restored 9487 MiB, 2487 objects, avg read throughput 1358 MiB/s
    Objects consumed by Ray tasks: 9537 MiB.

    Shuffled 9536 MiB in 16.579771757125854 seconds
    N)AnyCallableIterableListTupleUnion)	ObjectRef)Clusterc                   @    e Zd ZdZd ZdeddfdZdee         fdZ	dS )ObjectStoreWriteraT  This class is used to stream shuffle map outputs to the object store.

    It can be subclassed to optimize writing (e.g., batching together small
    records into larger objects). This will be performance critical if your
    input records are small (the example shuffle uses very large records, so
    the naive strategy works well).
    c                     g | _         d S Nresultsselfs    l/home/jaya/work/projects/VOICE-AGENT/VIET/agent-env/lib/python3.11/site-packages/ray/experimental/shuffle.py__init__zObjectStoreWriter.__init__2           itemreturnNc                 ^    | j                             t          j        |                     dS )zQueue a single item to be written to the object store.

        This base implementation immediately writes each given item to the
        object store as a standalone object.
        N)r   appendrayputr   r   s     r   addzObjectStoreWriter.add5   s(     	CGDMM*****r   c                     | j         S )z6Return list of object refs representing written items.r   r   s    r   finishzObjectStoreWriter.finish=   s
    |r   )
__name__
__module____qualname____doc__r   InTyper   r   r	   r     r   r   r   r   )   sn           + +4 + + + +Y      r   r   c                   <    e Zd Zd ZdeddfdZdee         fdZdS )ObjectStoreWriterNonStreamingc                     g | _         d S r   r   r   s    r   r   z&ObjectStoreWriterNonStreaming.__init__C   r   r   r   r   Nc                 :    | j                             |           d S r   )r   r   r   s     r   r   z!ObjectStoreWriterNonStreaming.addF   s    D!!!!!r   c                     | j         S r   r   r   s    r   r    z$ObjectStoreWriterNonStreaming.finishI   s
    |r   )	r!   r"   r#   r   r%   r   r   r   r    r&   r   r   r(   r(   B   sd          " "4 " " " "S	      r   r(   input_streamnum_partitionsr   c              #   8   K   d}| D ]}||fV  |dz  }||z  }dS )a8  Round robin partitions items from the input reader.

    You can write custom partitioning functions for your use case.

    Args:
        input_stream: Iterator over items from the input reader.
        num_partitions: Number of output partitions.

    Yields:
        Tuples of (partition id, input item).
    r      Nr&   )r,   r-   ir   s       r   round_robin_partitionerr1   M   sH       	
A  $i	Q	^ r   c                        e Zd Zd Zd Zd ZdS )_StatusTrackerc                 >    d| _         d| _        g | _        g | _        d S Nr   )num_map
num_reducemap_refsreduce_refsr   s    r   r   z_StatusTracker.__init__d   s%    r   c                 "    || _         || _        d S r   )r8   r9   )r   r8   r9   s      r   register_objectrefsz"_StatusTracker.register_objectrefsj   s     &r   c                    | j         rUt          j        | j         dt          | j                   d          \  }| _         | xj        t          |          z  c_        n[| j        rTt          j        | j        dt          | j                  d          \  }| _        | xj        t          |          z  c_        | j        | j        fS )Nr/   F)timeoutnum_returnsfetch_local)r8   r   waitlenr6   r9   r7   )r   readys     r   get_progressz_StatusTracker.get_progressn   s    = 	*#&8..!	$ $ $ E4= LLCJJ&LLL 	*&)h  011!	' ' '#E4# OOs5zz)OO|T_,,r   N)r!   r"   r#   r   r;   rC   r&   r   r   r3   r3   b   sA          ' ' '- - - - -r   r3   c                    ddl m } d}d} ||d          }|                    d            ||d          }|                    d           ||k     s||k     rt          j        | j                                                  \  }}	|                    ||z
             |                    |	|z
             |}|	}t          j        d           ||k     |||k     |	                                 |	                                 d S )Nr   )tqdm)totalpositionzMap Progress.r/   zReduce Progress.g?)
rE   set_descriptionr   getrC   remoteupdatetimesleepclose)
trackerinput_num_partitionsoutput_num_partitionsrE   r6   r7   map_bar
reduce_barnew_num_mapnew_num_reduces
             r   render_progress_barrV      s2   GJd-:::GO,,,1A>>>J1222
(
(
(J9N,N,N&)gg.B.I.I.K.K&L&L#^{W,---.:5666#

3 (
(
(J9N,N,N MMOOOr   T)partitionerobject_store_writerrO   	streaminginput_readerrP   rQ   output_writerrW   rX   rO   rY   c           	      |   	
 t          j                  dt          dt          t          t          t
          t          f                           f fd            	t           j        dt          dt          t          t          t
          t          f                           dt          ffd            	fdt                    D             

fdt                    D             }|r6|j	                            d	 
D             |           t          |           t          j        |          S )
a  Simple distributed shuffle in Ray.

    Args:
        input_reader: Function that generates the input items for a
            partition (e.g., data records).
        input_num_partitions: The number of input partitions.
        output_num_partitions: The desired number of output partitions.
        output_writer: Function that consumes a iterator of items for a
            given output partition. It returns a single value that will be
            collected across all output partitions.
        partitioner: Partitioning function to use. Defaults to round-robin
            partitioning of input items.
        object_store_writer: Class used to write input items to the
            object store in an efficient way. Defaults to a naive
            implementation that writes each input record as one object.
        tracker: Tracker actor that is used to display the progress bar.
        streaming: Whether or not if the shuffle will be streaming.

    Returns:
        List of outputs from the output writers.
    r>   r0   r   c                     fdt                    D             }  |                     D ] \  }}||                             |           !d |D             S )Nc                 $    g | ]}             S r&   r&   ).0_rX   s     r   
<listcomp>z7simple_shuffle.<locals>.shuffle_map.<locals>.<listcomp>   s#    OOOQ&&((OOOr   c                 6    g | ]}|                                 S r&   )r    )r`   cs     r   rb   z7simple_shuffle.<locals>.shuffle_map.<locals>.<listcomp>   s     ,,,q

,,,r   )ranger   )r0   writersout_ir   rZ   rX   rQ   rW   s       r   shuffle_mapz#simple_shuffle.<locals>.shuffle_map   s|    OOOO%8M2N2NOOO&;||A8MNN 	% 	%KE4ENt$$$$,,G,,,,r   mapper_outputsc                     g }t          |          k    sJ |D ]}|D ]}|                    |            | |          S r   )rA   r   )r0   ri   input_objectsobj_refsobj_refrP   r[   s        r   shuffle_reducez&simple_shuffle.<locals>.shuffle_reduce   sp     >""&:::::& 	. 	.H# . .$$W----.}Q...r   c                 :    g | ]}                     |          S r&   )rJ   )r`   r0   rh   s     r   rb   z"simple_shuffle.<locals>.<listcomp>   s'    RRR{))!,,RRRr   c           	      b    g | ]* j         gfd t                    D             R  +S )c                 ,    g | ]}|                  S r&   r&   )r`   r0   jshuffle_map_outs     r   rb   z-simple_shuffle.<locals>.<listcomp>.<listcomp>   s#    LLL1#A&LLLr   )rJ   re   )r`   rr   rP   rs   rn   s    @r   rb   z"simple_shuffle.<locals>.<listcomp>   sk         		
LLLLL6J0K0KLLL	
 	
 	
  r   c                     g | ]
}|d          S )r   r&   )r`   map_outs     r   rb   z"simple_shuffle.<locals>.<listcomp>   s    777GWQZ777r   )r   rJ   PartitionIDr   r   r   r	   OutTypere   r;   rV   rI   )rZ   rP   rQ   r[   rW   rX   rO   rY   shuffle_reduce_outrh   rs   rn   s   ``````   @@@r   simple_shufflery      s   F 	Z1222-{ -tDsI~1F,G'H - - - - - - - - 32- 	Z//)-d5i3H.I)J/	/ / / / / / Z/ SRRRe<P6Q6QRRRO      ,--	    R#**777779K	
 	
 	
 	G%9;PQQQ7%&&&r   c                     t                      }t          |           D ]}|                    ||           |                                 |S )Nnum_cpusobject_store_memory)r
   re   add_nodewait_for_nodes)	num_nodesr|   r}   clusterra   s        r   build_clusterr      sX    iiG9 U U(@STTTTNr       eA       קA   Fc	                    dd l }	dd l|}
| r&t          d           t          j        | d           n|
rqt          d           t          d|            t          d|            t          d|            t          |||          }t          j        |j        	           n%t          d
           t          j        ||           t          |          }|dz  |t          	                                }dt          dt          t                   ffd}dt          dt          t                   dt          ffd}dt          dt          t                    dt          fd}|r
|}t"          }n	t$          }|} |	j                     }t'          ||||          } |	j                     |z
  } |	j        d           t                       d }t+          d          D ]M}	 t          j        j                            d          }n # t2          $ r  |	j        d           Y nw xY w|r nNt          |           t                       t          dt          t5          |          dz            d|d           d S )Nr   z#Connecting to a existing cluster...T)addressignore_reinit_errorzEmulating a cluster...zNum nodes: zNum CPU per node: zObject store memory per node: )r   zStart a new cluster...r{      r0   r   c              3   v   K   t                    D ]%}                    z  dfj                  V  &d S )N   )dtype)re   onesint64)r0   ra   npr-   rows_per_partitions     r   rZ   zrun.<locals>.input_reader  sX      ~&& 	U 	UA''-?C28'TTTTTT	U 	Ur   shuffle_inputsc                     d}s,|D ](}t          j        |          }||j        |j        z  z  })nE|rCt          j        |d          \  \  }}t          j        |          }||j        |j        z  z  }|C|S )Nr   r/   r]   )r   rI   sizeitemsizer@   )r0   r   rF   rm   arrrB   use_waits         r   r[   zrun.<locals>.output_writer  s     	1) 1 1gg&&CL001 ! 1*-(>q*Q*Q*Q'gennCL00 ! 1
 r   c                 8    d}|D ]}||j         |j        z  z  }|S r5   )r   r   )r0   r   rF   r   s       r   output_writer_non_streamingz(run.<locals>.output_writer_non_streaming  s2     ! 	- 	-CSX,,EEr   )rZ   rP   rQ   r[   rX   rO   g      ?r   )
stats_onlyr/   Shuffledi   zMiB inseconds)rL   numpyprintr   initr   r   intr3   rJ   rv   r   r%   r   r	   rw   r   r(   r   ry   rM   re   _privateinternal_apimemory_summary	Exceptionsum)ray_addressr}   r-   partition_sizer   r|   no_streamingr   rO   rL   is_multi_noder   rZ   r[   r   output_writer_callablerX   startoutput_sizesdeltasummaryr0   r   r   s     `    `              @@r   runr      s6    KKKM M3444$?????	 	M&''''I''(((-8--...D/BDDEEE	85HII)))))&'''(8KLLLL((N#N'E2 ''))HU U(8 U U U U U U U U d9o '      (,S		     /!<;/!.DIKKE!!+,,/  L DIKK%EDJsOOO	GGGG1XX  	l/>>$>OOGG 	 	 	DJqMMMD	  	E		'NNN	GGG	CL))[9::HeY    s   *%HH-,H-c            
         dd l } |                                 }|                    dt          d            |                    dt          d           |                    dt
          d           |                    dt          d	           |                    d
t
          d            |                    dt
          d           |                    ddd           |                    ddd           |                                }t          |j        |j	        |j
        |j        |j        |j        |j        |j                   d S )Nr   z--ray-address)typedefaultz--object-store-memoryr   z--num-partitionsr   z--partition-sizer   z--num-nodesz
--num-cpusr   z--no-streaming
store_trueF)actionr   z
--use-wait)r   r}   r-   r   r   r|   r   r   )argparseArgumentParseradd_argumentstrfloatr   
parse_argsr   r   r}   r-   r   r   r|   r   r   )r   parserargss      r   mainr   J  sW   OOO$$&&F
c4@@@
/eSIII
*a@@@
*FFF
C>>>
3:::
(uMMM
\5IIID$ 4**.&	 	 	 	 	 	r   __main__)	Nr   r   r   Nr   FFN)r$   rL   typingr   r   r   r   r   r   r   r	   ray.cluster_utilsr
   r%   rw   r   rv   r   r(   r1   rJ   r3   rV   boolry   r   r   r   r!   r&   r   r   <module>r      s   0  > > > > > > > > > > > > > > > > 



       % % % % % % 

       2    $5   6"47eK'()   * - - - - - - - ->  : 	 ->"D' D' D'K=(6*::;D' D' 	D'
 [$uY^/D*EFOPD' 	&	3+!66D' +D' D' D' 
']D' D' D' D'N   a a a aH  4 zDFFFFF r   