import { Buffer } from 'buffer'
import { type Subscription } from 'react-native-ble-plx'

import { BleClient } from '../core/BleClient'
import { type APP_COMMAND_TYPES, BLE_UUIDS, FRAME_CONSTANTS } from '../protocol/Constants'
import { ProtocolManager } from '../protocol/ProtocolManager'
import { type ProtocolFrame } from '../protocol/types'

/** Callback invoked with a complete (reassembled, if fragmented) payload. */
type FrameListener = (data: ArrayBuffer, deviceId: string) => void

/** Reassembly state for one in-flight fragmented message. */
interface FragmentSession {
  /** Number of pages announced by the frame that opened the session. */
  total: number
  /** Page payloads indexed by page number; `null` until that page arrives. */
  frames: (ArrayBuffer | null)[]
}

/**
 * Singleton that sits between the raw BLE client and the application:
 * it monitors the read characteristic, parses incoming frames, reassembles
 * fragmented messages, dispatches payloads to listeners registered per frame
 * type, and sends outgoing payloads as a serial sequence of frames.
 */
export class BleProtocolService {
  private static instance: BleProtocolService | undefined

  /** Bytes shown at each end of a truncated hex dump in debug logs. */
  private static readonly HEX_PREVIEW_EDGE_BYTES = 256

  private client = BleClient.getInstance()
  /** frame type -> registered callbacks */
  private listeners: Map<number, Set<FrameListener>> = new Map()
  private subscription: Subscription | null = null
  /** `${deviceId}_${type}` -> reassembly session */
  private fragments: Map<string, FragmentSession> = new Map()

  private constructor() {}

  /** Returns the shared instance, creating it on first use. */
  public static getInstance(): BleProtocolService {
    if (!BleProtocolService.instance) {
      BleProtocolService.instance = new BleProtocolService()
    }
    return BleProtocolService.instance
  }

  /**
   * Registers `callback` for frames of the given `type`.
   * Registering the same function twice for one type has no additional effect
   * (callbacks are held in a Set).
   */
  public addListener(type: number, callback: FrameListener): void {
    let callbacks = this.listeners.get(type)
    if (!callbacks) {
      callbacks = new Set()
      this.listeners.set(type, callbacks)
    }
    callbacks.add(callback)
  }

  /** Unregisters `callback` for `type`; a no-op if it was never registered. */
  public removeListener(type: number, callback: FrameListener): void {
    this.listeners.get(type)?.delete(callback)
  }

  /** Invokes every callback registered for `type` with the payload. */
  private emit(type: number, data: ArrayBuffer, deviceId: string): void {
    this.listeners.get(type)?.forEach((cb) => cb(data, deviceId))
  }

  /**
   * Starts monitoring the read characteristic of `deviceId`.
   *
   * Any previous subscription held by this service is removed first, and
   * pending fragment sessions for this device are dropped.
   *
   * @throws Rethrows whatever `client.monitor` throws, after logging it.
   */
  public async initialize(deviceId: string): Promise<void> {
    // Clean up previous subscription
    this.disconnect()
    // Clear fragments for this device
    this.clearFragments(deviceId)

    try {
      this.subscription = await this.client.monitor(
        deviceId,
        BLE_UUIDS.SERVICE,
        BLE_UUIDS.READ_CHARACTERISTIC,
        (error, value) => {
          if (error) {
            // Check for known native crash error and ignore/log as debug
            if (
              error.errorCode === 0 &&
              error.message.includes('Unknown error') &&
              error.reason?.includes('PromiseImpl.reject')
            ) {
              console.debug('Ignored native monitor error', error)
              return
            }
            console.warn('Monitor error', error)
            return
          }

          if (value) {
            // The characteristic value is delivered base64-encoded.
            const buffer = Buffer.from(value, 'base64')
            const hexString =
              buffer
                .toString('hex')
                .match(/.{1,2}/g)
                ?.join(' ')
                .toUpperCase() || ''
            console.log(`[BleProtocol] Received ${buffer.byteLength} bytes:`, hexString)
            this.handleRawData(deviceId, buffer)
          }
        },
      )
    } catch (error) {
      console.error('Failed to initialize protocol service:', error)
      throw error
    }
  }

  /** Removes the active monitor subscription, if any. Never throws. */
  public disconnect(): void {
    if (!this.subscription) return
    try {
      this.subscription.remove()
    } catch (e) {
      console.warn('Failed to remove subscription', e)
    }
    this.subscription = null
  }

  /**
   * Parses one notification and either emits it directly or routes it to
   * fragment reassembly when the frame announces more than zero sub-pages.
   */
  private handleRawData(deviceId: string, data: Buffer): void {
    // A Buffer is a view: its backing ArrayBuffer may be larger than the
    // Buffer or start at a non-zero offset. Hand the parser exactly the bytes
    // that belong to this notification.
    const exact = data.buffer.slice(data.byteOffset, data.byteOffset + data.byteLength) as ArrayBuffer
    const frame = ProtocolManager.parseFrame(exact)
    if (!frame) return

    if (frame.subpageTotal > 0) {
      this.handleFragment(deviceId, frame)
    } else {
      this.emit(frame.type, frame.data, deviceId)
    }
  }

  /**
   * Stores one page of a fragmented message and, once every page has arrived,
   * concatenates the pages from the highest index down to 0 and emits the
   * result. Pages whose index is outside `[0, total)` are ignored.
   */
  private handleFragment(deviceId: string, frame: ProtocolFrame): void {
    const key = `${deviceId}_${frame.type}`

    let session = this.fragments.get(key)
    if (!session) {
      session = {
        total: frame.subpageTotal,
        frames: new Array<ArrayBuffer | null>(frame.subpageTotal).fill(null),
      }
      this.fragments.set(key, session)
    }

    // Basic validation
    if (frame.curPage < 0 || frame.curPage >= session.total) return

    session.frames[frame.curPage] = frame.data

    // Still waiting for at least one page.
    if (session.frames.some((part) => part === null)) return

    const combinedLength = session.frames.reduce((sum, part) => sum + (part ? part.byteLength : 0), 0)
    const combined = new Uint8Array(combinedLength)

    // Reassemble from High to Low pages
    let offset = 0
    for (let page = session.total - 1; page >= 0; page--) {
      const part = session.frames[page]
      if (part) {
        combined.set(new Uint8Array(part), offset)
        offset += part.byteLength
      }
    }

    this.fragments.delete(key)
    this.emit(frame.type, combined.buffer as ArrayBuffer, deviceId)
  }

  /** Drops every pending fragment session that belongs to `deviceId`. */
  private clearFragments(deviceId: string): void {
    // Keys are `${deviceId}_${type}`. Match on the id plus the separator so a
    // device whose id is a prefix of another id does not clear the other's
    // sessions.
    const prefix = `${deviceId}_`
    for (const key of this.fragments.keys()) {
      if (key.startsWith(prefix)) {
        this.fragments.delete(key)
      }
    }
  }

  /** Formats bytes as lowercase two-digit hex separated by single spaces. */
  private static toHex(bytes: Uint8Array): string {
    return Array.from(bytes, (b) => b.toString(16).padStart(2, '0')).join(' ')
  }

  /**
   * Hex dump for logging: the full dump for short inputs, otherwise only the
   * first and last `edgeBytes` bytes with an elision marker in between, so
   * large payloads are never fully stringified.
   */
  private static formatHexPreview(
    bytes: Uint8Array,
    edgeBytes: number = BleProtocolService.HEX_PREVIEW_EDGE_BYTES,
  ): string {
    if (bytes.byteLength <= edgeBytes * 2) {
      return BleProtocolService.toHex(bytes)
    }
    const head = BleProtocolService.toHex(bytes.subarray(0, edgeBytes))
    const tail = BleProtocolService.toHex(bytes.subarray(bytes.byteLength - edgeBytes))
    return `${head}\n......\n${tail}`
  }

  /**
   * Encodes `data` into protocol frames and writes them to the device.
   *
   * @param deviceId   Target device.
   * @param type       Command type placed in the frames.
   * @param data       Raw bytes (`ArrayBuffer` / `Uint8Array`) are sent as-is;
   *                   any other object is serialized with `JSON.stringify`.
   * @param onProgress Optional callback receiving the fraction of frames
   *                   processed so far, in (0, 1].
   * @throws If the first frame cannot be written, or if any later frame still
   *         fails after its retries.
   */
  public async send(
    deviceId: string,
    type: APP_COMMAND_TYPES,
    data: object | ArrayBuffer | Uint8Array,
    onProgress?: (progress: number) => void,
  ): Promise<void> {
    let payload: Uint8Array
    if (data instanceof ArrayBuffer) {
      console.debug('[BleProtocolService] Sending ArrayBuffer')
      payload = new Uint8Array(data)
    } else if (data instanceof Uint8Array) {
      console.debug('[BleProtocolService] Sending Uint8Array')
      payload = data
    } else {
      console.debug('[BleProtocolService] Sending JSON payload')
      const jsonStr = JSON.stringify(data)
      payload = new Uint8Array(Buffer.from(jsonStr))
    }

    const device = this.client.getConnectedDevice()
    console.log('send-------------device', device)
    // `||` (not `??`) on purpose: a missing or zero MTU falls back to 23.
    const mtu = device?.mtu || 23

    // MTU - 3 bytes (ATT overhead) - Protocol Header - Protocol Footer
    const maxPayloadSize = mtu - 3 - FRAME_CONSTANTS.HEADER_SIZE - FRAME_CONSTANTS.FOOTER_SIZE
    // Ensure reasonable bounds (at least 1 byte, max constrained by protocol constant)
    const safeMaxDataSize = Math.max(1, Math.min(maxPayloadSize, FRAME_CONSTANTS.MAX_DATA_SIZE))

    console.debug(`[BleProtocolService] Sending with MTU=${mtu}, maxDataSize=${safeMaxDataSize}`)
    console.debug(
      `[BleProtocolService] Sending payload size=${payload.byteLength}, raw payload hex=\n${BleProtocolService.formatHexPreview(payload)}`,
    )

    const frames = ProtocolManager.createFrame(type, payload, FRAME_CONSTANTS.HEAD_APP_TO_DEVICE, safeMaxDataSize)
    const total = frames.length
    console.debug(`Sending ${total} frames`)

    // Frames are written one at a time with adaptive pacing.
    await this.sendFramesWithFlowControl(deviceId, frames, onProgress)
  }

  /**
   * Writes frames strictly one after another, with an adaptive pause between
   * frames. (Per the original author's note, the hardware does not support
   * concurrent writes.)
   *
   * - The first frame is never retried: a failure aborts immediately.
   * - Later frames are retried up to `FLOW_CONTROL.MAX_RETRIES` times with a
   *   linearly growing delay (`RETRY_DELAY * attemptNumber`).
   * - The pause shrinks after 5 consecutive successes and grows on failures,
   *   staying within [40, 150] ms.
   *
   * NOTE(review): a frame that exhausts its retries does not stop the loop;
   * remaining frames are still sent and the error is raised at the end.
   * Confirm this is intended for fragmented payloads.
   *
   * @throws If the first frame fails, or if any frame failed all its retries.
   */
  private async sendFramesWithFlowControl(
    deviceId: string,
    frames: Uint8Array[],
    onProgress?: (progress: number) => void,
  ): Promise<void> {
    const total = frames.length
    const maxRetries = FRAME_CONSTANTS.FLOW_CONTROL.MAX_RETRIES
    const retryDelay = FRAME_CONSTANTS.FLOW_CONTROL.RETRY_DELAY

    // Performance metrics
    const startTime = Date.now()
    let failedCount = 0 // frames that failed every attempt
    let retriedCount = 0 // frames that succeeded only after >= 1 retry
    let totalRetries = 0 // retries spent on frames counted in retriedCount

    // Adaptive interval parameters (milliseconds)
    const MIN_INTERVAL = 40
    const MAX_INTERVAL = 150
    const INITIAL_INTERVAL = 50 // more conservative than the previous 35 ms

    let currentInterval = INITIAL_INTERVAL
    let consecutiveSuccesses = 0
    let consecutiveFailures = 0

    console.debug(`[FlowControl] Starting serial transmission: ${total} frames, initial interval: ${currentInterval}ms`)

    for (const [i, frame] of frames.entries()) {
      // Dump the first few frames for debugging
      if (i < 3) {
        const rawFrame = BleProtocolService.toHex(frame)
        console.debug(`raw ${i + 1} frame \n ${rawFrame}`)
      }

      const base64 = Buffer.from(frame).toString('base64')
      let frameSucceeded = false

      // Retry policy: the first frame fails fast, without retries
      const frameMaxRetries = i === 0 ? 0 : maxRetries

      for (let attempt = 0; attempt <= frameMaxRetries; attempt++) {
        try {
          // Wait for each write to finish before issuing the next one
          await this.client.write(deviceId, BLE_UUIDS.SERVICE, BLE_UUIDS.WRITE_CHARACTERISTIC, base64, false)

          // Write succeeded
          frameSucceeded = true
          consecutiveSuccesses++
          consecutiveFailures = 0

          if (attempt > 0) {
            retriedCount++
            totalRetries += attempt
            console.debug(`[FlowControl] Frame ${i} succeeded after ${attempt} retries`)
          }

          // Adaptive speed-up: after 5 consecutive successes, shorten the pause
          if (consecutiveSuccesses >= 5 && currentInterval > MIN_INTERVAL) {
            currentInterval = Math.max(MIN_INTERVAL, currentInterval - 5)
            console.debug(`[FlowControl] Speed up: interval reduced to ${currentInterval}ms`)
            consecutiveSuccesses = 0
          }

          break
        } catch (error) {
          // A failing first frame aborts the whole transmission
          if (i === 0) {
            console.error(`[FlowControl] First frame write failed, aborting:`, error)
            throw new Error(`First frame write failed: ${(error as Error)?.message || error}`)
          }

          consecutiveFailures++
          consecutiveSuccesses = 0

          console.warn(`[FlowControl] Frame ${i} write failed (attempt ${attempt + 1}/${frameMaxRetries + 1}):`, error)

          if (attempt < frameMaxRetries) {
            // Retries remain: back off, then try again
            const backoffDelay = retryDelay * (attempt + 1)
            await new Promise((resolve) => setTimeout(resolve, backoffDelay))
          } else {
            // Every attempt failed
            failedCount++
            console.error(`[FlowControl] Frame ${i} failed after ${frameMaxRetries} retries`)

            // Severe failure: increase the pause sharply
            currentInterval = Math.min(MAX_INTERVAL, currentInterval * 2)
            console.warn(`[FlowControl] Severe failure: interval increased to ${currentInterval}ms`)
          }
        }
      }

      // Adaptive slow-down: repeated failures lengthen the pause
      if (consecutiveFailures >= 2 && currentInterval < MAX_INTERVAL) {
        currentInterval = Math.min(MAX_INTERVAL, currentInterval * 1.5)
        console.debug(`[FlowControl] Slow down: interval increased to ${currentInterval}ms`)
      }

      // Pause between frames only after a success; a failed frame has already
      // waited during its retry back-off
      if (frameSucceeded) {
        await new Promise((resolve) => setTimeout(resolve, currentInterval))
      }

      // Report progress
      if (onProgress) {
        onProgress((i + 1) / total)
      }
    }

    // Performance summary. Every ratio is guarded against a zero divisor so
    // the log never prints NaN or Infinity.
    const duration = Date.now() - startTime
    const throughput = total > 0 && duration > 0 ? (total / duration) * 1000 : 0
    // totalRetries only counts frames that eventually succeeded, so the
    // matching denominator is retriedCount.
    const avgRetriesPerFailedFrame = retriedCount > 0 ? totalRetries / retriedCount : 0
    const successRate = total > 0 ? ((total - failedCount) / total) * 100 : 100

    console.log(
      [
        '[FlowControl] Transmission completed:',
        `- Total frames: ${total}`,
        `- Duration: ${duration}ms`,
        `- Throughput: ${throughput.toFixed(2)} frames/sec`,
        `- Failed: ${failedCount}`,
        `- Frames retried: ${retriedCount}`,
        `- Total retry attempts: ${totalRetries}`,
        `- Avg retries per failed frame: ${avgRetriesPerFailedFrame.toFixed(2)}`,
        `- Final interval: ${currentInterval}ms`,
        `- Success rate: ${successRate.toFixed(2)}%`,
      ].join('\n  '),
    )

    if (failedCount > 0) {
      throw new Error(`Transmission incomplete: ${failedCount}/${total} frames failed`)
    }
  }
}