llama.cpp 使用¶
安装¶
windows10 github release直接下载安装包,cuda12的,还要下载cuda12的算法包,并将结果放入解压后的安装路径下
llama-cli使用¶
# --grammar-file 已经过时
llama-cli.exe' -m .\qwen2.5-3b-instruct-q4_k_m.gguf --grammar-file .\json.gbnf -sysf .\system-prompt.txt -temp 0.1
# 使用 --json-schema-file -jf
llama-cli.exe' -m .\qwen2.5-3b-instruct-q4_k_m.gguf --json-schema-file .\openai-shcema.json -sysf .\system-prompt.txt -temp 0.1
# cnv 多轮对话
llama-cli.exe' -m .\qwen2.5-3b-instruct-q4_k_m.gguf --json-schema-file .\openai-shcema.json -sysf .\system-prompt.txt -temp 0.1 -cnv
llama-server使用¶
# 系统提示词 代码中动态添加, openai 会自动处理
llama-server.exe -m .\qwen2.5-3b-instruct-q4_k_m.gguf -jf .\schema.json --jinja
# 固定系统提示词
llama-server.exe -m .\qwen2.5-3b-instruct-q4_k_m.gguf -jf .\schema.json --jinja --sysf .\system-prompt.txt
# 只web上使用,则去掉 -jf
llama-server.exe -m .\qwen2.5-coder-3b-instruct-q5_k_m.gguf --jinja -jf .\schema.json --temp 0.7 --top-k 40 --top-p 0.9 -ngl 99 --ctx-size 4096 --batch-size 512 --ubatch-size 128 --repeat-penalty 1.1 --port 8054 -v --log-file qwen.log
llama 视觉模型加载¶
llama-cli -m Qwen2.5-VL-3B-Instruct-Q8_0.gguf --mmproj mmproj-Qwen2.5-VL-3B-Instruct-Q8_0.gguf -p "Describe this image." --image ./car-1.jpg
llama-server.exe -m Qwen2.5-VL-3B-Instruct-Q8_0.gguf --mmproj mmproj-Qwen2.5-VL-3B-Instruct-Q8_0.gguf --jinja --temp 0.7 --top-k 40 --top-p 0.9 -ngl 99 --ctx-size 4096 --batch-size 512 --ubatch-size 128 --repeat-penalty 1.1 --port 8054
llama-server 工作流程¶
1. 你的请求
{"messages": [{"role":"user", "content": [
{"type":"image_url", "image_url":{"url":"data:image/jpeg;base64,..."}},
{"type":"text", "text":"这张图有什么?"}
]}]}
2. llama-server 接收并解析
├─ 识别到 image_url → 解码为图像数组
├─ 送入内置的 Vision Encoder (ViT) → 输出图像特征图
├─ 经过投影层 (MLP) → 转为视觉 token 序列 (e.g. 1024 个 token IDs)
└─ 读取 chat template,找到占位符 <|image_pad|>
3. Token 替换与拼接
原始模板输出: ...<|im_start|>user\n<|image_pad|>这张图有什么?<|im_end|>\n...
替换后序列: ...<|im_start|>user\n[1024个视觉token ID]这张图有什么?<|im_end|>\n...
4. 送入 LLM
拼接好的 input_ids + attention_mask → 前向推理 → 生成文本
python openai 格式api使用¶
import os
import subprocess
from dataclasses import dataclass
import json
try:
import readline
# #143 UTF-8 backspace fix for macOS libedit
readline.parse_and_bind('set bind-tty-special-chars off')
readline.parse_and_bind('set input-meta on')
readline.parse_and_bind('set output-meta on')
readline.parse_and_bind('set convert-meta off')
readline.parse_and_bind('set enable-meta-keybindings on')
except ImportError:
pass
from openai import OpenAI
from dotenv import load_dotenv
from pathlib import Path
load_dotenv(override=True)
WORKDIR = Path.cwd()
client = OpenAI(base_url=os.getenv("OPENAI_BASE_URL"), api_key=os.getenv("OPENAIC_API_KEY"))
MODEL = os.environ["MODEL_ID"]
SYSTEM = (
f"You are a coding agent at {WORKDIR}. Currently in a Windows environment. Terminal shell is cmd"
"Use bash to inspect and change the workspace. Act first, then report clearly."
)
TOOLS = [
{
"function": {
"description": ("在 Windows 环境中执行系统命令。"
"⚠️ 严格使用 CMD.exe 或 PowerShell 语法。"
"🚫 绝对禁止 Linux/bash 命令(如 ls, rm, cat, grep, curl, ps, chmod, awk 等)。"
"✅ 必须使用 Windows 原生命令或Powershell命令(如 dir, del, type, findstr, ping, tasklist, Get-Process 等)。"
"📝 路径使用反斜杠 \\,注意空格和引号转义。"),
"name": "bash",
"parameters": {
"properties": {
"command": {
"type": "string"
}
},
"required": [
"command"
],
"type": "object"
}
},
"type": "function"
},
{
"function": {
"description": "读文件内容",
"name": "read_file",
"parameters": {
"properties": {
"limit": {
"type": "integer"
},
"path": {
"type": "string"
}
},
"required": [
"path"
],
"type": "object"
}
},
"type": "function"
},
{
"function": {
"description": "写内容到文件",
"name": "write_file",
"parameters": {
"properties": {
"content": {
"type": "string"
},
"path": {
"type": "string"
}
},
"required": [
"path",
"content"
],
"type": "object"
}
},
"type": "function"
},
{
"function": {
"description": "替换文件中的确切文本",
"name": "edit_file",
"parameters": {
"properties": {
"new_text": {
"type": "string"
},
"old_text": {
"type": "string"
},
"path": {
"type": "string"
}
},
"required": [
"path",
"old_text",
"new_text"
],
"type": "object"
}
},
"type": "function"
}
]
def safe_path(p: str) -> Path:
    """Resolve *p* against WORKDIR, rejecting any path that escapes it.

    Raises:
        ValueError: if the resolved path lies outside the workspace.
    """
    resolved = (WORKDIR / p).resolve()
    if resolved.is_relative_to(WORKDIR):
        return resolved
    raise ValueError(f"Path escapes workspace: {p}")
def run_read(path: str, limit: int | None = None) -> str:
    """Read a workspace file as UTF-8, optionally truncated to *limit* lines.

    Returns the text (capped at 50 000 characters) or an "Error: ..." string
    on any failure, matching the other run_* tool handlers.
    """
    try:
        text = safe_path(path).read_text(encoding="utf-8")
        lines = text.splitlines()
        # A falsy limit (None or 0) means "no truncation", as before.
        if limit and limit < len(lines):
            lines = lines[:limit] + [f"... ({len(lines) - limit} more lines)"]
        return "\n".join(lines)[:50000]
    except Exception as e:
        return f"Error: {e}"
def run_write(path: str, content: str) -> str:
    """Write *content* (UTF-8) to *path* in the workspace, creating parents."""
    try:
        target = safe_path(path)
        target.parent.mkdir(parents=True, exist_ok=True)
        target.write_text(content, encoding="utf-8")
    except Exception as e:
        return f"Error: {e}"
    return f"Wrote {len(content)} bytes to {path}"
def run_edit(path: str, old_text: str, new_text: str) -> str:
    """Replace the first occurrence of *old_text* with *new_text* in *path*.

    Returns a short status line, or an "Error: ..." string on failure or
    when *old_text* is not present.
    """
    try:
        fp = safe_path(path)
        content = fp.read_text(encoding="utf-8")
        if old_text not in content:
            return f"Error: Text not found in {path}"
        # Write back explicitly as UTF-8: the original relied on the locale
        # encoding (e.g. GBK on Windows), which can fail on non-ASCII text
        # and is inconsistent with the UTF-8 read above.
        fp.write_text(content.replace(old_text, new_text, 1), encoding="utf-8")
        return f"Edited {path}"
    except Exception as e:
        return f"Error: {e}"
# Concurrency classification of tools (not referenced in this chunk).
# NOTE(review): "bash" appears in neither set — confirm intended handling.
CONCURRENCY_SAFE = {"read_file"}
CONCURRENCY_UNSAFE = {"write_file", "edit_file"}
# -- The dispatch map: {tool_name: handler} --
# Each lambda unpacks the JSON-decoded arguments into the matching run_* helper.
TOOL_HANDLERS = {
    "bash": lambda **kw: run_bash(kw["command"]),
    "read_file": lambda **kw: run_read(kw["path"], kw.get("limit")),
    "write_file": lambda **kw: run_write(kw["path"], kw["content"]),
    "edit_file": lambda **kw: run_edit(kw["path"], kw["old_text"], kw["new_text"]),
}
@dataclass
class LoopState:
    """The minimal loop state: history, loop count, and why we continue."""
    # Chat history as OpenAI-format message dicts (role/content/...).
    messages: list
    # Number of model turns taken so far; starts at 1.
    turn_count: int = 1
    # "tool_result" while tool output demands another turn, else None.
    transition_reason: str | None = None
def run_bash(command: str) -> str:
    """Execute *command* via PowerShell in the current working directory.

    Returns combined stdout+stderr (capped at 50 000 chars), "(no output)"
    when both are empty, or an "Error: ..." string when the command is
    blocked, times out (120 s), or the shell cannot be launched.
    """
    # NOTE(review): this blocklist is Linux-oriented even though the command
    # runs under PowerShell — consider adding Windows equivalents
    # (e.g. "Stop-Computer", "Remove-Item -Recurse -Force").
    dangerous = ["rm -rf /", "sudo", "shutdown", "reboot", "> /dev/"]
    if any(item in command for item in dangerous):
        return "Error: Dangerous command blocked"
    try:
        result = subprocess.run(
            ["powershell", "-Command", command],
            cwd=os.getcwd(),
            capture_output=True, text=True, timeout=120,
            encoding="gbk", errors="ignore"  # Windows default locale encoding
        )
    except subprocess.TimeoutExpired:
        return "Error: Timeout (120s)"
    except (FileNotFoundError, OSError) as e:
        return f"Error: {e}"
    output = (result.stdout + result.stderr).strip()
    return output[:50000] if output else "(no output)"
def execute_tool_calls(tool_calls) -> list[dict]:
    """Dispatch each requested tool call and collect "tool"-role replies."""
    replies = []
    for call in tool_calls:
        name = call.function.name
        raw_args = call.function.arguments
        kwargs = json.loads(raw_args)
        print(f"\033[33mtool: {name} {raw_args} \033[0m")
        # Unknown tool names are reported back to the model, not raised.
        if (handler := TOOL_HANDLERS.get(name)) is not None:
            output = handler(**kwargs)
        else:
            output = f"Unknown tool: {name}"
        print(output[:200])
        print()
        replies.append({
            "role": "tool",
            "tool_call_id": call.id,
            "content": output,
        })
    return replies
def run_one_turn(state: LoopState) -> bool:
    """Run one model turn; return True when tool results demand another turn.

    Side effects: appends the assistant message (and any tool results) to
    state.messages, advances turn_count and sets transition_reason when
    tools were executed.
    """
    response = client.chat.completions.create(
        model=MODEL,
        messages=state.messages,
        tools=TOOLS,
        tool_choice="auto",
        temperature=0.1,
        max_tokens=1024,
    )
    choice = response.choices[0]
    msg = choice.message
    if choice.finish_reason != "tool_calls":
        # Final answer: record it and stop the loop.
        state.transition_reason = None
        state.messages.append({"role": "assistant", "content": msg.content})
        return False
    # Serialize the assistant message (incl. tool_calls) to a plain dict so
    # the history stays JSON-compatible for the next request.
    state.messages.append(msg.model_dump())
    results = execute_tool_calls(msg.tool_calls)
    if not results:
        state.transition_reason = None
        return False
    state.messages.extend(results)
    state.turn_count += 1
    state.transition_reason = "tool_result"
    return True
def agent_loop(state: LoopState) -> None:
    """Keep executing turns until run_one_turn signals completion."""
    keep_going = True
    while keep_going:
        keep_going = run_one_turn(state)
if __name__ == "__main__":
    # Shared conversation history: system prompt first, then user turns.
    history = [{"role": "system", "content": SYSTEM}]
    while True:
        try:
            query = input("\033[36ms01 >> \033[0m")
        except (EOFError, KeyboardInterrupt):
            break
        # Empty input or an explicit quit command ends the session.
        if query.strip().lower() in ("q", "exit", ""):
            break
        history.append({"role": "user", "content": query})
        agent_loop(LoopState(messages=history))
        # The loop ends with the assistant's final message at the tail.
        final_text = history[-1]["content"]
        if final_text:
            print(final_text)
        print()