import std from '../../dxmodules/dxStd.js'
|
import config from '../../dxmodules/dxConfig.js'
|
import logger from '../../dxmodules/dxLogger.js'
|
import dxCommonUtils from '../../dxmodules/dxCommonUtils.js'
|
import dxMap from '../../dxmodules/dxMap.js'
|
import sqliteService from '../service/sqliteService.js'
|
import driver from '../driver.js'
|
import mqttService from '../service/mqttService.js'
|
|
/**
|
* 通行记录上报工作线程 (通道2 - 兜底全量上报)
|
*
|
* 职责:
|
* - 常驻运行,只要 MQTT 连接就持续上报
|
* - 每次加载所有未上报的通行记录(d1_pass_record)
|
* - 每隔 5 秒上报一条
|
* - 发完当前批次后,自动重新加载最新记录
|
*/
|
|
const mqtt_map = dxMap.get("MQTT")
|
|
const UPLOAD_INTERVAL_MS = 5000 // 每5秒1条
|
|
let sendTimer = null
|
let currentBatch = [] // 当前待发批次
|
let currentIndex = 0 // 当前发送到第几条
|
|
function isMqttConnected() {
|
return mqtt_map.get("MQTT_STATUS") === "connected"
|
}
|
|
// 加载新批次:查询所有未删除且未上报的记录(按时间升序)
|
function loadNewBatch() {
|
try {
|
let sql = `SELECT * FROM d1_pass_record WHERE reported = 0 OR reported IS NULL ORDER BY timeStamp ASC`
|
let result = sqliteService.select(sql)
|
currentBatch = result || []
|
currentIndex = 0
|
} catch (err) {
|
logger.error("[passRecordWorker] Failed to load batch:", err)
|
currentBatch = []
|
currentIndex = 0
|
}
|
}
|
|
// 发送下一条
|
function sendNext() {
|
if (!isMqttConnected()) return
|
|
// 如果当前批次发完了,加载新批次
|
if (currentIndex >= currentBatch.length) {
|
loadNewBatch()
|
if (currentBatch.length === 0) {
|
return // 无记录可发
|
}
|
}
|
|
const record = currentBatch[currentIndex]
|
if (!record || !record.id) {
|
currentIndex++
|
return
|
}
|
let extra = record.extra ? JSON.parse(record.extra) : ""
|
let extra2 = record.extra2 ? JSON.parse(record.extra2) : ""
|
let accessRecord = {
|
timeStamp: mqttService.timestampToDateString(record.timeStamp || 0),
|
result: record.result || 0,
|
error: record.message || "",
|
// permissionId: record.permissionId || "", // TODO
|
door: config.get("houseName") || "",
|
users: [
|
{
|
userId: record.userId || "",
|
name: extra && extra.name ? extra.name : "",
|
userType: extra && extra.type ? extra.type : 0,
|
accessType: extra && extra.accessType !== undefined ? extra.accessType : "",
|
card: extra && extra.card ? extra.card : "",
|
face: extra && extra.face ? extra.face : "",
|
finge: extra && extra.finge ? extra.finge : ""
|
}
|
]
|
}
|
|
// 如果是双人认证,添加第二个用户信息
|
if (record.userId2) {
|
let secondUser = {
|
userId: record.userId2 || "",
|
name: extra2 && extra2.name ? extra2.name : "",
|
userType: extra2 && extra2.type ? extra2.type : 0,
|
accessType: extra2 && extra2.accessType !== undefined ? extra2.accessType : "",
|
card: extra2 && extra2.card ? extra2.card : "",
|
face: extra2 && extra2.face ? extra2.face : "",
|
finge: extra2 && extra2.finge ? extra2.finge : ""
|
}
|
// 根据认证类型添加相应的认证信息
|
if (extra2 && extra2.accessType == 200 && extra2.card) {
|
secondUser.card = extra2.card
|
} else if (extra2 && extra2.accessType == 300) {
|
// 人脸认证,face字段可以留空或添加相应信息
|
} else if (extra2 && extra2.accessType == 500) {
|
// 指纹认证,finge字段可以留空或添加相应信息
|
}
|
accessRecord.users.push(secondUser)
|
}
|
|
if (record.type == 300) {
|
if (std.exist(record.code)) {
|
// 人脸认证,将图片转换为base64填充到face字段
|
accessRecord.users[0].face = dxCommonUtils.fs.fileToBase64(record.code)
|
if (currentIndex > 0) {
|
currentBatch[currentIndex - 1].code = ""
|
}
|
} else {
|
accessRecord.users[0].face = ""
|
}
|
|
// 如果有第二个用户的二维码
|
if (record.code2) {
|
accessRecord.users[1] = accessRecord.users[1] || {}
|
accessRecord.users[1].code = record.code2
|
}
|
}
|
// 发送
|
try {
|
const sn = config.get("sys.sn") || "default"
|
driver.mqtt.send(
|
`access_device/v2/event/${sn}/access`,
|
JSON.stringify(
|
mqttService.mqttReply(
|
record.id,
|
[accessRecord],
|
mqttService.CODE.S_000
|
)
|
)
|
)
|
// 标记记录为已上报
|
try {
|
let updateSql = `UPDATE d1_pass_record SET reported = 1 WHERE id = '${record.id}'`
|
sqliteService.exec(updateSql)
|
} catch (updateErr) {
|
logger.error("[passRecordWorker] Failed to update reported status for record", record.id, updateErr)
|
}
|
} catch (e) {
|
logger.error("[passRecordWorker] Send failed for record", record.id, e)
|
}
|
|
currentIndex++
|
}
|
|
// 启动常驻上报循环
|
function startWorker() {
|
if (sendTimer) return
|
loadNewBatch() // 启动时先加载第一批
|
sendTimer = std.setInterval(sendNext, UPLOAD_INTERVAL_MS)
|
}
|
|
// 启动
|
try {
|
startWorker()
|
} catch (error) {
|
logger.error("[passRecordWorker] init error:", error)
|
}
|