# Ubuntu Testing Automation Harness
# Copyright 2012 Canonical Ltd.
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU General Public License version 3, as published
# by the Free Software Foundation.
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranties of
# MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
# PURPOSE. See the GNU General Public License for more details.
# You should have received a copy of the GNU General Public License along
# with this program. If not, see <http://www.gnu.org/licenses/>.
"""Provide routines used to process command line arguments and run tests."""
from argparse import ArgumentTypeError
import logging
from logging.handlers import TimedRotatingFileHandler
import os
import shutil
import signal
import sys
import time
import urllib
import jsonschema
from utah import template
from utah.client.common import (
parse_yaml_file,
ReturnCodes as ClientReturnCodes,
)
from utah.config import config
from utah.client.exceptions import (
YAMLEmptyFile,
YAMLParsingError,
)
from utah.client.runner import Runner
from utah.exceptions import UTAHException
from utah.experimental import feature
from utah.provisioning.data import ProvisionData
from utah.provisioning.debs import get_client_debs
from utah.retry import retry
from utah.timeout import timeout
from utah.url import url_argument
# Return codes for the server
class ReturnCodes:

    r"""Exit codes returned by the server side run\_ scripts.

    Codes produced by the client are shifted with :meth:`client_error` so
    that they never collide with the server codes listed here.
    """

    # No problems found
    SUCCESS = 0
    # UTAH exception caught
    UTAH_EXCEPTION_ERROR = 1
    # Unable to complete before config.jobtimeout seconds
    TIMEOUT_ERROR = 2
    # User isn't in UTAH group
    GROUP_ERROR = 3
    # Non UTAH exception caught
    UNHANDLED_ERROR = 4
    # SIGTERM signal received
    SIGTERM_RECEIVED = 5
    # Command line arguments parsing error
    CMD_PARSING_ERROR = 7

    @staticmethod
    def client_error(returncode):
        """Shift a client return code out of the server code range.

        The shift makes it possible to tell, at the shell level, whether a
        failure happened in the server or in the client.

        :param returncode: The code returned by the client
        :type returncode: int
        :returns: The code to be returned by the server
        :rtype: int

        """
        # Fixed distance between a client code and its server-side value.
        return returncode + 100
def master_runlist_argument(url):
    """Fetch the master runlist and check it against its schema.

    The runlist is obtained through :func:`utah.url.url_argument`, which
    downloads the file if needed, and is then validated with the schema
    defined in :class:`utah.client.runner.Runner`.

    This function is expected to be passed as the `type` argument of an
    `argparse.ArgumentParser` object to make sure the master runlist is
    reachable and valid before provisioning the system under test.

    :param url: URL as passed to parser object.
    :type url: `basestring`
    :returns: URL or path to the local file
    :rtype: `basestring`
    :raises argparse.ArgumentTypeError:
        When the file is empty, is not parseable as YAML or does not
        validate against the master runlist schema.

    .. seealso::
        :func:`utah.url.url_argument`,
        :class:`utah.client.runner.Runner`.

    :Example:

    >>> from utah.run import master_runlist_argument
    >>> import argparse
    >>> parser = argparse.ArgumentParser()
    >>> parser.add_argument('url',
    ...                     type=master_runlist_argument)  # doctest: +ELLIPSIS
    _StoreAction(... dest='url', ...)
    >>> runlist = 'lp:utah/utah/client/examples/pass.run'
    >>> parser.parse_args([runlist])  # doctest: +ELLIPSIS
    Namespace(url='/tmp/utah_...')

    """
    filename = url_argument(url)

    # Step 1: the file has to be non-empty, well-formed YAML.
    try:
        runlist_data = parse_yaml_file(filename)
    except YAMLEmptyFile:
        message = 'Master runlist seems to be empty: {!r}'.format(filename)
        raise ArgumentTypeError(message)
    except YAMLParsingError as error:
        message = ('Master runlist failed to parse as a yaml file: {!r}\n'
                   'Detailed information: {}'.format(filename, error))
        raise ArgumentTypeError(message)

    # Step 2: the parsed content has to match the master runlist schema.
    try:
        jsonschema.validate(runlist_data, Runner.MASTER_RUNLIST_SCHEMA)
    except jsonschema.ValidationError as error:
        message = ('Master runlist failed to validate: {!r}\n'
                   'Detailed information: {}'.format(filename, error))
        raise ArgumentTypeError(message)

    return filename
def _get_runlist(url):
    """Retrieve the runlist at *url* and return the path of the local copy.

    :param url: Location of the runlist, as accepted by `urllib.urlretrieve`
    :returns: First element of the value returned by `urllib.urlretrieve`
    :raises UTAHException:
        Flagged as external, when the retrieval fails with
        `urllib.ContentTooShortError` or `IOError`.

    """
    # TODO: Make something that this and utah.iso._get_resource can both use
    try:
        local_path = urllib.urlretrieve(url)[0]
    except urllib.ContentTooShortError as error:
        message = ('Error when downloading {} (probably interrupted): {}'
                   .format(url, error))
    except IOError as error:
        message = 'IOError when downloading {}: {}'.format(url, error)
    else:
        return local_path

    # Only reached when one of the handlers above built an error message.
    raise UTAHException(message, external=True)
def _get_client_cmd(tgt_runlist, extraopts):
"""Generate the command required to execute UTAH on the target.
:returns: command and path to the report on the target
:rtype: tuple
"""
report_path = '/var/lib/utah/utah.out'
options = '-r {} -o {}'.format(tgt_runlist, report_path)
cmd = 'utah {} {}'.format(extraopts, options)
return (cmd, report_path)
def _install_and_run(machine, runlist, extraopts):
    """Connect to a provisioned machine to execute the runlist.

    This will attempt installing the utah-client and executing the runlist
    using a target that's already been provisioned.

    A `UTAHException` raised while downloading, uploading or running the
    runlist is logged and reported as `ReturnCodes.UTAH_EXCEPTION_ERROR`
    instead of being propagated. Exceptions raised by the client
    installation itself are not handled here.

    :param machine: Machine object the runlist is executed on
    :param runlist: Location of the runlist, as accepted by `_get_runlist`
    :param extraopts: Extra options inserted in the client command line
    :returns:
        exit_status and path to the report on the target. The path is
        `None` when the failure happened before the client command was
        built, since no report location is known at that point.
    :rtype: tuple

    """
    machine.installclient()

    # Bound before the try block so the final return cannot hit an unbound
    # name when a UTAHException is raised before the command is built.
    tgt_report = None
    try:
        runlist_local_path = _get_runlist(runlist)
        target_dir = '/tmp'
        machine.uploadfiles([runlist_local_path], target_dir)

        runlist_file = os.path.basename(runlist_local_path)
        runlist_remote_path = os.path.join(target_dir, runlist_file)
        (cmd, tgt_report) = _get_client_cmd(runlist_remote_path, extraopts)
        exitstatus, _stdout, _stderr = machine.run(cmd, root=True)

        # Check state file to make sure client finished
        # when some reboot commands were executed during the test run
        if exitstatus == ClientReturnCodes.REBOOT:
            logging.info('System under test is rebooting')
            # NOTE(review): presumably retries is_utah_done(machine, 180)
            # until it succeeds or config.runtimeout expires -- confirm
            # against utah.timeout.timeout and utah.retry.retry.
            exitstatus = timeout(config.runtimeout, retry,
                                 is_utah_done, machine, 180,
                                 logmethod=logging.info)

        # Make sure that it's possible to know when the server failed
        # and when the client failed just checking the return code
        if exitstatus != 0:
            exitstatus = ReturnCodes.client_error(exitstatus)
    except UTAHException as error:
        logging.error('Failed to run test: %s', str(error))
        exitstatus = ReturnCodes.UTAH_EXCEPTION_ERROR

    return exitstatus, tgt_report
def _provision_and_run(machine, runlist_url, extraopts):
    """Provision the machine so that the runlist is executed automatically.

    The client debs and the runlist are registered as files to be copied to
    the target, together with numbered autostart commands that install each
    deb and finally launch the client.

    :param machine: Machine object to provision
    :param runlist_url: Location of the runlist, as accepted by `_get_runlist`
    :param extraopts: Extra options inserted in the client command line
    :returns: return code of the run and path to the report on the target
    :rtype: tuple

    """
    target_tmp = '/var/lib/utah/tmp'

    def on_target(local_path):
        """Return the path a local file will have in the target."""
        return os.path.join(target_tmp, os.path.basename(local_path))

    provision_data = ProvisionData()
    local_debs = get_client_debs()
    provision_data.add_files(local_debs, target_tmp)
    local_runlist = _get_runlist(runlist_url)
    provision_data.add_files(local_runlist, target_tmp)

    # convert the paths to what they will be in the target
    remote_debs = [on_target(deb) for deb in local_debs]

    # build commands to install client debs; numbering starts at 2
    first_step = 2
    for step, remote_deb in enumerate(remote_debs, first_step):
        install_cmd = template.as_buff('install-deb-command.jinja2',
                                       deb=remote_deb)
        provision_data.add_autostart_cmd(
            '0{}_install-client'.format(step), install_cmd)

    # build command to run utah, numbered right after the last install
    run_step = first_step + len(remote_debs)
    run_cmd, tgt_report = _get_client_cmd(on_target(local_runlist),
                                          extraopts)
    provision_data.add_autostart_cmd('0{}_run-utah'.format(run_step),
                                     run_cmd)

    machine.provisioncheck(provision_data)

    try:
        machine.activecheck()
    except UTAHException:
        # Running a reboot test under a VM in TMPFS, the active check seems to
        # somehow run too fast. Retrying seems to make it always work.
        logging.warning('activecheck failed, retrying once')
        machine.activecheck()

    # The return code is kept as a function attribute so that the callback
    # below is able to update it.
    _provision_and_run.rc = 0

    def _complete_cb(match):
        """Store the return code captured in the first group of *match*."""
        if not match:
            raise UTAHException('Failed to get return code from UTAH runlist')
        _provision_and_run.rc = int(match.group(1))

    # NOTE(review): presumably blocks until the run is reported through
    # rsyslog and then invokes the callback -- confirm in the rsyslog helper.
    machine.rsyslog.wait_for_utah_run(_complete_cb)
    return _provision_and_run.rc, tgt_report
def _run(machine, runlist_url, extraopts):
    """Run a single runlist.

    The runlist is executed through provisioning when the
    ``PROVISIONED_AUTORUN`` feature is enabled and the machine has not been
    provisioned yet; otherwise the client is installed and run directly.

    :param machine: Machine object the runlist is executed on
    :param runlist_url: Location of the runlist
    :param extraopts: Extra options inserted in the client command line
    :returns: exit_status and path to report on target
    :rtype: tuple

    """
    use_autorun = feature.PROVISIONED_AUTORUN and not machine.provisioned
    execute = _provision_and_run if use_autorun else _install_and_run
    return execute(machine, runlist_url, extraopts)
def _write(machine, args, locallogs, report_remote_path):
    """Write runlist report.

    Report will be written using appropriate filename based on the runlist:
    ``{machine}_{runlist}_{timestamp}.{suffix}`` under ``config.logpath``,
    where the timestamp is the current UTC time.

    :param machine: Machine object the report is downloaded from
    :param args:
        options from argparse; ``runlist``, ``report_type`` and
        ``dumplogs`` are read
    :param locallogs:
        array of files resulting from this job; the local report path is
        appended to it when the download succeeds
    :param report_remote_path:
        Path to the report file in the machine being tested. When `None`,
        there is nothing to download and only an error is logged.

    """
    if report_remote_path is None:
        logging.error('Test log not retrieved')
        return

    # %M is the minute field; %m (month) in the time part would make
    # reports written within the same hour differ only by their seconds.
    timestamp = time.strftime('%Y-%m-%d_%H-%M-%S', time.gmtime())

    listname = os.path.basename(args.runlist)
    log_filename = ('{machine}_{runlist}_{timestamp}.{suffix}'
                    .format(machine=machine.name,
                            runlist=listname,
                            timestamp=timestamp,
                            suffix=args.report_type))
    report_local_path = os.path.normpath(
        os.path.join(config.logpath, log_filename))

    try:
        machine.downloadfiles(report_remote_path, report_local_path)
    except UTAHException:
        logging.error('Test log not retrieved')
    else:
        locallogs.append(report_local_path)
        logging.info('Test log copied to %s', report_local_path)
        if args.dumplogs:
            # Dumping is best effort: a read failure is only logged.
            try:
                with open(report_local_path, 'r') as f:
                    print(f.read())
            except IOError as err:
                logging.warning('Test log could not be dumped: %s', err)
def _copy_preseed(machine, args, locallogs):
    """Copy preseed to locallogs.

    If we are provisioning a system, we can optionally copy its preseed along
    with other locallogs from the job. The copy is made when either the
    command line option or the configuration option requests it, and is
    stored as ``{machine name}-preseed.cfg`` under ``config.logpath``.

    :param machine:
        Machine object; its ``finalpreseed`` attribute, when present, is the
        path of the file to copy
    :param args: options from argparse; ``outputpreseed`` is read
    :param locallogs:
        array of files resulting from this job; the destination path is
        appended to it when the copy succeeds

    """
    if not (args.outputpreseed or config.outputpreseed):
        return

    if args.outputpreseed:
        logging.debug('Capturing preseed due to command line option')

    destination = os.path.join(config.logpath,
                               '{}-preseed.cfg'.format(machine.name))
    if not hasattr(machine, 'finalpreseed'):
        logging.debug('Machine has no preseed to capture')
        return

    try:
        if os.path.isfile(destination):
            # NOTE(review): presumably makes a leftover copy writable so
            # that it can be overwritten -- confirm.
            # 0o664 is the octal spelling valid on Python 2.6+ and 3.
            os.chmod(destination, 0o664)
        shutil.copyfile(machine.finalpreseed, destination)
    except (IOError, OSError, shutil.Error) as err:
        # Best effort: a failed copy is reported but does not stop the job.
        logging.warning('Failed to copy preseed file: %s', err)
    else:
        locallogs.append(destination)
def run_tests(args, machine):
    """Execute the runlist on the machine and gather the resulting files.

    The report, any additional files requested through ``args.files`` and
    optionally the preseed are collected after the run.

    :param args:
        options from argparse; ``report_type`` is set here from ``json``
    :param machine: Machine object the runlist is executed on
    :returns: exitstatus and list of logs returned
    :rtype: tuple(int, list(str))

    """
    install_sigterm_handler()

    args.report_type = 'json' if args.json else 'yaml'

    # Write the machine name to standard out for log gathering
    print('Running on machine: {}'.format(machine.name))

    client_options = '-f {}'.format(args.report_type)
    collected = []

    # Server will return success code only if the execution
    # of every runlist was successful
    exitstatus, report_remote_path = _run(machine, args.runlist,
                                          client_options)
    _write(machine, args, collected, report_remote_path)

    if args.files is not None:
        try:
            collected.extend(getfiles(args, machine))
        except UTAHException as err:
            logging.warning('Failed to download files: %s', str(err))

    _copy_preseed(machine, args, collected)

    return exitstatus, collected
def getfiles(args, machine):
    """Download the files in ``args.files`` from the machine.

    Files are stored in ``args.outdir`` when it is set, otherwise in a
    directory named after the machine under ``config.logpath``. The
    directory is created when it does not exist.

    :param args: options from argparse; ``outdir`` and ``files`` are read
    :param machine: Machine object the files are downloaded from
    :returns: list of download files
    :rtype: list(str)
    :raises UTAHException:
        Flagged as external, when the output directory cannot be created.

    """
    outdir = args.outdir
    if not outdir:
        outdir = os.path.join(config.logpath, machine.name)

    if not os.path.isdir(outdir):
        try:
            os.makedirs(outdir)
        except OSError as err:
            raise UTAHException('Failed to create output directory {}: {}'
                                .format(outdir, err), external=True)

    machine.downloadfilesrecursive(args.files, outdir)

    # Every regular file found below the output directory is reported.
    return [os.path.join(dirpath, filename)
            for dirpath, _dirnames, filenames in os.walk(outdir)
            for filename in filenames]
def install_sigterm_handler():
    """Register a SIGTERM handler that exits through `sys.exit`.

    Exiting by raising an exception when SIGTERM is received ensures that
    machine object cleanup methods are called. Otherwise, cleanup methods
    aren't called when the process is abruptly terminated.

    """
    def _exit_on_sigterm(signum, frame):
        """Log the signal and exit with the SIGTERM return code."""
        logging.warning('Signal received: %d', signum)
        sys.exit(ReturnCodes.SIGTERM_RECEIVED)

    signal.signal(signal.SIGTERM, _exit_on_sigterm)
def is_utah_done(machine, checktimeout):
    """Run utah-done.py on the machine to find out if the run is finished.

    :param machine: Machine object to check
    :type machine: Machine
    :param checktimeout:
        Time to wait before raising exception if check fails. The value is
        not read in the body of this function.
    :type checktimeout: int
    :returns: The exit status of utah-done.py if it's not `UNKNOWN`.
    :raises UTAHException:
        Flagged for retry, when the utah client process isn't finished yet
        or when running the check raises any exception.

    """
    logging.info('Checking if UTAH client is finished')
    machine.activecheck()

    try:
        result = machine.run('/usr/share/utah/client/utah-done.py',
                             quiet=True)
        exitstatus = result[0]
        if exitstatus != ClientReturnCodes.UNKNOWN:
            logging.info('UTAH client is finished')
            return exitstatus

        logging.info('UTAH client is *not* finished yet')
        raise UTAHException('UTAH is not finished', retry=True)
    except Exception as err:
        # Every failure, including the exception raised just above, is
        # turned into a UTAHException flagged for retry.
        raise UTAHException(str(err), retry=True)