stm32_OTA升级

Typing SVG


  1. FLASH 的擦除
项目 几乎所有STM32都一样
最小擦除单位 不是字节、不是字,而是 Sector(扇区)Page(不同系列叫法不同)
F4系列(F405/407/415/429等) 低容量:16KB+16KB+16KB+16KB+64KB+128KB+128KB+128KB+128KB+128KB+128KB+128KB 高容量(如512KB以上):全是128KB一个Sector
H7/G4/L4/F0 等 有的2KB、4KB、8KB、128KB不等
擦除后数据 全变成 0xFF(不是0!)
写之前必须先擦除 只能从1→0可以,从0→1必须先擦除整个Sector
擦除次数 典型1万次,H7可达10万次
  1. CRC校验
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

1. CRC 是什么?(Cyclic Redundancy Check 循环冗余校验)
CRC 是目前嵌入式、通信、存储领域最最最常用的错误检测码,几乎所有 OTA、Flash 烧录、U盘、SD卡、Wi-Fi、以太网、CAN、Modbus、固件升级都用它。
一句话总结它的作用:
用极少的几个字节(通常 4 字节 CRC32),就能以极高的概率(99.99999998%)检测出数据在传输或存储过程中发生的任何位翻转错误。

核心思想:模2除法 + 异或运算(完全不像普通除法)

CRC 的计算过程可以理解为:
把整个待校验的数据当成一个超大二进制数,用一个约定的“生成多项式”(比如 0xEDB88320)去做模2除法,最后的余数就是 CRC 值。
关键点全部在这里(记住这三句话就懂了):

所有运算都是模2:加减同异或(没有进位借位)
除法其实就是连续异或
被除数后面要先补上 CRC 位数个 0(CRC32 就补 32 个 0)

2. 原理


Flash 布局(以 STM32F401RET6 512KB Flash 为例):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
0x08000000 ┬ Bootloader          (32KB Sector 0 , 1)
0x08008000 ├ OTA 信息区 (16KB, 独立 Sector 2)
0x0800C000 ├ APP 固件区 (剩余 ≈460KB, Sector 3~7)
上电复位

Bootloader 启动(地址 0x08000000)

1. 读取 OTA 信息区(0x08008000 开始的16KB)

2. 判断三个条件是否全部满足:
├─ APP 区不是全 0xFF(有固件存在)
├─ OTA 信息区 effective == 1(标记有效)
└─ OTA 信息区 update == 0(APP 没请求升级)
├─ 是 → 直接跳到 APP(0x0800C000)
└─ 否 → 进入 OTA 升级流程

连接 Wi-Fi

HTTP GET /header → 拿到版本、总CRC、块数量

版本比当前高? → 继续(否则直接跳APP)

擦除整个 APP 区(Sector 3~7 460KB)

循环下载 256Byte + 4Byte块CRC(/block/0 ... /block/N)

每块校验块CRC + 累加总CRC + 写入 Flash

总CRC 校验通过

擦除 OTA 信息区(Sector 2

写入新信息:version=新版本, update=0, effective=1

跳转到新 APP
特点 说明
完全独立 16KB OTA 信息区 放在单独 Sector(你原来的 Sector 2),APP 程序也可以安全擦写它,用于请求升级
双重校验 每 256 Byte 块独立 CRC32 + 整个固件总 CRC32,双保险
零硬编码 所有地址、IP、端口、路径、Wi-Fi 函数全部集中在 bl_porting.h 一个文件
主动式更新 将更新权交给用户。用户主动发起请求进行更新

3. 移植


  1. 将下边三个文件移植到你的工程对应的目录下
1
2
3
4
5
Inc/
├── bootloader.h
└── bl_porting.h ← 以后唯一需要改的文件!!!
Src/
└── bootloader.c
  1. 修改配置文件bl_porting.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// 1. 改 HAL 头文件(换系列必改)
#include "stm32f4xx_hal.h" // → stm32h7xx_hal.h / stm32g4xx_hal.h 等
// 2. Flash 布局(必须按 Sector 对齐!)
#define BL_FLASH_BASE 0x08000000UL
#define BL_BOOTLOADER_SIZE (32UL * 1024UL) // 根据需求更改,以扇区或页对齐
// OTA 信息区大小(强烈建议保持 16KB 或 独立 Sector 保证至少一页或一扇区对齐)
#define BL_OTA_INFO_SIZE (16UL * 1024UL)
#define BL_OTA_INFO_START (BL_FLASH_BASE + BL_BOOTLOADER_SIZE)
#define BL_APP_START (BL_OTA_INFO_START + BL_OTA_INFO_SIZE)
// 3. Sector 编号(最关键!不同芯片不同)
#define BL_OTA_INFO_SECTOR FLASH_SECTOR_2 // 16KB 区所在 Sector
#define BL_APP_FIRST_SECTOR FLASH_SECTOR_3 // APP 起始 Sector
#define BL_APP_SECTOR_COUNT 5 // F401ret6共有8个,APP开始是3扇区,后边总共剩余5个
// 4. 服务器地址(换项目必改)
#define BL_SERVER_HOST "192.168.1.100"
#define BL_SERVER_PORT 8000
#define BL_HEADER_PATH "/firmware/header"
#define BL_BLOCK_PATH_FMT "/firmware/block/%lu"
// 5. Wi-Fi 驱动函数名(换 Wi-Fi 模块必改)
#define BL_WIFI_CONNECT() Esp8266_ConnectWiFi() // 你的实际函数名
#define BL_WIFI_SEND(data, len) Esp8266_SendData(data, len)
#define BL_WIFI_RECV(data, len) Esp8266_RecvData(data, len)
#define BL_WIFI_WAIT(ms) Esp8266_WaitForData(ms)
// 6. 帮助OTA分区存入初始数据,初次为1. 全擦除烧录一次后请改为0,后续不需要修改
#define BL_FIRST_BOOT_INIT 0
// 7. 调试打印
#define BL_DEBUG_EN 1
#if BL_DEBUG_EN
#define bl_printf(...) printf(VA_ARGS)
#else
#define bl_printf(...)
#endif
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
bootloader.h
#ifndef __BOOTLOADER_H
#define __BOOTLOADER_H
#include "bl_porting.h"
typedef enum {
OTA_BLOCK,
APP_BLOCK
} BLOCK_TYPE;
typedef struct {
uint32_t version;
uint32_t update; // 1 = 需要升级
uint32_t effective; // 1 = 固件有效
} bl_ota_info_t;
typedef struct {
uint32_t version;
uint32_t checksum;
uint32_t blocknum;
uint32_t blocksize;
uint32_t binsize;
} bl_firmware_header_t;
void Bootloader_Main(void);
#endif
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
//bootloader.c
#include "bootloader.h"
#include <stdio.h>
#include <string.h>
char *hostname = BL_SERVER_HOST;
uint16_t hostport = BL_SERVER_PORT;
/* ============================ CRC32 ============================ */
static uint32_t crc32_update(uint32_t crc, uint8_t data)
{
crc ^= data;
for (int i = 0; i < 8; i++)
crc = (crc & 1) ? (crc >> 1) ^ 0xEDB88320UL : (crc >> 1);
return crc;
}
static uint32_t crc32(const uint8_t *buf, uint32_t len)
{
uint32_t crc = 0xFFFFFFFFUL;
for (uint32_t i = 0; i < len; i++)
crc = crc32_update(crc, buf[i]);
return ~crc;
}
/* ============================ HTTP 工具 ============================ */
// 构建请求头
static void build_request(char *buf, size_t size, const char *path){
snprintf(buf, size,
"GET %s HTTP/1.1\r\nHost: %s:%d\r\nConnection: close\r\n\r\n",
path, BL_SERVER_HOST, BL_SERVER_PORT);
}
// 跳过响应头
static int skip_http_header(void){
uint8_t tmp[4];
while (1) {
if (BL_WIFI_RECV(tmp, 1) != 1) return -1;
if (tmp[0] == '\r') {
if (BL_WIFI_RECV(tmp, 3) != 3) return -1;
if (tmp[0] == '\n' && tmp[1] == '\r' && tmp[2] == '\n')
return 0;
}
}
}
/* ============================ Flash 操作 ============================ */
// 擦除指定区域
static int flash_erase_block(BLOCK_TYPE type){
FLASH_EraseInitTypeDef erase = {0};
uint32_t sector_error = 0;
if (HAL_FLASH_Unlock() != HAL_OK) return -1;

erase.TypeErase = FLASH_TYPEERASE_SECTORS;
erase.Banks = FLASH_BANK_1;
erase.VoltageRange = FLASH_VOLTAGE_RANGE_3;

if (type == OTA_BLOCK) {
erase.Sector = BL_OTA_INFO_SECTOR;
erase.NbSectors = 1;
} else if (type == APP_BLOCK) {
erase.Sector = BL_APP_FIRST_SECTOR;
erase.NbSectors = BL_APP_SECTOR_COUNT;
} else {
HAL_FLASH_Lock();
return -1;
}

if (HAL_FLASHEx_Erase(&amp;erase, &amp;sector_error) != HAL_OK) {
HAL_FLASH_Lock();
bl_printf(&#34;[FLASH] Erase failed, error=0x%lX\r\n&#34;, sector_error);
return -1;
}

HAL_FLASH_Lock();
bl_printf(&#34;[FLASH] Erase %s success\r\n&#34;, type == OTA_BLOCK ? &#34;OTA&#34; : &#34;APP&#34;);
return 0;

}
// 写入数据
static int flash_write(uint32_t addr, const uint8_t *data, uint32_t len){
if (HAL_FLASH_Unlock() != HAL_OK) return -1;
for (uint32_t i = 0; i &lt; len; i += 4) {
uint32_t word = 0xFFFFFFFFUL;
uint32_t remain = len - i;
memcpy(&amp;word, data + i, remain &gt; 4 ? 4 : remain);

if (HAL_FLASH_Program(FLASH_TYPEPROGRAM_WORD, addr + i, word) != HAL_OK) {
HAL_FLASH_Lock();
bl_printf(&#34;[FLASH] Program failed @0x%08lX\r\n&#34;, addr + i);
return -1;
}
}
HAL_FLASH_Lock();
return 0;

}
// 读取OTA信息
static void read_ota_info(bl_ota_info_t p){
memcpy(p, (const void)BL_OTA_INFO_START, sizeof(*p));
}
// 判断APP区域是否没数据
static int is_app_empty(void){
const uint32_t p = (const uint32_t)BL_APP_START;
for (int i = 0; i < 16; i++) {
if (p[i] != 0xFFFFFFFFUL) return 0;
}
return 1;
}
// 跳转到APP
static void jump_to_app(void){
uint32_t msp = ((__IO uint32_t)BL_APP_START);
uint32_t reset = ((__IO uint32_t)(BL_APP_START + 4));
if ((msp &amp; 0x2FFE0000) != 0x20000000) {
bl_printf(&#34;[BOOT] Invalid stack pointer\r\n&#34;);
return;
}

HAL_RCC_DeInit();
HAL_DeInit();

for (int i = 0; i &lt; 8; i++) {
NVIC-&gt;ICER[i] = 0xFFFFFFFF;
NVIC-&gt;ICPR[i] = 0xFFFFFFFF;
}

__set_MSP(msp);
SCB-&gt;VTOR = BL_APP_START;

((void (*)(void))reset)();

while (1); // 不应该执行到这里

}
/* ============================ OTA 主流程 ============================ */
// 更新主流程
static void perform_ota(void){
char req[256];
uint8_t block[260];
bl_firmware_header_t hdr = {0};
uint32_t total_crc = 0xFFFFFFFFUL;
uint32_t write_addr = BL_APP_START;
// 1. 获取固件头
build_request(req, sizeof(req), BL_HEADER_PATH);
BL_WIFI_SEND(req, strlen(req));
if (skip_http_header() != 0 || BL_WIFI_RECV(&amp;hdr, sizeof(hdr)) != sizeof(hdr)) {
goto fail;
}

bl_ota_info_t current;
read_ota_info(&amp;current);
if (hdr.version &lt;= current.version) {
bl_printf(&#34;[OTA] Already the latest version\r\n&#34;);
// 将更新标志位取消掉,不然重启后就进入这里。
bl_ota_info_t new_info = { .version = hdr.version, .update = 0, .effective = 1 };
flash_erase_block(OTA_BLOCK);
flash_write(BL_OTA_INFO_START, (uint8_t*)&amp;new_info, sizeof(new_info));
//跳转回主程序
jump_to_app();
return;
}

// 2. 擦除 APP 分区
if (flash_erase_block(APP_BLOCK) != 0) goto fail;

// 3. 逐块下载
for (uint32_t i = 0; i &lt; hdr.blocknum; i++) {
char path[64];
snprintf(path, sizeof(path), BL_BLOCK_PATH_FMT, i);

build_request(req, sizeof(req), path);
BL_WIFI_SEND(req, strlen(req));
BL_WIFI_WAIT(8000);

if (skip_http_header() != 0) goto fail;

// 收完整 260 字节
uint32_t recv = 0;
while (recv &lt; 260) {
int r = BL_WIFI_RECV(block + recv, 260 - recv);
if (r &lt;= 0) goto fail;
recv += r;
}

// 块 CRC 校验
if (crc32(block, 256) != ((uint32_t*)(block + 256))[0]) {
bl_printf(&#34;[OTA] Block %lu CRC error\r\n&#34;, i);
goto fail;
}

// 计算最后一块真实长度
uint32_t this_len = 256;
if (i == hdr.blocknum - 1) {
uint32_t remain = hdr.binsize % 256;
if (remain) this_len = remain;
}

// 累加总 CRC
for (uint32_t j = 0; j &lt; this_len; j++)
total_crc = crc32_update(total_crc, block[j]);

if (flash_write(write_addr, block, this_len) != 0) goto fail;

write_addr += this_len;
bl_printf(&#34;[OTA] Block %lu/%lu OK\r\n&#34;, i + 1, hdr.blocknum);
}

// 4. 总 CRC 校验
if (~total_crc != hdr.checksum) {
bl_printf(&#34;[OTA] Total CRC failed\r\n&#34;);
goto fail;
}

// 5. 更新 OTA 信息区
bl_ota_info_t new_info = { .version = hdr.version, .update = 0, .effective = 1 };
flash_erase_block(OTA_BLOCK);
flash_write(BL_OTA_INFO_START, (uint8_t*)&amp;new_info, sizeof(new_info));

bl_printf(&#34;[OTA] UPDATE SUCCESS! Jumping to APP...\r\n&#34;);
HAL_Delay(200);
jump_to_app();

fail:
bl_printf("[OTA] UPDATE FAILED!\r\n");
while (1) HAL_Delay(1000);
}
/* ============================ 主函数 ============================ */
// 启动引导
void Bootloader_Main(void){
delay_Init(); // 你原来的延时初始化
HAL_Delay(200);
#if BL_FIRST_BOOT_INIT
{
bl_ota_info_t init = { .version = 0, .update = 0, .effective = 0 };
flash_erase_block(OTA_BLOCK);
flash_write(BL_OTA_INFO_START, (uint8_t*)&init, sizeof(init));
}
#endif
bl_ota_info_t ota_info;
read_ota_info(&amp;ota_info);

if (is_app_empty() || ota_info.effective != 1 || ota_info.update == 1) {
bl_printf(&#34;[BOOT] Enter OTA mode\r\n&#34;);
BL_WIFI_CONNECT();
perform_ota();

// 如果走到这里说明升级失败
while (1) {
HAL_Delay(2000);
bl_printf(&#34;[BOOT] Update failed, please reset device\r\n&#34;);
}
}

bl_printf(&#34;[BOOT] Start APP v0x%08lX\r\n&#34;, ota_info.version);
jump_to_app();

}
  1. 后续如何更新? 在APP中,预留触发更新APP的触发条件,比如一个按钮按下/接收到某个信息,触发APP更新。

4. 服务端设计


- 支持无限多个项目自动识别 + 热更新 + 版本号从 .txt 读取 + 零配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
## smart_server.py
from flask import Flask, abort, make_response
from werkzeug.serving import make_server
from werkzeug.serving import WSGIRequestHandler
import struct
import zlib
import os
import time
import threading
app = Flask(name)
====================== 配置 ======================
BLOCK_DATA_SIZE = 256 # 数据块大小(字节)
CRC_SIZE = 4 # CRC校验位大小(字节)
BLOCK_TOTAL_SIZE = BLOCK_DATA_SIZE + CRC_SIZE # 每块总大小(260字节)
HEADER_SIZE = 20 # 头部大小(5×uint32_t = 20字节)
SERVER_PORT = 8000 # 服务端口
CHECK_INTERVAL = 5 # 主动检查间隔(秒,改为1秒更灵敏)
================================================
全局缓存(带线程锁,避免并发冲突)
_firmware_cache = {
'firmwares': {}, # key: 固件名(不含后缀), value: 固件+版本信息
'lock': threading.Lock() # 线程锁,保护缓存读写
}
def get_firmware_version(firmware_name):
"""读取对应固件的版本文件(如 app1.txt),返回版本号"""
version_file = f"{firmware_name}.txt"
if not os.path.exists(version_file):
# 仅首次警告,避免重复刷屏
if firmware_name not in [f['name'] for f in _firmware_cache['firmwares'].values()]:
print(f"警告: {version_file} 不存在,{firmware_name} 使用默认版本 1")
return 1
try:
with open(version_file, 'r', encoding='utf-8') as f:
content = f.read().strip()
if content.isdigit():
return int(content)
else:
print(f"警告: {version_file} 版本号无效(必须是纯数字),{firmware_name} 使用默认版本 1")
return 1
except Exception as e:
print(f"读取 {version_file} 失败: {e}{firmware_name} 使用默认版本 1")
return 1
def scan_firmwares():
"""扫描同级目录下所有 .bin 固件,匹配对应 .txt 版本文件,返回固件信息字典"""
current_dir = os.path.dirname(os.path.abspath(file))
firmwares = {}
for filename in os.listdir(current_dir):
# 只处理 .bin 固件文件
if filename.lower().endswith('.bin'):
firmware_name = os.path.splitext(filename)[0] # 提取固件名(不含后缀)
firmware_path = os.path.join(current_dir, filename)
version_file_path = os.path.join(current_dir, f"{firmware_name}.txt")
try:
# 读取固件数据
with open(firmware_path, 'rb') as f:
firmware_data = f.read()
if not firmware_data:
print(f"警告: {filename} 为空文件,跳过")
continue
# 计算固件CRC32和块数
crc32_val = zlib.crc32(firmware_data) & 0xFFFFFFFF
blocknum = (len(firmware_data) + BLOCK_DATA_SIZE - 1) // BLOCK_DATA_SIZE
# 读取版本号(从对应 .txt 文件)
version = get_firmware_version(firmware_name)
# 记录文件修改时间(用于检测更新)
firmware_mtime = os.path.getmtime(firmware_path)
version_mtime = os.path.getmtime(version_file_path) if os.path.exists(version_file_path) else 0
# 存储固件完整信息
firmwares[firmware_name] = {
'name': firmware_name,
'bin_path': firmware_path,
'version_path': version_file_path,
'data': firmware_data,
'size': len(firmware_data),
'crc32': crc32_val,
'blocknum': blocknum,
'version': version,
'bin_mtime': firmware_mtime, # 固件文件修改时间
'version_mtime': version_mtime # 版本文件修改时间
}
except Exception as e:
print(f"处理 {filename} 失败: {e},跳过")
return firmwares
def check_for_updates():
"""主动检查固件更新(独立线程调用),有更新则刷新缓存"""
while True:
try:
new_firmwares = scan_firmwares()
with _firmware_cache['lock']:
old_firmwares = _firmware_cache['firmwares']
# 检测固件新增、删除或更新
old_names = set(old_firmwares.keys())
new_names = set(new_firmwares.keys())
added_names = new_names - old_names # 新增的固件
removed_names = old_names - new_names # 移除的固件
updated_names = [] # 更新的固件(固件或版本文件修改)
# 检查原有固件是否有更新
for name in old_names & new_names:
old_info = old_firmwares[name]
new_info = new_firmwares[name]
# 固件文件修改 或 版本文件修改 → 判定为更新
if (new_info['bin_mtime'] != old_info['bin_mtime'] or
new_info['version_mtime'] != old_info['version_mtime']):
updated_names.append(name)
# 有变更时刷新缓存并打印日志
if added_names or removed_names or updated_names:
with _firmware_cache['lock']:
_firmware_cache['firmwares'] = new_firmwares
print(f"\n=== 检测到固件变更 ===")
if added_names:
print(f"新增固件: {sorted(added_names)}")
if removed_names:
print(f"移除固件: {sorted(removed_names)}")
if updated_names:
print(f"更新固件: {sorted(updated_names)}")
print(f"当前可用固件总数: {len(new_firmwares)}")
print(f"======================\n")
except Exception as e:
print(f"热更新检查异常: {e}")
# 间隔指定时间再次检查
time.sleep(CHECK_INTERVAL)
def start_update_check_thread():
"""启动热更新检查独立线程"""
update_thread = threading.Thread(target=check_for_updates, daemon=True)
update_thread.name = "FirmwareUpdateChecker"
update_thread.start()
print(f"热更新检查线程已启动(每{CHECK_INTERVAL}秒检查一次)")
def get_firmware_info(firmware_name):
"""获取指定固件的详细信息(带锁读取,确保线程安全)"""
with _firmware_cache['lock']:
return _firmware_cache['firmwares'].get(firmware_name)
def get_all_available_firmwares():
"""获取所有可用固件的名称列表(带锁读取)"""
with _firmware_cache['lock']:
return sorted(_firmware_cache['firmwares'].keys())
====================== 接口 ======================
@app.route('/<firmware_name>/header', methods=['GET'])
def get_header(firmware_name):
"""获取指定固件的20字节头部信息(版本号取自对应 .txt 文件)"""
firmware_info = get_firmware_info(firmware_name)
if not firmware_info:
abort(404, f"Firmware '{firmware_name}' not found (无对应 .bin 文件)")
# 头部格式:<IIIII(小端序,5个uint32_t)
header = struct.pack('<IIIII',
firmware_info['version'], # 版本号(来自 firmware_name.txt)
firmware_info['crc32'], # 整个固件的CRC32校验值
firmware_info['blocknum'], # 固件总块数
BLOCK_DATA_SIZE, # 每块数据大小(256字节)
firmware_info['size'] # 固件总大小(字节)
)
resp = make_response(header)
resp.headers['Content-Type'] = 'application/octet-stream'
resp.headers['Content-Length'] = HEADER_SIZE
resp.headers['Connection'] = 'keep-alive'
resp.headers['Keep-Alive'] = 'timeout=60, max=100'
return resp
@app.route('/<firmware_name>/block/<int:idx>', methods=['GET'])
def get_block(firmware_name, idx):
"""获取指定固件的第idx块数据(256字节数据 + 4字节CRC)"""
firmware_info = get_firmware_info(firmware_name)
if not firmware_info:
abort(404, f"Firmware '{firmware_name}' not found (无对应 .bin 文件)")
# 检查块索引是否合法
if idx < 0 or idx >= firmware_info['blocknum']:
abort(416, f"Block index out of range (最大索引: {firmware_info['blocknum'] - 1})")
# 提取对应块的数据
start = idx * BLOCK_DATA_SIZE
block_data = firmware_info['data'][start:start + BLOCK_DATA_SIZE]
# 最后一块数据不足256字节时,补0xFF(与Flash未擦除状态一致)
if len(block_data) < BLOCK_DATA_SIZE:
block_data += b'\xFF' * (BLOCK_DATA_SIZE - len(block_data))
# 计算当前块的CRC32校验值
block_crc = zlib.crc32(block_data) & 0xFFFFFFFF
packet = block_data + struct.pack('<I', block_crc) # 数据+CRC组合
resp = make_response(packet)
resp.headers['Content-Type'] = 'application/octet-stream'
resp.headers['Content-Length'] = BLOCK_TOTAL_SIZE
resp.headers['Connection'] = 'keep-alive'
resp.headers['Keep-Alive'] = 'timeout=60, max=100'
return resp
@app.route('/firmwares', methods=['GET'])
def list_firmwares():
"""列出所有可用固件及版本信息(调试/客户端查询用)"""
available_firmwares = get_all_available_firmwares()
if not available_firmwares:
return make_response("No firmwares available (未找到任何 .bin 文件)", 404)
resp_text = "=== Available Firmwares ===\n"
for name in available_firmwares:
info = get_firmware_info(name)
resp_text += (
f"Firmware: {name}\n"
f" Version: {info['version']} (来自 {name}.txt)\n"
f" Size: {info['size']} bytes\n"
f" CRC32: 0x{info['crc32']:08X}\n"
f" Blocks: {info['blocknum']} × {BLOCK_DATA_SIZE} bytes\n"
f" Files: {os.path.basename(info['bin_path'])} + {os.path.basename(info['version_path'])}\n\n"
)
return make_response(resp_text, 200)
====================== 强制 Keep-Alive 配置 ======================
class KeepAliveRequestHandler(WSGIRequestHandler):
def send_header(self, keyword, value):
# 阻止服务端返回 Connection: close
if keyword.lower() == 'connection' and value.lower() == 'close':
return
super().send_header(keyword, value)
def close_connection(self):
# 强制保持连接
return False
====================== 服务启动 ======================
def run_server():
print("=== OTA Server (多固件+独立版本+实时热更新版) ===")
print(f"配置: 块大小={BLOCK_DATA_SIZE}字节, 头部={HEADER_SIZE}字节, 端口={SERVER_PORT}")
print(f"热更新检查间隔: {CHECK_INTERVAL}秒")
print(f"固件目录: {os.path.dirname(os.path.abspath(file))}")
print(f"规则: 每个 .bin 固件需对应同名 .txt 版本文件(如 app1.bin ↔ app1.txt)\n")
# 初始化固件缓存
initial_firmwares = scan_firmwares()
with _firmware_cache['lock']:
_firmware_cache['firmwares'] = initial_firmwares
# 启动热更新检查独立线程
start_update_check_thread()
# 打印初始固件状态
available_firmwares = get_all_available_firmwares()
if available_firmwares:
print(f"\n初始可用固件 ({len(available_firmwares)}个):")
for name in available_firmwares:
info = get_firmware_info(name)
print(f" - {name}: 版本{info['version']}, {info['size']}字节, CRC32=0x{info['crc32']:08X}")
else:
print("\n警告: 未找到任何 .bin 固件文件!")
# 打印接口说明
print("\n=== 接口说明 ===")
print(" GET /firmwares → 列出所有可用固件及版本信息")
print(" GET /<固件名>/header → 获取对应固件的20字节头部(版本取自同名 .txt)")
print(" GET /<固件名>/block/<索引> → 获取对应固件的256+4字节数据块")
print(f"\n服务器启动 → http://<IP>:{SERVER_PORT}")
# 启动服务(支持多线程)
server = make_server('0.0.0.0', SERVER_PORT, app,
threaded=True,
request_handler=KeepAliveRequestHandler)
try:
server.serve_forever()
except KeyboardInterrupt:
print("\n服务器已停止。")
if name == 'main':
run_server()
  • 使用

    1. 将固件上传到脚本同级目录下

    2. 创建一个与固件同名的.txt文件。写入版本号,纯数字

    3. 启动脚本

      1
      python smart_server.py
    4. 后续有别的项目需要更新,指定好服务器的地址后,也在本脚本目录同级上传bin固件,以及添加版本信息,,运行中的脚本拥有(热更新)可以自动识别刚加入的固件以及版本。 例:

    1
    2
    3
    4
    5
    ├─ project1.bin   //工程1的bin固件
    ├─ project1.txt //工程1的版本信息 (内容就纯数字1,2,3 ...)
    ├─ project2.bin //工程2的bin固件 <-- 新添加的工程2
    ├─ project2.txt //工程2的版本信息 <-- 也要添加版本文档
    ├─ smart_server.py // 脚本文件