#!/usr/bin/env python3
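"""Run an original (Hugging Face) model on a prompt and inspect its logits.

Loads the tokenizer and model, runs the prompt through the model in chunks
while carrying the KV cache, prints the logits for the final token, and
saves them for later comparison.

Example invocation (script and model paths are illustrative):

    python run_org_model.py --model-path /path/to/model --device cpu
"""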

import argparse
import importlib
import os
import sys

import numpy as np
import torch

from transformers import (
    AutoConfig,
    AutoModelForCausalLM,
    AutoModelForImageTextToText,
    AutoTokenizer,
)

# Add parent directory to path so utils.common can be imported
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
from utils.common import debug_hook, save_output_data

def parse_arguments():
    parser = argparse.ArgumentParser(description="Run a model on a prompt and inspect its output logits")
    parser.add_argument("--model-path", "-m", help="Path to the model (can also be set via MODEL_PATH)")
    parser.add_argument("--prompt-file", "-f", help="Optional prompt file", required=False)
    parser.add_argument("--verbose", "-v", action="store_true", help="Enable verbose debug output")
    parser.add_argument("--device", "-d", help="Device to use (cpu, cuda, mps, auto)", default="auto")
    return parser.parse_args()

def load_model_and_tokenizer(model_path, device="auto"):
    print("Loading model and tokenizer using AutoTokenizer:", model_path)
    tokenizer = AutoTokenizer.from_pretrained(model_path, trust_remote_code=True)
    config = AutoConfig.from_pretrained(model_path, trust_remote_code=True)
    multimodal = False
    full_config = config
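
    # A device_map of {"": <device>} places the whole model on that single
    # device, while "auto" lets accelerate shard the model across available
    # devices and use the offload folder if memory runs short.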
    # Determine device_map based on device argument
    if device == "cpu":
        device_map = {"": "cpu"}
        print("Forcing CPU usage")
    elif device == "auto":
        device_map = "auto"
    else:
        device_map = {"": device}

    print("Model type: ", config.model_type)
    # PretrainedConfig does not support the `in` operator, so probe with
    # hasattr. Multimodal checkpoints nest the language-model settings under
    # config.text_config.
    if not hasattr(config, "vocab_size") and hasattr(config, "text_config"):
        config = config.text_config
        multimodal = True

    print("Vocab size: ", config.vocab_size)
    print("Hidden size: ", config.hidden_size)
    print("Number of layers: ", config.num_hidden_layers)
    print("BOS token id: ", config.bos_token_id)
    print("EOS token id: ", config.eos_token_id)
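
    # UNRELEASED_MODEL_NAME supports model classes that exist in a local
    # transformers checkout but are not yet wired into the Auto* factories;
    # the transformers.models.<name>.modular_<name> path assumed below
    # mirrors the layout transformers uses for modular model definitions.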
    unreleased_model_name = os.getenv("UNRELEASED_MODEL_NAME")
    if unreleased_model_name:
        model_name_lower = unreleased_model_name.lower()
        unreleased_module_path = (
            f"transformers.models.{model_name_lower}.modular_{model_name_lower}"
        )
        class_name = f"{unreleased_model_name}ForCausalLM"
        print(f"Importing unreleased model module: {unreleased_module_path}")

        try:
            model_class = getattr(importlib.import_module(unreleased_module_path), class_name)
            model = model_class.from_pretrained(
                model_path,
                device_map=device_map,
                offload_folder="offload",
                trust_remote_code=True,
                config=config,
            )
        except (ImportError, AttributeError) as e:
            print(f"Failed to import or load model: {e}")
            sys.exit(1)
    else:
        if multimodal:
            # Pass the full top-level config here; the text-only sub-config
            # would not describe the vision components.
            model = AutoModelForImageTextToText.from_pretrained(
                model_path,
                device_map=device_map,
                offload_folder="offload",
                trust_remote_code=True,
                config=full_config,
            )
        else:
            model = AutoModelForCausalLM.from_pretrained(
                model_path,
                device_map=device_map,
                offload_folder="offload",
                trust_remote_code=True,
                config=config,
            )

    print(f"Model class: {model.__class__.__name__}")

    return model, tokenizer, config

def enable_torch_debugging(model):
    # Register a forward hook on every leaf module; debug_hook (from
    # utils.common) is expected to return a hook that reports the module's
    # inputs and outputs.
    for name, module in model.named_modules():
        if len(list(module.children())) == 0:  # only leaf modules
            module.register_forward_hook(debug_hook(name))

def get_prompt(args):
    if args.prompt_file:
        with open(args.prompt_file, encoding="utf-8") as f:
            return f.read()
    # Fall back to the MODEL_TESTING_PROMPT environment variable, then to a
    # fixed default.
    return os.getenv("MODEL_TESTING_PROMPT") or "Hello, my name is"

def main():
    args = parse_arguments()
    # Note: the MODEL_PATH environment variable takes precedence over the
    # command-line argument.
    model_path = os.environ.get("MODEL_PATH", args.model_path)
    if model_path is None:
        print("Error: Model path must be specified either via --model-path argument or MODEL_PATH environment variable")
        sys.exit(1)

    model, tokenizer, config = load_model_and_tokenizer(model_path, args.device)

    if args.verbose:
        enable_torch_debugging(model)

    model_name = os.path.basename(model_path)

    # Use the first model parameter to determine which device the model (or
    # at least its first shard) was placed on.
    device = next(model.parameters()).device
    prompt = get_prompt(args)
    input_ids = tokenizer(prompt, return_tensors="pt").input_ids.to(device)
    token_ids = input_ids[0].cpu().tolist()

    print(f"Input tokens: {input_ids}")
    print(f"Input text: {repr(prompt)}")
    print(f"Tokenized: {tokenizer.convert_ids_to_tokens(input_ids[0])}")

    batch_size = 512
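
    # Prefill the prompt in fixed-size chunks, threading the KV cache
    # (past_key_values) through successive forward calls. The logits after
    # the final chunk match a single full-length forward pass, but peak
    # memory stays bounded for long prompts.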
    with torch.no_grad():
        past = None
        outputs = None
        for i in range(0, input_ids.size(1), batch_size):
            end = min(i + batch_size, input_ids.size(1))
            print(f"Processing chunk with tokens {i} to {end}")
            chunk = input_ids[:, i:end]
            outputs = model(chunk.to(model.device), past_key_values=past, use_cache=True)
            past = outputs.past_key_values

    logits = outputs.logits  # type: ignore

    # Extract logits for the last token (next-token prediction); .float()
    # upcasts from half precision such as bfloat16, which NumPy cannot
    # represent.
    last_logits = logits[0, -1, :].float().cpu().numpy()

    print(f"Logits shape: {logits.shape}")
    print(f"Last token logits shape: {last_logits.shape}")
    print(f"Vocab size: {len(last_logits)}")

    # Print some sample logits for quick verification
    print(f"First 10 logits: {last_logits[:10]}")
    print(f"Last 10 logits: {last_logits[-10:]}")
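
    # np.argsort returns indices in ascending order of logit, so the last
    # five, reversed, are the top-5 token ids with the highest logits first.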
    # Show top 5 predicted tokens
    top_indices = np.argsort(last_logits)[-5:][::-1]
    print("Top 5 predictions:")
    for idx in top_indices:
        token = tokenizer.decode([idx])
        print(f"  Token {idx} ({repr(token)}): {last_logits[idx]:.6f}")

    # Persist the last-token logits alongside the prompt metadata;
    # save_output_data comes from utils.common.
    save_output_data(last_logits, token_ids, prompt, model_name)


if __name__ == "__main__":
    main()