# Copyright (C) 2025 Siemens
#
# SPDX-License-Identifier: MIT
from collections import namedtuple
from collections.abc import Callable, Iterable
from dataclasses import dataclass
from debian.deb822 import Dsc, Deb822, Sources, Packages
from debian.debian_support import Version
import logging
from pathlib import Path
from ..util.compression import (
CompressionToolMissing,
find_compressed_file_variants,
stream_compressed_file,
)
from ..dpkg.package import BinaryPackage, SourcePackage
from .. import HAS_PYTHON_APT
logger = logging.getLogger(__name__)
[docs]
@dataclass
class ExtendedStates:
"""
The apt extended states encode information if a package is manually
installed or installed via a dependency only. Note, that the PackageFilter
maps arch=all binary packages to the architecture they are installed for
(usually that's the distro arch).
"""
PackageFilter = namedtuple("BinaryPackage", "name arch")
auto_installed: set[tuple[str, str]]
distro_archs: set[str]
[docs]
def is_manual(self, name: str, arch: str) -> bool:
"""
True if package is explicitly installed. Architecture unspecific
packages are mapped to the arch of the package that had it as a dependency.
As we don't know that when parsing, we simply scan in all architectures.
"""
if arch == "all":
return not any([(name, _arch) in self.auto_installed for _arch in self.distro_archs])
return (name, arch) not in self.auto_installed
[docs]
@classmethod
def from_file(
cls, file: str | Path, filter_fn: Callable[[PackageFilter], bool] | None = None
) -> "ExtendedStates":
"""Factory to create instance from the apt extended states file"""
auto_installed = set()
distro_archs = set()
with open(Path(file)) as f:
for s in Deb822.iter_paragraphs(f, use_apt_pkg=HAS_PYTHON_APT):
name = s.get("Package")
arch = s.get("Architecture")
if s.get("Auto-Installed") != "1":
continue
if (filter_fn is None) or (filter_fn(cls.PackageFilter(name, arch))):
auto_installed.add((name, arch))
distro_archs.add(arch)
return cls(auto_installed=auto_installed, distro_archs=distro_archs)
@dataclass
class Repository:
    """Represents a debian repository as cached by apt."""

    # Path of the cached (In)Release file this repository was read from.
    release_file: Path
    origin: str | None
    codename: str | None
    architectures: list[str]
    components: list[str] | None = None
    version: Version | None = None
    description: str | None = None

    # Records passed to the user supplied filter callbacks.
    BinaryPackageFilter = namedtuple("BinaryPackage", "name arch version")
    SourcePackageFilter = namedtuple("SourcePackage", "name version")

    @classmethod
    def from_apt_cache(cls, lists_dir: str | Path) -> Iterable["Repository"]:
        """
        Create repositories from apt lists directory.

        Every directory entry whose name ends in ``Release`` is parsed as a
        deb822 file. Entries that do not declare ``Architectures`` are logged
        as an error and skipped.
        """
        for entry in Path(lists_dir).iterdir():
            if not entry.name.endswith("Release"):
                continue
            with open(entry) as f:
                repo = Deb822(f)
                origin = repo.get("Origin")
                codename = repo.get("Codename")
                version = repo.get("Version")
                architectures = repo.get("Architectures", "").split()
                components = repo.get("Components")
                description = repo.get("Description")
            logger.info(f"Found apt lists cache repository: {entry}")
            if not architectures:
                logger.error(f"Repository does not specify 'Architectures', ignoring: {entry}")
                continue
            # The file is closed before yielding so that no handle stays open
            # while the consumer processes the repository.
            yield cls(
                release_file=entry,
                origin=origin,
                codename=codename,
                version=Version(version) if version else None,
                architectures=architectures,
                components=components.split() if components else None,
                description=description,
            )

    @classmethod
    def _safe_srcpkg_filter(
        cls, p: Packages, filter_fn: Callable[[SourcePackageFilter], bool] | None
    ) -> bool:
        """
        Apply ``filter_fn`` to a source package paragraph.

        Paragraphs lacking ``Package`` or ``Version`` are rejected with a
        warning. Without a filter every valid paragraph is accepted.
        """
        try:
            package = p["Package"]
            version = p["Version"]
        except KeyError as e:
            logger.warning(f"skipping invalid source package: {e}")
            return False
        if filter_fn is None:
            return True
        return filter_fn(cls.SourcePackageFilter(package, version))

    @classmethod
    def _safe_binpkg_filter(
        cls, p: Packages, filter_fn: Callable[[BinaryPackageFilter], bool] | None
    ) -> bool:
        """
        Apply ``filter_fn`` to a binary package paragraph.

        Paragraphs lacking ``Package``, ``Architecture`` or ``Version`` are
        rejected with a warning. Without a filter every valid paragraph is
        accepted.
        """
        try:
            package = p["Package"]
            arch = p["Architecture"]
            version = p["Version"]
        except KeyError as e:
            logger.warning(f"skipping invalid binary package: {e}")
            return False
        if filter_fn is None:
            return True
        return filter_fn(cls.BinaryPackageFilter(package, arch, version))

    @classmethod
    def _make_srcpkgs(
        cls,
        sources: Iterable[Packages],
        filter_fn: Callable[[SourcePackageFilter], bool] | None = None,
    ) -> Iterable[SourcePackage]:
        """Lazily convert accepted paragraphs into ``SourcePackage`` objects."""
        for source in sources:
            if not cls._safe_srcpkg_filter(source, filter_fn):
                continue
            try:
                yield SourcePackage.from_deb822(Dsc(source))
            except KeyError as e:
                # A single broken entry must not abort the whole listing.
                logger.error("control file in is not valid deb822, skip entry")
                logger.debug(e)

    @classmethod
    def _make_binpkgs(
        cls,
        packages: Iterable[Packages],
        filter_fn: Callable[[BinaryPackageFilter], bool] | None = None,
    ) -> Iterable[BinaryPackage]:
        """Lazily convert accepted paragraphs into ``BinaryPackage`` objects."""
        for pkg in packages:
            if not cls._safe_binpkg_filter(pkg, filter_fn):
                continue
            try:
                yield BinaryPackage.from_deb822(pkg)
            except KeyError as e:
                # A single broken entry must not abort the whole listing.
                logger.error("control file in is not valid deb822, skip entry")
                logger.debug(e)

    @classmethod
    def _parse_sources(
        cls, sources_file: str, srcpkg_filter: Callable[[SourcePackageFilter], bool] | None = None
    ) -> Iterable["SourcePackage"]:
        """
        Yield the source packages listed in ``sources_file``.

        If the plain file does not exist, the first compressed variant found
        is streamed instead. A missing compression tool is logged as a warning;
        ``FileNotFoundError``, ``IndexError`` and ``RuntimeError`` are treated
        as a missing cache file and only logged at debug level.
        """
        sources_path = Path(sources_file)
        # Bound up front so the CompressionToolMissing handler never hits an
        # unbound local, whichever call raised the exception.
        compressed_variant: Path | None = None
        try:
            if sources_path.exists():
                with open(sources_path) as f:
                    logger.debug(f"Parsing apt cache source packages: {sources_file}")
                    sources_raw = Packages.iter_paragraphs(f, use_apt_pkg=HAS_PYTHON_APT)
                    yield from cls._make_srcpkgs(sources_raw, srcpkg_filter)
            else:
                compressed_variant = find_compressed_file_variants(sources_path)[0]
                content = stream_compressed_file(compressed_variant)
                logger.debug(f"Parsing apt cache source packages: {sources_file}")
                # TODO: in python-debian >= 1.0.0 it is possible to directly
                # pass the filename of a compressed file when using apt_pkg
                sources_raw = Packages.iter_paragraphs(content, use_apt_pkg=False)
                yield from cls._make_srcpkgs(sources_raw, srcpkg_filter)
        except CompressionToolMissing as e:
            skipped = compressed_variant if compressed_variant is not None else sources_path
            logger.warning(f'{e}: skipping path "{skipped}"')
        except (FileNotFoundError, IndexError, RuntimeError):
            logger.debug(f"Missing apt cache sources: {sources_file}")

    @classmethod
    def _parse_packages(
        cls, packages_file: str, binpkg_filter: Callable[[BinaryPackageFilter], bool] | None = None
    ) -> Iterable[BinaryPackage]:
        """
        Yield the binary packages listed in ``packages_file``.

        If the plain file does not exist, the first compressed variant found
        is streamed instead. A missing compression tool is logged as a warning;
        ``FileNotFoundError``, ``IndexError`` and ``RuntimeError`` are treated
        as a missing cache file and only logged at debug level.
        """
        packages_path = Path(packages_file)
        # Bound up front so the CompressionToolMissing handler never hits an
        # unbound local, whichever call raised the exception.
        compressed_variant: Path | None = None
        try:
            if packages_path.exists():
                with open(packages_path) as f:
                    packages_raw = Packages.iter_paragraphs(f, use_apt_pkg=HAS_PYTHON_APT)
                    logger.debug(f"Parsing apt cache binary packages: {packages_file}")
                    yield from cls._make_binpkgs(packages_raw, binpkg_filter)
            else:
                compressed_variant = find_compressed_file_variants(packages_path)[0]
                content = stream_compressed_file(compressed_variant)
                # TODO: in python-debian >= 1.0.0 it is possible to directly
                # pass the filename of a compressed file when using apt_pkg
                packages_raw = Packages.iter_paragraphs(content, use_apt_pkg=False)
                logger.debug(f"Parsing apt cache binary packages: {packages_file}")
                yield from cls._make_binpkgs(packages_raw, binpkg_filter)
        except CompressionToolMissing as e:
            skipped = compressed_variant if compressed_variant is not None else packages_path
            logger.warning(f'{e}: skipping path "{skipped}"')
        except (FileNotFoundError, IndexError, RuntimeError):
            logger.debug(f"Missing apt cache packages: {packages_file}")

    @property
    def repo_base(self) -> str:
        """Release file path with its last ``_``-separated part removed."""
        return "_".join(str(self.release_file).split("_")[:-1])

    def _index_prefixes(self) -> list[str]:
        """Return one index file prefix per component, or the bare base if there are none."""
        base = self.repo_base
        if self.components:
            return ["_".join([base, component]) for component in self.components]
        return [base]

    def sources(
        self, filter_fn: Callable[[SourcePackageFilter], bool] | None = None
    ) -> Iterable[SourcePackage]:
        """
        Get all source packages from this repository.

        Works for repositories with and without components; in both cases the
        packages are yielded lazily.
        """
        for prefix in self._index_prefixes():
            sources_file = "_".join([prefix, "source", "Sources"])
            # Must be ``yield from``: this function is a generator, so a plain
            # ``return <iterable>`` would silently produce nothing.
            yield from self._parse_sources(sources_file, filter_fn)

    def binpackages(
        self,
        filter_fn: Callable[[BinaryPackageFilter], bool] | None = None,
        ext_states: ExtendedStates | None = None,
    ) -> Iterable[BinaryPackage]:
        """
        Get all binary packages from this repository.

        Each yielded package has ``manually_installed`` set according to
        ``ext_states``. When ``ext_states`` is omitted, an empty state is used,
        i.e. every package is reported as manually installed.
        """
        if ext_states is None:
            # Created per call instead of once at definition time.
            ext_states = ExtendedStates(set(), set())
        for prefix in self._index_prefixes():
            for arch in self.architectures:
                packages_file = "_".join([prefix, f"binary-{arch}", "Packages"])
                for p in self._parse_packages(packages_file, filter_fn):
                    p.manually_installed = ext_states.is_manual(p.name, p.architecture)
                    yield p