
    &`i                          d dl mZmZmZmZmZ d dlmZmZ d dl	m
Z
mZ d dlmZ d dlmZ d dlmZ d dlmZ  ed	           G d
 de                      ZdS )    )AnyCallableDictOptionalUnion)	RunConfigScalingConfig)
Checkpoint
DataConfig)DataParallelTrainer)HorovodConfig)
GenDataset)	PublicAPIbeta)	stabilityc                       e Zd ZdZddddddddddeeg df         eegdf         f         dee         dee         dee	         dee
         d	ee         d
eeeef                  deeeef                  dee         f fdZ xZS )HorovodTrainerav  A Trainer for data parallel Horovod training.

    This Trainer runs the function ``train_loop_per_worker`` on multiple Ray
    Actors. These actors already have the necessary Horovod setup already
    configured for distributed Horovod training.

    The ``train_loop_per_worker`` function is expected to take in either 0 or 1
    arguments:

    .. testcode::

        def train_loop_per_worker():
            ...

    .. testcode::

        def train_loop_per_worker(config: Dict):
            ...

    If ``train_loop_per_worker`` accepts an argument, then
    ``train_loop_config`` will be passed in as the argument. This is useful if you
    want to tune the values in ``train_loop_config`` as hyperparameters.

    If the ``datasets`` dict contains a training dataset (denoted by
    the "train" key), then it will be split into multiple dataset
    shards that can then be accessed by ``ray.train.get_dataset_shard("train")`` inside
    ``train_loop_per_worker``. All the other datasets will not be split and
    ``ray.train.get_dataset_shard(...)`` will return the entire Dataset.

    Inside the ``train_loop_per_worker`` function, you can use any of the
    :ref:`Ray Train loop methods <train-loop-api>`.

    .. testcode::

        from ray import train

        def train_loop_per_worker():
            # Report intermediate results for callbacks or logging and
            # checkpoint data.
            train.report(...)

            # Returns dict of last saved checkpoint.
            train.get_checkpoint()

            # Returns the Dataset shard for the given key.
            train.get_dataset_shard("my_dataset")

            # Returns the total number of workers executing training.
            train.get_context().get_world_size()

            # Returns the rank of this worker.
            train.get_context().get_world_rank()

            # Returns the rank of the worker on the current node.
            train.get_context().get_local_rank()

    Any returns from the ``train_loop_per_worker`` will be discarded and not
    used or persisted anywhere.

    You could use ``TensorflowPredictor`` or ``TorchPredictor`` in conjunction with
    HorovodTrainer. You must save the model under the "model" kwarg in the
    ``Checkpoint`` passed to ``train.report()``, so that it can be used by
    corresponding predictors.

    Example:


    .. testcode::
        :skipif: True

        import os
        import tempfile

        import ray
        import horovod.torch as hvd
        import torch
        import torch.nn as nn

        from ray import train
        import ray.train.torch  # Need this to use `train.torch.get_device()`
        from ray.train import Checkpoint, ScalingConfig
        from ray.train.horovod import HorovodTrainer

        # If using GPUs, set this to True.
        use_gpu = False

        input_size = 1
        layer_size = 15
        output_size = 1
        num_epochs = 3

        class NeuralNetwork(nn.Module):
            def __init__(self):
                super(NeuralNetwork, self).__init__()
                self.layer1 = nn.Linear(input_size, layer_size)
                self.relu = nn.ReLU()
                self.layer2 = nn.Linear(layer_size, output_size)
            def forward(self, input):
                return self.layer2(self.relu(self.layer1(input)))

        def train_loop_per_worker():
            hvd.init()
            dataset_shard = train.get_dataset_shard("train")
            model = NeuralNetwork()
            device = train.torch.get_device()
            model.to(device)
            loss_fn = nn.MSELoss()
            lr_scaler = 1
            optimizer = torch.optim.SGD(model.parameters(), lr=0.1 * lr_scaler)
            # Horovod: wrap optimizer with DistributedOptimizer.
            optimizer = hvd.DistributedOptimizer(
                optimizer,
                named_parameters=model.named_parameters(),
                op=hvd.Average,
            )
            for epoch in range(num_epochs):
                model.train()
                for batch in dataset_shard.iter_torch_batches(
                    batch_size=32, dtypes=torch.float
                ):
                    inputs, labels = torch.unsqueeze(batch["x"], 1), batch["y"]
                    outputs = model(inputs)
                    loss = loss_fn(outputs, labels)
                    optimizer.zero_grad()
                    loss.backward()
                    optimizer.step()
                    print(f"epoch: {epoch}, loss: {loss.item()}")

                # Save a model checkpoint at the end of each epoch
                with tempfile.TemporaryDirectory() as temp_checkpoint_dir:
                    ckpt_path = os.path.join(temp_checkpoint_dir, "model.pt")
                    torch.save(model.state_dict(), ckpt_path)
                    train.report(
                        {"loss": loss.item(), "epoch": epoch},
                        checkpoint=Checkpoint.from_directory(temp_checkpoint_dir),
                    )

        train_dataset = ray.data.from_items([{"x": x, "y": x + 1} for x in range(32)])
        scaling_config = ScalingConfig(num_workers=3, use_gpu=use_gpu)
        trainer = HorovodTrainer(
            train_loop_per_worker=train_loop_per_worker,
            scaling_config=scaling_config,
            datasets={"train": train_dataset},
        )
        result = trainer.fit()

    Args:
        train_loop_per_worker: The training function to execute.
            This can either take in no arguments or a ``config`` dict.
        train_loop_config: Configurations to pass into
            ``train_loop_per_worker`` if it accepts an argument.
        horovod_config: Configuration for setting up the Horovod backend.
            If set to None, use the default configuration. This replaces the
            ``backend_config`` arg of ``DataParallelTrainer``.
        scaling_config: Configuration for how to scale data parallel training.
        dataset_config: Configuration for dataset ingest.
        run_config: Configuration for the execution of the training run.
        datasets: Any Datasets to use for training. Use
            the key "train" to denote which dataset is the training
            dataset.
        resume_from_checkpoint: A checkpoint to resume training from.
        metadata: Dict that should be made available via
            `ray.train.get_context().get_metadata()` and in `checkpoint.get_metadata()`
            for checkpoints saved from this Trainer. Must be JSON-serializable.
    N)train_loop_confighorovod_configscaling_configdataset_config
run_configdatasetsmetadataresume_from_checkpointtrain_loop_per_workerr   r   r   r   r   r   r   r   c                x    t                                          |||pt                      |||||	|	  	         d S )N)	r   r   backend_configr   r   r   r   r   r   )super__init__r   )selfr   r   r   r   r   r   r   r   r   	__class__s             u/home/jaya/work/projects/VOICE-AGENT/VIET/agent-env/lib/python3.11/site-packages/ray/train/horovod/horovod_trainer.pyr    zHorovodTrainer.__init__   sT     	"7/)<]__))!#9 	 
	
 
	
 
	
 
	
 
	
    )__name__
__module____qualname____doc__r   r   r   r   r   r	   r   r   strr   r   r
   r    __classcell__)r"   s   @r#   r   r      s       d dT -12626/3*.48-17;
 
 
$Xb$h%74&$,9O%OP
 $D>	

 !/
 !/
 !,
 Y'
 4Z01
 4S>*
 !) 4
 
 
 
 
 
 
 
 
 
r$   r   N)typingr   r   r   r   r   ray.air.configr   r	   	ray.trainr
   r   ray.train.data_parallel_trainerr   ray.train.horovod.configr   ray.train.trainerr   ray.util.annotationsr   r    r$   r#   <module>r3      s    7 7 7 7 7 7 7 7 7 7 7 7 7 7 3 3 3 3 3 3 3 3 , , , , , , , , ? ? ? ? ? ? 2 2 2 2 2 2 ( ( ( ( ( ( * * * * * * V~
 ~
 ~
 ~
 ~
( ~
 ~
 ~
 ~
 ~
r$   