
    &`i&                         d Z ddlZddlZddlmZ ddlmZmZmZm	Z	m
Z
mZmZmZ ddlmZ ddlmZ ddlmZ ddlmZ dd	lmZ erddlZdd
lmZmZmZ  ej        e          Ze G d d                      Z e G d de                      Z!dS )zMCAP (Message Capture) datasource for Ray Data.

MCAP is a standardized format for storing timestamped messages from robotics and
autonomous systems, commonly used for sensor data, control commands, and other
time-series data.
    N)	dataclass)TYPE_CHECKINGAnyDictIteratorListOptionalSetUnion)DelegatingBlockBuilder)_check_import)Block)FileBasedDatasource)DeveloperAPI)ChannelMessageSchemac                   .    e Zd ZU dZeed<   eed<   d ZdS )	TimeRangezTime range for filtering MCAP messages.

    Attributes:
        start_time: Start time in nanoseconds (inclusive).
        end_time: End time in nanoseconds (exclusive).
    
start_timeend_timec                     | j         | j        k    r t          d| j          d| j         d          | j         dk     s| j        dk     rt          d| j          d| j                   dS )z)Validate time range after initialization.zstart_time (z) must be less than end_time ()r   z1time values must be non-negative, got start_time=z, end_time=N)r   r   
ValueErrorselfs    /home/jaya/work/projects/VOICE-AGENT/VIET/agent-env/lib/python3.11/site-packages/ray/data/_internal/datasource/mcap_datasource.py__post_init__zTimeRange.__post_init__&   s    ?dm++.t . .!]. . .   ?Q$-!"3"3,DO , , M, ,   #4"3    N)__name__
__module____qualname____doc__int__annotations__r    r   r   r   r      sC           OOOMMM    r   r   c                   d    e Zd ZdZdgZ	 	 	 	 ddeeee         f         deeee         e	e         f                  dee
         deeee         e	e         f                  d	ef
 fd
Zdddedee         fdZdddddddefdZdddddddedeeef         f
dZdefdZedefd            Z xZS )MCAPDatasourcea  MCAP (Message Capture) datasource for Ray Data.

    This datasource provides reading of MCAP files with predicate pushdown
    optimization for filtering by topics, time ranges, and message types.

    MCAP is a standardized format for storing timestamped messages from robotics and
    autonomous systems, commonly used for sensor data, control commands, and other
    time-series data.

    Examples:
        Basic usage:

        >>> import ray  # doctest: +SKIP
        >>> ds = ray.data.read_mcap("/path/to/data.mcap")  # doctest: +SKIP

        With topic filtering and time range:

        >>> from ray.data.datasource import TimeRange  # doctest: +SKIP
        >>> ds = ray.data.read_mcap(  # doctest: +SKIP
        ...     "/path/to/data.mcap",
        ...     topics={"/camera/image_raw", "/lidar/points"},
        ...     time_range=TimeRange(start_time=1000000000, end_time=2000000000)
        ... )  # doctest: +SKIP

        With multiple files and metadata:

        >>> ds = ray.data.read_mcap(  # doctest: +SKIP
        ...     ["file1.mcap", "file2.mcap"],
        ...     topics={"/camera/image_raw", "/lidar/points"},
        ...     message_types={"sensor_msgs/Image", "sensor_msgs/PointCloud2"},
        ...     include_metadata=True
        ... )  # doctest: +SKIP
    mcapNTpathstopics
time_rangemessage_typesinclude_metadatac                      t                      j        |fi | t          | dd           |rt          |          nd| _        |rt          |          nd| _        || _        || _        dS )a  Initialize MCAP datasource.

        Args:
            paths: Path or list of paths to MCAP files.
            topics: Optional list/set of topic names to include. If specified,
                only messages from these topics will be read.
            time_range: Optional TimeRange for filtering messages by timestamp.
                TimeRange contains start_time and end_time in nanoseconds, where
                both values must be non-negative and start_time < end_time.
            message_types: Optional list/set of message type names (schema names)
                to include. Only messages with matching schema names will be read.
            include_metadata: Whether to include MCAP metadata fields in the output.
                Defaults to True. When True, includes schema, channel, and message
                metadata.
            **file_based_datasource_kwargs: Additional arguments for FileBasedDatasource.
        r)   )modulepackageN)super__init__r   set_topics_message_types_time_range_include_metadata)r   r*   r+   r,   r-   r.   file_based_datasource_kwargs	__class__s          r   r3   zMCAPDatasource.__init__Z   s    2 	??">???d66:::: '-6s6{{{$4AKc-000t%!1r   fzpyarrow.NativeFilepathreturnc              #     K   ddl m}  ||          }|                    | j        rt	          | j                  nd| j        r| j        j        nd| j        r| j        j        nddd          }t                      }|D ]K\  }}}	| 	                    |||	          s| 
                    |||	|          }
|                    |
           L|                                dk    r|                                V  dS dS )aX  Read MCAP file and yield blocks of message data.

        This method implements efficient MCAP reading with predicate pushdown.
        It uses MCAP's built-in filtering capabilities for optimal performance
        and applies additional filters when needed.

        Args:
            f: File-like object to read from. Must be seekable for MCAP reading.
            path: Path to the MCAP file being processed.

        Yields:
            Block: Blocks of MCAP message data as pyarrow Tables.

        Raises:
            ValueError: If the MCAP file cannot be read or has invalid format.
        r   )make_readerNTF)r+   r   r   log_time_orderreverse)mcap.readerr?   iter_messagesr5   listr7   r   r   r   _should_include_message_message_to_dictaddnum_rowsbuild)r   r;   r<   r?   readermessagesbuilderschemachannelmessagemessage_datas              r   _read_streamzMCAPDatasource._read_stream}   s9     " 	,+++++Q
 '')-?4%%%46:6FPt'22D262BLT%.. ( 
 
 )**(0 	& 	&$FGW//II   00'4PPLKK%%%% !!--//!!!!! "!r   rM   r   rN   r   rO   r   c                 8    | j         r|r|j        | j         vrdS dS )a_  Check if a message should be included based on filters.

        This method applies Python-level filtering that cannot be pushed down
        to the MCAP library level. Topic filters are already handled by the
        MCAP reader, so only message_types filtering is needed here.

        Args:
            schema: MCAP schema object containing message type information.
            channel: MCAP channel object containing topic and metadata.
            message: MCAP message object containing the actual data.

        Returns:
            True if the message should be included, False otherwise.
        FT)r6   name)r   rM   rN   rO   s       r   rE   z&MCAPDatasource._should_include_message   s/    $  	6 	fkAT.T.T5tr   c                    |j         }|j        dk    rkt          |j         t                    rQ	 t	          j        |j                             d                    }n## t          j        t          f$ r
 |j         }Y nw xY w||j	        |j
        |j        |j        d}| j        r@|                    |j        |j        |r|j        nd|r|j        nd|r|j         ndd           t%          | dd          r||d<   |S )	aV  Convert MCAP message to dictionary format.

        This method converts MCAP message objects into a standardized dictionary
        format suitable for Ray Data processing.

        Args:
            schema: MCAP schema object containing message type and encoding info.
            channel: MCAP channel object containing topic and channel metadata.
            message: MCAP message object containing the actual message data.
            path: Path to the source file (for include_paths functionality).

        Returns:
            Dictionary containing message data in Ray Data format.
        jsonzutf-8)datatopiclog_timepublish_timesequenceN)
channel_idmessage_encodingschema_nameschema_encodingschema_datainclude_pathsFr<   )rV   r\   
isinstancebytesrU   loadsdecodeJSONDecodeErrorUnicodeDecodeErrorrW   rX   rY   rZ   r8   updater[   rS   encodinggetattr)r   rM   rN   rO   r<   decoded_datarP   s          r   rF   zMCAPDatasource._message_to_dict   s2   $ |#v--*W\52Q2Q-,#z',*=*=g*F*FGG(*<= , , ,&|, !](#0(
 
 ! 		")"4(/(@28#B6;;d:@'Jvd28#B6;;d    4%00 	(#'L s   ,A A;:A;c                     dS )z1Return a human-readable name for this datasource.MCAPr&   r   s    r   get_namezMCAPDatasource.get_name   s    vr   c                     dS )zWhether this datasource supports distributed reads.

        MCAP files can be read in parallel across multiple files.
        Tr&   r   s    r   supports_distributed_readsz)MCAPDatasource.supports_distributed_reads   s	     tr   )NNNT)r    r!   r"   r#   _FILE_EXTENSIONSr   strr   r	   r
   r   boolr3   r   r   rQ   rE   r   r   rF   rm   propertyro   __classcell__)r:   s   @r   r(   r(   4   s          D x
 8<*.>B!%!2 !2S$s)^$!2 tCy#c(234!2 Y'	!2
  d3iS&9 :;!2 !2 !2 !2 !2 !2 !2F-"2 -"# -"(5/ -" -" -" -"^)2=F	   .33)23=F3NQ3	c3h3 3 3 3j#     D    X    r   r(   )"r#   rU   loggingdataclassesr   typingr   r   r   r   r   r	   r
   r   +ray.data._internal.delegating_block_builderr   ray.data._internal.utilr   ray.data.blockr   )ray.data.datasource.file_based_datasourcer   ray.util.annotationsr   pyarrowrB   r   r   r   	getLoggerr    loggerr   r(   r&   r   r   <module>r      s      ! ! ! ! ! ! Q Q Q Q Q Q Q Q Q Q Q Q Q Q Q Q Q Q Q Q N N N N N N 1 1 1 1 1 1             I I I I I I - - - - - - 5NNN4444444444		8	$	$        2 M M M M M( M M M M Mr   