aboutsummaryrefslogtreecommitdiff
path: root/llama.cpp/gguf-py/gguf/utility.py
blob: 154351d8ed63c50f8a66cc0bb9152ec020414077 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
from __future__ import annotations

from dataclasses import dataclass
from pathlib import Path
from typing import Literal

import os
import json
import numpy as np


def fill_templated_filename(filename: str, output_type: str | None) -> str:
    """Expand the type placeholders of a templated file name.

    Example template: 'some-model-name.{ftype}.gguf'.

    Supported placeholders are '{}', '{outtype}' and '{ftype}' (lower-cased
    output type) as well as '{OUTTYPE}' and '{FTYPE}' (upper-cased output
    type). When output_type is None every placeholder expands to an empty
    string.
    """
    if output_type is None:
        lower = upper = ""
    else:
        lower, upper = output_type.lower(), output_type.upper()

    substitutions = {
        "outtype": lower,
        "ftype": lower,
        "OUTTYPE": upper,
        "FTYPE": upper,
    }
    return filename.format(lower, **substitutions)


def model_weight_count_rounded_notation(model_params_count: int, min_digits: int = 2) -> str:
    """Format a parameter count as a short label such as '7.0B' or '124M'.

    The count is scaled to thousands (K), millions (M), billions (B) or
    trillions (T). Decimals are added only as needed so that at least
    min_digits digits are shown in total (e.g. 7e9 -> '7.0B', 13e9 -> '13B').
    """
    # The thresholds are inclusive so that an exact power (e.g. 1_000_000_000)
    # is reported in the larger unit ('1.0B') instead of as '1000M'.
    if model_params_count >= 1e12:
        # Trillions Of Parameters
        scaled_model_params = model_params_count * 1e-12
        scale_suffix = "T"
    elif model_params_count >= 1e9:
        # Billions Of Parameters
        scaled_model_params = model_params_count * 1e-9
        scale_suffix = "B"
    elif model_params_count >= 1e6:
        # Millions Of Parameters
        scaled_model_params = model_params_count * 1e-6
        scale_suffix = "M"
    else:
        # Thousands Of Parameters
        scaled_model_params = model_params_count * 1e-3
        scale_suffix = "K"

    # Count the digits in front of the decimal point (a lone leading zero does
    # not count) and pad with decimals up to min_digits.
    integer_digits = len(str(round(scaled_model_params)).lstrip('0'))
    fix = max(min_digits - integer_digits, 0)

    return f"{scaled_model_params:.{fix}f}{scale_suffix}"


def size_label(total_params: int, shared_params: int, expert_params: int, expert_count: int) -> str:
    """Build the size part of a model name.

    Without experts the label is the rounded total parameter count. With
    experts it is '<expert_count>x<size>', where <size> is the rounded sum of
    the shared and expert parameter counts. Negative counts are treated by
    their absolute value.
    """
    if expert_count <= 0:
        return model_weight_count_rounded_notation(abs(total_params), min_digits=2)

    per_expert_total = abs(shared_params) + abs(expert_params)
    per_expert_label = model_weight_count_rounded_notation(per_expert_total, min_digits=2)
    return f"{expert_count}x{per_expert_label}"


def naming_convention(model_name: str | None, base_name: str | None, finetune_string: str | None, version_string: str | None, size_label: str | None, output_type: str | None, model_type: Literal['vocab', 'LoRA'] | None = None) -> str:
    """Assemble a GGUF file name stem from its optional components.

    Reference: https://github.com/ggml-org/ggml/blob/master/docs/gguf.md#gguf-naming-convention

    The parts are joined with '-' in the order: name, size label, finetune,
    version, encoding (upper-cased output type), model type. Parts that are
    None are left out. The name comes from base_name, falling back to
    model_name and finally to 'ggml-model'.
    """

    def dashed(text: str) -> str:
        # trim the text and turn inner spaces into dashes
        return text.strip().replace(' ', '-')

    if base_name is not None:
        stem = dashed(base_name).replace('/', '-')
    elif model_name is not None:
        stem = dashed(model_name).replace('/', '-')
    else:
        stem = "ggml-model"

    parts = [stem]
    if size_label is not None:
        # the size label is used verbatim
        parts.append(size_label)
    if finetune_string is not None:
        parts.append(dashed(finetune_string))
    if version_string is not None:
        parts.append(dashed(version_string))
    if output_type is not None:
        parts.append(dashed(output_type).upper())
    if model_type is not None:
        parts.append(dashed(model_type))

    return "-".join(parts)


@dataclass
class RemoteTensor:
    """Description of one tensor stored in a remote safetensors file."""

    dtype: str
    shape: tuple[int, ...]
    offset_start: int  # byte offset of the tensor data within the remote file
    size: int          # length of the tensor data in bytes
    url: str           # URL of the file that holds the tensor

    def data(self) -> bytearray:
        """Download the bytes of this tensor and return them as a mutable buffer."""
        # TODO: handle request errors (maybe with limited retries?)
        raw = SafetensorRemote.get_data_by_range(
            url=self.url,
            start=self.offset_start,
            size=self.size,
        )
        # NOTE: using a bytearray, otherwise PyTorch complains the buffer is not writeable
        return bytearray(raw)


class SafetensorRemote:
    """
    Utility class to handle remote safetensor files.
    This class is designed to work with Hugging Face model repositories.

    Example (one model has single safetensor file, the other has multiple):
        for model_id in ["ngxson/TEST-Tiny-Llama4", "Qwen/Qwen2.5-7B-Instruct"]:
            tensors = SafetensorRemote.get_list_tensors_hf_model(model_id)
            print(tensors)

    Example reading tensor data:
        tensors = SafetensorRemote.get_list_tensors_hf_model(model_id)
        for name, meta in tensors.items():
            # meta is a RemoteTensor (dtype, shape, offset_start, size, url)
            data = SafetensorRemote.get_data_by_range(meta.url, meta.offset_start, meta.size)
            print(data)
    """

    BASE_DOMAIN = "https://huggingface.co"

    @classmethod
    def get_list_tensors_hf_model(cls, model_id: str) -> dict[str, RemoteTensor]:
        """
        Get list of tensors from a Hugging Face model repository.

        Returns a dictionary mapping each tensor name to a RemoteTensor
        (dtype, shape, offset_start, size, url).

        Raises ValueError when neither model.safetensors nor
        model.safetensors.index.json can be found, or when the index file
        has no "weight_map".
        """
        repo_url = f"{cls.BASE_DOMAIN}/{model_id}/resolve/main"

        # case 1: model has only one single model.safetensor file
        single_file_url = f"{repo_url}/model.safetensors"
        if cls.check_file_exist(single_file_url):
            return cls.get_list_tensors(single_file_url)

        # case 2: model has multiple files
        index_url = f"{repo_url}/model.safetensors.index.json"
        if cls.check_file_exist(index_url):
            # read the index file
            index_data = cls.get_data_by_range(index_url, 0)
            index_json = json.loads(index_data.decode('utf-8'))
            # the index is remote input: validate it explicitly (an assert
            # would be stripped when running with -O)
            weight_map = index_json.get("weight_map")
            if weight_map is None:
                raise ValueError("weight_map not found in index file")
            # get the list of tensors, loading the shard files in sorted order
            tensors: dict[str, RemoteTensor] = {}
            for file in sorted(set(weight_map.values())):
                tensors.update(cls.get_list_tensors(f"{repo_url}/{file}"))
            return tensors

        raise ValueError(
            f"No safetensor file has been found for model {model_id}. "
            "If the repo has safetensor files, make sure the model is public or you have a "
            "valid Hugging Face token set in the environment variable HF_TOKEN."
        )

    @classmethod
    def get_list_tensors(cls, url: str) -> dict[str, RemoteTensor]:
        """
        Get list of tensors from a remote safetensor file.

        Returns a dictionary mapping each tensor name to a RemoteTensor,
        ordered by tensor name. The "__metadata__" entry is skipped.

        Raises ValueError when a tensor entry is not a dict or lacks one of
        the "dtype", "shape" or "data_offsets" keys.
        """
        metadata, data_start_offset = cls.get_metadata(url)
        res: dict[str, RemoteTensor] = {}

        for name, meta in metadata.items():
            if name == "__metadata__":
                continue
            if not isinstance(meta, dict):
                raise ValueError(f"Invalid metadata for tensor '{name}': {meta}")
            try:
                dtype = meta["dtype"]
                shape = meta["shape"]
                # data_offsets are relative to the end of the JSON header
                offset_start_relative, offset_end_relative = meta["data_offsets"]
            except KeyError as e:
                raise ValueError(f"Missing key in metadata for tensor '{name}': {e}, meta = {meta}") from e
            res[name] = RemoteTensor(
                dtype=dtype,
                shape=tuple(shape),
                offset_start=data_start_offset + offset_start_relative,
                size=offset_end_relative - offset_start_relative,
                url=url,
            )

        # order by name (same as default safetensors behavior)
        # ref: https://github.com/huggingface/safetensors/blob/0816a1ae1d6b731cefd67f061d80d1cadd0dd7bb/bindings/python/src/lib.rs#L606
        return dict(sorted(res.items(), key=lambda t: t[0]))

    @classmethod
    def get_metadata(cls, url: str) -> tuple[dict, int]:
        """
        Get JSON metadata from a remote safetensor file.

        Returns tuple of (metadata, data_start_offset)

        Raises ValueError when the header is truncated, does not fit in the
        first 5MB of the file, or is not valid JSON.
        """
        # Request first 5MB of the file (hopefully enough for metadata)
        read_size = 5 * 1024 * 1024
        raw_data = cls.get_data_by_range(url, 0, read_size)

        # Parse header
        # First 8 bytes contain the metadata length as u64 little-endian
        if len(raw_data) < 8:
            raise ValueError("Not enough data to read metadata size")
        metadata_length = int.from_bytes(raw_data[:8], byteorder='little')

        # Calculate the data start offset
        data_start_offset = 8 + metadata_length

        # Check if we have enough data to read the metadata
        if len(raw_data) < data_start_offset:
            raise ValueError(f"Could not read complete metadata. Need {data_start_offset} bytes, got {len(raw_data)}")

        # Extract metadata bytes and parse as JSON
        metadata_str = raw_data[8:data_start_offset].decode('utf-8')
        try:
            metadata = json.loads(metadata_str)
        except json.JSONDecodeError as e:
            raise ValueError(f"Failed to parse safetensor metadata as JSON: {e}") from e
        return metadata, data_start_offset

    @staticmethod
    def _range_header_value(start: int, size: int) -> str:
        """Build the Range header value for size (> 0) bytes beginning at start."""
        # HTTP byte ranges are inclusive on both ends, hence the "- 1"
        return f"bytes={start}-{start + size - 1}"

    @classmethod
    def get_data_by_range(cls, url: str, start: int, size: int = -1) -> bytes:
        """
        Get raw byte data from a remote file by range.

        With size > 0, exactly the bytes [start, start + size) are requested.
        With size == 0, an empty result is returned without any request.
        If size is not specified (negative), no Range header is sent and the
        entire response body is returned; start is not applied in that case.

        Raises ValueError for a URL without scheme or host; HTTP errors are
        raised by requests (raise_for_status).
        """
        from urllib.parse import urlparse

        parsed_url = urlparse(url)
        if not parsed_url.scheme or not parsed_url.netloc:
            raise ValueError(f"Invalid URL: {url}")

        if size == 0:
            # an empty range cannot be expressed in a Range header
            return b""

        import requests

        headers = cls._get_request_headers()
        if size > 0:
            headers["Range"] = cls._range_header_value(start, size)
        response = requests.get(url, allow_redirects=True, headers=headers)
        response.raise_for_status()

        # Get raw byte data, never returning more than the requested size
        content = response.content
        return content[:size] if size > 0 else content

    @classmethod
    def check_file_exist(cls, url: str) -> bool:
        """
        Check if a file exists at the given URL.
        Returns True if the file exists, False otherwise.

        Request failures are reported as False; a URL without scheme or host
        raises ValueError.
        """
        import requests
        from urllib.parse import urlparse

        parsed_url = urlparse(url)
        if not parsed_url.scheme or not parsed_url.netloc:
            raise ValueError(f"Invalid URL: {url}")

        try:
            headers = cls._get_request_headers()
            headers["Range"] = "bytes=0-0"
            response = requests.head(url, allow_redirects=True, headers=headers)
            # Success (2xx) or redirect (3xx)
            return 200 <= response.status_code < 400
        except requests.RequestException:
            return False

    @classmethod
    def _get_request_headers(cls) -> dict[str, str]:
        """Prepare common headers for requests."""
        headers = {"User-Agent": "convert_hf_to_gguf"}
        # authenticate only when a non-empty token is configured
        token = os.environ.get("HF_TOKEN")
        if token:
            headers["Authorization"] = f"Bearer {token}"
        return headers


@dataclass
class LocalTensorRange:
    """Byte range inside a local file that holds the raw data of one tensor."""
    filename: Path  # file that contains the data
    offset: int  # byte offset of the data, counted from the start of the file
    size: int  # length of the data in bytes


@dataclass
class LocalTensor:
    """Description of one tensor stored in a local safetensors file."""

    dtype: str
    shape: tuple[int, ...]
    data_range: LocalTensorRange  # where the raw tensor bytes live on disk

    def mmap_bytes(self) -> np.ndarray:
        """Memory-map the raw bytes of the tensor.

        The mapping is opened in copy-on-write mode ('c') and uses np.memmap's
        default element type, giving a flat array with one element per byte of
        the range.
        """
        span = self.data_range
        return np.memmap(
            span.filename,
            mode='c',
            offset=span.offset,
            shape=span.size,
        )


class SafetensorsLocal:
    """
        Read a safetensors file from the local filesystem.

        Custom parsing gives a bit more control over the memory usage.
        The official safetensors library doesn't expose file ranges.

        Usable as a context manager that yields the name -> LocalTensor
        mapping; leaving the context does nothing.
    """

    tensors: dict[str, LocalTensor]

    def __init__(self, filename: Path):
        # Only the header is read here; tensor data stays on disk.
        with open(filename, "rb") as f:
            # the first 8 bytes hold the header length (little-endian)
            header_length = int.from_bytes(f.read(8), byteorder='little')
            needed_size = 8 + header_length
            actual_size = os.stat(filename).st_size
            if actual_size < needed_size:
                raise ValueError(f"Could not read complete metadata. Need {needed_size} bytes, got {actual_size}")

            header_text = f.read(header_length).decode('utf-8')
            try:
                header = json.loads(header_text)
            except json.JSONDecodeError as e:
                raise ValueError(f"Failed to parse safetensors metadata as JSON: {e}")

            # tensor data begins right after the header
            data_start_offset = f.tell()

        found: dict[str, LocalTensor] = {}
        for name, meta in header.items():
            # "__metadata__" is not a tensor, skip it
            if name != "__metadata__":
                tensor_dtype = meta["dtype"]
                tensor_shape = tuple(meta["shape"])
                # data_offsets are relative to the start of the data section
                byte_range = LocalTensorRange(
                    filename,
                    data_start_offset + meta["data_offsets"][0],
                    meta["data_offsets"][1] - meta["data_offsets"][0],
                )
                found[name] = LocalTensor(dtype=tensor_dtype, shape=tensor_shape, data_range=byte_range)

        # order by name (same as default safetensors behavior)
        # ref: https://github.com/huggingface/safetensors/blob/0816a1ae1d6b731cefd67f061d80d1cadd0dd7bb/bindings/python/src/lib.rs#L606
        self.tensors = {name: found[name] for name in sorted(found)}

    def __enter__(self, *args, **kwargs):
        del args, kwargs  # unused
        return self.tensors

    def __exit__(self, *args, **kwargs):
        # nothing to release: the file was already closed in __init__
        del args, kwargs  # unused