# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD 3-Clause license found in the
# LICENSE file in the root directory of this source tree.

import math
from typing import Tuple

import torch
from torch.utils._triton import has_triton

if has_triton():
    import triton
    import triton.language as tl
    from triton import Config

    # Original implementation at https://github.com/deepseek-ai/DeepSeek-V3/blob/main/inference/kernel.py

    fp8_gemm_configs = [
        Config(
            {"BLOCK_SIZE_M": block_m, "BLOCK_SIZE_N": block_n},
            num_stages=num_stages,
            num_warps=8,
        )
        for block_m in [16, 32, 64, 128]
        for block_n in [32, 64, 128]
        for num_stages in [3, 4, 5, 6]
    ]

    @triton.autotune(
        configs=fp8_gemm_configs, key=["N", "K", "M_BUCKET", "BLOCK_SIZE_K"]
    )
    @triton.jit
    def blockwise_fp8_gemm_kernel(
        a_ptr,
        b_ptr,
        c_ptr,
        a_s_ptr,
        b_s_ptr,
        M,
        N: tl.constexpr,
        K: tl.constexpr,
        M_BUCKET: tl.constexpr,
        BLOCK_SIZE_M: tl.constexpr,
        BLOCK_SIZE_N: tl.constexpr,
        BLOCK_SIZE_K: tl.constexpr,
    ):
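        """
        Computes one (BLOCK_SIZE_M x BLOCK_SIZE_N) tile of `c = a @ b.T`,
        rescaling each BLOCK_SIZE_K slice of the K dimension by the matching
        activation and weight scales.

        Args:
            a_ptr (tl.pointer): Pointer to FP8 activations of shape (M, K).
            b_ptr (tl.pointer): Pointer to FP8 weights of shape (N, K).
            c_ptr (tl.pointer): Pointer to the output tensor of shape (M, N).
            a_s_ptr (tl.pointer): Pointer to activation scales of shape (M, K // BLOCK_SIZE_K).
            b_s_ptr (tl.pointer): Pointer to weight scales of shape (N // BLOCK_SIZE_K, K // BLOCK_SIZE_K).
            M (int): Number of rows of `a`.
            N (tl.constexpr): Number of rows of `b`.
            K (tl.constexpr): Shared inner dimension.
            M_BUCKET (tl.constexpr): `ceil(log2(M))`; used only as an autotune key.
            BLOCK_SIZE_M (tl.constexpr): Tile size along M.
            BLOCK_SIZE_N (tl.constexpr): Tile size along N.
            BLOCK_SIZE_K (tl.constexpr): Tile size along K; matches the quantization block size.
        """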
        pid_m = tl.program_id(axis=0)
        pid_n = tl.program_id(axis=1)
        k = tl.cdiv(K, BLOCK_SIZE_K)
        # Wrap row/column offsets modulo M/N so partial tiles load in-bounds
        # (junk) values; the final store is masked, so they never reach `c`.
        offs_m = (pid_m * BLOCK_SIZE_M + tl.arange(0, BLOCK_SIZE_M)) % M
        offs_n = (pid_n * BLOCK_SIZE_N + tl.arange(0, BLOCK_SIZE_N)) % N
        offs_k = tl.arange(0, BLOCK_SIZE_K)
        a_ptrs = a_ptr + offs_m[:, None] * K + offs_k[None, :]
        b_ptrs = b_ptr + offs_n[None, :] * K + offs_k[:, None]
        # One activation scale per (row, K-block) of `a`; one weight scale per
        # (BLOCK_SIZE_K x BLOCK_SIZE_K) tile of `b`.
        a_s_ptrs = a_s_ptr + offs_m * k
        b_s_ptrs = b_s_ptr + (offs_n // BLOCK_SIZE_K) * k

        accumulator = tl.zeros((BLOCK_SIZE_M, BLOCK_SIZE_N), dtype=tl.float32)
        for i in range(k):
            a = tl.load(a_ptrs, mask=offs_k[None, :] < K - i * BLOCK_SIZE_K, other=0.0)
            b = tl.load(b_ptrs, mask=offs_k[:, None] < K - i * BLOCK_SIZE_K, other=0.0)
            a_s = tl.load(a_s_ptrs)
            b_s = tl.load(b_s_ptrs)
            accumulator += tl.dot(a, b) * a_s[:, None] * b_s[None, :]
            a_ptrs += BLOCK_SIZE_K
            b_ptrs += BLOCK_SIZE_K
            a_s_ptrs += 1
            b_s_ptrs += 1

        c = accumulator.to(c_ptr.dtype.element_ty)
        offs_m = pid_m * BLOCK_SIZE_M + tl.arange(0, BLOCK_SIZE_M)
        offs_n = pid_n * BLOCK_SIZE_N + tl.arange(0, BLOCK_SIZE_N)
        c_ptrs = c_ptr + offs_m[:, None] * N + offs_n[None, :]
        mask = (offs_m[:, None] < M) & (offs_n[None, :] < N)
        tl.store(c_ptrs, c, mask=mask)

    @torch.library.custom_op("ao::blockwise_fp8_gemm", mutates_args=())
    def blockwise_fp8_gemm(
        a: torch.Tensor,
        a_s: torch.Tensor,
        b: torch.Tensor,
        b_s: torch.Tensor,
        block_size: int = 128,
    ) -> torch.Tensor:
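        """
        Computes the dequantized product `a @ b.T` from blockwise FP8 inputs.

        Args:
            a (torch.Tensor): FP8 activations of shape (..., K).
            a_s (torch.Tensor): Activation scales of shape (..., K // block_size).
            b (torch.Tensor): FP8 weights of shape (N, K).
            b_s (torch.Tensor): Weight scales of shape (N // block_size, K // block_size).
            block_size (int, optional): Quantization block size. Default is 128.

        Returns:
            torch.Tensor: The result of shape (..., N) with dtype `torch.bfloat16`.
        """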
        assert a.is_contiguous()
        assert b.is_contiguous()
        assert a_s.is_contiguous()
        assert b_s.is_contiguous()
        K = a.size(-1)
        M = a.numel() // K
        N = b.size(0)
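        # Bucket the dynamic M dimension by its power-of-two ceiling so the
        # autotuner reuses one config per bucket instead of re-tuning for
        # every distinct M.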
        M_BUCKET = math.ceil(math.log2(M))
        c = a.new_empty(*a.size()[:-1], N, dtype=torch.bfloat16)
        grid = lambda META: (
            triton.cdiv(M, META["BLOCK_SIZE_M"]),
            triton.cdiv(N, META["BLOCK_SIZE_N"]),
        )
        blockwise_fp8_gemm_kernel[grid](
            a, b, c, a_s, b_s, M, N, K, M_BUCKET, BLOCK_SIZE_K=block_size
        )
        return c

    @blockwise_fp8_gemm.register_fake
    def _(a, a_s, b, b_s, block_size=128):
        N = b.size(0)
        c = a.new_empty(*a.size()[:-1], N, dtype=torch.bfloat16)
        return c
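
    # Minimal usage sketch (illustrative shapes; assumes a CUDA device with
    # FP8 support):
    #
    #   a = torch.randn(256, 512, device="cuda")   # activations, shape (M, K)
    #   b = torch.randn(1024, 512, device="cuda")  # weight, shape (N, K)
    #   a_fp8, a_s = fp8_blockwise_act_quant(a, block_size=128)
    #   b_fp8, b_s = fp8_blockwise_weight_quant(b, block_size=128)
    #   c = blockwise_fp8_gemm(a_fp8, a_s, b_fp8, b_s, block_size=128)
    #   # c ~= a @ b.T, shape (256, 1024), dtype torch.bfloat16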

    @triton.jit
    def fp8_blockwise_act_quant_kernel(x_ptr, y_ptr, s_ptr, BLOCK_SIZE: tl.constexpr):
        """
        Quantizes the input tensor `x_ptr` and stores the result in `y_ptr` and the scaling factor in `s_ptr`.

        Args:
            x_ptr (tl.pointer): Pointer to the input tensor.
            y_ptr (tl.pointer): Pointer to the output tensor where quantized values will be stored.
            s_ptr (tl.pointer): Pointer to the output tensor where scaling factors will be stored.
            BLOCK_SIZE (tl.constexpr): The size of the block to be processed by each program instance.

        Returns:
            None
        """
        pid = tl.program_id(axis=0)
        offs = pid * BLOCK_SIZE + tl.arange(0, BLOCK_SIZE)
        x = tl.load(x_ptr + offs).to(tl.float32)
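        # 448.0 is the largest finite value representable in float8_e4m3fn;
        # dividing max-abs by it maps each block onto the full FP8 range.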
        s = tl.max(tl.abs(x)) / 448.0
        y = x / s
        y = y.to(y_ptr.dtype.element_ty)
        tl.store(y_ptr + offs, y)
        tl.store(s_ptr + pid, s)

    def fp8_blockwise_act_quant(
        x: torch.Tensor, block_size: int = 128, dtype: torch.dtype = torch.float8_e4m3fn
    ) -> Tuple[torch.Tensor, torch.Tensor]:
        """
        Quantizes the input tensor `x` in contiguous groups of `block_size` elements along its last dimension (1 x block_size blocks).

        Args:
            x (torch.Tensor): The input tensor to be quantized. Must be contiguous and its last dimension size must be divisible by `block_size`.
            block_size (int, optional): The size of the blocks to be used for quantization. Default is 128.
            dtype (torch.dtype, optional): The dtype to use for the quantized tensor. Default is `torch.float8_e4m3fn`.

        Returns:
            Tuple[torch.Tensor, torch.Tensor]: A tuple containing:
                - The quantized tensor with dtype `dtype`.
                - A tensor of scaling factors with dtype `torch.float32`.
        """
        assert x.is_contiguous(), "Input tensor must be contiguous"
        assert x.size(-1) % block_size == 0, (
            f"Last dimension size must be divisible by block_size (block_size={block_size})"
        )
        assert dtype in [
            torch.float8_e4m3fn,
            torch.float8_e5m2,
        ], "dtype must be torch.float8_e4m3fn or torch.float8_e5m2"
        y = torch.empty_like(x, dtype=dtype)
        s = x.new_empty(*x.size()[:-1], x.size(-1) // block_size, dtype=torch.float32)
        grid = lambda meta: (triton.cdiv(x.numel(), meta["BLOCK_SIZE"]),)
        fp8_blockwise_act_quant_kernel[grid](x, y, s, BLOCK_SIZE=block_size)
        return y, s
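
    # Shape sketch (illustrative): for x of shape (256, 512) and
    # block_size=128, `y` keeps the shape (256, 512) in FP8 and `s` has
    # shape (256, 4), one float32 scale per 1 x 128 group.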

    @triton.jit
    def fp8_blockwise_weight_quant_kernel(
        x_ptr, y_ptr, s_ptr, M, N, BLOCK_SIZE: tl.constexpr
    ):
        """
        Quantizes the input tensor `x_ptr` and stores the result in `y_ptr` and the scaling factors in `s_ptr`.

        Args:
            x_ptr (tl.pointer): Pointer to the input tensor.
            y_ptr (tl.pointer): Pointer to the output tensor where quantized values will be stored.
            s_ptr (tl.pointer): Pointer to the output tensor where scaling factors will be stored.
            M (int): Number of rows in the weight matrix.
            N (int): Number of columns in the weight matrix.
            BLOCK_SIZE (tl.constexpr): The size of the block to be processed by each program instance.
        """
        pid_m = tl.program_id(axis=0)
        pid_n = tl.program_id(axis=1)
        n = tl.cdiv(N, BLOCK_SIZE)
        offs_m = pid_m * BLOCK_SIZE + tl.arange(0, BLOCK_SIZE)
        offs_n = pid_n * BLOCK_SIZE + tl.arange(0, BLOCK_SIZE)
        offs = offs_m[:, None] * N + offs_n[None, :]
        mask = (offs_m[:, None] < M) & (offs_n[None, :] < N)
        x = tl.load(x_ptr + offs, mask=mask).to(tl.float32)
        s = tl.max(tl.abs(x)) / 448.0
        y = x / s
        y = y.to(y_ptr.dtype.element_ty)
        tl.store(y_ptr + offs, y, mask=mask)
        tl.store(s_ptr + pid_m * n + pid_n, s)

    def fp8_blockwise_weight_quant(
        x: torch.Tensor, block_size: int = 128, dtype=torch.float8_e4m3fn
    ) -> Tuple[torch.Tensor, torch.Tensor]:
        """
        Quantizes the given weight tensor using block-wise quantization with (block_size x block_size) blocks.

        Args:
            x (torch.Tensor): The weight tensor to be quantized.
            block_size (int, optional): The block size to use for quantization. Defaults to 128.
            dtype (torch.dtype, optional): The dtype to use for the quantized tensor. Defaults to `torch.float8_e4m3fn`.

        Returns:
            Tuple[torch.Tensor, torch.Tensor]: A tuple containing:
                - The quantized weight tensor with dtype `dtype`.
                - A tensor of scaling factors with dtype `torch.float32`.
        """
        assert x.is_contiguous(), "Input tensor must be contiguous"
        assert x.dim() == 2, "Input tensor must have 2 dimensions"
        assert x.size(0) % block_size == 0 and x.size(1) % block_size == 0, (
            f"Both dimensions of x must be divisible by block_size (block_size={block_size})"
        )
        assert dtype in [
            torch.float8_e4m3fn,
            torch.float8_e5m2,
        ], "dtype must be torch.float8_e4m3fn or torch.float8_e5m2"
        M, N = x.size()
        y = torch.empty_like(x, dtype=dtype)
        s = x.new_empty(M // block_size, N // block_size, dtype=torch.float32)
        grid = lambda meta: (
            triton.cdiv(M, meta["BLOCK_SIZE"]),
            triton.cdiv(N, meta["BLOCK_SIZE"]),
        )
        fp8_blockwise_weight_quant_kernel[grid](x, y, s, M, N, BLOCK_SIZE=block_size)
        return y, s

    @triton.jit
    def fp8_blockwise_weight_dequant_kernel(
        x_ptr, s_ptr, y_ptr, M, N, BLOCK_SIZE: tl.constexpr
    ):
        """
        Dequantizes weights using the provided scaling factors and stores the result.

        Args:
            x_ptr (tl.pointer): Pointer to the quantized weights.
            s_ptr (tl.pointer): Pointer to the scaling factors.
            y_ptr (tl.pointer): Pointer to the output buffer for dequantized weights.
            M (int): Number of rows in the weight matrix.
            N (int): Number of columns in the weight matrix.
            BLOCK_SIZE (tl.constexpr): Size of the block for tiling.

        Returns:
            None
        """
        pid_m = tl.program_id(axis=0)
        pid_n = tl.program_id(axis=1)
        n = tl.cdiv(N, BLOCK_SIZE)
        offs_m = pid_m * BLOCK_SIZE + tl.arange(0, BLOCK_SIZE)
        offs_n = pid_n * BLOCK_SIZE + tl.arange(0, BLOCK_SIZE)
        offs = offs_m[:, None] * N + offs_n[None, :]
        mask = (offs_m[:, None] < M) & (offs_n[None, :] < N)
        x = tl.load(x_ptr + offs, mask=mask).to(tl.float32)
        s = tl.load(s_ptr + pid_m * n + pid_n)
        y = x * s
        tl.store(y_ptr + offs, y, mask=mask)

    def fp8_blockwise_weight_dequant(
        x: torch.Tensor, s: torch.Tensor, block_size: int = 128
    ) -> torch.Tensor:
        """
        Dequantizes the given weight tensor using the provided scale tensor.

        Args:
            x (torch.Tensor): The quantized weight tensor of shape (M, N).
            s (torch.Tensor): The scale tensor of shape (M // block_size, N // block_size).
            block_size (int, optional): The block size to use for dequantization. Defaults to 128.

        Returns:
            torch.Tensor: The dequantized weight tensor of the same shape as `x`.

        Raises:
            AssertionError: If `x` or `s` are not contiguous or if their dimensions are not 2.
        """
        assert x.is_contiguous() and s.is_contiguous(), (
            "Input tensors must be contiguous"
        )
        assert x.dim() == 2 and s.dim() == 2, "Input tensors must have 2 dimensions"
        M, N = x.size()
        y = torch.empty_like(x, dtype=torch.get_default_dtype())
        grid = lambda meta: (
            triton.cdiv(M, meta["BLOCK_SIZE"]),
            triton.cdiv(N, meta["BLOCK_SIZE"]),
        )
        fp8_blockwise_weight_dequant_kernel[grid](x, s, y, M, N, BLOCK_SIZE=block_size)
        return y
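
    # Round-trip sketch (illustrative; assumes a CUDA device): dequantizing
    # right after quantization recovers the weight up to blockwise FP8
    # rounding error.
    #
    #   w = torch.randn(512, 512, device="cuda")
    #   w_fp8, w_s = fp8_blockwise_weight_quant(w, block_size=128)
    #   w_hat = fp8_blockwise_weight_dequant(w_fp8, w_s, block_size=128)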

else:

    def blockwise_fp8_gemm(
        a: torch.Tensor,
        a_s: torch.Tensor,
        b: torch.Tensor,
        b_s: torch.Tensor,
        block_size: int = 128,
    ) -> torch.Tensor:
        raise AssertionError("unsupported without triton")

    def fp8_blockwise_act_quant(
        x: torch.Tensor, block_size: int = 128, dtype: torch.dtype = torch.float8_e4m3fn
    ) -> Tuple[torch.Tensor, torch.Tensor]:
        raise AssertionError("unsupported without triton")

    def fp8_blockwise_weight_quant(
        x: torch.Tensor, block_size: int = 128, dtype=torch.float8_e4m3fn
    ) -> Tuple[torch.Tensor, torch.Tensor]:
        raise AssertionError("unsupported without triton")

    def fp8_blockwise_weight_dequant(
        x: torch.Tensor, s: torch.Tensor, block_size: int = 128
    ) -> torch.Tensor:
        raise AssertionError("unsupported without triton")
