package com.ld.igds.protocol.bhzn.verb.server;
|
|
import com.alibaba.fastjson.JSON;
|
import com.alibaba.fastjson.JSONObject;
|
import com.alibaba.fastjson.TypeReference;
|
import com.ld.igds.protocol.bhzn.verb.analysis.AnalysisService;
|
import com.ld.igds.protocol.bhzn.verb.dto.IoMessage;
|
import com.ld.igds.protocol.bhzn.verb.msg.ReMessageBuilder;
|
import com.ld.igds.util.BytesUtil;
|
import com.ld.igds.util.ContextUtil;
|
import com.ld.igds.util.SpringUtil;
|
import com.ld.io.api.IoMsgConsumer;
|
import com.ld.io.api.IoSession;
|
import lombok.extern.slf4j.Slf4j;
|
import org.apache.commons.lang3.StringUtils;
|
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.stereotype.Service;
|
|
import static com.ld.igds.protocol.bhzn.verb.server.BhznVerbServerUtils.CHARSET;
|
|
/**
|
*
|
*/
|
@Slf4j
|
@Service
|
public class BhznVerbMessageConsumer implements IoMsgConsumer {
|
|
|
@Autowired
|
private AnalysisService analysisService;
|
|
/**
|
* 处理消息
|
*
|
* @param session
|
* @param bytes
|
*/
|
@Override
|
public void consume(IoSession session, byte[] bytes) {
|
if (null == bytes) {
|
log.error("Reply bytes is null");
|
return;
|
}
|
//添加信息心跳标记
|
BhznVerbServerUtils.addHeartBeat(session);
|
IoMessage reMessage = null;
|
try {
|
String message = new String(bytes, CHARSET);
|
message = message.substring(message.indexOf(BhznVerbServerUtils.MSG_START1) + BhznVerbServerUtils.MSG_START1.length());
|
log.info("【"+session.getAddress()+"】控制柜------>>>>平台:"+message);
|
reMessage = JSON.parseObject(message, IoMessage.class);
|
JSONObject jsonObject = JSON.parseObject(message);
|
reMessage.setContentStr(jsonObject.getString("content"));
|
if (
|
reMessage.getCmd() == null) {
|
|
log.error("控制柜------>>>>平台:报文信息不完整,不做解析-IP={},port={},msg={}", session.getAddress(), session.getPort(), message);
|
return;
|
}
|
if(StringUtils.isEmpty(reMessage.getSn())){
|
//气象站
|
reMessage.setSn("9999");
|
}
|
//根据SN获取组织编码
|
String companyId = ContextUtil.getCompanyIdBySn(reMessage.getSn());
|
if (null == companyId) {
|
log.error("控制柜------>>>>平台:报文信息不完整,当前分机系统未注册,不做解析-IP={},port={},msg={}", session.getAddress(), session.getPort(), message);
|
return;
|
}
|
|
reMessage.setCompanyId(companyId);
|
reMessage.setIp(session.getAddress());
|
reMessage.setPort(session.getPort());
|
reMessage.setStrMsg(message);
|
session.setCompanyId(companyId);
|
|
log.info("控制柜------>>>>平台:IP={},PORT={},message={}", reMessage.getIp(), reMessage.getPort(), message);
|
|
} catch (Exception e) {
|
log.error("控制柜------>>>>平台:收到报文解析异常:{}" , e.getMessage());
|
log.error(e.getMessage(),e);
|
e.printStackTrace();
|
}
|
|
//调用解析接口开始解析
|
if (null == analysisService) {
|
analysisService = (AnalysisService) SpringUtil.getBean(AnalysisService.BEAN_ID);
|
}
|
analysisService.analysis(reMessage);
|
}
|
}
|