"""Mock MQTT device client.

Connects to the broker configured in ``.env.test``, subscribes to the
device's command topic, and publishes a small JSON test message to the
device's send topic at a fixed interval until interrupted with Ctrl-C.
"""

import json
import os
import time

import paho.mqtt.client as mqtt
from dotenv import load_dotenv

load_dotenv(dotenv_path=".env.test")

MQTT_HOST = os.getenv("MQTT_HOST")
# Fall back to the standard unencrypted MQTT port when MQTT_PORT is unset or
# empty, instead of failing at import time with an opaque TypeError/ValueError.
MQTT_PORT = int(os.getenv("MQTT_PORT") or "1883")
MQTT_DEVICE_CODE = os.getenv("MQTT_DEVICE_CODE")

# Seconds to wait between two publishes (and after a failed publish).
PUBLISH_INTERVAL_SEC = 30


def on_connect_mock(client, userdata, flags, rc):
    """Print the result code reported when the connection attempt completes."""
    print(f"on_connect: rc={rc}")


def on_disconnect_mock(client, userdata, rc):
    """Print the disconnect result code and flag any non-zero value."""
    print(f"on_disconnect: rc={rc}")
    if rc != 0:
        print("Unexpected disconnection.")


def on_publish_mock(client, userdata, mid):
    """Print the message id passed to the publish callback."""
    print(f"on_publish: {mid}")


def on_subscribe_mock(mqttc, obj, mid, granted_qos):
    """Print the message id and granted QoS passed to the subscribe callback."""
    print(f"on_subscribe: {mid} {granted_qos}")


def on_message_mock(client, userdata, msg):
    """Print the payload, topic and QoS of a received message."""
    print("on_message")
    print(
        f"Received message '{msg.payload!s}' on topic '{msg.topic}' "
        f"with QoS {msg.qos}"
    )


def test_mqtt_mock():
    """Run the mock client: connect, subscribe, then publish periodically.

    Publishes ``{"message": "test"}`` to ``device/<code>/send`` every
    ``PUBLISH_INTERVAL_SEC`` seconds. A failed publish is printed and retried
    on the next cycle. Ctrl-C ends the loop immediately; the client is always
    disconnected and its network thread stopped on the way out.
    """
    # NOTE(review): the callbacks above use the paho-mqtt 1.x signatures and
    # Client() is created without a callback_api_version; paho-mqtt 2.x
    # changed this -- confirm against the installed version.
    client = mqtt.Client()
    client.on_connect = on_connect_mock
    client.on_disconnect = on_disconnect_mock
    client.on_publish = on_publish_mock
    client.on_subscribe = on_subscribe_mock
    client.on_message = on_message_mock

    # Connect to the broker.
    client.connect(MQTT_HOST, MQTT_PORT, 60)

    command_topic = f"device/{MQTT_DEVICE_CODE}/command"
    client.subscribe(command_topic, qos=1)

    client.loop_start()
    try:
        while True:
            try:
                print("publish start.")
                send_topic = f"device/{MQTT_DEVICE_CODE}/send"
                payload = json.dumps({"message": "test"}).encode("utf-8")
                client.publish(send_topic, payload, qos=1)
            except Exception as e:
                # Best effort: report the failure and try again next cycle.
                print(f"error:{e}")
            time.sleep(PUBLISH_INTERVAL_SEC)
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to stop the mock; exit without delay.
        pass
    finally:
        print("end")
        # Disconnect first so the broker sees a clean close, then stop the
        # background network thread.
        client.disconnect()
        client.loop_stop()


def main():
    """Script entry point."""
    print("Mqtt Mock Client Start.")
    test_mqtt_mock()


if __name__ == "__main__":
    main()