In this seventh part of our IoT project series, we explore precision weight measurement with ESP32-based load cell integration. This project demonstrates high-resolution analog-to-digital conversion, signal amplification, calibration procedures, and advanced data visualization with statistical analysis and export capabilities.
Project Overview
This ESP32 project implements a weight measurement system with MQTT communication. An HX711 ADC module amplifies and digitizes the load cell signal, the calibration factor is configurable, and readings are monitored in real time with statistical analysis. The Wokwi simulation wires up the following weighing setup:
- HX711 Load Cell Amplifier: 24-bit ADC with programmable gain amplification
- 50kg Load Cell: Strain gauge-based weight sensor with Wheatstone bridge configuration
- GPIO Connections: SCK (GPIO 5) for serial clock, DT (GPIO 4) for data transmission
- Power Distribution: 5V supply for HX711 module, stable ground reference
- Signal Processing: 100 ADC updates per measurement for noise reduction and accuracy
The firmware is organized around four core components:
- Connection Management: WiFi with retry logic, MQTT with authentication, and automatic reconnection
- Load Cell Processing: HX711 initialization with stabilization time, calibration factor configuration (420.00), and multi-sample averaging for accuracy
- Signal Conditioning: Tare during startup, timeout detection for wiring issues, and 100 update calls before each data transmission
- System Activation: MQTT-based enable/disable, status reporting, and 500 ms update intervals
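The calibration factor (420.00 here) is what turns raw ADC counts into a weight. The article does not show how that number was obtained, so the following is only a minimal sketch of the usual two-point procedure, assuming a linear sensor response. The function names and the raw readings are hypothetical and chosen purely for illustration.

```python
def calibration_factor(raw_tare: float, raw_loaded: float, known_mass: float) -> float:
    """Counts per unit of mass, from an empty reading and a reading with a known mass."""
    if known_mass <= 0:
        raise ValueError("known_mass must be positive")
    return (raw_loaded - raw_tare) / known_mass


def counts_to_weight(raw: float, raw_tare: float, cal_factor: float) -> float:
    """Subtract the tare offset, then divide by the calibration factor."""
    return (raw - raw_tare) / cal_factor


# Illustrative numbers only (assumed, not measured on real hardware).
RAW_TARE = 8000.0        # reading with an empty platform
RAW_WITH_2KG = 8840.0    # reading with a 2 kg reference mass

factor = calibration_factor(RAW_TARE, RAW_WITH_2KG, 2.0)
weight = counts_to_weight(10100.0, RAW_TARE, factor)
```

With these made-up readings the factor comes out at 420 counts per kg. On real hardware the value depends on the specific load cell, its wiring, and the HX711 gain setting, so it has to be measured with a reference mass.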
MQTT Communication Protocol
The system uses structured MQTT topics for precision measurement:
- arduino/LoadCell: Weight data publishing with units
- mqtt/request: System activation and status requests
- mqtt/response: Device status and health reports
Message Formats:
Weight Data: "Load: 12.34 kg"
Status Response: "Board : ESP32 Status : Connected"
Status Request: "status_request"
System Control: "TurnOFF"
Key Firmware Features
HX711 Load Cell Initialization:
void setup() {
  LoadCell.begin();
  unsigned long stabilizingtime = 2000;
  boolean _tare = true;
  LoadCell.start(stabilizingtime, _tare);
  if (LoadCell.getTareTimeoutFlag() || LoadCell.getSignalTimeoutFlag()) {
    Serial.println("Timeout, check MCU > HX711 wiring and pin designations");
    while (1);
  } else {
    LoadCell.setCalFactor(420.00);
    Serial.println("Load Cell initialized");
  }
}
Multi-Sample Weight Measurement:
void loop() {
  unsigned long now = millis();
  if (systemActive && (now - lastMsg > 500)) {
    lastMsg = now;  // restart the 500 ms publish interval
    // Call LoadCell.update() numUpdates (100) times for accuracy
    for (int i = 0; i < numUpdates; i++) {
      LoadCell.update();
      delay(1);
    }
    // Get smoothed value after multiple updates
    loadCellValue = LoadCell.getData();
    String payload = "Load: " + String(loadCellValue, 2) + " kg";
    client.publish("arduino/LoadCell", payload.c_str());
  }
}
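The loop above relies on the HX711 library to smooth the readings between update() calls. How that smoothing works internally is not shown in this article. A common approach is a fixed-size moving average that optionally drops the highest and lowest sample, and the sketch below illustrates that idea in Python. The class name, window size, and sample values are assumptions for illustration.

```python
from collections import deque


class MovingAverage:
    """Fixed-size moving average, optionally dropping the highest and lowest sample."""

    def __init__(self, size: int = 16, trim_extremes: bool = True):
        self.samples = deque(maxlen=size)   # oldest sample falls out automatically
        self.trim_extremes = trim_extremes

    def update(self, raw: float) -> None:
        self.samples.append(raw)

    def value(self) -> float:
        if not self.samples:
            raise ValueError("no samples yet")
        data = sorted(self.samples)
        if self.trim_extremes and len(data) > 2:
            data = data[1:-1]               # discard one low and one high outlier
        return sum(data) / len(data)


smoother = MovingAverage(size=8)
for raw in [100, 101, 99, 100, 250, 100, 101, 99]:   # 250 is a noise spike
    smoother.update(raw)
smoothed = smoother.value()
plain_mean = sum(smoother.samples) / len(smoother.samples)
```

With the spike trimmed, the smoothed value stays close to 100, while the plain mean of the same window is pulled up to 118.75.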
System State Management:
void callback(char* topic, byte* payload, unsigned int length) {
  // Convert the topic and the raw payload bytes to Strings for comparison
  String topicStr = String(topic);
  String message;
  for (unsigned int i = 0; i < length; i++) {
    message += (char)payload[i];
  }
  if (topicStr == mqttTopicRequest && message == "status_request") {
    systemActive = true;
    client.publish(mqttTopicResponse, "Board : ESP32 Status : Connected");
    Serial.println("System activated by status request");
  }
  if (topicStr == mqttTopicRequest && message == "TurnOFF") {
    systemActive = false;
    Serial.println("System Deactivated");
  }
}
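When developing the dashboard without hardware, it can help to model this request handling in Python. The sketch below is a hypothetical stand-in for the firmware callback, not part of the original project. It assumes the request and response topics are "mqtt/request" and "mqtt/response", and it records outgoing messages in a list instead of publishing them.

```python
class FakeLoadCellBoard:
    """Hypothetical stand-in for the ESP32 firmware's request handling."""

    REQUEST_TOPIC = "mqtt/request"     # assumed value of mqttTopicRequest
    RESPONSE_TOPIC = "mqtt/response"   # assumed value of mqttTopicResponse

    def __init__(self):
        self.system_active = False
        self.outbox = []   # (topic, payload) pairs the board would publish

    def on_message(self, topic: str, payload: bytes) -> None:
        if topic != self.REQUEST_TOPIC:
            return
        message = payload.decode("utf-8", errors="replace")
        if message == "status_request":
            self.system_active = True
            self.outbox.append(
                (self.RESPONSE_TOPIC, "Board : ESP32 Status : Connected")
            )
        elif message == "TurnOFF":
            self.system_active = False


board = FakeLoadCellBoard()
board.on_message("mqtt/request", b"status_request")
active_after_request = board.system_active
board.on_message("mqtt/request", b"TurnOFF")
active_after_turnoff = board.system_active
```

Note that, as in the firmware, a status request doubles as the activation command: the board only starts publishing weights after the first "status_request" arrives.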
PyQt5 Dashboard Integration
The dashboard interface provides weight monitoring through six main sections:
- MQTT Status Panel - Connection monitoring with LED indicators
- Weight Display - Real-time measurement with precision formatting
- History Plot - PyQtGraph visualization with auto-scaling and legend
- Statistics Display - Min/Max/Average tracking with session statistics
- Control Buttons - Status refresh, history clear, and data export
- Data Export - CSV export with timestamp and elapsed time columns
Signal-Based Architecture
Signal-slot mechanism for thread-safe updates:
class LOADCELL(QObject):
    # Signal definitions for thread-safe UI updates
    weight_changed = pyqtSignal(float, str)
    status_update = pyqtSignal(str, str)
    update_plot_signal = pyqtSignal()
    board_connected_signal = pyqtSignal(bool)
    clear_history_signal = pyqtSignal()
Thread-Safe Weight Updates:
@pyqtSlot(float, str)
def update_weight_ui(self, weight_value, display_text):
    if hasattr(self.ui, 'loadCell_val_label'):
        self.ui.loadCell_val_label.setText(display_text)
    # Update session statistics
    self.update_session_statistics(weight_value)

def update_session_statistics(self, weight_value):
    if self.max_weight_seen is None or weight_value > self.max_weight_seen:
        self.max_weight_seen = weight_value
    if self.min_weight_seen is None or weight_value < self.min_weight_seen:
        self.min_weight_seen = weight_value
    self.total_weight_sum += weight_value
    self.total_weight_count += 1
    avg_weight = self.total_weight_sum / self.total_weight_count
    self.update_statistics_ui(avg_weight)
MQTT Message Handling
Weight Data Processing with Regex:
def handle_loadcell_message(self, topic, payload):
    try:
        # MQTT payloads usually arrive as bytes; normalize to str first
        payload_str = payload.decode() if isinstance(payload, bytes) else str(payload)
        if ": " in payload_str:
            _, value_with_unit = payload_str.split(": ", 1)
            weight_value = self.extract_weight_value(value_with_unit)
            if weight_value is not None:
                self._current_weight = weight_value
                self.add_to_history(weight_value)
                self.weight_changed.emit(weight_value, value_with_unit)
    except Exception as e:
        print(f"Error processing load cell message: {e}")

def extract_weight_value(self, payload_text):
    # Optional leading minus: readings can dip below zero after taring
    weight_pattern = r'(-?\d+\.?\d*)\s*(?:kg|g|lbs?)?'
    matches = re.findall(weight_pattern, payload_text.lower())
    if matches:
        return float(matches[0])
    return None
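To see how the extraction step behaves on concrete payloads, here is a standalone sketch that runs outside the dashboard class. It uses a pattern that accepts an optional leading minus sign, since a tared scale can report slightly negative values. The function name parse_weight is hypothetical.

```python
import re
from typing import Optional

# Number with optional sign and decimals, followed by an optional unit.
WEIGHT_PATTERN = re.compile(r"(-?\d+(?:\.\d+)?)\s*(?:kg|g|lbs?)?")


def parse_weight(payload: str) -> Optional[float]:
    """Return the first number in a payload such as 'Load: 12.34 kg', or None."""
    _, _, value_part = payload.partition(": ")
    # Fall back to the whole payload if there is no "label: value" separator
    match = WEIGHT_PATTERN.search((value_part or payload).lower())
    return float(match.group(1)) if match else None


normal = parse_weight("Load: 12.34 kg")
negative = parse_weight("Load: -0.05 kg")
invalid = parse_weight("Load: nan kg")
```

Here normal is 12.34, negative is -0.05, and invalid is None because the payload contains no digits. The unit is matched but not returned, so the dashboard implicitly trusts the firmware to always send kilograms.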
Status Monitoring with Board Name Extraction:
def handle_status_message4(self, topic, payload):
    try:
        if "Board :" in payload and "Status :" in payload:
            board_part, status_part = payload.split("Status :")
            board_name = board_part.replace("Board :", "").strip()
            status = status_part.strip()
            if hasattr(self.ui, 'lab_board_LC'):
                self.ui.lab_board_LC.setText(board_name)
            connected = status.lower() == "connected"
            self.board_connected_signal.emit(connected)
    except Exception as e:
        print(f"Error processing status message: {e}")
Control Flow Analysis
Weight Measurement Flow:
- Sensor Reading: The firmware calls LoadCell.update() 100 times per cycle so the smoothed reading reflects the most recent HX711 conversions
- Signal Processing: Load cell data converted using calibration factor (420.00)
- Data Publishing: Formatted weight message sent to the arduino/LoadCell topic
- Dashboard Processing: Regex extraction of numeric weight value from payload
- UI Update: Real-time display update with statistical analysis and history plotting
Status Monitoring Flow:
- Status Request: Dashboard sends "status_request" to activate measurement system
- System Activation: ESP32 enables weight measurement loop and confirms connection
- Connection Verification: Board name extracted and connection status updated
- Periodic Monitoring: Optional timer-based status checks for connection reliability
- Timeout Handling: Automatic disconnection detection after response timeout
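The timeout handling in the last step is not shown in the article. One way to sketch it, under the assumption that the dashboard records when a status request goes out and checks periodically for a reply, is a small watchdog like the one below. The class name, method names, and the 5 second timeout are all hypothetical.

```python
import time


class ConnectionWatchdog:
    """Marks the board as disconnected if no status response arrives in time."""

    def __init__(self, timeout_s: float = 5.0, clock=time.monotonic):
        self.timeout_s = timeout_s
        self._clock = clock              # injectable so the logic can be tested
        self._request_sent_at = None
        self.connected = False

    def request_sent(self) -> None:
        # Track only the oldest unanswered request
        if self._request_sent_at is None:
            self._request_sent_at = self._clock()

    def response_received(self) -> None:
        self._request_sent_at = None
        self.connected = True

    def poll(self) -> bool:
        """Call periodically (for example from a timer); returns the connection state."""
        waiting = self._request_sent_at is not None
        if waiting and self._clock() - self._request_sent_at > self.timeout_s:
            self.connected = False
            self._request_sent_at = None
        return self.connected


fake_time = [0.0]
watchdog = ConnectionWatchdog(timeout_s=5.0, clock=lambda: fake_time[0])
watchdog.request_sent()
watchdog.response_received()
state_after_reply = watchdog.poll()
watchdog.request_sent()
fake_time[0] = 6.0                       # no reply for 6 seconds
state_after_timeout = watchdog.poll()
```

In the dashboard, the result of poll() could be forwarded through board_connected_signal so the UI update still happens on the GUI thread.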
Advanced Features
History Plotting with PyQtGraph:
def setup_history_plot(self):
    plot_widget = self.ui.Weight_History_Plot
    plot_widget.setBackground('transparent')
    plot_widget.setLabel('left', 'Weight', units='kg')
    plot_widget.setLabel('bottom', 'Time', units='s')
    plot_widget.showGrid(x=True, y=True, alpha=0.3)
    # Add the legend before plotting so the named curve is registered in it
    plot_widget.addLegend()
    pen = pg.mkPen(color='#2196F3', width=2)
    self.weight_curve = plot_widget.plot(pen=pen, name='Weight')
    plot_widget.enableAutoRange('xy', True)
CSV Data Export with Timestamps:
def export_weight_data(self, filename=None):
    if not filename:
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"weight_history_{timestamp}.csv"
    with open(filename, 'w', newline='') as csvfile:
        writer = csv.writer(csvfile)
        writer.writerow(['Timestamp', 'Elapsed Time (s)', 'Weight (kg)'])
        start_datetime = datetime.now() - timedelta(
            seconds=max(self.history_timestamps) if self.history_timestamps else 0
        )
        for elapsed_time, weight_val in zip(self.history_timestamps, self.history_weights):
            actual_time = start_datetime + timedelta(seconds=elapsed_time)
            writer.writerow([
                actual_time.strftime('%Y-%m-%d %H:%M:%S'),
                f"{elapsed_time:.1f}",
                f"{weight_val:.3f}"
            ])
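The export reconstructs wall-clock timestamps from elapsed seconds by assuming the newest sample was taken at the moment of export. The standalone sketch below isolates that arithmetic and writes to an in-memory buffer instead of a file. The helper name rows_with_timestamps and the sample data are hypothetical.

```python
import csv
import io
from datetime import datetime, timedelta


def rows_with_timestamps(elapsed_s, weights, now):
    """Pair each sample with a wall-clock time, assuming the newest sample was taken at `now`."""
    start = now - timedelta(seconds=max(elapsed_s) if elapsed_s else 0)
    for elapsed, weight in zip(elapsed_s, weights):
        stamp = start + timedelta(seconds=elapsed)
        yield [stamp.strftime("%Y-%m-%d %H:%M:%S"), f"{elapsed:.1f}", f"{weight:.3f}"]


buffer = io.StringIO()
writer = csv.writer(buffer)
writer.writerow(["Timestamp", "Elapsed Time (s)", "Weight (kg)"])
writer.writerows(
    rows_with_timestamps(
        [0.0, 0.5, 1.0],                      # elapsed seconds per sample
        [1.2, 1.25, 1.3],                     # weights in kg
        now=datetime(2024, 1, 1, 12, 0, 1),   # fixed time for a repeatable example
    )
)
csv_text = buffer.getvalue()
```

Because the start time is derived backwards from the export time, the timestamps drift if the export happens long after the last sample. Storing an absolute timestamp with each sample would avoid that, at the cost of a little more memory.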
Comprehensive Statistics Tracking:
def get_weight_statistics(self):
    if len(self.history_weights) == 0:
        return None
    weights = list(self.history_weights)
    return {
        'current': self._current_weight,
        'min': min(weights),
        'max': max(weights),
        'avg': sum(weights) / len(weights),
        'count': len(weights)
    }
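These statistics are computed over history_weights, which add_to_history fills in. That method is not shown in the article, so the following is only a guess at its shape: a sketch that assumes a bounded deque for both weights and elapsed timestamps, with the first sample defining t = 0. The class name and the max_points limit are hypothetical.

```python
import time
from collections import deque


class WeightHistory:
    """Bounded history of (elapsed seconds, weight) samples."""

    def __init__(self, max_points: int = 1000, clock=time.monotonic):
        self._clock = clock
        self._start = None
        # deque(maxlen=...) silently discards the oldest entry when full
        self.history_timestamps = deque(maxlen=max_points)
        self.history_weights = deque(maxlen=max_points)

    def add_to_history(self, weight: float) -> None:
        now = self._clock()
        if self._start is None:
            self._start = now            # first sample defines t = 0
        self.history_timestamps.append(now - self._start)
        self.history_weights.append(weight)


fake_time = [10.0]
history = WeightHistory(max_points=2, clock=lambda: fake_time[0])
for step, weight in enumerate([1.0, 2.0, 3.0]):
    fake_time[0] = 10.0 + step           # one sample per simulated second
    history.add_to_history(weight)
```

With a limit of two points, only the last two samples survive. The min, max, and average from get_weight_statistics would then describe the visible window, while the session statistics shown earlier cover every reading since start.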
System Deactivation with Resource Cleanup:
def deactivate(self):
    # Send shutdown commands
    self.mqtt_client.publish(MQTT_TOPIC_MQTT_Rq, "TurnOFF")
    # Stop all timers
    if hasattr(self, 'plot_timer') and self.plot_timer.isActive():
        self.plot_timer.stop()
    # Unsubscribe from MQTT topics
    self.mqtt_client.unsubscribe_from_topic(MQTT_TOPIC_LOADCELL)
    self.mqtt_client.unsubscribe_from_topic(MQTT_TOPIC_MQTT_Rs)
    # Clear history and reset UI
    self.clear_history_data()
Conclusion
This ESP32 load cell weight measurement system demonstrates precision analog sensing with signal processing, statistical analysis, and data export capabilities. The system provides weight measurement with noise reduction, real-time visualization, and data logging suitable for industrial and laboratory applications.
In the next part of this series, IoT Projects Part 8: Motion Tracking with MPU6050, we'll explore 6-axis motion sensing with accelerometer and gyroscope integration, orientation calculation, and advanced motion analysis algorithms.
That's all! If you have any questions or suggestions, don’t hesitate to leave a comment below.