
    &`i]D                        d dl Z d dlZd dlmZ d dlmZ d dlmZmZm	Z	m
Z
 d dlZd dlZd dlmZ d dlmZ d dlmZ d dlmZmZ d dlmZmZ d d	lmZmZ  e j        e          Zd
Z dZ!dej"        de#fdZ$dej%        de#fdZ&e G d d                      Z' ed           G d de                      Z(e G d de                      Z)dS )    N)	dataclass)IntEnum)AnyDictIterableOptional)TaskContext)_check_import)BlockBlockAccessor)DatasinkWriteReturnType)DeveloperAPI	PublicAPI&   
   schemareturnc                    t          |           dk    rdS | D ]$}t          j        |j                  r	|j        c S %| D ]=}t          j        |j                  s"t          j        |j                  s	|j        c S >| d         j        S )Nr   ztuple())lenpatis_timestamptypename	is_stringis_large_string)r   fs     /home/jaya/work/projects/VOICE-AGENT/VIET/agent-env/lib/python3.11/site-packages/ray/data/_internal/datasource/clickhouse_datasink.py#_pick_best_arrow_field_for_order_byr      s    
6{{ay  AF## 	6MMM	   af%% 	)<QV)D)D 	6MMM!9>    fieldc                    | j         }t          j        |          r%|j        pt          }|j        pt          }d| d| dS t          j        |          rdS t          j        |          rdS t          j	        |          rdS t          j
        |          rdS t          j        |          rdS t          j        |          rdS t          j        |          rd	S t          j        |          rd
S t          j        |          rdS t          j        |          rdS t          j        |          rdS t          j        |          rdS t          j        |          rdS dS )zAConvert a PyArrow field to an appropriate ClickHouse column type.zDecimal(z, )UInt8Int8Int16Int32Int64UInt16UInt32UInt64Float32Float64zDateTime64(3)String)r   r   
is_decimal	precisionDEFAULT_DECIMAL_PRECISIONscaleDEFAULT_DECIMAL_SCALE
is_booleanis_int8is_int16is_int32is_int64is_uint8	is_uint16	is_uint32	is_uint64
is_float16
is_float32
is_float64r   )r!   tr0   r2   s       r   _arrow_to_clickhouse_typerA   +   su   
A
~a 0K<#<	00/)//u////
~a w
{1~~ v
|A w
|A w
|A w
|A w
}Q x
}Q x
}Q x
~a y
~a y
~a y
 8r    c                       e Zd ZU dZdZeed<   dZee         ed<   dZ	ee         ed<   dZ
ee         ed<   dZee         ed<   dS )	ClickHouseTableSettingsa  
    Additional table creation instructions for ClickHouse.

    Attributes:
        engine: The engine definition for the created table. Defaults
            to "MergeTree()".
        order_by: The ORDER BY clause for the table.
        partition_by: The PARTITION BY clause for the table.
        primary_key: The PRIMARY KEY clause for the table.
        settings: Additional SETTINGS clause for the table
            (comma-separated or any valid string).
    zMergeTree()engineNorder_bypartition_byprimary_keysettings)__name__
__module____qualname____doc__rD   str__annotations__rE   r   rF   rG   rH    r    r   rC   rC   O   s~            FC"Hhsm""""&L(3-&&&!%K#%%%"Hhsm"""""r    rC   alpha)	stabilityc                       e Zd ZdZdZdZdZdS )SinkModea8  
    Enum of possible modes for sinking data

    Attributes:
        CREATE: Create a new table; fail if that table already exists.
        APPEND: Use an existing table if present, otherwise create one; then append data.
        OVERWRITE: Drop the table if it already exists, then re-create it and write.
             N)rI   rJ   rK   rL   CREATEAPPEND	OVERWRITErO   r    r   rS   rS   e   s-          F F IIIr    rS   c                   z   e Zd ZdZdZdZdZdZdZe	j
        dddddfded	ed
e	deej                 deeeef                  deeeef                  dee         dee         ddfdZd Zdej        defdZdefdZdee         fdZedefd            Zdded         ddfdZdefdZdee         dede fdZ!dS )ClickHouseDatasinku%	  ClickHouse Ray Datasink.

    A Ray Datasink for writing data into ClickHouse, with support for distributed
    writes and mode-based table management (create, append, or overwrite).

    Args:
        table: Fully qualified table identifier (e.g., "default.my_table").
        dsn: A string in DSN (Data Source Name) HTTP format
            (e.g., "clickhouse+http://username:password@host:8123/default").
            For more information, see `ClickHouse Connection String doc
            <https://clickhouse.com/docs/en/integrations/sql-clients/cli#connection_string>`_.
        mode: One of SinkMode.CREATE, SinkMode.APPEND,
            or SinkMode.OVERWRITE.
            - **CREATE**: Create a new table; fail if that table already exists.
              Requires a user-supplied schema if the table doesn’t already exist.
            - **APPEND**: Use an existing table if present, otherwise create one.
              If the table does not exist, the user must supply a schema. Data
              is then appended to the table.
            - **OVERWRITE**: Drop the table if it exists, then re-create it.
              **Always requires** a user-supplied schema to define the new table.
        schema: An optional PyArrow schema object that, if provided, will
            override any inferred schema for table creation.
            - If you are creating a new table (CREATE or APPEND when the table
              doesn’t exist) or overwriting an existing table, you **must**
              provide a schema.
            - If you’re appending to an already-existing table, the schema is
              not strictly required unless you want to cast data or enforce
              column types. If omitted, the existing table definition is used.
        client_settings: Optional ClickHouse server settings to be used with the
            session/every request.
        client_kwargs: Additional keyword arguments to pass to the
            ClickHouse client.
        table_settings: An optional dataclass with additional table creation
            instructions (e.g., engine, order_by, partition_by, primary_key, settings).
        max_insert_block_rows: If you have extremely large blocks, specifying
            a limit here will chunk the insert into multiple smaller insert calls.
            Defaults to None (no chunking).
    
ClickHousez
        CREATE TABLE IF NOT EXISTS {table_name} (
            {columns}
        )
        ENGINE = {engine}
        ORDER BY {order_by}
        {additional_props}
    z!DROP TABLE IF EXISTS {table_name}zEXISTS {table_name}zSHOW CREATE TABLE {table_name}Ntabledsnmoder   client_settingsclient_kwargstable_settingsmax_insert_block_rowsr   c	                     || _         || _        || _        || _        |pi | _        |pi | _        |pt                      | _        || _        d| _	        d S )NF)
_table_dsn_mode_schema_client_settings_client_kwargsrC   _table_settings_max_insert_block_rows_table_dropped)	selfr]   r^   r_   r   r`   ra   rb   rc   s	            r   __init__zClickHouseDatasink.__init__   sd     	
 / 52+1r-J1H1J1J&;##r    c                 l    t          | dd           dd l} |j        d| j        | j        d| j        S )Nclickhouse_connectclickhouse-connectmodulepackager   )r^   rH   rO   )r
   rq   
get_clientrf   ri   rj   )rn   rq   s     r   _init_clientzClickHouseDatasink._init_client   s_    d#7AUVVVV!!!!,!, 
	*
 
 !
 
 	
r    c                    | j         j        }| j         j        | j         j        }nt          |          }g }| j         j        "|                    d| j         j                    | j         j        #|                    d| j         j         d           | j         j        "|                    d| j         j                    d}|rdd                    |          z   }g }|D ]1}t          |          }|                    d|j
         d|            2d	                    |          }	| j                            | j        |	|||
          S )NzPARTITION BY zPRIMARY KEY (r#   z	SETTINGS  
`z` z,
    )
table_namecolumnsrD   rE   additional_props)rk   rD   rE   r   rF   appendrG   rH   joinrA   r   _CREATE_TABLE_TEMPLATEformatre   )
rn   r   rD   rE   additional_clausesr~   columns_defr!   ch_typecolumns_strs
             r   _generate_create_table_sqlz-ClickHouseDatasink._generate_create_table_sql   s    %,(4+4HH:6BBH,8%%C 4 ACC   +7%%C 4 @CCC   (4%%&Q$2F2O&Q&QRRR 	D#dii0B&C&CC 	< 	<E/66G:5:::::;;;;nn[11*11{- 2 
 
 	
r    c                    	 |                     | j                            | j                            }|dS dD ]}t	          | dd           ddlm} t          ||          rc	 t           t          ||                                c S # t          t          |f$ r&}t                              d	||           Y d }~d }~ww xY wd
D ]V}t          ||d           }|rA|d         }t          |t          t           f          r|r|d         nd}t          |          c S WdS # t"          $ r0}	t                              d| j         d|	            Y d }	~	dS d }	~	ww xY w)Nr|   F)scalar
first_itemfirst_valuerq   rr   rs   r   )Errorz(Helper %s failed: %s; will try fallbacks)result_rowsrowsdatazCould not verify if table z	 exists: )query_CHECK_TABLE_EXISTS_TEMPLATEr   re   r
   $clickhouse_connect.driver.exceptionsr   hasattrboolgetattr	TypeError
ValueErrorloggerdebug
isinstancelisttuple	Exceptionwarning)
rn   clientresulthelperCHErrorexcattrr   firstes
             r   _table_existsz ClickHouseDatasink._table_exists   s    	\\188DK8PP F ~uA  !5?S    RQQQQQ66** #$;GFF$;$;$=$=>>>>>%z7;   FPS        8 ' 'vtT22 ' GE!%$77 9,1 8aq;;&&&' 5 	 	 	NNQQQaQQRRR55555	sM   5D$ ,D$ &$B
D$ CC ;D$  CAD$ !D$ $
E.%EEc                    t                               d| j                    	 | j                            | j                  }|                    |          }t          |          }d}t          j        ||          }|r'|	                    d          
                                S d S # t          $ r0}t                               d| j         d|            Y d }~d S d }~ww xY w)Nz6Retrieving ORDER BY clause from SHOW CREATE TABLE for r   z)(?is)\border\s+by\s+(.*?)(?=\bengine\b|$)rT   z)Could not retrieve SHOW CREATE TABLE for : )r   r   re   _SHOW_CREATE_TABLE_TEMPLATEr   commandrM   researchgroupstripr   r   )rn   r   show_create_sqlr   ddl_strpatternmatchr   s           r   _get_existing_order_byz)ClickHouseDatasink._get_existing_order_by  s   RT[RR	
 	
 	
	">EE; F  O ^^O44F&kkGBGIgw//E .{{1~~++---4 	 	 	NNNDKNN1NN   44444		s   BB* *
C$4%CC$c                     dS )NTrO   rn   s    r   supports_distributed_writesz.ClickHouseDatasink.supports_distributed_writes,  s    tr    z	pa.Schemac           	         d }	 |                                  }|                     |          }| j        t          j        k    p!| j        t          j        t          j        fv o| }|rY| j        R| j        t          j        k    rt          d| j	         d          t          d| j	         d| j        j
         d          | j        t          j        k    r|rL| j        j        @|                     |          }|)|| j        _        t                              d|            | j                            | j	                  }t                              d|            |                    |           d	| _        |                     | j                  }|                    |           n?| j        t          j        k    rf|r4d| j	         d
}t                              |           t          |          |                     | j                  }|                    |           n| j        t          j        k    r|r~|                     |          }| j        j        }	|	'|r$||	k    rt          d| j	         d| d|	 d          nc|r1|| j        _        t                              d| j	         d|            n/|                     | j                  }|                    |           n9# t,          $ r,}
t                              d| j	         d|
            |
d }
~
ww xY w|r|                                 d S d S # |r|                                 w w xY w)NzOverwriting table u#    requires a user‑provided schema.zTable z does not exist in mode='zG' and no schema was provided. Cannot create the table without a schema.z-Reusing old ORDER BY from overwritten table: r   zMode=OVERWRITE => TzK already exists in mode='CREATE'. Use mode='APPEND' or 'OVERWRITE' instead.z+Conflict with order_by. The existing table z has ORDER BY z", but the user specified ORDER BY z?. Please drop or overwrite the table, or use the same ordering.z$Reusing existing ORDER BY for table r   z,Could not complete on_write_start for table )rw   r   rg   rS   rY   rW   rX   rh   r   re   r   rk   rE   r   r   info_DROP_TABLE_TEMPLATEr   r   rm   r   errorr   close)rn   r   r   table_existsschema_requiredexisting_order_bydrop_sql
create_sqlmsguser_order_byr   s              r   on_write_startz!ClickHouseDatasink.on_write_start0  s   Y	&&((F--f55L 
h00  J8?HO"DD )((   	4<#7:!333$]T[]]]   %\ \ \tz \ \ \  
 zX///   	D$8$A$I )-(C(CF(K(K%(48I,54 14 4  
  4;;t{;SS;;;<<<x(((&*#!<<T\JJ
z****x.. *D D D D  LL%%%$S//)!<<T\JJ
z****x.. /(,(C(CF(K(K%$($8$AM$0, 1Bm1S1S",!=dk != !=0A!= !=,9!= != !=# #  + 8I,5e4;eeRcee  
 "&!@!@!N!NJNN:... 	 	 	LLQt{QQaQQ   G		   v s*   K'K, +L? ,
L"6'LL""L? ?Mc                     | j         S N)NAMEr   s    r   get_namezClickHouseDatasink.get_name  s
    yr    blocksctxc                 4   |                                  }d}	 |D ]}t          j        |                                          }|j        }| j        |                    | j        d          }| j        | j        }n
|dk    r|nd}t          t          d||                    }	|	
                    |           t          t          |	          dz
            D ]S}
|	|
         }|	|
dz            }|                    |||z
            }|                    | j        |           ||j        z  }Tn9# t          $ r,}t                               d| j         d|            |d }~ww xY w	 |                                 n# |                                 w xY w|gS )Nr   T)saferT   z"Failed to write block(s) to table r   )rw   r   	for_blockto_arrownum_rowsrh   castrl   r   ranger   r   sliceinsert_arrowre   r   r   r   r   )rn   r   r   r   total_insertedblockarrow_table	row_countmax_chunk_sizeoffsetsistartendchunkr   s                  r   writezClickHouseDatasink.write  s   
 ""$$	 5 5+5e<<EEGG'0	<+"-"2"24<d"2"K"KK.:%)%@NN 3<a--YYQNuQ	>BBCCy)))s7||a/00 5 5A#AJE!!a%.C'--eS5[AAE''U;;;"en4NN55$  	 	 	LLPdkPPQPPQQQG	%5, LLNNNNFLLNNNNs*   DD/ .E> /
E%9'E  E%%E> >Fr   )"rI   rJ   rK   rL   r   r   r   r   r   rS   rW   rM   r   pyarrowSchemar   r   rC   intro   rw   r   r   r   r   propertyr   r   r   r   r   r	   r   r   rO   r    r   r[   r[   z   s       % %N D C#< "F "+/4826<@/3$ $$ $ 	$
 ($ "$sCx.1$  S#X/$ !!89$  (}$ 
$ $ $ $*
 
 
"
"
 
"
 "
 "
 "
H!t ! ! ! !F    * T    X[ [Xk%: [d [ [ [ [z#         
	           r    r[   )*loggingr   dataclassesr   enumr   typingr   r   r   r   r   papyarrow.typestypesr   'ray.data._internal.execution.interfacesr	   ray.data._internal.utilr
   ray.data.blockr   r   ray.data.datasource.datasinkr   r   ray.util.annotationsr   r   	getLoggerrI   r   r1   r3   r   rM   r   FieldrA   rC   rS   r[   rO   r    r   <module>r      s5    				 ! ! ! ! ! !                             ? ? ? ? ? ? 1 1 1 1 1 1 / / / / / / / / B B B B B B B B 8 8 8 8 8 8 8 8		8	$	$   3    !W] !s ! ! ! !H # # # # # # # #* W    w   ( t  t  t  t  t  t  t  t  t  t r    