Sagar Bhojane
Published © GPL3+

Industrial Gas Leakage Monitor

This system sends an alert when it detects a gas leak in a manufacturing plant.

Intermediate · Full instructions provided · 711
Industrial Gas Leakage Monitor

Things used in this project

Hardware components

Bolt WiFi Module
Bolt IoT Bolt WiFi Module
×1
MQ-135 Gas Sensor
×1
Jumper wires (generic)
Jumper wires (generic)
×1

Software apps and online services

Bolt Cloud
Bolt IoT Bolt Cloud

Story

Read more

Code

Python code for Gas Sensor

Python
import json
import math
import statistics
import time

import requests
from boltiot import Bolt

import conf

# Bolt device handle built from the API key and device id stored in conf;
# get_sensor_value_from_pin() uses it to read the sensor pin.
mybolt = Bolt(conf.bolt_api_key, conf.device_id)
# Recent sensor readings; compute_bounds() trims this list in place to the
# configured frame size.
history_data=[]


def compute_bounds(history_data,frame_size,factor):
  """Compute Z-score alert bounds around the most recent reading.

  Args:
    history_data: list of numeric readings, oldest first. If it holds more
      than ``frame_size`` items, the oldest ones are deleted in place.
    frame_size: number of readings that make up one analysis window.
    factor: multiplier applied to the window's standard deviation.

  Returns:
    ``None`` while fewer than ``frame_size`` readings are available,
    otherwise ``[high_bound, low_bound]`` where the bounds are the latest
    reading plus/minus ``factor`` times the population standard deviation
    of the window.
  """
  if len(history_data) < frame_size:
    return None

  # Only the trimming is conditional: the bounds must also be computed when
  # the list already holds exactly frame_size readings.
  if len(history_data) > frame_size:
    del history_data[0:len(history_data) - frame_size]

  mean = statistics.mean(history_data)
  squared_deviation = sum(math.pow(value - mean, 2) for value in history_data)
  z_n = factor * math.sqrt(squared_deviation / frame_size)

  latest = history_data[frame_size - 1]
  return [latest + z_n, latest - z_n]


def get_sensor_value_from_pin(pin):
  """Read the analog value of ``pin`` from the Bolt device.

  Args:
    pin: name of the pin to read, passed straight to ``mybolt.analogRead``.

  Returns:
    The reading as an ``int``, or ``-999`` when the response reports
    failure or any exception occurs while requesting or parsing it.
  """
  try:
    response = mybolt.analogRead(pin)
    data = json.loads(response)
    if data["success"] != 1:
      print("Request not successfull")
      print("This is the response->", data)
      return -999
    # The success path must sit outside the failure branch; nested under it,
    # it was unreachable and the function returned None for good readings.
    return int(data["value"])
  except Exception as e:
    # Deliberately broad: any network, JSON or conversion failure is reported
    # to the caller through the -999 sentinel instead of crashing the monitor.
    print("Something went wrong when returning the sensor value")
    print(e)
    return -999


def send_telegram_message(message):
  """Post ``message`` to the Telegram chat configured in ``conf``.

  The request URL and the raw response body are printed for debugging.

  Returns:
    The ``"ok"`` field of the decoded response, or ``False`` when the
    request or the decoding raises.
  """
  endpoint = "https://api.telegram.org/" + conf.telegram_bot_id + "/sendMessage"
  payload = {"chat_id": conf.telegram_chat_id, "text": message}
  try:
    reply = requests.request("POST", endpoint, params=payload)
    print("This is the Telegram URL", endpoint, sep="\n")
    print("This is the Telegram response")
    body = reply.text
    print(body)
    return json.loads(body)["ok"]
  except Exception as err:
    print("An error occurred in sending the alert message via Telegram")
    print(err)
    return False


# Main monitoring loop: poll the gas sensor every 10 seconds and send a
# Telegram alert when a reading exceeds the upper Z-score bound.
while True:
  sensor_value = get_sensor_value_from_pin("A0")
  print("The current sensor value is:", sensor_value)
  if sensor_value == -999:
    # -999 is the failure sentinel of get_sensor_value_from_pin.
    print("Request was unsuccessfull. Skipping.")
    time.sleep(10)
    continue

  bound = compute_bounds(history_data,conf.FRAME_SIZE,conf.MUL_FACTOR)
  if not bound:
    # Still filling the first window: record the reading and wait.
    required_data_count=conf.FRAME_SIZE-len(history_data)
    print("Not enough data to compute Z-score. Need",required_data_count," more data points")
    history_data.append(sensor_value)
    time.sleep(10)
    continue

  try:
    if sensor_value > bound[0]:
      print("There’s sudden rise in the sensor reading")
      telegram_status = send_telegram_message("Alert!! There’s sudden rise in the sensor reading, check for gas leak. Current sensor reading is:" + str(sensor_value))
      print("This is the response ", telegram_status)
    # Every reading is recorded, not only anomalous ones, so the window
    # keeps tracking the current baseline.
    history_data.append(sensor_value)
  except Exception as e:
    print("Error", e)

  # The pause belongs inside the loop; outside it, polling never paused.
  time.sleep(10)

Credits

Sagar Bhojane
1 project • 0 followers

Comments