Commit 9fa86b3c authored by Sebastian Böhm's avatar Sebastian Böhm
Browse files

initial commit of edge-iot-simulator

parents
# Created by https://www.toptal.com/developers/gitignore/api/python
# Edit at https://www.toptal.com/developers/gitignore?templates=python
### Python ###
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# PEP 582; used by e.g. github.com/David-OConnor/pyflow
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
# End of https://www.toptal.com/developers/gitignore/api/python
FROM python:3.8.12
WORKDIR /usr/src/app
COPY requirements.txt ./
RUN pip3 install --no-cache-dir -r requirements.txt
COPY . .
EXPOSE 5000
CMD [ "python", "./edge_iot_simulator/main.py" ]
\ No newline at end of file
# Edge-IoT Simulator
The following project contains a small edge-IoT simulator.
Components:
* `core`: IoT device simulators, e.g., `temperature` simulator
* `messaging`: MQTT messaging components to send data to MQTT broker
* `web`: A small flask application which provides access to selected data provided by `core` and `messaging`.
Architecture:
```bash
web <-> core <-> queue <-> mqtt_publisher
```
Details:
* `web`: The web application has a reference to the core services (e.g., temperature) and has currently two endpoints:
* `/`: Dashboard which shows the latest temperature value
* `/temperature`: Returns the latest temperature value as json
* `core, queue, mqtt_publisher`: The core services (e.g., `temperature`) are decoupled from the `mqtt_publisher` by a shared memory `threadsafe` queue. The `mqtt_publisher` forwards the message with a corresponding topic and payload.
For example:
```bash
temperature <-> TemperatureMeasurement (timestamp=XXX,value=XXX,unit=XXX)
|
v
MqttMessage(topic=sensors/id/data payload=TemperatureMeasurement)
|
v
queue
^
|
mqtt_publisher.client.publish(MqttMessage.topic, MqttMessage.payload)
```
All components can easily be configured with the following `.env` file with should be present in the root of this project.
```bash
# messaging
MQTT_SERVER_NAME=localhost
MQTT_PORT=1883
MQTT_TLS=false
MQTT_USERNAME=
MQTT_PASSWORD=
MQTT_MAX_CONNECT_RETRIES=5
MQTT_RECONNECT_TIMEOUT=30
MQTT_CLIENT_ID=ab444537-cf67-47aa-a5ef-3548292e225b
MQTT_PUBLISH_QOS=1
MQTT_TOPIC_PUBLISHER=publisher
MQTT_TOPIC_PUBLISHER_STATE=state
# core
MQTT_TOPIC_SENSORS=sensors
MQTT_TOPIC_SENSORS_DATA=data
# web
WEB_HOSTNAME=localhost
```
## Prerequisites
* Make sure that you installed `python3` on your system
* Make sure that you install `docker` on your system
## Run the project locally
* Clone/Pull this repository
* Go into the root directory
* Create the env file `.env` with the contents above and change the values according to your needs
* Create a virtual environment at first: `python3 -m venv venv`
* Change to the virtual environment: `source venv/bin/activate`
* Install the necessary dependencies: `pip3 install -r requirements.txt`
* Change the working directory: `cd edge_iot_simulator`
* Run:`python3 main.py`
You should see the following output:
```bash
11/11/2021 02:48:48 PM main Please any key to interrupt...
11/11/2021 02:48:48 PM mqtt_publisher Successfully started mqtt publisher...
11/11/2021 02:48:48 PM temperature_svc Successfully started temperature sensor...
11/11/2021 02:48:48 PM temperature_svc New value measured {"timestamp": 1636638528.2080338, "value": 26, "unit": "celsius"}
11/11/2021 02:48:48 PM app Successfully started WebbApp...
11/11/2021 02:48:48 PM mqtt_publisher Error establishing connection to localhost:1883[Errno 111] Connection refused
11/11/2021 02:48:48 PM mqtt_publisher mqtt publisher received shutdown signal
11/11/2021 02:48:51 PM temperature_svc New value measured {"timestamp": 1636638531.2109501, "value": 20, "unit": "celsius"}
11/11/2021 02:48:54 PM temperature_svc New value measured {"timestamp": 1636638534.2141488, "value": 7, "unit": "celsius"}
11/11/2021 02:48:57 PM temperature_svc New value measured {"timestamp": 1636638537.2166193, "value": 59, "unit": "celsius"}
11/11/2021 02:48:58 PM main Unknown error occurred: Could not establish the connection to the mqtt broker
11/11/2021 02:48:58 PM mqtt_publisher mqtt publisher received shutdown signal
11/11/2021 02:48:58 PM temperature_svc Temperature service received shutdown signal...
11/11/2021 02:48:58 PM app WebApp received shutdown signal....
11/11/2021 02:48:58 PM main Wait for graceful termination...
11/11/2021 02:49:00 PM main Successfully terminated edge-iot simulator
```
We can see the first measurements of the sensor. However, the edge-IoT simulator is terminating because no mqtt message broker is reachable.
For testing purposes, start a mqtt message broker via docker: `docker run -d -p 1883:1883 -p 9001:9001 eclipse-mosquitto`
Run the application again: `python3 main.py`
You should see the following output:
```bash
11/11/2021 02:55:39 PM main Please any key to interrupt...
11/11/2021 02:55:39 PM mqtt_publisher Successfully started mqtt publisher...
11/11/2021 02:55:39 PM temperature_svc Successfully started temperature sensor...
11/11/2021 02:55:39 PM temperature_svc New value measured {"timestamp": 1636638939.4815965, "value": 46, "unit": "celsius"}
11/11/2021 02:55:39 PM app Successfully started WebbApp...
11/11/2021 02:55:39 PM mqtt_publisher Publisher connected with result code 0
11/11/2021 02:55:39 PM mqtt_publisher Successfully published message {"timestamp": 1636638939.4815965, "value": 46, "unit": "celsius"}
11/11/2021 02:55:42 PM temperature_svc New value measured {"timestamp": 1636638942.4844544, "value": 14, "unit": "celsius"}
11/11/2021 02:55:42 PM mqtt_publisher Successfully published message {"timestamp": 1636638942.4844544, "value": 14, "unit": "celsius"}
11/11/2021 02:55:45 PM temperature_svc New value measured {"timestamp": 1636638945.4878645, "value": 36, "unit": "celsius"}
11/11/2021 02:55:45 PM mqtt_publisher Successfully published message {"timestamp": 1636638945.4878645, "value": 36, "unit": "celsius"}
^C11/11/2021 02:55:48 PM main Received user's shutdown signal...
11/11/2021 02:55:48 PM mqtt_publisher mqtt publisher received shutdown signal
11/11/2021 02:55:48 PM temperature_svc New value measured {"timestamp": 1636638948.4884603, "value": 1, "unit": "celsius"}
11/11/2021 02:55:48 PM mqtt_publisher Successfully published message {"timestamp": 1636638948.4884603, "value": 1, "unit": "celsius"}
11/11/2021 02:55:48 PM mqtt_publisher mqtt publisher received shutdown signal
11/11/2021 02:55:48 PM temperature_svc Temperature service received shutdown signal...
11/11/2021 02:55:48 PM app WebApp received shutdown signal....
11/11/2021 02:55:48 PM main Wait for graceful termination...
11/11/2021 02:55:51 PM main Successfully terminated edge-iot simulator
```
Now the edge-IoT simulator is connected to the local mqtt message broker.
```bash
-----------------------localhost----------------------
edge-IoT simulator <-> docker <-> mqtt message broker
------------------------------------------------------
```
## Run with docker
To run edge-IoT simulator with docker, do the follpwing:
* Change to the root directory of this repository
* Build the image: `docker build -t edge:1.0 .`
* Change the working directory to the directory: `cd edge_iot_simulator`
* Create and adjust the `.env` file! Make sure that you take the right IP address for the message broker. If you are running the mqtt broker on the same system (physical host), obtain the broker's ip address via `docker inspect` or take the ip address of the machine (`ip a`)
* Run the image: `docker run -p 5000:5000 --env-file=.env edge:1.0`, choose `docker run -d -p 5000:5000 --env-file=.env edge:1.0`
from enum import Enum
import threading
import time
import random
import json
import copy
import logging
import os
from messaging.mqtt_publisher import MqttMessage
logging.basicConfig(format='%(asctime)s %(module)s %(message)s', datefmt='%m/%d/%Y %I:%M:%S %p', level=logging.DEBUG)
class TemperatureService(threading.Thread):
def __init__(self, queue, interval, unit):
threading.Thread.__init__(self)
self.queue = queue
self.interval = interval
self.unit = unit
self.last_temperature_measurement = None
self.lock = threading.Lock()
self.exit_event = threading.Event()
self.logger = logging.getLogger(__name__)
def run(self):
self.logger.debug("Successfully started temperature sensor...")
while not self.exit_event.is_set():
# temperature will be generated in celsius by default
temperature_celsius = random.randrange(0,60)
actual_temperature_measurement = None
if (self.unit == TemperatureUnits.celsius.name):
# put in queue
actual_temperature_measurement = TemperatureMeasurement(time.time(),temperature_celsius,TemperatureUnits.celsius.name)
self.create_mqtt_message(actual_temperature_measurement)
else:
actual_temperature_measurement = TemperatureMeasurement(time.time(),temperature_celsius,TemperatureUnits.fahrenheit.name)
self.create_mqtt_message(actual_temperature_measurement.convert(TemperatureUnits.fahrenheit.name))
with self.lock:
self.last_temperature_measurement = actual_temperature_measurement
self.logger.info("New value measured " + self.last_temperature_measurement.to_string())
time.sleep(self.interval)
def stop(self):
self.exit_event.set()
self.logger.info('Temperature service received shutdown signal...')
def get_temperature(self, unit):
with self.lock:
temperature_measurement = copy.deepcopy(self.last_temperature_measurement)
if (temperature_measurement is None):
raise TemperatureServiceException("No valid measurement available!")
return temperature_measurement.convert(unit)
def create_mqtt_message(self, temperature_measurement):
self.queue.put(MqttMessage(os.getenv('MQTT_TOPIC_SENSORS')+'/'+os.getenv('MQTT_CLIENT_ID')+'/'+os.getenv('MQTT_TOPIC_SENSORS_DATA'), temperature_measurement))
class TemperatureMeasurement():
def __init__(self, timestamp, value, unit):
self.timestamp = timestamp
self.value = value
self.unit = unit
def convert(self, unit):
if(unit == self.unit):
return self
if (unit == TemperatureUnits.celsius.name):
return self.fahrenheit_to_celsius(self.value)
else:
return self.celsius_to_fahrenheit(self.value)
def celsius_to_fahrenheit(self, temperature):
self.value = (temperature * 1.8) + 32
self.unit = TemperatureUnits.fahrenheit.name
return self
def fahrenheit_to_celsius(self, temperature):
self.value = (temperature - 32) * (5 / 9)
self.unit = TemperatureUnits.celsius.name
return self
def to_json(self):
return json.dumps(self.__dict__)
def to_string(self):
return str(self.to_json())
class TemperatureUnits(Enum):
celsius = 1
fahrenheit = 2
class TemperatureServiceException(Exception):
pass
#!/usr/bin/env python3
from core.temperature_svc import TemperatureMeasurement, TemperatureService,TemperatureUnits
from messaging.mqtt_publisher import MqttException, MqttPublisher, MqttStatus
from web.app import WebApp
import logging
import os
import random
import queue
import time
from dotenv import load_dotenv
import signal
import time
import sys
logging.basicConfig(format='%(asctime)s %(module)s %(message)s', datefmt='%m/%d/%Y %I:%M:%S %p', level=logging.DEBUG)
load_dotenv()
if __name__=="__main__":
queue = queue.Queue()
publisher = MqttPublisher(queue)
temperature_svc = TemperatureService(queue, 3, TemperatureUnits.celsius.name)
web_app = WebApp(publisher, temperature_svc)
try:
logging.info('Please any key to interrupt...')
publisher.start()
temperature_svc.start()
web_app.start()
time.sleep(10) # wait for connection to mqtt broker
if(publisher.get_mqtt_statistics().status == MqttStatus.disconnected.name):
raise Exception("Could not establish the connection to the mqtt broker")
signal.pause()
except KeyboardInterrupt as e:
logging.info("Received user's shutdown signal...")
except Exception as e:
logging.error("Unknown error occurred: " + str(e))
finally:
publisher.stop()
temperature_svc.stop()
web_app.stop()
logging.info('Wait for graceful termination...')
publisher.join()
temperature_svc.join()
web_app.join()
logging.info('Successfully terminated edge-iot simulator')
from enum import Enum
import paho.mqtt.client as mqtt
import logging
import threading
import os
import json
import time
import copy
logging.basicConfig(format='%(asctime)s %(module)s %(message)s', datefmt='%m/%d/%Y %I:%M:%S %p', level=logging.DEBUG)
logger = logging.getLogger(__name__)
class MqttPublisher(threading.Thread):
def __init__(self, queue):
threading.Thread.__init__(self)
self.logger = logging.getLogger(__name__)
self.lock = threading.Lock()
self.exit_event = threading.Event()
self.queue = queue
self.client = self.init_mqtt()
self.mqtt_message_counter = 0
def init_mqtt(self):
client = mqtt.Client(client_id=os.getenv('MQTT_CLIENT_ID'), clean_session=False)
if (os.getenv('MQTT_USERNAME') is not None and os.getenv('MQTT_PASSWORD') is not None):
client.username_pw_set(username=os.getenv('mosquitto_username'), password=os.getenv('mosquitto_password'))
if(os.getenv('MQTT_TLS') == 'True'):
client.tls_set()
client.on_connect = self.on_connect
client.on_disconnect = self.on_disconnect
client.reconnect_delay_set(min_delay=1, max_delay=3600)
return client
def on_connect(self, client, userdata, flags, rc):
self.logger.info("Publisher connected with result code " + str(rc))
def on_disconnect(self, client, userdata, rc):
if (rc != 0):
self.logger.error('Unexpected disconnect: ' + str(rc))
def run(self):
self.logger.debug("Successfully started mqtt publisher...")
try:
self.client.connect(os.getenv('MQTT_SERVER_NAME'), int(os.getenv('MQTT_PORT')))
self.client.loop_start()
while not self.exit_event.is_set():
mqtt_message = self.queue.get()
if (os.getenv('MQTT_TOPIC_PUBLISHER')+'/'+os.getenv('MQTT_CLIENT_ID')+'/'+os.getenv('MQTT_TOPIC_PUBLISHER_STATE')):
if (mqtt_message.payload == "stopped"):
retries = 5
break
else:
topic = mqtt_message.topic
payload = mqtt_message.payload
mqtt_message_info = self.client.publish(topic, json.dumps(payload.to_json()), int(os.getenv('MQTT_PUBLISH_QOS')))
mqtt_message_info.wait_for_publish()
if (mqtt_message_info.is_published()):
with self.lock:
self.mqtt_message_counter += 1
logger.info("Successfully published message {}".format(payload.to_string()))
except Exception as e:
self.logger.error('Error establishing connection to {}:{}'.format(os.getenv('MQTT_SERVER_NAME'),os.getenv('MQTT_PORT')) + str(e))
finally:
self.stop()
def get_status(self):
if (self.client.is_connected()):
return MqttStatus.connected.name
else:
return MqttStatus.disconnected.name
def get_mqtt_message_counter(self):
with self.lock:
return self.mqtt_message_counter
def get_mqtt_statistics(self):
return copy.deepcopy(MqttStatistics(self.get_status(), self.get_mqtt_message_counter()))
def stop(self):
self.logger.info("mqtt publisher received shutdown signal")
self.exit_event.set()
self.client.loop_stop()
self.client.disconnect()
self.queue.put(MqttMessage(os.getenv("MQTT_TOPIC_PUBLISHER")+'/'+os.getenv('MQTT_CLIENT_ID') + '/' +os.getenv('MQTT_TOPIC_PUBLISHER_STATE'), "stopped"))
if (self.lock.locked()):
self.lock.release()
class MqttMessage():
def __init__(self, topic, payload):
self.topic = topic
self.payload = payload
def to_json(self):
return json.dumps(self.__dict__)
def to_string(self):
return str(self.to_json())
class MqttStatus(Enum):
connected = 1
disconnected = 2
class MqttStatistics():
def __init__(self, status, message_counter):
self.host = os.getenv('MQTT_SERVER_NAME')
self.port = os.getenv('MQTT_PORT')
self.tls = os.getenv('MQTT_TLS')
self.status = status
self.message_counter = message_counter
def to_json(self):
return json.dumps(self.__dict__)
def to_string(self):
return str(self.to_json())
class MqttException(Exception):
pass
import threading
import logging
from flask import Flask, render_template
from werkzeug.serving import make_server
from core.temperature_svc import TemperatureUnits
logging.basicConfig(format='%(asctime)s %(module)s %(message)s', datefmt='%m/%d/%Y %I:%M:%S %p', level=logging.DEBUG)
class WebApp(threading.Thread):
def __init__(self, publisher, temperature_svc):
threading.Thread.__init__(self)
app = self.create_app(publisher, temperature_svc)
self.srv = make_server('0.0.0.0', 5000, app)
self.ctx = app.app_context()
self.ctx.push()
self.logger = logging.getLogger(__name__)
def create_app(self, publisher, temperature_svc):
app = Flask(__name__)
@app.route("/")
def index():
return render_template('dashboard.html', TemperatureMeasurement=temperature_svc.get_temperature(TemperatureUnits.celsius.name), MqttStatistics=publisher.get_mqtt_statistics())
@app.route("/temperature")
def get_temperature():
return temperature_svc.get_temperature(TemperatureUnits.celsius.name).to_string()
return app