Android基于MQTT实现发布消息与监听topic接收消息
Android基于MQTT实现发布消息与监听topic接收消息
-
- 标题
- 代码中所用的地址都是测试地址,禁止复制粘贴使用于他处。
- 以上就是所有的代码
- 附上demo源码。
- CSDN:[点击下载源码](https://download.csdn.net/download/qq_35840038/12252764)
- GitHub:
- **如果有什么问题,欢迎大家指导。并相互联系,希望能够通过文章互相学习。**
老规矩,先上图:
标题
MQTT 是一种基于发布/订阅范式的“轻量级”消息协议,由 IBM 发布。
MQTT 可以被解释为一种低开销,低带宽占用的即时通讯协议,可以用极少的代码和带宽的为连接远程设备提供实时可靠的消息服务,它适用于硬件性能低下的远程设备以及网络状况糟糕的环境下,因此 MQTT 协议在 IoT(Internet of things,物联网),小型设备应用,移动应用等方面有较广泛的应用。
IoT 设备要运作,就必须连接到互联网,设备才能相互协作,以及与后端服务协同工作。而互联网的基础网络协议是 TCP/IP,MQTT 协议是基于 TCP/IP 协议栈而构建的,因此它已经慢慢的已经成为了 IoT 通讯的标准。
个人见解:mqtt比起http来说更轻便,使用更简单,更便利,不需要过多的操作。
代码中所用的地址都是测试地址,禁止复制粘贴使用于他处。
下来直接看代码吧
首先,在app下的build.gradle里面添加mqtt;
//mqtt implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.1.0'
然后配置xml(三个textview):
主要代码都在activity里面,如下:
package com.bangni.mqtt_test;import android.annotation.SuppressLint;import android.os.Handler;import android.os.Message;import android.support.v7.app.AppCompatActivity;import android.os.Bundle;import android.util.Log;import android.view.View;import android.widget.TextView;import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;import org.eclipse.paho.client.mqttv3.MqttClient;import org.eclipse.paho.client.mqttv3.MqttConnectOptions;import org.eclipse.paho.client.mqttv3.MqttException;import org.eclipse.paho.client.mqttv3.MqttMessage;import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;import org.json.JSONObject;/** * Mqtt测试 */public class MainActivity extends AppCompatActivity implements View.OnClickListener { //订阅mqtt地址(此地址一直在发消息供接收) private String publish = "ddzl/broker/projector/clientid/command/msg"; //跟上面那个一样, private String East_PU = "ddzl/pu/broker/clientid/communication/msg"; //发布的消息体 private String East_INIT = "ddzl/pu/broker/clientid/get_elevator_init/msg"; //Mac是唯一的 private String Mac = "72:67:2d:31:d3:48"; private TextView txt_dis, txt_publish, txt_content; //连接client public MqttClient mqttClient; //订阅的集合 private String[] topicSubList; @Override protected void onCreate(Bundle savedInstanceState) { super.onCreate(savedInstanceState); setContentView(R.layout.activity_main); initView(); connMqttClient(); } private void initView() { txt_dis = findViewById(R.id.txt_dis); txt_publish = findViewById(R.id.txt_publish); txt_content = findViewById(R.id.txt_content); txt_dis.setOnClickListener(this); txt_publish.setOnClickListener(this); } /** * 连接MQTT */ private void connMqttClient() { new Thread(){ @Override public void run() { String host = "tcp://39.100.88.26:1883"; MqttConnectOptions options = new MqttConnectOptions(); //断开后,是否自动连接// options.setAutomaticReconnect(true); options.setUserName("xupeng"); options.setPassword("000".toCharArray()); options.setCleanSession(false); try { MemoryPersistence persistence = new MemoryPersistence(); mqttClient = new MqttClient(host, "来自呼伦贝尔大草原的驴" + Mac + System.currentTimeMillis(), persistence); mqttClient.setCallback(mqttCallback);//设置回调函数 if (mqttClient.isConnected()) { mqttClient.close(); mqttClient.disconnect(); } mqttClient.connect(options);//连接broker //设置需要订阅的主题集合 topicSubList = new String[]{publish, East_PU, East_INIT}; for (int i = 0; i 0) { for (int i = 0; i < topicSubList.length; i++) {//设置监听的topicmqttClient.subscribe(topicSubList[i], 1); } } } catch (MqttException e) { e.printStackTrace(); } } }.start(); } MqttCallbackExtended mqttCallback = new MqttCallbackExtended() { /** * 连接成功 * @param reconnect * @param serverURI */ @Override public void connectComplete(boolean reconnect, String serverURI) { Log.d("TYY_CC", "连接成功" + mqttClient.isConnected()); } @Override public void connectionLost(Throwable cause) { //这里不需要做处理 //因为在option有一个方法可以自动重连(如下) //断开后,是否自动连接 //options.setAutomaticReconnect(true); } @Override public void messageArrived(String topic, MqttMessage message) throws Exception { //收到的数据 final String content0 = new String(message.getPayload(), "GB2312");//GB2312 Log.d("MyLog_TYY_CC", "订阅的topic是:" + topic); Log.d("MyLog_TYY_CC", "接收到消息是" + content0); runOnUiThread(new Runnable() { @Override public void run() { txt_content.setText(System.currentTimeMillis() + ":接收到的消息是:" + content0); } }); } @Override public void deliveryComplete(IMqttDeliveryToken token) { try { token.waitForCompletion(); } catch (MqttException e) { e.printStackTrace(); } } }; @Override public void onClick(View v) { int temdId = v.getId(); if(temdId == R.id.txt_dis){ try { if (mqttClient.isConnected()) { mqttClient.disconnect(); } }catch (Exception e){ e.printStackTrace(); } runOnUiThread(new Runnable() { @Override public void run() { txt_content.setText(""); } }); }else if(temdId == R.id.txt_publish){ //请求 new request().run(); } } /** * 获取初始化消息 */ class request implements Runnable { @Override public void run() { JSONObject object = new JSONObject(); try { object.put("taskId", System.currentTimeMillis()); object.put("taskType", "get_elevator_init"); object.put("mac", Mac); object.put("json", ""); object.put("order", ""); object.put("data", ""); //上报给云端 Message msg = resultHandler.obtainMessage(); msg.obj = object.toString(); resultHandler.sendMessage(msg); }catch (Exception e){ e.printStackTrace(); } } } /** * 发布的handler */ @SuppressLint("HandlerLeak") Handler resultHandler = new Handler(){ @Override public void handleMessage(Message msg) { try { MqttMessage mqttMsg0 = new MqttMessage(); mqttMsg0.setPayload(msg.obj.toString().getBytes()); mqttMsg0.setQos(1); if(mqttClient != null && mqttClient.isConnected()){ mqttClient.getTopic(East_INIT).publish(mqttMsg0); } } catch (MqttException e) { connMqttClient(); e.printStackTrace(); } } };}
注:当mqtt订阅多条的时候需要逐条订阅,否则有可能订阅失败。
以上就是所有的代码
附上demo源码。
CSDN:点击下载源码
GitHub:
q:486789970
email:[email protected]
如果有什么问题,欢迎大家指导。并相互联系,希望能够通过文章互相学习。
---财财亲笔