import std from '../../dxmodules/dxStd.js'
import config from '../../dxmodules/dxConfig.js'
import logger from '../../dxmodules/dxLogger.js'
import dxCommonUtils from '../../dxmodules/dxCommonUtils.js'
import dxMap from '../../dxmodules/dxMap.js'
import sqliteService from '../service/sqliteService.js'
import driver from '../driver.js'
import mqttService from '../service/mqttService.js'

/**
 * Pass-record upload worker (channel 2 - catch-all full upload).
 *
 * Responsibilities:
 * - Runs permanently; uploads only while MQTT reports "connected".
 * - Loads every pass record (d1_pass_record) that is not yet marked as reported.
 * - Publishes one record every UPLOAD_INTERVAL_MS milliseconds.
 * - When the current batch is exhausted, reloads the latest unreported records.
 *
 * A record that cannot be built or sent is logged and skipped for the current
 * batch; because it stays unreported it is picked up again by the next reload.
 */

const mqtt_map = dxMap.get("MQTT")
const UPLOAD_INTERVAL_MS = 5000 // one record every 5 seconds

let sendTimer = null
let currentBatch = [] // records still waiting to be sent in this batch
let currentIndex = 0 // index of the next record to send within currentBatch

/**
 * Tells whether the shared MQTT status entry currently reads "connected".
 *
 * @returns {boolean} true when MQTT_STATUS is exactly "connected"
 */
function isMqttConnected() {
    return mqtt_map.get("MQTT_STATUS") === "connected"
}

/**
 * Replaces the current batch with all rows of d1_pass_record whose `reported`
 * column is 0 or NULL, ordered by timeStamp ascending, and rewinds the cursor.
 * On a query failure the batch is reset to empty and the error is logged.
 */
function loadNewBatch() {
    try {
        const sql = `SELECT * FROM d1_pass_record WHERE reported = 0 OR reported IS NULL ORDER BY timeStamp ASC`
        const result = sqliteService.select(sql)
        currentBatch = result || []
        currentIndex = 0
    } catch (err) {
        logger.error("[passRecordWorker] Failed to load batch:", err)
        currentBatch = []
        currentIndex = 0
    }
}

/**
 * Parses a JSON column of a pass record without ever throwing.
 *
 * @param {string} raw - raw column value (may be empty/null)
 * @param {*} recordId - id of the owning record, used only for logging
 * @param {string} field - column name, used only for logging
 * @returns {*} the parsed value, or "" when the column is empty or malformed
 */
function parseExtra(raw, recordId, field) {
    if (!raw) return ""
    try {
        return JSON.parse(raw)
    } catch (err) {
        // Malformed JSON will never become valid on retry, so degrade to "no extra info"
        logger.error(`[passRecordWorker] Invalid JSON in ${field} for record`, recordId, err)
        return ""
    }
}

/**
 * Builds one entry of the `users` array of an access event.
 *
 * @param {*} userId - user id stored on the record
 * @param {*} extra - parsed extra info for that user ("" when absent)
 * @returns {object} user entry with empty defaults for missing fields
 */
function buildUser(userId, extra) {
    const info = extra || {}
    return {
        userId: userId || "",
        name: info.name || "",
        userType: info.type || 0,
        // accessType may legitimately be 0, so only undefined falls back
        accessType: info.accessType !== undefined ? info.accessType : "",
        card: info.card || "",
        face: info.face || "",
        finge: info.finge || ""
    }
}

/**
 * Converts a database row into the access-event payload published over MQTT.
 *
 * @param {object} record - row from d1_pass_record
 * @param {object|null} previousRecord - batch entry preceding `record`, if any
 * @returns {object} access record with one or two user entries
 */
function buildAccessRecord(record, previousRecord) {
    const extra = parseExtra(record.extra, record.id, "extra")
    const extra2 = parseExtra(record.extra2, record.id, "extra2")

    const accessRecord = {
        timeStamp: mqttService.timestampToDateString(record.timeStamp || 0),
        result: record.result || 0,
        error: record.message || "",
        // TODO: permissionId (record.permissionId) is not reported yet
        door: config.get("houseName") || "",
        users: [buildUser(record.userId, extra)]
    }

    // Dual authentication: append the second user's details
    if (record.userId2) {
        accessRecord.users.push(buildUser(record.userId2, extra2))
    }

    // Number() mirrors the loose comparison the column needs if it arrives as a string
    if (Number(record.type) === 300) {
        if (record.code && std.exist(record.code)) {
            // Face authentication: embed the captured image as base64
            accessRecord.users[0].face = dxCommonUtils.fs.fileToBase64(record.code)
            // NOTE(review): clears the in-memory `code` of the previous batch entry,
            // as the original did; presumably to drop a stale reference - confirm intent
            if (previousRecord) {
                previousRecord.code = ""
            }
        } else {
            accessRecord.users[0].face = ""
        }
        // Attach the second user's code when present
        if (record.code2) {
            accessRecord.users[1] = accessRecord.users[1] || {}
            accessRecord.users[1].code = record.code2
        }
    }

    return accessRecord
}

/**
 * Flags a record as reported in the database. Failures are logged, not thrown.
 *
 * @param {*} recordId - id of the record that was just published
 */
function markReported(recordId) {
    try {
        // Double any single quote so the id cannot break out of the SQL literal
        const safeId = String(recordId).replace(/'/g, "''")
        const updateSql = `UPDATE d1_pass_record SET reported = 1 WHERE id = '${safeId}'`
        sqliteService.exec(updateSql)
    } catch (updateErr) {
        logger.error("[passRecordWorker] Failed to update reported status for record", recordId, updateErr)
    }
}

/**
 * Timer callback: publishes the next pending record, if MQTT is connected.
 * Reloads the batch when it is exhausted. Never throws, and always moves past
 * the record it handled so a single bad row cannot stall the worker.
 */
function sendNext() {
    if (!isMqttConnected()) return

    // Current batch exhausted: load a fresh one
    if (currentIndex >= currentBatch.length) {
        loadNewBatch()
        if (currentBatch.length === 0) {
            return // nothing to send
        }
    }

    const index = currentIndex
    const record = currentBatch[index]
    // Advance before doing any work so no failure below can pin the cursor here
    currentIndex++

    if (!record || !record.id) return

    let accessRecord
    try {
        const previousRecord = index > 0 ? currentBatch[index - 1] : null
        accessRecord = buildAccessRecord(record, previousRecord)
    } catch (buildErr) {
        // Left unreported on purpose: the next batch reload will try it again
        logger.error("[passRecordWorker] Failed to build payload for record", record.id, buildErr)
        return
    }

    try {
        const sn = config.get("sys.sn") || "default"
        driver.mqtt.send(
            `access_device/v2/event/${sn}/access`,
            JSON.stringify(
                mqttService.mqttReply(
                    record.id,
                    [accessRecord],
                    mqttService.CODE.S_000
                )
            )
        )
        markReported(record.id)
    } catch (e) {
        logger.error("[passRecordWorker] Send failed for record", record.id, e)
    }
}

/**
 * Starts the permanent upload loop. Calling it again while the timer is
 * already running is a no-op.
 */
function startWorker() {
    if (sendTimer) return
    loadNewBatch() // load the first batch on startup
    sendTimer = std.setInterval(sendNext, UPLOAD_INTERVAL_MS)
}

// Start on module load
try {
    startWorker()
} catch (error) {
    logger.error("[passRecordWorker] init error:", error)
}