1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
import std from '../../dxmodules/dxStd.js'
import config from '../../dxmodules/dxConfig.js'
import logger from '../../dxmodules/dxLogger.js'
import dxCommonUtils from '../../dxmodules/dxCommonUtils.js'
import dxMap from '../../dxmodules/dxMap.js'
import sqliteService from '../service/sqliteService.js'
import driver from '../driver.js'
import mqttService from '../service/mqttService.js'
 
/**
 * 通行记录上报工作线程 (通道2 - 兜底全量上报)
 *
 * 职责:
 * - 常驻运行,只要 MQTT 连接就持续上报
 * - 每次加载所有未上报的通行记录(d1_pass_record)
 * - 每隔 5 秒上报一条
 * - 发完当前批次后,自动重新加载最新记录
 */
 
const mqtt_map = dxMap.get("MQTT")
 
const UPLOAD_INTERVAL_MS = 5000 // 每5秒1条
 
let sendTimer = null
let currentBatch = [] // 当前待发批次
let currentIndex = 0   // 当前发送到第几条
 
function isMqttConnected() {
    return mqtt_map.get("MQTT_STATUS") === "connected"
}
 
// 加载新批次:查询所有未删除且未上报的记录(按时间升序)
function loadNewBatch() {
    try {
        let sql = `SELECT * FROM d1_pass_record WHERE reported = 0 OR reported IS NULL ORDER BY timeStamp ASC`
        let result = sqliteService.select(sql)
        currentBatch = result || []
        currentIndex = 0
    } catch (err) {
        logger.error("[passRecordWorker] Failed to load batch:", err)
        currentBatch = []
        currentIndex = 0
    }
}
 
// 发送下一条
function sendNext() {
    if (!isMqttConnected()) return
 
    // 如果当前批次发完了,加载新批次
    if (currentIndex >= currentBatch.length) {
        loadNewBatch()
        if (currentBatch.length === 0) {
            return // 无记录可发
        }
    }
 
    const record = currentBatch[currentIndex]
    if (!record || !record.id) {
        currentIndex++
        return
    }
    let extra = record.extra ? JSON.parse(record.extra) : ""
    let extra2 = record.extra2 ? JSON.parse(record.extra2) : ""
    let accessRecord = {
        timeStamp: mqttService.timestampToDateString(record.timeStamp || 0),
        result: record.result || 0,
        error: record.message || "",
        // permissionId: record.permissionId || "", // TODO
        door: config.get("houseName") || "",
        users: [
            {
                userId: record.userId || "",
                name: extra && extra.name ? extra.name : "",
                userType: extra && extra.type ? extra.type : 0,
                accessType: extra && extra.accessType !== undefined ? extra.accessType : "",
                card: extra && extra.card ? extra.card : "",
                face: extra && extra.face ? extra.face : "",
                finge: extra && extra.finge ? extra.finge : ""
            }
        ]
    }
    
    // 如果是双人认证,添加第二个用户信息
    if (record.userId2) {
        let secondUser = {
            userId: record.userId2 || "",
            name: extra2 && extra2.name ? extra2.name : "",
            userType: extra2 && extra2.type ? extra2.type : 0,
            accessType: extra2 && extra2.accessType !== undefined ? extra2.accessType : "",
            card: extra2 && extra2.card ? extra2.card : "",
            face: extra2 && extra2.face ? extra2.face : "",
            finge: extra2 && extra2.finge ? extra2.finge : ""
        }
        // 根据认证类型添加相应的认证信息
        if (extra2 && extra2.accessType == 200 && extra2.card) {
            secondUser.card = extra2.card
        } else if (extra2 && extra2.accessType == 300) {
            // 人脸认证,face字段可以留空或添加相应信息
        } else if (extra2 && extra2.accessType == 500) {
            // 指纹认证,finge字段可以留空或添加相应信息
        }
        accessRecord.users.push(secondUser)
    }
    
    if (record.type == 300) {
        if (std.exist(record.code)) {
            // 人脸认证,将图片转换为base64填充到face字段
            accessRecord.users[0].face = dxCommonUtils.fs.fileToBase64(record.code)
            if (currentIndex > 0) {
                currentBatch[currentIndex - 1].code = ""
            }
        } else {
            accessRecord.users[0].face = ""
        }
        
        // 如果有第二个用户的二维码
        if (record.code2) {
            accessRecord.users[1] = accessRecord.users[1] || {}
            accessRecord.users[1].code = record.code2
        }
    }
    // 发送
    try {
        const sn = config.get("sys.sn") || "default"
        driver.mqtt.send(
            `access_device/v2/event/${sn}/access`,
            JSON.stringify(
                mqttService.mqttReply(
                    record.id,
                    [accessRecord],
                    mqttService.CODE.S_000
                )
            )
        )
        // 标记记录为已上报
        try {
            let updateSql = `UPDATE d1_pass_record SET reported = 1 WHERE id = '${record.id}'`
            sqliteService.exec(updateSql)
        } catch (updateErr) {
            logger.error("[passRecordWorker] Failed to update reported status for record", record.id, updateErr)
        }
    } catch (e) {
        logger.error("[passRecordWorker] Send failed for record", record.id, e)
    }
 
    currentIndex++
}
 
// 启动常驻上报循环
function startWorker() {
    if (sendTimer) return
    loadNewBatch() // 启动时先加载第一批
    sendTimer = std.setInterval(sendNext, UPLOAD_INTERVAL_MS)
}
 
// 启动
try {
    startWorker()
} catch (error) {
    logger.error("[passRecordWorker] init error:", error)
}