Source code for utah.client.common

# Ubuntu Testing Automation Harness
# Copyright 2012 Canonical Ltd.

# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU General Public License version 3, as published
# by the Free Software Foundation.

# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranties of
# MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
# PURPOSE.  See the GNU General Public License for more details.

# You should have received a copy of the GNU General Public License along
# with this program.  If not, see <http://www.gnu.org/licenses/>.

"""UTAH client common classes and functions."""


import datetime
import getpass
import os
import platform
import pwd
import re

import jsonschema
import yaml

from utah import logger
from utah.client.probe import probes

from utah.client.exceptions import (
    BadDir,
    YAMLParsingError,
    YAMLEmptyFile,
)

from utah.process import run


CONFIG = {
    'DEBUG': False,
    'TEST_DIR': '/var/lib/utah',
    'UTAH_CLIENT': '/usr/share/utah/client',
}

# Default TestSuite filenames
DEFAULT_TSLIST = 'tslist.run'
DEFAULT_TSCONTROL = 'ts_control'

UTAH_DIR = CONFIG['TEST_DIR']
UTAH_TESTDIR = os.path.join(UTAH_DIR, 'testsuites')
MASTER_RUNLIST = os.path.join(UTAH_DIR, "master.run")
UTAH_CLIENT = CONFIG['UTAH_CLIENT']
CLIENT_CONFIG = os.path.join(UTAH_DIR, 'config', 'client.json')
DEFAULT_STATE_FILE = os.path.join(UTAH_DIR, "state.yaml")

MEDIA_INFO = '/etc/utah/media-info'
INSTALL_TYPE = '/etc/utah/install-type'
PRODUCT_UUID = '/sys/class/dmi/id/product_uuid'


[docs]class ReturnCodes: """Provide consistent return codes for UTAH client. PASS: All test cases were executed and passed FAIL: All test cases were executed, but at least one of them failed ERROR: At least one error was detected that prevented a test case from being executed. Examples of situations that are considered an error are: - Fetch command failure - Setup command failure REBOOT: The system under test rebooted as required by a test case. To get final result the state file has to be checked. UNKNOWN: Unable to retrieve state file to check if client finished properly. INVALID_USER: The client was launched with a user other than root. EXCEPTION_ERROR: An exception error was encountered. CMD_PARSING_ERROR: Command line arguments parsing error. """ PASS = 0 FAIL = 1 ERROR = 2 REBOOT = 3 UNKNOWN = 4 INVALID_USER = 5 EXCEPTION_ERROR = 6 CMD_PARSING_ERROR = 7
CMD_TC_BUILD = 'testcase_build' CMD_TC_SETUP = 'testcase_setup' CMD_TC_TEST = 'testcase_test' CMD_TC_CLEANUP = 'testcase_cleanup' CMD_TC_REBOOT = 'testcase_reboot' CMD_TS_FETCH = 'testsuite_fetch' CMD_TS_BUILD = 'testsuite_build' CMD_TS_SETUP = 'testsuite_setup' CMD_TS_CLEANUP = 'testsuite_cleanup' DATE_FORMAT = "%Y-%m-%d %H:%M:%S"
[docs]def do_nothing(_obj=None): """Placeholder for save_state_callbacks. .. seealso:: :class:`.TestSuite`, :class:`.TestCase` """
def _get_cmd(command, run_as): if run_as is not None: pwd.getpwnam(run_as) # this will raise an exception for no such user # add -n so sudo will not prompt for a password on the tty cmd_prefix = "sudo -n -u {} ".format(run_as) else: cmd_prefix = "" run_as = getpass.getuser() return '{}{}'.format(cmd_prefix, command), run_as
[docs]def run_cmd(command, cwd=None, timeout=0, cmd_type=CMD_TC_TEST, run_as=None): """Run command and return result using the client's format. :param command: Command as it would be written in a console :type command: str :param cwd: Current working directory path :type cwd: str :param timeout: Maximum amount of time in seconds to wait for the command to complete :type timeout: int :param cmd_type: Command type as displayed in the result object :type cmd_type: str :param run_as: Username to use to run the command :type run_as: int :returns: Command execution result :rtype: dict .. seealso:: :func:`make_result` """ if command is None: return try: cmd, run_as = _get_cmd(command, run_as) except KeyError: return make_result( command, 1, # sudo return value for configuration/permission problem stderr=('{!r} user not found in the password database' .format(run_as))) logger.log('Running command: {}'.format(cmd)) start = datetime.datetime.now() probe_data = {} with probes.run(probe_data): rc, timedout, out, err = run(cmd, cwd, timeout, True) time_delta = datetime.datetime.now() - start if rc: logger.log_error('Process exited with error code: {}'.format(rc)) kwargs = {'command': command, 'retcode': rc, 'stdout': _normalize_encoding(out), 'stderr': _normalize_encoding(err), 'start_time': start.strftime(DATE_FORMAT), 'time_delta': str(time_delta), 'cmd_type': cmd_type, 'user': run_as, 'probes': probe_data, } return make_result(**kwargs)
def _normalize_encoding(value, encoding='utf-8'): """Normalize string encoding. Make sure that byte strings are used only for ascii data and unicode strings for strings using any other encoding. :param value: Data string that should be normalized :type value: str :param encoding: Encoding used to decode the given string :type encoding: str :returns: Data string reencoded using in ascii if possible :rtype: str """ unicode_value = value.decode(encoding, 'replace') output = unicode_value.encode('ascii', 'replace') return output # TODO: it might make sense to have a result object that keeps track of # test pass, fail, error and serializes it's data.
[docs]def make_result(command, retcode, stdout='', stderr='', start_time='', time_delta='', cmd_type=CMD_TC_TEST, user='unknown', probes=None): """Make a result data structure. .. note:: Battery information will only be included if some value is passed as argument. :param command: Command that was executed :type command: str :param retcode: Return code :type retcode: int :param stdout: Standard output :type stdout: str :param stderr: Standard error :type stderr: str :param start_time: Time in which command execution started :type start_time: str :param time_delta: Amount of time that the command took to complete :type time_delta: str :param cmd_type: Command type to be displayed in the report :type cmd_type: str :param user: User name the executed the command :type user: str :param probes: probe data obtained while running the command :type probes: dict :returns: Result data :rtype: dict """ res = { 'command': command, 'returncode': retcode, 'stdout': stdout, 'stderr': stderr, 'start_time': start_time, 'time_delta': time_delta, 'cmd_type': cmd_type, 'user': user, 'probes': probes, } return res
[docs]def parse_yaml_file(filename): """Parse yaml file. :param filename: Path to the file that should be read :type filename: str :returns: Parsed data :rtype: object :raises YAMLParsingError: If there's a problem while parsing the file with the information about where in the file the problem was detected. .. seealso:: :func:`parse_control_file` """ try: with open(filename, 'r') as fp: data = yaml.load(fp) if data is None: raise YAMLEmptyFile('Empty YAML file: {}'.format(filename)) except yaml.YAMLError as exception: if hasattr(exception, 'problem_mark'): mark = exception.problem_mark error_message = ('YAML parsing error ' 'at line {line}, column {column}: {filename}' .format(line=mark.line + 1, column=mark.column + 1, filename=filename)) else: error_message = ('YAML parsing error: {filename}' .format(filename=filename)) raise YAMLParsingError(error_message) return data
if os.environ.get('READTHEDOCS', None) == 'True': # readthedocs has a sandboxed environment, so we just need a stub # here so we can satisfy it DefaultValidator = object elif jsonschema.__version__ == '1.3.0': # Validator that sets values to defaults as explained in: # https://github.com/Julian/jsonschema/issues/4 class DefaultValidator(jsonschema.Draft4Validator): """jsonschema validator that sets default values. During the validation, if some field is missing in the data, it will be set to the default value specified in the schema if defined. """ @classmethod def check_schema(self, schema): """Call the superclass directly since super doesn't work here.""" jsonschema.Draft4Validator.check_schema(schema) def validate_properties(self, properties, instance, schema): """Set missing properties to default value.""" if not self.is_type(instance, 'object'): return errors = (super(DefaultValidator, self) .validate_properties(properties, instance, schema)) for error in errors: yield error default_values = [ (k, v['default']) for k, v in properties.iteritems() if k not in instance and 'default' in v] instance.update(default_values) else: # extracted this logic from: # http://python-jsonschema.readthedocs.org/en/latest/faq/#why-doesn-t-my def extend_with_default(validator_class): validate_properties = validator_class.VALIDATORS["properties"] def set_defaults(validator, properties, instance, schema): for error in validate_properties( validator, properties, instance, schema, ): yield error for property, subschema in properties.iteritems(): if "default" in subschema: instance.setdefault(property, subschema["default"]) return jsonschema.validators.extend( validator_class, {"properties": set_defaults}, ) DefaultValidator = extend_with_default(jsonschema.Draft4Validator)
[docs]def parse_control_file(filename, schema): """Parse a control file and check against a jsonschema. :param filename: Path to the yaml file to be parsed :type filename: str :param schema: jsonschema to validate data against :type schema: dict :returns: Parsed data :rtype: object :raises jsonschema.ValidationError: If file contents doesn't follow the schema definitions .. seealso:: :func:`parse_yaml_file`, :class:`DefaultValidator` """ control_data = parse_yaml_file(filename) validator = DefaultValidator(schema) validator.validate(control_data) return control_data
[docs]def debug_print(data, force=False): """Print debugging information according to ``CONFIG['DEBUG']`` value. Data will be printed with the ``DEBUG:`` string prepended to it. :param data: Data to be printed :type data: object :param force: Print data regardless of the configuration :type force: bool :returns: Whether something was printed or not :rtype: bool """ try: debug = CONFIG['DEBUG'] except NameError: debug = False debug = debug or force if debug: print "DEBUG:", data # return True if we printed some output return debug
[docs]def get_media_info(): """Get the contents of the media-info file if available. :returns: The contents of the media-info file or ``'unknown'`` if not available. :rtype: str .. note:: This is only a best-effort approach. """ media_info = 'unknown' try: with open(MEDIA_INFO, 'r') as f: media_info = f.read().strip() except IOError: # If this fails, return the default pass return media_info
[docs]def get_install_type(): """Get the contents of the install-type file if available. :returns: The contents of the install-type file or ``'unknown'`` if not available. :rtype: str .. note:: This is only a best-effort approach. .. seealso:: :func:`get_media_info` """ install_type = 'unknown' try: with open(INSTALL_TYPE, 'r') as f: install_type = f.read().strip() except IOError: # If this fails, return the default pass return install_type
[docs]def get_product_uuid(): """Get the product_uuid of the machine under test. :returns: The contents of the product_uuid file or ``None`` if not available. .. note:: This is only a best-effort approach. """ product_uuid = None try: with open(PRODUCT_UUID) as f: product_uuid = f.read().strip() except IOError: # If this fails, return the default pass return product_uuid
[docs]def get_host_info(): """Get host info, useful for debugging. :returns: Host uname, media-info and product_uuid together :rtype: dict .. seealso:: :func:`get_media_info`, :func:`get_product_uuid` """ retval = {} retval['uname'] = platform.uname() retval['media-info'] = get_media_info() retval['product_uuid'] = get_product_uuid() return retval
[docs]def get_arch(): """Get the host's architecture. :returns: The human readable architecture or ``'unknown'`` :rtype: str """ arches = { 'x86_64': 'amd64', 'x86': 'i386', 'i686': 'i386', } arch = platform.machine() return arches.get(arch, arch)
[docs]def get_release(): """Get the host's release name. :returns: Release name (i.e. quantal, raring, etc.) :rtype: str """ return platform.linux_distribution()[2]
[docs]def get_build_number(): """Get build number. :returns: Build number as according to media-info or ``'?'`` if not found :rtype: str .. seealso:: :func:`get_host_info` """ host_info = get_host_info() pattern = re.compile('.*\(([0-9.]+)\)$') match = pattern.match(host_info['media-info']) return match.group(1) if match else '?'
[docs]def mkdir(path): """Create a directory only if needed with proper error handling.""" if not os.path.exists(path): try: os.mkdir(path) except OSError as err: raise BadDir('Unable to create directory: {}'.format(err))
[docs]def chdir(path): """Change to directory with proper error handling.""" try: os.chdir(path) except OSError as err: raise BadDir('Unable to change to directory: {}'.format(err))
Read the Docs v: latest
Versions
latest
Downloads
PDF
HTML
Epub
On Read the Docs
Project Home
Builds

Free document hosting provided by Read the Docs.