New functionality added inspired by Wicked Quest game.
This commit is contained in:
313
save_manager.py
Normal file
313
save_manager.py
Normal file
@@ -0,0 +1,313 @@
|
||||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
"""Save/Load management system for Storm Games.
|
||||
|
||||
Provides atomic save operations with XDG-compliant paths,
|
||||
corruption detection, and automatic cleanup.
|
||||
"""
|
||||
|
||||
import os
|
||||
import pickle
|
||||
import tempfile
|
||||
import shutil
|
||||
import time
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, List, Optional, Union
|
||||
from .services import PathService
|
||||
|
||||
|
||||
class SaveManager:
|
||||
"""Generic save/load manager for game state persistence.
|
||||
|
||||
Features:
|
||||
- XDG-compliant save directories
|
||||
- Atomic file operations using temporary files
|
||||
- Pickle-based serialization with version tracking
|
||||
- Automatic cleanup of old saves
|
||||
- Corruption detection and recovery
|
||||
- Comprehensive error handling
|
||||
|
||||
Example usage:
|
||||
# Initialize for a game
|
||||
save_manager = SaveManager("my-awesome-game")
|
||||
|
||||
# Save game state with metadata
|
||||
game_state = {"level": 5, "score": 1000, "inventory": ["sword", "potion"]}
|
||||
metadata = {"display_name": "Boss Level", "level": 5}
|
||||
save_manager.create_save(game_state, metadata)
|
||||
|
||||
# Load a save
|
||||
save_files = save_manager.get_save_files()
|
||||
if save_files:
|
||||
game_state, metadata = save_manager.load_save(save_files[0])
|
||||
"""
|
||||
|
||||
def __init__(self, game_name: Optional[str] = None, max_saves: int = 10):
|
||||
"""Initialize SaveManager.
|
||||
|
||||
Args:
|
||||
game_name: Name of the game for save directory. If None, uses PathService
|
||||
max_saves: Maximum number of saves to keep (older saves auto-deleted)
|
||||
"""
|
||||
self.max_saves = max_saves
|
||||
self.version = "1.0" # Save format version
|
||||
|
||||
# Get or initialize path service
|
||||
self.path_service = PathService.get_instance()
|
||||
|
||||
if game_name:
|
||||
# Initialize with specific game name
|
||||
self.path_service.initialize(game_name)
|
||||
|
||||
# Ensure we have a valid game path
|
||||
if not self.path_service.gamePath:
|
||||
raise ValueError("Game path not initialized. Either provide game_name or initialize PathService first.")
|
||||
|
||||
self.save_dir = Path(self.path_service.gamePath) / "saves"
|
||||
|
||||
# Create saves directory if it doesn't exist
|
||||
self.save_dir.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
def create_save(self, save_data: Any, metadata: Optional[Dict[str, Any]] = None) -> str:
|
||||
"""Create a new save file with the given data.
|
||||
|
||||
Args:
|
||||
save_data: Any pickle-serializable object to save
|
||||
metadata: Optional metadata dictionary for display purposes
|
||||
|
||||
Returns:
|
||||
Path to the created save file
|
||||
|
||||
Raises:
|
||||
Exception: If save operation fails
|
||||
"""
|
||||
if metadata is None:
|
||||
metadata = {}
|
||||
|
||||
# Generate filename with timestamp
|
||||
timestamp = int(time.time())
|
||||
display_name = metadata.get("display_name", f"Save {timestamp}")
|
||||
safe_name = self._sanitize_filename(display_name)
|
||||
filename = f"{timestamp}_{safe_name}.save"
|
||||
save_path = self.save_dir / filename
|
||||
|
||||
# Prepare save structure
|
||||
save_structure = {
|
||||
"version": self.version,
|
||||
"timestamp": timestamp,
|
||||
"metadata": metadata,
|
||||
"data": save_data
|
||||
}
|
||||
|
||||
# Atomic save operation using temporary file
|
||||
with tempfile.NamedTemporaryFile(mode='wb', dir=self.save_dir, delete=False) as temp_file:
|
||||
try:
|
||||
pickle.dump(save_structure, temp_file)
|
||||
temp_file.flush()
|
||||
os.fsync(temp_file.fileno()) # Force write to disk
|
||||
|
||||
# Atomically move temporary file to final location
|
||||
shutil.move(temp_file.name, save_path)
|
||||
|
||||
# Clean up old saves if we exceed max_saves
|
||||
self._cleanup_old_saves()
|
||||
|
||||
return str(save_path)
|
||||
|
||||
except Exception as e:
|
||||
# Clean up temporary file on error
|
||||
try:
|
||||
os.unlink(temp_file.name)
|
||||
except:
|
||||
pass
|
||||
raise Exception(f"Failed to create save: {e}")
|
||||
|
||||
def load_save(self, filepath: Union[str, Path]) -> tuple[Any, Dict[str, Any]]:
|
||||
"""Load save data from file.
|
||||
|
||||
Args:
|
||||
filepath: Path to the save file
|
||||
|
||||
Returns:
|
||||
Tuple of (save_data, metadata)
|
||||
|
||||
Raises:
|
||||
Exception: If load operation fails or file is corrupted
|
||||
"""
|
||||
filepath = Path(filepath)
|
||||
|
||||
if not filepath.exists():
|
||||
raise FileNotFoundError(f"Save file not found: {filepath}")
|
||||
|
||||
try:
|
||||
with open(filepath, 'rb') as save_file:
|
||||
save_structure = pickle.load(save_file)
|
||||
|
||||
# Validate save structure
|
||||
if not isinstance(save_structure, dict):
|
||||
raise ValueError("Invalid save file format")
|
||||
|
||||
required_fields = ["version", "timestamp", "data"]
|
||||
for field in required_fields:
|
||||
if field not in save_structure:
|
||||
raise ValueError(f"Save file missing required field: {field}")
|
||||
|
||||
# Extract data
|
||||
save_data = save_structure["data"]
|
||||
metadata = save_structure.get("metadata", {})
|
||||
|
||||
return save_data, metadata
|
||||
|
||||
except Exception as e:
|
||||
# Log corruption and attempt cleanup
|
||||
print(f"Warning: Corrupted save file detected: {filepath} - {e}")
|
||||
self._handle_corrupted_save(filepath)
|
||||
raise Exception(f"Failed to load save: {e}")
|
||||
|
||||
def get_save_files(self) -> List[Path]:
|
||||
"""Get list of save files sorted by creation time (newest first).
|
||||
|
||||
Returns:
|
||||
List of Path objects for save files
|
||||
"""
|
||||
if not self.save_dir.exists():
|
||||
return []
|
||||
|
||||
save_files = []
|
||||
for file_path in self.save_dir.glob("*.save"):
|
||||
try:
|
||||
# Validate file can be opened
|
||||
with open(file_path, 'rb') as f:
|
||||
save_structure = pickle.load(f)
|
||||
if isinstance(save_structure, dict) and "timestamp" in save_structure:
|
||||
save_files.append((file_path, save_structure["timestamp"]))
|
||||
except:
|
||||
# Skip corrupted files
|
||||
print(f"Warning: Skipping corrupted save file: {file_path}")
|
||||
continue
|
||||
|
||||
# Sort by timestamp (newest first)
|
||||
save_files.sort(key=lambda x: x[1], reverse=True)
|
||||
return [file_path for file_path, _ in save_files]
|
||||
|
||||
def get_save_info(self, filepath: Union[str, Path]) -> Dict[str, Any]:
|
||||
"""Get metadata information from a save file without loading the full data.
|
||||
|
||||
Args:
|
||||
filepath: Path to the save file
|
||||
|
||||
Returns:
|
||||
Dictionary with save information (metadata + timestamp)
|
||||
"""
|
||||
filepath = Path(filepath)
|
||||
|
||||
try:
|
||||
with open(filepath, 'rb') as save_file:
|
||||
save_structure = pickle.load(save_file)
|
||||
|
||||
return {
|
||||
"timestamp": save_structure.get("timestamp", 0),
|
||||
"metadata": save_structure.get("metadata", {}),
|
||||
"version": save_structure.get("version", "unknown"),
|
||||
"filepath": str(filepath),
|
||||
"filename": filepath.name
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
return {
|
||||
"timestamp": 0,
|
||||
"metadata": {"display_name": "Corrupted Save"},
|
||||
"version": "unknown",
|
||||
"filepath": str(filepath),
|
||||
"filename": filepath.name,
|
||||
"error": str(e)
|
||||
}
|
||||
|
||||
def has_saves(self) -> bool:
|
||||
"""Check if any save files exist.
|
||||
|
||||
Returns:
|
||||
True if save files exist, False otherwise
|
||||
"""
|
||||
return len(self.get_save_files()) > 0
|
||||
|
||||
def delete_save(self, filepath: Union[str, Path]) -> bool:
|
||||
"""Delete a specific save file.
|
||||
|
||||
Args:
|
||||
filepath: Path to the save file to delete
|
||||
|
||||
Returns:
|
||||
True if file was deleted, False if it didn't exist
|
||||
"""
|
||||
filepath = Path(filepath)
|
||||
|
||||
try:
|
||||
if filepath.exists():
|
||||
filepath.unlink()
|
||||
return True
|
||||
return False
|
||||
except Exception as e:
|
||||
print(f"Warning: Failed to delete save file {filepath}: {e}")
|
||||
return False
|
||||
|
||||
def cleanup_all_saves(self) -> int:
|
||||
"""Delete all save files.
|
||||
|
||||
Returns:
|
||||
Number of files deleted
|
||||
"""
|
||||
save_files = self.get_save_files()
|
||||
deleted_count = 0
|
||||
|
||||
for save_file in save_files:
|
||||
if self.delete_save(save_file):
|
||||
deleted_count += 1
|
||||
|
||||
return deleted_count
|
||||
|
||||
def _cleanup_old_saves(self) -> None:
|
||||
"""Remove old save files if we exceed max_saves limit."""
|
||||
save_files = self.get_save_files()
|
||||
|
||||
if len(save_files) > self.max_saves:
|
||||
# Delete oldest saves
|
||||
for save_file in save_files[self.max_saves:]:
|
||||
self.delete_save(save_file)
|
||||
|
||||
def _handle_corrupted_save(self, filepath: Path) -> None:
|
||||
"""Handle a corrupted save file by moving it to a backup location."""
|
||||
try:
|
||||
backup_dir = self.save_dir / "corrupted"
|
||||
backup_dir.mkdir(exist_ok=True)
|
||||
|
||||
backup_path = backup_dir / f"{filepath.name}.corrupted"
|
||||
shutil.move(str(filepath), str(backup_path))
|
||||
print(f"Moved corrupted save to: {backup_path}")
|
||||
|
||||
except Exception as e:
|
||||
print(f"Failed to move corrupted save: {e}")
|
||||
# As last resort, try to delete it
|
||||
try:
|
||||
filepath.unlink()
|
||||
print(f"Deleted corrupted save: {filepath}")
|
||||
except:
|
||||
print(f"Could not clean up corrupted save: {filepath}")
|
||||
|
||||
def _sanitize_filename(self, filename: str) -> str:
|
||||
"""Sanitize filename for cross-platform compatibility."""
|
||||
# Remove invalid characters
|
||||
invalid_chars = '<>:"/\\|?*'
|
||||
for char in invalid_chars:
|
||||
filename = filename.replace(char, "_")
|
||||
|
||||
# Limit length and handle edge cases
|
||||
filename = filename.strip()
|
||||
if not filename:
|
||||
filename = "save"
|
||||
|
||||
# Truncate if too long
|
||||
if len(filename) > 100:
|
||||
filename = filename[:100]
|
||||
|
||||
return filename
|
Reference in New Issue
Block a user