# Ubuntu Testing Automation Harness
# Copyright 2013 Canonical Ltd.
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU General Public License version 3, as published
# by the Free Software Foundation.
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranties of
# MERCHANTABILITY, SATISFACTORY QUALITY, or FITNESS FOR A PARTICULAR
# PURPOSE. See the GNU General Public License for more details.
# You should have received a copy of the GNU General Public License along
# with this program. If not, see <http://www.gnu.org/licenses/>.
"""rsyslog processing and monitoring."""
import logging
import os
import re
import select
import socket
import sys
import time
from utah.config import config
from utah.exceptions import UTAHException
from utah.provisioning.exceptions import UTAHProvisioningException
from utah.timeout import UTAHTimeout
class RSyslog(object):
    """Listen to rsyslog messages and process them.

    :param hostname: Host where the syslog is coming from.
    :type hostname: str
    :param logpath: Base directory where log files are created.
    :type logpath: str
    :param usefile:
        allows class to ``tail`` a file rather than act as an rsyslogd server
    :type usefile: str | None
    """

    # Receive buffer for one UDP datagram. UDP discards whatever part of a
    # datagram does not fit into the buffer, so a small value silently
    # truncates long syslog lines (and the patterns they should match).
    _UDP_BUFSIZE = 65535

    def __init__(self, hostname, logpath, usefile=None):
        self._host = hostname
        self._logpath = logpath
        self.logger = logging.getLogger('rsyslog')
        if usefile:
            if not os.path.exists(usefile):
                self.logger.debug('creating syslog file')
                with open(usefile, 'w') as f:
                    # 0o660 is valid octal syntax on Python 2.6+ and 3.x
                    os.fchmod(f.fileno(), 0o660)
            self._file = open(usefile, 'r')
            # offset just past the last complete line handed out
            self._where = 0
            self._read_line = self._read_file
            self._port = 0
        else:
            # start listening on an ephemeral UDP port
            self._sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
            self._sock.bind(('', 0))
            self._port = self._sock.getsockname()[1]
            self._read_line = self._read_udp
            self.logger.info('rsyslogd port(%d) opened', self._port)

    @property
    def port(self):
        """Return UDP port number used to listen for syslog messages.

        This is ``0`` when a file is being tailed instead of a socket.
        """
        return self._port

    def wait_for_install(self, booted_callback=None):
        """Monitor rsyslog messages during the installation.

        Works through each step in the steps array to find messages in the
        syslog indicating what part of the install we are in.

        :param booted_callback:
            function to be called once the system has been booted
        :type booted_callback: callable | None
        :raises UTAHTimeout: if ``config.installtimeout`` elapses first.
        :raises UTAHException: if an installation failure line is seen.
        """
        steps = [
            {
                'message': 'system started',
                'pattern': ['.*'],
                'booted': True,
            },
            {
                'message': 'install started',
                'pattern': [
                    r'.*ubiquity\[\d+\]: Ubiquity \d',
                    # needed for non-ubiquity installs
                    '.*anna-install',
                ],
            },
            {
                'message': 'install complete',
                'pattern': [
                    '.*ubiquity reboot',
                    '.*log-output -t ubiquity umount /target$',
                    # for server bootspeed
                    '.*finish-install.d/94save-logs',
                    '.*finish-install: umount',
                    # a catch-all
                    '.*rsyslogd:.*exiting on signal 15.',
                ],
                'fail_pattern': [
                    # ubiquity/failure_command
                    '.*utah: Installation failure detected',
                    # ubiquity/success_command
                    # and d-i/late_command failures
                    '.*utah: Late command failure detected',
                    # d-i installation failure
                    '.*exited with status [^0]',
                ],
            },
        ]
        logfile = '{}/{}-install.log'.format(self._logpath, self._host)
        callbacks = {'booted': booted_callback}
        self._wait_for_steps(steps, config.installtimeout, logfile, callbacks)

    def wait_for_booted(self, uuid):
        """Monitor rsyslog during boot up.

        Works the same as the :meth:`wait_for_install` method but takes in
        steps that determine a system has booted after the install has
        completed. A timeout is logged as a warning and otherwise ignored.

        :param uuid: The UUID generated by provisioning code.
        :type uuid: string
        :raises UTAHProvisioningException:
            if the target reports a UUID different from ``uuid``.
        """
        steps = [
            {
                'message': 'system booted',
                'pattern': r'.*Linux version \d',
            },
            {
                'message': 'disk mounted',
                'pattern': '.*mounted filesystem with ordered data',
            },
            {
                'message': 'UTAH uuid received',
                # binds this to the callback for wait_for_booted
                'uuid': True,
                'pattern': r'.*Machine UUID:\s+(.*)',
            },
        ]
        logfile = '{}/{}-boot.log'.format(self._logpath, self._host)

        def _cb(match):
            # record the UUID captured by the 'UTAH uuid received' pattern
            if match:
                _cb.uuid = match.group(1).strip()
        _cb.uuid = None
        callbacks = {'uuid': _cb}
        # TODO: Resolve the issues around this and get it back to normal
        # i.e. LP#1100386
        try:
            self._wait_for_steps(steps, config.boottimeout, logfile, callbacks)
            if not _cb.uuid:
                self.logger.error('UUID not sent from target')
            elif uuid and uuid != _cb.uuid:
                msg = 'Incorrect UUID: {} != {}'.format(uuid, _cb.uuid)
                raise UTAHProvisioningException(msg)
        except UTAHTimeout:
            self.logger.warning('Timed out waiting for boot, but continuing')

    def wait_for_utah_run(self, completed_cb=None):
        """Monitor rsyslog while waiting for utah-client to finish autorun.

        :param completed_cb: A callback function that will include the utah
            exit code.
        :type completed_cb: function
        :raises UTAHTimeout: if the run does not complete in time.
        """
        steps = [
            {
                'message': 'UTAH setup',
                'pattern': (r'.*utah: completed '
                            r'/etc/utah/autorun/\d+_utah-setup'),
            },
            {
                'message': 'UTAH client install',
                'pattern': (r'.*utah: completed '
                            r'/etc/utah/autorun/\d+_install-client'),
            },
            {
                'message': 'UTAH runlist',
                'pattern': (r'.*utah: completed '
                            r'/etc/utah/autorun/\d+_run-utah rc=(\d+)'),
                'completed': True,
            },
        ]
        logfile = '{}/{}-utah-run.log'.format(self._logpath, self._host)
        callbacks = {'completed': completed_cb}
        timeout = config.runtimeout
        if not timeout:
            # this means forever, but lets give something sensible to help
            # prevent jobs from hanging forever
            timeout = 172800  # 2 days
        self._wait_for_steps(steps, timeout, logfile, callbacks)

    def _wait_for_steps(self, steps, timeout, logfile, callbacks):
        """Match incoming syslog lines against ``steps`` in order.

        Each step is a dict with ``message`` and ``pattern`` (a regex string
        or list of them), an optional ``fail_pattern`` and optional boolean
        flags whose names are keys of ``callbacks``. Every line read is
        copied to ``logfile``.

        If a line matches a pattern belonging to a later step, the steps in
        between are treated as missed: their callbacks still fire (once) and
        processing continues after the matched step.

        :param timeout: Seconds allowed for all steps together.
        :param callbacks: Maps a step flag name to a callable (or ``None``)
            that receives the regex match object.
        :raises UTAHTimeout: if the deadline passes before the last step.
        :raises UTAHException: if a fail pattern of the current step matches.
        """
        with open(logfile, 'w') as f:
            x = 0
            deadline = time.time() + timeout
            while x < len(steps):
                step = steps[x]
                message = step['message']
                own_pats = self._as_list(step['pattern'])
                fail_pats = self._as_list(step.get('fail_pattern', []))
                future_pats = self._future_patterns(steps, x)
                self.logger.info(
                    'Waiting %ds for: %s', deadline - time.time(), message)
                pattern, match = self._wait_for(
                    f, own_pats + fail_pats + future_pats, message, deadline)
                if match is None:
                    remaining_messages = [s['message']
                                          for s in steps[x + 1:]]
                    log_message = (
                        'Timeout ({}) occurred for {} message.\n'
                        'Remaining messages: {}'
                        .format(timeout, message,
                                ', '.join(remaining_messages)))
                    # Log error message to write the timestamp
                    # when the timeout happened
                    self.logger.error(log_message)
                    raise UTAHTimeout(log_message)
                if pattern in fail_pats:
                    raise UTAHException('Failure pattern found: {}'
                                        .format(pattern))
                if pattern in own_pats:
                    self.logger.info('Matched pattern %r for %r message',
                                     pattern, message)
                    self._do_callback(step, callbacks, match)
                    x += 1
                else:
                    msg = 'Expected pattern missed, matched future pattern: %s'
                    self.logger.warning(msg, pattern)
                    # start at the current (missed) step so callbacks of
                    # steps that already completed are not fired again
                    x = self._fast_forward(
                        steps, pattern, match, callbacks, start=x) + 1

    @staticmethod
    def _as_list(pattern):
        """Return ``pattern`` as a new list (wrapping a single string)."""
        if isinstance(pattern, list):
            return list(pattern)
        return [pattern]

    @staticmethod
    def _do_callback(step, callbacks, match):
        """Call each callback whose flag is truthy in ``step`` with ``match``."""
        for name, func in callbacks.items():
            if func and step.get(name, False):
                func(match)

    @staticmethod
    def _future_patterns(steps, index):
        """Return all patterns of the steps after ``steps[index]``, in order."""
        patterns = []
        for step in steps[index + 1:]:
            patterns.extend(RSyslog._as_list(step['pattern']))
        return patterns

    @staticmethod
    def _fast_forward(steps, pattern, match, callbacks, start=0):
        """Figure out what should be the next step.

        Look through the steps, beginning at index ``start``, for the first
        one whose patterns include ``pattern`` and return its index so that
        the ``wait_for`` code knows where to continue from. Callbacks of
        every step visited (including the matched one) are fired with
        ``match`` so that missed steps, e.g. a ``booted_callback``, are not
        skipped.

        :param start: First step to visit. Callers should pass the current
            step index so callbacks of completed steps are not re-fired.
        :raises ValueError: if no visited step contains ``pattern``.
        """
        for x in range(start, len(steps)):
            # make sure we don't skip a callback
            RSyslog._do_callback(steps[x], callbacks, match)
            if pattern in RSyslog._as_list(steps[x]['pattern']):
                return x
        raise ValueError(
            'Pattern {!r} not found in steps from index {}'.format(
                pattern, start))

    def _wait_for(self, writer, patterns, message, deadline):
        """Read lines until one matches a pattern or ``deadline`` passes.

        Every line read is written to ``writer``, and echoed to stderr when
        this logger is enabled for DEBUG. Patterns are tried in order with
        ``re.match``.

        :param message: Unused; kept for interface compatibility.
        :returns: ``(pattern_string, match)`` for the first match, or
            ``(None, None)`` on timeout.
        """
        compiled = [re.compile(p) for p in patterns]
        while time.time() < deadline:
            data = self._read_line()
            if not data:
                continue
            # use the effective level: an unset (NOTSET == 0) level would
            # otherwise always look like DEBUG and echo every line
            if self.logger.isEnabledFor(logging.DEBUG):
                sys.stderr.write(data)
            writer.write(data)
            for pat in compiled:
                match = pat.match(data)
                if match:
                    return pat.pattern, match
        return None, None

    def _read_udp(self):
        """Return one syslog datagram as a newline-terminated str, or None.

        Waits up to 5 seconds for data. Empty datagrams yield ``None``.
        """
        readable, _, _ = select.select([self._sock], [], [], 5)
        if self._sock not in readable:
            return None
        data = self._sock.recv(self._UDP_BUFSIZE)
        if not data:
            # zero-length datagram: nothing to match (and data[-1] would fail)
            return None
        if not isinstance(data, str):
            # Python 3 returns bytes; the log file is opened in text mode
            data = data.decode('utf-8', 'replace')
        if not data.endswith('\n'):
            data += '\n'
        return data

    def _read_file(self):
        """Return the next complete line of the tailed file, or ``None``.

        Polls up to 5 times, sleeping one second after each attempt that
        yields no complete line. A line still lacking its trailing newline
        is left unread until the newline arrives, so it is never split into
        fragments. If the file shrinks below the last read offset it is
        assumed truncated and is re-read from the start.
        """
        for _ in range(5):
            line = self._file.readline()
            if line.endswith('\n'):
                self._where = self._file.tell()
                return line
            if line:
                # writer is mid-line: rewind and retry once it is complete
                self._file.seek(self._where)
            elif os.fstat(self._file.fileno()).st_size < self._where:
                self.logger.warning('file truncation detected')
                self._file.seek(0)
                self._where = 0
                continue
            time.sleep(1)
        return None