北海网站建设公司/外贸接单平台哪个最好
文章目录
- 简化方案
- 1、创建数据库
- 2、安装paho库
- 3、安装pymysql库
- 4、使用python连接MQTT服务器
- 5. 将数据写入MySQL数据库
推送报文格式为:
{"a":value,"b":value}
需要将里面的a、b的值存入MySQL数据库,并将接受时间保存进数据库。
简化方案
在服务器上,模拟一台设备,订阅需要接收的主题,当接收到该主题发送来的数据的时候,将报文里面的两个数值保存到MySQL数据库里面。
1、创建数据库
三个字段,分别是是时间、a、b
CREATE TABLE `mqtt_test`.`test` ( `get_time` DATETIME NOT NULL , `a` INT NOT NULL , `b` INT NOT NULL ) ENGINE = InnoDB;
2、安装paho库
paho库用于完成mqtt通讯
anaconda下或云服务器下,在prompt下安装paho库:
pip install paho-mqtt
3、安装pymysql库
pymysql库用于完成mysql操作
pip install pymysql
4、使用python连接MQTT服务器
# -*- coding: utf-8 -*-#!/usr/bin/python
# -*- coding: utf-8 -*import paho.mqtt.client as mqtt
import json
import pymysql
import timedef gettime():time1=time.strftime("%Y-%m-%d %H:%M:%S",time.localtime())return time1# 服务器地址
host = 'MQTT服务器地址'
# 通信端口 默认端口1883
port = 1883username = 'username '
password = 'password '# 订阅主题名
topic = 'test'# 连接后事件
def on_connect(client, userdata, flags, respons_code):if respons_code == 0:# 连接成功print('Connection Succeed!')else:# 连接失败并显示错误代码print('Connect Error status {0}'.format(respons_code))# 订阅信息client.subscribe(topic)# 接收到数据后事件
def on_message(client, userdata, msg):# 打印订阅消息主题# print("topic", msg.topic)# 打印消息数据jsondata=json.loads(msg.payload)print("msg payload", jsondata)def main():client = mqtt.Client()# 注册事件client.on_connect = on_connectclient.on_message = on_message# 设置账号密码(如果需要的话)client.username_pw_set(username, password=password)# 连接到服务器client.connect(host, port=port, keepalive=60)# 守护连接状态client.loop_forever()if __name__ == '__main__':main()
运行效果:
打开MQTT.FX,连接至MQTT服务器,并向test主题发送json信息,可以看到程序能够正确接收到报文。
5. 将数据写入MySQL数据库
收到信息后,将载荷信息按照json解析,然后储存如数据库中
# -*- coding: utf-8 -*-#!/usr/bin/python
# -*- coding: utf-8 -*import paho.mqtt.client as mqtt
import json
import pymysql
import timedef gettime():time1=time.strftime("%Y-%m-%d %H:%M:%S",time.localtime())return time1# 服务器地址
host = 'MQTT服务器地址'
# 通信端口 默认端口1883
port = 1883username = 'username'
password = 'password'# 订阅主题名
topic = 'test'# 连接后事件
def on_connect(client, userdata, flags, respons_code):if respons_code == 0:# 连接成功print('Connection Succeed!')else:# 连接失败并显示错误代码print('Connect Error status {0}'.format(respons_code))# 订阅信息client.subscribe(topic)# 接收到数据后事件
def on_message(client, userdata, msg):global dddd# 打印订阅消息主题# print("topic", msg.topic)# 打印消息数据jsondata=json.loads(msg.payload)print("msg payload", jsondata)sqlsave(jsondata)def main():client = mqtt.Client()# 注册事件client.on_connect = on_connectclient.on_message = on_message# 设置账号密码(如果需要的话)client.username_pw_set(username, password=password)# 连接到服务器client.connect(host, port=port, keepalive=60)# 守护连接状态client.loop_forever()#MySQL保存
def sqlsave(jsonData):# 打开数据库连接db = pymysql.connect(host="host_ip",user="user_name",password="password",database="mqtt_test",charset='utf8')# 使用cursor()方法获取操作游标 cursor = db.cursor()# SQL 插入语句try:sql = "INSERT INTO test (get_time,a,b) VALUES ('%s','%s','%s');" %(gettime(),jsonData['a'],jsonData['b'])cursor.execute(sql)db.commit()print("数据库保存成功!")except:pass# 关闭数据库连接db.close()if __name__ == '__main__':main()
运行结果如图所示: