mosquitto-mock-server/client/mqtt_client_aws.py
2024-10-21 04:32:59 +09:00

96 lines
2.7 KiB
Python

import paho.mqtt.client as mqtt
import os
from dotenv import load_dotenv
import time
import json
# Load settings from the local ".env" file before reading them below.
load_dotenv(dotenv_path=".env")

# Broker endpoint and client/device identifier, read from the environment.
AWS_MQTT_HOST = os.getenv("AWS_MQTT_HOST")
# Fall back to 8883 (the standard MQTT-over-TLS port) when the variable is
# unset or empty, instead of failing at import time with an opaque
# "int() argument must be ... not 'NoneType'" error.
AWS_MQTT_PORT = int(os.getenv("AWS_MQTT_PORT") or 8883)
AWS_MQTT_DEVICE = os.getenv("AWS_MQTT_DEVICE")

# Paths to the CA certificate, client certificate and private key that are
# passed to ``tls_set``.
AWS_CA_PATH = os.getenv("AWS_CA_PATH")
AWS_CERT_PATH = os.getenv("AWS_CERT_PATH")
AWS_KEY_PATH = os.getenv("AWS_KEY_PATH")
def on_connect_mock(client, userdata, flags, rc, properties=None):
    """Connection callback: print the result code reported for the connect.

    Only ``rc`` is used; the other arguments are accepted to match the
    callback signature and are ignored.
    """
    report = f"on_connect: rc={rc}"
    print(report)
def on_disconnect_mock(client, userdata, rc, properties=None):
    """Disconnection callback: print the result code and flag unexpected drops.

    ``properties`` is optional so the callback has the same shape as
    ``on_connect_mock`` and keeps working when the library passes an extra
    properties argument; it is not used.

    Args:
        client: The client instance that disconnected (unused).
        userdata: User data attached to the client (unused).
        rc: Disconnect result code; any value other than 0 is reported as
            an unexpected disconnection.
        properties: Optional extra argument, ignored.
    """
    print(f"on_disconnect: rc={rc}")
    if rc != 0:
        print("Unexpected disconnection.")
def on_publish_mock(client, userdata, mid):
    """Publish callback: print the message id that was passed in.

    ``client`` and ``userdata`` are accepted to match the callback signature
    and are ignored.
    """
    line = "on_publish: {0}".format(mid)
    print(line)
def on_subscribe_mock(mqttc, obj, mid, granted_qos, properties=None):
    """Subscribe callback: print the message id and the granted QoS value(s).

    ``properties`` is optional so the callback has the same shape as
    ``on_connect_mock`` and keeps working when the library passes an extra
    properties argument; it is not used.

    Args:
        mqttc: The client instance (unused).
        obj: User data attached to the client (unused).
        mid: Message id of the subscribe request.
        granted_qos: Value reported for the subscription; printed via ``str``.
        properties: Optional extra argument, ignored.
    """
    print("on_subscribe: " + str(mid) + " " + str(granted_qos))
def on_message_mock(client, userdata, msg):
    """Message callback: print the raw payload, topic, QoS and decoded text.

    Args:
        client: The client instance (unused).
        userdata: User data attached to the client (unused).
        msg: Received message; its ``payload`` (bytes), ``topic`` and ``qos``
            attributes are read.
    """
    print("on_message")
    print("Received message '" + str(msg.payload) +
          "' on topic '" + msg.topic + "' with QoS " + str(msg.qos))
    # Decode leniently: a payload that is not valid UTF-8 must not raise
    # inside the callback. Undecodable bytes are shown as U+FFFD instead.
    payload_str = msg.payload.decode('utf-8', errors='replace')
    print(f"{payload_str}")
def test_mqtt_mock():
    """Connect to the configured MQTT broker and publish a test message every 30 s.

    Prints the connection settings, opens a TLS connection with the
    configured CA/certificate/key files, subscribes to
    ``device/<AWS_MQTT_DEVICE>/command`` (QoS 1) and then publishes
    ``{"message": "test"}`` to ``device/<AWS_MQTT_DEVICE>/send`` (QoS 1)
    every 30 seconds until interrupted with Ctrl-C.

    Errors raised while publishing are printed and the loop continues.
    Errors raised while creating the client, configuring TLS, connecting or
    subscribing propagate to the caller; once the network loop has been
    started the client is always disconnected and the loop stopped on exit.
    """
    # Connection setup
    print("AWS_MQTT_HOST:", AWS_MQTT_HOST)
    print("AWS_MQTT_DEVICE:", AWS_MQTT_DEVICE)
    print("AWS_MQTT_PORT:", AWS_MQTT_PORT)
    print("AWS_CA_PATH:", AWS_CA_PATH)
    print("AWS_CERT_PATH:", AWS_CERT_PATH)
    print("AWS_KEY_PATH:", AWS_KEY_PATH)

    client = mqtt.Client(client_id=AWS_MQTT_DEVICE, clean_session=False)
    client.tls_set(AWS_CA_PATH, AWS_CERT_PATH, AWS_KEY_PATH)
    client.on_connect = on_connect_mock
    client.on_disconnect = on_disconnect_mock
    client.on_publish = on_publish_mock
    client.on_subscribe = on_subscribe_mock
    client.on_message = on_message_mock

    client.connect(AWS_MQTT_HOST, AWS_MQTT_PORT, 60)
    # Start the background network loop right after connect(): nothing
    # (including the broker's connection acknowledgement) is processed until
    # a loop is running. The thread started here is the only loop driver;
    # calling client.loop() as well would have two threads servicing the
    # same socket.
    client.loop_start()
    try:
        # Wait (at most 30 s) for the connection to be acknowledged before
        # subscribing, instead of sleeping for a fixed 30 s.
        deadline = time.monotonic() + 30
        while not client.is_connected() and time.monotonic() < deadline:
            time.sleep(0.1)

        topic = f"device/{AWS_MQTT_DEVICE}/command"
        client.subscribe(topic, qos=1)
        print(f"client.subscribe:{topic}")

        # Topic and payload never change, so build them once.
        topic = f"device/{AWS_MQTT_DEVICE}/send"
        data = {"message": "test"}
        message = json.dumps(data).encode('utf-8')

        while True:
            try:
                print("publish start.")
                print(f"topic:{topic}")
                client.publish(topic, message, qos=1)
            except Exception as e:
                # Best effort: report the failure and retry on the next cycle.
                print(f"error:{e}")
            time.sleep(30)
    except KeyboardInterrupt:
        # Ctrl-C ends the mock immediately; cleanup happens in ``finally``.
        pass
    finally:
        print("end")
        # Disconnect first so the network thread can finish the exchange,
        # then stop the thread.
        client.disconnect()
        client.loop_stop()
def main():
    """Script entry point: print the start-up banner, then run the mock client."""
    banner = "Mqtt Mock Client Start."
    print(banner)
    test_mqtt_mock()
# Run the mock client only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()