
    &`i#$              	          d dl Z d dlmZ d dlmZmZmZmZmZm	Z	m
Z
mZmZmZ d dlZd dlmZ d dlmZ d dlmZ erd dlZd dlZd dlZd dlmZ d dlmZmZ  edd	
          Zeed         ed         f         Zedeee         eedf         e	edf         e	eef         f         Zdede fdZ!dede fdZ"dede fdZ#dede fdZ$dede fdZ%edede fd            Z&eded         eedf         f         Z'e G d dee                               Z(e G d de(d                               Z)e G d de(eeej*        f                                        Z+e G d de(d                               Z,e G d  d!e)                      Z-dS )"    N)ThreadPoolExecutor)
TYPE_CHECKINGAnyDictGenericListMappingOptionalTupleTypeVarUnion)env_integer)	DataBatch)DeveloperAPI)CollatedDataTorchDeviceTypeDataBatchTyper   )boundtorch.Tensor)r   ..batchreturnc                 4    ddl }t          | |j                  S )z*Check if a batch is a single torch.Tensor.r   N)torch
isinstanceTensor)r   r   s     g/home/jaya/work/projects/VOICE-AGENT/VIET/agent-env/lib/python3.11/site-packages/ray/data/collate_fn.py
_is_tensorr   4   s    LLLeU\***    c                 l    t          | t          t          f          ot          d | D                       S )aD  Check if a batch is a sequence of torch.Tensors.

    >>> import torch
    >>> _is_tensor_sequence(torch.ones(1))
    False
    >>> _is_tensor_sequence([torch.ones(1), torch.ones(1)])
    True
    >>> _is_tensor_sequence((torch.ones(1), torch.ones(1)))
    True
    >>> _is_tensor_sequence([torch.ones(1), 1])
    False
    c              3   4   K   | ]}t          |          V  d S Nr   .0ts     r   	<genexpr>z&_is_tensor_sequence.<locals>.<genexpr>H   s(      3Q3QaJqMM3Q3Q3Q3Q3Q3Qr   r   listtupleallr   s    r   _is_tensor_sequencer,   ;   s4     edE]++Q3Q3Q53Q3Q3Q0Q0QQr   c                 l    t          | t          t          f          ot          d | D                       S )a=  Check if a batch is a sequence of sequences of torch.Tensors.

    Stops at one level of nesting.

    >>> import torch
    >>> _is_nested_tensor_sequence([torch.ones(1), torch.ones(1)])
    False
    >>> _is_nested_tensor_sequence(
    ...    ([torch.ones(1), torch.ones(1)], [torch.ones(1)])
    ... )
    True
    c              3   4   K   | ]}t          |          V  d S r!   r,   r#   s     r   r&   z-_is_nested_tensor_sequence.<locals>.<genexpr>X   s<       4 4#$A4 4 4 4 4 4r   r'   r+   s    r   _is_nested_tensor_sequencer0   K   sG     edE]++  4 4(-4 4 4 1 1 r   c                     t          | t                    o*t          d |                                 D                       S )a   Check if a batch is a mapping of keys to torch.Tensors.

    >>> import torch
    >>> _is_tensor_mapping({"a": torch.ones(1), "b": torch.ones(1)})
    True
    >>> _is_tensor_mapping({"a": torch.ones(1), "b": [torch.ones(1), torch.ones(1)]})
    False
    c              3   4   K   | ]}t          |          V  d S r!   r"   r$   vs     r   r&   z%_is_tensor_mapping.<locals>.<genexpr>f   s(      -T-Tjmm-T-T-T-T-T-Tr   r   r	   r*   valuesr+   s    r   _is_tensor_mappingr7   ]   s8     eW%%T#-T-TU\\^^-T-T-T*T*TTr   c                     t          | t                    o*t          d |                                 D                       S )aE  Check if a batch is a mapping of keys to sequences of torch.Tensors.

    >>> import torch
    >>> _is_tensor_sequence_mapping({"a": torch.ones(1), "b": torch.ones(1)})
    False
    >>> _is_tensor_sequence_mapping(
    ...    {"a": (torch.ones(1), torch.ones(1)), "b": [torch.ones(1), torch.ones(1)]}
    ... )
    True
    c              3   4   K   | ]}t          |          V  d S r!   r/   r3   s     r   r&   z._is_tensor_sequence_mapping.<locals>.<genexpr>t   s<       . .#$A. . . . . .r   r5   r+   s    r   _is_tensor_sequence_mappingr:   i   sK     eW%% # . .(-. . . + + r   c                     t          |           p;t          |           p,t          |           pt          |           pt	          |           S )a"  Check if a batch matches any of the TensorBatchType variants.

    This function checks if the input batch is one of the following types:
    1. A single torch.Tensor
    2. A sequence of torch.Tensors
    3. A sequence of sequences of torch.Tensors
    4. A mapping (e.g., dict) of keys to torch.Tensors
    5. A mapping (e.g., dict) of keys to sequences of torch.Tensors

    Args:
        batch: The input batch to check. Can be any type.

    Returns:
        bool: True if the batch matches any TensorBatchType variant, False otherwise.
    )r   r,   r0   r7   r:   r+   s    r   is_tensor_batch_typer<   y   sX    $ 	5 	.u%%	.%e,,	. e$$	. 'u--r   c                   <    e Zd ZdZej        deddfd            ZdS )	CollateFnzAbstract interface for collate_fn for `iter_torch_batches`. See doc-string of
    `collate_fn` in `iter_torch_batches` API for more details.
    r   r   r   c                     dS )zConvert a batch of data to collated format.

        Args:
            batch: The input batch to collate.

        Returns:
            The collated data in the format expected by the model.
        N selfr   s     r   __call__zCollateFn.__call__   s	     	r   N)__name__
__module____qualname____doc__abcabstractmethodr   rC   r@   r   r   r>   r>      sP          		m 	 	 	 	 	 	 	r   r>   c                       e Zd ZdZddZdS )	ArrowBatchCollateFna  Collate function that takes pyarrow.Table as the input batch type.
    Arrow tables with chunked arrays can be efficiently transferred to GPUs without
    combining the chunks with the `arrow_batch_to_tensors` utility function.
    See `DefaultCollateFn` for example.
    r   pyarrow.Tabler   r   c                     dS )zConvert a batch of pyarrow.Table to collated format.

        Args:
            batch: The input pyarrow.Table batch to collate.

        Returns:
            The collated data in the format expected by the model.
        Nr@   rA   s     r   rC   zArrowBatchCollateFn.__call__   	     	r   N)r   rL   r   r   rD   rE   rF   rG   rC   r@   r   r   rK   rK      s2         	 	 	 	 	 	r   rK   rL   c                   <    e Zd ZdZdeeej        f         ddfdZdS )NumpyBatchCollateFnzQCollate function that takes a dictionary of numpy arrays as the input batch type.r   r   r   c                     dS )zConvert a batch of numpy arrays to collated format.

        Args:
            batch: The input dictionary of numpy arrays batch to collate.

        Returns:
            The collated data in the format expected by the model.
        Nr@   rA   s     r   rC   zNumpyBatchCollateFn.__call__   rN   r   N)	rD   rE   rF   rG   r   strnpndarrayrC   r@   r   r   rQ   rQ      sG        [[	d3
?3 	 	 	 	 	 	 	r   rQ   c                       e Zd ZdZddZdS )	PandasBatchCollateFnzGCollate function that takes a pandas.DataFrame as the input batch type.r   pandas.DataFramer   r   c                     dS )zConvert a batch of pandas.DataFrame to collated format.

        Args:
            batch: The input pandas.DataFrame batch to collate.

        Returns:
            The collated data in the format expected by the model.
        Nr@   rA   s     r   rC   zPandasBatchCollateFn.__call__   rN   r   N)r   rX   r   r   rO   r@   r   r   rW   rW      s.        QQ	 	 	 	 	 	r   rW   rX   c            	            e Zd ZdZ edd          Zdddefdeedee	df         f                  ded	         d
e
def fdZd Zdddeee	df         ee	ed         f         f         fdZ xZS )DefaultCollateFnzIDefault collate function for converting Arrow batches to PyTorch tensors.2RAY_DATA_DEFAULT_COLLATE_FN_THREADPOOL_MAX_WORKERS   NFdtypesztorch.dtypedevicer   
pin_memorynum_workersc                     ddl }t                                                       || _        t	          |t
          t          f          r |j        |          | _        n|| _        || _        || _	        d| _
        dS )aH  Initialize the collate function.

        Args:
            dtypes: The torch dtype(s) for the created tensor(s); if None, the dtype
                will be inferred from the tensor data.
            device: The device on which the tensor should be placed. Can be a string
                (e.g. "cpu", "cuda:0") or a torch.device object.
            pin_memory: Whether to pin the memory of the created tensors.
            num_workers: Number of worker threads for parallel tensor conversion.
                Defaults to `RAY_DATA_DEFAULT_COLLATE_FN_THREADPOOL_MAX_WORKERS`.
        r   N)r   super__init__r^   r   rS   intr_   r`   ra   _threadpool)rB   r^   r_   r`   ra   r   	__class__s         r   rd   zDefaultCollateFn.__init__   sz    $ 	fsCj)) 	!&%,v..DKK DK$&9=r   c                 b    t          | dd          r| j                            d           dS dS )z#Clean up threadpool on destruction.rf   NF)wait)getattrrf   shutdown)rB   s    r   __del__zDefaultCollateFn.__del__  s@    4-- 	2%%5%11111	2 	2r   r   rL   r   r   c                     ddl m} | j        dk    r!| j        t	          | j                  | _        | j        duo| j        j        dk    } ||| j        || j        | j                  S )zConvert an Arrow batch to PyTorch tensors.

        Args:
            batch: PyArrow Table to convert

        Returns:
            Dictionary mapping column names to lists of tensors
        r   )arrow_batch_to_tensorsN)max_workerscpu)r^   combine_chunksr`   
threadpool)	ray.air._internal.torch_utilsrn   ra   rf   r   r_   typer^   r`   )rB   r   rn   rq   s       r   rC   zDefaultCollateFn.__call__  s    	
 	
 	
 	
 	
 	
 aD$4$<1d>NOOOD D0NT[5E5N%%;)'
 
 
 	
r   )rD   rE   rF   rG   r   _DEFAULT_NUM_WORKERSr
   r   r   rS   boolre   rd   rl   r   rC   __classcell__)rg   s   @r   r[   r[      s       SS&;<	  LP.2 /> >}d33E.FFGH> *+> 	>
 > > > > > ><2 2 2

$
	tC'($sD4H/H*II	J
 
 
 
 
 
 
 
r   r[   ).rH   concurrent.futuresr   typingr   r   r   r   r   r	   r
   r   r   r   numpyrT   ray._private.ray_constantsr   ray.data.blockr   ray.util.annotationsr   pandaspyarrowr   ray.data.datasetr   r   r   TensorSequenceTyperS   TensorBatchTyperv   r   r,   r0   r7   r:   r<   TensorBatchReturnTyper>   rK   rU   rQ   rW   r[   r@   r   r   <module>r      s   



 1 1 1 1 1 1                            2 2 2 2 2 2 $ $ $ $ $ $ - - - - - - ?MMMNNNLLL((((((>>>>>>>> {;;;	
  
  			
c
!"C  C##$	&+c +d + + + +Rs Rt R R R R c d    $	Uc 	Ud 	U 	U 	U 	Us t           2 	
n	      &   $     )O4   &     )Dbj$9:        9%78    I
 I
 I
 I
 I
* I
 I
 I
 I
 I
r   