#!/usr/bin/env python
# encoding: utf-8
"""
This module holds the helper classes to represent a repository, that in our
case (oVirt) is a set of repositories, in the form::

    ├── rpm
    │   └── $dist
    │       ├── repodata
    │       ├── SRPMS
    │       └── $arch
    └── src
        └── $name
            ├── $name-$version-src.tar.gz
            └── $name-$version-src.tar.gz.sig

This module has the classes that manage a set of rpms, in a hierarchical
fashion, in the order::

    name 1-* version 1-* inode 1-* rpm-instance

So that translated to classes, with the first being the placeholder for the
whole data structure, is::

    RPMList 1-* RPMName 1-* RPMVersion 1-* RPMInode 1-* RPM

All except the RPM class are implemented as subclasses of the python dict, so
as key-value stores.

For clarification, here's a dictionary like diagram::

        name1: RPMName{
            version1: RPMVersion{
                inode1: RPMInode[RPM, RPM, ...]
                inode2: RPMInode[...]
            version2: RPMVersion{...}
        name2: RPMName{...}
"""

import functools
import logging
import os
import re
import subprocess

import pexpect
import rpm
import six

from ...artifact import Artifact
from ...artifact import ArtifactList
from ...artifact import ArtifactName
from ...artifact import ArtifactVersion
from ...utils import cmpfullver
from ...utils import download
from ...utils import gpg_get_keyuid
from ...utils import gpg_unlock

class WrongDistroException(Exception):
    """
    Raised when no distribution can be extracted from an rpm release string.
    """
class RPM(Artifact):
    """
    A single rpm file, binary or source, described by its package header.
    """

    def __init__(
        self,
        path,
        temp_dir='/tmp',
        distro_reg=r'\.(fc|el)\d+',
        to_all_distros=(),
        verify_ssl=True,
    ):
        """
        :param path: Path or url to the rpm
        :param temp_dir: If url specified, will use that temporary dir to
            store it, the caller should take care of creating and deleting
            that temporary dir if needed
        :param distro_reg: Regular expression to match the distributions
            from the release string of the rpm.
        :param to_all_distros: Special rpm names (regular expressions) that
            must go to all the distributions ignoring their release strings
        :param verify_ssl: Passed as ``verify`` to the download helper when
            the path is a url
        :raises WrongDistroException: if the release string does not match
            ``distro_reg`` and the name is not in ``to_all_distros``
        """
        # Remembered so that sign() can re-read the header with the very
        # same distribution settings the instance was created with.
        to_all_distros = tuple(to_all_distros)
        self._distro_reg = distro_reg
        self._to_all_distros = to_all_distros

        trans = rpm.TransactionSet()
        # Do not fail for unsigned rpms
        trans.setVSFlags(rpm._RPMVSF_NOSIGNATURES)
        if path.startswith(('http:', 'https:')):
            name = path.rsplit('/', 1)[-1]
            if not name:
                raise Exception(
                    'Passed trailing slash in path %s, '
                    'unable to guess package name' % path
                )
            fpath = temp_dir + '/' + name
            download(path, fpath, verify=verify_ssl)
            path = fpath
        self.path = path

        # The file is binary and is only handed over to the rpm bindings
        with open(path, 'rb') as fdno:
            try:
                hdr = trans.hdrFromFdno(fdno)
            except Exception:
                logging.error("Failed to parse header for %s", path)
                raise
            self.inode = os.fstat(fdno.fileno()).st_ino

        self.is_source = bool(hdr[rpm.RPMTAG_SOURCEPACKAGE])
        self.sourcerpm = hdr[rpm.RPMTAG_SOURCERPM]
        self._name = hdr[rpm.RPMTAG_NAME]
        self._version = hdr[rpm.RPMTAG_VERSION]
        self.major_version = self._version.split('.', 1)[0]
        self.release = hdr[rpm.RPMTAG_RELEASE]
        self.signature = hdr[rpm.RPMTAG_SIGPGP]
        self._raw_hdr = hdr
        # will be calculated if needed
        self._md5 = None

        # Check if this package has to go to all distros
        if any(re.match(nreg, self._name) for nreg in to_all_distros):
            self.distro = 'all'
        else:
            try:
                self.distro = self.get_distro(self.release, distro_reg)
            except WrongDistroException:
                logging.error(
                    'Wrong distribution for package: %s-%s',
                    self._name,
                    self._version,
                )
                raise
        self.arch = hdr[rpm.RPMTAG_ARCH] or 'none'

        # remove the distro from the release for the version string
        if self.distro:
            release = re.sub(
                r'\.%s[^.]*' % self.distro, '', self.release, count=1
            )
        else:
            release = self.release
        self.ver_rel = '%s-%s' % (self._version, release)

        # Extract the id of the signing key from the rpm query output
        with open(os.devnull, 'w') as devnull:
            output = subprocess.Popen(
                ["rpm", "-qip", path],
                stdout=subprocess.PIPE,
                stderr=devnull,
            ).communicate()[0]
        # On python 3 the pipe yields bytes, which can't be searched with a
        # text pattern
        if not isinstance(output, str):
            output = output.decode('utf-8', 'replace')
        match = re.search(r"Key ID (?P<key_id>\w+)\n", output)
        self.key_hex = match.group('key_id').upper() if match else None

    @property
    def name(self):
        """Package name qualified with the distro and the architecture."""
        return '%s.%s.%s' % (self._name, self.distro, self.arch)

    @property
    def full_name(self):
        """
        Unique RPM Name.

        This property should uniquely identify a rpm entity, in the sense
        that if you have two rpms with the same full_name they must package
        the same content or one of them is wrongly generated (the version
        was not bumped or something).
        """
        return 'rpm(%s %s %s %s)' % (
            self._name,
            self.distro,
            self.arch,
            'src' if self.is_source else 'bin',
        )

    @property
    def version(self):
        """Version and release, with the distro part of the release removed."""
        return self.ver_rel

    @property
    def extension(self):
        """File extension: '.src.rpm' for source packages, '.rpm' otherwise."""
        if self.is_source:
            return '.src.rpm'
        return '.rpm'

    @property
    def type(self):
        """Artifact type: 'source_rpm' for source packages, 'rpm' otherwise."""
        if self.is_source:
            return 'source_rpm'
        return 'rpm'

    @staticmethod
    def get_distro(release, distro_reg):
        """
        Extract the distribution from a release string.

        :param release: Release string of the rpm
        :param distro_reg: Regular expression to look for in the release,
            the first character of whatever it matches is dropped
        :returns: The matched text without its first character
        :raises WrongDistroException: if the expression is not found
        """
        match = re.search(distro_reg, release)
        if match:
            return match.group()[1:]
        raise WrongDistroException('Unknown distro for %s' % release)

    def generate_path(self, base_dir='rpm'):
        """
        Returns the theoretical path that the rpm should be, instead of the
        current path it is. As explained at the module docs.

        If the package has to go to all distros, a '%s' placeholder is left
        in the position of the distro so the caller can fill it in.

        :param base_dir: Directory to prepend to the path, nothing is
            prepended if empty
        """
        if self.is_source:
            arch_path = 'SRPMS'
            arch_name = 'src'
        else:
            arch_path = self.arch
            arch_name = self.arch
        return '%s%s/%s/%s-%s-%s.%s.rpm' % (
            base_dir + '/' if base_dir else '',
            '%s' if self.distro == 'all' else self.distro,
            arch_path,
            self._name,
            self._version,
            self.release,
            arch_name,
        )

    @staticmethod
    def _mask_password(exc, passwd):
        """Hide the passphrase in the text value of the exception, if any."""
        value = getattr(exc, 'value', None)
        if passwd and isinstance(value, six.string_types):
            exc.value = value.replace(passwd, '*****')

    def sign(self, key_path, passwd):
        """
        Sign the rpm in place, replacing any signature it already had, and
        reload the instance from the signed file.

        :param key_path: Path to the gpg key to sign with
        :param passwd: Passphrase of the key
        :raises Exception: if removing the old signature or signing fails
        """
        logging.info("SIGNING: %s", self.path)
        gpg = gpg_unlock(key_path, passphrase=passwd)
        keyuid = gpg_get_keyuid(key_path, gpg=gpg)
        # Remove any existing signature from the rpm before signing it.
        # This is needed because if a signature already exists, even with
        # our signature, when installing it yum raises an error like:
        # The GPG keys listed for the "oVirt 4.2 Pre-Release" repository are
        # already installed but they are not correct for this package.
        logging.debug('\nrpm --delsign %s\n', self.path)
        with open(os.devnull, 'w') as devnull:
            res = subprocess.call(
                ['rpm', '--delsign', self.path],
                stdout=devnull,
            )
        if res != 0:
            raise Exception(
                "rpm --delsign failed on %s with rc %d" % (self.path, res)
            )
        # Signing the rpm with our key
        rpmsign_args = [
            '--resign',
            '-D', '_signature gpg',
            '-D', '_gpg_name %s' % keyuid,
            # TODO: make this work with gpg2 too, fc>21 throws invalid ioctl
            '-D', '__gpg /usr/bin/gpg',
            self.path,
        ]
        logging.debug('\nrpmsign /\n' + ' /\n\t'.join(rpmsign_args))
        child = pexpect.spawn(
            'rpmsign',
            rpmsign_args,
            timeout=1200,  # rpmsign may take a lot of time...
            env={"LC_ALL": "C"},
        )
        try:
            child.expect(
                [
                    'pass phrase: ',
                    'passphrase: ',
                    'Passphrase: ',
                ],
                timeout=5,
            )
        except Exception as exc:
            logging.error('Failed to sign')
            logging.debug(child)
            # overriding as the default exception includes too much
            # info, as passwords passed
            self._mask_password(exc, passwd)
            raise
        # For some reason, on fedora>21 rpmsign needs some tries until it
        # properly signs
        tries = 1
        while True:
            child.sendline(passwd)
            logging.debug('Sent pass to rpmsign, try number %d', tries)
            try:
                child.expect(pexpect.EOF, timeout=10)
            except pexpect.TIMEOUT as exc:
                tries += 1
                # signing big rpms might take its time
                if tries >= 900:
                    logging.error('Failed to sign')
                    logging.debug(child)
                    self._mask_password(exc, passwd)
                    raise
            else:
                break
        child.close()
        if child.exitstatus != 0:
            logging.debug(child)
            raise Exception("Failed to sign package.")
        # Reload the header of the signed file, keeping the distribution
        # settings this instance was created with
        self.__init__(
            self.path,
            distro_reg=self._distro_reg,
            to_all_distros=self._to_all_distros,
        )
        if not self.signature:
            logging.error('Failed to sign')
            raise Exception(
                "Failed to sign rpm %s with key '%s'" % (self.path, keyuid)
            )
        del gpg

    def __str__(self):
        """
        This string uniquely identifies a rpm file, if two rpms have the
        same string representation, they must point to the same file or a
        copy of it, if not, you wrongly generated two rpms with the same
        version/release and different content, or you signed them with
        different keys
        """
        return 'rpm(%s %s %s %s %s %s)' % (
            self.name,
            self._version,
            self.release,
            self.arch,
            'src' if self.is_source else 'bin',
            'signed' if self.signature else 'unsigned',
        )

    def __repr__(self):
        return self.__str__()
class RPMName(ArtifactName):
    """List of available versions for a package name"""

    def add_pkg(self, pkg, onlyifnewer):
        """
        Add a package under its version-release key.

        :param pkg: Package to add, its ``ver_rel`` is used as the key
        :param onlyifnewer: If true, do nothing when the version is already
            stored or when any stored version compares greater than or
            equal to it with ``cmpfullver``
        :returns: False if the package was skipped, otherwise whatever the
            ``add_pkg`` of the version container returns
        """
        is_there_newer = any(
            cmpfullver(ver, pkg.ver_rel) >= 0 for ver in self.keys()
        )
        if onlyifnewer and (is_there_newer or pkg.ver_rel in self):
            return False
        if pkg.ver_rel not in self:
            self[pkg.ver_rel] = ArtifactVersion(pkg.ver_rel)
        return self[pkg.ver_rel].add_pkg(pkg)

    def get_latest(self, num=1):
        """
        Returns the first ``num`` versions, as sorted by ``cmpfullver``,
        among those that have at least one non-source artifact.

        :param num: Amount of versions to return, all of them if 0 or None
        :returns: None if there are no versions at all, otherwise a dict
            mapping each selected version string to its stored value
        """
        if not self:
            return None
        if not num:
            num = len(self)
        sorted_list = [
            ver_name
            for ver_name, version in self.items()
            if version.get_artifacts(fmatch=lambda art: not art.is_source)
        ]
        # list.sort() has no 'cmp' argument on python 3
        # NOTE(review): which end of the ordering is the "latest" one depends
        # on the sign convention of cmpfullver, not visible here -- confirm
        sorted_list.sort(key=functools.cmp_to_key(cmpfullver))
        # a negative amount selects nothing, same as an empty range would
        amount = max(min(num, len(sorted_list)), 0)
        return dict(
            (ver_name, self.get(ver_name))
            for ver_name in sorted_list[:amount]
        )
class RPMList(ArtifactList):
    """
    List of rpms, separated by name
    """

    def __init__(self, name_class=RPMName):
        """
        :param name_class: Class stored in ``self.name_class``, RPMName by
            default
        """
        # NOTE(review): the instance itself is handed to the parent
        # initializer as its only argument; kept as is because the
        # signature of ArtifactList.__init__ is not visible from here.
        super(RPMList, self).__init__(self)
        self.name_class = name_class