96 lines
2.7 KiB
Python
96 lines
2.7 KiB
Python
import paho.mqtt.client as mqtt
|
|
import os
|
|
from dotenv import load_dotenv
|
|
import time
|
|
import json
|
|
|
|
|
|
load_dotenv(dotenv_path=".env")
|
|
|
|
AWS_MQTT_HOST = os.getenv("AWS_MQTT_HOST")
|
|
AWS_MQTT_PORT = int(os.getenv("AWS_MQTT_PORT"))
|
|
AWS_MQTT_DEVICE = os.getenv("AWS_MQTT_DEVICE")
|
|
AWS_CA_PATH = os.getenv("AWS_CA_PATH")
|
|
AWS_CERT_PATH = os.getenv("AWS_CERT_PATH")
|
|
AWS_KEY_PATH = os.getenv("AWS_KEY_PATH")
|
|
|
|
|
|
def on_connect_mock(client, userdata, flags, rc, properties=None):
|
|
print(f"on_connect: rc={rc}")
|
|
|
|
|
|
def on_disconnect_mock(client, userdata, rc):
|
|
print(f"on_disconnect: rc={rc}")
|
|
if rc != 0:
|
|
print("Unexpected disconnection.")
|
|
|
|
|
|
def on_publish_mock(client, userdata, mid):
|
|
print("on_publish: {0}".format(mid))
|
|
|
|
|
|
def on_subscribe_mock(mqttc, obj, mid, granted_qos):
|
|
print("on_subscribe: "+str(mid)+" "+str(granted_qos))
|
|
|
|
|
|
def on_message_mock(client, userdata, msg):
|
|
print(f"on_message")
|
|
print("Received message '" + str(msg.payload) +
|
|
"' on topic '" + msg.topic + "' with QoS " + str(msg.qos))
|
|
payload_str = msg.payload.decode('utf-8')
|
|
print(f"{payload_str}")
|
|
|
|
|
|
def test_mqtt_mock():
|
|
# connect処理
|
|
print("AWS_MQTT_HOST:", AWS_MQTT_HOST)
|
|
print("AWS_MQTT_DEVICE:", AWS_MQTT_DEVICE)
|
|
print("AWS_MQTT_PORT:", AWS_MQTT_PORT)
|
|
print("AWS_CA_PATH:", AWS_CA_PATH)
|
|
print("AWS_CERT_PATH:", AWS_CERT_PATH)
|
|
print("AWS_KEY_PATH:", AWS_KEY_PATH)
|
|
client = mqtt.Client(client_id=AWS_MQTT_DEVICE, clean_session=False)
|
|
client.tls_set(AWS_CA_PATH, AWS_CERT_PATH, AWS_KEY_PATH)
|
|
|
|
client.on_connect = on_connect_mock
|
|
client.on_disconnect = on_disconnect_mock
|
|
client.on_publish = on_publish_mock
|
|
client.on_subscribe = on_subscribe_mock
|
|
client.on_message = on_message_mock
|
|
client.connect(AWS_MQTT_HOST, AWS_MQTT_PORT, 60)
|
|
time.sleep(30)
|
|
|
|
topic = f"device/{AWS_MQTT_DEVICE}/command"
|
|
client.subscribe(topic, qos=1)
|
|
print(f"client.subscribe:{topic}")
|
|
client.loop_start()
|
|
while True:
|
|
try:
|
|
client.loop()
|
|
print("publish start.")
|
|
topic = f"device/{AWS_MQTT_DEVICE}/send"
|
|
data = {"message": "test"}
|
|
message = json.dumps(data).encode('utf-8')
|
|
print(f"topic:{topic}")
|
|
client.publish(topic, message, qos=1)
|
|
time.sleep(30)
|
|
except KeyboardInterrupt as e:
|
|
time.sleep(30)
|
|
break
|
|
except Exception as e:
|
|
print(f"error:{e}")
|
|
time.sleep(30)
|
|
|
|
print(f"end")
|
|
client.loop_stop()
|
|
client.disconnect()
|
|
|
|
|
|
def main():
|
|
print("Mqtt Mock Client Start.")
|
|
test_mqtt_mock()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|