
    Pi./                        d dl mZ d dlmZmZmZmZmZmZm	Z	m
Z
 d dlmZ d dlmZ d dlmZ d dlmZmZ d dlmZmZmZ d dlmZ d d	lmZ  e	d
e          Z	 d7dee         dedee         dee         fdZde
ee f         ddfdZ!de de ded         deee ef                  fdZ"dee         defdZ#e	 	 	 	 	 	 d8de ded ee         d!e$d"ed#ed$ed%         d&e$d'ee ef         defd(            Z%e	 	 d9d*ee ef         d+ee e&f         d,e d"edef
d-            Z'e	 	 	 	 	 	 d:d/ed0ed1ed2eeegef                  d3e$d#ed$ed%         d4ee         d5e$defd6            Z(dS );    )Path)AnyCallableDictListLiteralOptionalTypeVarUnion)request)load_dataset)split_dataset_by_node)default_collateDistributedSampler)DatasetTypeLoaderrequires_torchdata)	Transform)get_world_size_and_rankT)boundNtokensmax_seq_leneos_idreturnc                 @    | d|         }||d         |k    r||d<   |S )a  
    Truncate a list of tokens to a maximum length. If eos_id is provided, the last
    token will be replaced with eos_id.

    Args:
        tokens (List[Any]): list of tokens to truncate
        max_seq_len (int): maximum length of the list
        eos_id (Optional[Any]): token to replace the last token with. If None, the
            last token will not be replaced. Default is None.

    Returns:
        List[Any]: truncated list of tokens
    N )r   r   r   tokens_truncateds       i/home/jaya/work/projects/VOICE-AGENT/VIET/agent-env/lib/python3.11/site-packages/torchtune/data/_utils.pytruncater!      s:    $ l{l+.r2f<<%    	image_loczPIL.Image.Imagec                 T   ddl m} t          | t                    rP|                     d          r;	 t          j        |           } n%# t          $ r}t          d|            |d}~ww xY w	 |	                    |           }n%# t          $ r}t          d|            |d}~ww xY w|S )a  
    Convenience method to load an image in PIL format from a local file path or remote source.

    Args:
        image_loc (Union[Path, str]): Local file path or remote source pointing to the image
            which will be loaded in PIL format.

    Note:
        If loading an image from a remote source, the function expects the URL provided in ``image_loc``
        to start with "http" or "https" e.g. "https://www.wikipedia.org/en/bird.jpg".

    Raises:
        ValueError:
            If the image cannot be loaded from remote source, **or**
            if the image cannot be opened as a :class:`~PIL.Image.Image`.

    Examples:
        >>> # Load from remote source
        >>> image = load_image("https://www.wikipedia.org/en/bird.jpg")

        >>> # Load from local file path
        >>> image = load_image(Path("/home/user/bird.jpg"))

    Returns:
        PIL.Image.Image: The loaded image.
    r   )ImagehttpzFailed to load image from Nz'Failed to open image as PIL Image from )
PILr%   
isinstancestr
startswithr   urlopen	Exception
ValueErroropen)r#   r%   eimages       r    
load_imager1   /   s    :  )S!! Ni&:&:6&B&B N	N	22II 	N 	N 	NE)EEFFAM	NW

9%% W W WN9NNOOUVVW Ls/   A 
A)A$$A)-B 
B%B  B%content	image_tagimagesc                   |                      |          }t          |          |k    r%t          dt          |           d| d|            |                     |          }g }t	          |          D ]q\  }}t          |          dk    r|                    d|d           |t          |          dz
  k     r+|                    d|                    d          d           r|S )	a  
    Given a raw text string, split by the specified ``image_tag``
    and form into list of dictionaries to be used in the :class:`~torchtune.data.Message` content
    field::

        [
            {
                "role": "system" | "user" | "assistant",
                "content":
                    [
                        {"type": "image", "content": <PIL.Image.Image>},
                        {"type": "text", "content": "This is a sample image."},
                    ],
            },
            ...
        ]

    Args:
        content (str): raw message text
        image_tag (str): string to split the text by
        images (List["PIL.Image.Image"]): list of images to be used in the content

    Raises:
        ValueError: If the number of images does not match the number of image tags in the content

    Examples:
        >>> content = format_content_with_images(
        ...     "<|image|>hello <|image|>world",
        ...     image_tag="<|image|>",
        ...     images=[<PIL.Image.Image>, <PIL.Image.Image>]
        ... )
        >>> print(content)
        [
            {"type": "image", "content": <PIL.Image.Image>},
            {"type": "text", "content": "hello "},
            {"type": "image", "content": <PIL.Image.Image>},
            {"type": "text", "content": "world"}
        ]

    Returns:
        List[Dict[str, Any]]: list of dictionaries to be used in the :class:`~torchtune.data.Message` content field
    zNumber of images (z') does not match number of image tags (z) in content: r   text)typer2      r0   )countlenr-   split	enumerateappendpop)r2   r3   r4   num_image_tags_in_contentsplit_contentfinal_content_listisubstrs           r    format_content_with_imagesrD   ^   s   Z !(i 8 8
6{{///CV C C)C C9@C C
 
 	

 MM),,M}-- S S	6v;;??%%v&&I&IJJJs=!!A%%%%%w6::a==&Q&QRRRr"   funcsc                        fd}|S )z
    Chain a list of functions together into a single function.

    Args:
        *funcs (List[Callable]): list of functions to chain together

    Returns:
        Callable: chained function
    c                 (    D ]} ||           } | S Nr   )xfnrE   s     r    
chained_fnzchain.<locals>.chained_fn   s&     	 	B1AAr"   r   )rE   rK   s   ` r    chainrL      s$        
 r"   TthreadFsource	transform	filter_fnshuffleseednum_workersparallel_method)processrM   	streamingload_dataset_kwargsc                    ddl m}	m}
m} d|v r)d|vsJ d|            |                    d          |d<   t          | fi |}||                    |          }t                      \  }}|r6t          |||          }|r|	                    |          } |	|          }n4t          |||||	          } ||          }t          |j        |          } |
||||
          }|S )a  
    Load a HuggingFace dataset (Map or Streaming) and apply a Transform to it.

    Args:
        source (str): HuggingFace dataset source.
        transform (Transform): Transform to apply to the samples of the dataset.
        filter_fn (Optional[Callable]): Filter function to pass to HuggingFace dataset.
        shuffle (bool): Whether to shuffle the dataset. Default is True. For streaming datasets, this is passed to
            HuggingFace dataset as .shuffle(). For map datasets, a DistributedSampler is used.
        seed (int): Seed for the random number generator in the case of Map style dataset shuffling. Default is 0.
        num_workers (int): Number of workers to use for loading the dataset. Default is 0 (no parallelism). Setting this
            greater than 0 will create `parallel_method` workers to perform transforms to the dataset.
        parallel_method (Literal["process", "thread"]): Method to use for parallelism. Default is "thread". No effect if
            num_workers is 0.
        streaming (bool): whether to load a streaming vs map-style dataset. Default False.
        **load_dataset_kwargs (Dict[str, Any]): Additional Keyword arguments to pass to HuggingFace dataset. See Hugging Face's
            documentation.

    Returns:
        A ``torchdata.nodes`` iterator that can be passed directly to a Loader, or combined with other-datasets in a multi-dataset
        sampler.
    r   )IterableWrapperParallelMapperSamplerWrappersubsetnamezTfound both 'subset' and 'name' found, you may only specify one, load_dataset_kwargs=N)rank
world_size)rR   )num_replicasr^   rQ   rR   map_fnrS   method)torchdata.nodesrY   rZ   r[   r>   r   filterr   r   rQ   r   rL   __getitem__)rN   rO   rP   rQ   rR   rS   rT   rV   rW   rY   rZ   r[   datasetr_   r^   nodesamplers                    r    load_hf_datasetrj      s`   D POOOOOOOOO&&&----dNadd .--&9&=&=h&G&GF#699%899G..++.00J :'dzRRR 	1oo4o00Gw''$#
 
 
 ~g&&'-y99	>YK  D Kr"   !CYCLE_UNTIL_ALL_DATASETS_EXHASTEDdatasetsweightsstop_criteriac                 ,    ddl m}  || |||          S )a  
    Given a dictionary of datasets and their corresponding weights, return a dataset that
    samples from the given datasets according to the specified weights.

    Args:
        datasets (Dict[str, DatasetType]): dictionary of datasets
        weights (Dict[str, float]): dictionary of weights for each dataset. If not
        stop_criteria (str): stop criteria for the sampler. Default "CYCLE_UNTIL_ALL_DATASETS_EXHASTED".
            See also: torchdata.nodes.StopCriteria
        seed (int): seed for the random number generator. Default 0.

    Returns:
        A ``torchdata.nodes`` iterator which can be passed to Loader, or further composed with other Nodes.
    r   )MultiNodeWeightedSampler)source_nodesrm   rn   rR   )rd   rp   )rl   rm   rn   rR   rp   s        r    get_multi_datasetrr      s<    * 988888###	   r"      rg   model_transform
batch_size
collate_fn	drop_lastprefetch_factor
pin_memoryc	                     ddl m}	m}
m}m} |t
          } |
| |||          } |	|||          } |
||||          }|r ||          }| |||          }t          |          S )ax  
    This will configure TorchData Nodes to approximate torch.utils.data.DataLoader.
    Given a dataset, apply model_transform (eg tokenization), batching, collation,
    memory pinning, and pre-fetching.

    Args:
        dataset (DatasetType): dataset to load. May be a MultiNodeWeightedSampler
        model_transform (Transform): model transform to apply to the samples of the dataset
        batch_size (int): batch size
        collate_fn (Optional[Callable[[Any], Any]]): collate function to apply to the samples of the dataset. If None, use
            torch.utils.data.default_collate. Default None.
        drop_last (bool): whether to drop the last batch. Default is True.
        num_workers (int): number of workers to use for loading the dataset. Default is 0 (no parallelism
        parallel_method (Literal["process", "thread"]): method to use for parallelism. Default is "thread".
        prefetch_factor (Optional[int]): number of batches to prefetch. Default is 4.
        pin_memory (bool): whether to pin memory. Default is False.

    Returns:
        A ``torchdata.nodes`` Loader, an Iterable that returns batches.
    r   )BatcherrZ   	PinMemory
PrefetcherNra   )rw   )rd   r{   rZ   r|   r}   r   r   )rg   rt   ru   rv   rw   rS   rT   rx   ry   r{   rZ   r|   r}   rh   s                 r    get_dataloaderr~     s    B ONNNNNNNNNNN$
>[  D 74y999D>Z[  D  y"z$00$<<r"   rH   )NTr   r   rM   F)rk   r   )NTr   rM   rs   F))pathlibr   typingr   r   r   r   r   r	   r
   r   urllibr   rl   r   datasets.distributedr   torch.utils.datar   r   torchtune.data._torchdatar   r   r   torchtune.modules.transformsr   torchtune.utilsr   r7   r   intr!   r)   r1   rD   rL   boolrj   floatrr   r~   r   r"   r    <module>r      s         O O O O O O O O O O O O O O O O O O O O       ! ! ! ! ! ! 6 6 6 6 6 6 @ @ @ @ @ @ @ @ M M M M M M M M M M 2 2 2 2 2 2 3 3 3 3 3 3GCt ! I SM 
#Y	   0,%c	* ,/@ , , , ,^<< #<-12C-D<	$sCx.< < < <~$x. X    &  %)4<C CCC !C 	C
 C C 01C C  S>C C C C CL  =	 3#$#u*  	
    < 
 264<%&1 111 1 3%*-.	1
 1 1 011 c]1 1 1 1 1 1 1 1r"   