Cache system added using sqlite. Should be nicer to the server.
This commit is contained in:
168
tests/test_cache_extra.py
Normal file
168
tests/test_cache_extra.py
Normal file
@@ -0,0 +1,168 @@
|
||||
"""
|
||||
Additional high-value cache and settings tests.
|
||||
"""
|
||||
|
||||
import os
|
||||
import sys
|
||||
import time
|
||||
from datetime import datetime, timedelta
|
||||
from pathlib import Path
|
||||
|
||||
# Ensure repo root on sys.path before importing project modules
|
||||
ROOT = Path(__file__).resolve().parents[1]
|
||||
if str(ROOT) not in sys.path:
|
||||
sys.path.insert(0, str(ROOT))
|
||||
|
||||
import pytest
|
||||
from PySide6.QtWidgets import QApplication
|
||||
|
||||
from src.cache import CacheDatabase, CacheManager, CachedClient
|
||||
from src.cache.database import SCHEMA_VERSION
|
||||
from src.api.models import Artist, Album, Song, Playlist, Genre
|
||||
from src.config.settings import Settings
|
||||
from src.widgets.accessible_text_dialog import AccessibleTextDialog
|
||||
|
||||
|
||||
@pytest.fixture(scope="session", autouse=True)
|
||||
def qt_offscreen_env():
|
||||
os.environ.setdefault("QT_QPA_PLATFORM", "offscreen")
|
||||
return
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def qt_app():
|
||||
app = QApplication.instance()
|
||||
if app:
|
||||
return app
|
||||
return QApplication([])
|
||||
|
||||
|
||||
def test_schema_migration_updates_version(tmp_path: Path):
|
||||
db = CacheDatabase("https://fake.example", tmp_path)
|
||||
db.setMeta("schema_version", "0")
|
||||
db.close()
|
||||
|
||||
db2 = CacheDatabase("https://fake.example", tmp_path)
|
||||
assert db2.getSchemaVersion() == SCHEMA_VERSION
|
||||
db2.close()
|
||||
|
||||
|
||||
def test_update_starred_sets_and_clears(tmp_path: Path):
|
||||
db = CacheDatabase("https://fake.example", tmp_path)
|
||||
cache = CacheManager(db)
|
||||
|
||||
song = Song(id="s1", title="Star me", album="a", albumId="al", artist="x")
|
||||
cache.setSong(song)
|
||||
cache.updateStarred("s1", "song", True)
|
||||
starred = cache.getStarredSongs()
|
||||
assert starred and starred[0].id == "s1"
|
||||
|
||||
cache.updateStarred("s1", "song", False)
|
||||
starred = cache.getStarredSongs()
|
||||
assert starred == []
|
||||
db.close()
|
||||
|
||||
|
||||
def test_playlist_changed_triggers_refetch(tmp_path: Path):
|
||||
class Client:
|
||||
def __init__(self):
|
||||
self.serverUrl = "https://fake.example"
|
||||
now = datetime.now()
|
||||
self._playlists = [
|
||||
Playlist(
|
||||
id="p1",
|
||||
name="List",
|
||||
songCount=1,
|
||||
songs=[Song(id="s1", title="Old", album="a", albumId="al", artist="x")],
|
||||
changed=now,
|
||||
)
|
||||
]
|
||||
self._changed_calls = 0
|
||||
|
||||
def getArtists(self, musicFolderId=None):
|
||||
return []
|
||||
|
||||
def getArtist(self, artistId):
|
||||
return {"artist": None, "albums": []}
|
||||
|
||||
def getAlbum(self, albumId):
|
||||
return {"album": None, "songs": []}
|
||||
|
||||
def getPlaylists(self):
|
||||
return [Playlist(id=p.id, name=p.name, songCount=p.songCount, changed=p.changed) for p in self._playlists]
|
||||
|
||||
def getPlaylist(self, playlistId):
|
||||
self._changed_calls += 1
|
||||
return next(p for p in self._playlists if p.id == playlistId)
|
||||
|
||||
def getGenres(self):
|
||||
return []
|
||||
|
||||
def getStarred(self):
|
||||
return {"artists": [], "albums": [], "songs": []}
|
||||
|
||||
def ping(self):
|
||||
return True
|
||||
|
||||
def clearCache(self):
|
||||
pass
|
||||
|
||||
client = Client()
|
||||
cached = CachedClient(client, tmp_path)
|
||||
cached.syncIncremental()
|
||||
# Update playlist change timestamp and add a new song
|
||||
client._playlists[0].changed = datetime.now() + timedelta(minutes=1)
|
||||
client._playlists[0].songs.append(
|
||||
Song(id="s2", title="New", album="a", albumId="al", artist="x")
|
||||
)
|
||||
cached.syncIncremental()
|
||||
playlist = cached.cache.getPlaylist("p1")
|
||||
assert playlist and [s.id for s in playlist.songs] == ["s1", "s2"]
|
||||
assert client._changed_calls == 2 # one per incremental run
|
||||
|
||||
|
||||
def test_genre_prune(tmp_path: Path):
|
||||
db = CacheDatabase("https://fake.example", tmp_path)
|
||||
cache = CacheManager(db)
|
||||
genres = [Genre(name="Rock", songCount=1, albumCount=1), Genre(name="Pop", songCount=2, albumCount=1)]
|
||||
cache.setGenres(genres)
|
||||
cache.deleteGenresNotIn(["Rock"])
|
||||
remaining = cache.getGenres()
|
||||
assert [g.name for g in remaining] == ["Rock"]
|
||||
db.close()
|
||||
|
||||
|
||||
def test_settings_respect_xdg(tmp_path: Path, monkeypatch):
|
||||
cfg = tmp_path / "cfg"
|
||||
data = tmp_path / "data"
|
||||
cache = tmp_path / "cache"
|
||||
monkeypatch.setenv("XDG_CONFIG_HOME", str(cfg))
|
||||
monkeypatch.setenv("XDG_DATA_HOME", str(data))
|
||||
monkeypatch.setenv("XDG_CACHE_HOME", str(cache))
|
||||
|
||||
settings = Settings()
|
||||
settings.set("interface", "pageStep", 7)
|
||||
settings.addServer("demo", "https://demo", "u", "p")
|
||||
|
||||
assert settings.configDir.is_relative_to(cfg)
|
||||
assert settings.configFile.exists()
|
||||
assert settings.serversFile.exists()
|
||||
with open(settings.configFile, "r", encoding="utf-8") as f:
|
||||
assert "pageStep" in f.read()
|
||||
|
||||
|
||||
def test_error_dialog_debounce(qt_app, monkeypatch):
|
||||
calls = []
|
||||
|
||||
def fake_exec(self):
|
||||
calls.append(time.monotonic())
|
||||
return 0
|
||||
|
||||
# Reset debounce state
|
||||
AccessibleTextDialog._lastErrorKey = ""
|
||||
AccessibleTextDialog._lastErrorTime = 0.0
|
||||
monkeypatch.setattr(AccessibleTextDialog, "exec", fake_exec)
|
||||
|
||||
AccessibleTextDialog.showError("Err", "Same", parent=None)
|
||||
AccessibleTextDialog.showError("Err", "Same", parent=None)
|
||||
assert len(calls) == 1
|
||||
Reference in New Issue
Block a user