import paho.mqtt.client as mqtt import os from dotenv import load_dotenv import time import json load_dotenv(dotenv_path=".env") AWS_MQTT_HOST = os.getenv("AWS_MQTT_HOST") AWS_MQTT_PORT = int(os.getenv("AWS_MQTT_PORT")) AWS_MQTT_DEVICE = os.getenv("AWS_MQTT_DEVICE") AWS_CA_PATH = os.getenv("AWS_CA_PATH") AWS_CERT_PATH = os.getenv("AWS_CERT_PATH") AWS_KEY_PATH = os.getenv("AWS_KEY_PATH") def on_connect_mock(client, userdata, flags, rc, properties=None): print(f"on_connect: rc={rc}") def on_disconnect_mock(client, userdata, rc): print(f"on_disconnect: rc={rc}") if rc != 0: print("Unexpected disconnection.") def on_publish_mock(client, userdata, mid): print("on_publish: {0}".format(mid)) def on_subscribe_mock(mqttc, obj, mid, granted_qos): print("on_subscribe: "+str(mid)+" "+str(granted_qos)) def on_message_mock(client, userdata, msg): print(f"on_message") print("Received message '" + str(msg.payload) + "' on topic '" + msg.topic + "' with QoS " + str(msg.qos)) payload_str = msg.payload.decode('utf-8') print(f"{payload_str}") def test_mqtt_mock(): # connect処理 print("AWS_MQTT_HOST:", AWS_MQTT_HOST) print("AWS_MQTT_DEVICE:", AWS_MQTT_DEVICE) print("AWS_MQTT_PORT:", AWS_MQTT_PORT) print("AWS_CA_PATH:", AWS_CA_PATH) print("AWS_CERT_PATH:", AWS_CERT_PATH) print("AWS_KEY_PATH:", AWS_KEY_PATH) client = mqtt.Client(client_id=AWS_MQTT_DEVICE, clean_session=False) client.tls_set(AWS_CA_PATH, AWS_CERT_PATH, AWS_KEY_PATH) client.on_connect = on_connect_mock client.on_disconnect = on_disconnect_mock client.on_publish = on_publish_mock client.on_subscribe = on_subscribe_mock client.on_message = on_message_mock client.connect(AWS_MQTT_HOST, AWS_MQTT_PORT, 60) time.sleep(30) topic = f"device/{AWS_MQTT_DEVICE}/command" client.subscribe(topic, qos=1) print(f"client.subscribe:{topic}") client.loop_start() while True: try: client.loop() print("publish start.") topic = f"device/{AWS_MQTT_DEVICE}/send" data = {"message": "test"} message = json.dumps(data).encode('utf-8') print(f"topic:{topic}") client.publish(topic, message, qos=1) time.sleep(30) except KeyboardInterrupt as e: time.sleep(30) break except Exception as e: print(f"error:{e}") time.sleep(30) print(f"end") client.loop_stop() client.disconnect() def main(): print("Mqtt Mock Client Start.") test_mqtt_mock() if __name__ == "__main__": main()