| | |
| |
|
| | |
| |
|
| | |
| |
|
| | from __future__ import annotations |
| |
|
| | import argparse |
| | from math import prod |
| | import os |
| | import sys |
| | from pathlib import Path |
| | import ctypes |
| | import logging |
| | import numpy as np |
| |
|
| | |
| | if "NO_LOCAL_GGUF" not in os.environ and (Path(__file__).parent.parent.parent / 'gguf-py').exists(): |
| | sys.path.insert(0, str(Path(__file__).parent.parent)) |
| |
|
| | import gguf |
| | from gguf.constants import GGMLQuantizationType |
| |
|
| |
|
| | logger = logging.getLogger("test-quants") |
| |
|
| |
|
| | c_float_p = ctypes.POINTER(ctypes.c_float) |
| |
|
| |
|
| | class ggml_init_params(ctypes.Structure): |
| | _fields_ = [ |
| | ("mem_size", ctypes.c_size_t), |
| | ("mem_buffer", ctypes.c_void_p), |
| | ("no_alloc", ctypes.c_bool), |
| | ] |
| |
|
| |
|
| | class GGMLQuants: |
| | libggml: ctypes.CDLL |
| |
|
| | def __init__(self, libggml: Path): |
| | self.libggml = ctypes.CDLL(str(libggml)) |
| | self.libggml.ggml_quantize_chunk.restype = ctypes.c_size_t |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | self.libggml.ggml_quantize_chunk.argtypes = ( |
| | ctypes.c_int, |
| | ctypes.POINTER(ctypes.c_float), |
| | ctypes.c_void_p, |
| | ctypes.c_int64, |
| | ctypes.c_int64, |
| | ctypes.c_int64, |
| | ctypes.POINTER(ctypes.c_float), |
| | ) |
| |
|
| | self.libggml.ggml_quantize_requires_imatrix.restype = ctypes.c_bool |
| | self.libggml.ggml_quantize_requires_imatrix.argtypes = (ctypes.c_int,) |
| |
|
| | for t in ( |
| | "q4_0", "q4_1", "q5_0", "q5_1", "q8_0", |
| | "q2_K", "q3_K", "q4_K", "q5_K", "q6_K", |
| | "tq1_0", "tq2_0", |
| | "iq2_xxs", "iq2_xs", "iq2_s", "iq3_xxs", "iq3_s", "iq1_s", "iq1_m", |
| | "iq4_nl", "iq4_xs", |
| | ): |
| | dequant_func: ctypes._NamedFuncPointer = getattr(self.libggml, "dequantize_row_" + t) |
| | dequant_func.restype = None |
| | dequant_func.argtypes = (ctypes.c_void_p, ctypes.POINTER(ctypes.c_float), ctypes.c_int64) |
| |
|
| | self.libggml.ggml_fp16_to_fp32_row.restype = None |
| | self.libggml.ggml_fp16_to_fp32_row.argtypes = (ctypes.POINTER(ctypes.c_uint16), ctypes.POINTER(ctypes.c_float), ctypes.c_int64) |
| | self.libggml.ggml_bf16_to_fp32_row.restype = None |
| | self.libggml.ggml_bf16_to_fp32_row.argtypes = (ctypes.POINTER(ctypes.c_uint16), ctypes.POINTER(ctypes.c_float), ctypes.c_int64) |
| |
|
| | self.libggml.ggml_init.argtypes = (ggml_init_params,) |
| |
|
| | self.libggml.ggml_init(ggml_init_params(1 * 1024 * 1024, 0, False)) |
| |
|
| | def dequantize(self, tensor: np.ndarray, qtype: GGMLQuantizationType) -> np.ndarray: |
| | result = np.zeros(gguf.quant_shape_from_byte_shape(tensor.shape, qtype), dtype=np.float32, order="C") |
| | if qtype == GGMLQuantizationType.F32: |
| | |
| | result = tensor.view(np.float32) |
| | elif qtype == GGMLQuantizationType.F16: |
| | self.libggml.ggml_fp16_to_fp32_row(tensor.ctypes.data_as(ctypes.POINTER(ctypes.c_uint16)), result.ctypes.data_as(c_float_p), result.size) |
| | elif qtype == GGMLQuantizationType.BF16: |
| | self.libggml.ggml_bf16_to_fp32_row(tensor.ctypes.data_as(ctypes.POINTER(ctypes.c_uint16)), result.ctypes.data_as(c_float_p), result.size) |
| | else: |
| | lw_qname = qtype.name.lower() |
| | if lw_qname[-1] == "k": |
| | lw_qname = lw_qname[:-1] + "K" |
| | dequant_func: ctypes._NamedFuncPointer = getattr(self.libggml, "dequantize_row_" + lw_qname) |
| | dequant_func(tensor.ctypes.data_as(ctypes.c_void_p), result.ctypes.data_as(c_float_p), result.size) |
| | return result |
| |
|
| | def quantize(self, data: np.ndarray, qtype: GGMLQuantizationType) -> np.ndarray: |
| | result = np.zeros(gguf.quant_shape_to_byte_shape(data.shape, qtype), dtype=np.uint8, order="C") |
| | if self.libggml.ggml_quantize_requires_imatrix(qtype.value): |
| | |
| | qw = np.sum((data * data).reshape((-1, data.shape[-1])), axis=0).ctypes.data_as(c_float_p) |
| | else: |
| | qw = ctypes.cast(0, c_float_p) |
| | result_size = self.libggml.ggml_quantize_chunk(qtype.value, data.ctypes.data_as(c_float_p), result.ctypes.data_as(ctypes.c_void_p), 0, prod(data.shape[:-1]), data.shape[-1], qw) |
| | assert result.size == result_size |
| | return result |
| |
|
| |
|
| | def compare_tensors(t1: np.ndarray, t2: np.ndarray, qtype: GGMLQuantizationType) -> bool: |
| | same = np.array_equal(t1, t2) |
| | if same: |
| | return True |
| | else: |
| | block_size, type_size = gguf.GGML_QUANT_SIZES[qtype] |
| | if t1.dtype == np.float32: |
| | t1 = t1.reshape((-1, block_size)) |
| | t2 = t2.reshape((-1, block_size)) |
| | else: |
| | t1 = t1.reshape((-1, type_size)) |
| | t2 = t2.reshape((-1, type_size)) |
| | x = t1.view(np.uint8) ^ t2.view(np.uint8) |
| | diff_bits = np.count_nonzero(np.unpackbits(x, axis=-1), axis=-1) |
| | num_bad_blocks = np.count_nonzero(diff_bits, axis=0) |
| | if num_bad_blocks == 0 and t1.shape == t2.shape: |
| | logger.debug("Bits are equal, but arrays don't match, likely contains NANs") |
| | return True |
| | logger.debug(f"{num_bad_blocks} bad blocks ({100 * num_bad_blocks / x.shape[0]:.6f}%)") |
| | bad_block_id = np.argmax(diff_bits, axis=0) |
| | logger.debug(f"Worst block id: {bad_block_id}") |
| | logger.debug(f"Sample bad block ({diff_bits[bad_block_id]} differing bits):\n{t1[bad_block_id]}\nReference:\n{t2[bad_block_id]}") |
| |
|
| | sum_diff_bits = np.sum(diff_bits) |
| | logger.debug(f"{sum_diff_bits} bits differ ({100 * sum_diff_bits / (x.size * 8):.6f}%)") |
| | return False |
| |
|
| |
|
| | def do_test(libggml_path: Path, quick: bool = False): |
| | ggml_quants = GGMLQuants(libggml_path) |
| |
|
| | np.set_printoptions(precision=None, threshold=(4 * 256) + 1, formatter={"int": lambda n: "0x%02X" % n}) |
| |
|
| | r = np.random.randn(8, 1024, 1024).astype(np.float32, copy=False) |
| |
|
| | for qtype in (GGMLQuantizationType.F16, *gguf.quants._type_traits.keys()): |
| | has_dequantize = False |
| | has_quantize = False |
| |
|
| | try: |
| | gguf.dequantize(np.zeros((gguf.GGML_QUANT_SIZES[qtype][1]), dtype=np.uint8), qtype) |
| | has_dequantize = True |
| | except (NotImplementedError, AssertionError) as e: |
| | if isinstance(e, AssertionError): |
| | logger.error(f"Error with {qtype.name}: {e}") |
| | raise e |
| | try: |
| | gguf.quantize(np.zeros((gguf.GGML_QUANT_SIZES[qtype][0]), dtype=np.float32), qtype) |
| | has_quantize = True |
| | except (NotImplementedError, AssertionError) as e: |
| | if isinstance(e, AssertionError): |
| | logger.error(f"Error with {qtype.name}: {e}") |
| | raise e |
| |
|
| | if not has_dequantize and not has_quantize: |
| | continue |
| |
|
| | logger.info(f"Testing {qtype.name}") |
| |
|
| | rc = r.copy(order="C") |
| |
|
| | pyq = None |
| | ggq = None |
| |
|
| | if has_quantize: |
| | logger.debug(f"Quantizing to {qtype.name} with Python") |
| | pyq = gguf.quants.quantize(rc, qtype) |
| |
|
| | logger.debug(f"Quantizing to {qtype.name} with C") |
| | ggq = ggml_quants.quantize(rc, qtype) |
| |
|
| | if qtype == GGMLQuantizationType.F16: |
| | pyq = pyq.view(np.uint8) |
| | quant_equal = compare_tensors(pyq, ggq, qtype) |
| |
|
| | if not quant_equal: |
| | logger.error(f"Quantization to {qtype.name} does not match ❌") |
| | else: |
| | logger.info(f"Quantization to {qtype.name} matches exactly ✅") |
| |
|
| | if has_dequantize: |
| | if ggq is None and not quick: |
| | logger.debug(f"Quantizing to {qtype.name} with C") |
| | ggq = ggml_quants.quantize(rc, qtype) |
| |
|
| | if ggq is not None: |
| | logger.debug(f"Dequantizing from {qtype.name} with Python") |
| | pydq = gguf.quants.dequantize(ggq, qtype) |
| | logger.debug(f"Dequantizing from {qtype.name} with C") |
| | ggdq = ggml_quants.dequantize(ggq, qtype) |
| |
|
| | dequant_equal = compare_tensors(pydq, ggdq, qtype) |
| |
|
| | if not dequant_equal: |
| | logger.error(f"Dequantization from {qtype.name} does not match ❌") |
| | else: |
| | logger.info(f"Dequantization from {qtype.name} matches exactly ✅") |
| |
|
| | rq_shape = gguf.quants.quant_shape_to_byte_shape((8, 1024, 1024 // 2), qtype) |
| | rq = np.random.random(rq_shape).astype(np.float16).view(np.uint8) |
| |
|
| | logger.debug(f"Dequantizing random f16 data as {qtype.name} with Python") |
| | pydq = gguf.quants.dequantize(rq, qtype) |
| | logger.debug(f"Dequantizing random f16 data as {qtype.name} with C") |
| | ggdq = ggml_quants.dequantize(rq, qtype) |
| |
|
| | dequant_equal = compare_tensors(pydq, ggdq, qtype) |
| |
|
| | if not dequant_equal: |
| | logger.error(f"Dequantization from random f16 data as {qtype.name} does not match ❌") |
| | else: |
| | logger.info(f"Dequantization from random f16 data as {qtype.name} matches exactly ✅") |
| |
|
| |
|
| | if __name__ == "__main__": |
| | parser = argparse.ArgumentParser(description="Test Python (de)quantization against the reference C implementation") |
| | parser.add_argument("--libggml", type=Path, default=Path(__file__).parent.parent.parent / "build" / "ggml" / "src" / "libggml.so", help="The path to libggml.so") |
| | parser.add_argument("--quick", action="store_true", help="Don't quantize with C when it's not strictly necessary") |
| |
|
| | args = parser.parse_args() |
| |
|
| | logging.basicConfig(level=logging.DEBUG) |
| |
|
| | do_test(args.libggml, args.quick) |
| |
|