# __ # ____ _____ | | _____ # / \\__ \ | | \__ \ # | | \/ __ \| |__/ __ \_ # |___| (____ /____(____ / # \/ \/ \/ # # Copyright (C) 2021, 2022 Blake Lee # # This file is part of nala # # nala is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # nala is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with nala. If not, see . """Nala fetch Module.""" from __future__ import annotations import contextlib import itertools import re import sys from asyncio import Semaphore, gather, get_event_loop, run as aiorun from pathlib import Path from ssl import SSLCertVerificationError, SSLError from typing import Iterable, List, Optional, Union import typer from apt import Cache from apt_pkg import get_architectures from httpx import ( AsyncClient, ConnectError, ConnectTimeout, HTTPError, HTTPStatusError, Limits, ReadTimeout, Timeout, get, ) from rich.progress import Progress, TaskID from nala import _, color from nala.constants import ( ERROR_PREFIX, NALA_SOURCES, NOTICE_PREFIX, SOURCELIST, SOURCEPARTS, ) from nala.downloader import print_error from nala.error import ParserError from nala.options import ASSUME_YES, DEBUG, MAN_HELP, VERBOSE, arguments, nala from nala.rich import ELLIPSIS, Live, Panel, Table, fetch_progress from nala.utils import ask, dprint, eprint, sudo_check, term from debian.deb822 import Deb822 # isort:skip DEBIAN = "Debian" UBUNTU = "Ubuntu" DEVUAN = "Devuan" DOMAIN_PATTERN = re.compile(r"https?://([A-Za-z_0-9.-]+).*") UBUNTU_COUNTRY = re.compile(r"(.*)") UBUNTU_MIRROR = re.compile(r"(.*)") FETCH_RANGE = re.compile(r"[0-9]+\.\.[0-9]+") LIMITS = Limits(max_connections=50) TIMEOUT = Timeout(timeout=5.0, read=1.0, pool=20.0) ErrorTypes = Union[HTTPStatusError, HTTPError, SSLError, ReadTimeout, OSError] INVALID_FILENAME_CHARS = re.compile("[^a-zA-Z0-9_.-]", re.ASCII) FETCH_HELP = _( "Nala will fetch mirrors with the lowest latency.\n\n" "For Debian https://mirror-master.debian.org/status/Mirrors.masterlist\n\n" "For Ubuntu https://launchpad.net/ubuntu/+archivemirrors-rss" ) # pylint: disable=too-many-instance-attributes class MirrorTest: """Class to test mirrors.""" def __init__( self, netselect: tuple[str, ...], release: str, check_sources: bool, https_only: bool, ): """Class to test mirrors.""" self.netselect = netselect self.netselect_scored: list[str] = [] self.release = release self.sources = check_sources self.https_only = https_only self.client: AsyncClient self.progress: Progress self.task: TaskID async def run_test(self) -> None: """Test mirrors.""" with fetch_progress as self.progress: self.task = self.progress.add_task("", total=len(self.netselect)) async with AsyncClient( follow_redirects=True, limits=LIMITS, timeout=TIMEOUT ) as self.client: loop = get_event_loop() semp = Semaphore(25) tasks = [ loop.create_task(self.net_select(mirror, semp)) for mirror in self.netselect ] await gather(*tasks) async def net_select(self, mirror: str, semp: Semaphore) -> None: """Take a URL, ping the domain and score the latency.""" async with semp: debugger = [f"Current Mirror: {mirror}"] regex = re.search(DOMAIN_PATTERN, mirror) if not regex: self.progress.advance(self.task) debugger.append("Regex Failed") dprint(debugger) return domain = regex[1] debugger.append(f"Regex Match: {domain}") with contextlib.suppress(RuntimeError): await self.netping(mirror, debugger) self.progress.advance(self.task) async def netping(self, mirror: str, debugger: list[str]) -> bool: """Fetch release file and score mirror.""" secure = False try: # Try to do https first https = mirror.replace("http://", "https://") try: response = await self.client.get(f"{https}dists/{self.release}/Release") response.raise_for_status() secure = True mirror = https # We catch all Exceptions because we will fall back to http except Exception as error: # pylint: disable=broad-except debugger.append(f"https attempt failed: {error}") if self.https_only: mirror_error(error, debugger) dprint(debugger) return False # We can fall back to http if it's necessary if not secure: response = await self.client.get( f"{mirror}dists/{self.release}/Release" ) response.raise_for_status() # Get rid of the decimal so we can prefix zeros for sorting. res = f"{response.elapsed.total_seconds() * 100:.0f}" if self.sources: source_response = await self.client.get( f"{mirror}dists/{self.release}/main/source/Release" ) source_response.raise_for_status() # We catch all exceptions here because it really doesn't matter except Exception as error: # pylint: disable=broad-except mirror_error(error, debugger) dprint(debugger) return False debugger.append(f"Download ms: {res}") if len(res) == 2: res = f"0{res}" elif len(res) == 1: res = f"00{res}" elif len(res) > 3: debugger.append("Mirror too slow") dprint(debugger) return False debugger.append(f"Appended: {res} {mirror}") dprint(debugger) self.netselect_scored.append(f"{res} {mirror}") return True def get_scored(self) -> tuple[str, ...]: """Return sorted tuple.""" return tuple(sorted(self.netselect_scored)) class FetchLive: """Interactive Fetch.""" def __init__( # pylint: disable=too-many-arguments self, live: Live, release: str, sources: list[str], count: int, netselect_scored: tuple[str, ...], ) -> None: """Interactive Fetch.""" self.live = live self.errors = 0 self.count = count self.mirror_list: dict[int, str] = {} self.user_list: dict[int, str] = {} self.index_list: set[int] self._gen_mirror_list(release, sources, netselect_scored) def _gen_mirror_list( self, release: str, sources: Iterable[str], netselect_scored: Iterable[str] ) -> None: """Generate the mirror list for display.""" index = 0 for line in netselect_scored: url = line[line.index(":") :].rstrip("/") if any(url in mirror and release in mirror for mirror in sources): continue self.mirror_list[index] = line index += 1 if len(self.mirror_list) == self.count: break def clear(self, lines: int) -> None: """Clear lines for the live display.""" for _ in range(lines + self.errors): term.write(term.CURSER_UP + f"\r{' '*term.columns}\r".encode()) self.errors = 0 def debug(self, msg: object) -> None: """Display debugging information with the live display.""" if arguments.debug: term.write(term.CURSER_UP * 24 + f"\r{' ' * term.columns}\r".encode()) term.write(term.CURSER_UP + f"\r{' ' * term.columns}\r".encode()) term.write(term.CURSER_UP + f"\r{' ' * term.columns}\r".encode()) term.write(term.CURSER_UP + f"\r{' ' * term.columns}\r".encode()) term.write(f"{msg}".encode()) term.write(term.CURSER_DOWN * 26 + f"\r{' ' * term.columns}\r".encode()) def error(self, msg: object) -> None: """Print an error out and keep track of how many.""" self.errors += 1 term.write(term.CURSER_UP + f"\r{' ' * term.columns}\r".encode()) eprint(msg) def choose_mirrors(self) -> None: """Allow user to choose their mirrors.""" while True: self.live.update( Panel.fit( gen_table(self.mirror_list), title="[bold default] Fastest Mirrors", title_align="left", border_style="bold green", ), refresh=True, ) self.live.stop() self.index_list = self.ask_index(self.count) if self.index_list: break def final_mirrors(self) -> bool: """Confirm that the final mirrors are okay.""" self.live.start() self.live.update( Panel.fit( gen_table(self.user_list), title="[bold white] Selected Mirrors", title_align="left", border_style="bold green", ), refresh=True, ) self.live.stop() return ask(_("Are these mirrors okay?")) def set_user_list(self) -> None: """Set the user selected list of mirrors.""" self.user_list = { num: mirror for num, mirror in self.mirror_list.items() if num in self.index_list } def ask_index(self, count: int) -> set[int]: """Ask user about the mirrors they would like to use.""" index_list: set[int] = set() response: str = input( _( "Mirrors you want to keep, separated by space or comma {selection}:" ).format(selection=f"({color('1')}..{color(str(count))})") + " " ) # Small single use wrapper to just clean up the code a bit # Debug has to be done at the return or everything will get messed up def _debug(passthrough: set[int]) -> set[int]: self.debug( f"Response: {response or 'Default'}\n" f"Range: {range(1, count + 1)}\n" f"User Range: {passthrough}" ) return passthrough try: for index in range_from_str(response, count): # Plus one is for taking care of the user seeing # numbers starting at 1 instead of 0 if index not in range(1, count + 1): self.error( _("{error} Index {index} doesn't exist.").format( error=ERROR_PREFIX, index=color(index, "YELLOW") ) ) # Returning an empty set will restart the self.choose_mirrors loop return _debug(set()) index_list.add(index - 1) except ValueError as error: self.error( _("{error} {value_error}").format( error=ERROR_PREFIX, value_error=color(f"{error}".capitalize(), "YELLOW"), ) ) # Returning an empty set will restart the self.choose_mirrors loop return _debug(set()) except ParserError as error: self.error( _("{error} Parser: {parser}").format(error=ERROR_PREFIX, parser=error) ) return _debug(index_list) def mirror_error(error: Exception, debugger: list[str]) -> None: """Handle errors when mirror testing.""" if isinstance(error, HTTPStatusError): if arguments.verbose: print_error(error) debugger.append(f"Status Code: {error.response.status_code}") return if arguments.verbose: if isinstance(error, SSLCertVerificationError): eprint(f"{ERROR_PREFIX} {error.reason} {error.verify_message}") elif isinstance(error, SSLError): eprint(f"{ERROR_PREFIX} {error.reason}") elif isinstance(error, (ConnectError, ConnectTimeout)): print_error(error) else: eprint(f"{ERROR_PREFIX} {error}") if isinstance(error, ReadTimeout): debugger.append("Mirror too slow") def get_and_parse_mirror( distro: str, country_list: Iterable[str] | None ) -> tuple[str, ...]: """Get and parse the mirror list.""" print(_("Fetching {distro} mirrors").format(distro=distro) + ELLIPSIS) if distro == DEBIAN: mirror = fetch_mirrors( "https://mirror-master.debian.org/status/Mirrors.masterlist", "\n\n" ) # This is what one of our "Mirrors might look like after split" # Site: mirrors.edge.kernel.org # Country: NL Netherlands # Country: US United States # Location: Amsterdam # Location: Parsippany, NJ # Location: San-Jose, CA # Archive-architecture: amd64 arm64 armel armhf i386 # Archive-http: /debian/ # Sponsor: packet.net https://packet.net/ elif distro == UBUNTU: mirror = fetch_mirrors( "https://launchpad.net/ubuntu/+archivemirrors-rss", "" ) # This is what one of our "Mirrors might look like after split" # Steadfast Networks # http://mirror.steadfastnet.com/ubuntu/ # # # 80 # # North America # United States # US # # Fri, 24 Dec 2021 05:26:30 -0000 # http://mirror.steadfastnet.com/ubuntu/ # elif distro == DEVUAN: mirror = fetch_mirrors("https://pkgmaster.devuan.org/mirror_list.txt", "\n\n") # FQDN: sledjhamr.org # BaseURL: sledjhamr.org/devuan # Bandwidth: 1Gb/s # Rate: 30min # Country: Netherlands # CountryCode: NL | BE | CH | CZ | DE | DK | FR | GB | GG | IE | IM | JE | LU # Protocols: HTTP | HTTPS | FTP | RSYNC # Active: yes # DNSRR: yes # DNSRRCC: yes else: # We should never really hit this. sys.exit( _("{error} Internal Error. Distro detection must be broken").format( error=ERROR_PREFIX ) ) return parse_mirror(distro, mirror, country_list, tuple(get_architectures())) def fetch_mirrors(url: str, splitter: str) -> tuple[str, ...]: """Attempt to fetch the url and split a list based on the splitter.""" try: response = get(url, timeout=15, follow_redirects=True) response.raise_for_status() mirror_list = response.text.split(splitter) except HTTPError: sys.exit( _("{error} unable to connect to {mirror}").format( error=ERROR_PREFIX, mirror=url ) ) return tuple(mirror_list) def parse_mirror( distro: str, master_mirror: tuple[str, ...], country_list: Iterable[str] | None, arches: tuple[str, ...], ) -> tuple[str, ...]: """Parse the mirror.""" mirror_set = set() if arguments.verbose: print(_("Parsing mirror list") + ELLIPSIS) # If no country is supplied then our list will be all countries countries = country_list or get_countries(master_mirror) for country, mirror in itertools.product(countries, master_mirror): if ( distro == DEBIAN and f"Country: {country.upper()}" in mirror and (url := debian_parser(mirror, arches)) ): mirror_set.add(url) continue if ( distro == UBUNTU and f"{country.upper()}" in mirror and (url := ubuntu_parser(mirror, arches)) ): mirror_set.add(url) continue if distro == DEVUAN: for line in mirror.splitlines(): # CountryCode: NL | BE | CH if line.startswith("CountryCode:") and country.upper() in line: if url := devuan_parser(mirror): mirror_set.add(url) continue return tuple(mirror_set) def get_countries(master_mirror: tuple[str, ...]) -> tuple[str, ...]: """Iterate the mirror list and return all valid countries.""" country_list = set() # The way we split the information we get nice and pretty mirror selections for mirror in master_mirror: for line in mirror.splitlines(): # Devuan Countries if "CountryCode:" in line: # CountryCode: BG | GR | RO | MK | RS | TR for country in line.split()[1:]: if line == "|": continue country_list.add(country) # Debian Countries elif "Country:" in line: # Country: SE Sweden country_list.add(line.split()[1]) # Ubuntu Countries elif "" in line: # US if result := re.search(UBUNTU_COUNTRY, line): country_list.add(result[1]) return tuple(country_list) def devuan_parser(mirror: str) -> str | None: """Parse the Debuan mirror.""" if "HTTP" not in mirror: return None url = None for line in mirror.splitlines(): # BaseURL: sledjhamr.org/devuan if line.startswith("BaseURL:"): url = line.split()[1] return f"http://{url}/devuan/" if url else None def debian_parser(mirror: str, arches: tuple[str, ...]) -> str | None: """Parse the Debian mirror.""" url = "http://" if "Archive-http:" in mirror and all(arch in mirror for arch in arches): for line in mirror.splitlines(): if line.startswith(("Archive-http:", "Site:")): # ['Site:', 'mirror.steadfastnet.com'] # ['Archive-http:', '/debian/'] url += line.split()[1] return None if url == "http://" else url def ubuntu_parser(mirror: str, arches: tuple[str, ...]) -> str | None: """Parse the Ubuntu mirror.""" # First section we get from Ubuntu is garbage. Let's ditch it and get to business if "Ubuntu Archive Mirrors Status" in mirror: return None only_ports = "amd64" not in arches and "i386" not in arches for line in mirror.splitlines(): # http://mirror.steadfastnet.com/ubuntu/ if result := re.search(UBUNTU_MIRROR, line): return None if only_ports and "ubuntu-ports" not in result[1] else result[1] return None def detect_release( debian: str, ubuntu: str, devuan: str ) -> tuple[str | None, str | None]: """Detect the distro and release.""" # Check if the release was specified. for dist, switch in ( (DEBIAN, debian), (DEVUAN, devuan), (UBUNTU, ubuntu), ): if switch: return dist, switch # If no release is specified try to detect it by keyrings. cache = Cache() for keyring in ( "devuan-keyring", "debian-archive-keyring", "ubuntu-keyring", "apt", ): if ( keyring not in cache or not (cand := cache[keyring].candidate) or not (origin := cand.origins) ): continue return origin[0].origin, origin[0].codename # Something is very wrong if apt has no origin. # So we parse os-release to see if we can detect anything release_file = Path("/etc/os-release") if not release_file.is_file(): # This will throw an error at the next step # ERROR: There was an issue detecting release. return None, None os_release: dict[str, str] = {} for line in release_file.read_text(encoding="utf-8", errors="replace").splitlines(): entry = line.split("=") os_release[entry[0]] = entry[1].strip('"') # If there is no name we'll just have it throw an error if not (name := os_release.get("NAME")): return None, None # This block is for Debian Testing/Sid. As they don't have a codename key. release = os_release.get("DEBIAN_CODENAME") or os_release.get("UBUNTU_CODENAME") if not release and "Debian" in name: try: release = os_release["PRETTY_NAME"].split().pop() except IndexError: return name, "Unknown" return name, release def parse_sources() -> list[str]: """Read sources files on disk.""" sources: list[str] = [] for file in [*SOURCEPARTS.iterdir(), SOURCELIST]: if ( file == NALA_SOURCES or not file.is_file() or INVALID_FILENAME_CHARS.search(file.name) ): continue if file.parent == SOURCEPARTS and file.suffix not in [".list", ".sources"]: continue if file.suffix in ".sources": sources.extend( f"{deb} {uri} {suite}" for deb822 in Deb822.iter_paragraphs( file.read_text(encoding="utf-8", errors="replace") ) for deb in deb822.get("Types", "").split() for uri in deb822.get("URIs", "").split() for suite in deb822.get("Suites", "").split() for enabled in [deb822.get("Enabled", "yes").lower()] if enabled not in ["no", "false"] and any(digit not in "0" for digit in enabled) ) else: sources.extend( line for line in file.read_text( encoding="utf-8", errors="replace" ).splitlines() if not line.lstrip().startswith("#") and line ) return sources def gen_table(str_list: dict[int, str]) -> Table: """Generate table for the live display.""" master_table = Table(padding=(0, 0), box=None) table = Table(padding=(0, 2), box=None) table.add_column("Index", justify="right", style="bold blue") table.add_column("Mirror") table.add_column("Score", style="bold blue") for num, line in str_list.items(): latency, mirror = line.split() table.add_row(f"{num + 1}", mirror, f"{latency.lstrip('0')} ms") master_table.add_row(table) master_table.add_row( # Add in a new line and indentation to line up the text "\n " + _("Score is how many milliseconds it takes to download the Release file"), style="italic", ) return master_table def range_from_str(string: str, count: int) -> Iterable[int]: """Get a range of integers from a string. See live.ask_index() """ # If it's empty the default is assumed if not string: # We return plus 1 for everything to match the numbers # That the user sees return list(range(1, count + 1)) if match := FETCH_RANGE.search(string): resp = match.string.split("..") # This will mean someone is trying to get a range # 0..10 = 0, 1, 2, 3, and so forth if len(resp) == 2: start = int(resp[0]) stop = int(resp[1]) + 1 # Convert the strings from input into integers and create the range return range(start, stop) # They must be trying to get a range of even or odd # 0..0..10 = 0 2 4 6 8 10 if (step := int(resp[0])) not in {0, 1}: raise ParserError("0 for even and 1 for odd") start = int(resp[1]) stop = int(resp[2]) + 1 is_odds = step == 1 return { num for num in range(start, stop) if (is_odds and num % 2) or (not is_odds and not num % 2) } # Use a set as we don't want to have duplicate numbers return {int(num) for num in re.split(r",|\s", string) if num} def format_component(url: str, component: str, release: str, non_free: bool) -> str: """Add non-free-firmware repository if applicable.""" # Starting with bookworm there is an additional component, non-free-firmware. # The best way to do this is just check if it exists for the mirror. if not non_free: return component try: get( f"{url}/dists/{release}/non-free-firmware/", timeout=15, follow_redirects=True, ).raise_for_status() except HTTPError: return component return f"{component} non-free-firmware" def build_sources( # pylint: disable=too-many-arguments release: str, component: str, sources: list[str], netselect_scored: Iterable[str], non_free: bool, fetches: int = 3, live: bool = False, check_sources: bool = False, ) -> str: """Build the sources file and return it as a string.""" source = "# Sources file built for nala\n\n" num = 0 for line in netselect_scored: # This splits off the score '030 http://mirror.steadfast.net/debian/' line = line[line.index("h") :] # This protects us from writing mirrors that we already have in the sources if any(line.rstrip("/") in mirror and release in mirror for mirror in sources): continue deb_entry = ( f"{line} {release} {format_component(line, component, release, non_free)}" ) source += f"deb {deb_entry}\n" if check_sources: source += f"deb-src {deb_entry}\n" source += "\n" num += 1 if not live and num == fetches: break if not live and num != fetches: eprint( _("{notice} Nala was unable to fetch {num} mirrors.").format( notice=NOTICE_PREFIX, num=fetches ) ) return source def write_sources(source: str) -> None: """Write mirrors to nala-sources.list.""" with open(NALA_SOURCES, "w", encoding="utf-8") as file: file.write(source) print(_("Sources have been written to {file}").format(file=NALA_SOURCES)) def check_supported( distro: str | None, release: str | None, country_list: Iterable[str] | None, non_free: bool, ctx: typer.Context, ) -> tuple[tuple[str, ...], str]: """Check if the distro is supported or not. If the distro is supported return mirror list and component. Error if the distro is not supported. """ if distro and release: if distro in (DEBIAN, DEVUAN) and release != "n/a": component = "main contrib non-free" if non_free else "main" return get_and_parse_mirror(distro, country_list), component if distro == UBUNTU: # It's ubuntu, you probably don't care about foss return ( get_and_parse_mirror(distro, country_list), "main restricted universe multiverse", ) if distro is None or release is None: eprint( _("{error} There was an issue detecting release.").format( error=ERROR_PREFIX ), end="\n\n", ) else: eprint( _("{error} {distro} {release} is unsupported.").format( error=ERROR_PREFIX, distro=distro, release=release ) ) eprint(_("You can specify Ubuntu or Debian manually."), end="\n\n") eprint(ctx.get_help()) sys.exit(1) @nala.command( short_help=_("Fetch fast mirrors to speed up downloads."), help=FETCH_HELP ) # pylint: disable=unused-argument,too-many-arguments,too-many-locals def fetch( ctx: typer.Context, debian: str = typer.Option("", metavar="sid", help=_("Choose the Debian release.")), ubuntu: str = typer.Option( "", metavar="jammy", help=_("Choose the Ubuntu release.") ), devuan: str = typer.Option( "", metavar="stable", help=_("Choose the Devuan release.") ), fetches: int = typer.Option( 0, help=_("Number of mirrors to fetch. [defaults: 16, --auto(3)]"), show_default=False, ), https_only: bool = typer.Option( False, "--https-only", help="Only get https mirrors." ), sources: bool = typer.Option( False, "--sources", help=_("Add the source repos for the mirrors if it exists.") ), non_free: bool = typer.Option( False, "--non-free", help=_("Add contrib and non-free repos.") ), auto: bool = typer.Option( False, "--auto", help=_("Run fetch uninteractively. Will still prompt for overwrite."), ), debug: bool = DEBUG, assume_yes: bool = ASSUME_YES, country_list: Optional[List[str]] = typer.Option( None, "-c", "--country", metavar="US", help=_("Choose only mirrors of a specific ISO country code."), ), verbose: bool = VERBOSE, man_help: bool = MAN_HELP, ) -> None: """Nala will fetch mirrors with the lowest latency. For Debian https://mirror-master.debian.org/status/Mirrors.masterlist For Ubuntu https://launchpad.net/ubuntu/+archivemirrors-rss """ sudo_check() # Set dynamic default fetch option if fetches == 0: fetches = 3 if auto else 16 distro, release = detect_release(debian, ubuntu, devuan) netselect, component = check_supported(distro, release, country_list, non_free, ctx) assert distro and release dprint(netselect) dprint(f"Distro: {distro}, Release: {release}, Component: {component}") mirror_test = MirrorTest(netselect, release, sources, https_only) aiorun(mirror_test.run_test()) if not (netselect_scored := mirror_test.get_scored()): sys.exit( _("{error} Nala was unable to find any mirrors.").format(error=ERROR_PREFIX) ) dprint(netselect_scored) dprint(f"Size of original list: {len(netselect)}") dprint(f"Size of scored list: {len(netselect_scored)}") dprint(f"Writing from: {netselect_scored[:fetches]}") if auto: source = build_sources( release, component, parse_sources(), netselect_scored, non_free, fetches=fetches, check_sources=sources, ) print(source, end="") if NALA_SOURCES.exists() and not ask( _("{file} already exists.\nContinue and overwrite it?").format( file=color(NALA_SOURCES, "YELLOW") ) ): sys.exit(_("Abort.")) write_sources(source) return sources_list = parse_sources() with Live(auto_refresh=False) as live: fetch_live = FetchLive(live, release, sources_list, fetches, netselect_scored) while True: fetch_live.choose_mirrors() fetch_live.clear(2) fetch_live.set_user_list() if fetch_live.final_mirrors(): break fetch_live.clear(2) fetch_live.live.start() source = build_sources( release, component, sources_list, fetch_live.user_list.values(), non_free, live=True, check_sources=sources, ) write_sources(source)