"""SQLite persistence helpers for the MCU catalogue stored in ``mcu.db``.

Every helper opens its own short-lived connection to ``DB_PATH`` and always
closes it, even when the statement fails.  Write helpers run inside a
transaction that is committed on success and rolled back on error.
"""

import sqlite3
from contextlib import closing
from pathlib import Path
from typing import Any, Dict, List, Optional

DB_PATH = Path(__file__).parent / "mcu.db"

# Columns returned by get_all_mcus(), in result order.
_SUMMARY_COLUMNS = (
    "id",
    "name",
    "core_num",
    "core_type",
    "rom",
    "ram",
    "sram",
    "flash",
    "frequency",
    "manufacturer",
)


def _quote_identifier(name: str) -> str:
    """Return *name* as a double-quoted SQL identifier.

    Column names cannot be bound as ``?`` parameters, so they are quoted
    instead (embedded double quotes are doubled).  This keeps a crafted
    keyword name from being interpreted as SQL.
    """
    return '"' + name.replace('"', '""') + '"'


def init_db() -> None:
    """Create the ``mcu`` table if it does not exist yet."""
    # NOTE: the "--" comments inside the SQL text are part of the statement
    # sent to SQLite, so they are kept exactly as written.
    with closing(sqlite3.connect(DB_PATH)) as conn, conn:
        conn.execute("""
        CREATE TABLE IF NOT EXISTS mcu (
            id INTEGER PRIMARY KEY AUTOINCREMENT,  -- ID
            name TEXT NOT NULL,                    -- 名称
            core_num TEXT NOT NULL,                -- 核心数量
            core_type TEXT NOT NULL,               -- 核心架构
            instruction_set TEXT,                  -- 指令集架构
            coremark TEXT,                         -- CoreMark 分数
            coremark_per_mhz REAL,                 -- CoreMark/MHz
            frequency TEXT NOT NULL,               -- 主频 MHz
            flash TEXT,                            -- Flash KB
            rom TEXT,                              -- ROM KB
            ram TEXT,                              -- RAM KB
            sram TEXT,                             -- SRAM KB
            sram_in_rtc TEXT,                      -- RTC SRAM KB
            bus_width TEXT,                        -- 总线宽度 bit
            cache_type TEXT,                       -- Cache 类型 (L1/L2)
            l1_cache_size TEXT,                    -- L1 Cache 大小 KB
            l2_cache_size TEXT,                    -- L2 Cache 大小 KB
            l3_cache_size TEXT,                    -- L3 Cache 大小 KB
            pipeline_depth TEXT,                   -- 流水线深度
            simd_support BOOLEAN,                  -- SIMD 支持
            fpu TEXT,                              -- FPU 支持 (Single/Double Precision)
            package TEXT,                          -- 封装
            gpios TEXT,                            -- GPIO 引脚数量
            uarts TEXT,                            -- UART 数量
            i2cs TEXT,                             -- I2C 数量
            i2ses TEXT,                            -- I2S 数量
            spis TEXT,                             -- SPI 数量
            spi_protocols TEXT,                    -- SPI 协议
            systems TEXT,                          -- 系统
            manufacturer TEXT,                     -- 厂商
            link TEXT                              -- 链接
        )
        """)


# ==================== List all MCUs (key fields) ====================
def get_all_mcus() -> List[Dict[str, Any]]:
    """Return every MCU as a dict holding only the summary columns.

    Returns:
        One dict per row with the keys ``id``, ``name``, ``core_num``,
        ``core_type``, ``rom``, ``ram``, ``sram``, ``flash``, ``frequency``
        and ``manufacturer``; an empty list when the table has no rows.
    """
    query = f"SELECT {', '.join(_SUMMARY_COLUMNS)} FROM mcu"
    with closing(sqlite3.connect(DB_PATH)) as conn:
        rows = conn.execute(query).fetchall()
    return [dict(zip(_SUMMARY_COLUMNS, row)) for row in rows]


# ==================== Fetch one MCU (all fields) ====================
def get_mcu(mcu_id: int) -> Optional[Dict[str, Any]]:
    """Return the full row whose ``id`` equals *mcu_id*, or ``None``.

    Args:
        mcu_id: Primary key of the row to fetch.

    Returns:
        A dict mapping each column name to its value, or ``None`` when no
        row matches.
    """
    with closing(sqlite3.connect(DB_PATH)) as conn:
        # sqlite3.Row gives name-based access, so the row converts to a dict.
        conn.row_factory = sqlite3.Row
        row = conn.execute("SELECT * FROM mcu WHERE id=?", (mcu_id,)).fetchone()
    return dict(row) if row else None


# ==================== Insert an MCU (any fields) ====================
def add_mcu(**fields: Any) -> None:
    """Insert a new MCU row built from the given column/value pairs.

    Args:
        **fields: Column names mapped to the values to store.  The schema
            created by ``init_db`` declares ``name``, ``core_num``,
            ``core_type`` and ``frequency`` as NOT NULL.

    Raises:
        sqlite3.OperationalError: If a keyword is not a column of ``mcu``.
        sqlite3.IntegrityError: If a table constraint is violated.
    """
    # Values are bound as parameters; column names are quoted identifiers.
    columns = ",".join(_quote_identifier(key) for key in fields)
    placeholders = ",".join("?" for _ in fields)
    with closing(sqlite3.connect(DB_PATH)) as conn, conn:
        conn.execute(
            f"INSERT INTO mcu ({columns}) VALUES ({placeholders})",
            tuple(fields.values()),
        )


# ==================== Update an MCU (any fields) ====================
def update_mcu(mcu_id: int, **fields: Any) -> None:
    """Update the given columns of the row whose ``id`` equals *mcu_id*.

    Calling this with no fields is a no-op.

    Args:
        mcu_id: Primary key of the row to update.
        **fields: Column names mapped to their new values.

    Raises:
        sqlite3.OperationalError: If a keyword is not a column of ``mcu``.
        sqlite3.IntegrityError: If a table constraint is violated.
    """
    if not fields:
        # An empty SET clause is invalid SQL; there is nothing to change.
        return
    assignments = ",".join(f"{_quote_identifier(key)}=?" for key in fields)
    with closing(sqlite3.connect(DB_PATH)) as conn, conn:
        conn.execute(
            f"UPDATE mcu SET {assignments} WHERE id=?",
            tuple(fields.values()) + (mcu_id,),
        )


# ==================== Delete an MCU ====================
def delete_mcu(mcu_id: int) -> None:
    """Delete the row whose ``id`` equals *mcu_id* (no error if absent).

    Args:
        mcu_id: Primary key of the row to delete.
    """
    with closing(sqlite3.connect(DB_PATH)) as conn, conn:
        conn.execute("DELETE FROM mcu WHERE id=?", (mcu_id,))