expo-duooomi-app/ble/services/BleProtocolService.ts

339 lines
12 KiB
TypeScript
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import { Buffer } from 'buffer'
import { type Subscription } from 'react-native-ble-plx'
import { BleClient } from '../core/BleClient'
import { type APP_COMMAND_TYPES, BLE_UUIDS, FRAME_CONSTANTS } from '../protocol/Constants'
import { ProtocolManager } from '../protocol/ProtocolManager'
import { type ProtocolFrame } from '../protocol/types'
/**
 * Singleton service that speaks the framed application protocol over BLE.
 *
 * Inbound: subscribes to the read characteristic, parses each notification
 * into a protocol frame, reassembles multi-page messages and dispatches the
 * payload to listeners registered per frame type.
 *
 * Outbound: splits a payload into frames sized for the negotiated MTU and
 * writes them one at a time with retries and an adaptive inter-frame delay.
 */
export class BleProtocolService {
  private static instance: BleProtocolService

  private readonly client = BleClient.getInstance()

  /** Frame type -> callbacks invoked with (payload, deviceId). */
  private readonly listeners: Map<number, Set<(data: ArrayBuffer, deviceId: string) => void>> = new Map()

  /** Active notification subscription, if any. */
  private subscription: Subscription | null = null

  /**
   * In-progress reassembly sessions, keyed by `${deviceId}_${frameType}`.
   * `frames[i]` holds the payload of page `i`, or null until it arrives.
   */
  private readonly fragments: Map<string, { total: number; frames: (ArrayBuffer | null)[] }> = new Map()

  private constructor() {}

  /** Returns the shared instance, creating it on first use. */
  public static getInstance(): BleProtocolService {
    if (!BleProtocolService.instance) {
      BleProtocolService.instance = new BleProtocolService()
    }
    return BleProtocolService.instance
  }

  /**
   * Registers a callback for frames of the given type.
   *
   * @param type Protocol frame type to listen for.
   * @param callback Invoked with the (possibly reassembled) payload and the source device id.
   */
  public addListener(type: number, callback: (data: ArrayBuffer, deviceId: string) => void) {
    if (!this.listeners.has(type)) {
      this.listeners.set(type, new Set())
    }
    this.listeners.get(type)!.add(callback)
  }

  /**
   * Unregisters a callback previously passed to {@link addListener}.
   * Does nothing if the type or callback is unknown.
   */
  public removeListener(type: number, callback: (data: ArrayBuffer, deviceId: string) => void) {
    this.listeners.get(type)?.delete(callback)
  }

  /** Invokes every listener registered for `type`. */
  private emit(type: number, data: ArrayBuffer, deviceId: string) {
    this.listeners.get(type)?.forEach((cb) => cb(data, deviceId))
  }

  /**
   * Starts monitoring the read characteristic of `deviceId`.
   *
   * Any existing subscription is removed first and pending fragment sessions
   * for this device are discarded.
   *
   * @throws Rethrows whatever the underlying client's `monitor` call throws.
   */
  public async initialize(deviceId: string) {
    // Clean up previous subscription
    this.disconnect()
    // Clear fragments for this device
    this.clearFragments(deviceId)
    try {
      this.subscription = await this.client.monitor(
        deviceId,
        BLE_UUIDS.SERVICE,
        BLE_UUIDS.READ_CHARACTERISTIC,
        (error, value) => {
          if (error) {
            // Check for known native crash error and ignore/log as debug
            if (
              error.errorCode === 0 &&
              error.message.includes('Unknown error') &&
              error.reason?.includes('PromiseImpl.reject')
            ) {
              console.debug('Ignored native monitor error', error)
              return
            }
            console.warn('Monitor error', error)
            return
          }
          if (value) {
            // Notification values arrive base64-encoded.
            const buffer = Buffer.from(value, 'base64')
            const hexString =
              buffer
                .toString('hex')
                .match(/.{1,2}/g)
                ?.join(' ')
                .toUpperCase() || ''
            console.log(`[BleProtocol] Received ${buffer.byteLength} bytes:`, hexString)
            this.handleRawData(deviceId, buffer)
          }
        },
      )
    } catch (error) {
      console.error('Failed to initialize protocol service:', error)
      throw error
    }
  }

  /** Removes the active notification subscription, if any. Removal errors are logged, not thrown. */
  public disconnect() {
    if (this.subscription) {
      try {
        this.subscription.remove()
      } catch (e) {
        console.warn('Failed to remove subscription', e)
      }
      this.subscription = null
    }
  }

  /**
   * Parses one notification and routes it: multi-page frames go to
   * reassembly, single frames are emitted immediately.
   */
  private handleRawData(deviceId: string, data: Buffer) {
    // A Buffer is a view: its backing ArrayBuffer can be larger than the view
    // or start at a non-zero byteOffset. Copy exactly the bytes of this
    // notification so the parser never sees unrelated memory.
    const exact = new Uint8Array(data.byteLength)
    exact.set(data)
    const frame = ProtocolManager.parseFrame(exact.buffer as ArrayBuffer)
    if (!frame) return
    if (frame.subpageTotal > 0) {
      this.handleFragment(deviceId, frame)
    } else {
      this.emit(frame.type, frame.data, deviceId)
    }
  }

  /**
   * Stores one page of a multi-page message and, once every page is present,
   * concatenates them and emits the combined payload.
   */
  private handleFragment(deviceId: string, frame: ProtocolFrame) {
    const key = `${deviceId}_${frame.type}`
    if (!this.fragments.has(key)) {
      this.fragments.set(key, {
        total: frame.subpageTotal,
        frames: new Array(frame.subpageTotal).fill(null),
      })
    }
    const session = this.fragments.get(key)!
    // NOTE(review): a session keeps the total of its first frame and never
    // expires on its own; confirm whether an interrupted transfer needs a reset.

    // Basic validation: ignore pages outside [0, total). A negative index
    // would otherwise be stored as a stray property on the array.
    if (frame.curPage < 0 || frame.curPage >= session.total) return
    session.frames[frame.curPage] = frame.data

    // Check if complete
    if (session.frames.every((f) => f !== null)) {
      const combinedLength = session.frames.reduce((acc, val) => acc + (val ? val.byteLength : 0), 0)
      const combined = new Uint8Array(combinedLength)
      let offset = 0
      // Reassemble from high to low pages: the highest page index holds the
      // first bytes of the message.
      for (let i = session.total - 1; i >= 0; i--) {
        const part = session.frames[i]
        if (part) {
          combined.set(new Uint8Array(part), offset)
          offset += part.byteLength
        }
      }
      this.fragments.delete(key)
      this.emit(frame.type, combined.buffer as ArrayBuffer, deviceId)
    }
  }

  /** Drops all in-progress reassembly sessions belonging to `deviceId`. */
  private clearFragments(deviceId: string) {
    // Match on the full `${deviceId}_` prefix (the separator used when keys
    // are built) so a device whose id merely starts with the same characters
    // keeps its sessions.
    const prefix = `${deviceId}_`
    for (const key of this.fragments.keys()) {
      if (key.startsWith(prefix)) {
        this.fragments.delete(key)
      }
    }
  }

  /**
   * Encodes `data`, splits it into protocol frames sized for the current MTU
   * and writes them to the device.
   *
   * @param deviceId Target device.
   * @param type Command type placed in each frame.
   * @param data Raw bytes (`ArrayBuffer` / `Uint8Array`) sent as-is, or any
   *   other object, which is serialized with `JSON.stringify`.
   * @param onProgress Optional callback receiving the fraction of frames
   *   processed so far, in (0, 1].
   * @throws Error if the first frame cannot be written, or if any frame still
   *   fails after its retries.
   */
  public async send(
    deviceId: string,
    type: APP_COMMAND_TYPES,
    data: object | ArrayBuffer | Uint8Array,
    onProgress?: (progress: number) => void,
  ): Promise<void> {
    let payload: Uint8Array
    if (data instanceof ArrayBuffer) {
      console.debug('[BleProtocolService] Sending ArrayBuffer')
      payload = new Uint8Array(data)
    } else if (data instanceof Uint8Array) {
      console.debug('[BleProtocolService] Sending Uint8Array')
      payload = data
    } else {
      console.debug('[BleProtocolService] Sending JSON payload')
      const jsonStr = JSON.stringify(data)
      payload = new Uint8Array(Buffer.from(jsonStr))
    }

    const device = this.client.getConnectedDevice()
    console.log('send-------------device', device)
    // Fall back to 23 when the MTU is unknown (or reported as 0).
    const mtu = device?.mtu || 23
    // MTU - 3 bytes (ATT overhead) - Protocol Header - Protocol Footer
    const maxPayloadSize = mtu - 3 - FRAME_CONSTANTS.HEADER_SIZE - FRAME_CONSTANTS.FOOTER_SIZE
    // Ensure reasonable bounds (at least 1 byte, max constrained by protocol constant)
    const safeMaxDataSize = Math.max(1, Math.min(maxPayloadSize, FRAME_CONSTANTS.MAX_DATA_SIZE))
    console.debug(`[BleProtocolService] Sending with MTU=${mtu}, maxDataSize=${safeMaxDataSize}`)

    const rawPayloadHex = payload.reduce((acc, val) => acc + val.toString(16).padStart(2, '0') + ' ', '')
    // Keep only the head and tail of long dumps. Short dumps are printed once
    // in full; unconditionally joining head and tail would print them twice.
    const hexEdgeChars = 512 * 2
    const formattedRawPayloadHex =
      rawPayloadHex.length > hexEdgeChars * 2
        ? rawPayloadHex.substring(0, hexEdgeChars) +
          '\n......\n' +
          rawPayloadHex.substring(rawPayloadHex.length - hexEdgeChars)
        : rawPayloadHex
    console.debug(
      `[BleProtocolService] Sending payload size=${payload.byteLength}, raw payload hex=\n${formattedRawPayloadHex}`,
    )

    const frames = ProtocolManager.createFrame(type, payload, FRAME_CONSTANTS.HEAD_APP_TO_DEVICE, safeMaxDataSize)
    const total = frames.length
    console.debug(`Sending ${total} frames`)
    // Frames are written serially with an adaptive delay between them.
    await this.sendFramesWithFlowControl(deviceId, frames, onProgress)
  }

  /**
   * Writes frames one at a time with an adaptive inter-frame interval.
   *
   * The hardware does not support concurrent writes, so each write is awaited
   * before the next starts. The interval shrinks after runs of successes and
   * grows after failures. The first frame is never retried; later frames are
   * retried up to `FRAME_CONSTANTS.FLOW_CONTROL.MAX_RETRIES` times with a
   * linearly growing delay. A frame that exhausts its retries does not stop
   * the loop; the failure is reported after all frames have been attempted.
   *
   * @throws Error if the first frame fails, or if any frame failed after all retries.
   */
  private async sendFramesWithFlowControl(
    deviceId: string,
    frames: Uint8Array[],
    onProgress?: (progress: number) => void,
  ): Promise<void> {
    const total = frames.length
    const maxRetries = FRAME_CONSTANTS.FLOW_CONTROL.MAX_RETRIES
    const retryDelay = FRAME_CONSTANTS.FLOW_CONTROL.RETRY_DELAY
    const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms))

    // Performance metrics
    const startTime = Date.now()
    let failedCount = 0 // frames that failed even after all retries
    let retriedCount = 0 // frames that succeeded only after at least one retry
    let totalRetries = 0 // retry attempts spent on frames counted in retriedCount

    // Adaptive interval parameters (milliseconds)
    const MIN_INTERVAL = 40
    const MAX_INTERVAL = 150
    const INITIAL_INTERVAL = 50 // more conservative than the previous 35ms
    let currentInterval = INITIAL_INTERVAL
    let consecutiveSuccesses = 0
    let consecutiveFailures = 0

    console.debug(`[FlowControl] Starting serial transmission: ${total} frames, initial interval: ${currentInterval}ms`)

    for (let i = 0; i < total; i++) {
      const frame = frames[i]
      // Dump the first few frames for debugging
      if (i < 3) {
        const rawFrame = Array.from(frame)
          .map((b) => b.toString(16).padStart(2, '0'))
          .join(' ')
        console.debug(`raw ${i + 1} frame \n ${rawFrame}`)
      }
      const base64 = Buffer.from(frame).toString('base64')
      let frameSucceeded = false

      // Retry logic (a first-frame failure aborts immediately, no retries)
      const frameMaxRetries = i === 0 ? 0 : maxRetries
      for (let attempt = 0; attempt <= frameMaxRetries; attempt++) {
        try {
          // Wait for each write to finish before issuing the next
          await this.client.write(deviceId, BLE_UUIDS.SERVICE, BLE_UUIDS.WRITE_CHARACTERISTIC, base64, false)
          // Write succeeded
          frameSucceeded = true
          consecutiveSuccesses++
          consecutiveFailures = 0
          if (attempt > 0) {
            retriedCount++
            totalRetries += attempt
            console.debug(`[FlowControl] Frame ${i} succeeded after ${attempt} retries`)
          }
          // Adaptive: after 5 consecutive successes, shrink the interval (speed up)
          if (consecutiveSuccesses >= 5 && currentInterval > MIN_INTERVAL) {
            currentInterval = Math.max(MIN_INTERVAL, currentInterval - 5)
            console.debug(`[FlowControl] Speed up: interval reduced to ${currentInterval}ms`)
            consecutiveSuccesses = 0
          }
          break
        } catch (error) {
          // A first-frame failure aborts the whole transmission
          if (i === 0) {
            console.error(`[FlowControl] First frame write failed, aborting:`, error)
            throw new Error(`First frame write failed: ${(error as Error)?.message || error}`)
          }
          consecutiveFailures++
          consecutiveSuccesses = 0
          console.warn(`[FlowControl] Frame ${i} write failed (attempt ${attempt + 1}/${frameMaxRetries + 1}):`, error)
          if (attempt < frameMaxRetries) {
            // Retries remain: back off linearly, then try again
            const backoffDelay = retryDelay * (attempt + 1)
            await sleep(backoffDelay)
          } else {
            // All retries exhausted
            failedCount++
            console.error(`[FlowControl] Frame ${i} failed after ${frameMaxRetries} retries`)
            // Severe failure: increase the interval sharply
            currentInterval = Math.min(MAX_INTERVAL, currentInterval * 2)
            console.warn(`[FlowControl] Severe failure: interval increased to ${currentInterval}ms`)
          }
        }
      }

      // Adaptive: on consecutive failures, grow the interval (back off)
      if (consecutiveFailures >= 2 && currentInterval < MAX_INTERVAL) {
        currentInterval = Math.min(MAX_INTERVAL, currentInterval * 1.5)
        console.debug(`[FlowControl] Slow down: interval increased to ${currentInterval}ms`)
      }

      // Inter-frame wait only after a success; failed frames already waited while retrying
      if (frameSucceeded) {
        await sleep(currentInterval)
      }

      // Report progress
      if (onProgress) {
        onProgress((i + 1) / total)
      }
    }

    // Performance summary
    const duration = Date.now() - startTime
    const throughput = total > 0 ? (total / duration) * 1000 : 0
    // Guard on the actual divisor: only frames that recovered via retry count here.
    const avgRetriesPerRetriedFrame = retriedCount > 0 ? totalRetries / retriedCount : 0
    // Avoid NaN when there was nothing to send.
    const successRate = total > 0 ? ((total - failedCount) / total) * 100 : 100
    console.log(
      [
        `[FlowControl] Transmission completed:`,
        `- Total frames: ${total}`,
        `- Duration: ${duration}ms`,
        `- Throughput: ${throughput.toFixed(2)} frames/sec`,
        `- Failed: ${failedCount}`,
        `- Frames retried: ${retriedCount}`,
        `- Total retry attempts: ${totalRetries}`,
        `- Avg retries per retried frame: ${avgRetriesPerRetriedFrame.toFixed(2)}`,
        `- Final interval: ${currentInterval}ms`,
        `- Success rate: ${successRate.toFixed(2)}%`,
      ].join('\n'),
    )

    if (failedCount > 0) {
      throw new Error(`Transmission incomplete: ${failedCount}/${total} frames failed`)
    }
  }
}