#!/usr/bin/env python%
# encoding: utf-8
"""
This module holds the helper classes to represent an artifact list::
Base_dir
└── $name
└── $version
├── $name-$version.$extension
└── $name-$version.$extension.sig
This module has the classess that manage a set of artifacts, in a hierarchical
fashion, in the order::
name 1-* version 1-* inode 1-* artifact-instance
So that translated to classes, with the first being the placeholder for the
whole data structure, is::
ArtifactList 1-* ArtifactName 1-* ArtifactVersion \\
1-* ArtifactInode 1-* Artifact
All except the Artifact class are implemented as subclasses of the python dict,
so as key-value stores.
For clarification, here's a dictionary like diagram::
ArtifactList{
name1: ArtifactName{
version1: ArtifactVersion{
inode1: ArtifactInode[Artifact, Artifact, ...]
inode2: ArtifactInode[...]
},
version2: ArtifactVersion{...}
},
name2: ArtifactName{...}
}
**NOTE**:You have to implement at least the Artifact class
"""
import hashlib
import logging
import os
from abc import ABCMeta
from abc import abstractproperty
import six
from .utils import cmpfullver
from .utils import download
from .utils import sign_detached
logger = logging.getLogger(__name__)
[docs]class Artifact(object):
__metaclass__ = ABCMeta
def __init__(self, path, temp_dir='/tmp', verify_ssl=True):
"""
:param path: Path or url to the artifact
:param temp_dir: If url specified, will use that temporary dir to store
it, the caller should take care of creating and deleting that
temporary dir if needed
"""
if path.startswith('http:') or path.startswith('https:'):
name = path.rsplit('/', 1)[-1]
if not name:
raise Exception('Passed trailing slash in path %s, '
'unable to guess package name'
% path)
fpath = temp_dir + '/' + name
download(path, fpath, verify=verify_ssl)
path = fpath
self.path = path
# will be calculated if needed
self._md5 = None
@abstractproperty
def version(self):
pass
@abstractproperty
def extension(self):
pass
@abstractproperty
def name(self):
pass
@abstractproperty
def full_name(self):
"""
Unique Artifact Name.
This property should uniquely identify an artifact entity, in the
sense that if you have two artifacts with the same full_name they must
package the same content or one of them is wrongly generated (the
version was not bumped or something).
"""
pass
@abstractproperty
def type(self):
return 'artifact'
@property
def md5(self):
"""
Lazy md5 calculation.
"""
if self._md5 is None:
with open(self.path) as fdno:
self._md5 = hashlib.md5(fdno.read()).hexdigest()
return self._md5
[docs] def generate_path(self):
"""
Returns the theoretical path that the artifact should be, instead of
the current path it is.
"""
return '{name}/{version}/{name}-{version}{extension}'.format(
name=self.name,
version=self.version,
extension=self.extension,
)
[docs] def sign(self, key_path, passwd):
"""
Defines how to sign this artifact, by default with detached signature
"""
sign_detached(self.path, key=key_path, passphrase=passwd)
[docs] def __str__(self):
"""
This string uniquely identifies a artifact file, if two have the same
string representation, the must point to the same file or a copy of
it, if not, you wrongly generated two artifact with the same
version/name and different content
"""
return '%s(%s %s %s)' % (
self.type, self.name, self.version, self.extension,
)
def __repr__(self):
return self.__str__()
[docs]class ArtifactInode(list, object):
"""
Simple list, abstracts a set of rpm instances
"""
def __init__(self, inode):
self.inode = inode
super(ArtifactInode, self).__init__(self)
[docs] def delete(self, noop=False):
for artifact in self:
if not noop and os.path.exists(artifact.path):
os.remove(artifact.path)
elif noop:
logger.info('NOOP::%s would have been removed',
artifact.path)
[docs] def get_artifacts(self, regmatch=None, fmatch=None):
logger.debug('ArtifactInode::%s', self)
arts = list(self)
logger.debug('ArtifactInode::arts=%s', arts)
logger.debug('ArtifactInode::fmatch=%s', fmatch)
logger.debug('ArtifactInode::regmatch=%s', regmatch)
if regmatch:
arts = [art for art in self if regmatch.search(art.path)]
elif fmatch:
arts = [art for art in arts if fmatch(art)]
logger.debug(
'ArtifactInode::after filter arts=%s',
arts,
)
return arts
[docs]class ArtifactVersion(dict, object):
"""Abstracts a set of artifacts inodes for a version"""
def __init__(self, version, inode_class=ArtifactInode):
self.version = version
super(ArtifactVersion, self).__init__(self)
self.inode_class = inode_class
[docs] def add_artifact(self, artifact):
if artifact.inode not in self:
self[artifact.inode] = self.inode_class(artifact.inode)
self[artifact.inode].append(artifact)
return True
[docs] def delete_inode(self, inode, noop=False):
if inode in self:
self[inode].delete(noop)
self.pop(inode)
[docs] def get_artifacts(self, regmatch=None, fmatch=None):
arts = []
logger.debug('ArtifactVersion::regmatch=%s', regmatch)
logger.debug('ArtifactVersion::fmatch=%s', fmatch)
for inode in self.itervalues():
logger.debug(
'ArtifactVersion::Iterating ArtifactInode %s',
inode,
)
arts.extend(inode.get_artifacts(
regmatch=regmatch,
fmatch=fmatch
))
return arts
[docs] def delete(self, noop=False):
for inode in self:
self[inode].delete(noop)
self.pop(inode)
[docs]class ArtifactName(dict, object):
"""Dict of available versions for an artifact name"""
def __init__(self, name, version_class=ArtifactVersion):
self.name = name
super(ArtifactName, self).__init__(self)
self.version_class = version_class
[docs] def add_artifact(self, artifact, onlyifnewer):
if onlyifnewer and (
artifact.ver_rel in self or
next(
(
ver for ver in self.keys()
if cmpfullver(ver, artifact.version) >= 0
),
None,
)
):
return False
elif artifact.version not in self:
self[artifact.version] = self.version_class(artifact.version)
return self[artifact.version].add_artifact(artifact)
[docs] def get_latest(self, num=1):
"""
Returns the list of available inodes for the latest version
if any
"""
if not self:
return None
if not num:
num = len(self)
sorted_list = self.keys()
sorted_list.sort(cmp=cmpfullver)
latest = {}
if num > len(sorted_list):
num = len(sorted_list)
for pos in six.moves.xrange(num):
latest[sorted_list[pos]] = self.get(sorted_list[pos])
return latest
[docs] def delete_version(self, version, noop=False):
if version in self:
for inode in self[version].keys():
self[version].delete_inode(inode, noop=noop)
self.pop(version)
[docs] def delete(self, noop=False):
for version in self:
for inode in self[version].keys():
self[version].delete_inode(inode, noop=noop)
self.pop(version)
[docs] def get_artifacts(self, regmatch=None, fmatch=None, latest=0):
arts = []
logger.debug('ArtifactName::regmatch=%s', regmatch)
logger.debug('ArtifactName::fmatch=%s', fmatch)
logger.debug('ArtifactName::latest=%s', latest)
if latest:
versions = self.get_latest(num=latest).values()
else:
versions = self.values()
for version in versions:
logger.debug(
'ArtifactName::Iterating ArtifactVersion %s',
version,
)
arts.extend(version.get_artifacts(
regmatch=regmatch,
fmatch=fmatch
))
return arts
[docs]class ArtifactList(dict, object):
"""
Dict of artifacts, by name
"""
def __init__(self, name, name_class=ArtifactName):
self.name = name
super(ArtifactList, self).__init__(self)
self.name_class = name_class
[docs] def add_pkg(self, artifact, onlyifnewer=False):
if artifact.name is not None:
if artifact.name not in self:
self[artifact.name] = self.name_class(artifact.name)
return self[artifact.name].add_artifact(artifact, onlyifnewer)
[docs] def delete_version(self, art_name, art_version):
"""
Removes the given artifact's version if it's in the list
Args:
art_name (str): Name of the artifact to remove it's version
art_version (str): Version to remove
Returns:
None
"""
if art_name in self:
if art_version in self[art_name]:
self[art_name].delete_version(art_version)
if not self[art_name]:
self.pop(art_name)
[docs] def delete(self):
"""
Deletes all the artifacts in this list
"""
for name in self:
for version in self[name].keys():
self[name].delete_version(version)
self.pop(name)
[docs] def get_artifacts(self, regmatch=None, fmatch=None, latest=0):
"""
Gets the list of artifacts, filtered or not.
:param regmatch: Regular expression to filter the rpms path with
:param fmatch: Filter function, must return True for packages to be
included, or False to be excluded. The package object will be
passed as parameter
:param latest: number of latest versions to return (0 for all,)
"""
arts = []
logger.debug('ArtifactVersion::regmatch=%s', regmatch)
logger.debug('ArtifactVersion::fmatch=%s', fmatch)
logger.debug('ArtifactVersion::latest=%s', latest)
for name in self.itervalues():
logger.debug(
'ArtifactList::Iterating ArtifactName %s',
name,
)
arts.extend(name.get_artifacts(
regmatch=regmatch,
fmatch=fmatch,
latest=latest,
))
return arts