Source code for utah.iso

# Ubuntu Testing Automation Harness
# Copyright 2012 Canonical Ltd.

# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU General Public License version 3, as published
# by the Free Software Foundation.

# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranties of
# MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
# PURPOSE.  See the GNU General Public License for more details.

# You should have received a copy of the GNU General Public License along
# with this program.  If not, see <http://www.gnu.org/licenses/>.

"""
All commands use bsdtar and accept a logmethod to use for logging.
"""
import logging
import logging.handlers
import os
import shutil
import subprocess
import sys
import urllib
import re

from collections import defaultdict
from hashlib import md5

from utah.config import config
from utah.exceptions import UTAHException


[docs]class UTAHISOException(UTAHException): """Provide an exception specific to ISO errors.""" pass
def _get_resource(method, url, *args, **kw): try: resource = method(url, *args, **kw) return resource except IOError as err: raise UTAHISOException( 'IOError when downloading {}: {}' .format(url, err), external=True) except urllib.ContentTooShortError as err: raise UTAHISOException( 'Error when downloading {} (probably interrupted): {}' .format(url, err), external=True)
[docs]class ISO(object): """Provide a simplified method of interfacing with images.""" def __init__(self, arch=None, image=None, installtype=None, logger=None, series=None): if logger is None: self._loggersetup() else: self.logger = logger if image is None: path = self.downloadiso(arch=arch, installtype=installtype, series=series) else: if image.startswith('~'): path = os.path.expanduser(image) self.logger.debug('Rewriting ~-based path: %s to: %s', image, path) else: path = image self.percent = 0 self.logger.info('Preparing image: %s', path) self.image = _get_resource(urllib.urlretrieve, path, reporthook=self.dldisplay)[0] self.logger.info('%s is locally available as %s', path, self.image) self.installtype = self.getinstalltype() if self.installtype == 'mini': self.logger.debug('Using mini image') self.infofile = '.disk/mini-info' else: self.logger.debug('Using normal image') self.infofile = './.disk/info' # Info file looks like this: # Ubuntu 12.04 LTS "Precise Pangolin" - Release i386 (20120423) # or this: # Ubuntu 12.10 "Quantal Quetzal" - Alpha amd64 (20120820) # i.e. Ubuntu Version (LTS) "Series Codename" - Alpha/Beta/Release # arch (buildnumber) self.media_info = self.dump(self.infofile) self.arch = self.getarch() self.series = self.getseries() self.buildnumber = self.getbuildnumber() def _loggersetup(self): """Initialize the logging for the image.""" self.logger = logging.getLogger('utah.iso') self.logger.propagate = False self.logger.setLevel(logging.INFO) for handler in list(self.logger.handlers): self.logger.removeHandler(handler) del handler self.consolehandler = logging.StreamHandler(stream=sys.stderr) console_formatter = logging.Formatter('%(levelname)s: %(message)s') self.consolehandler.setFormatter(console_formatter) self.consolehandler.setLevel(config.consoleloglevel) self.logger.addHandler(self.consolehandler) self.filehandler = logging.handlers.WatchedFileHandler(config.logfile) file_formatter = logging.Formatter( '%(asctime)s iso %(levelname)s: %(message)s') self.filehandler.setFormatter(file_formatter) self.filehandler.setLevel(config.fileloglevel) self.logger.addHandler(self.filehandler) if config.debuglog is not None: self.logger.setLevel(logging.DEBUG) self.debughandler = (logging.handlers .WatchedFileHandler(config.debuglog)) self.debughandler.setFormatter(file_formatter) self.debughandler.setLevel(logging.DEBUG) self.logger.addHandler(self.debughandler)
[docs] def getinstalltype(self): """Inspect the image's files to get the image type. If .disk/mini-info exists, it's mini. If the casper directory exists, it's desktop. If ubuntu-server.seed exists in the preseeds directory, it's server. :returns: Image type :rtype: str """ self.logger.info('Getting image type of %s', self.image) files = set(self.listfiles(returnlist=True)) installtype = 'alternate' if '.disk/mini-info' in files: installtype = 'mini' elif 'casper' in files or './casper' in files: installtype = 'desktop' elif ('preseed/ubuntu-server.seed' in files or './preseed/ubuntu-server.seed' in files): installtype = 'server' self.logger.info('Image type is: %s', installtype) return installtype
[docs] def getarch(self): """Unpack the image's info file to get the arch. :returns: Image architecture :rtype: str """ arch = self.media_info.split()[-2] self.logger.info('Arch is: %s', arch) return arch
[docs] def getseries(self): """Unpack the image's info file to get the series. :returns: Image series :rtype: str """ for word in self.media_info.split(): if word.startswith('"'): series = word.strip('"').lower() break self.logger.info('Series is %s', series) return series
[docs] def getbuildnumber(self): """Unpack the image's info file to get the build number. :returns: Build number of the image. :rtype: str """ match = re.search('.*\(([0-9.]+)\)$', self.media_info) build_number = match.group(1) if match else '?' return build_number
[docs] def dldisplay(self, blocks, size, total): """Log download information (i.e., as a urllib callback). :param blocks: Number of blocks downloaded :type blocks: int :param size: Size of blocks downloaded :type size: int :param total: Total size of download :type size: int """ read = blocks * size percent = 100 * read / total if percent >= self.percent: self.logger.info('File %s%% downloaded', percent) self.percent += config.dlpercentincrement self.logger.debug('%s read, %s%% of %s total', read, percent, total)
[docs] def listfiles(self, returnlist=False): """Return the contents of the ISO. Return either a subprocess instance listing the contents of an ISO, or a list of files in the ISO if returnlist is True. :returns: The contents of the ISO :rtype: list or object """ # TODO: See if we still need this outside of validation # move it to validation if it's only needed there cmd = ['bsdtar', '-t', '-f', self.image] self.logger.debug('bsdtar list command: %s', ' '.join(cmd)) if returnlist: try: proc = subprocess.check_output(cmd).strip().split('\n') except (OSError, subprocess.CalledProcessError) as err: raise UTAHISOException('Error listing files in ISO: {}' .format(err)) else: try: proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE).communicate() except OSError as err: raise UTAHISOException('Error listing files in ISO: {}' .format(err)) return proc
[docs] def getrealfile(self, filename): """Return a command to safely extract a file from an ISO. Based on unbsdtar-safelink from ubuntu ISO testing. :params filename: Path to the file in the ISO to be extracted :type filename: str :returns: Command that can be passed to a subprocess method :rtype: list """ cmd = ['bsdtar', '-t', '-v', '-f', self.image, filename] self.logger.debug('bsdtar list command: %s', ' '.join(cmd)) try: output = subprocess.check_output(cmd) except (OSError, subprocess.CalledProcessError) as err: raise UTAHISOException('Cannot list {} in {}: {}' .format(filename, self.image, err)) try: columns = output.splitlines()[0].split() realfile = columns[-1] except IndexError as err: raise UTAHISOException('Cannot parse bsdtar list output ' 'for {} in {}: {}' .format(filename, self.image, output)) cmd = ['bsdtar', '-x', '-f', self.image, '-O', realfile] self.logger.debug('bsdtar extract command: %s', ' '.join(cmd)) return cmd
[docs] def extract(self, filename, outdir='', outfile=None, **kw): """Extract file from an ISO. :param filename: Path to the file in the ISO to be extracted :type filename: str :param outdir: Destination directory :type outdir: str :param outfile: Destination filename :type outfile: str :returns: Path to the extracted file :rtype: str .. seealso:: :meth:`getrealfile` """ cmd = self.getrealfile(filename) try: stdout = subprocess.check_output(cmd, **kw) except (OSError, subprocess.CalledProcessError) as err: raise UTAHISOException('Cannot extract {} from {}: {}' .format(filename, self.image, err)) if outfile is None: outfile = os.path.join(outdir, filename) dirname = os.path.dirname(outfile) if not os.path.isdir(dirname): os.makedirs(dirname) with open(outfile, 'w') as fp: fp.write(stdout) return outfile
[docs] def dump(self, filename, **kw): """Extract file contents from an ISO. :param filename: Name of the file to be extracted :type filename: str :returns: Contents of the file :rtype: str .. seealso:: :meth:`getrealfile` """ cmd = self.getrealfile(filename) try: stdout = subprocess.check_output(cmd, **kw) except (OSError, subprocess.CalledProcessError) as err: raise UTAHISOException('Cannot extract {} from {}: {}' .format(filename, self.image, err)) return stdout
[docs] def getmd5(self, path=None): """Return the MD5 checksum of a file. Default file is this image. :param path: Path of file to checksum :type path: str :returns: MD5 checksum of file :rtype: str """ if path is None: path = self.image self.logger.debug('Getting md5 of %s', path) isohash = md5() with open(path) as myfile: for block in iter(lambda: myfile.read(128), ""): isohash.update(block) filemd5 = isohash.hexdigest() self.logger.debug('md5 of %s is %s', path, filemd5) return filemd5
[docs] def downloadiso(self, arch=None, installtype=None, series=None): """Download an ISO given series, type, and arch. :param arch: Architecture of image to download :type arch: str :param installtype: Install type of image to download :type installtype: str :param series: Series codename of image to download :type series: str :returns: Local path of downloaded image :rtype: str """ if arch is None: arch = config.arch if installtype is None: installtype = config.installtype if series is None: series = config.series filename = '{}.iso'.format('-'.join([series, installtype, arch])) self.logger.info('Attempting to retrieve %s', filename) with open(os.devnull, "w") as fnull: # If dlcommand (default dl-ubuntu-test-iso) is available, use it cmd = ['which', config.dlcommand] try: if subprocess.call(cmd, stdout=fnull, stderr=fnull) == 0: self.logger.debug('Using %s', config.dlcommand) if installtype == 'server': flavor = 'ubuntu-server' else: flavor = 'ubuntu' cmd = [config.dlcommand, '-q', '--flavor={}'.format(flavor), '--release={}'.format(series), '--variant={}'.format(installtype), '--arch={}'.format(arch), '--isoroot={}'.format(config.isodir)] self.logger.info('Downloading ISO') self.logger.debug(' '.join(cmd)) try: subprocess.check_call(cmd) except (OSError, subprocess.CalledProcessError) as err: raise UTAHISOException( "Failed to download ISO using '{}': {}" .format(' '.join(cmd), err)) path = os.path.join(config.isodir, flavor, filename) if os.path.isfile(path): return path except OSError as err: self.logger.warning('{} failed: {}'.format(cmd, err)) # If we haven't returned, dlcommand didn't give us an image # We'll need to use the utah image cache in config.isodir path = os.path.join(config.isodir, filename) # We want to verify our image matches the latest md5 from the server # To do this, we'll loop # First, we'll check the image. If it matches, we return # If not, we'll download it, and start the loop over for attempt in range(config.dlretries): self.logger.info('Download attempt %s', str(attempt)) # Set the path to our files and the name of the iso we want if installtype == 'mini': remotepath = ('http://archive.ubuntu.com/ubuntu/dists/' '{series}/main/installer-{arch}/current/' 'images/'.format(arch=arch, series=series)) isopattern = 'netboot/mini.iso' else: remotepath = ('http://releases.ubuntu.com/' '{series}/'.format(series=series)) isopattern = '{}-{}.iso'.format(installtype, arch) servermd5 = None md5path = '{}/MD5SUMS'.format(remotepath) md5list = _get_resource(urllib.urlopen, md5path) for line in md5list: if isopattern in line: servermd5, isofile = line.split() isofile = isofile.strip('*') break if servermd5 is None: raise UTAHISOException( 'Specified ISO: {} not found on mirrors.' .format(filename), external=True) self.logger.debug('Server md5 is %s', servermd5) if os.path.isfile(path) and servermd5 == self.getmd5(path): self.logger.info('Using ISO at %s', path) return path isopath = os.path.join(remotepath, isofile) self.percent = 0 self.logger.info('Attempting to download %s', isopath) temppath = _get_resource(urllib.urlretrieve, isopath, reporthook=self.dldisplay)[0] self.logger.debug('Copying %s to %s', temppath, path) shutil.copyfile(temppath, path) else: if os.path.isfile(path) and servermd5 == self.getmd5(path): return path else: raise UTAHISOException( 'Image failed to download after {} tries' .format(config.dlretries), external=True)
[docs] def kernelpath(self): """Return the path of the kernel inside the image. :returns: Path to the kernel pulled from bootloader config :rtype: str """ kernelpath = './install/vmlinuz' if self.installtype == 'mini': self.logger.debug('Getting kernel for mini image') kernelpath = 'linux' elif self.installtype == 'desktop': self.logger.debug('Getting kernel for desktop image') # Setup the old default kernel path in case none are found kernels = defaultdict(int, {'casper/vmlinuz': 0}) # Scan bootloader config files to find kernel images cfgfiles = ['isolinux/txt.cfg', 'isolinux/rqtxt.cfg', 'boot/grub/grub.cfg', 'boot/grub/loopback.cfg'] for cfgfile in cfgfiles: if cfgfile in self.listfiles(returnlist=True): stdout = self.dump(cfgfile) for line in stdout.splitlines(): fragments = line.split() if (len(fragments) >= 2 and fragments[0] in ('kernel', 'linux')): newkernel = fragments[1].strip('./') if 'mt86plus' in newkernel: self.logger.debug('Rejecting ' 'memtest kernel') else: self.logger.debug( 'Found kernel: %s', newkernel) kernels[newkernel] += 1 # Now we have a list of kernel paths and the number of times # each once occurs. We'll use the one that occurs most. kernelpath = max(kernels.iteritems(), key=lambda (_path, count): count)[0] return kernelpath
Read the Docs v: latest
Versions
latest
Downloads
PDF
HTML
Epub
On Read the Docs
Project Home
Builds

Free document hosting provided by Read the Docs.