Source code for nanaimo.builtin.nanaimo_serial_watch

#
# Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
# This software is distributed under the terms of the MIT License.
#
#                                       (@@@@%%%%%%%%%&@@&.
#                              /%&&%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%&@@(
#                              *@&%%%%%%%%%&&%%%%%%%%%%%%%%%%%%&&&%%%%%%%
#                               @   @@@(@@@@%%%%%%%%%%%%%%%%&@@&* @@@   .
#                               ,   .        .  .@@@&                   /
#                                .       .                              *
#                               @@              .                       @
#                              @&&&&&&@. .    .                     *@%&@
#                              &&&&&&&&&&&&&&&&@@        *@@############@
#                     *&/ @@ #&&&&&&&&&&&&&&&&&&&&@  ###################*
#                              @&&&&&&&&&&&&&&&&&&##################@
#                                 %@&&&&&&&&&&&&&&################@
#                                        @&&&&&&&&&&%#######&@%
#  nanaimo                                   (@&&&&####@@*
#
import asyncio
import re
import textwrap
import typing

import nanaimo
import nanaimo.connections
import nanaimo.connections.uart
import nanaimo.fixtures
import nanaimo.pytest.plugin


[docs]class Fixture(nanaimo.fixtures.Fixture): """ Gathers a log over a serial connection until a given pattern is matched. """ fixture_name = 'nanaimo_serial_watch' argument_prefix = 'lw' def __init__(self, manager: nanaimo.fixtures.FixtureManager, args: nanaimo.Namespace, **kwargs: typing.Any) -> None: super().__init__(manager, args, **kwargs) if 'uart_factory' in kwargs: self._uart_factory = typing.cast(typing.Callable[[str, int], nanaimo.connections.uart.ConcurrentUart], kwargs['uart_factory']) else: self._uart_factory = nanaimo.connections.uart.ConcurrentUart.new_default
[docs] @classmethod def on_visit_test_arguments(cls, arguments: nanaimo.Arguments) -> None: nanaimo.connections.uart.ConcurrentUart.on_visit_test_arguments(arguments) arguments.add_argument('--pattern', default='.*', enable_default_from_environ=True, help=textwrap.dedent(''' A Python regular expression that will be matched against each line of serial input received. This fixture gathers input until either this pattern is matched or the gather times out.''').lstrip()) arguments.add_argument('--disruption', default='\r\n', help=textwrap.dedent(''' Characters to send periodically on the serial link in an attempt to wake up connected hosts and get a default response.''').lstrip()) arguments.add_argument('--disturb-rate', type=float, help=textwrap.dedent(''' The rate at which the watcher will input the disruption characters to try to get the device on the other end of the serial pipe to respond (in fractional seconds)''').lstrip()) arguments.add_argument('--update-period', type=float, help=textwrap.dedent(''' The rate at which a message is logged when waiting for a log time. This provides feedback that the fixture is still active but that the pattern has not yet been matched. Omit to squelch the update message. (This argument is in fractional seconds). ''').lstrip())
[docs] async def on_gather(self, args: nanaimo.Namespace) -> nanaimo.Artifacts: """ Watch the logs until the pattern matches. +--------------+---------------------------+-----------------------------------------------+ | **Returned Artifacts** | +--------------+---------------------------+-----------------------------------------------+ | key | type | Notes | +==============+===========================+===============================================+ | match | re.MatchObject | The match if result_code is 0 | +--------------+---------------------------+-----------------------------------------------+ | matched_line | str | The full line matched if result_code is 0 | +--------------+---------------------------+-----------------------------------------------+ """ lw_port = self.get_arg_covariant_or_fail(args, 'port') lw_port_speed = self.get_arg_covariant_or_fail(args, 'port_speed') with self._uart_factory(lw_port, lw_port_speed) as monitor: match_future, _ = await self.gate_tasks(self._matcher(args, monitor), None, self._agitator(args, monitor), self._updater(args)) artifacts = typing.cast(nanaimo.Artifacts, match_future.result()) self._logger.info('Found match : %s', artifacts.matched_line) return artifacts
# +-----------------------------------------------------------------------+ # | PRIVATE # +-----------------------------------------------------------------------+ async def _updater(self, args: nanaimo.Namespace) -> nanaimo.Artifacts: lw_update_period = self.get_arg_covariant(args, 'update_period') if lw_update_period is not None: while True: await asyncio.sleep(lw_update_period) self._logger.info('still waiting...') return nanaimo.Artifacts() async def _matcher(self, args: nanaimo.Namespace, monitor: nanaimo.connections.uart.ConcurrentUart) -> nanaimo.Artifacts: artifacts = nanaimo.Artifacts() lw_pattern = self.get_arg_covariant_or_fail(args, 'pattern') pattern = re.compile(lw_pattern) self._logger.info('Starting to look for pattern : %s', str(lw_pattern)) while True: result = await monitor.get_line() result_stripped = result.rstrip() if len(result_stripped) > 0: self._logger.debug(result_stripped) match = pattern.search(result) if match: setattr(artifacts, 'match', match) setattr(artifacts, 'matched_line', result) break return artifacts async def _agitator(self, args: nanaimo.Namespace, monitor: nanaimo.connections.uart.ConcurrentUart) -> nanaimo.Artifacts: lw_disturb_rate = self.get_arg_covariant(args, 'disturb_rate') lw_disruption = self.get_arg_covariant_or_fail(args, 'disruption') if lw_disturb_rate is not None: while True: await asyncio.sleep(lw_disturb_rate) self._logger.debug('About to disturb the uart...') await monitor.put_line(lw_disruption) return nanaimo.Artifacts()
def pytest_nanaimo_fixture_type() -> typing.Type['Fixture']: return Fixture