add database manager and media manager
This commit is contained in:
1
.gitignore
vendored
1
.gitignore
vendored
@@ -219,3 +219,4 @@ __marimo__/
|
|||||||
|
|
||||||
|
|
||||||
media/
|
media/
|
||||||
|
*.db
|
||||||
148
core/database.py
Normal file
148
core/database.py
Normal file
@@ -0,0 +1,148 @@
|
|||||||
|
# core/database.py
|
||||||
|
|
||||||
|
import sqlite3
|
||||||
|
from pathlib import Path
|
||||||
|
from datetime import datetime
|
||||||
|
|
||||||
|
DB_FILE = Path("app.db")
|
||||||
|
|
||||||
|
|
||||||
|
class DatabaseManager:
|
||||||
|
def __init__(self, db_path: Path = DB_FILE):
|
||||||
|
self.db_path = db_path
|
||||||
|
self.conn: sqlite3.Connection | None = None
|
||||||
|
|
||||||
|
# -------------------------
|
||||||
|
# Połączenie z bazą
|
||||||
|
# -------------------------
|
||||||
|
def connect(self):
|
||||||
|
"""Nawiązuje połączenie z bazą i tworzy tabele, jeśli ich nie ma."""
|
||||||
|
self.conn = sqlite3.connect(self.db_path)
|
||||||
|
self.conn.row_factory = sqlite3.Row
|
||||||
|
self._create_tables()
|
||||||
|
|
||||||
|
def disconnect(self):
|
||||||
|
if self.conn:
|
||||||
|
self.conn.close()
|
||||||
|
self.conn = None
|
||||||
|
|
||||||
|
# -------------------------
|
||||||
|
# Tworzenie tabel
|
||||||
|
# -------------------------
|
||||||
|
def _create_tables(self):
|
||||||
|
if self.conn is None:
|
||||||
|
raise RuntimeError("Database not connected")
|
||||||
|
cur = self.conn.cursor()
|
||||||
|
|
||||||
|
cur.execute(
|
||||||
|
"""
|
||||||
|
CREATE TABLE IF NOT EXISTS colors (
|
||||||
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||||
|
name TEXT UNIQUE NOT NULL,
|
||||||
|
has_icon INTEGER DEFAULT 0
|
||||||
|
)
|
||||||
|
"""
|
||||||
|
)
|
||||||
|
|
||||||
|
cur.execute(
|
||||||
|
"""
|
||||||
|
CREATE TABLE IF NOT EXISTS media (
|
||||||
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||||
|
color_id INTEGER NOT NULL,
|
||||||
|
filename TEXT NOT NULL,
|
||||||
|
file_type TEXT CHECK(file_type IN ('photo','video')),
|
||||||
|
timestamp TEXT DEFAULT CURRENT_TIMESTAMP,
|
||||||
|
FOREIGN KEY(color_id) REFERENCES colors(id) ON DELETE CASCADE
|
||||||
|
)
|
||||||
|
"""
|
||||||
|
)
|
||||||
|
|
||||||
|
self.conn.commit()
|
||||||
|
|
||||||
|
# -------------------------
|
||||||
|
# Operacje na kolorach
|
||||||
|
# -------------------------
|
||||||
|
def add_color(self, name: str, has_icon: bool = False):
|
||||||
|
if self.conn is None:
|
||||||
|
raise RuntimeError("Database not connected")
|
||||||
|
cur = self.conn.cursor()
|
||||||
|
cur.execute(
|
||||||
|
"INSERT OR IGNORE INTO colors (name, has_icon) VALUES (?, ?)",
|
||||||
|
(name, int(has_icon)),
|
||||||
|
)
|
||||||
|
self.conn.commit()
|
||||||
|
|
||||||
|
def get_color_id(self, name: str) -> int | None:
|
||||||
|
if self.conn is None:
|
||||||
|
raise RuntimeError("Database not connected")
|
||||||
|
cur = self.conn.cursor()
|
||||||
|
cur.execute("SELECT id FROM colors WHERE name = ?", (name,))
|
||||||
|
row = cur.fetchone()
|
||||||
|
return row["id"] if row else None
|
||||||
|
|
||||||
|
def get_all_colors(self) -> list[dict]:
|
||||||
|
if self.conn is None:
|
||||||
|
raise RuntimeError("Database not connected")
|
||||||
|
cur = self.conn.cursor()
|
||||||
|
cur.execute("SELECT * FROM colors ORDER BY name")
|
||||||
|
rows = cur.fetchall()
|
||||||
|
return [dict(r) for r in rows]
|
||||||
|
|
||||||
|
def update_color_name(self, old_name: str, new_name: str):
|
||||||
|
if self.conn is None:
|
||||||
|
raise RuntimeError("Database not connected")
|
||||||
|
cur = self.conn.cursor()
|
||||||
|
cur.execute("UPDATE colors SET name = ? WHERE name = ?", (new_name, old_name))
|
||||||
|
self.conn.commit()
|
||||||
|
|
||||||
|
def update_color_icon_flag(self, name: str, has_icon: bool):
|
||||||
|
if self.conn is None:
|
||||||
|
raise RuntimeError("Database not connected")
|
||||||
|
cur = self.conn.cursor()
|
||||||
|
cur.execute("UPDATE colors SET has_icon = ? WHERE name = ?", (int(has_icon), name))
|
||||||
|
self.conn.commit()
|
||||||
|
|
||||||
|
def delete_color(self, name: str):
|
||||||
|
if self.conn is None:
|
||||||
|
raise RuntimeError("Database not connected")
|
||||||
|
cur = self.conn.cursor()
|
||||||
|
cur.execute("DELETE FROM colors WHERE name = ?", (name,))
|
||||||
|
self.conn.commit()
|
||||||
|
|
||||||
|
# -------------------------
|
||||||
|
# Operacje na plikach
|
||||||
|
# -------------------------
|
||||||
|
def add_media(self, color_id: int, filename: str, file_type: str, timestamp: str | None = None):
|
||||||
|
if timestamp is None:
|
||||||
|
timestamp = datetime.now().isoformat()
|
||||||
|
|
||||||
|
if self.conn is None:
|
||||||
|
raise RuntimeError("Database not connected")
|
||||||
|
cur = self.conn.cursor()
|
||||||
|
cur.execute(
|
||||||
|
"INSERT INTO media (color_id, filename, file_type, timestamp) VALUES (?, ?, ?, ?)",
|
||||||
|
(color_id, filename, file_type, timestamp),
|
||||||
|
)
|
||||||
|
self.conn.commit()
|
||||||
|
|
||||||
|
def get_media_for_color(self, color_id: int) -> list[dict]:
|
||||||
|
if self.conn is None:
|
||||||
|
raise RuntimeError("Database not connected")
|
||||||
|
cur = self.conn.cursor()
|
||||||
|
cur.execute("SELECT * FROM media WHERE color_id = ? ORDER BY timestamp DESC", (color_id,))
|
||||||
|
rows = cur.fetchall()
|
||||||
|
return [dict(r) for r in rows]
|
||||||
|
|
||||||
|
def delete_media(self, color_id: int, filename: str):
|
||||||
|
if self.conn is None:
|
||||||
|
raise RuntimeError("Database not connected")
|
||||||
|
cur = self.conn.cursor()
|
||||||
|
cur.execute("DELETE FROM media WHERE color_id = ? AND filename = ?", (color_id, filename))
|
||||||
|
self.conn.commit()
|
||||||
|
|
||||||
|
def delete_all_media_for_color(self, color_id: int):
|
||||||
|
if self.conn is None:
|
||||||
|
raise RuntimeError("Database not connected")
|
||||||
|
cur = self.conn.cursor()
|
||||||
|
cur.execute("DELETE FROM media WHERE color_id = ?", (color_id,))
|
||||||
|
self.conn.commit()
|
||||||
79
core/media.py
Normal file
79
core/media.py
Normal file
@@ -0,0 +1,79 @@
|
|||||||
|
from pathlib import Path
|
||||||
|
import shutil
|
||||||
|
from core.database import DatabaseManager
|
||||||
|
|
||||||
|
MEDIA_DIR = Path("media")
|
||||||
|
DEFAULT_ICON = Path("media/default_icon.jpg")
|
||||||
|
|
||||||
|
class MediaRepository:
|
||||||
|
def __init__(self, db: DatabaseManager):
|
||||||
|
self.db = db
|
||||||
|
|
||||||
|
def sync_media(self):
|
||||||
|
disk_colors = {d.name for d in MEDIA_DIR.iterdir() if d.is_dir()}
|
||||||
|
db_colors = {c["name"] for c in self.db.get_all_colors()}
|
||||||
|
|
||||||
|
# usuwanie brakujących kolorów
|
||||||
|
for missing in db_colors - disk_colors:
|
||||||
|
self.db.delete_color(missing)
|
||||||
|
|
||||||
|
# dodawanie nowych kolorów
|
||||||
|
for color in disk_colors - db_colors:
|
||||||
|
has_icon = (MEDIA_DIR / color / "icon.jpg").exists()
|
||||||
|
self.db.add_color(color, has_icon=has_icon)
|
||||||
|
|
||||||
|
# sprawdzanie plików dla każdego koloru
|
||||||
|
for color in disk_colors:
|
||||||
|
color_id = self.db.get_color_id(color)
|
||||||
|
if color_id is None:
|
||||||
|
continue
|
||||||
|
color_dir = MEDIA_DIR / color
|
||||||
|
|
||||||
|
disk_files = {f.name for f in color_dir.iterdir() if f.is_file() and f.name != "icon.jpg"}
|
||||||
|
db_files = {m["filename"] for m in self.db.get_media_for_color(color_id)}
|
||||||
|
|
||||||
|
# usuń brakujące
|
||||||
|
for missing in db_files - disk_files:
|
||||||
|
self.db.delete_media(color_id, missing)
|
||||||
|
|
||||||
|
# dodaj nowe
|
||||||
|
for new in disk_files - db_files:
|
||||||
|
ftype = "photo" if Path(new).suffix.lower() in [".jpg", ".png"] else "video"
|
||||||
|
self.db.add_media(color_id, new, ftype)
|
||||||
|
|
||||||
|
def add_color(self, name: str, icon_path: Path | None = None):
|
||||||
|
color_dir = MEDIA_DIR / name
|
||||||
|
color_dir.mkdir(parents=True, exist_ok=True)
|
||||||
|
|
||||||
|
icon_file = color_dir / "icon.jpg"
|
||||||
|
if icon_path and icon_path.exists():
|
||||||
|
shutil.copy(icon_path, icon_file)
|
||||||
|
else:
|
||||||
|
shutil.copy(DEFAULT_ICON, icon_file)
|
||||||
|
|
||||||
|
self.db.add_color(name, has_icon=True)
|
||||||
|
|
||||||
|
def remove_color(self, name: str):
|
||||||
|
if (MEDIA_DIR / name).exists():
|
||||||
|
shutil.rmtree(MEDIA_DIR / name)
|
||||||
|
self.db.delete_color(name)
|
||||||
|
|
||||||
|
def add_file(self, color: str, file_path: Path):
|
||||||
|
target_dir = MEDIA_DIR / color
|
||||||
|
target_dir.mkdir(exist_ok=True)
|
||||||
|
target_file = target_dir / file_path.name
|
||||||
|
shutil.copy(file_path, target_file)
|
||||||
|
|
||||||
|
ftype = "photo" if file_path.suffix.lower() in [".jpg", ".png"] else "video"
|
||||||
|
color_id = self.db.get_color_id(color)
|
||||||
|
if color_id is not None:
|
||||||
|
self.db.add_media(color_id, file_path.name, ftype)
|
||||||
|
|
||||||
|
def remove_file(self, color: str, filename: str):
|
||||||
|
file_path = MEDIA_DIR / color / filename
|
||||||
|
if file_path.exists():
|
||||||
|
file_path.unlink()
|
||||||
|
|
||||||
|
color_id = self.db.get_color_id(color)
|
||||||
|
if color_id is not None:
|
||||||
|
self.db.delete_media(color_id, filename)
|
||||||
8
main.py
8
main.py
@@ -2,11 +2,19 @@ import sys
|
|||||||
from PySide6.QtWidgets import QApplication
|
from PySide6.QtWidgets import QApplication
|
||||||
|
|
||||||
from ui.main_window import MainWindow
|
from ui.main_window import MainWindow
|
||||||
|
from core.database import DatabaseManager
|
||||||
|
from core.media import MediaRepository
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
def main():
|
def main():
|
||||||
app = QApplication(sys.argv)
|
app = QApplication(sys.argv)
|
||||||
|
db = DatabaseManager()
|
||||||
|
db.connect()
|
||||||
|
media_repo = MediaRepository(db)
|
||||||
|
media_repo.sync_media()
|
||||||
|
|
||||||
|
print(db.get_all_colors())
|
||||||
window = MainWindow()
|
window = MainWindow()
|
||||||
window.show()
|
window.show()
|
||||||
sys.exit(app.exec())
|
sys.exit(app.exec())
|
||||||
|
|||||||
Reference in New Issue
Block a user