
    &`iZ                         d dl Z d dlZd dlZd dlZd dlZd dlmZmZmZ d dl	m
Z erd dlZd dlZd dlmZ d dlmZ d dlmZ d dlmZ d dlmZmZ d dlmZ  e j        e          Zd	Zd
Z  G d ded                   Z!dS )    N)TYPE_CHECKINGIterableOptional)bigquery_datasource)TaskContext)cached_remote_fn)_check_import)BlockBlockAccessor)Datasink
      c                   z    e Zd Zedfdedededee         ddf
dZdd	ed
         ddfdZ	de
e         deddfdZdS )BigQueryDatasinkT
project_iddatasetmax_retry_cntoverwrite_tablereturnNc                     t          | dd           t          | dd           t          | dd           || _        || _        || _        || _        d S )Nzgoogle.cloudbigquery)modulepackagebigquery_storagezgoogle.api_core
exceptions)r	   r   r   r   r   )selfr   r   r   r   s        /home/jaya/work/projects/VOICE-AGENT/VIET/agent-env/lib/python3.11/site-packages/ray/data/_internal/datasource/bigquery_datasink.py__init__zBigQueryDatasink.__init__   si     	d>:FFFFd>;MNNNNd#4lKKKK$*.    schemaz	pa.Schemac                 ~   ddl m} | j        | j        t	          d          t          j        | j                  }| j                            dd          d         }	 |                    |           nN# |j	        $ rA |
                    | j         d| d           t                              d	|z              Y nw xY w| j        rMt                              d
| j         dz              |                    | j         d| j         d           d S t                              d| j         dz              d S )Nr   r   z(project_id and dataset are required argsr   .      )timeoutzCreated dataset zAttempting to delete table z9 if it already exists since kwarg overwrite_table = True.T)not_found_okzThe write will append to table z: if it already exists since kwarg overwrite_table = False.)google.api_corer   r   r   
ValueErrorr   _create_clientsplitget_datasetNotFoundcreate_datasetloggerinfor   delete_table)r   r    r   client
dataset_ids        r   on_write_startzBigQueryDatasink.on_write_start,   s   ......?"dl&:GHHH %3tOOO\''Q//2
	9z****" 	9 	9 	9!!T_"C"Cz"C"CR!PPPKK*Z788888	9
  
	KK<dl<<MN   4? C CT\ C CRVWWWWWKK@$,@@NO    s    A6 6AC Cblocksctxc                      dt           dt          dt          dd f fdt                    t          j         fd|D                        d S )Nblockr   r   r   c                    ddl m} ddlm} t	          j        |                                           } t          j        |          }|	                    d          }|j
        j        |_        |j        j        |_        t!          j                    5 }t$          j                            |dt+          j                     d          }t/          j        | |d	
           d}	|	j        k    rt5          |d          5 }
|                    |
||          }d d d            n# 1 swxY w Y   	 t8                              |                                           n# |j        |j         f$ rm}|	dz  }	|	j        k    rY d }~nft8                              dd|	 dz              tC          j"        |           tG          j$        tJ                     Y d }~nd }~ww xY w|	j        k    |	j        k    r>t8                              dj         ddz              tM          d|	 dz   dz             	 d d d            d S # 1 swxY w Y   d S )Nr   r"   )r   r#   T)
autodetectblock_z.parquetSNAPPY)compressionrb)
job_configr%   z5A block write encountered a rate limit exceeded error z  time(s). Sleeping to try again.z	Maximum (z) retry count exceeded. Rayz; will attempt to retry the block write via fault tolerance.zWrite failed due to z5 repeated API rate limit exceeded responses. Considerz9 specifiying the max_retry_cnt kwarg with a higher value.)'r)   r   google.cloudr   r   	for_blockto_arrowr   r+   LoadJobConfigSourceFormatPARQUETsource_formatWriteDispositionWRITE_APPENDwrite_dispositiontempfileTemporaryDirectoryospathjoinuuiduuid4pqwrite_tabler   openload_table_from_filer0   r1   result	ForbiddenTooManyRequestsloggingdebugtimesleepRATE_LIMIT_EXCEEDED_SLEEP_TIMERuntimeError)r9   r   r   r   r   r3   r@   temp_dirfp	retry_cntsource_filejober   s                r   _write_single_blockz3BigQueryDatasink.write.<locals>._write_single_blockM   s"   222222------!+E22;;==E(7:NNNF!//4/@@J'/'<'DJ$+3+D+QJ(,.. "(W\\(,KTZ\\,K,K,KLLubh????	4#555b$ ;$99'Z :                CCJJLL111&0*2LM 	C 	C 	C!Q	$t'999!EEEESM)MMMN    a(((
#ABBBBBBBB	C  4#555( t111KKSD$6SSSWX   ':y::QRUV   23" " " " " " " " " " " " " " " " " "so   A*H?>D#H?#D'	'H?*D'	+H?/,EH?G-G=H?AGH?GAH??IIc                 R    g | ]#}                     |j        j                  $S  )remoter   r   ).0r9   rf   r   s     r   
<listcomp>z*BigQueryDatasink.write.<locals>.<listcomp>   s?        $**5$/4<PP  r   )r
   strr   rayget)r   r6   r7   rf   s   `  @r   writezBigQueryDatasink.writeH   s    
-	u -	# -	 -	PT -	 -	 -	 -	 -	 -	^ //BCC 	    #  	
 	
 	
 	
 	
r   )N)__name__
__module____qualname__DEFAULT_MAX_RETRY_CNTrl   intr   boolr   r5   r   r
   r   ro   rh   r   r   r   r      s        
 3*./ // / 	/
 "$/ 
/ / / /  Xk%: d    8<
<
 <
 
	<
 <
 <
 <
 <
 <
r   r   )"rZ   rN   rL   r\   rQ   typingr   r   r   pyarrow.parquetparquetrS   pyarrowparm   ray.data._internal.datasourcer   'ray.data._internal.execution.interfacesr   ray.data._internal.remote_fnr   ray.data._internal.utilr	   ray.data.blockr
   r   ray.data.datasource.datasinkr   	getLoggerrp   r0   rs   r^   r   rh   r   r   <module>r      sQ    				    4 4 4 4 4 4 4 4 4 4        



 = = = = = = ? ? ? ? ? ? 9 9 9 9 9 9 1 1 1 1 1 1 / / / / / / / / 1 1 1 1 1 1		8	$	$ !# i
 i
 i
 i
 i
x~ i
 i
 i
 i
 i
r   