1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
import std from '../../dxmodules/dxStd.js'
import config from '../../dxmodules/dxConfig.js'
import logger from '../../dxmodules/dxLogger.js'
import dxCommonUtils from '../../dxmodules/dxCommonUtils.js'
import dxMap from '../../dxmodules/dxMap.js'
import sqliteService from '../service/sqliteService.js'
import driver from '../driver.js'
import mqttService from '../service/mqttService.js'
 
/**
 * 通行记录上报工作线程 (通道2 - 兜底全量上报)
 *
 * 职责:
 * - 常驻运行,只要 MQTT 连接就持续上报
 * - 每次加载所有未删除的通行记录(d1_pass_record)
 * - 每隔 5 秒上报一条
 * - 发完当前批次后,自动重新加载最新记录
 */
 
// Map instance obtained from dxMap under the key "MQTT"; isMqttConnected() reads "MQTT_STATUS" from it.
const mqtt_map = dxMap.get("MQTT")
 
const UPLOAD_INTERVAL_MS = 5000 // one record every 5 seconds
 
let sendTimer = null // handle returned by std.setInterval once startWorker() has run; null before that
let currentBatch = [] // batch of records currently being sent
let currentIndex = 0   // index into currentBatch of the next record to send
 
/**
 * Tells whether the "MQTT_STATUS" entry of the MQTT map currently equals "connected".
 *
 * @returns {boolean} true only when the stored status is exactly the string "connected".
 */
function isMqttConnected() {
    const status = mqtt_map.get("MQTT_STATUS")
    return status === "connected"
}
 
/**
 * Replaces the current batch with the result of
 * sqliteService.d1_pass_record.findAllOrderByTimeStampAsc() and rewinds the
 * send cursor to the first entry.
 *
 * If the query throws, the error is logged and the batch becomes empty, so
 * the caller sees "nothing to send" instead of an exception.
 */
function loadNewBatch() {
    let rows = []
    try {
        rows = sqliteService.d1_pass_record.findAllOrderByTimeStampAsc()
    } catch (queryError) {
        logger.error("[passRecordWorker] Failed to load batch:", queryError)
    }
    // Both outcomes finish the same way: publish the batch and restart from index 0.
    currentBatch = rows
    currentIndex = 0
}
 
/**
 * Timer tick: publishes the next pass record of the current batch over MQTT.
 *
 * Flow:
 * - Returns immediately while MQTT is not connected (the cursor is untouched).
 * - When the cursor has reached the end of the batch, a new batch is loaded
 *   first; if that batch is empty there is nothing to send.
 * - Entries that are missing or have no `id` are skipped.
 * - Otherwise the record is converted to the access-event payload and sent to
 *   the "access_device/v2/event/access" topic.
 *
 * Payload construction and the send share one try/catch: any failure (for
 * example malformed JSON in `record.extra`, or an error while reading the
 * file referenced by `record.code`) is logged and the cursor still advances.
 * Without this a single bad record would throw on every tick before the
 * cursor moved, stalling the uploader on that record forever.
 */
function sendNext() {
    if (!isMqttConnected()) return

    // Current batch exhausted: load a new one before continuing.
    if (currentIndex >= currentBatch.length) {
        loadNewBatch()
        if (currentBatch.length === 0) {
            return // nothing to send
        }
    }

    const record = currentBatch[currentIndex]
    if (!record || !record.id) {
        currentIndex++
        return
    }

    try {
        // `extra` is stored as a JSON string; only its `name` field is used here.
        const extra = record.extra ? JSON.parse(record.extra) : ""
        const accessRecord = {
            userId: record.userId,
            type: record.type,
            result: record.result,
            name: extra && extra.name ? extra.name : "",
            timeStamp: record.timeStamp,
            extra: {},
            error: record.message
        }
        // Loose equality kept from the original: `type` may arrive as a number
        // or a numeric string — TODO confirm against the table schema.
        if (record.type == 300) {
            // For type 300, `code` is treated as a file path whose content is
            // uploaded as base64 when the file exists and cloud upload is enabled.
            if (std.exist(record.code) && config.get("access.uploadToCloud")) {
                accessRecord.code = dxCommonUtils.fs.fileToBase64(record.code)
                // Original behaviour: blank the previous entry's `code`
                // (presumably to drop the reference — TODO confirm). The
                // previous slot may be empty, so it is checked before writing.
                const previous = currentBatch[currentIndex - 1]
                if (previous) {
                    previous.code = ""
                }
            } else {
                accessRecord.code = ""
            }
        }

        driver.mqtt.send(
            "access_device/v2/event/access",
            JSON.stringify(
                mqttService.mqttReply(
                    record.id,
                    [accessRecord],
                    mqttService.CODE.S_000
                )
            )
        )
    } catch (e) {
        logger.error("[passRecordWorker] Send failed for record", record.id, e)
    }

    // Advance on success and on failure alike; failed records are picked up
    // again when the batch is reloaded.
    currentIndex++
}
 
/**
 * Starts the resident upload loop once.
 *
 * On the first call it loads the initial batch and schedules sendNext every
 * UPLOAD_INTERVAL_MS milliseconds via std.setInterval, keeping the returned
 * handle in sendTimer. Calls made while sendTimer is already set do nothing.
 */
function startWorker() {
    if (!sendTimer) {
        loadNewBatch() // have the first batch ready before the first tick
        sendTimer = std.setInterval(sendNext, UPLOAD_INTERVAL_MS)
    }
}
 
// Boot the worker as soon as this module is loaded; a failure during startup
// is logged rather than allowed to propagate out of the module.
try {
    startWorker()
} catch (initError) {
    logger.error("[passRecordWorker] init error:", initError)
}