"""Streamlit dashboard that shows bus status lines read from a serial device.

The script lists the available serial ports, asks a pickled decision-tree
model which port is the Arduino, opens that port, and starts a background
thread that parses the incoming text lines into ``bus_info``.  The Streamlit
page then redraws the values from ``bus_info`` once per second.
"""

import pickle
import threading
import time

import pandas as pd
import serial
import serial.tools.list_ports
import streamlit as st
from sklearn.preprocessing import LabelEncoder
from sklearn.tree import DecisionTreeClassifier

# Files written by train_model() and read back by predict_serial_port().
MODEL_PATH = 'serial_port_model.pkl'
ENCODER_PATH = 'label_encoder.pkl'

# Serial settings and polling interval used by the reader thread.
BAUD_RATE = 9600
IDLE_POLL_SECONDS = 0.1

# Global variables to store bus information and hardware statuses
bus_info = {
    'bus_name': 'N/A',
    'seat_status': 'N/A',
    'distance': 'N/A',
    'led_status': 'OFF',
    'buzzer_status': 'OFF',
}

# Text marker expected in a serial line -> key of ``bus_info`` it updates.
# Order matters: the first marker found in a line wins.
_STATUS_MARKERS = (
    ("Bus Name:", 'bus_name'),
    ("Seat Status:", 'seat_status'),
    ("Distance:", 'distance'),
    ("LED:", 'led_status'),
    ("Buzzer:", 'buzzer_status'),
)


def collect_serial_ports():
    """Return one dict per available serial port.

    Each dict has the keys ``'port'`` (device name), ``'description'`` and
    ``'manufacturer'`` (``'Unknown'`` when the port reports none).
    """
    return [
        {
            'port': port.device,
            'description': port.description,
            'manufacturer': port.manufacturer if port.manufacturer else 'Unknown',
        }
        for port in serial.tools.list_ports.comports()
    ]


def preprocess_data(port_data):
    """Turn the port dicts into a DataFrame with numeric feature columns.

    Args:
        port_data: Non-empty list of dicts as produced by
            ``collect_serial_ports``.

    Returns:
        ``(df, le)`` where ``df`` has the columns ``port``, ``description``
        and ``manufacturer`` (the latter two label-encoded to integers) and
        ``le`` is the ``LabelEncoder`` fitted on the ``manufacturer`` column.

    NOTE(review): the encoders are fitted on whatever ports are passed in, so
    the integer codes produced at prediction time are not guaranteed to match
    the codes the model was trained on.  Fixing that needs persisted
    per-column encoders and is left as a follow-up.
    """
    df = pd.DataFrame(port_data)

    # Encode the 'description' and 'manufacturer' fields to numeric.  The
    # description encoder is only needed transiently; the manufacturer
    # encoder is the one handed back to the caller.
    df['description'] = LabelEncoder().fit_transform(df['description'])
    le = LabelEncoder()
    df['manufacturer'] = le.fit_transform(df['manufacturer'])

    return df, le


def train_model(labels=None):
    """Train the port classifier on the currently attached ports and save it.

    Args:
        labels: One label per detected port, in the order returned by
            ``collect_serial_ports`` (1 = Arduino, 0 = other device).  When
            omitted, the placeholder labelling ``[1, 0]`` is used, which is
            only valid when exactly two ports are attached.

    Raises:
        ValueError: If no ports are detected or the number of labels does not
            match the number of ports.
    """
    port_data = collect_serial_ports()
    if not port_data:
        raise ValueError("No serial ports detected; nothing to train on.")

    if labels is None:
        # Placeholder labelling (replace with actual labeled data).
        labels = [1, 0]
    labels = list(labels)
    if len(labels) != len(port_data):
        raise ValueError(
            f"Expected {len(port_data)} labels (one per detected port), "
            f"got {len(labels)}."
        )

    df, le = preprocess_data(port_data)
    df['label'] = labels

    # Train the classifier
    X = df.drop(columns=['port', 'label'])
    y = df['label']
    clf = DecisionTreeClassifier()
    clf.fit(X, y)

    # Save the model and label encoder
    with open(MODEL_PATH, 'wb') as model_file:
        pickle.dump(clf, model_file)
    with open(ENCODER_PATH, 'wb') as le_file:
        pickle.dump(le, le_file)


def predict_serial_port(port_data):
    """Return the device name of the first port the model classifies as Arduino.

    Args:
        port_data: List of dicts as produced by ``collect_serial_ports``.

    Returns:
        The ``'port'`` value of the first entry predicted as label 1, or
        ``None`` when there are no ports or none is predicted as Arduino.

    Raises:
        FileNotFoundError: If the trained model file does not exist yet.
    """
    if not port_data:
        return None

    # SECURITY: unpickling executes arbitrary code; only load a model file
    # that this application wrote itself via train_model().
    with open(MODEL_PATH, 'rb') as model_file:
        clf = pickle.load(model_file)

    df, _ = preprocess_data(port_data)

    # At prediction time there is no 'label' column, so ignore missing names
    # instead of raising KeyError.
    features = df.drop(columns=['port', 'label'], errors='ignore')
    prediction = clf.predict(features)

    for entry, pred in zip(port_data, prediction):
        if pred == 1:  # label 1 means "Arduino"
            return entry['port']
    return None  # No Arduino found


def initialize_serial_connection():
    """Open the serial port that the model identifies as the Arduino.

    Returns:
        An open ``serial.Serial`` instance, or ``None`` when the model file is
        missing, no Arduino port is predicted, or the port cannot be opened.
        Each failure is reported on the page through ``st.error``.
    """
    port_data = collect_serial_ports()  # Get available ports data
    try:
        com_port = predict_serial_port(port_data)
    except FileNotFoundError as e:
        st.error(f"Serial port model not found: {e}")
        return None

    if not com_port:
        st.error("No Arduino device found.")
        return None

    try:
        connection = serial.Serial(com_port, BAUD_RATE)  # Open serial port
    except serial.SerialException as e:
        st.error(f"Error opening serial port: {e}")
        return None

    time.sleep(2)  # Wait for the connection to initialize
    print(f"Connected to {com_port}")
    return connection


# Initialize serial connection
# NOTE(review): Streamlit re-executes this script on every rerun, so this
# would try to reopen the port and start another reader thread each time;
# consider caching the connection if reruns are expected.
ser = initialize_serial_connection()


def _apply_status_line(line):
    """Store the value from one serial text line in ``bus_info``.

    The text after the first recognised marker (see ``_STATUS_MARKERS``) is
    stripped and stored; lines without a known marker are ignored.
    """
    for marker, key in _STATUS_MARKERS:
        if marker in line:
            bus_info[key] = line.partition(marker)[2].strip()
            return


def display_bus_info():
    """Continuously read lines from ``ser`` and update ``bus_info``.

    Intended to run in a background thread.  Returns immediately when no
    serial connection is available, and stops when the serial device raises
    an I/O error or on ``KeyboardInterrupt``.
    """
    if ser is None:
        print("No serial connection; bus information will not be updated.")
        return

    try:
        while True:
            if ser.in_waiting > 0:
                # Read data from serial port
                data = ser.readline().decode('utf-8', errors='ignore').strip()

                # Debug print to see the raw serial data
                print(f"Raw Data: {data}")

                _apply_status_line(data)
            else:
                # Sleep only when idle so buffered lines are drained promptly
                # instead of being consumed at one line per delay.
                time.sleep(IDLE_POLL_SECONDS)
    except KeyboardInterrupt:
        print("Exiting...")
    except (serial.SerialException, OSError) as e:
        print(f"Serial read failed: {e}")


# Start the bus information display in a separate thread
bus_info_thread = threading.Thread(target=display_bus_info)
bus_info_thread.daemon = True
bus_info_thread.start()

# Streamlit Web Interface
st.title('Bus System Status')

# Display the bus information
col1, col2 = st.columns(2)

with col1:
    st.subheader('Bus Name')
    bus_name = st.empty()

with col2:
    st.subheader('Seat Status')
    seat_status = st.empty()

st.subheader('Distance to Bus')
distance_display = st.empty()

# Display LED and Buzzer status
st.subheader('LED Status')
led_status_display = st.empty()

st.subheader('Buzzer Status')
buzzer_status_display = st.empty()

# Update the information on the web interface every second
while True:
    bus_name.text(bus_info['bus_name'])
    seat_status.text(bus_info['seat_status'])
    distance_display.text(f"Distance: {bus_info['distance']} cm")
    led_status_display.text(f"LED: {bus_info['led_status']}")
    buzzer_status_display.text(f"Buzzer: {bus_info['buzzer_status']}")
    time.sleep(1)