
    &`i'                        d dl Z d dlZd dlZd dlZd dlmZ d dlmZmZm	Z	m
Z
 d dlZd dlZd dlmZ d dlmZ d dlmZ d dlmZ d dlmZ d d	lmZ 	 d dlZn# e$ r dZY nw xY werd d
lmZ  ej        e           Z! ed           G d d                      Z" ej#        d            G d d                      Z$d Z% G d d          Z&d Z'dS )    N)defaultdict)TYPE_CHECKINGAnyListOptional)(_ref_bundles_iterator_to_block_refs_list)cached_remote_fn)BlockAccessor)DataContext)	ObjectRef)	PublicAPI)Datasetalpha)	stabilityc                       e Zd ZdZdddedefdZd Zdede	e         fd	Z
d
ee         deee                  fdZdefdZdefdZdedefdZdS )RandomAccessDatasetzuA class that provides distributed, random access to a Dataset.

    See: ``Dataset.to_random_access_dataset()``.
    dsr   keynum_workersc                     |                     d          }|t          |t                    rt          d          t	          j                    }t                              d           |                              }t          t                    |                                }t          |          }t                              d           t          j        fd|D                       }	g  _        d _        g  _        t%          |	          D ][\  }
}|rT j                            ||
                     j        |d          _         j                            |d	                    \t                              d
                    |                     t+          j                    }|j        fdt1          |          D              _                                         \   _         _        t                              d                     j                             t          j         fd j        D                        t                              d           t	          j                    |z
   _        dS )zConstruct a RandomAccessDataset (internal API).

        The constructor is a private API. Use ``ds.to_random_access_dataset()``
        to construct a RandomAccessDataset.
        T)fetch_if_missingNz6RandomAccessDataset only supports Arrow-format blocks.z%[setup] Indexing dataset by sort key.z%[setup] Computing block range bounds.c                 <    g | ]}                     |          S  )remote).0b
get_boundsr   s     r/home/jaya/work/projects/VOICE-AGENT/VIET/agent-env/lib/python3.11/site-packages/ray/data/random_access_dataset.py
<listcomp>z0RandomAccessDataset.__init__.<locals>.<listcomp>=   s)    DDD*++As33DDD    r      z*[setup] Creating {} random access workers.c                 l    g | ]0}t                                                                        1S ))scheduling_strategy)_RandomAccessWorkeroptionsr   )r   _r   r#   s     r   r   z0RandomAccessDataset.__init__.<locals>.<listcomp>K   sP     
 
 
   ''<O'PPWW 
 
 
r    z'[setup] Worker to blocks assignment: {}c                 r    g | ]3}|j                             fd j        |         D                       4S )c                 ,    i | ]}|j         |         S r   )_non_empty_blocks)r   iselfs     r   
<dictcomp>z;RandomAccessDataset.__init__.<locals>.<listcomp>.<dictcomp>\   s3        41!4  r    )assign_blocksr   _worker_to_blocks_map)r   wr+   s     r   r   z0RandomAccessDataset.__init__.<locals>.<listcomp>Z   sk         &&   !%!;A!>     r    z-[setup] Finished assigning blocks to workers.)schema
isinstancetype
ValueErrortimeperf_counterloggerinfosortr	   _get_boundsiter_internal_ref_bundlesr   raygetr)   _lower_bound_upper_bounds	enumerateappendformatr   get_currentr#   range_workers$_compute_block_to_worker_assignments_block_to_workers_mapr.   _build_time)r+   r   r   r   r0   start	sorted_dsbundlesblocksboundsr*   r   ctxr   r#   s   ` `          @@r   __init__zRandomAccessDataset.__init__&   s    D11>Z55>UVVV!##;<<<GGCLL	%k22
55779'BB;<<<DDDDDVDDDEE!# f%% 	0 	0DAq 0&--fQi888$,()!D%"))!A$///@GGTTUUU%''!5
 
 
 
 
 ;''	
 
 
 5577	
&& 	5<<T=WXX	
 	
 	
 	      
	
 
	
 
	
 	CDDD,..6r    c                    t          t                    }t          t                    }t          t                    }t          j        d | j        D                       }t          |          D ]+\  }}||                             | j        |                    ,t          j                            | j	                  }t          | j	                  D ]i\  }}	||	         }
|
                    dg           }|D ]C}||         D ]8}||                             |           ||                             |           9Djt          | j	                  D ]m\  }}	t          ||                   dk    rOt          j        | j                  }||                             |           ||                             |           n||fS )Nc                 @    g | ]}|j                                         S r   )pingr   r   r/   s     r   r   zLRandomAccessDataset._compute_block_to_worker_assignments.<locals>.<listcomp>o   s"    ???A???r    node_idsr   )r   listr;   r<   rD   r?   r@   experimentalget_object_locationsr)   lenrandomchoice)r+   block_to_workersworker_to_blocksloc_to_workerslocsr*   loc
block_locs	block_idxblock
block_infoworkers               r   rE   z8RandomAccessDataset._compute_block_to_worker_assignmentsh   s   ?J4?P?P?J4?P?P >I=N=Nw?????@@oo 	9 	9FAs3&&t}Q'78888%::4;QRR
 !*$*@ A A 	? 	?Iu#E*J>>*b11D ? ?,S1 ? ?F$Y/66v>>>$V,33I>>>>?? !*$*@ A A 	; 	;Iu#I.//144t}55 +226::: (//	:::!111r    returnc                     |                      |          }|t          j        d          S |                     |          j                            ||          S )zAsynchronously finds the record for a single key.

        Args:
            key: The key of the record to find.

        Returns:
            ObjectRef containing the record (in pydict form), or None if not found.
        N)_find_ler;   put_worker_forr<   r   )r+   r   block_indexs      r   	get_asynczRandomAccessDataset.get_async   sP     mmC((74== ,,077SIIIr    keysc                    t          t                    }|D ]0}||                     |                                       |           1i }|                                D ]L\  }}||                     |          j                            |gt          |          z  |          }|||<   Mi |                                D ]<\  }}||         }t          j
        |          }	t          ||	          D ]
\  }}
|
|<   =fd|D             S )zSynchronously find the records for a list of keys.

        Args:
            keys: List of keys to find the records for.

        Returns:
            List of found records (in pydict form), or None for missing records.
        Nc                 :    g | ]}                     |          S r   )r<   )r   kresultss     r   r   z0RandomAccessDataset.multiget.<locals>.<listcomp>   s#    ---1A---r    )r   rT   rf   r@   itemsrh   multigetr   rW   r;   r<   zip)r+   rk   batchesrn   futuresindexkeybatchfutr*   valuesvro   s              @r   rq   zRandomAccessDataset.multiget   s3    d## 	0 	0ADMM!$$%,,Q////&}} 	! 	!OE8}""5))299#h--' C !GENNmmoo 	 	FAsqzHWS\\FHf--  1

--------r    c                 @   t          j        d | j        D                       }t          d |D                       }d |D             }d |D             }d}|d                    t          | j        d                    z  }|d                    t          |                    z  }|d	                    t          |          t          |          t          t          |          t          |          z                      z  }|d
                    t          |          t          |          t          t          |          t          |          z                      z  }|d                    t          |dt          |          z   z  dz                      z  }|S )z6Returns a string containing access timing information.c                 @    g | ]}|j                                         S r   )statsr   rR   s     r   r   z-RandomAccessDataset.stats.<locals>.<listcomp>   s$    AAAa))AAAr    c              3   &   K   | ]}|d          V  dS )
total_timeNr   r   ss     r   	<genexpr>z,RandomAccessDataset.stats.<locals>.<genexpr>   s&      88Q<888888r    c                     g | ]
}|d          S )num_accessesr   r   s     r   r   z-RandomAccessDataset.stats.<locals>.<listcomp>   s    555!An%555r    c                     g | ]
}|d          S )
num_blocksr   r   s     r   r   z-RandomAccessDataset.stats.<locals>.<listcomp>   s    111a!L/111r    zRandomAccessDataset:
z- Build time: {}s
   z- Num workers: {}
z-- Blocks per worker: {} min, {} max, {} mean
z/- Accesses per worker: {} min, {} max, {} mean
z- Mean access time: {}us
r!   g    .A)r;   r<   rD   sumrA   roundrG   rW   minmaxint)r+   r|   r~   accessesrK   msgs         r   r|   zRandomAccessDataset.stats   s   AA4=AAABB88%88888
55u555115111&$++E$2BA,F,FGGG$++CJJ777?FFKKVc#f++F*C&D&D
 
 	
 	AHHMM3x==#c(mmc(mm.K*L*L
 
 	
 	+22
a#h--/03677
 
 	
 
r    ri   c                 @    t          j        | j        |                   S N)rX   rY   rF   )r+   ri   s     r   rh   zRandomAccessDataset._worker_for   s    }T7DEEEr    xc                     t          j        | j        |          }|t          | j                  k    s|| j        k     rd S |S r   )bisectbisect_leftr>   rW   r=   )r+   r   r*   s      r   rf   zRandomAccessDataset._find_le   sD    t1155D&''''1t/@+@+@4r    N)__name__
__module____qualname____doc__strr   rN   rE   r   r   rj   r   r   rq   r|   rh   rf   r   r    r   r   r      s        
@7@7 @7 	@7 @7 @7 @7D2 2 2>JS JYs^ J J J J.T#Y .4+> . . . .8s    (Fs F F F F# #      r    r   )num_cpusc                   >    e Zd Zd Zd Zd Zd Zd ZdefdZ	d Z
d	S )
r$   c                 >    d | _         || _        d| _        d| _        d S )Nr   )rK   	key_fieldr   r~   )r+   r   s     r   rN   z_RandomAccessWorker.__init__   s#    "r    c                 L    d |                                 D             | _        d S )Nc                 >    i | ]\  }}|t          j        |          S r   )r;   r<   )r   rn   refs      r   r,   z5_RandomAccessWorker.assign_blocks.<locals>.<dictcomp>   s&    LLL61cq#'#,,LLLr    )rp   rK   )r+   block_ref_dicts     r   r-   z!_RandomAccessWorker.assign_blocks   s'    LL^5I5I5K5KLLLr    c                     t          j                    }|                     ||          }| xj        t          j                    |z
  z  c_        | xj        dz  c_        |S )Nr!   )r4   r5   _getr~   r   )r+   ri   r   rH   results        r   r<   z_RandomAccessWorker.get   sZ    !##;,,4,..66Qr    c                     t          j                    } j        |d                  }t          t	          |                    dk    rt           j        |d                  t          j                  rz j        |d                  }| j                 }t          j
        ||          }t          j        |          fdt          ||                    |          |          D             }n fdt          ||          D             } xj        t          j                    |z
  z  c_         xj        dz  c_        |S )Nr   r!   c                 v    g | ]5\  }}}|                                 |k    r                    |          nd 6S r   )as_py_get_row)r   r*   k1k2accs       r   r   z0_RandomAccessWorker.multiget.<locals>.<listcomp>   sO       Ar2 $&88::#3#3Q  r    c                 B    g | ]\  }}                     ||          S r   )r   )r   r*   rn   r+   s      r   r   z0_RandomAccessWorker.multiget.<locals>.<listcomp>   s)    KKK$!Qdii1ooKKKr    )r4   r5   rK   rW   setr1   paTabler   npsearchsortedr
   	for_blockrr   taker~   r   )	r+   block_indicesrk   rH   ra   colindicesr   r   s	   `       @r   rq   z_RandomAccessWorker.multiget   s?   !##M!,-s=!!""a''JKa()28-
 -
'
 Ka 01E'Coc400G)%00C   !$Wchhw.?.?!F!F  FF
 LKKK#mT2J2JKKKF4,..66Qr    c                 L    t          j                                                    S r   )r;   get_runtime_contextget_node_idr+   s    r   rQ   z_RandomAccessWorker.ping   s    &((44666r    rd   c                 F    t          | j                  | j        | j        dS )N)r   r   r~   )rW   rK   r   r~   r   s    r   r|   z_RandomAccessWorker.stats   s)    dk** -/
 
 	
r    c                 
   |d S | j         |         }|| j                 }t          |t          j                  rt          |          }t          ||          }|d S t          j        |          }|	                    |          S r   )
rK   r   r1   r   r   _ArrowListWrapper_binary_search_findr
   r   r   )r+   ri   r   ra   columnr*   r   s          r   r   z_RandomAccessWorker._get  s    4K(t~&eRX&& 	/&v..F,,94%e,,||Ar    N)r   r   r   rN   r-   r<   rq   rQ   dictr|   r   r   r    r   r$   r$      s          M M M    ,7 7 7
t 
 
 
 
    r    r$   c                 r    t          j        | |          }|t          |           k    r| |         |k    r|S d S r   )r   r   rW   )r   r   r*   s      r   r   r     s;    61%%ACKKF1INN4r    c                        e Zd Zd Zd Zd ZdS )r   c                     || _         d S r   )	arrow_col)r+   r   s     r   rN   z_ArrowListWrapper.__init__  s    "r    c                 @    | j         |                                         S r   )r   r   )r+   r*   s     r   __getitem__z_ArrowListWrapper.__getitem__  s    ~a &&(((r    c                 *    t          | j                  S r   )rW   r   r   s    r   __len__z_ArrowListWrapper.__len__  s    4>"""r    N)r   r   r   rN   r   r   r   r    r   r   r     sA        # # #) ) )# # # # #r    r   c                 $   t          |           dk    rd S | |         d         | |         t          |           dz
           f}t          | t          j                  r4|d                                         |d                                         f}|S )Nr   r!   )rW   r1   r   r   r   )ra   r   r   s      r   r9   r9   !  sv    
5zzQt	sAc
3u::>23A%"" )qTZZ\\1Q4::<<(Hr    )(r   loggingrX   r4   collectionsr   typingr   r   r   r   numpyr   r;   2ray.data._internal.execution.interfaces.ref_bundler   ray.data._internal.remote_fnr	   ray.data.blockr
   ray.data.contextr   	ray.typesr   ray.util.annotationsr   pyarrowr   ImportErrorray.data.datasetr   	getLoggerr   r6   r   r   r$   r   r   r9   r   r    r   <module>r      s4       # # # # # # 5 5 5 5 5 5 5 5 5 5 5 5     



      : 9 9 9 9 9 ( ( ( ( ( ( ( ( ( ( ( (       * * * * * *   	BBB  )((((((		8	$	$ Wl l l l l l l l^ Q< < < < < < < <~  # # # # # # # #    s   A AA