Source code for PyExpLabSys.drivers.powerwalker_ethernet

import datetime
import paramiko


[docs]class PowerWalkerEthernet(object): """ This driver uses the fact that the PowerWalker allows ssh-access, and thus gives access the actual binary files that reads the internal values. SNMP could also be used, but apparently most values miss a digit compared to the internal tools. """
[docs] def __init__(self, ip_address, read_old_events=True): if read_old_events: self.latest_event = datetime.datetime.min else: self.latest_event = datetime.datetime.now() self.ssh = paramiko.SSHClient() self.ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) self.ssh.connect( ip_address, username='root', password='12345678', look_for_keys=False )
def _read_static_data(self): """ Reads combined static information about the unit, this is traditionally returned as two seperate calls, thus this is considered a private function. """ command = '/var/www/html/web_pages_Galleon/cgi-bin/baseInfo.cgi' stdin, stdout, stderr = self.ssh.exec_command(command, timeout=0.75) raw_lines = stdout.readlines() lines = [] for line in raw_lines: if line.strip(): lines.append(line.strip()) nominal_input = int(lines[4][0:3]) nominal_output = int(lines[4][4:]) values = { 'model': lines[2], 'version': lines[6], 'nominal_input_voltage': nominal_input, 'nominal_output_voltage': nominal_output, 'nominal_output_frequency': int(lines[10]) / 10.0, 'rated_battery_voltage': int(lines[12]) / 10.0, 'rated_va': int(lines[8]), 'rated_output_current': int(lines[11]) / 10.0, } return values def device_information(self): statics = self._read_static_data() information = { 'company': 'Power Walker', 'model': statics['model'], 'version': statics['version'], } return information def device_ratings(self): statics = self._read_static_data() ratings = { 'rated_voltage': statics['nominal_output_voltage'], 'rated_current': statics['rated_output_current'], 'battery_voltage': statics['rated_battery_voltage'], 'rated_frequency': statics['nominal_output_frequency'], } return ratings def device_status(self): command = '/var/www/html/web_pages_Galleon/cgi-bin/realInfo.cgi' stdin, stdout, stderr = self.ssh.exec_command(command, timeout=0.75) raw_lines = stdout.readlines() lines = [] for line in raw_lines: if line.strip(): lines.append(line.strip()) status = [] # TODO! if not lines[1] == 'Line Mode': status.append('Utility Fail') # Compatibility with serial interface values = { 'input_voltage': int(lines[12]) / 10.0, 'output_voltage': int(lines[15]) / 10.0, 'output_current': int(lines[35]) / 10.0, 'input_frequency': int(lines[11]) / 10.0, 'battery_voltage': int(lines[8]) / 10.0, 'temperature': int(lines[2]) / 10.0, 'status': status, 'battery_capacity': int(lines[9]), 'remaining_battery': lines[10], # minutes 'output_frequency': int(lines[14]) / 10.0, 'load_level': int(lines[17]), } # WARNING (appears in web front-end - find how to read) # FAULT (appears in web front-end - find how to read) return values def read_events(self, only_new=False): command = 'cd /var/log/eventlog; cat "$(ls -1rt | tail -n1)"' stdin, stdout, stderr = self.ssh.exec_command(command, timeout=0.75) raw_lines = stdout.readlines() if len(raw_lines) < 2: print('PowerWalker Ethernet: Too few lines in event file') return None events = [] for line in raw_lines[1:]: split_line = line.strip().split(',') timestamp = datetime.datetime.strptime(split_line[0], '%Y/%m/%d %H:%M:%S') if only_new and timestamp <= self.latest_event: continue event = { 'timestamp': timestamp, 'event': split_line[1], 'source': split_line[2], } events.append(event) self.latest_event = timestamp return events
if __name__ == '__main__': pw = PowerWalkerEthernet(ip_address='192.168.2.100') print(pw.device_status()) print() print() print(pw._read_static_data()) print() print() events = pw.read_events() for event in events: print(event)