Source code for chantal.db.models

from __future__ import annotations

"""
SQLAlchemy database models for Chantal.

This module defines the database schema for packages, repositories,
snapshots, and their relationships.
"""

import enum
from datetime import datetime

from sqlalchemy import (
    JSON,
    Boolean,
    Column,
    DateTime,
    Enum,
    ForeignKey,
    Index,
    Integer,
    String,
    Table,
    Text,
    UniqueConstraint,
)
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship


[docs] class RepositoryMode(str, enum.Enum): """Repository operation modes. - MIRROR: Full mirror, no filtering, metadata unchanged - FILTERED: Filtered packages with customized metadata (include/exclude, retention, etc.) - HOSTED: Self-hosted packages (for future use) """ MIRROR = "mirror" FILTERED = "filtered" HOSTED = "hosted"
[docs] class Base(DeclarativeBase): """Base class for all database models.""" pass
# Association table for many-to-many relationship between repositories and content items # This tracks which content items are currently in a repository (the "latest" state) repository_content_items = Table( "repository_content_items", Base.metadata, Column("repository_id", Integer, ForeignKey("repositories.id"), primary_key=True), Column("content_item_id", Integer, ForeignKey("content_items.id"), primary_key=True), Column("added_at", DateTime, default=datetime.utcnow, nullable=False), ) # Association table for many-to-many relationship between snapshots and content items # This tracks immutable point-in-time copies of repository state snapshot_content_items = Table( "snapshot_content_items", Base.metadata, Column("snapshot_id", Integer, ForeignKey("snapshots.id"), primary_key=True), Column("content_item_id", Integer, ForeignKey("content_items.id"), primary_key=True), ) # Association table for many-to-many relationship between repositories and repository files # This tracks which metadata/installer files are currently in a repository (the "latest" state) repository_repository_files = Table( "repository_repository_files", Base.metadata, Column("repository_id", Integer, ForeignKey("repositories.id"), primary_key=True), Column("repository_file_id", Integer, ForeignKey("repository_files.id"), primary_key=True), Column("added_at", DateTime, default=datetime.utcnow, nullable=False), ) # Association table for many-to-many relationship between snapshots and repository files # This tracks immutable point-in-time copies of repository metadata/installer files snapshot_repository_files = Table( "snapshot_repository_files", Base.metadata, Column("snapshot_id", Integer, ForeignKey("snapshots.id"), primary_key=True), Column("repository_file_id", Integer, ForeignKey("repository_files.id"), primary_key=True), )
[docs] class Repository(Base): """Repository model - represents a configured RPM/APT repository.""" __tablename__ = "repositories" id: Mapped[int] = mapped_column(Integer, primary_key=True) repo_id: Mapped[str] = mapped_column(String(255), unique=True, nullable=False) name: Mapped[str] = mapped_column(String(255), nullable=False) type: Mapped[str] = mapped_column(String(50), nullable=False) # rpm, apt feed: Mapped[str] = mapped_column(Text, nullable=False) # upstream URL (Pulp terminology) enabled: Mapped[bool] = mapped_column(Boolean, default=True, nullable=False) mode: Mapped[str] = mapped_column( Enum(RepositoryMode), default=RepositoryMode.FILTERED, nullable=False ) # Paths latest_path: Mapped[str | None] = mapped_column(Text, nullable=True) snapshots_path: Mapped[str | None] = mapped_column(Text, nullable=True) # Metadata created_at: Mapped[datetime] = mapped_column(DateTime, default=datetime.utcnow, nullable=False) updated_at: Mapped[datetime] = mapped_column( DateTime, default=datetime.utcnow, onupdate=datetime.utcnow, nullable=False ) # Sync state last_sync_at: Mapped[datetime | None] = mapped_column(DateTime, nullable=True) last_sync_status: Mapped[str | None] = mapped_column( String(50), nullable=True ) # success, failed, running # Relationships snapshots: Mapped[list[Snapshot]] = relationship( "Snapshot", back_populates="repository", cascade="all, delete-orphan" ) sync_history: Mapped[list[SyncHistory]] = relationship( "SyncHistory", back_populates="repository", cascade="all, delete-orphan" ) content_items: Mapped[list[ContentItem]] = relationship( "ContentItem", secondary=repository_content_items, back_populates="repositories" ) repository_files: Mapped[list[RepositoryFile]] = relationship( "RepositoryFile", secondary=repository_repository_files, back_populates="repositories" ) def __repr__(self) -> str: return f"<Repository(repo_id='{self.repo_id}', type='{self.type}')>"
[docs] class ContentItem(Base): """Generic content model for all package types (RPM, Helm, APT, PyPI, etc.). Uses content-addressed storage - one content item file can be referenced by multiple repositories and snapshots. Type-specific metadata is stored as JSON in the metadata field. """ __tablename__ = "content_items" id: Mapped[int] = mapped_column(Integer, primary_key=True) # Content type (determines which plugin handles it) content_type: Mapped[str] = mapped_column(String(50), nullable=False, index=True) # Values: 'rpm', 'helm', 'pypi', 'npm', 'rubygems', 'nuget', 'go', 'apt', 'apk', 'terraform', etc. # Universal fields (all content types have these) name: Mapped[str] = mapped_column(String(255), nullable=False, index=True) version: Mapped[str] = mapped_column(String(255), nullable=False, index=True) # Content addressing (pool storage) sha256: Mapped[str] = mapped_column(String(64), unique=True, nullable=False, index=True) size_bytes: Mapped[int] = mapped_column(Integer, nullable=False) pool_path: Mapped[str] = mapped_column( Text, nullable=False ) # Relative path in pool (e.g., "ab/cd/abc123...rpm") filename: Mapped[str] = mapped_column(String(255), nullable=False) # Type-specific metadata as JSON # Structure depends on content_type: # - rpm: {epoch, release, arch, summary, description, provides, requires, ...} # - helm: {app_version, keywords, maintainers, icon, dependencies, ...} # - pypi: {python_requires, author, license, requires_dist, file_type, ...} # - etc. # Note: Named 'content_metadata' because 'metadata' is reserved by SQLAlchemy content_metadata: Mapped[dict] = mapped_column(JSON, nullable=False) # Timestamps created_at: Mapped[datetime] = mapped_column(DateTime, default=datetime.utcnow, nullable=False) # Reference counting (for garbage collection) reference_count: Mapped[int] = mapped_column(Integer, default=0, nullable=False) # Relationships (generic) repositories: Mapped[list[Repository]] = relationship( "Repository", secondary=repository_content_items, back_populates="content_items" ) snapshots: Mapped[list[Snapshot]] = relationship( "Snapshot", secondary=snapshot_content_items, back_populates="content_items" ) # Composite indexes for common queries __table_args__ = ( Index("idx_content_type_name", "content_type", "name"), Index("idx_content_type_name_version", "content_type", "name", "version"), ) def __repr__(self) -> str: return f"<ContentItem(type='{self.content_type}', name='{self.name}', version='{self.version}', sha256='{self.sha256[:8]}...')>" @property def epoch(self) -> str | None: """Get epoch from content metadata (RPM-specific).""" if self.content_type == "rpm": return self.content_metadata.get("epoch") return None @property def release(self) -> str | None: """Get release from content metadata (RPM-specific).""" if self.content_type == "rpm": return self.content_metadata.get("release") return None @property def arch(self) -> str | None: """Get architecture from content metadata (RPM/DEB-specific).""" if self.content_type in ("rpm", "deb"): return self.content_metadata.get("arch") return None @property def nevra(self) -> str | None: """Get NEVRA string for RPM packages (Name-Epoch:Version-Release.Arch). Returns None for non-RPM content types. """ if self.content_type != "rpm": return None epoch = self.content_metadata.get("epoch", "") release = self.content_metadata.get("release", "") arch = self.content_metadata.get("arch", "") epoch_str = f"{epoch}:" if epoch else "" release_str = f"-{release}" if release else "" return f"{self.name}-{epoch_str}{self.version}{release_str}.{arch}"
[docs] class Snapshot(Base): """Snapshot model - represents an immutable point-in-time repository state. A snapshot is a collection of packages. Multiple snapshots can reference the same package (content-addressed storage with reference counting). """ __tablename__ = "snapshots" id: Mapped[int] = mapped_column(Integer, primary_key=True) repository_id: Mapped[int] = mapped_column( Integer, ForeignKey("repositories.id"), nullable=False ) # Snapshot identification name: Mapped[str] = mapped_column(String(255), nullable=False, index=True) description: Mapped[str | None] = mapped_column(Text, nullable=True) # State created_at: Mapped[datetime] = mapped_column(DateTime, default=datetime.utcnow, nullable=False) is_published: Mapped[bool] = mapped_column(Boolean, default=False, nullable=False) published_path: Mapped[str | None] = mapped_column(Text, nullable=True) # Statistics (cached for performance) package_count: Mapped[int] = mapped_column(Integer, default=0, nullable=False) total_size_bytes: Mapped[int] = mapped_column(Integer, default=0, nullable=False) # Relationships repository: Mapped[Repository] = relationship("Repository", back_populates="snapshots") content_items: Mapped[list[ContentItem]] = relationship( "ContentItem", secondary=snapshot_content_items, back_populates="snapshots" ) repository_files: Mapped[list[RepositoryFile]] = relationship( "RepositoryFile", secondary=snapshot_repository_files, back_populates="snapshots" ) # Unique constraint: snapshot name must be unique per repository __table_args__ = (UniqueConstraint("repository_id", "name", name="uq_snapshot_name"),) def __repr__(self) -> str: return f"<Snapshot(name='{self.name}', repository_id={self.repository_id}, packages={self.package_count})>"
[docs] class SyncHistory(Base): """Sync history model - tracks repository synchronization events.""" __tablename__ = "sync_history" id: Mapped[int] = mapped_column(Integer, primary_key=True) repository_id: Mapped[int] = mapped_column( Integer, ForeignKey("repositories.id"), nullable=False ) # Sync timing started_at: Mapped[datetime] = mapped_column(DateTime, nullable=False) completed_at: Mapped[datetime | None] = mapped_column(DateTime, nullable=True) # Sync result status: Mapped[str] = mapped_column(String(50), nullable=False) # running, success, failed error_message: Mapped[str | None] = mapped_column(Text, nullable=True) # Statistics packages_added: Mapped[int] = mapped_column(Integer, default=0, nullable=False) packages_removed: Mapped[int] = mapped_column(Integer, default=0, nullable=False) packages_updated: Mapped[int] = mapped_column(Integer, default=0, nullable=False) bytes_downloaded: Mapped[int] = mapped_column(Integer, default=0, nullable=False) # Snapshot created during this sync snapshot_id: Mapped[int | None] = mapped_column( Integer, ForeignKey("snapshots.id"), nullable=True ) # Relationships repository: Mapped[Repository] = relationship("Repository", back_populates="sync_history") snapshot: Mapped[Snapshot | None] = relationship("Snapshot") def __repr__(self) -> str: return f"<SyncHistory(repository_id={self.repository_id}, status='{self.status}', started_at='{self.started_at}')>" @property def duration_seconds(self) -> float | None: """Calculate sync duration in seconds.""" if self.started_at and self.completed_at: delta = self.completed_at - self.started_at return delta.total_seconds() return None
[docs] class View(Base): """View model - groups multiple repositories into a single virtual repository. A view is a collection of repositories that can be published together as one. Views can be published as "latest" (mutable) or as snapshots (immutable). """ __tablename__ = "views" id: Mapped[int] = mapped_column(Integer, primary_key=True) # View identification name: Mapped[str] = mapped_column(String(255), unique=True, nullable=False, index=True) description: Mapped[str | None] = mapped_column(Text, nullable=True) # Repository type (all repos in view must have same type) repo_type: Mapped[str] = mapped_column(String(50), nullable=False) # rpm, apt # Timestamps created_at: Mapped[datetime] = mapped_column(DateTime, default=datetime.utcnow, nullable=False) updated_at: Mapped[datetime] = mapped_column( DateTime, default=datetime.utcnow, onupdate=datetime.utcnow, nullable=False ) # Publishing status (for "latest" publish) is_published: Mapped[bool] = mapped_column(Boolean, default=False, nullable=False) published_at: Mapped[datetime | None] = mapped_column(DateTime, nullable=True) published_path: Mapped[str | None] = mapped_column(Text, nullable=True) # Relationships view_repositories: Mapped[list[ViewRepository]] = relationship( "ViewRepository", back_populates="view", cascade="all, delete-orphan" ) view_snapshots: Mapped[list[ViewSnapshot]] = relationship( "ViewSnapshot", back_populates="view", cascade="all, delete-orphan" ) def __repr__(self) -> str: return f"<View(name='{self.name}', type='{self.repo_type}')>"
[docs] class ViewRepository(Base): """Junction table: View -> Repositories (with ordering). Defines which repositories are part of a view and their precedence order. """ __tablename__ = "view_repositories" id: Mapped[int] = mapped_column(Integer, primary_key=True) view_id: Mapped[int] = mapped_column(Integer, ForeignKey("views.id"), nullable=False) repository_id: Mapped[int] = mapped_column( Integer, ForeignKey("repositories.id"), nullable=False ) # Order/precedence for metadata merging (lower = higher priority) order: Mapped[int] = mapped_column(Integer, nullable=False) # Timestamps added_at: Mapped[datetime] = mapped_column(DateTime, default=datetime.utcnow, nullable=False) # Relationships view: Mapped[View] = relationship("View", back_populates="view_repositories") repository: Mapped[Repository] = relationship("Repository") # Unique constraint: repository can only be in view once __table_args__ = (UniqueConstraint("view_id", "repository_id", name="uq_view_repository"),) def __repr__(self) -> str: return f"<ViewRepository(view_id={self.view_id}, repository_id={self.repository_id}, order={self.order})>"
[docs] class ViewSnapshot(Base): """View snapshot model - represents an atomic snapshot of all repositories in a view. When creating a view snapshot, all repositories in the view are snapshotted simultaneously, creating an immutable point-in-time state of the entire view. """ __tablename__ = "view_snapshots" id: Mapped[int] = mapped_column(Integer, primary_key=True) view_id: Mapped[int] = mapped_column(Integer, ForeignKey("views.id"), nullable=False) # Snapshot identification name: Mapped[str] = mapped_column(String(255), nullable=False, index=True) description: Mapped[str | None] = mapped_column(Text, nullable=True) # State created_at: Mapped[datetime] = mapped_column(DateTime, default=datetime.utcnow, nullable=False) # Which repository snapshots are included (JSON array of snapshot IDs) # Example: [12, 45, 67] - references Snapshot.id snapshot_ids: Mapped[list[int]] = mapped_column(JSON, nullable=False) # Publishing status is_published: Mapped[bool] = mapped_column(Boolean, default=False, nullable=False) published_at: Mapped[datetime | None] = mapped_column(DateTime, nullable=True) published_path: Mapped[str | None] = mapped_column(Text, nullable=True) # Statistics (cached for performance) package_count: Mapped[int] = mapped_column(Integer, default=0, nullable=False) total_size_bytes: Mapped[int] = mapped_column(Integer, default=0, nullable=False) # Relationships view: Mapped[View] = relationship("View", back_populates="view_snapshots") # Unique constraint: snapshot name must be unique per view __table_args__ = (UniqueConstraint("view_id", "name", name="uq_view_snapshot_name"),) def __repr__(self) -> str: return f"<ViewSnapshot(name='{self.name}', view_id={self.view_id}, snapshots={len(self.snapshot_ids)})>"
[docs] class RepositoryFile(Base): """Repository metadata and installer files. Stores non-package files like: - Metadata: updateinfo.xml, filelists.xml, comps.xml, modules.yaml, etc. - Signatures: repomd.xml.asc, Release.gpg - Kickstart: vmlinuz, initrd.img, .treeinfo - Debian installer: debian-installer/ - SUSE specific: susedata.xml, patterns.xml, products.xml Uses content-addressed storage like ContentItem - files can be shared across multiple repositories and snapshots. """ __tablename__ = "repository_files" id: Mapped[int] = mapped_column(Integer, primary_key=True) # Classification (NO ENUM - flexible for future SUSE/other formats!) file_category: Mapped[str] = mapped_column(String(50), nullable=False, index=True) # Values: "metadata", "signature", "kickstart", "debian-installer" file_type: Mapped[str] = mapped_column(String(100), nullable=False, index=True) # Values: "updateinfo", "filelists", "comps", "modules", # "vmlinuz", "initrd", ".treeinfo", # "susedata", "suseinfo", "patterns", "products" (SUSE future) # Content-addressed storage (in pool/files/ subdirectory) sha256: Mapped[str] = mapped_column(String(64), nullable=False, index=True) pool_path: Mapped[str] = mapped_column(Text, nullable=False) # Format: "files/ab/cd/abc123_updateinfo.xml.gz" size_bytes: Mapped[int] = mapped_column(Integer, nullable=False) # Publishing path (preserve exact upstream structure) original_path: Mapped[str] = mapped_column(Text, nullable=False) # Examples: # "repodata/abc123-updateinfo.xml.gz" # "images/pxeboot/vmlinuz" # ".treeinfo" # "v3.19/main/x86_64/APKINDEX.tar.gz" # Flexible metadata (type-specific info stored as JSON) # Note: Named 'file_metadata' because 'metadata' is reserved by SQLAlchemy file_metadata: Mapped[dict | None] = mapped_column(JSON, nullable=True) # Examples: # {"checksum_type": "sha256", "open_checksum": "xyz", "timestamp": 123456} # {"kernel_version": "5.14.0-362.8.1.el9_3"} # Timestamps created_at: Mapped[datetime] = mapped_column(DateTime, default=datetime.utcnow, nullable=False) updated_at: Mapped[datetime] = mapped_column( DateTime, default=datetime.utcnow, onupdate=datetime.utcnow, nullable=False ) # Relationships (many-to-many like ContentItem) repositories: Mapped[list[Repository]] = relationship( "Repository", secondary=repository_repository_files, back_populates="repository_files" ) snapshots: Mapped[list[Snapshot]] = relationship( "Snapshot", secondary=snapshot_repository_files, back_populates="repository_files" ) # Indexes for common queries __table_args__ = ( # Composite index for common query: "get all updateinfo files for repo X" Index("idx_repo_file_category", "file_category"), Index("idx_repo_file_type", "file_type"), ) def __repr__(self) -> str: return f"<RepositoryFile(category='{self.file_category}', type='{self.file_type}', path='{self.original_path}', sha256='{self.sha256[:8]}...')>"