Files
CARIA-AUTOMOTIVE/WebControl/apps/itineraire_suivre_emergency.py
ccunatbrule 2ddf2360e6 CARIA.2.2
Update for the final presentation
huge change with previous version
2024-09-03 12:17:44 +02:00

170 lines
6.4 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import RPi.GPIO as GPIO
import json, time, sys, threading
from threading import Thread
sys.path.append('/home/christian/WebControl/modules/')
from AlphaBot import AlphaBot
Ab = AlphaBot()
base_speed = 20 # Vitesse fixe de 20
current_step_index = 0
emergency_stop = False
remaining_duration = 0
stop_event_obstacle = threading.Event()
# Fonction pour calculer la durée en fonction de la distance
def calculate_duration(distance_value):
speed_factor = 0.001 # Exemple 0.01/100m:s | 0.001/1km/s
return distance_value * speed_factor
# Fonction pour surveiller les obstacles en arrière-plan
def monitor_obstacles():
global emergency_stop, remaining_duration
while not stop_event_obstacle.is_set():
DR_status = GPIO.input(Ab.OBSTACLE_PIN)
if DR_status == 0: # Obstacle détecté
Ab.emergencystop()
emergency_stop = True
while GPIO.input(Ab.OBSTACLE_PIN) == 0: # Attendre que l'obstacle soit dégagé
time.sleep(2)
print("Obstacle dégagé. Attente de 2 secondes avant de reprendre.")
emergency_stop = False
# Fonction pour exécuter une manoeuvre avec arrêt d'urgence possible
def execute_maneuver(maneuver_function, duration, speed=None):
global remaining_duration
start_time = time.time()
while not emergency_stop and (time.time() - start_time) < duration:
time_slice = min(remaining_duration, duration - (time.time() - start_time))
# Appel de la fonction manoeuvre avec ou sans vitesse selon le besoin
if speed is not None:
maneuver_function(time_slice, speed)
else:
maneuver_function(time_slice)
remaining_duration = duration - (time.time() - start_time)
# Fonction pour traiter les étapes sélectionnées
def process_selected_steps(filename):
global current_step_index, remaining_duration
with open(filename, 'r', encoding='utf-8') as f:
selected_steps = json.load(f)
while current_step_index < len(selected_steps):
if emergency_stop: # Si arrêt d'urgence, attendre que l'obstacle soit dégagé
time.sleep(0.1)
continue
step = selected_steps[current_step_index]
maneuver = step['maneuver']
distance_value = step['distance_value']
duration = calculate_duration(distance_value)
remaining_duration = duration # Initialiser la durée restante
# Appel de la fonction de manoeuvre avec une vitesse fixe de 20
if maneuver == "maneuver-unspecified":
execute_maneuver(Ab.maneuver_unspecified, duration)
elif maneuver == "turn-slight-left":
execute_maneuver(Ab.turn_slight_left, duration, base_speed)
elif maneuver == "turn-sharp-left":
execute_maneuver(Ab.turn_sharp_left, duration, base_speed)
elif maneuver == "u-turn-left":
execute_maneuver(Ab.u_turn_left, duration, base_speed)
elif maneuver == "turn-left":
execute_maneuver(Ab.left, duration, base_speed)
elif maneuver == "turn-slight-right":
execute_maneuver(Ab.turn_slight_right, duration, base_speed)
elif maneuver == "turn-sharp-right":
execute_maneuver(Ab.turn_sharp_right, duration, base_speed)
elif maneuver == "u-turn-right":
execute_maneuver(Ab.u_turn_right, duration, base_speed)
elif maneuver == "turn-right":
execute_maneuver(Ab.right, duration, base_speed)
elif maneuver == "straight":
execute_maneuver(Ab.forward, duration, base_speed)
elif maneuver == "ramp-left":
execute_maneuver(Ab.ramp_left, duration, base_speed)
elif maneuver == "ramp-right":
execute_maneuver(Ab.ramp_right, duration, base_speed)
elif maneuver == "merge":
execute_maneuver(Ab.merge, duration, base_speed)
elif maneuver == "fork-left":
execute_maneuver(Ab.fork_left, duration, base_speed)
elif maneuver == "fork-right":
execute_maneuver(Ab.fork_right, duration, base_speed)
elif maneuver == "ferry":
execute_maneuver(Ab.stop, duration)
elif maneuver == "ferry-train":
execute_maneuver(Ab.stop, duration)
elif maneuver == "roundabout-left":
execute_maneuver(Ab.roundabout_left, duration, base_speed)
elif maneuver == "roundabout-right":
execute_maneuver(Ab.roundabout_right, duration, base_speed)
else:
print(f"Manoeuvre inconnue : {maneuver}")
# Incrémenter l'index de l'étape seulement si l'arrêt d'urgence n'a pas été déclenché
if not emergency_stop:
current_step_index += 1
# Nom du fichier contenant les étapes sélectionnées
selected_steps_filename = '/home/christian/WebControl/logs/selected_steps_short.json'
# Démarrer la surveillance des obstacles dans un thread séparé
obstacle_thread = Thread(target=monitor_obstacles)
obstacle_thread.daemon = True
obstacle_thread.start()
# Attendre avant de démarrer
print("Le départ est prévu dans 10 secondes...")
time.sleep(1)
GPIO.output(Ab.RED_LIGHT, GPIO.HIGH)
time.sleep(1)
GPIO.output(Ab.RED_LIGHT, GPIO.LOW)
time.sleep(1)
GPIO.output(Ab.RED_LIGHT, GPIO.HIGH)
time.sleep(1)
GPIO.output(Ab.RED_LIGHT, GPIO.LOW)
print("5 secondes...")
time.sleep(1)
GPIO.output(Ab.RED_LIGHT, GPIO.HIGH)
time.sleep(1)
GPIO.output(Ab.RED_LIGHT, GPIO.LOW)
time.sleep(1)
GPIO.output(Ab.RED_LIGHT, GPIO.HIGH)
time.sleep(1)
GPIO.output(Ab.RED_LIGHT, GPIO.LOW)
time.sleep(1)
print("Départ imminent !")
try:
# Appel de la fonction pour traiter les étapes sélectionnées
process_selected_steps(selected_steps_filename)
status = "scenario successful"
except KeyboardInterrupt:
print("Interruption par l'utilisateur.")
status = "interrupted"
except Exception as e:
print(f"Erreur lors de l'exécution: {e}")
status = "error"
finally:
# Arrêter les threads
Ab.stop_event_obstacle.set()
# Attendre que les threads se terminent
obstacle_thread.join()
print("Fin de la surveillance d'obstacle")
Ab.cleanup()
# Vérification finale et affichage du statut
if status in ["scenario successful"]:
print("Le scenario fonctionne correctement.")
fonctionnement_ok = True
else:
print(f"Le scenario a rencontré un problème : {status}.")
fonctionnement_ok = False
Ab.enregistrer_resultats(sys.argv[0], fonctionnement_ok, status)