Eksperimen dan Integrasi MQTT di Desktop
Pada pertemuan kali ini akan membahas integrasi MQTT (Message Queuing Telemetry Transport) untuk komunikasi data real-time antara ESP32 dan aplikasi desktop. Mahasiswa akan mempelajari setup broker MQTT, pembuatan MQTT client, pengiriman data sensor dari ESP32, serta visualisasi data real-time di dashboard tkinter.
Alat dan Bahan
Hardware:
- ESP32 Development Board dengan MicroPython
- Sensor (DHT11 atau sensor analog)
- LED dan Button (dari modul sebelumnya)
- Koneksi WiFi yang stabil
Software:
- Mosquitto MQTT Broker (atau Broker publik seperti mosquitto.org)
- Visual Studio Code dengan Pymakr
- Python 3.8+ di desktop
- Library:
paho-mqtt,tkinter(built-in),requests - MQTTX (MQTT X) - client untuk testing dan monitoring
Environment:
- Terminal/Command Prompt
- Koneksi ke jaringan yang sama antara ESP32 dan desktop
Bagian 1: Setup Environment dan Konfigurasi
1.1 Instalasi dan Konfigurasi Broker MQTT
Praktikum 1.1: Setup Mosquitto Broker Lokal
Langkah 1: Install Mosquitto
Untuk Windows:
Download dari: https://mosquitto.org/download/
Atau gunakan WSL: sudo apt-get install mosquitto mosquitto-clients
Untuk macOS:
Untuk Linux:
Langkah 2: Jalankan Broker
Langkah 3: Verifikasi Broker Berjalan
# Di terminal lain, subscribe ke test topic
mosquitto_sub -h localhost -t "test/#"
# Di terminal ketiga, publish message
mosquitto_pub -h localhost -t "test/message" -m "Hello MQTT"
1.2 Konfigurasi Project Desktop
Praktikum 1.2: Setup Project Structure dan Requirements
Langkah 1: Buat Struktur Folder
mqtt-iot-project/
├── mqtt/
│ ├── __init__.py
│ └── client.py
├── dashboard/
│ ├── __init__.py
│ └── ui.py
├── config.json
├── requirements.txt
├── main.py
└── README.md
Langkah 2: Buat File requirements.txt
Langkah 3: Install Dependencies
1.3 File Konfigurasi
Praktikum 1.3: Konfigurasi Broker dan Topik
Langkah 1: Buat config.json
{
"broker": {
"host": "localhost",
"port": 1883,
"username": null,
"password": null,
"keepalive": 60
},
"topics": {
"sensor_temp": "sensor/esp32/temperature",
"sensor_humidity": "sensor/esp32/humidity",
"sensor_pressure": "sensor/esp32/pressure",
"led_status": "device/esp32/led/status",
"led_command": "device/esp32/led/command",
"button_press": "device/esp32/button/press",
"system_status": "system/esp32/status"
},
"dashboard": {
"width": 1000,
"height": 700,
"title": "IoT Dashboard - Real-time Monitoring",
"refresh_rate": 500
}
}
Langkah 2: Verifikasi File Konfigurasi
Pastikan file config.json ditempatkan di root project dan valid JSON format.
Bagian 2: MQTT Client Implementation
2.1 Dasar-dasar MQTT
MQTT adalah protocol publish-subscribe yang ringan dan cocok untuk IoT. Konsep dasar:
- Broker: Server pusat yang mengelola pesan
- Publisher: Mengirim pesan ke topik tertentu
- Subscriber: Menerima pesan dari topik yang disubscribe
- Topic: Jalur pesan (contoh: sensor/esp32/temperature)
- Payload: Data yang dikirim
2.2 Membuat MQTT Client
Praktikum 2.1: Implementasi MqttClient
Langkah 1: Buat File mqtt/client.py
# mqtt/client.py - MQTT Client untuk komunikasi
import json
import paho.mqtt.client as mqtt
from queue import Queue
from threading import Thread
import time
class MqttClient:
"""MQTT Client untuk komunikasi dengan broker"""
def __init__(self, config_file='config.json'):
"""Inisialisasi MQTT Client"""
# Load konfigurasi
with open(config_file, 'r') as f:
config = json.load(f)
self.broker_config = config['broker']
self.topics = config['topics']
# Setup client
self.client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION1)
self.client.on_connect = self.on_connect
self.client.on_message = self.on_message
self.client.on_disconnect = self.on_disconnect
# Message queue untuk handling di thread terpisah
self.message_queue = Queue()
# Status tracking
self.is_connected = False
self.subscribed_topics = []
print("[MQTT] Client initialized")
def on_connect(self, client, userdata, flags, rc):
"""Callback saat client terhubung ke broker"""
if rc == 0:
print("[MQTT] Connected to broker successfully")
self.is_connected = True
# Subscribe ke semua topik sensor
for topic_name, topic_path in self.topics.items():
if topic_name.startswith('sensor_') or topic_name.startswith('button_'):
self.client.subscribe(topic_path)
self.subscribed_topics.append(topic_path)
print(f"[MQTT] Subscribed to: {topic_path}")
else:
print(f"[MQTT] Connection failed with code {rc}")
self.is_connected = False
def on_message(self, client, userdata, msg):
"""Callback saat menerima message"""
topic = msg.topic
payload = msg.payload.decode()
try:
# Coba parse sebagai JSON
data = json.loads(payload)
message = {
'topic': topic,
'data': data,
'timestamp': time.time(),
'raw_payload': payload
}
except json.JSONDecodeError:
# Jika bukan JSON, simpan sebagai string
message = {
'topic': topic,
'data': payload,
'timestamp': time.time(),
'raw_payload': payload
}
self.message_queue.put(message)
print(f"[MQTT] Message received - Topic: {topic}, Payload: {payload}")
def on_disconnect(self, client, userdata, rc):
"""Callback saat client disconnect"""
if rc != 0:
print(f"[MQTT] Unexpected disconnection: {rc}")
else:
print("[MQTT] Disconnected from broker")
self.is_connected = False
def connect(self):
"""Hubungkan ke broker MQTT"""
try:
print(f"[MQTT] Connecting to {self.broker_config['host']}:{self.broker_config['port']}")
# Set username dan password jika ada
if self.broker_config['username']:
self.client.username_pw_set(
self.broker_config['username'],
self.broker_config['password']
)
# Connect ke broker
self.client.connect(
self.broker_config['host'],
self.broker_config['port'],
self.broker_config['keepalive']
)
# Start network loop di thread terpisah
self.client.loop_start()
# Tunggu sampai connected
timeout = 10
start_time = time.time()
while not self.is_connected:
if time.time() - start_time > timeout:
print("[MQTT] Connection timeout!")
return False
time.sleep(0.1)
return True
except Exception as e:
print(f"[MQTT] Connection error: {e}")
return False
def disconnect(self):
"""Disconnect dari broker"""
self.client.loop_stop()
self.client.disconnect()
print("[MQTT] Disconnected from broker")
def publish(self, topic_key, data):
"""Publish data ke topik tertentu"""
if not self.is_connected:
print("[MQTT] Not connected to broker")
return False
try:
# Get topic path dari konfigurasi
topic_path = self.topics.get(topic_key)
if not topic_path:
print(f"[MQTT] Topic key '{topic_key}' not found in config")
return False
# Convert data ke JSON jika dictionary
if isinstance(data, dict):
payload = json.dumps(data)
else:
payload = str(data)
# Publish message
result = self.client.publish(topic_path, payload, qos=1)
if result.rc == mqtt.MQTT_ERR_SUCCESS:
print(f"[MQTT] Published to {topic_path}: {payload}")
return True
else:
print(f"[MQTT] Publish failed: {result.rc}")
return False
except Exception as e:
print(f"[MQTT] Publish error: {e}")
return False
def get_message(self, timeout=1.0):
"""Ambil message dari queue"""
try:
return self.message_queue.get(timeout=timeout)
except:
return None
def check_connection(self):
"""Cek status koneksi"""
return self.is_connected
def get_subscribed_topics(self):
"""Dapatkan list topik yang di-subscribe"""
return self.subscribed_topics
Langkah 2: Inisialisasi File mqtt/init.py
2.3 Testing MQTT Client
Praktikum 2.2: Test MQTT Connection
Langkah 1: Buat File test_mqtt.py
# test_mqtt.py - Test MQTT client connection
import json
from mqtt.client import MqttClient
import time
def test_mqtt_connection():
"""Test koneksi MQTT"""
print("=== MQTT Connection Test ===\n")
# Inisialisasi client
mqtt_client = MqttClient('config.json')
# Test 1: Connection
print("Test 1: Connecting to broker...")
if mqtt_client.connect():
print("✓ Connection successful\n")
else:
print("✗ Connection failed\n")
return
# Test 2: Check subscribed topics
print("Test 2: Subscribed topics")
topics = mqtt_client.get_subscribed_topics()
for topic in topics:
print(f" ✓ {topic}")
print()
# Test 3: Publish test message
print("Test 3: Publishing test message...")
test_data = {
'temperature': 25.5,
'humidity': 60,
'timestamp': int(time.time())
}
if mqtt_client.publish('sensor_temp', test_data):
print("✓ Message published\n")
else:
print("✗ Publish failed\n")
# Test 4: Receive messages
print("Test 4: Listening for messages (10 seconds)...")
start_time = time.time()
message_count = 0
while time.time() - start_time < 10:
msg = mqtt_client.get_message(timeout=1.0)
if msg:
message_count += 1
print(f" Message {message_count}: {msg['topic']} = {msg['data']}")
print(f"\n✓ Received {message_count} messages\n")
# Test 5: Connection status
print("Test 5: Connection status")
if mqtt_client.check_connection():
print("✓ Still connected\n")
else:
print("✗ Connection lost\n")
# Cleanup
mqtt_client.disconnect()
print("=== Test Complete ===")
if __name__ == "__main__":
test_mqtt_connection()
Langkah 2: Jalankan Test
Bagian 3: ESP32 Sensor Data Publishing
3.1 Firmware ESP32 untuk Publish Sensor Data
Praktikum 3.1: Code ESP32 Publish Sensor
Langkah 1: Buat File esp32_sensor.py untuk ESP32
# esp32_sensor.py - Publish sensor data via MQTT
import network
import utime
import ujson
from umqtt.simple import MQTTClient
import machine
import random
class ESP32MqttSensor:
"""ESP32 Sensor dengan MQTT publish"""
def __init__(self, broker_host, client_id, topics_config):
"""Inisialisasi sensor dan MQTT"""
self.broker_host = broker_host
self.client_id = client_id
self.topics = topics_config
# Setup WiFi
self.wifi = network.WLAN(network.STA_IF)
self.mqtt = None
# Setup LED untuk status
self.status_led = machine.Pin(2, machine.Pin.OUT)
self.status_led.off()
print(f"ESP32 MQTT Sensor initialized - Client ID: {client_id}")
def connect_wifi(self, ssid, password, timeout=10):
"""Hubungkan ke WiFi"""
print(f"Connecting to WiFi: {ssid}...")
self.wifi.active(True)
self.wifi.connect(ssid, password)
start_time = utime.time()
while not self.wifi.isconnected():
if utime.time() - start_time > timeout:
print("WiFi connection timeout!")
return False
self.status_led.on()
utime.sleep_ms(500)
self.status_led.off()
utime.sleep_ms(500)
print(".", end="")
self.status_led.on()
print(f"\nConnected to WiFi!")
print(f"IP: {self.wifi.ifconfig()[0]}")
return True
def connect_mqtt(self):
"""Hubungkan ke MQTT broker"""
try:
print(f"Connecting to MQTT broker: {self.broker_host}...")
self.mqtt = MQTTClient(self.client_id, self.broker_host)
self.mqtt.connect()
print("Connected to MQTT broker!")
return True
except Exception as e:
print(f"MQTT connection error: {e}")
return False
def read_sensor_data(self):
"""Baca data sensor (simulator)"""
# Ini adalah simulator - ganti dengan sensor real
data = {
'temperature': 20 + random.randint(0, 15) / 10,
'humidity': 40 + random.randint(0, 40),
'pressure': 1000 + random.randint(-50, 50) / 10,
'timestamp': int(utime.time())
}
return data
def publish_sensor_data(self):
"""Publish sensor data ke MQTT"""
try:
data = self.read_sensor_data()
payload = ujson.dumps(data)
# Publish temperature
self.mqtt.publish(
self.topics['sensor_temp'],
payload
)
print(f"Published: {payload}")
return True
except Exception as e:
print(f"Publish error: {e}")
return False
def run(self, publish_interval=5):
"""Main loop - publish data secara berkala"""
print("Starting sensor data publishing...")
while True:
try:
if not self.mqtt.sock:
print("MQTT connection lost, reconnecting...")
self.connect_mqtt()
self.publish_sensor_data()
# LED blink sebagai indicator
self.status_led.off()
utime.sleep(publish_interval - 1)
self.status_led.on()
utime.sleep(1)
except Exception as e:
print(f"Error in main loop: {e}")
utime.sleep(1)
# Setup untuk ESP32
def run_esp32_mqtt():
"""Jalankan MQTT sensor di ESP32"""
# Konfigurasi
BROKER = "192.168.1.100" # IP address PC/laptop Anda
CLIENT_ID = "esp32-sensor-01"
SSID = "YourSSID"
PASSWORD = "YourPassword"
TOPICS = {
'sensor_temp': 'sensor/esp32/temperature',
'sensor_humidity': 'sensor/esp32/humidity',
'sensor_pressure': 'sensor/esp32/pressure'
}
# Inisialisasi
sensor = ESP32MqttSensor(BROKER, CLIENT_ID, TOPICS)
# Connect ke WiFi
if not sensor.connect_wifi(SSID, PASSWORD):
print("Failed to connect WiFi!")
return
# Connect ke MQTT
if not sensor.connect_mqtt():
print("Failed to connect MQTT!")
return
# Run main loop
sensor.run(publish_interval=5)
# Upload dan jalankan di ESP32
if __name__ == "__main__":
run_esp32_mqtt()
Catatan Penting untuk ESP32:
- Ubah BROKER dengan IP address PC/laptop Anda
- Ubah SSID dan PASSWORD dengan WiFi Anda
- Module umqtt.simple harus sudah di install di ESP32
Bagian 4: Dashboard Desktop Real-time
4.1 Membuat UI Dashboard
Praktikum 4.1: Dashboard Tkinter
Langkah 1: Buat File dashboard/ui.py
# dashboard/ui.py - Dashboard UI dengan tkinter
import tkinter as tk
from tkinter import ttk, messagebox
import json
from datetime import datetime
import threading
from collections import deque
class DashboardUI:
"""Dashboard UI untuk monitoring data real-time"""
def __init__(self, mqtt_client, config_file='config.json'):
"""Inisialisasi dashboard"""
# Load konfigurasi
with open(config_file, 'r') as f:
config = json.load(f)
self.config = config
self.mqtt_client = mqtt_client
# Root window
self.root = tk.Tk()
self.root.title(config['dashboard']['title'])
self.root.geometry(f"{config['dashboard']['width']}x{config['dashboard']['height']}")
self.root.resizable(False, False)
# Data storage (untuk grafik)
self.data_history = {
'temperature': deque(maxlen=60),
'humidity': deque(maxlen=60),
'pressure': deque(maxlen=60)
}
# Current values
self.current_values = {
'temperature': 0.0,
'humidity': 0.0,
'pressure': 0.0,
'led_status': 'OFF',
'last_update': None
}
# Status tracking
self.is_running = True
self.connection_status = False
# Setup UI
self.setup_ui()
# Start message processor thread
self.start_message_processor()
print("[Dashboard] UI initialized")
def setup_ui(self):
"""Setup user interface"""
# Configure style
style = ttk.Style()
style.theme_use('clam')
# Header frame
header_frame = ttk.Frame(self.root, padding=10)
header_frame.pack(fill=tk.X, padx=10, pady=5)
# Title
title_label = ttk.Label(
header_frame,
text="IoT Real-time Dashboard",
font=("Arial", 18, "bold")
)
title_label.pack(side=tk.LEFT)
# Connection status
self.status_label = ttk.Label(
header_frame,
text="● Disconnected",
font=("Arial", 10),
foreground="red"
)
self.status_label.pack(side=tk.RIGHT)
# Main content frame
main_frame = ttk.Frame(self.root, padding=10)
main_frame.pack(fill=tk.BOTH, expand=True, padx=10, pady=5)
# === Sensor Data Section ===
sensor_frame = ttk.LabelFrame(main_frame, text="Sensor Data", padding=15)
sensor_frame.pack(fill=tk.X, pady=10)
# Temperature
temp_frame = ttk.Frame(sensor_frame)
temp_frame.pack(fill=tk.X, pady=10)
ttk.Label(temp_frame, text="Temperature:", font=("Arial", 11)).pack(side=tk.LEFT, padx=5)
self.temp_value_label = ttk.Label(
temp_frame,
text="--°C",
font=("Arial", 14, "bold"),
foreground="blue"
)
self.temp_value_label.pack(side=tk.LEFT, padx=20)
self.temp_progress = ttk.Progressbar(
temp_frame,
length=300,
maximum=50,
mode='determinate'
)
self.temp_progress.pack(side=tk.LEFT, padx=10, fill=tk.X, expand=True)
# Humidity
humidity_frame = ttk.Frame(sensor_frame)
humidity_frame.pack(fill=tk.X, pady=10)
ttk.Label(humidity_frame, text="Humidity:", font=("Arial", 11)).pack(side=tk.LEFT, padx=5)
self.humidity_value_label = ttk.Label(
humidity_frame,
text="--%",
font=("Arial", 14, "bold"),
foreground="green"
)
self.humidity_value_label.pack(side=tk.LEFT, padx=20)
self.humidity_progress = ttk.Progressbar(
humidity_frame,
length=300,
maximum=100,
mode='determinate'
)
self.humidity_progress.pack(side=tk.LEFT, padx=10, fill=tk.X, expand=True)
# Pressure
pressure_frame = ttk.Frame(sensor_frame)
pressure_frame.pack(fill=tk.X, pady=10)
ttk.Label(pressure_frame, text="Pressure:", font=("Arial", 11)).pack(side=tk.LEFT, padx=5)
self.pressure_value_label = ttk.Label(
pressure_frame,
text="-- hPa",
font=("Arial", 14, "bold"),
foreground="purple"
)
self.pressure_value_label.pack(side=tk.LEFT, padx=20)
self.pressure_progress = ttk.Progressbar(
pressure_frame,
length=300,
maximum=1050,
mode='determinate'
)
self.pressure_progress.pack(side=tk.LEFT, padx=10, fill=tk.X, expand=True)
# === Device Control Section ===
control_frame = ttk.LabelFrame(main_frame, text="Device Control", padding=15)
control_frame.pack(fill=tk.X, pady=10)
# LED Status
led_status_frame = ttk.Frame(control_frame)
led_status_frame.pack(fill=tk.X, pady=10)
ttk.Label(led_status_frame, text="LED Status:", font=("Arial", 11)).pack(side=tk.LEFT, padx=5)
self.led_status_label = ttk.Label(
led_status_frame,
text="OFF",
font=("Arial", 12, "bold"),
foreground="red"
)
self.led_status_label.pack(side=tk.LEFT, padx=20)
# LED Control buttons
button_frame = ttk.Frame(control_frame)
button_frame.pack(fill=tk.X, pady=10)
ttk.Button(
button_frame,
text="Turn ON",
command=self.send_led_command_on
).pack(side=tk.LEFT, padx=5)
ttk.Button(
button_frame,
text="Turn OFF",
command=self.send_led_command_off
).pack(side=tk.LEFT, padx=5)
# === Status Section ===
status_frame = ttk.LabelFrame(main_frame, text="System Status", padding=15)
status_frame.pack(fill=tk.X, pady=10)
self.last_update_label = ttk.Label(
status_frame,
text="Last update: --",
font=("Arial", 10)
)
self.last_update_label.pack(anchor=tk.W, padx=5)
self.message_count_label = ttk.Label(
status_frame,
text="Messages received: 0",
font=("Arial", 10)
)
self.message_count_label.pack(anchor=tk.W, padx=5)
self.broker_info_label = ttk.Label(
status_frame,
text=f"Broker: {self.config['broker']['host']}:{self.config['broker']['port']}",
font=("Arial", 10)
)
self.broker_info_label.pack(anchor=tk.W, padx=5)
def send_led_command_on(self):
"""Kirim command LED ON"""
self.mqtt_client.publish('led_command', {'action': 'on'})
messagebox.showinfo("Success", "LED ON command sent")
def send_led_command_off(self):
"""Kirim command LED OFF"""
self.mqtt_client.publish('led_command', {'action': 'off'})
messagebox.showinfo("Success", "LED OFF command sent")
def update_sensor_display(self, topic, data):
"""Update display sensor data"""
try:
if 'temperature' in data:
temp = data['temperature']
self.current_values['temperature'] = temp
self.data_history['temperature'].append(temp)
self.temp_value_label.config(text=f"{temp:.1f}°C")
self.temp_progress['value'] = temp
if 'humidity' in data:
humidity = data['humidity']
self.current_values['humidity'] = humidity
self.data_history['humidity'].append(humidity)
self.humidity_value_label.config(text=f"{humidity:.0f}%")
self.humidity_progress['value'] = humidity
if 'pressure' in data:
pressure = data['pressure']
self.current_values['pressure'] = pressure
self.data_history['pressure'].append(pressure)
self.pressure_value_label.config(text=f"{pressure:.1f} hPa")
self.pressure_progress['value'] = pressure
# Update last update time
now = datetime.now().strftime("%H:%M:%S")
self.current_values['last_update'] = now
self.last_update_label.config(text=f"Last update: {now}")
except Exception as e:
print(f"[Dashboard] Error updating display: {e}")
def process_messages(self):
"""Process incoming MQTT messages"""
message_count = 0
while self.is_running:
try:
# Check connection status
if self.mqtt_client.check_connection():
if not self.connection_status:
self.connection_status = True
self.status_label.config(text="● Connected", foreground="green")
print("[Dashboard] Connected to MQTT")
else:
if self.connection_status:
self.connection_status = False
self.status_label.config(text="● Disconnected", foreground="red")
print("[Dashboard] Disconnected from MQTT")
# Get message dari queue
msg = self.mqtt_client.get_message(timeout=0.5)
if msg:
message_count += 1
topic = msg['topic']
data = msg['data']
# Update display berdasarkan topic
if 'temperature' in topic:
self.update_sensor_display(topic, data)
elif 'humidity' in topic:
self.update_sensor_display(topic, data)
elif 'pressure' in topic:
self.update_sensor_display(topic, data)
# Update message count
self.message_count_label.config(
text=f"Messages received: {message_count}"
)
except Exception as e:
print(f"[Dashboard] Error processing messages: {e}")
def start_message_processor(self):
"""Mulai thread untuk process MQTT messages"""
thread = threading.Thread(target=self.process_messages, daemon=True)
thread.start()
def run(self):
"""Jalankan dashboard"""
try:
self.root.mainloop()
except Exception as e:
print(f"[Dashboard] Error: {e}")
finally:
self.is_running = False
self.mqtt_client.disconnect()
def get_data_history(self):
"""Dapatkan history data"""
return self.data_history
Langkah 2: Inisialisasi File dashboard/init.py
Bagian 5: Main Application
5.1 Integration Semua Komponen
Praktikum 5.1: Main Application File
Langkah 1: Buat File main.py
# main.py - Main application launcher
import sys
import time
from mqtt.client import MqttClient
from dashboard.ui import DashboardUI
def main():
"""Main application function"""
print("="*50)
print("IoT MQTT Dashboard - Startup")
print("="*50)
try:
# Step 1: Initialize MQTT Client
print("\n[STARTUP] Initializing MQTT Client...")
mqtt_client = MqttClient('config.json')
# Step 2: Connect to MQTT Broker
print("[STARTUP] Connecting to MQTT Broker...")
if not mqtt_client.connect():
print("[ERROR] Failed to connect to MQTT broker!")
print("Make sure mosquitto broker is running on the configured host")
print(f"Check config.json for broker settings")
return False
print("[SUCCESS] Connected to MQTT Broker")
time.sleep(1)
# Step 3: Initialize Dashboard
print("[STARTUP] Initializing Dashboard UI...")
dashboard = DashboardUI(mqtt_client, 'config.json')
print("[SUCCESS] Dashboard initialized")
print("\n" + "="*50)
print("Dashboard running - waiting for sensor data...")
print("="*50 + "\n")
# Step 4: Run Dashboard
dashboard.run()
except KeyboardInterrupt:
print("\n[SHUTDOWN] Application interrupted by user")
except Exception as e:
print(f"\n[ERROR] Application error: {e}")
import traceback
traceback.print_exc()
finally:
print("[SHUTDOWN] Cleaning up resources...")
try:
mqtt_client.disconnect()
except:
pass
print("[SHUTDOWN] Application stopped")
if __name__ == "__main__":
main()
Langkah 2: Jalankan Aplikasi
# Pastikan semua komponen sudah siap:
# 1. Mosquitto broker running
# 2. ESP32 terhubung dan publish data
# 3. config.json di tempat yang benar
python main.py
Bagian 6: Testing dan Monitoring
6.1 Testing dengan MQTTX
MQTTX adalah GUI client untuk MQTT yang sangat berguna untuk testing dan debugging.
Praktikum 6.1: Testing dengan MQTTX
Langkah 1: Install MQTTX
Download dari: https://mqttx.app/
Langkah 2: Connect ke Broker
- Buka MQTTX
- Create new connection
- Host: localhost (atau IP broker)
- Port: 1883
- Click Connect
Langkah 3: Subscribe ke Topics
Di MQTTX:
Subscribe ke:
- sensor/esp32/temperature
- sensor/esp32/humidity
- sensor/esp32/pressure
- device/esp32/led/status
Langkah 4: Publish Test Message
6.2 Testing Dashboard dengan Manual Data
Untuk testing tanpa ESP32 hardware, gunakan script sender test data.
Praktikum 6.2: Test Data Sender
Langkah 1: Buat File test_data_sender.py
# test_data_sender.py - Send test data untuk dashboard
import json
import random
import time
from mqtt.client import MqttClient
def send_test_data():
"""Send dummy sensor data untuk testing dashboard"""
print("=== Test Data Sender ===\n")
mqtt_client = MqttClient('config.json')
print("Connecting to broker...")
if not mqtt_client.connect():
print("Connection failed!")
return
print("Connected! Sending test data every 2 seconds...\n")
print("Press Ctrl+C to stop\n")
try:
counter = 0
while True:
counter += 1
# Generate random sensor data
temperature = 20 + random.uniform(0, 15)
humidity = 40 + random.randint(0, 40)
pressure = 1013 + random.uniform(-10, 10)
# Create payload
data = {
'temperature': round(temperature, 1),
'humidity': round(humidity, 1),
'pressure': round(pressure, 1),
'timestamp': int(time.time())
}
# Publish
mqtt_client.publish('sensor_temp', data)
print(f"[{counter}] Temp: {temperature:.1f}°C, "
f"Humidity: {humidity:.0f}%, "
f"Pressure: {pressure:.1f} hPa")
time.sleep(2)
except KeyboardInterrupt:
print("\n\nTest data sender stopped")
finally:
mqtt_client.disconnect()
if __name__ == "__main__":
send_test_data()
Langkah 2: Run Test Data Sender
Di terminal terpisah:
Sekarang dashboard akan menerima data dan menampilkan real-time updates.
Bagian 7: Advanced Features
7.1 Data Persistence dan Logging
Praktikum 7.1: Message Logger
Langkah 1: Buat File utils/logger.py
# utils/logger.py - Logging MQTT messages
import json
from datetime import datetime
import os
class MessageLogger:
"""Logger untuk MQTT messages"""
def __init__(self, log_dir='logs'):
"""Inisialisasi logger"""
self.log_dir = log_dir
# Create directory jika belum ada
if not os.path.exists(log_dir):
os.makedirs(log_dir)
# File untuk setiap topic
self.log_files = {}
print(f"[Logger] Initialized - log directory: {log_dir}")
def get_log_filename(self, topic):
"""Get filename untuk topic"""
# Sanitize topic name
safe_topic = topic.replace('/', '_')
date = datetime.now().strftime("%Y%m%d")
return os.path.join(self.log_dir, f"{safe_topic}_{date}.log")
def log_message(self, topic, data, timestamp=None):
"""Log message ke file"""
try:
if timestamp is None:
timestamp = datetime.now().isoformat()
# Create log entry
log_entry = {
'timestamp': timestamp,
'topic': topic,
'data': data
}
# Get or create file handle
filename = self.get_log_filename(topic)
# Append ke file
with open(filename, 'a') as f:
f.write(json.dumps(log_entry) + '\n')
except Exception as e:
print(f"[Logger] Error: {e}")
def read_logs(self, topic, date=None):
"""Read logs untuk topic tertentu"""
if date is None:
date = datetime.now().strftime("%Y%m%d")
safe_topic = topic.replace('/', '_')
filename = os.path.join(self.log_dir, f"{safe_topic}_{date}.log")
logs = []
try:
with open(filename, 'r') as f:
for line in f:
logs.append(json.loads(line))
except FileNotFoundError:
print(f"[Logger] Log file not found: {filename}")
return logs
7.2 Data Export
Praktikum 7.2: Export ke CSV
Langkah 1: Buat File utils/exporter.py
# utils/exporter.py - Export data ke CSV
import csv
from datetime import datetime
import json
class DataExporter:
"""Export MQTT data ke berbagai format"""
@staticmethod
def export_to_csv(logs, filename=None):
"""Export logs ke CSV file"""
if filename is None:
filename = f"export_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
try:
with open(filename, 'w', newline='') as csvfile:
# Get headers dari first entry
if not logs:
print("No logs to export")
return False
first_entry = logs[0]
# Flatten data structure
fieldnames = ['timestamp', 'topic']
if isinstance(first_entry.get('data'), dict):
fieldnames.extend(first_entry['data'].keys())
else:
fieldnames.append('value')
writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
writer.writeheader()
# Write rows
for entry in logs:
row = {
'timestamp': entry['timestamp'],
'topic': entry['topic']
}
if isinstance(entry['data'], dict):
row.update(entry['data'])
else:
row['value'] = entry['data']
writer.writerow(row)
print(f"[Exporter] Exported {len(logs)} records to {filename}")
return True
except Exception as e:
print(f"[Exporter] Export failed: {e}")
return False