Source code for PyExpLabSys.combos

"""This module contains socket, database saver and logger heuristic combinations"""

from __future__ import unicode_literals, absolute_import, print_function

from time import time

from .common.sockets import LiveSocket
from .common.database_saver import ContinuousDataSaver


[docs]class LiveContinuousLogger(object): """A combination of a :class:`.LiveSocket` and a :class:`.ContinuousDataSaver` that also does logging heuristics FIXME explain the term log """
[docs] def __init__( self, name, codenames, continuous_data_table, username, password, time_criteria=None, absolute_criteria=None, relative_criteria=None, live_server_kwargs=None, ): """Initialize local data Args: name (str): The name to be used in the sockets codenames (sequence): A sequence of codenames. These codenames are the measurements codenames for the :class:`.ContinuousDataSaver` and they will also be used as the codenames for the :class:`.LiveSocket`. continuous_data_table (str): The continuous data table to log data to username (str): The MySQL username password (str): The password for ``username`` in the database time_criteria (float or dict): (Optional) The time after which a point will always be saved in the database. Either a single value, which will be used for all codenames or a dict of codenames to values. If supplying a dict, each codename must be present as a key. absolute_criteria (float or dict): (Optional) The absolute value difference criteria. Either a single value or one for each codename, see `time_criteria` for details. relative_criteria (float or dict): (Optional) The relative value difference criteria. Either a single value or one for each codename, see `time_criteria` for details. lives_server_kwargs (dict): (Optinal) A dict of keyword arguments for the :class:`.LiveSocket`. See the doc string for :meth:`.LiveSocket.__init__` for additional details. """ # Initialize local variables self.name = name self.codenames = set(codenames) # It should be a set anyway # Update and check criteria: self.time_criteria = self._init_criteria(time_criteria, 'time') self.absolute_criteria = self._init_criteria(absolute_criteria, 'absolute') self.relative_criteria = self._init_criteria(relative_criteria, 'relative') # Init last times and values (non-existing key will trigger save) self.last_times = {} self.last_values = {} # Init live socket and continuous data saver if live_server_kwargs is None: live_server_kwargs = {} self.live_socket = LiveSocket(name, codenames, **live_server_kwargs) self.continuous_data_saver = ContinuousDataSaver( continuous_data_table, username, password, measurement_codenames=codenames, )
def _init_criteria(self, criteria, criteria_name): """Init logging criteria to make sure it is either a dictionay with one value for each codename or None Args: criteria (float or dict): Same as in :meth:`.__init__`. criteria_name (str): Criteria name ('time', 'absolute' or 'relative') Returns: dict or None: The dict of criteria or None if criteria in was none """ if criteria is None: return None # If not a dictionary, assumed to be int or float if not isinstance(criteria, dict): if not criteria > 0: error_message = ( 'The criterium for {} is expected to be positive. {} is not.' ) raise ValueError(error_message.format(criteria_name, criteria)) # Return dict with the passed in single value for all keys return {codename: criteria for codename in self.codenames} else: # Check that criteria contains exactly one entry for each codename. Note # self.codenames is a set if not set(criteria.keys()) == self.codenames: error_message = ( 'There is not a {} criteria in the criteria dict {} for every ' 'codename: {}'.format(criteria_name, criteria, self.codenames) ) raise ValueError(error_message) return criteria
[docs] def start(self): """Start the underlying :class:`.LiveSocket` and :class:`.ContinuousDataSaver`""" self.live_socket.start() self.continuous_data_saver.start()
[docs] def stop(self): """Stop the underlying :class:`.LiveSocket` and :class:`.ContinuousDataSaver`""" self.live_socket.stop() self.continuous_data_saver.stop()
[docs] def log_point_now(self, codename, value): """Log a point now As the time will be attached the time now. For an explanation of what is meant by the term "log", see the :class:`class docstring <.LiveContinuousLogger>`. Args: codename (str): The codename to log this point for value (float): The value to store with the time now (:py:func:`time.time`) """ self.log_point(codename, (time(), value))
[docs] def log_point(self, codename, point): """Log a point For an explanation of what is meant by the term "log", see the :class:`class docstring <.LiveContinuousLogger>`. Args: codename (str): The codename to log this point for point (sequence): A (unix_time, value) two item sequence (e.g. list or tuple), that represents a point """ self.live_socket.set_point(codename, point) if self._test_logging_criteria(codename, point): self.continuous_data_saver.save_point(codename, point) self._update_last_information(codename, point)
[docs] def log_batch_now(self, values): """Log a batch of values now For an explanation of what is meant by the term "log", see the :class:`class docstring <.LiveContinuousLogger>`. Args: values (dict): Dict of codenames to values. The values will be stored with the time now (:py:func:`time.time`) """ now = time() self.log_batch({codename: (now, value) for codename, value in values.items()})
[docs] def log_batch(self, points): """Log a batch of points For an explanation of what is meant by the term "log", see the :class:`class docstring <.LiveContinuousLogger>`. Args: points (dict): Dict of codenames to points """ self.live_socket.set_batch(points) for codename, point in points.items(): if self._test_logging_criteria(codename, point): self.continuous_data_saver.save_point(codename, point) self._update_last_information(codename, point)
def _test_logging_criteria(self, codename, point): """Test the logging criteria for a point Args: codename (str): The codename to log this point for point (sequence): A (unix_time, value) two item sequence (e.g. list or tuple), that represents a point """ current_time, current_value = point # The last values are initialized empty, so always trigger a save on first point try: last_time = self.last_times[codename] last_value = self.last_values[codename] except KeyError: return True # Test time criteria if present if self.time_criteria: if current_time - last_time > self.time_criteria[codename]: return True # Test absolute criteria if present if self.absolute_criteria: if abs(current_value - last_value) > self.absolute_criteria[codename]: return True # Test relative criteria if present if self.relative_criteria: try: relative_difference = abs(current_value - last_value) / abs( current_value ) except ZeroDivisionError: return True if relative_difference > self.absolute_criteria[codename]: return True return False def _update_last_information(self, codename, point): """Update the information about last logged point""" current_time, current_value = point self.last_times[codename] = current_time self.last_values[codename] = current_value