#!/usr/bin/env python3
"""
PC Animatronic Playback System

Synchronizes servo motors, LEDs, and video/audio for animatronic performances.

This script controls:
- 7 servo motors via Pololu Mini Maestro controller
- LED strips via ESP32-S3
- Video playback via VLC
- Audio playback via pygame

Performance Flow:
Main Performance (4.5 min) -> Active Idle (15 min) -> repeat
"""

import argparse   # For command line argument parsing
import json       # For reading JSON configuration and animation files
import logging    # For creating log files and error tracking
import os         # For file path operations
import random     # For random animation selection in active idle mode
import sys        # For system operations and exit functions
import threading  # For running multiple tasks simultaneously
import time       # For timing control and delays

import pygame     # For audio playback
import serial     # For serial communication with hardware
import vlc        # For video playback control

# =============================================================================
# CONFIGURATION VARIABLES - Edit these to customize the system
# =============================================================================

# Hardware Configuration (Edit these COM ports for your system)
ESP32_COM_PORT = "COM5"       # USB port for ESP32-S3 (LED controller)
MAESTRO_COM_PORT = "COM4"     # USB port for Pololu Mini Maestro (servo controller)
ESP32_BAUD_RATE = 115200      # Communication speed for ESP32
MAESTRO_BAUD_RATE = 9600      # Communication speed for Mini Maestro

# Timing Configuration (Edit these for performance tuning)
TIMING_PRECISION = 0.01       # How often to check timing (seconds) - smaller = more precise
SERVO_COMMAND_DELAY = 0.001   # Delay between servo commands (seconds)
SERIAL_TIMEOUT = 1.0          # How long to wait for serial responses (seconds)

# Performance Configuration
DEFAULT_CONFIG_FILE = "animatronic_config.json"  # Default configuration file name
MAIN_PERFORMANCE_DURATION = 270   # Main performance length (seconds) - 4.5 minutes
ACTIVE_IDLE_DURATION = 900        # Active idle length (seconds) - 15 minutes

# LED Commands (Edit these if your ESP32 uses different commands)
LED_START_COMMAND = "p"       # Command to start main LED performance
LED_RESET_COMMAND = "r"       # Command to reset/stop LEDs
LED_DIALOGUE_PREFIX = "s"     # Prefix for dialogue LED commands (s1, s2, etc.)

# Logging Configuration
LOG_LEVEL = logging.INFO      # Set to logging.DEBUG for more detailed logs
LOG_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'  # Log message format

# =============================================================================
# GLOBAL VARIABLES - These track system state
# =============================================================================

# Hardware connection objects
esp32_serial = None           # Serial connection to ESP32
maestro_serial = None         # Serial connection to Mini Maestro
vlc_instance = None           # VLC media player instance
vlc_player = None             # VLC player object

# Performance state tracking
current_performance_type = ""     # Track if we're in "main" or "active_idle" mode
performance_start_time = 0        # When current performance started
stop_requested = False            # Flag to stop the entire system
performance_active = False        # Flag indicating if performance is running
current_animation_thread = None   # Most recently started animation thread; older ones stop themselves


# =============================================================================
# LOGGING SETUP
# =============================================================================

def setup_logging():
    """
    Configure logging to write to both console and file.
    This helps track what the system is doing and debug problems.
    """
    # Configure the root logger so plain logging.info()/error() calls are captured
    logger = logging.getLogger()
    logger.setLevel(LOG_LEVEL)

    # Create formatter for log messages (timestamp, level, message)
    formatter = logging.Formatter(LOG_FORMAT)

    # Set up console logging (prints to screen)
    console_handler = logging.StreamHandler(sys.stdout)
    console_handler.setLevel(LOG_LEVEL)
    console_handler.setFormatter(formatter)
    logger.addHandler(console_handler)

    # Set up file logging (saves to file)
    file_handler = logging.FileHandler('animatronic_system.log')
    file_handler.setLevel(LOG_LEVEL)
    file_handler.setFormatter(formatter)
    logger.addHandler(file_handler)

    logging.info("Logging system initialized")


# =============================================================================
# HARDWARE COMMUNICATION FUNCTIONS
# =============================================================================

def initialize_serial_connections():
    """
    Establish serial connections to ESP32 and Mini Maestro.

    If the ESP32 opens but the Mini Maestro does not, the ESP32 port is left
    open; close_serial_connections() releases whatever was opened.

    Returns:
        bool: True if both ports were opened, False if there's a problem
    """
    global esp32_serial, maestro_serial

    try:
        # Connect to ESP32 for LED control
        logging.info(f"Connecting to ESP32 on {ESP32_COM_PORT} at {ESP32_BAUD_RATE} baud")
        esp32_serial = serial.Serial(
            port=ESP32_COM_PORT,
            baudrate=ESP32_BAUD_RATE,
            timeout=SERIAL_TIMEOUT
        )
        # Wait a moment for connection to stabilize
        time.sleep(2)
        logging.info("ESP32 connection established")

        # Connect to Mini Maestro for servo control
        logging.info(f"Connecting to Mini Maestro on {MAESTRO_COM_PORT} at {MAESTRO_BAUD_RATE} baud")
        maestro_serial = serial.Serial(
            port=MAESTRO_COM_PORT,
            baudrate=MAESTRO_BAUD_RATE,
            timeout=SERIAL_TIMEOUT
        )
        # Wait a moment for connection to stabilize
        time.sleep(2)
        logging.info("Mini Maestro connection established")

        return True

    except serial.SerialException as e:
        # If we can't connect to hardware, log the error
        logging.error(f"Failed to initialize serial connections: {e}")
        return False
    except Exception as e:
        # Catch any other unexpected errors
        logging.error(f"Unexpected error during serial initialization: {e}")
        return False


def send_led_command(command):
    """
    Send a command to the ESP32 to control LEDs.

    LED failures are treated as non-fatal: the caller gets False and the
    performance can continue without LEDs.

    Args:
        command (str): The command to send (e.g., "p", "r", "s1")

    Returns:
        bool: True if the command was written, False otherwise
    """
    try:
        # Check if ESP32 is connected
        if esp32_serial is None or not esp32_serial.is_open:
            logging.warning("ESP32 not connected - skipping LED command")
            return False

        # Send the command as bytes (serial communication uses bytes, not strings)
        esp32_serial.write(command.encode('utf-8'))
        # Flush ensures the command is sent immediately
        esp32_serial.flush()

        logging.info(f"Sent LED command: {command}")
        return True

    except serial.SerialException as e:
        # If ESP32 disconnects during performance, continue without LEDs
        logging.warning(f"ESP32 communication error: {e} - continuing without LEDs")
        return False
    except Exception as e:
        # Log any other unexpected errors
        logging.error(f"Unexpected error sending LED command: {e}")
        return False


def send_servo_position(channel, position):
    """
    Send a position command to a specific servo motor.
    Uses Pololu Mini Maestro compact protocol.

    Args:
        channel (int): Servo channel number (0-11)
        position (int | float): Target position (typically 608-2496 for MG90S servos)

    Returns:
        bool: True if the command was written, False if the controller is not
            connected, the arguments are invalid, or the write failed
    """
    try:
        # Check if Mini Maestro is connected
        if maestro_serial is None or not maestro_serial.is_open:
            logging.error("Mini Maestro not connected - cannot send servo command")
            return False

        # Protocol data bytes must have their top bit clear, so the channel
        # has to fit in 7 bits.
        if not isinstance(channel, int) or not 0 <= channel <= 0x7F:
            logging.error(f"Invalid servo channel: {channel!r}")
            return False

        # Mini Maestro uses quarter-microsecond units, so multiply by 4.
        # Positions read from JSON may be floats; the bit operations below
        # need an integer.
        target = int(round(position * 4))

        # The target is transmitted as two 7-bit bytes (14 bits total).
        # Anything outside that range would be silently wrapped by the masks
        # below and move the servo somewhere unintended, so reject it.
        if not 0 <= target <= 0x3FFF:
            logging.error(f"Servo position out of range for channel {channel}: {position!r}")
            return False

        # Create command using Pololu compact protocol
        # Command format: 0x84, channel, target_low_byte, target_high_byte
        command = bytearray([
            0x84,                  # Set Target command
            channel,               # Channel number
            target & 0x7F,         # Target low 7 bits
            (target >> 7) & 0x7F   # Target high 7 bits
        ])

        # Send command to Mini Maestro
        maestro_serial.write(command)
        maestro_serial.flush()

        logging.debug(f"Servo {channel} -> position {position}")

        # Small delay to prevent overwhelming the controller
        time.sleep(SERVO_COMMAND_DELAY)
        return True

    except serial.SerialException as e:
        # If Mini Maestro disconnects, this is critical - stop performance
        logging.error(f"Mini Maestro communication error: {e}")
        return False
    except Exception as e:
        # Log any other unexpected errors
        logging.error(f"Unexpected error sending servo command: {e}")
        return False


def close_serial_connections():
    """
    Safely close all serial connections when shutting down.

    Safe to call more than once and when a port was never opened.
    """
    # Close ESP32 connection
    if esp32_serial and esp32_serial.is_open:
        try:
            esp32_serial.close()
            logging.info("ESP32 connection closed")
        except Exception as e:
            logging.error(f"Error closing ESP32 connection: {e}")

    # Close Mini Maestro connection
    if maestro_serial and maestro_serial.is_open:
        try:
            maestro_serial.close()
            logging.info("Mini Maestro connection closed")
        except Exception as e:
            logging.error(f"Error closing Mini Maestro connection: {e}")


# =============================================================================
# MEDIA PLAYBACK FUNCTIONS
# =============================================================================

def initialize_media_systems():
    """
    Set up VLC and pygame for media playback.

    Returns:
        bool: True if successful, False otherwise
    """
    global vlc_instance

    try:
        # Initialize VLC for video playback
        vlc_instance = vlc.Instance()
        logging.info("VLC initialized for video playback")

        # Initialize pygame for audio playback
        pygame.mixer.init()
        logging.info("Pygame initialized for audio playback")

        return True

    except Exception as e:
        logging.error(f"Failed to initialize media systems: {e}")
        return False


def play_video(video_file_path):
    """
    Start playing a video file using VLC.

    Args:
        video_file_path (str): Path to the video file

    Returns:
        vlc.MediaPlayer: VLC player object, or None if failed
    """
    try:
        # Check if video file exists
        if not os.path.exists(video_file_path):
            logging.error(f"Video file not found: {video_file_path}")
            return None

        # Create media object from file
        media = vlc_instance.media_new(video_file_path)

        # Create player object
        player = vlc_instance.media_player_new()

        # Set the media to play
        player.set_media(media)

        # Start playback
        player.play()

        logging.info(f"Started video playback: {video_file_path}")

        # Wait a moment for playback to actually start
        time.sleep(0.5)

        return player

    except Exception as e:
        logging.error(f"Error playing video {video_file_path}: {e}")
        return None


def play_audio(audio_file_path):
    """
    Play an MP3 audio file using pygame.

    Args:
        audio_file_path (str): Path to the MP3 file

    Returns:
        bool: True if playback started successfully, False otherwise
    """
    try:
        # Check if audio file exists
        if not os.path.exists(audio_file_path):
            logging.error(f"Audio file not found: {audio_file_path}")
            return False

        # Load and play the audio file
        pygame.mixer.music.load(audio_file_path)
        pygame.mixer.music.play()

        logging.info(f"Started audio playback: {audio_file_path}")
        return True

    except Exception as e:
        logging.error(f"Error playing audio {audio_file_path}: {e}")
        return False


def stop_all_media():
    """
    Stop all video and audio playback.

    Video and audio are stopped independently so that a failure stopping one
    does not leave the other playing.
    """
    # Stop VLC video playback
    try:
        if vlc_player:
            vlc_player.stop()
            logging.info("Video playback stopped")
    except Exception as e:
        logging.error(f"Error stopping media playback: {e}")

    # Stop pygame audio playback (only if the mixer was ever initialized;
    # calling into pygame.mixer.music before init raises pygame.error)
    try:
        if pygame.mixer.get_init():
            pygame.mixer.music.stop()
            logging.info("Audio playback stopped")
    except Exception as e:
        logging.error(f"Error stopping media playback: {e}")


# =============================================================================
# ANIMATION FILE PROCESSING
# =============================================================================

def load_servo_animation(json_file_path):
    """
    Load a servo animation from a JSON file.

    Args:
        json_file_path (str): Path to the JSON animation file

    Returns:
        dict: Animation data, or None if failed to load
    """
    try:
        # Check if file exists
        if not os.path.exists(json_file_path):
            logging.error(f"Animation file not found: {json_file_path}")
            return None

        # Read and parse JSON file (JSON is UTF-8; don't depend on the OS default)
        with open(json_file_path, 'r', encoding='utf-8') as file:
            animation_data = json.load(file)

        logging.info(f"Loaded animation: {animation_data.get('name', 'Unknown')} from {json_file_path}")
        return animation_data

    except json.JSONDecodeError as e:
        # Handle corrupted JSON files
        logging.error(f"Invalid JSON in animation file {json_file_path}: {e}")
        return None
    except Exception as e:
        # Handle other file errors
        logging.error(f"Error loading animation file {json_file_path}: {e}")
        return None


def _collect_animation_commands(animation_data):
    """
    Flatten every track of an animation into one timestamp-sorted command list.

    Tracks without a 'servo_channel' and keyframes without a numeric
    'timestamp' / 'position' are skipped with a warning rather than aborting
    the whole animation.

    Args:
        animation_data (dict): Animation data loaded from JSON file

    Returns:
        list[dict]: Commands with 'timestamp', 'channel' and 'position' keys
    """
    commands = []

    for track_key, track_data in animation_data.get('tracks', {}).items():
        channel = track_data.get('servo_channel')
        if channel is None:
            logging.warning(f"Track '{track_key}' has no servo_channel - skipping track")
            continue

        for keyframe in track_data.get('positions', []):
            timestamp = keyframe.get('timestamp')
            position = keyframe.get('position')
            if not isinstance(timestamp, (int, float)) or not isinstance(position, (int, float)):
                logging.warning(f"Track '{track_key}' has an invalid keyframe - skipping: {keyframe}")
                continue
            commands.append({
                'timestamp': timestamp,
                'channel': channel,
                'position': position
            })

    # Sort commands by timestamp so they execute in the right order
    commands.sort(key=lambda command: command['timestamp'])
    return commands


def _animation_cancelled():
    """
    Report whether the calling animation thread should stop.

    Returns:
        bool: True if a system stop was requested or a newer animation thread
            has replaced the calling thread as current_animation_thread
    """
    return stop_requested or threading.current_thread() is not current_animation_thread


def _wait_for_command_time(target_time):
    """
    Sleep until target_time while staying responsive to cancellation.

    Sleeping in TIMING_PRECISION slices (instead of one long sleep) means a
    superseded or stopped animation never wakes up later and sends a stale
    servo command.

    Args:
        target_time (float): Absolute time.time() value to wait for

    Returns:
        bool: True when target_time was reached, False if cancelled first
    """
    while True:
        if _animation_cancelled():
            return False
        remaining = target_time - time.time()
        if remaining <= 0:
            return True
        time.sleep(min(remaining, TIMING_PRECISION))


def execute_servo_animation(animation_data):
    """
    Execute a servo animation by sending position commands at the correct times.
    This function runs in its own thread to avoid blocking other operations.

    The animation stops early when a system stop is requested, when a newer
    animation thread has been registered in current_animation_thread, or when
    a servo command cannot be sent.

    Args:
        animation_data (dict): Animation data loaded from JSON file
    """
    try:
        # Get animation start time for timing calculations
        start_time = time.time()
        animation_name = animation_data.get('name', 'Unknown Animation')

        logging.info(f"Starting servo animation: {animation_name}")

        all_commands = _collect_animation_commands(animation_data)

        # Execute commands at correct times
        for command in all_commands:
            if not _wait_for_command_time(start_time + command['timestamp']):
                logging.info(f"Animation '{animation_name}' stopped (new animation started or stop requested)")
                break

            if not send_servo_position(command['channel'], command['position']):
                # If servo communication fails, this is critical
                logging.error("Servo communication failed - stopping animation")
                break
        else:
            # Only reached when every command was sent (no break above)
            logging.info(f"Completed servo animation: {animation_name}")

    except Exception as e:
        logging.error(f"Error executing servo animation: {e}")


def _start_animation_thread(animation_data):
    """
    Start animation_data in a new daemon thread and make it the current one.

    Registering the new thread in current_animation_thread is what tells any
    previously running animation to stop itself.

    Args:
        animation_data (dict): Animation data loaded from JSON file

    Returns:
        threading.Thread: The started animation thread
    """
    global current_animation_thread

    if current_animation_thread and current_animation_thread.is_alive():
        logging.info("Stopping previous animation to start new one")

    animation_thread = threading.Thread(
        target=execute_servo_animation,
        args=(animation_data,),
        daemon=True  # Dies when main program exits
    )
    # Register before start() so the new thread sees itself as current
    current_animation_thread = animation_thread
    animation_thread.start()
    return animation_thread


# =============================================================================
# CONFIGURATION LOADING
# =============================================================================

def load_configuration(config_file_path):
    """
    Load the system configuration from a JSON file.

    Args:
        config_file_path (str): Path to the configuration file

    Returns:
        dict: Configuration data, or None if failed to load
    """
    try:
        # Check if configuration file exists
        if not os.path.exists(config_file_path):
            logging.error(f"Configuration file not found: {config_file_path}")
            return None

        # Read and parse configuration file (JSON is UTF-8; don't depend on the OS default)
        with open(config_file_path, 'r', encoding='utf-8') as file:
            config = json.load(file)

        logging.info(f"Loaded configuration from: {config_file_path}")
        return config

    except json.JSONDecodeError as e:
        # Handle corrupted JSON configuration
        logging.error(f"Invalid JSON in configuration file: {e}")
        return None
    except Exception as e:
        # Handle other file errors
        logging.error(f"Error loading configuration file: {e}")
        return None


def validate_configuration(config):
    """
    Check that the configuration file contains all required sections.

    Args:
        config (dict): Configuration data

    Returns:
        bool: True if configuration is valid, False otherwise
    """
    try:
        # Check for main performance section
        if 'main_performance' not in config:
            logging.error("Configuration missing 'main_performance' section")
            return False

        main_perf = config['main_performance']

        # Check for required main performance fields
        required_main_fields = ['video_file', 'led_start_command', 'servo_sequences']
        for field in required_main_fields:
            if field not in main_perf:
                logging.error(f"Main performance configuration missing '{field}'")
                return False

        # Servo sequences map "seconds" -> animation file. Catch bad
        # timestamps here instead of failing in the middle of a performance.
        servo_sequences = main_perf['servo_sequences']
        if not isinstance(servo_sequences, dict):
            logging.error("Main performance 'servo_sequences' must map timestamps to animation files")
            return False
        for timestamp_str in servo_sequences:
            try:
                float(timestamp_str)
            except (TypeError, ValueError):
                logging.error(f"Invalid servo sequence timestamp: {timestamp_str!r}")
                return False

        # Check for active idle section
        if 'active_idle' not in config:
            logging.error("Configuration missing 'active_idle' section")
            return False

        active_idle = config['active_idle']

        # Check for required active idle fields
        required_idle_fields = ['countdown_video', 'selection_method', 'animations']
        for field in required_idle_fields:
            if field not in active_idle:
                logging.error(f"Active idle configuration missing '{field}'")
                return False

        # Validate that referenced files exist
        video_files = [main_perf['video_file'], active_idle['countdown_video']]
        for video_file in video_files:
            if not os.path.exists(video_file):
                logging.error(f"Video file not found: {video_file}")
                return False

        logging.info("Configuration validation passed")
        return True

    except Exception as e:
        logging.error(f"Error validating configuration: {e}")
        return False


# =============================================================================
# MAIN PERFORMANCE FUNCTIONS
# =============================================================================

def run_main_performance(config):
    """
    Execute the main 4.5-minute performance with video and timed servo sequences.

    Servo sequence timestamps are measured from the moment this function
    starts (just before the video is launched).

    Args:
        config (dict): System configuration

    Returns:
        bool: True if performance completed successfully, False if stopped early
    """
    global current_performance_type, performance_start_time, vlc_player

    try:
        # Set performance tracking variables
        current_performance_type = "main"
        performance_start_time = time.time()

        logging.info("Starting main performance")

        # Get main performance configuration
        main_config = config['main_performance']
        video_file = main_config['video_file']
        led_command = main_config['led_start_command']
        servo_sequences = main_config['servo_sequences']

        # Start the main video
        vlc_player = play_video(video_file)
        if not vlc_player:
            logging.error("Failed to start main performance video")
            return False

        # Start the LED performance
        send_led_command(led_command)

        # Convert servo sequence timestamps to float and sort them
        timed_sequences = sorted(
            ((float(timestamp_str), json_file)
             for timestamp_str, json_file in servo_sequences.items()),
            key=lambda sequence: sequence[0]
        )

        # Process each servo sequence at its scheduled time
        sequence_index = 0
        while sequence_index < len(timed_sequences):
            # Check if we should stop
            if stop_requested:
                logging.info("Main performance stopped by user request")
                return False

            # Stop scheduling once the video is no longer playing
            if vlc_player.get_state() in (vlc.State.Ended, vlc.State.Error):
                logging.info("Main performance video ended")
                break

            # Check if it's time to start the next servo sequence
            elapsed = time.time() - performance_start_time
            timestamp, json_file = timed_sequences[sequence_index]

            if elapsed >= timestamp:
                logging.info(f"Triggering servo sequence at {timestamp}s: {json_file}")

                # Load and start the animation in a separate thread
                animation_data = load_servo_animation(json_file)
                if animation_data:
                    _start_animation_thread(animation_data)
                else:
                    logging.warning(f"Skipping missing servo sequence: {json_file}")

                sequence_index += 1

            # Small delay to prevent excessive CPU usage
            time.sleep(TIMING_PRECISION)

        # Wait for main performance to complete (4.5 minutes total)
        while time.time() - performance_start_time < MAIN_PERFORMANCE_DURATION:
            if stop_requested:
                logging.info("Main performance stopped by user request")
                return False

            # Check if video stopped unexpectedly
            if vlc_player.get_state() == vlc.State.Error:
                logging.error("Main performance video error")
                return False

            time.sleep(TIMING_PRECISION)

        logging.info("Main performance completed successfully")
        return True

    except Exception as e:
        logging.error(f"Error during main performance: {e}")
        return False


# =============================================================================
# ACTIVE IDLE PERFORMANCE FUNCTIONS
# =============================================================================

def run_active_idle_performance(config):
    """
    Execute the 15-minute active idle performance with countdown video and animations.

    Animations are played one after another (shuffled when the selection
    method is "random"), restarting from the beginning of the list when it is
    exhausted. With an empty animation list only the countdown video plays.

    Args:
        config (dict): System configuration

    Returns:
        bool: True if performance completed successfully, False if stopped early
    """
    global current_performance_type, performance_start_time, vlc_player

    try:
        # Set performance tracking variables
        current_performance_type = "active_idle"
        performance_start_time = time.time()

        logging.info("Starting active idle performance")

        # Get active idle configuration
        idle_config = config['active_idle']
        countdown_video = idle_config['countdown_video']
        selection_method = idle_config['selection_method']
        animations = idle_config['animations']
        sequence_delay = idle_config.get('sequence_delay', 0)

        # Start the countdown video
        vlc_player = play_video(countdown_video)
        if not vlc_player:
            logging.error("Failed to start countdown video")
            return False

        # Prepare animation sequence based on selection method
        # (work on a copy so the configuration itself is never reordered)
        animation_sequence = list(animations)
        if selection_method == "random":
            random.shuffle(animation_sequence)

        if not animation_sequence:
            logging.warning("No active idle animations configured - playing countdown video only")

        # Track which animation we're currently on
        animation_index = 0
        last_animation_end_time = time.time()

        # Run animations for the full 15-minute duration
        while time.time() - performance_start_time < ACTIVE_IDLE_DURATION:
            # Check if we should stop
            if stop_requested:
                logging.info("Active idle performance stopped by user request")
                return False

            # Check if countdown video is still playing
            if vlc_player.get_state() == vlc.State.Error:
                logging.error("Countdown video error")
                return False

            # Check if it's time to start the next animation
            current_time = time.time()
            if animation_sequence and current_time >= last_animation_end_time + sequence_delay:
                if animation_index >= len(animation_sequence):
                    # We've gone through all animations, reset or reshuffle
                    animation_index = 0
                    if selection_method == "random":
                        random.shuffle(animation_sequence)
                        logging.info("Reshuffled animation sequence")

                # Get the current animation configuration
                current_anim = animation_sequence[animation_index]
                json_file = current_anim.get('json_file')

                logging.info(f"Starting animation {animation_index + 1}: {json_file}")

                # Load and start the servo animation
                animation_data = load_servo_animation(json_file) if json_file else None

                if animation_data:
                    # Start servo animation in separate thread
                    _start_animation_thread(animation_data)

                    # Handle audio if enabled
                    if current_anim.get('audio_enabled', False):
                        mp3_file = current_anim.get('mp3_file')
                        if mp3_file and os.path.exists(mp3_file):
                            play_audio(mp3_file)
                        else:
                            logging.warning(f"Audio file not found: {mp3_file}")

                    # Handle LEDs if enabled
                    if current_anim.get('led_enabled', False):
                        led_command = current_anim.get('led_command', 's1')
                        send_led_command(led_command)

                    # Calculate when this animation will end
                    animation_duration = animation_data.get('duration', 5.0)  # Default 5 seconds
                    last_animation_end_time = current_time + animation_duration

                else:
                    logging.warning(f"Skipping missing animation: {json_file}")
                    # If animation failed to load, move to next one quickly
                    last_animation_end_time = current_time + 1.0

                animation_index += 1

            # Small delay to prevent excessive CPU usage
            time.sleep(TIMING_PRECISION)

        logging.info("Active idle performance completed successfully")
        return True

    except Exception as e:
        logging.error(f"Error during active idle performance: {e}")
        return False


# =============================================================================
# MAIN SYSTEM CONTROL
# =============================================================================

# How long shutdown waits for a running animation thread to notice the stop
# flag before the serial ports are closed underneath it (seconds).
_ANIMATION_JOIN_TIMEOUT = 1.0


def emergency_stop():
    """
    Emergency stop function - stops all systems immediately.

    Sets the global stop flag, resets the LEDs and stops media playback.
    Serial connections are left open.
    """
    global stop_requested

    logging.warning("Emergency stop initiated")
    stop_requested = True

    # Send LED reset command
    send_led_command(LED_RESET_COMMAND)

    # Stop all media
    stop_all_media()

    # Note: Servo positions will hold their last position
    # This is safer than trying to move them during an emergency stop


def shutdown_system():
    """
    Gracefully shutdown the entire system.

    Sets the global stop flag, resets the LEDs, stops media, waits briefly
    for any running animation thread, closes the serial ports and quits pygame.
    """
    global stop_requested

    logging.info("Starting system shutdown")
    stop_requested = True

    # Send LED reset command
    send_led_command(LED_RESET_COMMAND)

    # Stop all media playback
    stop_all_media()

    # Give a running animation thread a moment to see stop_requested so it
    # is not still writing to the Mini Maestro port while it is being closed.
    # is_alive() is False for a thread that was never started, which keeps
    # join() from raising.
    animation_thread = current_animation_thread
    if (
        animation_thread is not None
        and animation_thread is not threading.current_thread()
        and animation_thread.is_alive()
    ):
        animation_thread.join(timeout=_ANIMATION_JOIN_TIMEOUT)

    # Close serial connections
    close_serial_connections()

    # Quit pygame
    pygame.quit()

    logging.info("System shutdown complete")


def main_performance_loop(config):
    """
    Main loop that alternates between main performance and active idle.
    Continues until stop is requested or a phase reports failure.

    Args:
        config (dict): System configuration
    """
    global performance_active

    logging.info("Starting continuous performance loop")
    performance_active = True

    # Phases in playback order: (log banner, function that runs the phase)
    phases = (
        ("STARTING MAIN PERFORMANCE PHASE", run_main_performance),  # 4.5 minutes
        ("STARTING ACTIVE IDLE PHASE", run_active_idle_performance),  # 15 minutes
    )

    try:
        while not stop_requested:
            for banner, run_phase in phases:
                logging.info("=" * 50)
                logging.info(banner)
                logging.info("=" * 50)

                success = run_phase(config)
                if not success or stop_requested:
                    # Leave both loops; the finally block still runs
                    return

                # Small break between performances
                time.sleep(2)

    except KeyboardInterrupt:
        # User pressed Ctrl+C
        logging.info("Performance loop interrupted by user (Ctrl+C)")
    except Exception as e:
        logging.error(f"Unexpected error in performance loop: {e}")
    finally:
        performance_active = False
        logging.info("Performance loop ended")


# =============================================================================
# COMMAND LINE INTERFACE
# =============================================================================

def parse_command_line_arguments():
    """
    Parse command line arguments.

    Returns:
        argparse.Namespace: Parsed arguments
    """
    parser = argparse.ArgumentParser(
        description='PC Animatronic Playback System',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  python animatronic_controller.py
  python animatronic_controller.py --config my_config.json
        """
    )

    parser.add_argument(
        '--config',
        default=DEFAULT_CONFIG_FILE,
        help=f'Configuration file path (default: {DEFAULT_CONFIG_FILE})'
    )

    return parser.parse_args()


# =============================================================================
# MAIN PROGRAM ENTRY POINT
# =============================================================================

def main():
    """
    Main program entry point.

    Exits with status 1 if the configuration cannot be loaded or validated,
    or if the hardware or media systems fail to initialize.
    """
    # Set up logging first so we can track everything
    setup_logging()

    # Parse command line arguments
    args = parse_command_line_arguments()

    logging.info("PC Animatronic Playback System starting...")
    logging.info(f"Using configuration file: {args.config}")

    # Load and validate configuration. This happens before any hardware is
    # touched, so a bad configuration exits without running hardware shutdown.
    config = load_configuration(args.config)
    if not config:
        logging.error("Failed to load configuration - exiting")
        sys.exit(1)

    if not validate_configuration(config):
        logging.error("Configuration validation failed - exiting")
        sys.exit(1)

    try:
        # Initialize hardware connections
        logging.info("Initializing hardware connections...")
        if not initialize_serial_connections():
            logging.error("Failed to initialize serial connections - exiting")
            sys.exit(1)

        # Initialize media systems
        logging.info("Initializing media systems...")
        if not initialize_media_systems():
            logging.error("Failed to initialize media systems - exiting")
            sys.exit(1)

        # Everything is set up - start the performance loop
        logging.info("System initialization complete")
        logging.info("Press Ctrl+C to stop the system")

        main_performance_loop(config)

    except KeyboardInterrupt:
        logging.info("Stop signal received (Ctrl+C)")
    except Exception as e:
        logging.error(f"Critical error in main program: {e}")
    finally:
        # Always clean up once hardware initialization has been attempted,
        # including the sys.exit(1) paths above (this closes any port that
        # was opened before the failure)
        logging.info("Cleaning up and shutting down...")
        shutdown_system()
        logging.info("Program terminated")


# This block runs when the script is executed directly (not imported as a
# module). It calls the main() function to start the program.
if __name__ == "__main__":
    main()