54 lines
1.3 KiB
Python
54 lines
1.3 KiB
Python
import paho.mqtt.client as mqtt
|
|
import os
|
|
from dotenv import load_dotenv
|
|
import time
|
|
import json
|
|
|
|
|
|
load_dotenv(dotenv_path=".env.test")
|
|
|
|
MQTT_HOST = os.getenv("MQTT_HOST")
|
|
MQTT_PORT = int(os.getenv("MQTT_PORT"))
|
|
MQTT_DEVICE_CODE = os.getenv("MQTT_DEVICE_CODE")
|
|
|
|
|
|
def on_connect_mock(client, userdata, flags, rc):
|
|
print(f"Connected with result code rc={rc}")
|
|
|
|
|
|
def test_mqtt_mock():
|
|
|
|
# connect処理
|
|
client = mqtt.Client()
|
|
client.on_connect = on_connect_mock
|
|
client.connect(MQTT_HOST, MQTT_PORT, 60)
|
|
topic = f"device/{MQTT_DEVICE_CODE}/command"
|
|
client.subscribe(topic, qos=1)
|
|
client.loop_start()
|
|
while True:
|
|
try:
|
|
print("publish start.")
|
|
topic = f"device/{MQTT_DEVICE_CODE}/send"
|
|
data = {"message": "test"}
|
|
message = json.dumps(data).encode('utf-8')
|
|
client.publish(topic, message, qos=1)
|
|
time.sleep(30)
|
|
except KeyboardInterrupt as e:
|
|
time.sleep(30)
|
|
break
|
|
except Exception as e:
|
|
print(f"error:{e}")
|
|
time.sleep(30)
|
|
|
|
print(f"end")
|
|
client.loop_stop()
|
|
|
|
|
|
def main():
|
|
print("Mqtt Mock Client Start.")
|
|
test_mqtt_mock()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|