#!/usr/bin/env python3
"""
PC Animatronic Playback System
Synchronizes servo motors, LEDs, and video/audio for animatronic performances.
This script controls:
- 7 servo motors via Pololu Mini Maestro controller
- LED strips via ESP32-S3
- Video playback via VLC
- Audio playback via pygame
Performance Flow: Main Performance (4.5 min) -> Active Idle (15 min) -> repeat
"""
import json # For reading JSON configuration and animation files
import time # For timing control and delays
import threading # For running multiple tasks simultaneously
import logging # For creating log files and error tracking
import argparse # For command line argument parsing
import sys # For system operations and exit functions
import os # For file path operations
import random # For random animation selection in active idle mode
import serial # For serial communication with hardware
import vlc # For video playback control
import pygame # For audio playback
# =============================================================================
# CONFIGURATION VARIABLES - Edit these to customize the system
# =============================================================================
# Hardware Configuration (Edit these COM ports for your system)
ESP32_COM_PORT = "COM5" # USB port for ESP32-S3 (LED controller)
MAESTRO_COM_PORT = "COM4" # USB port for Pololu Mini Maestro (servo controller)
ESP32_BAUD_RATE = 115200 # Communication speed for ESP32
MAESTRO_BAUD_RATE = 9600 # Communication speed for Mini Maestro
# Timing Configuration (Edit these for performance tuning)
TIMING_PRECISION = 0.01 # Sleep between polls in the performance loops (seconds) - smaller = more precise
SERVO_COMMAND_DELAY = 0.001 # Pause after each servo command is written (seconds)
SERIAL_TIMEOUT = 1.0 # Read timeout passed to both serial ports (seconds)
# Performance Configuration
DEFAULT_CONFIG_FILE = "animatronic_config.json" # Used when --config is not given
MAIN_PERFORMANCE_DURATION = 270 # Main performance length (seconds) - 4.5 minutes
ACTIVE_IDLE_DURATION = 900 # Active idle length (seconds) - 15 minutes
# LED Commands (Edit these if your ESP32 uses different commands)
# NOTE(review): only LED_RESET_COMMAND is referenced by the code in this file;
# the main performance reads its start command from the config's
# 'led_start_command' field and active idle falls back to the literal 's1'.
LED_START_COMMAND = "p" # Command to start main LED performance
LED_RESET_COMMAND = "r" # Command to reset/stop LEDs (sent on emergency stop and shutdown)
LED_DIALOGUE_PREFIX = "s" # Prefix for dialogue LED commands (s1, s2, etc.)
# Logging Configuration
LOG_LEVEL = logging.INFO # Set to logging.DEBUG for more detailed logs
LOG_FORMAT = '%(asctime)s - %(levelname)s - %(message)s' # Log message format
# =============================================================================
# GLOBAL VARIABLES - These track system state
# =============================================================================
# Hardware connection objects (all None until the initialize_* functions run)
esp32_serial = None # Serial connection to ESP32
maestro_serial = None # Serial connection to Mini Maestro
vlc_instance = None # VLC instance created by initialize_media_systems()
vlc_player = None # Player returned by the most recent play_video() call
# Performance state tracking
current_performance_type = "" # Set to "main" or "active_idle" when a phase starts
performance_start_time = 0 # time.time() value captured when the current phase started
stop_requested = False # Set to True to make every loop and animation thread exit
performance_active = False # True while main_performance_loop() is running
# Thread running the newest servo animation. An animation thread compares
# itself against this and stops as soon as it is no longer the current one.
current_animation_thread = None
# =============================================================================
# LOGGING SETUP
# =============================================================================
def setup_logging():
    """
    Configure the root logger to write to both the console and a log file.

    Messages are sent to stdout and appended to 'animatronic_system.log',
    formatted with LOG_FORMAT and filtered at LOG_LEVEL.

    The function is idempotent: handlers installed by an earlier call are
    detected and not added again, so calling it twice does not make every
    message appear twice.
    """
    logger = logging.getLogger()
    logger.setLevel(LOG_LEVEL)

    # Handlers created here carry a marker attribute so a repeated call can
    # recognise them without disturbing handlers installed by anyone else.
    already_installed = any(
        getattr(handler, "_animatronic_handler", False)
        for handler in logger.handlers
    )
    if not already_installed:
        formatter = logging.Formatter(LOG_FORMAT)
        console_handler = logging.StreamHandler(sys.stdout)
        # Explicit encoding keeps non-ASCII file/animation names loggable
        # regardless of the platform's default code page.
        file_handler = logging.FileHandler('animatronic_system.log', encoding='utf-8')
        for handler in (console_handler, file_handler):
            handler.setLevel(LOG_LEVEL)
            handler.setFormatter(formatter)
            handler._animatronic_handler = True
            logger.addHandler(handler)

    logging.info("Logging system initialized")
# =============================================================================
# HARDWARE COMMUNICATION FUNCTIONS
# =============================================================================
def initialize_serial_connections():
    """
    Establish serial connections to ESP32 and Mini Maestro.

    Opens the ESP32 port first, then the Mini Maestro port, pausing two
    seconds after each open so the device can settle. The connections are
    stored in the module globals esp32_serial and maestro_serial.

    If either port cannot be opened, any port that was already opened by this
    call is closed again and both globals are reset to None, so a failed
    initialization never leaves a COM port locked.

    Returns:
        bool: True if both connections were established, False otherwise
    """
    global esp32_serial, maestro_serial
    try:
        # Connect to ESP32 for LED control
        logging.info(f"Connecting to ESP32 on {ESP32_COM_PORT} at {ESP32_BAUD_RATE} baud")
        esp32_serial = serial.Serial(
            port=ESP32_COM_PORT,
            baudrate=ESP32_BAUD_RATE,
            timeout=SERIAL_TIMEOUT
        )
        # Wait a moment for connection to stabilize
        time.sleep(2)
        logging.info("ESP32 connection established")

        # Connect to Mini Maestro for servo control
        logging.info(f"Connecting to Mini Maestro on {MAESTRO_COM_PORT} at {MAESTRO_BAUD_RATE} baud")
        maestro_serial = serial.Serial(
            port=MAESTRO_COM_PORT,
            baudrate=MAESTRO_BAUD_RATE,
            timeout=SERIAL_TIMEOUT
        )
        # Wait a moment for connection to stabilize
        time.sleep(2)
        logging.info("Mini Maestro connection established")
        return True
    except serial.SerialException as e:
        # If we can't connect to hardware, log the error
        logging.error(f"Failed to initialize serial connections: {e}")
    except Exception as e:
        # Catch any other unexpected errors
        logging.error(f"Unexpected error during serial initialization: {e}")

    # Failure path: release whatever was opened before the error so the
    # port is not left locked by a half-initialized system.
    for label, connection in (("ESP32", esp32_serial), ("Mini Maestro", maestro_serial)):
        if connection is not None and connection.is_open:
            try:
                connection.close()
            except Exception as close_error:
                logging.error(f"Error closing {label} connection: {close_error}")
    esp32_serial = None
    maestro_serial = None
    return False
# =============================================================================
# LED AND SERVO COMMAND FUNCTIONS
# =============================================================================
def send_led_command(command):
    """
    Write one LED command string to the ESP32 over serial.

    LED output is treated as optional: when the ESP32 is missing or the write
    fails, the problem is logged and False is returned so the caller can carry
    on without LEDs.

    Args:
        command (str): The command to send (e.g., "p", "r", "s1")
    Returns:
        bool: True if the command was written and flushed, False otherwise
    """
    global esp32_serial
    try:
        link_ready = esp32_serial is not None and esp32_serial.is_open
        if not link_ready:
            logging.warning("ESP32 not connected - skipping LED command")
            return False

        # The serial port carries bytes, so encode the text command first
        payload = command.encode('utf-8')
        esp32_serial.write(payload)
        # Push the bytes out now rather than leaving them buffered
        esp32_serial.flush()

        logging.info(f"Sent LED command: {command}")
        return True
    except serial.SerialException as e:
        # Losing the ESP32 mid-show is tolerated; the show goes on without LEDs
        logging.warning(f"ESP32 communication error: {e} - continuing without LEDs")
        return False
    except Exception as e:
        # Anything else is unexpected but still must not crash the show
        logging.error(f"Unexpected error sending LED command: {e}")
        return False
def send_servo_position(channel, position):
    """
    Send a position command to a specific servo motor.

    Uses the Pololu Maestro compact protocol "Set Target" command:
    0x84, channel, target low 7 bits, target high 7 bits, where the target is
    expressed in quarter-microseconds.

    Inputs are validated before anything is written. Previously an
    out-of-range target was silently masked to 14 bits (sending a different,
    wrong position to the servo) and a float position raised a TypeError.

    Args:
        channel (int): Servo channel number. Must fit in one 7-bit protocol
            data byte (0-127); the attached controller may support fewer.
        position (int | float): Target pulse width in microseconds
            (typically 608-2496 for MG90S servos). Fractional values are
            rounded to the nearest quarter-microsecond.
    Returns:
        bool: True if the command was written, False if the controller is not
            connected, the arguments are invalid, or the write failed
    """
    global maestro_serial
    try:
        # Check if Mini Maestro is connected
        if maestro_serial is None or not maestro_serial.is_open:
            logging.error("Mini Maestro not connected - cannot send servo command")
            return False

        # bool is a subclass of int, so exclude it explicitly
        if isinstance(channel, bool) or not isinstance(channel, int) or not 0 <= channel <= 0x7F:
            logging.error(f"Invalid servo channel: {channel!r}")
            return False
        if isinstance(position, bool) or not isinstance(position, (int, float)):
            logging.error(f"Invalid servo position for channel {channel}: {position!r}")
            return False

        # Mini Maestro uses quarter-microsecond units, so multiply by 4
        target = int(round(position * 4))
        # The protocol carries the target in two 7-bit bytes (14 bits total)
        if not 0 <= target <= 0x3FFF:
            logging.error(f"Servo position out of range for channel {channel}: {position!r}")
            return False

        command = bytearray([
            0x84,                   # Set Target command
            channel,                # Channel number
            target & 0x7F,          # Target low 7 bits
            (target >> 7) & 0x7F    # Target high 7 bits
        ])

        # Send command to Mini Maestro
        maestro_serial.write(command)
        maestro_serial.flush()
        logging.debug(f"Servo {channel} -> position {position}")

        # Small delay to prevent overwhelming the controller
        time.sleep(SERVO_COMMAND_DELAY)
        return True
    except serial.SerialException as e:
        # Losing the Maestro is critical; the caller stops the animation on False
        logging.error(f"Mini Maestro communication error: {e}")
        return False
    except Exception as e:
        # Log any other unexpected errors (e.g. NaN/inf positions)
        logging.error(f"Unexpected error sending servo command: {e}")
        return False
def close_serial_connections():
    """
    Close the ESP32 and Mini Maestro serial ports if they are open.

    Each port is handled independently: a failure while closing one is logged
    and does not prevent the other from being closed.
    """
    global esp32_serial, maestro_serial

    # (connection, message on success, message prefix on failure)
    ports = (
        (esp32_serial, "ESP32 connection closed", "Error closing ESP32 connection"),
        (maestro_serial, "Mini Maestro connection closed", "Error closing Mini Maestro connection"),
    )
    for connection, closed_message, failure_prefix in ports:
        if not (connection and connection.is_open):
            continue
        try:
            connection.close()
            logging.info(closed_message)
        except Exception as e:
            logging.error(f"{failure_prefix}: {e}")
# =============================================================================
# MEDIA PLAYBACK FUNCTIONS
# =============================================================================
def initialize_media_systems():
    """
    Set up VLC and pygame for media playback.

    Creates the shared VLC instance (stored in the module global
    vlc_instance) and initializes the pygame mixer.

    Returns:
        bool: True if both systems initialized, False otherwise
    """
    global vlc_instance
    try:
        # Initialize VLC for video playback
        vlc_instance = vlc.Instance()
        # python-vlc signals a failed libVLC start-up by returning None rather
        # than raising, so that case has to be checked explicitly.
        if vlc_instance is None:
            logging.error("Failed to initialize media systems: VLC instance could not be created")
            return False
        logging.info("VLC initialized for video playback")

        # Initialize pygame for audio playback
        pygame.mixer.init()
        logging.info("Pygame initialized for audio playback")
        return True
    except Exception as e:
        logging.error(f"Failed to initialize media systems: {e}")
        return False
def play_video(video_file_path):
    """
    Start playing a video file using VLC.

    The system shows one video at a time, so any player still tracked in the
    module global vlc_player is stopped and released before the new one is
    created. Previously each call left the old player (and its window)
    behind.

    Args:
        video_file_path (str): Path to the video file
    Returns:
        vlc.MediaPlayer: VLC player object, or None if the file is missing,
            VLC is not initialized, or playback could not be started
    """
    global vlc_instance, vlc_player
    try:
        # Check if video file exists
        if not os.path.exists(video_file_path):
            logging.error(f"Video file not found: {video_file_path}")
            return None
        if vlc_instance is None:
            logging.error("VLC is not initialized - cannot play video")
            return None

        # Tear down the previous player so players do not accumulate
        if vlc_player is not None:
            try:
                vlc_player.stop()
                vlc_player.release()
            except Exception as e:
                logging.warning(f"Error releasing previous video player: {e}")
            vlc_player = None

        # Create media object from file and a player to show it
        media = vlc_instance.media_new(video_file_path)
        player = vlc_instance.media_player_new()
        player.set_media(media)

        # play() reports failure with -1 instead of raising
        if player.play() == -1:
            logging.error(f"VLC could not start playback: {video_file_path}")
            player.release()
            return None
        logging.info(f"Started video playback: {video_file_path}")

        # Wait a moment for playback to actually start
        time.sleep(0.5)
        return player
    except Exception as e:
        logging.error(f"Error playing video {video_file_path}: {e}")
        return None
def play_audio(audio_file_path):
    """
    Load an MP3 file into the pygame music channel and start it.

    Args:
        audio_file_path (str): Path to the MP3 file
    Returns:
        bool: True if playback started successfully, False otherwise
    """
    try:
        file_present = os.path.exists(audio_file_path)
        if not file_present:
            logging.error(f"Audio file not found: {audio_file_path}")
            return False

        # pygame has a single music stream: loading replaces whatever was loaded
        music = pygame.mixer.music
        music.load(audio_file_path)
        music.play()

        logging.info(f"Started audio playback: {audio_file_path}")
        return True
    except Exception as e:
        logging.error(f"Error playing audio {audio_file_path}: {e}")
        return False
def stop_all_media():
    """
    Stop all video and audio playback.

    Video and audio are stopped independently so that a failure while
    stopping the video can no longer prevent the audio from being stopped.
    Audio is only touched when the pygame mixer is actually initialized,
    which avoids a spurious error when shutting down after a failed start-up.
    """
    global vlc_player

    # Stop VLC video playback
    try:
        if vlc_player:
            vlc_player.stop()
            logging.info("Video playback stopped")
    except Exception as e:
        logging.error(f"Error stopping video playback: {e}")

    # Stop pygame audio playback
    try:
        # get_init() returns None when the mixer is not initialized
        if pygame.mixer.get_init():
            pygame.mixer.music.stop()
            logging.info("Audio playback stopped")
    except Exception as e:
        logging.error(f"Error stopping audio playback: {e}")
# =============================================================================
# ANIMATION FILE PROCESSING
# =============================================================================
def load_servo_animation(json_file_path):
    """
    Load a servo animation from a JSON file.

    The file is read as UTF-8 (a leading byte-order mark is tolerated)
    instead of the platform default encoding, so files containing non-ASCII
    text load the same way on every machine. The top-level JSON value must be
    an object; anything else is rejected with a specific message.

    Args:
        json_file_path (str): Path to the JSON animation file
    Returns:
        dict: Animation data, or None if failed to load
    """
    try:
        # Check if file exists
        if not os.path.exists(json_file_path):
            logging.error(f"Animation file not found: {json_file_path}")
            return None

        # 'utf-8-sig' reads plain UTF-8 and also strips a BOM if present
        with open(json_file_path, 'r', encoding='utf-8-sig') as file:
            animation_data = json.load(file)

        if not isinstance(animation_data, dict):
            logging.error(
                f"Animation file {json_file_path} must contain a JSON object, "
                f"got {type(animation_data).__name__}"
            )
            return None

        logging.info(f"Loaded animation: {animation_data.get('name', 'Unknown')} from {json_file_path}")
        return animation_data
    except json.JSONDecodeError as e:
        # Handle corrupted JSON files
        logging.error(f"Invalid JSON in animation file {json_file_path}: {e}")
        return None
    except Exception as e:
        # Handle other file errors (permissions, bad encoding, ...)
        logging.error(f"Error loading animation file {json_file_path}: {e}")
        return None
# Longest uninterrupted sleep while waiting for the next keyframe (seconds).
# Keeps a waiting animation responsive to stop requests.
_STOP_POLL_INTERVAL = 0.05


def _collect_servo_commands(animation_data):
    """Flatten all tracks into one timestamp-sorted command list, skipping malformed entries."""
    commands = []
    for track_key, track_data in animation_data.get('tracks', {}).items():
        channel = track_data.get('servo_channel')
        if channel is None:
            logging.warning(f"Track '{track_key}' has no servo_channel - skipping track")
            continue
        for position_data in track_data.get('positions', []):
            timestamp = position_data.get('timestamp')
            position = position_data.get('position')
            if not isinstance(timestamp, (int, float)) or position is None:
                logging.warning(f"Track '{track_key}' has an invalid keyframe - skipping: {position_data}")
                continue
            commands.append({
                'timestamp': timestamp,
                'channel': channel,
                'position': position
            })
    # Sort commands by timestamp so they execute in the right order
    commands.sort(key=lambda entry: entry['timestamp'])
    return commands


def execute_servo_animation(animation_data):
    """
    Execute a servo animation by sending position commands at the correct times.

    Intended to run in its own thread. The animation ends early when
    stop_requested is set or when this thread is no longer the one recorded
    in current_animation_thread (i.e. a newer animation has replaced it).

    Compared with the earlier version:
    - waiting for the next keyframe is done in short slices and the stop
      condition is re-checked after the wait, so a stopped or superseded
      animation no longer sends one more stale command after sleeping;
    - "Completed" is only logged when every command was actually sent;
    - keyframes without a numeric timestamp or a position are skipped with a
      warning instead of aborting the whole animation;
    - scheduling uses the monotonic clock, so system clock adjustments cannot
      shift keyframe timing.

    Args:
        animation_data (dict): Animation data loaded from JSON file
    """
    global stop_requested
    try:
        # Reference point for all keyframe timestamps
        start_time = time.monotonic()
        animation_name = animation_data.get('name', 'Unknown Animation')
        logging.info(f"Starting servo animation: {animation_name}")

        all_commands = _collect_servo_commands(animation_data)
        this_thread = threading.current_thread()

        def should_stop():
            # Global stop, or a newer animation thread has taken over
            return stop_requested or this_thread is not current_animation_thread

        completed = True
        for command in all_commands:
            target_time = start_time + command['timestamp']

            # Wait for the keyframe's time in short slices so a stop request
            # is noticed promptly even during long gaps between keyframes.
            while not should_stop():
                remaining = target_time - time.monotonic()
                if remaining <= 0:
                    break
                time.sleep(min(remaining, _STOP_POLL_INTERVAL))

            if should_stop():
                logging.info(f"Animation '{animation_name}' stopped (new animation started or stop requested)")
                completed = False
                break

            # Send the servo command
            if not send_servo_position(command['channel'], command['position']):
                # If servo communication fails, this is critical
                logging.error("Servo communication failed - stopping animation")
                completed = False
                break

        if completed:
            logging.info(f"Completed servo animation: {animation_name}")
    except Exception as e:
        logging.error(f"Error executing servo animation: {e}")
# =============================================================================
# CONFIGURATION LOADING
# =============================================================================
def load_configuration(config_file_path):
    """
    Load the system configuration from a JSON file.

    The file is read as UTF-8 (a leading byte-order mark is tolerated) rather
    than the platform default encoding, and the top-level JSON value must be
    an object.

    Args:
        config_file_path (str): Path to the configuration file
    Returns:
        dict: Configuration data, or None if failed to load
    """
    try:
        # Check if configuration file exists
        if not os.path.exists(config_file_path):
            logging.error(f"Configuration file not found: {config_file_path}")
            return None

        # 'utf-8-sig' reads plain UTF-8 and also strips a BOM if present
        with open(config_file_path, 'r', encoding='utf-8-sig') as file:
            config = json.load(file)

        if not isinstance(config, dict):
            logging.error(
                f"Configuration file must contain a JSON object, got {type(config).__name__}"
            )
            return None

        logging.info(f"Loaded configuration from: {config_file_path}")
        return config
    except json.JSONDecodeError as e:
        # Handle corrupted JSON configuration
        logging.error(f"Invalid JSON in configuration file: {e}")
        return None
    except Exception as e:
        # Handle other file errors (permissions, bad encoding, ...)
        logging.error(f"Error loading configuration file: {e}")
        return None
def validate_configuration(config):
    """
    Check that the configuration contains everything the performances need.

    Verifies that:
    - 'main_performance' has 'video_file', 'led_start_command' and
      'servo_sequences';
    - 'active_idle' has 'countdown_video', 'selection_method' and
      'animations';
    - 'servo_sequences' is a mapping whose keys parse as numbers (they are
      converted with float() when the main performance is scheduled);
    - 'animations' is a non-empty list of objects that each name a
      'json_file' (the idle loop indexes into it and reads that key);
    - both video files exist on disk.

    The servo_sequences/animations checks catch problems at start-up that
    would otherwise abort a performance part-way through the show.

    Args:
        config (dict): Configuration data
    Returns:
        bool: True if configuration is valid, False otherwise
    """
    try:
        # Check for main performance section
        if 'main_performance' not in config:
            logging.error("Configuration missing 'main_performance' section")
            return False
        main_perf = config['main_performance']
        for field in ('video_file', 'led_start_command', 'servo_sequences'):
            if field not in main_perf:
                logging.error(f"Main performance configuration missing '{field}'")
                return False

        # Check for active idle section
        if 'active_idle' not in config:
            logging.error("Configuration missing 'active_idle' section")
            return False
        active_idle = config['active_idle']
        for field in ('countdown_video', 'selection_method', 'animations'):
            if field not in active_idle:
                logging.error(f"Active idle configuration missing '{field}'")
                return False

        # Servo sequence keys are timestamps in seconds, stored as strings
        servo_sequences = main_perf['servo_sequences']
        if not isinstance(servo_sequences, dict):
            logging.error("Main performance 'servo_sequences' must be a mapping of timestamp to JSON file")
            return False
        for timestamp_str in servo_sequences:
            try:
                float(timestamp_str)
            except (TypeError, ValueError):
                logging.error(f"Invalid servo sequence timestamp: {timestamp_str!r}")
                return False

        # The idle loop needs at least one animation to cycle through
        animations = active_idle['animations']
        if not isinstance(animations, list) or not animations:
            logging.error("Active idle 'animations' must be a non-empty list")
            return False
        for index, animation in enumerate(animations):
            if not isinstance(animation, dict) or 'json_file' not in animation:
                logging.error(f"Active idle animation {index + 1} is missing 'json_file'")
                return False

        # Validate that referenced files exist
        for video_file in (main_perf['video_file'], active_idle['countdown_video']):
            if not os.path.exists(video_file):
                logging.error(f"Video file not found: {video_file}")
                return False

        logging.info("Configuration validation passed")
        return True
    except Exception as e:
        logging.error(f"Error validating configuration: {e}")
        return False
# =============================================================================
# MAIN PERFORMANCE FUNCTIONS
# =============================================================================
def run_main_performance(config):
    """
    Run the main show: start the video and LEDs, then launch each configured
    servo sequence when its timestamp (seconds since the phase began) is
    reached, and finally hold until MAIN_PERFORMANCE_DURATION has elapsed.

    Starting a new sequence makes it the current animation thread; a sequence
    that is still running notices it has been replaced and stops on its own.

    Args:
        config (dict): System configuration
    Returns:
        bool: True if performance completed successfully, False if stopped early
    """
    global current_performance_type, performance_start_time, vlc_player, stop_requested
    global current_animation_thread
    try:
        # Record which phase is running and when it began
        current_performance_type = "main"
        performance_start_time = time.time()
        logging.info("Starting main performance")

        settings = config['main_performance']
        video_file = settings['video_file']
        led_command = settings['led_start_command']
        servo_sequences = settings['servo_sequences']

        # Video first; without it there is no show
        vlc_player = play_video(video_file)
        if not vlc_player:
            logging.error("Failed to start main performance video")
            return False

        # Kick off the LED program
        send_led_command(led_command)

        # Config keys are timestamp strings; turn them into an ordered schedule
        timed_sequences = sorted(
            ((float(stamp), path) for stamp, path in servo_sequences.items()),
            key=lambda entry: entry[0],
        )

        sequence_index = 0
        while sequence_index < len(timed_sequences):
            if stop_requested:
                logging.info("Main performance stopped by user request")
                return False

            # Stop scheduling once the video is over or has failed
            player_state = vlc_player.get_state()
            if player_state == vlc.State.Ended or player_state == vlc.State.Error:
                logging.info("Main performance video ended")
                break

            elapsed = time.time() - performance_start_time
            timestamp, json_file = timed_sequences[sequence_index]
            if elapsed >= timestamp:
                logging.info(f"Triggering servo sequence at {timestamp}s: {json_file}")

                # The previous animation is not killed here; it stops itself
                # once it is no longer the current animation thread.
                running = current_animation_thread
                if running and running.is_alive():
                    logging.info("Stopping previous animation to start new one")

                animation_data = load_servo_animation(json_file)
                if not animation_data:
                    logging.warning(f"Skipping missing servo sequence: {json_file}")
                else:
                    worker = threading.Thread(
                        target=execute_servo_animation,
                        args=(animation_data,),
                        daemon=True,  # Dies when main program exits
                    )
                    current_animation_thread = worker  # Track this thread
                    worker.start()
                sequence_index += 1

            # Brief pause so the polling loop does not hog the CPU
            time.sleep(TIMING_PRECISION)

        # Hold until the full performance length has passed
        while time.time() - performance_start_time < MAIN_PERFORMANCE_DURATION:
            if stop_requested:
                logging.info("Main performance stopped by user request")
                return False
            if vlc_player.get_state() == vlc.State.Error:
                logging.error("Main performance video error")
                return False
            time.sleep(TIMING_PRECISION)

        logging.info("Main performance completed successfully")
        return True
    except Exception as e:
        logging.error(f"Error during main performance: {e}")
        return False
# =============================================================================
# ACTIVE IDLE PERFORMANCE FUNCTIONS
# =============================================================================
def run_active_idle_performance(config):
    """
    Execute the 15-minute active idle performance with countdown video and animations.

    Plays the countdown video and, until ACTIVE_IDLE_DURATION has elapsed,
    runs the configured animations one after another (shuffled when
    'selection_method' is "random", otherwise in listed order). Each
    animation may also start an MP3 and send an LED command. The next
    animation starts 'sequence_delay' seconds after the previous one's
    declared 'duration' (default 5.0 s) has passed.

    Fix: current_animation_thread is now declared global. It is assigned in
    this function, so without the declaration Python treated it as a local
    variable; the first read raised UnboundLocalError and the idle phase
    aborted, and animation threads could never see themselves as current.

    Args:
        config (dict): System configuration
    Returns:
        bool: True if performance completed successfully, False if stopped early
    """
    global current_performance_type, performance_start_time, vlc_player, stop_requested
    global current_animation_thread
    try:
        # Set performance tracking variables
        current_performance_type = "active_idle"
        performance_start_time = time.time()
        logging.info("Starting active idle performance")

        # Get active idle configuration
        idle_config = config['active_idle']
        countdown_video = idle_config['countdown_video']
        selection_method = idle_config['selection_method']
        animations = idle_config['animations']
        sequence_delay = idle_config.get('sequence_delay', 0)

        # Work on a copy so shuffling never reorders the caller's config
        animation_sequence = list(animations)
        if not animation_sequence:
            # Nothing to cycle through; fail clearly before starting the video
            logging.error("Active idle configuration has no animations")
            return False

        # Start the countdown video
        vlc_player = play_video(countdown_video)
        if not vlc_player:
            logging.error("Failed to start countdown video")
            return False

        if selection_method == "random":
            random.shuffle(animation_sequence)

        # Track which animation we're currently on
        animation_index = 0
        last_animation_end_time = time.time()

        # Run animations for the full 15-minute duration
        while time.time() - performance_start_time < ACTIVE_IDLE_DURATION:
            # Check if we should stop
            if stop_requested:
                logging.info("Active idle performance stopped by user request")
                return False
            # Check if countdown video is still playing
            if vlc_player.get_state() == vlc.State.Error:
                logging.error("Countdown video error")
                return False

            # Check if it's time to start the next animation
            current_time = time.time()
            if current_time >= last_animation_end_time + sequence_delay:
                if animation_index >= len(animation_sequence):
                    # We've gone through all animations, reset or reshuffle
                    animation_index = 0
                    if selection_method == "random":
                        random.shuffle(animation_sequence)
                        logging.info("Reshuffled animation sequence")

                # Get the current animation configuration
                current_anim = animation_sequence[animation_index]
                json_file = current_anim['json_file']
                logging.info(f"Starting animation {animation_index + 1}: {json_file}")

                # Load and start the servo animation
                animation_data = load_servo_animation(json_file)
                if animation_data:
                    # The previous animation is not killed here; it stops
                    # itself once it is no longer the current thread.
                    if current_animation_thread and current_animation_thread.is_alive():
                        logging.info("Stopping previous animation to start new one")

                    # Start servo animation in separate thread
                    animation_thread = threading.Thread(
                        target=execute_servo_animation,
                        args=(animation_data,)
                    )
                    animation_thread.daemon = True
                    current_animation_thread = animation_thread  # Track this thread
                    animation_thread.start()

                    # Handle audio if enabled
                    if current_anim.get('audio_enabled', False):
                        mp3_file = current_anim.get('mp3_file')
                        if mp3_file and os.path.exists(mp3_file):
                            play_audio(mp3_file)
                        else:
                            logging.warning(f"Audio file not found: {mp3_file}")

                    # Handle LEDs if enabled
                    if current_anim.get('led_enabled', False):
                        led_command = current_anim.get('led_command', 's1')
                        send_led_command(led_command)

                    # Calculate when this animation will end
                    animation_duration = animation_data.get('duration', 5.0)  # Default 5 seconds
                    last_animation_end_time = current_time + animation_duration
                else:
                    logging.warning(f"Skipping missing animation: {json_file}")
                    # If animation failed to load, move to next one quickly
                    last_animation_end_time = current_time + 1.0

                animation_index += 1

            # Small delay to prevent excessive CPU usage
            time.sleep(TIMING_PRECISION)

        logging.info("Active idle performance completed successfully")
        return True
    except Exception as e:
        logging.error(f"Error during active idle performance: {e}")
        return False
# =============================================================================
# MAIN SYSTEM CONTROL
# =============================================================================
def emergency_stop():
    """
    Halt the show immediately.

    Raises the global stop flag (which the performance loops and animation
    threads poll), resets the LEDs and stops video/audio. No servo commands
    are sent.
    """
    global stop_requested

    logging.warning("Emergency stop initiated")
    stop_requested = True

    # LEDs off first, then silence the media
    send_led_command(LED_RESET_COMMAND)
    stop_all_media()
    # Servos are intentionally left where they are: holding the last
    # position is safer than commanding motion during an emergency stop.
def shutdown_system():
    """
    Bring the whole system down in an orderly way.

    Sets the global stop flag, resets the LEDs, stops media playback, closes
    both serial ports and finally shuts pygame down.
    """
    global stop_requested

    logging.info("Starting system shutdown")
    stop_requested = True

    # The LED reset must go out while the ESP32 port is still open
    send_led_command(LED_RESET_COMMAND)

    # Remaining teardown steps, in order: media, serial ports, pygame
    for teardown_step in (stop_all_media, close_serial_connections, pygame.quit):
        teardown_step()

    logging.info("System shutdown complete")
def main_performance_loop(config):
    """
    Cycle main performance -> active idle -> main performance ... until a stop
    is requested or one of the phases reports failure.

    A two-second pause follows every phase that completes successfully.

    Args:
        config (dict): System configuration
    """
    global stop_requested, performance_active

    logging.info("Starting continuous performance loop")
    performance_active = True

    separator = "=" * 50
    try:
        # Each phase is (banner text, function that runs the phase)
        phases = (
            ("STARTING MAIN PERFORMANCE PHASE", run_main_performance),   # 4.5 minutes
            ("STARTING ACTIVE IDLE PHASE", run_active_idle_performance),  # 15 minutes
        )
        keep_cycling = True
        while keep_cycling and not stop_requested:
            for banner, run_phase in phases:
                logging.info(separator)
                logging.info(banner)
                logging.info(separator)

                success = run_phase(config)
                if not success or stop_requested:
                    keep_cycling = False
                    break

                # Short breather before the next phase
                time.sleep(2)
    except KeyboardInterrupt:
        # User pressed Ctrl+C
        logging.info("Performance loop interrupted by user (Ctrl+C)")
    except Exception as e:
        logging.error(f"Unexpected error in performance loop: {e}")
    finally:
        performance_active = False
        logging.info("Performance loop ended")
# =============================================================================
# COMMAND LINE INTERFACE
# =============================================================================
def parse_command_line_arguments():
    """
    Build the command line parser and parse the process arguments.

    The only option is --config, which defaults to DEFAULT_CONFIG_FILE.

    Returns:
        argparse.Namespace: Parsed arguments
    """
    # Shown verbatim at the end of --help (RawDescriptionHelpFormatter keeps the layout)
    usage_examples = """
Examples:
python animatronic_controller.py
python animatronic_controller.py --config my_config.json
"""
    parser = argparse.ArgumentParser(
        description='PC Animatronic Playback System',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=usage_examples,
    )
    parser.add_argument(
        '--config',
        default=DEFAULT_CONFIG_FILE,
        help=f'Configuration file path (default: {DEFAULT_CONFIG_FILE})',
    )
    parsed_arguments = parser.parse_args()
    return parsed_arguments
# =============================================================================
# MAIN PROGRAM ENTRY POINT
# =============================================================================
def main():
    """
    Main program entry point.

    Sets up logging, loads and validates the configuration, opens the
    hardware connections, initializes the media systems and then runs the
    performance loop until it ends.

    The whole sequence runs inside a single try block, so that:
    - Ctrl+C during initialization (for example during the serial port
      settle delays) is reported as a normal stop rather than a traceback;
    - any unexpected exception is logged as a critical error;
    - shutdown_system() always runs, whichever way the program leaves.

    Exits with status 1 (via sys.exit) when configuration, hardware or media
    initialization fails.
    """
    # Set up logging first so we can track everything
    setup_logging()
    try:
        # Parse command line arguments
        args = parse_command_line_arguments()
        logging.info("PC Animatronic Playback System starting...")
        logging.info(f"Using configuration file: {args.config}")

        # Load and validate configuration
        config = load_configuration(args.config)
        if not config:
            logging.error("Failed to load configuration - exiting")
            sys.exit(1)
        if not validate_configuration(config):
            logging.error("Configuration validation failed - exiting")
            sys.exit(1)

        # Initialize hardware connections
        logging.info("Initializing hardware connections...")
        if not initialize_serial_connections():
            logging.error("Failed to initialize serial connections - exiting")
            sys.exit(1)

        # Initialize media systems
        logging.info("Initializing media systems...")
        if not initialize_media_systems():
            logging.error("Failed to initialize media systems - exiting")
            close_serial_connections()
            sys.exit(1)

        # Everything is set up - start the performance loop
        logging.info("System initialization complete")
        logging.info("Press Ctrl+C to stop the system")
        main_performance_loop(config)
    except KeyboardInterrupt:
        logging.info("Stop signal received (Ctrl+C)")
    except Exception as e:
        # SystemExit from sys.exit() is not an Exception subclass, so the
        # exit status set above still propagates after cleanup.
        logging.error(f"Critical error in main program: {e}")
    finally:
        # Always clean up, no matter how we exit
        logging.info("Cleaning up and shutting down...")
        shutdown_system()
        logging.info("Program terminated")
# Start the program only when this file is executed directly; importing it as
# a module defines everything above without running a performance.
if __name__ == "__main__":
    main()