| | |
| | | import org.eclipse.paho.client.mqttv3.*; |
| | | import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; |
| | | import org.springframework.beans.factory.annotation.Autowired; |
| | | import org.springframework.context.annotation.Bean; |
| | | import org.springframework.context.annotation.Configuration; |
| | | import org.springframework.integration.annotation.IntegrationComponentScan; |
| | | import org.springframework.integration.annotation.ServiceActivator; |
| | | import org.springframework.integration.channel.DirectChannel; |
| | | import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory; |
| | | import org.springframework.integration.mqtt.core.MqttPahoClientFactory; |
| | | import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler; |
| | | import org.springframework.messaging.MessageChannel; |
| | | import org.springframework.messaging.MessageHandler; |
| | | |
| | | import javax.annotation.PostConstruct; |
| | | import java.util.Arrays; |
| | | import java.util.List; |
| | | |
| | | @Configuration |
| | | @Slf4j |
| | | @IntegrationComponentScan |
| | | public class MqttProviderConfig { |
| | | |
| | | @Autowired |
| | | private MqttProperties mqttProperties; |
| | | @Autowired |
| | | private MqttProviderCallBack mqttProviderCallBack; |
| | | /** |
| | | * 客户端对象 |
| | | */ |
| | | private MqttClient client; |
| | | |
| | | /** |
| | | * |
| | | * 在bean初始化后连接到服务器 |
| | | */ |
| | | @PostConstruct |
| | | public void init(){ |
| | | connect(); |
| | | @Bean |
| | | public MqttConnectOptions getReceiverMqttConnectOptionsForSend() { |
| | | MqttConnectOptions mqttConnectOptions = new MqttConnectOptions(); |
| | | mqttConnectOptions.setUserName(mqttProperties.getUsername()); |
| | | mqttConnectOptions.setPassword(mqttProperties.getPassword().toCharArray()); |
| | | List<String> hostList = Arrays.asList(mqttProperties.getHost().trim().split(",")); |
| | | String[] serverURIs = new String[hostList.size()]; |
| | | hostList.toArray(serverURIs); |
| | | mqttConnectOptions.setServerURIs(serverURIs); |
| | | mqttConnectOptions.setKeepAliveInterval(mqttProperties.getKeepAliveInterval()); |
| | | |
| | | return mqttConnectOptions; |
| | | } |
| | | |
| | | /** |
| | | * 客户端连接服务端 |
| | | */ |
| | | public void connect(){ |
| | | try{ |
| | | //创建MQTT客户端对象 |
| | | client = new MqttClient(mqttProperties.getHost(),mqttProperties.getClientId(),new MemoryPersistence()); |
| | | //连接设置 |
| | | MqttConnectOptions options = new MqttConnectOptions(); |
| | | //是否清空session,设置false表示服务器会保留客户端的连接记录(订阅主题,qos),客户端重连之后能获取到服务器在客户端断开连接期间推送的消息 |
| | | //设置为true表示每次连接服务器都是以新的身份 |
| | | options.setCleanSession(true); |
| | | //设置连接用户名 |
| | | options.setUserName(mqttProperties.getUsername()); |
| | | //设置连接密码 |
| | | options.setPassword(mqttProperties.getPassword().toCharArray()); |
| | | //设置超时时间,单位为秒 |
| | | options.setConnectionTimeout(100); |
| | | //设置心跳时间 单位为秒,表示服务器每隔 1.5*20秒的时间向客户端发送心跳判断客户端是否在线 |
| | | options.setKeepAliveInterval(20); |
| | | //设置遗嘱消息的话题,若客户端和服务器之间的连接意外断开,服务器将发布客户端的遗嘱信息 |
| | | options.setWill("willTopic",(mqttProperties.getClientId()+ "与服务器断开连接").getBytes(),0,false); |
| | | //设置回调 |
| | | client.setCallback(mqttProviderCallBack); |
| | | client.connect(options); |
| | | } catch(MqttException e){ |
| | | e.printStackTrace(); |
| | | } |
| | | @Bean |
| | | public MqttPahoClientFactory receiverMqttClientFactoryForSend() { |
| | | DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); |
| | | factory.setConnectionOptions(getReceiverMqttConnectOptionsForSend()); |
| | | return factory; |
| | | } |
| | | |
| | | public void publish(String topic,String message){ |
| | | MqttMessage mqttMessage = new MqttMessage(); |
| | | mqttMessage.setQos(mqttProperties.getQos()); |
| | | mqttMessage.setRetained(true); |
| | | mqttMessage.setPayload(message.getBytes()); |
| | | //主题的目的地,用于发布/订阅信息 |
| | | MqttTopic mqttTopic = client.getTopic(topic); |
| | | //提供一种机制来跟踪消息的传递进度 |
| | | //用于在以非阻塞方式(在后台运行)执行发布是跟踪消息的传递进度 |
| | | MqttDeliveryToken token; |
| | | try { |
| | | //将指定消息发布到主题,但不等待消息传递完成,返回的token可用于跟踪消息的传递状态 |
| | | //一旦此方法干净地返回,消息就已被客户端接受发布,当连接可用,将在后台完成消息传递。 |
| | | token = mqttTopic.publish(mqttMessage); |
| | | token.waitForCompletion(); |
| | | } catch (MqttException e) { |
| | | e.printStackTrace(); |
| | | } |
| | | @Bean |
| | | @ServiceActivator(inputChannel = "mqttOutboundChannel") |
| | | public MessageHandler mqttOutbound() { |
| | | MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(mqttProperties.getClientOutId(), receiverMqttClientFactoryForSend()); |
| | | messageHandler.setAsync(false); |
| | | messageHandler.setDefaultTopic(mqttProperties.getDefaultTopic()); |
| | | return messageHandler; |
| | | } |
| | | |
| | | public void publish(int qos,boolean retained,String topic,String message){ |
| | | MqttMessage mqttMessage = new MqttMessage(); |
| | | mqttMessage.setQos(qos); |
| | | mqttMessage.setRetained(retained); |
| | | mqttMessage.setPayload(message.getBytes()); |
| | | //主题的目的地,用于发布/订阅信息 |
| | | MqttTopic mqttTopic = client.getTopic(topic); |
| | | //提供一种机制来跟踪消息的传递进度 |
| | | //用于在以非阻塞方式(在后台运行)执行发布是跟踪消息的传递进度 |
| | | MqttDeliveryToken token; |
| | | try { |
| | | //将指定消息发布到主题,但不等待消息传递完成,返回的token可用于跟踪消息的传递状态 |
| | | //一旦此方法干净地返回,消息就已被客户端接受发布,当连接可用,将在后台完成消息传递。 |
| | | token = mqttTopic.publish(mqttMessage); |
| | | token.waitForCompletion(); |
| | | } catch (MqttException e) { |
| | | e.printStackTrace(); |
| | | } |
| | | @Bean |
| | | public MessageChannel mqttOutboundChannel() { |
| | | return new DirectChannel(); |
| | | } |
| | | |
| | | } |