
    &`i                         d dl Z d dlmZmZmZmZ d dlmZmZ d dl	m
Z
mZ erd dlZ e j        e          Z G d de
          Zdedefd	ZdS )
    N)TYPE_CHECKINGDictListOptional)BlockBlockMetadata)
DatasourceReadTaskc                       e Zd ZdZ	 	 ddedededeee                  ded         f
d	Zd
ee	         fdZ
dee         d
efdZd Z	 dde	dee	         d
ee         fdZdS )MongoDatasourcez3Datasource for reading from and writing to MongoDB.Nuridatabase
collectionpipelineschemapymongoarrow.api.Schemac                     || _         || _        || _        || _        || _        || _        |sddddiiig| _        d | _        d S )N$match_idz$existstrue)_uri	_database_collection	_pipeline_schema_mongo_args_client)selfr   r   r   r   r   
mongo_argss          /home/jaya/work/projects/VOICE-AGENT/VIET/agent-env/lib/python3.11/site-packages/ray/data/_internal/datasource/mongo_datasource.py__init__zMongoDatasource.__init__   s]     	!%!% 	H'%)V1D)EFGDN    returnc                     d S N )r   s    r    estimate_inmemory_data_sizez+MongoDatasource.estimate_inmemory_data_size%   s    tr"   c                 \    t          |          dk    s
d|d         vri S |d         d         S )Nr   r   )len)r   r   s     r    _get_match_queryz MongoDatasource._get_match_query)   s5    x==A!!<!<I{8$$r"   c                 
   dd l }| j        w|                    | j                  | _        t	          | j        | j        | j                   | j        | j                                     d| j                  d         | _        d S d S )Nr   	collStats
avgObjSize)	pymongor   MongoClientr   #_validate_database_collection_existr   r   command_avg_obj_size)r   r.   s     r    _get_or_create_clientz%MongoDatasource._get_or_create_client.   s    <"..ty99DL/dnd.>   "&dn!=!E!ET-" ""D  r"   parallelismper_task_row_limitc                    ddl m} |                                  | j        | j                 | j                 }|                     | j                  }t          |	                    d|idd|digd                    }d	t          d
t          dt          dt          t                   d|d|dt          dddt          dt          fdg }t!          |          D ]\  }}	t#          |	d         |	d         | j        z  d d           }
| j        | j        | j        | j        |	d         d         |	d         d         |t)          |          dz
  k    | j        | j        f	}t/          |ffd	|
|          }|                    |           |S )Nr   )ObjectIdr   z$bucketAutoz$_id)groupBybucketsT)allowDiskUser   r   r   r   min_idmax_idright_closedr   r   kwargsr#   c	                     dd l }	ddlm}
 ddd||rdnd|iiig}|	                    |           } |
||         |         ||z   fd|i|S )	Nr   )aggregate_arrow_allr   r   z$gtez$ltez$ltr   )r.   pymongoarrow.apir@   r/   )r   r   r   r   r;   r<   r=   r   r>   r.   r@   matchclients                r    
make_blockz2MongoDatasource.get_read_tasks.<locals>.make_blockL   s     NNN<<<<<<
 "F&2=FFv 	E ((--F&&x ,eh.> GMQW  r"   count)num_rows
size_bytesinput_files
exec_statsr   minmax   c                      |  gS r%   r&   )argsrD   s    r    <lambda>z0MongoDatasource.get_read_tasks.<locals>.<lambda>   s    jj$.?-@ r"   )r5   )bson.objectidr7   r3   r   r   r   r*   r   list	aggregatestrr   r   booldictr   	enumerater   r2   r   r)   r   r   r
   append)r   r4   r5   r7   collmatch_querypartitions_ids
read_tasksi	partitionmetadatamake_block_args	read_taskrD   s                @r    get_read_taskszMongoDatasource.get_read_tasks:   s    	+*****""$$$|DN+D,<=++DN;;NN{+";$O$OP "   
 
			 	 4j		
 	 	 	 .	 	 	 	 	 	< &(
%n55 	) 	)LAy$"7+$W-0BB 	  H 	 % '% 'S((1,, 
O !+@@@@@#5  I
 i((((r"   )NNr%   )__name__
__module____qualname____doc__rS   r   r   r   r!   intr'   r*   r3   r
   ra   r&   r"   r    r   r      s       == *.6:   	
 4:& 23   *Xc]    %d % % % % %

 
 
 EIK KK4<SMK	hK K K K K Kr"   r   r   r   c                     |                                  }||vrt          d| d          | |                                         }||vrt          d| d          d S )NzThe destination database z doesn't exist.zThe destination collection )list_database_names
ValueErrorlist_collection_names)rC   r   r   db_namescollection_namess        r    r0   r0      s}    ))++HxNXNNNOOOh'==??)))RzRRRSSS *)r"   )loggingtypingr   r   r   r   ray.data.blockr   r   ray.data.datasource.datasourcer	   r
   rA   pymongoarrow	getLoggerrb   loggerr   rS   r0   r&   r"   r    <module>rt      s     6 6 6 6 6 6 6 6 6 6 6 6 / / / / / / / / ? ? ? ? ? ? ? ? 		8	$	$x x x x xj x x xvT# T3 T T T T T Tr"   