
    &`i8W                     ,   d Z ddlZddlZddlmZmZ ddlmZmZm	Z	m
Z
mZmZmZmZmZ ddlZerddlmZmZ ddlmZmZ ddlmZ ddlmZmZ dd	lmZ dd
lm Z m!Z!  ej"        e#          Z$e G d d                      Z%de	e&ef         dee%         ddfdZ'dee&         dee%         de	e&ef         fdZ(dee&         dee%         de	e&ef         fdZ)dddddee*ed         f         dee*ed         f         dee*e*f         f
dZ+ G d de           Z,dS )a  Kafka datasource for bounded data reads.

This module provides a Kafka datasource implementation for Ray Data that supports
bounded reads with offset-based range queries.

Message keys and values are returned as raw bytes to support any serialization format
(JSON, Avro, Protobuf, etc.). Users can decode them using map operations.

Requires:
    - kafka-python: https://kafka-python.readthedocs.io/
    N)	dataclassfields)	TYPE_CHECKINGAnyDictIterableListLiteralOptionalTupleUnionKafkaConsumerTopicPartition)BlockOutputBufferOutputBlockSizeOption)_check_import)BlockBlockMetadata)DataContext)
DatasourceReadTaskc                      e Zd ZU dZdZee         ed<   dZee         ed<   dZ	ee         ed<   dZ
ee         ed<   dZee         ed<   dZee         ed<   dZee         ed	<   dZee         ed
<   dZee         ed<   dZee         ed<   dZee         ed<   dZee         ed<   dZee         ed<   dZee         ed<   dZee         ed<   dZee         ed<   dS )KafkaAuthConfiga`  Authentication configuration for Kafka connections.

    Uses standard kafka-python parameter names. See kafka-python documentation
    for full details: https://kafka-python.readthedocs.io/

    security_protocol: Protocol used to communicate with brokers.
        Valid values are: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL.
        Default: PLAINTEXT.
    sasl_mechanism: Authentication mechanism when security_protocol
        is configured for SASL_PLAINTEXT or SASL_SSL. Valid values are:
        PLAIN, GSSAPI, OAUTHBEARER, SCRAM-SHA-256, SCRAM-SHA-512.
    sasl_plain_username: username for sasl PLAIN and SCRAM authentication.
        Required if sasl_mechanism is PLAIN or one of the SCRAM mechanisms.
    sasl_plain_password: password for sasl PLAIN and SCRAM authentication.
        Required if sasl_mechanism is PLAIN or one of the SCRAM mechanisms.
    sasl_kerberos_name: Constructed gssapi.Name for use with
        sasl mechanism handshake. If provided, sasl_kerberos_service_name and
        sasl_kerberos_domain name are ignored. Default: None.
    sasl_kerberos_service_name: Service name to include in GSSAPI
        sasl mechanism handshake. Default: 'kafka'
    sasl_kerberos_domain_name: kerberos domain name to use in GSSAPI
        sasl mechanism handshake. Default: one of bootstrap servers
    sasl_oauth_token_provider: OAuthBearer
        token provider instance. Default: None
    ssl_context: Pre-configured SSLContext for wrapping
        socket connections. If provided, all other ssl_* configurations
        will be ignored. Default: None.
    ssl_check_hostname: Flag to configure whether ssl handshake
        should verify that the certificate matches the brokers hostname.
        Default: True.
    ssl_cafile: Optional filename of ca file to use in certificate
        verification. Default: None.
    ssl_certfile: Optional filename of file in pem format containing
        the client certificate, as well as any ca certificates needed to
        establish the certificate's authenticity. Default: None.
    ssl_keyfile: Optional filename containing the client private key.
        Default: None.
    ssl_password: Optional password to be used when loading the
        certificate chain. Default: None.
    ssl_crlfile: Optional filename containing the CRL to check for
        certificate expiration. By default, no CRL check is done. When
        providing a file, only the leaf certificate will be checked against
        this CRL. The CRL can only be checked with Python 3.4+ or 2.7.9+.
        Default: None.
    ssl_ciphers: optionally set the available ciphers for ssl
        connections. It should be a string in the OpenSSL cipher list
        format. If no cipher can be selected (because compile-time options
        or other configuration forbids use of all the specified ciphers),
        an ssl.SSLError will be raised. See ssl.SSLContext.set_ciphers
    Nsecurity_protocolsasl_mechanismsasl_plain_usernamesasl_plain_passwordsasl_kerberos_namesasl_kerberos_service_namesasl_kerberos_domain_namesasl_oauth_token_providerssl_contextssl_check_hostname
ssl_cafilessl_certfilessl_keyfilessl_passwordssl_ciphersssl_crlfile)__name__
__module____qualname____doc__r   r   str__annotations__r   r   r   r   r    r!   r"   r   r#   r$   boolr%   r&   r'   r(   r)   r*        /home/jaya/work/projects/VOICE-AGENT/VIET/agent-env/lib/python3.11/site-packages/ray/data/_internal/datasource/kafka_datasource.pyr   r   *   sk        1 1h (,x}+++ %)NHSM((()-#---)-#---(,,,,04444/3x}333/3x}333 "&K#%%%)---- $J$$$"&L(3-&&&!%K#%%%"&L(3-&&&!%K#%%%!%K#%%%%%r3   r   configkafka_auth_configreturnc                 t    |r3t          |          D ]%}t          ||j                  }|
|| |j        <   $dS dS )zAdd authentication configuration to consumer config in-place.

    Args:
        config: Consumer config dict to modify.
        kafka_auth_config: Authentication configuration.
    N)r   getattrname)r5   r6   fieldvalues       r4   _add_authentication_to_configr=   v   s]      +-.. 	+ 	+E-uz::E %*uz"+ +	+ 	+r3   bootstrap_serversc                 2    | ddd}t          ||           |S )zBuild minimal consumer config for partition discovery.

    Args:
        bootstrap_servers: List of Kafka broker addresses.
        kafka_auth_config: Authentication configuration.

    Returns:
        Consumer configuration dict for discovery.
    F  )r>   enable_auto_commitconsumer_timeout_msr=   r>   r6   r5   s      r4   $_build_consumer_config_for_discoveryrE      s1     /## F
 "&*;<<<Mr3   c                 8    | dd d d}t          ||           |S )zBuild full consumer config for reading messages.

    Args:
        bootstrap_servers: List of Kafka broker addresses.
        kafka_auth_config: Authentication configuration.

    Returns:
        Consumer configuration dict for reading.
    Fc                     | S Nr2   )vs    r4   <lambda>z1_build_consumer_config_for_read.<locals>.<lambda>   s     r3   c                     | S rH   r2   )ks    r4   rJ   z1_build_consumer_config_for_read.<locals>.<lambda>   s    a r3   )r>   rA   value_deserializerkey_deserializerrC   rD   s      r4   _build_consumer_config_for_readrO      s8     /#)k'K	 F "&*;<<<Mr3   consumerr   topic_partitionr   start_offsetearliest
end_offsetlatestc           
      D   |                      |g          |         }|                     |g          |         }|}|}|dk    s||}|dk    s||}||k    rG||k    r| n| d| d}||k    r| n| d| d}	t          d| d|	 d|j         d	|j                   ||fS )
aj  Resolve start and end offsets to actual integer offsets.

    Args:
        consumer: Kafka consumer instance.
        topic_partition: TopicPartition to resolve offsets for.
        start_offset: Start offset (int or "earliest").
        end_offset: End offset (int or "latest").

    Returns:
        Tuple of (resolved_start_offset, resolved_end_offset).
    rS   NrU   z (resolved to )zstart_offset (z) > end_offset (z) for partition z
 in topic )beginning_offsetsend_offsets
ValueError	partitiontopic)
rP   rQ   rR   rT   earliest_offsetlatest_offsetoriginal_startoriginal_end	start_strend_strs
             r4   _resolve_offsetsrc      sD   " 00/1BCCOTO((/):;;OLM "NLz!!\%9&X!3"
j   -- "AA,AAA 	 z))  ==
=== 	
 ZY Z Z Z Z,6Z ZBQBWZ Z
 
 	
 ##r3   c                      e Zd ZdZdZ	 	 	 	 ddeeee         f         deeee         f         d	eee	d         f         d
eee	d         f         de
e         defdZde
e         fdZ	 ddede
e         dee         fdZdS )KafkaDatasourcezBKafka datasource for reading from Kafka topics with bounded reads.r@   rS   rU   N'  topicsr>   rR   rT   r6   
timeout_msc                    t          | dd           |st          d          |st          d          |dk    rt          d          t          |t                    r*t          |t                    r||k    rt          d          t          |t                    r|d	k    rt          d
          t          |t                    r|dk    rt          d          t          |t                    r|rd|vrt          d| d          nWt          |t
                    rB|st          d          |D ].}t          |t                    rd|vrt          d| d          /t          |t
                    r|n|g| _        t          |t
                    r|n|g| _        || _        || _	        || _
        || _        t          j                    j        | _        dS )a  Initialize Kafka datasource.

        Args:
            topics: Kafka topic name(s) to read from.
            bootstrap_servers: Kafka broker addresses (string or list of strings).
            start_offset: Starting position. Can be:
                - int: Offset number
                - str: "earliest"
            end_offset: Ending position. Can be:
                - int: Offset number
                - str: "latest"
            kafka_auth_config: Authentication configuration. See KafkaAuthConfig for details.
            timeout_ms: Timeout in milliseconds for every read task to poll until reaching end_offset (default 10000ms).
                If the read task does not reach end_offset within the timeout, it will stop polling and return the messages
                it has read so far.

        Raises:
            ValueError: If required configuration is missing.
            ImportError: If kafka-python is not installed.
        kafkazkafka-python)modulepackageztopics cannot be emptyz!bootstrap_servers cannot be emptyr   ztimeout_ms must be positivez)start_offset must be less than end_offsetrU   zstart_offset cannot be 'latest'rS   zend_offset cannot be 'earliest':z"Invalid bootstrap_servers format: z6. Expected 'host:port' or list of 'host:port' strings.z&bootstrap_servers cannot be empty listz. Expected 'host:port' string.N)r   rZ   
isinstanceintr/   list_topics_bootstrap_servers_start_offset_end_offset_kafka_auth_config_timeout_msr   get_currenttarget_max_block_size_target_max_block_size)selfrg   r>   rR   rT   r6   rh   servers           r4   __init__zKafkaDatasource.__init__   s[   : 	d7NCCCC 	75666  	B@AAA??:;;;lC(( 	NZ
C-H-H 	Nj(( !LMMMlC(( 	@\X-E-E>???j#&& 	@:+C+C>??? '-- 	$ 3D(D(D K9J K K K   )E
 )400 	$ K !IJJJ+  !&#.. #V2C2C$7V 7 7 7   3D ",FD!9!9Gvvx +T22%#$ 	
 *%"3%&1&=&?&?&U###r3   r7   c                     dS )zBReturn an estimate of the in-memory data size, or None if unknown.Nr2   )rz   s    r4   estimate_inmemory_data_sizez+KafkaDatasource.estimate_inmemory_data_size6  s    tr3   parallelismper_task_row_limitc                 ^   ddl m} t          | j        | j                  }g }d}	  |di |}| j        D ]H}|                    |          }|st          d| d          |D ]}	|                    ||	f           I	 |r|	                                 n# |r|	                                 w w xY w| j        }
| j
        }| j        }| j        }| j        }| j        }g }t          j        dt          j                    fdt          j                    fdt          j                    fd	t          j                    fd
t          j                    fdt          j                    fdt          j                    fdt          j        t          j                    t          j                              fg          }|D ]\  }}|||
|||||fdt*          dt,          dt.          t*                   dt0          t2          t,          t4          d         f                  dt0          t2          t,          t4          d         f                  dt0          t6                   dt,          dt,          fd}t9          ddd| d| gd          } |||          }t;          ||||          }|                    |           |S )aw  Create read tasks for Kafka partitions.

        Creates one read task per partition.
        Each task reads from a single partition of a single topic.

        Args:
            parallelism: This argument is deprecated.
            per_task_row_limit: Maximum number of rows per read task.

        Returns:
            List of ReadTask objects, one per partition.
        r   )r   NzTopic z# has no partitions or doesn't existoffsetkeyr<   r\   r[   	timestamptimestamp_typeheaders
topic_namepartition_idr>   rR   rS   rT   rU   r6   rh   rx   c           	      T     dt           t                   f fd}|S )a  Create a Kafka read function with captured variables.

                This factory function captures configuration variables as default arguments
                to avoid serialization issues when the read function is executed remotely
                by Ray. Using default arguments ensures all needed config is available
                in the remote task without requiring 'self' to be serialized.
                r7   c               3   p  K   ddl m} m} t                    } | di |}	  |          }|                    |g           t          ||          \  }}|                    ||           g }t          t          j	                            }d}	t          j
                    }
dz  }|	st          j
                    |
z
  }||k    r9t                              d d d d	| d
t          |           d           ny|                    |          }||k    rn\t          ||z
  dz            }|                    t#          |d                    }|s|                    |g           }|D ]}||j        |k    rd}	 n|j        rt+          |j                  ni }|                    |j        |j        |j        |j        |j        |j        |j        |d           t          |          t:          j        k    rtt>          j         !                    |          }|"                    |           |#                                r*|$                                V  |#                                *g }|	|r4t>          j         !                    |          }|"                    |           |%                                 |#                                r*|$                                V  |#                                *|&                                 dS # |&                                 w xY w)aT  Read function for a single Kafka partition using kafka-python.

                    This function runs remotely in a Ray task. It creates a KafkaConsumer,
                    reads messages from a single assigned partition, and yields PyArrow tables
                    incrementally for efficient streaming processing.
                    r   r   )rx   Fg     @@z Kafka read task timed out after zms while reading partition z
 of topic z; end_offset z was not reached. Returning z- messages collected in this read task so far.r@   rf   )rh   NT)r   r   r<   r\   r[   r   r   r   r2   )'rj   r   r   rO   assignrc   seekr   r   oftimeloggerwarninglenpositionro   pollmingetr   r   dictappendr   r<   r\   r[   r   r   re   BATCH_SIZE_FOR_YIELDpaTablefrom_pylist	add_blockhas_nextnextfinalizeclose)r   r   consumer_configrP   rQ   	start_offend_offrecordsoutput_bufferpartition_done
start_timetimeout_secondselapsed_timecurrent_positionremaining_timeout_ms	msg_batchmessagesmsgheaders_dicttabler>   rT   r6   r   rR   rx   rh   r   s                       r4   kafka_read_fnzSKafkaDatasource.get_read_tasks.<locals>.create_kafka_read_fn.<locals>.kafka_read_fn  s      DCCCCCCC 'F)+<' 'O
  -}????Ha)*8.\*R*R (9:::-=$o|Z. .*	7 !oyAAA"$(9146K  ) ) */%)Y[[
*4v*="0 <1+/9;;+CL+>> &%Sz %S %Snz %S %S  GQ %S %S29%S %SWZ[bWcWc%S %S %S!" !" !" !& 08/@/@/Q/Q,/7:: % 47!0<!?4 G4 40
 )1+./CU+K+K )6 ) )I $- ) ('0}}_b'I'IH'/ 1 1 $+#63:;P;P59N$)E EHK/WtCK/@/@/@UW '25*/2w141458]58]:=:L3?	%& 	%&!" !" !" $'w<<?3W#W#W,.H,@,@,I,IE$1$;$;E$B$B$B*7*@*@*B*B %C.;.@.@.B.B(B(B(B +8*@*@*B*B %C.0Gy #1 <1~ # ;$&H$8$8$A$AE)33E:::%..000+4466 7"/"4"4"6"6666 ,4466 7
 !(((((((((s   K$L L5)r   r   )	r   r   r>   rR   rT   r6   rh   rx   r   s	   ```````` r4   create_kafka_read_fnz<KafkaDatasource.get_read_tasks.<locals>.create_kafka_read_fn{  sm    $q)x q) q) q) q) q) q) q) q) q) q) q) q) q)f %$r3   zkafka:///)num_rows
size_bytesinput_files
exec_stats)read_fnmetadataschemar   r2   )rj   r   rE   rr   ru   rq   partitions_for_topicrZ   r   r   rs   rt   rv   ry   r   r   int64binarystringint32map_r/   ro   r	   r   r   r
   r   r   r   )rz   r   r   r   r   topic_partitionsdiscovery_consumerr\   
partitionsr[   r>   rR   rT   rh   r6   rx   tasksr   r   r   r   r   r   tasks                           r4   get_read_taskszKafkaDatasource.get_read_tasks:  sV   $ 	(''''' ?#T%<
 

 !	+!.!A!A!A!A @ @/DDUKK
! $KKKK   ", @ @I$++UI,>????@@ " +"((*** " +"((****+ !3)%
%
 3 $ ;28::&	$")++&")++&bhjj)bhjj)!28::.BGBIKK==>	
 
 )9 Y	 Y	$J #-$0/@JVFP?P",-BE% E%E%!E% $(9E% 'uS'*2E-E'FG	E%
 %U30A+A%BCE% $,O#<E%  E% (+E% E% E% E%P %C
CC\CCD	  H 10\JJM%!#5	  D LLs   AB B/)rS   rU   Nrf   rH   )r+   r,   r-   r.   r   r   r/   r	   ro   r
   r   r   r|   r~   r   r   r2   r3   r4   re   re      sA       LL   9C4<7;LV LVc49n%LV !d3i0LV C!445	LV
 #wx001LV $O4LV LV LV LV LV\Xc]    
 EIZ ZZ4<SMZ	hZ Z Z Z Z Zr3   re   )-r.   loggingr   dataclassesr   r   typingr   r   r   r   r	   r
   r   r   r   pyarrowr   rj   r   r    ray.data._internal.output_bufferr   r   ray.data._internal.utilr   ray.data.blockr   r   ray.data.contextr   ray.data.datasourcer   r   	getLoggerr+   r   r   r/   r=   rE   rO   ro   rc   re   r2   r3   r4   <module>r      s  
 
   ) ) ) ) ) ) ) )
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
     433333333 U U U U U U U U 1 1 1 1 1 1 / / / / / / / / ( ( ( ( ( ( 4 4 4 4 4 4 4 4		8	$	$ H& H& H& H& H& H& H& H&V+cN+/7/H+	+ + + +"Cy5=o5N	#s(^   *Cy0 
#s(^   .,$,$%,$ WZ001,$ c78,,-	,$
 38_,$ ,$ ,$ ,$^r r r r rj r r r r rr3   