Commit df579b91 authored by João Santos's avatar João Santos

add mqtt and influxdb

parent e3329aee
import socket from paho.mqtt import client as mqtt_client
import json
import logging
import random
import time import time
from Control_Interface.controlInterface import controlInterface from Control_Interface.controlInterface import controlInterface
import datetime
from influxdb import InfluxDBClient
import signal
import sys
host = 'localhost'
port = 8086
dbname = 'telegraf'
def main():
interface = controlInterface()
interface.__init__() BROKER = 'broker.hivemq.com'
print(interface.getListIp()) PORT = 1883
interface.newNhr("9410") TOPIC_SUB = "labrei-nhr-sub/#"
interface.newNhr("9430") TOPIC_PUB = "labrei-nhr-pub"
#interface.newNhr("9430") CLIENT_ID = f'python-mqtt-{random.randint(0, 1000)}'
nhr10 = []
nhr30 = []
nhr10 = interface.getNhr9410()
#nhr30 = interface.getNhr9430()
FIRST_RECONNECT_DELAY = 1
RECONNECT_RATE = 2
MAX_RECONNECT_COUNT = 12
MAX_RECONNECT_DELAY = 60
for elem in nhr10: FLAG_EXIT = False
elem.setVoltage(110)
elem.start()
print("nhr10 ip: ",elem.getIp())
print("nhr 10 array: ", elem.getVoltageArray())
interface = controlInterface()
time.sleep(3) print(interface.getListIp())
interface.newNhr("9410")
interface.newNhr("9430")
NHRs = {
"9410" : interface.getNhr9410()[0],
# "9430" : interface.getNhr9430()[0],
}
def signal_handler(sig, frame):
print('You pressed Ctrl+C!')
sys.exit(0)
signal.signal(signal.SIGINT, signal_handler)
print('Press Ctrl+C')
signal.pause()
def on_connect(client, userdata, flags, rc):
if rc == 0 and client.is_connected():
print("Connected to MQTT Broker!")
client.subscribe(TOPIC_SUB)
else:
print(f'Failed to connect, return code {rc}')
def write_influxdb(method_name, args, result):
client = InfluxDBClient(host, port, database=dbname)
data = [
{
"measurement": "method_call",
"tags": {
"method": method_name
},
"time": datetime.datetime.utcnow().isoformat(),
"fields": {
"arguments": str(args),
"result": str(result)
}
}
]
def on_disconnect(client, userdata, rc):
logging.info("Disconnected with result code: %s", rc)
reconnect_count, reconnect_delay = 0, FIRST_RECONNECT_DELAY
while reconnect_count < MAX_RECONNECT_COUNT:
logging.info("Reconnecting in %d seconds...", reconnect_delay)
time.sleep(reconnect_delay)
try:
client.reconnect()
logging.info("Reconnected successfully!")
return
except Exception as err:
logging.error("%s. Reconnect failed. Retrying...", err)
reconnect_delay *= RECONNECT_RATE
reconnect_delay = min(reconnect_delay, MAX_RECONNECT_DELAY)
reconnect_count += 1
logging.info("Reconnect failed after %s attempts. Exiting...", reconnect_count)
global FLAG_EXIT
FLAG_EXIT = True
def on_message(client, userdata, message):
print(f'Received `{message.payload.decode()}` from `{message.topic}` topic')
prefix, selector, fn = message.topic.split('/')
# message = json.loads(message.payload.decode())
# selector = message['selector']
# fn = message['fn']
# args = message.get('args', [])
args = [message.payload.decode()]
if args[0] == '.':
del args[0]
else:
args[0] = float(args[0])
# print(message['selector'])
# nhr = NHRs[selector]
nhr = NHRs.get(selector, False)
if not nhr:
return client.publish(TOPIC_PUB, json.dumps("NHR not found"))
if not hasattr(nhr, fn):
return client.publish(TOPIC_PUB, json.dumps("Method not found"))
result = getattr(nhr, fn)(*args)
write_influxdb(fn, args, result)
client.publish(TOPIC_PUB, json.dumps(result))
def connect_mqtt():
client = mqtt_client.Client(CLIENT_ID)
client.on_connect = on_connect
client.on_message = on_message
client.connect(BROKER, PORT, keepalive=3)
client.on_disconnect = on_disconnect
return client
def publish(client):
msg_count = 0
while not FLAG_EXIT:
msg_dict = {
'msg': msg_count
}
msg = json.dumps(msg_dict)
if not client.is_connected():
logging.error("publish: MQTT client is not connected!")
time.sleep(1)
continue
result = client.publish(TOPIC_PUB, msg)
# result: [0, 1]
status = result[0]
if status == 0:
print(f'Send `{msg}` to topic `{TOPIC_SUB}`')
else:
print(f'Failed to send message to topic {TOPIC_SUB}')
msg_count += 1
time.sleep(1)
def run():
logging.basicConfig(format='%(asctime)s - %(levelname)s: %(message)s',
level=logging.DEBUG)
client = connect_mqtt()
client.loop_start()
time.sleep(1000)
if client.is_connected():
publish(client)
else:
client.loop_stop()
for elem in nhr10:
elem.close()
for elem in nhr30:
elem.close()
if __name__ == '__main__': if __name__ == '__main__':
main() run()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment