import std from '../../dxmodules/dxStd.js' import config from '../../dxmodules/dxConfig.js' import logger from '../../dxmodules/dxLogger.js' import dxCommonUtils from '../../dxmodules/dxCommonUtils.js' import dxMap from '../../dxmodules/dxMap.js' import sqliteService from '../service/sqliteService.js' import driver from '../driver.js' import mqttService from '../service/mqttService.js' /** * 通行记录上报工作线程 (通道2 - 兜底全量上报) * * 职责: * - 常驻运行,只要 MQTT 连接就持续上报 * - 每次加载所有未删除的通行记录(d1_pass_record) * - 每隔 5 秒上报一条 * - 发完当前批次后,自动重新加载最新记录 */ const mqtt_map = dxMap.get("MQTT") const UPLOAD_INTERVAL_MS = 5000 // 每5秒1条 let sendTimer = null let currentBatch = [] // 当前待发批次 let currentIndex = 0 // 当前发送到第几条 function isMqttConnected() { return mqtt_map.get("MQTT_STATUS") === "connected" } // 加载新批次:查询所有未删除的记录(按时间升序) function loadNewBatch() { try { currentBatch = sqliteService.d1_pass_record.findAllOrderByTimeStampAsc() currentIndex = 0 } catch (err) { logger.error("[passRecordWorker] Failed to load batch:", err) currentBatch = [] currentIndex = 0 } } // 发送下一条 function sendNext() { if (!isMqttConnected()) return // 如果当前批次发完了,加载新批次 if (currentIndex >= currentBatch.length) { loadNewBatch() if (currentBatch.length === 0) { return // 无记录可发 } } const record = currentBatch[currentIndex] if (!record || !record.id) { currentIndex++ return } let extra = record.extra ? JSON.parse(record.extra) : "" let accessRecord = { userId: record.userId, type: record.type, result: record.result, name: extra && extra.name ? extra.name : "", timeStamp: record.timeStamp, extra: {}, error: record.message } if (record.type == 300) { if (std.exist(record.code) && config.get("access.uploadToCloud")) { accessRecord.code = dxCommonUtils.fs.fileToBase64(record.code) if (currentIndex > 0) { currentBatch[currentIndex - 1].code = "" } } else { accessRecord.code = "" } } // 发送 try { driver.mqtt.send( "access_device/v2/event/access", JSON.stringify( mqttService.mqttReply( record.id, [accessRecord], mqttService.CODE.S_000 ) ) ) } catch (e) { logger.error("[passRecordWorker] Send failed for record", record.id, e) } currentIndex++ } // 启动常驻上报循环 function startWorker() { if (sendTimer) return loadNewBatch() // 启动时先加载第一批 sendTimer = std.setInterval(sendNext, UPLOAD_INTERVAL_MS) } // 启动 try { startWorker() } catch (error) { logger.error("[passRecordWorker] init error:", error) }