summaryrefslogtreecommitdiff
path: root/llama.cpp/gguf-py/gguf/utility.py
diff options
context:
space:
mode:
authorMitja Felicijan <mitja.felicijan@gmail.com>2026-02-12 20:57:17 +0100
committerMitja Felicijan <mitja.felicijan@gmail.com>2026-02-12 20:57:17 +0100
commitb333b06772c89d96aacb5490d6a219fba7c09cc6 (patch)
tree211df60083a5946baa2ed61d33d8121b7e251b06 /llama.cpp/gguf-py/gguf/utility.py
downloadllmnpc-b333b06772c89d96aacb5490d6a219fba7c09cc6.tar.gz
Engage!
Diffstat (limited to 'llama.cpp/gguf-py/gguf/utility.py')
-rw-r--r--llama.cpp/gguf-py/gguf/utility.py340
1 file changed, 340 insertions, 0 deletions
diff --git a/llama.cpp/gguf-py/gguf/utility.py b/llama.cpp/gguf-py/gguf/utility.py
new file mode 100644
index 0000000..154351d
--- /dev/null
+++ b/llama.cpp/gguf-py/gguf/utility.py
@@ -0,0 +1,340 @@
+from __future__ import annotations
+
+from dataclasses import dataclass
+from pathlib import Path
+from typing import Literal
+
+import os
+import json
+import numpy as np
+
+
def fill_templated_filename(filename: str, output_type: str | None) -> str:
    """Substitute output-type placeholders in a filename template.

    Supports '{ftype}'/'{outtype}' (lowercase), '{FTYPE}'/'{OUTTYPE}'
    (uppercase) and a bare positional '{}' (lowercase), e.g.
    'some-model-name.{ftype}.gguf'. A None output_type yields empty strings.
    """
    if output_type is None:
        lower = upper = ""
    else:
        lower = output_type.lower()
        upper = output_type.upper()
    return filename.format(
        lower,
        outtype=lower, ftype=lower,
        OUTTYPE=upper, FTYPE=upper,
    )
+
+
def model_weight_count_rounded_notation(model_params_count: int, min_digits: int = 2) -> str:
    """Render a parameter count with a metric suffix, e.g. 7000000000 -> '7.0B'.

    The count is scaled into T/B/M/K units (thresholds are strictly
    greater-than) and formatted with just enough decimal places for the
    result to show at least `min_digits` significant digits.
    """
    scales = (
        (1e12, 1e-12, "T"),  # Trillions Of Parameters
        (1e9,  1e-9,  "B"),  # Billions Of Parameters
        (1e6,  1e-6,  "M"),  # Millions Of Parameters
    )
    for threshold, factor, suffix in scales:
        if model_params_count > threshold:
            scaled_model_params = model_params_count * factor
            break
    else:
        # Thousands Of Parameters
        scaled_model_params = model_params_count * 1e-3
        suffix = "K"

    # Decimal places needed so the rounded value still shows `min_digits` digits.
    decimals = max(min_digits - len(str(round(scaled_model_params)).lstrip('0')), 0)
    return f"{scaled_model_params:.{decimals}f}{suffix}"
+
+
def size_label(total_params: int, shared_params: int, expert_params: int, expert_count: int) -> str:
    """Build the size portion of a model name, e.g. '7B', or '8x7B' for MoE models."""
    if expert_count <= 0:
        # Dense model: label is the (absolute) total parameter count.
        return model_weight_count_rounded_notation(abs(total_params), min_digits=2)

    # Mixture-of-experts: '<experts>x<per-expert-size>', where per-expert
    # parameters are the shared weights plus one expert's weights.
    per_expert_params = abs(shared_params) + abs(expert_params)
    return f"{expert_count}x{model_weight_count_rounded_notation(per_expert_params, min_digits=2)}"
+
+
def naming_convention(model_name: str | None, base_name: str | None, finetune_string: str | None, version_string: str | None, size_label: str | None, output_type: str | None, model_type: Literal['vocab', 'LoRA'] | None = None) -> str:
    """Assemble a GGUF file name from its optional components, joined by '-'.

    Reference: https://github.com/ggml-org/ggml/blob/master/docs/gguf.md#gguf-naming-convention
    """
    def _slug(text: str) -> str:
        # Collapse spaces and path separators so each component is one token.
        return text.strip().replace(' ', '-').replace('/', '-')

    # base_name takes precedence over model_name; fall back to a generic name.
    if base_name is not None:
        name = _slug(base_name)
    elif model_name is not None:
        name = _slug(model_name)
    else:
        name = "ggml-model"

    parts = [name]
    if size_label is not None:
        parts.append(size_label)
    if finetune_string is not None:
        parts.append(finetune_string.strip().replace(' ', '-'))
    if version_string is not None:
        parts.append(version_string.strip().replace(' ', '-'))
    if output_type is not None:
        parts.append(output_type.strip().replace(' ', '-').upper())
    if model_type is not None:
        parts.append(model_type.strip().replace(' ', '-'))

    return "-".join(parts)
+
+
@dataclass
class RemoteTensor:
    """Location and type metadata of a single tensor inside a remote safetensors file."""
    # dtype string as found in the safetensors header, e.g. "F32"
    dtype: str
    # tensor dimensions, as read from the header's "shape" entry
    shape: tuple[int, ...]
    # absolute byte offset of the tensor data within the remote file
    offset_start: int
    # size of the tensor data in bytes
    size: int
    # URL of the safetensors file containing this tensor
    url: str

    def data(self) -> bytearray:
        """Download and return this tensor's raw bytes via an HTTP range request."""
        # TODO: handle request errors (maybe with limited retries?)
        # NOTE: using a bytearray, otherwise PyTorch complains the buffer is not writeable
        data = bytearray(SafetensorRemote.get_data_by_range(url=self.url, start=self.offset_start, size=self.size))
        return data
+
+
class SafetensorRemote:
    """
    Utility class to handle remote safetensor files.
    This class is designed to work with Hugging Face model repositories.

    Example (one model has single safetensor file, the other has multiple):
    for model_id in ["ngxson/TEST-Tiny-Llama4", "Qwen/Qwen2.5-7B-Instruct"]:
        tensors = SafetensorRemote.get_list_tensors_hf_model(model_id)
        print(tensors)

    Example reading tensor data:
    tensors = SafetensorRemote.get_list_tensors_hf_model(model_id)
    for name, meta in tensors.items():
        # meta is a RemoteTensor; read the tensor data via its location fields
        data = SafetensorRemote.get_data_by_range(meta.url, meta.offset_start, meta.size)
        print(data)
    """

    BASE_DOMAIN = "https://huggingface.co"

    @classmethod
    def get_list_tensors_hf_model(cls, model_id: str) -> dict[str, RemoteTensor]:
        """
        Get list of tensors from a Hugging Face model repository.

        Returns a dictionary mapping tensor names to RemoteTensor metadata
        (dtype, shape, offset_start, size, url).

        Raises ValueError when neither a single-file nor a sharded
        safetensors layout can be found for the model.
        """
        # case 1: model has only one single model.safetensor file
        is_single_file = cls.check_file_exist(f"{cls.BASE_DOMAIN}/{model_id}/resolve/main/model.safetensors")
        if is_single_file:
            url = f"{cls.BASE_DOMAIN}/{model_id}/resolve/main/model.safetensors"
            return cls.get_list_tensors(url)

        # case 2: model has multiple files
        index_url = f"{cls.BASE_DOMAIN}/{model_id}/resolve/main/model.safetensors.index.json"
        is_multiple_files = cls.check_file_exist(index_url)
        if is_multiple_files:
            # read the index file (default size=-1 fetches the whole file)
            index_data = cls.get_data_by_range(index_url, 0)
            index_str = index_data.decode('utf-8')
            index_json = json.loads(index_str)
            assert index_json.get("weight_map") is not None, "weight_map not found in index file"
            weight_map = index_json["weight_map"]
            # get the list of shard files (weight_map maps tensor name -> shard file)
            all_files = list(set(weight_map.values()))
            all_files.sort() # make sure we load shard files in order
            # merge the tensor listings of all shards into one dict
            tensors: dict[str, RemoteTensor] = {}
            for file in all_files:
                url = f"{cls.BASE_DOMAIN}/{model_id}/resolve/main/{file}"
                for key, val in cls.get_list_tensors(url).items():
                    tensors[key] = val
            return tensors

        raise ValueError(
            f"No safetensor file has been found for model {model_id}."
            "If the repo has safetensor files, make sure the model is public or you have a "
            "valid Hugging Face token set in the environment variable HF_TOKEN."
        )

    @classmethod
    def get_list_tensors(cls, url: str) -> dict[str, RemoteTensor]:
        """
        Get list of tensors from a remote safetensor file.

        Returns a dictionary mapping tensor names to RemoteTensor metadata,
        sorted by tensor name. Raises ValueError on malformed metadata.
        """
        metadata, data_start_offset = cls.get_metadata(url)
        res: dict[str, RemoteTensor] = {}

        for name, meta in metadata.items():
            if name == "__metadata__":
                # the "__metadata__" entry holds free-form info, not a tensor
                continue
            if not isinstance(meta, dict):
                raise ValueError(f"Invalid metadata for tensor '{name}': {meta}")
            try:
                dtype = meta["dtype"]
                shape = meta["shape"]
                # "data_offsets" are relative to the end of the JSON header
                offset_start_relative, offset_end_relative = meta["data_offsets"]
                size = offset_end_relative - offset_start_relative
                offset_start = data_start_offset + offset_start_relative
                res[name] = RemoteTensor(dtype=dtype, shape=tuple(shape), offset_start=offset_start, size=size, url=url)
            except KeyError as e:
                raise ValueError(f"Missing key in metadata for tensor '{name}': {e}, meta = {meta}")

        # order by name (same as default safetensors behavior)
        # ref: https://github.com/huggingface/safetensors/blob/0816a1ae1d6b731cefd67f061d80d1cadd0dd7bb/bindings/python/src/lib.rs#L606
        res = dict(sorted(res.items(), key=lambda t: t[0]))

        return res

    @classmethod
    def get_metadata(cls, url: str) -> tuple[dict, int]:
        """
        Get JSON metadata from a remote safetensor file.

        Returns tuple of (metadata, data_start_offset).
        Raises ValueError if the header is truncated, larger than the 5MB
        probe, or not valid JSON.
        """
        # Request first 5MB of the file (hopefully enough for metadata)
        read_size = 5 * 1024 * 1024
        raw_data = cls.get_data_by_range(url, 0, read_size)

        # Parse header
        # First 8 bytes contain the metadata length as u64 little-endian
        if len(raw_data) < 8:
            raise ValueError("Not enough data to read metadata size")
        metadata_length = int.from_bytes(raw_data[:8], byteorder='little')

        # Calculate the data start offset (tensor data begins right after the JSON header)
        data_start_offset = 8 + metadata_length

        # Check if we have enough data to read the metadata
        if len(raw_data) < 8 + metadata_length:
            raise ValueError(f"Could not read complete metadata. Need {8 + metadata_length} bytes, got {len(raw_data)}")

        # Extract metadata bytes and parse as JSON
        metadata_bytes = raw_data[8:8 + metadata_length]
        metadata_str = metadata_bytes.decode('utf-8')
        try:
            metadata = json.loads(metadata_str)
            return metadata, data_start_offset
        except json.JSONDecodeError as e:
            raise ValueError(f"Failed to parse safetensor metadata as JSON: {e}")

    @classmethod
    def get_data_by_range(cls, url: str, start: int, size: int = -1) -> bytes:
        """
        Get raw byte data from a remote file by range.
        If size is not specified, it will read the entire file.

        NOTE(review): when size == -1 no Range header is sent, so the download
        always begins at byte 0 and `start` is ignored; the only such caller
        in this file passes start=0. Also, HTTP Range ends are inclusive, so
        the header below requests size+1 bytes; the final slice trims the
        result back to `size` bytes.
        """
        import requests
        from urllib.parse import urlparse

        parsed_url = urlparse(url)
        if not parsed_url.scheme or not parsed_url.netloc:
            raise ValueError(f"Invalid URL: {url}")

        headers = cls._get_request_headers()
        if size > -1:
            headers["Range"] = f"bytes={start}-{start + size}"
        response = requests.get(url, allow_redirects=True, headers=headers)
        response.raise_for_status()

        # Get raw byte data (trim to at most `size` bytes when a range was requested)
        return response.content[slice(size if size > -1 else None)]

    @classmethod
    def check_file_exist(cls, url: str) -> bool:
        """
        Check if a file exists at the given URL.
        Returns True if the file exists, False otherwise.
        Network errors are treated as "does not exist" rather than raised.
        """
        import requests
        from urllib.parse import urlparse

        parsed_url = urlparse(url)
        if not parsed_url.scheme or not parsed_url.netloc:
            raise ValueError(f"Invalid URL: {url}")

        try:
            headers = cls._get_request_headers()
            # ask for a single byte; harmless on HEAD (no body is returned anyway)
            headers["Range"] = "bytes=0-0"
            response = requests.head(url, allow_redirects=True, headers=headers)
            # Success (2xx) or redirect (3xx)
            return 200 <= response.status_code < 400
        except requests.RequestException:
            return False

    @classmethod
    def _get_request_headers(cls) -> dict[str, str]:
        """Prepare common headers for requests (adds HF_TOKEN auth when set)."""
        headers = {"User-Agent": "convert_hf_to_gguf"}
        if os.environ.get("HF_TOKEN"):
            headers["Authorization"] = f"Bearer {os.environ['HF_TOKEN']}"
        return headers
+
+
@dataclass
class LocalTensorRange:
    """Byte range occupied by one tensor's data within a local safetensors file."""
    # path of the safetensors file on disk
    filename: Path
    # absolute byte offset of the tensor data within the file
    offset: int
    # size of the tensor data in bytes
    size: int
+
+
@dataclass
class LocalTensor:
    """Type/shape metadata plus the on-disk location of one local tensor."""
    # dtype string as found in the safetensors header, e.g. "F32"
    dtype: str
    # tensor dimensions, as read from the header's "shape" entry
    shape: tuple[int, ...]
    # where the raw bytes live on disk
    data_range: LocalTensorRange

    def mmap_bytes(self) -> np.ndarray:
        """Memory-map this tensor's raw bytes.

        No dtype is given, so numpy's default (uint8) applies: one array
        element per byte. mode='c' is copy-on-write, so writes stay in
        memory and never touch the file.
        """
        return np.memmap(self.data_range.filename, mode='c', offset=self.data_range.offset, shape=self.data_range.size)
+
+
class SafetensorsLocal:
    """
    Read a safetensors file from the local filesystem.

    Custom parsing gives a bit more control over the memory usage.
    The official safetensors library doesn't expose file ranges.

    Usage:
        with SafetensorsLocal(path) as tensors:
            ...  # tensors: dict[str, LocalTensor], sorted by name
    """

    # tensor name -> LocalTensor metadata, sorted by name
    tensors: dict[str, LocalTensor]

    def __init__(self, filename: Path):
        """Parse the safetensors header of `filename` and index its tensors.

        Raises ValueError when the file is truncated or its metadata is
        malformed (same error style as SafetensorRemote.get_list_tensors,
        instead of leaking a bare KeyError).
        """
        with open(filename, "rb") as f:
            # First 8 bytes: metadata length as u64 little-endian
            metadata_length = int.from_bytes(f.read(8), byteorder='little')
            file_size = os.stat(filename).st_size
            if file_size < 8 + metadata_length:
                raise ValueError(f"Could not read complete metadata. Need {8 + metadata_length} bytes, got {file_size}")

            metadata_str = f.read(metadata_length).decode('utf-8')
            try:
                metadata = json.loads(metadata_str)
            except json.JSONDecodeError as e:
                raise ValueError(f"Failed to parse safetensors metadata as JSON: {e}")

            # Tensor data begins right after the JSON header; all
            # "data_offsets" in the metadata are relative to this point.
            data_start_offset = f.tell()

            tensors: dict[str, LocalTensor] = {}
            for name, meta in metadata.items():
                if name == "__metadata__":
                    # ignore metadata, it's not a tensor
                    continue
                # Validate entries the same way SafetensorRemote.get_list_tensors
                # does, so malformed files fail with a descriptive ValueError.
                if not isinstance(meta, dict):
                    raise ValueError(f"Invalid metadata for tensor '{name}': {meta}")
                try:
                    offset_start_relative, offset_end_relative = meta["data_offsets"]
                    tensors[name] = LocalTensor(
                        dtype=meta["dtype"],
                        shape=tuple(meta["shape"]),
                        data_range=LocalTensorRange(
                            filename,
                            data_start_offset + offset_start_relative,
                            offset_end_relative - offset_start_relative,
                        ),
                    )
                except KeyError as e:
                    raise ValueError(f"Missing key in metadata for tensor '{name}': {e}, meta = {meta}")

        # order by name (same as default safetensors behavior)
        # ref: https://github.com/huggingface/safetensors/blob/0816a1ae1d6b731cefd67f061d80d1cadd0dd7bb/bindings/python/src/lib.rs#L606
        self.tensors = dict(sorted(tensors.items(), key=lambda t: t[0]))

    def __enter__(self, *args, **kwargs):
        """Context-manager entry: yields the tensor-name -> LocalTensor mapping."""
        del args, kwargs  # unused
        return self.tensors

    def __exit__(self, *args, **kwargs):
        """Context-manager exit: nothing to release (the file is closed in __init__)."""
        del args, kwargs  # unused