diff options
author | Volker Hoffmann <volker@cheleb.net> | 2015-08-25 19:06:55 +0200 |
---|---|---|
committer | Volker Hoffmann <volker@cheleb.net> | 2015-08-25 19:06:55 +0200 |
commit | 8b1be28eab48457806a051ad7807c9ef0b9368d7 (patch) | |
tree | e023b58f8c8eb1e1c5326cbe7e3c47ede5edf404 |
initial commit
-rw-r--r-- | .gitignore | 3 | ||||
-rw-r--r-- | Common/__init__.py | 0 | ||||
-rw-r--r-- | Common/influx.py | 34 | ||||
-rw-r--r-- | Common/sensors.py | 37 | ||||
-rw-r--r-- | Common/slurm.py | 158 | ||||
-rw-r--r-- | External/README.md | 3 | ||||
-rw-r--r-- | External/__init__.py | 0 | ||||
-rw-r--r-- | External/lnetatmo.py | 305 | ||||
-rw-r--r-- | README.md | 19 | ||||
-rw-r--r-- | ticker.py | 66 | ||||
-rw-r--r-- | userpass_influx_example | 1 | ||||
-rw-r--r-- | userpass_netatmo_example | 2 |
12 files changed, 628 insertions, 0 deletions
diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..5abbc39 --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +*.pyc +userpass_influx* +userpass_netatmo diff --git a/Common/__init__.py b/Common/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/Common/__init__.py diff --git a/Common/influx.py b/Common/influx.py new file mode 100644 index 0000000..73d4371 --- /dev/null +++ b/Common/influx.py @@ -0,0 +1,34 @@ +""" +InfluxDB Interaction. +""" + +import requests + + +def post_data(data): + """ + Post Data to Server. + + @param: data - String of data to post. + """ + + # Load Userpass CSV File (InfluxDB) + # Format: server,port,db,user,pass + with open('userpass_influx', 'r') as f: + line = f.readline() + line = line.strip().split(',') + host = line[0] + port = line[1] + db = line[2] + user = line[3] + pswd = line[4] + + # Post + url = "http://%s:8086/write?db=%s&precision=s" % (host, db) + auth = requests.auth.HTTPBasicAuth("%s" % user, "%s" % pswd) + r = requests.post("%s" % url, auth=auth, data="%s" % data) + + # Debug + # print r.status_code + # print r.headers + # print r.content diff --git a/Common/sensors.py b/Common/sensors.py new file mode 100644 index 0000000..9f588ff --- /dev/null +++ b/Common/sensors.py @@ -0,0 +1,37 @@ +""" +Get Environmental Sensor Data. +""" + +import External.lnetatmo as lnetatmo + + +def get_netatmo_temperature(station='zBox Room'): + """ + Get Netatmo Temperature Data. + + @param: station - Name of sensor to poll [String] + @return: temperature - Temperature (Celsius) [Float] + @return: epoch - Unix Timestamp (Seconds) [Float] + """ + + # Load Credentials + with open('userpass_netatmo', 'r') as f: + line = f.readline() + line = line.strip().split(',') + clientId = line[0] + clientSecret = line[1] + username = line[2] + password = line[3] + + # Auth to Netatmo API + authorization = lnetatmo.ClientAuth(clientId = clientId, \ + clientSecret = clientSecret, \ + username = username, \ + password = password) + devList = lnetatmo.DeviceList(authorization) + + # Get Data + temperature = devList.lastData()[station]['Temperature'] + epoch = devList.lastData()[station]['When'] + + return temperature, epoch diff --git a/Common/slurm.py b/Common/slurm.py new file mode 100644 index 0000000..004cd63 --- /dev/null +++ b/Common/slurm.py @@ -0,0 +1,158 @@ +""" +Slurm Polling Functions. +""" + +import subprocess as sp + + +def get_number_of_nodes_down(): + """ + Get Down/Drained Nodes for Partitions. + + Slurm Command: + sinfo --format=%o --list-reasons --noheader --partition=zbox + + @return: number_of_nodes_down [Integer] + """ + + number_of_nodes_down = {} + partitions = [ 'zbox', 'serial', 'debug', 'tasna', 'vesta' ] + for partition in partitions: + cmd = [ 'sinfo', '--format=%o', \ + '--list-reasons', '--noheader', \ + "--partition=%s" % partition ] + p = sp.Popen(cmd, stdout=sp.PIPE) + p.wait() + data, _ = p.communicate() + if len(data) == 0: + count = 0 + else: + count = len(data.strip().split('\n')) + number_of_nodes_down["%s" % partition] = count + + # Aggregate CPU Counts + cpu_partitions = [ 'zbox', 'serial', 'debug' ] + running_sum = 0 + for partition in cpu_partitions: + running_sum += number_of_nodes_down["%s" % partition] + number_of_nodes_down['cpu'] = running_sum + + # Aggregate GPU Counts + gpu_partitions = [ 'tasna', 'vesta' ] + running_sum = 0 + for partition in gpu_partitions: + running_sum += number_of_nodes_down["%s" % partition] + number_of_nodes_down['gpu'] = running_sum + + # Fake Data + # number_of_nodes_down = \ + # {'cpu': 15, + # 'debug': 3, + # 'gpu': 1, + # 'serial': 2, + # 'tasna': 1, + # 'vesta': 0, + # 'zbox': 10} + + return number_of_nodes_down + + +def get_cpu_allocations(): + """ + Get CPU Allocations per Partition. Also return sum over all CPU partitions. + + Slurm Command: + squeue --format=%C --partition zbox --noheader + + @return: number_of_allocated_cpus [Dict {'partition': ncpus}] + """ + + # Old, Really Slow Command: + # sacct --format=partition,alloccpus --allocations \ + # --state=RUNNING --allusers --noheader --parsable2 + + partitions = [ 'zbox', 'serial', 'debug' ] + number_of_allocated_cpus = {} + for partition in partitions: + cmd = [ 'squeue', \ + '--format=%C', '--noheader', "--partition=%s" % partition ] + p = sp.Popen(cmd, stdout=sp.PIPE) + p.wait() + data, _ = p.communicate() + running_sum = 0 + if len(data) > 0: + for line in data.strip().split('\n'): + running_sum += int(line) + number_of_allocated_cpus["%s" % partition] = running_sum + number_of_allocated_cpus['cpu'] = sum(number_of_allocated_cpus.values()) + + # Fake Data + # number_of_allocated_cpus = { 'zbox': 768, 'serial': 12, 'debug': 198 } + + # Return + return number_of_allocated_cpus + + +def get_number_of_jobs_by_partition_and_state(): + """ + Get Number of Jobs by State and Partition. + + Slurm Command: + squeue --noheader --format=%T:%P --partition=zbox --state=pending + + @return number_of_jobs_by_partition - [Dict {'zbox': {'running': 1}} + """ + + partitions = [ 'zbox', 'serial', 'debug', 'tasna', 'vesta' ] + states = [ 'pending', 'running' ] + + # Get Raw Data + number_of_jobs_by_partition = {} + for partition in partitions: + number_of_jobs_by_state = {} + for state in states: + cmd = [ 'squeue', '--noheader', '--format=%T:%P', \ + "--partition=%s" % partition, \ + "--state=%s" % state ] + p = sp.Popen(cmd, stdout=sp.PIPE) + p.wait() + data, _ = p.communicate() + if len(data) == 0: + count = 0 + else: + count = len(data.strip().split('\n')) + number_of_jobs_by_state["%s" % state] = count + number_of_jobs_by_partition["%s" % partition] = \ + number_of_jobs_by_state + + # Aggregate CPU Counts + cpu_partitions = [ 'zbox', 'serial', 'debug' ] + number_of_jobs_by_state = {} + for state in states: + total = 0 + for partition in cpu_partitions: + total += number_of_jobs_by_partition[partition][state] + number_of_jobs_by_state[state] = total + number_of_jobs_by_partition['cpu'] = number_of_jobs_by_state + + # Aggregate GPU Counts + gpu_partitions = [ 'vesta', 'tasna' ] + number_of_jobs_by_state = {} + for state in states: + total = 0 + for partition in gpu_partitions: + total += number_of_jobs_by_partition[partition][state] + number_of_jobs_by_state[state] = total + number_of_jobs_by_partition['gpu'] = number_of_jobs_by_state + + # Fake Data + # number_of_jobs_by_partition = \ + # {'cpu': {'pending': 0, 'running': 465}, + # 'debug': {'pending': 0, 'running': 0}, + # 'gpu': {'pending': 0, 'running': 62}, + # 'serial': {'pending': 0, 'running': 457}, + # 'tasna': {'pending': 0, 'running': 31}, + # 'vesta': {'pending': 0, 'running': 31}, + # 'zbox': {'pending': 0, 'running': 8}} + + return number_of_jobs_by_partition diff --git a/External/README.md b/External/README.md new file mode 100644 index 0000000..2710327 --- /dev/null +++ b/External/README.md @@ -0,0 +1,3 @@ +# External Packages + +- lnetatmo.py -- Netatmo API (https://github.com/philippelt/netatmo-api-python) diff --git a/External/__init__.py b/External/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/External/__init__.py diff --git a/External/lnetatmo.py b/External/lnetatmo.py new file mode 100644 index 0000000..155669c --- /dev/null +++ b/External/lnetatmo.py @@ -0,0 +1,305 @@ +# Published Jan 2013 +# Revised Jan 2014 (to add new modules data) +# Author : Philippe Larduinat, philippelt@users.sourceforge.net +# Public domain source code + +# This API provides access to the Netatmo (Internet weather station) devices +# This package can be used with Python2 or Python3 applications and do not +# require anything else than standard libraries + +# PythonAPI Netatmo REST data access +# coding=utf-8 + +from sys import version_info +import json, time + +# HTTP libraries depends upon Python 2 or 3 +if version_info.major == 3 : + import urllib.parse, urllib.request +else: + from urllib import urlencode + import urllib2 + +######################## USER SPECIFIC INFORMATION ###################### + +# To be able to have a program accessing your netatmo data, you have to register your program as +# a Netatmo app in your Netatmo account. All you have to do is to give it a name (whatever) and you will be +# returned a client_id and secret that your app has to supply to access netatmo servers. + +_CLIENT_ID = "" # Your client ID from Netatmo app registration at http://dev.netatmo.com/dev/listapps +_CLIENT_SECRET = "" # Your client app secret ' ' +_USERNAME = "" # Your netatmo account username +_PASSWORD = "" # Your netatmo account password + +######################################################################### + + +# Common definitions + +_BASE_URL = "https://api.netatmo.net/" +_AUTH_REQ = _BASE_URL + "oauth2/token" +_GETUSER_REQ = _BASE_URL + "api/getuser" +_DEVICELIST_REQ = _BASE_URL + "api/devicelist" +_GETMEASURE_REQ = _BASE_URL + "api/getmeasure" + + +class ClientAuth: + "Request authentication and keep access token available through token method. Renew it automatically if necessary" + + def __init__(self, clientId=_CLIENT_ID, + clientSecret=_CLIENT_SECRET, + username=_USERNAME, + password=_PASSWORD): + + postParams = { + "grant_type" : "password", + "client_id" : clientId, + "client_secret" : clientSecret, + "username" : username, + "password" : password, + "scope" : "read_station" + } + resp = postRequest(_AUTH_REQ, postParams) + + self._clientId = clientId + self._clientSecret = clientSecret + self._accessToken = resp['access_token'] + self.refreshToken = resp['refresh_token'] + self._scope = resp['scope'] + self.expiration = int(resp['expire_in'] + time.time()) + + @property + def accessToken(self): + + if self.expiration < time.time(): # Token should be renewed + + postParams = { + "grant_type" : "refresh_token", + "refresh_token" : self.refreshToken, + "client_id" : self._clientId, + "client_secret" : self._clientSecret + } + resp = postRequest(_AUTH_REQ, postParams) + + self._accessToken = resp['access_token'] + self.refreshToken = resp['refresh_token'] + self.expiration = int(resp['expire_in'] + time.time()) + + return self._accessToken + +class User: + + def __init__(self, authData): + + postParams = { + "access_token" : authData.accessToken + } + resp = postRequest(_GETUSER_REQ, postParams) + self.rawData = resp['body'] + self.id = self.rawData['_id'] + self.devList = self.rawData['devices'] + self.ownerMail = self.rawData['mail'] + +class DeviceList: + + def __init__(self, authData): + + self.getAuthToken = authData.accessToken + postParams = { + "access_token" : self.getAuthToken, + "app_type" : "app_station" + } + resp = postRequest(_DEVICELIST_REQ, postParams) + self.rawData = resp['body'] + self.stations = { d['_id'] : d for d in self.rawData['devices'] } + self.modules = { m['_id'] : m for m in self.rawData['modules'] } + self.default_station = list(self.stations.values())[0]['station_name'] + + def modulesNamesList(self, station=None): + res = [m['module_name'] for m in self.modules.values()] + res.append(self.stationByName(station)['module_name']) + return res + + def stationByName(self, station=None): + if not station : station = self.default_station + for i,s in self.stations.items(): + if s['station_name'] == station : return self.stations[i] + return None + + def stationById(self, sid): + return None if sid not in self.stations else self.stations[sid] + + def moduleByName(self, module, station=None): + s = None + if station : + s = self.stationByName(station) + if not s : return None + for m in self.modules: + mod = self.modules[m] + if mod['module_name'] == module : + if not s or mod['main_device'] == s['_id'] : return mod + return None + + def moduleById(self, mid, sid=None): + s = self.stationById(sid) if sid else None + if mid in self.modules : + return self.modules[mid] if not s or self.modules[mid]['main_device'] == s['_id'] else None + + def lastData(self, station=None, exclude=0): + s = self.stationByName(station) + if not s : return None + lastD = dict() + # Define oldest acceptable sensor measure event + limit = (time.time() - exclude) if exclude else 0 + ds = s['dashboard_data'] + if ds['time_utc'] > limit : + lastD[s['module_name']] = ds.copy() + lastD[s['module_name']]['When'] = lastD[s['module_name']].pop("time_utc") + lastD[s['module_name']]['wifi_status'] = s['wifi_status'] + for mId in s["modules"]: + ds = self.modules[mId]['dashboard_data'] + if ds['time_utc'] > limit : + mod = self.modules[mId] + lastD[mod['module_name']] = ds.copy() + lastD[mod['module_name']]['When'] = lastD[mod['module_name']].pop("time_utc") + # For potential use, add battery and radio coverage information to module data if present + for i in ('battery_vp', 'rf_status') : + if i in mod : lastD[mod['module_name']][i] = mod[i] + return lastD + + def checkNotUpdated(self, station=None, delay=3600): + res = self.lastData(station) + ret = [] + for mn,v in res.items(): + if time.time()-v['When'] > delay : ret.append(mn) + return ret if ret else None + + def checkUpdated(self, station=None, delay=3600): + res = self.lastData(station) + ret = [] + for mn,v in res.items(): + if time.time()-v['When'] < delay : ret.append(mn) + return ret if ret else None + + def getMeasure(self, device_id, scale, mtype, module_id=None, date_begin=None, date_end=None, limit=None, optimize=False, real_time=False): + postParams = { "access_token" : self.getAuthToken } + postParams['device_id'] = device_id + if module_id : postParams['module_id'] = module_id + postParams['scale'] = scale + postParams['type'] = mtype + if date_begin : postParams['date_begin'] = date_begin + if date_end : postParams['date_end'] = date_end + if limit : postParams['limit'] = limit + postParams['optimize'] = "true" if optimize else "false" + postParams['real_time'] = "true" if real_time else "false" + return postRequest(_GETMEASURE_REQ, postParams) + + def MinMaxTH(self, station=None, module=None, frame="last24"): + if not station : station = self.default_station + s = self.stationByName(station) + if not s : + s = self.stationById(station) + if not s : return None + if frame == "last24": + end = time.time() + start = end - 24*3600 # 24 hours ago + elif frame == "day": + start, end = todayStamps() + if module and module != s['module_name']: + m = self.moduleByName(module, s['station_name']) + if not m : + m = self.moduleById(s['_id'], module) + if not m : return None + # retrieve module's data + resp = self.getMeasure( + device_id = s['_id'], + module_id = m['_id'], + scale = "max", + mtype = "Temperature,Humidity", + date_begin = start, + date_end = end) + else : # retrieve station's data + resp = self.getMeasure( + device_id = s['_id'], + scale = "max", + mtype = "Temperature,Humidity", + date_begin = start, + date_end = end) + if resp: + T = [v[0] for v in resp['body'].values()] + H = [v[1] for v in resp['body'].values()] + return min(T), max(T), min(H), max(H) + else: + return None + +# Utilities routines + +def postRequest(url, params): + if version_info.major == 3: + req = urllib.request.Request(url) + req.add_header("Content-Type","application/x-www-form-urlencoded;charset=utf-8") + params = urllib.parse.urlencode(params).encode('utf-8') + resp = urllib.request.urlopen(req, params).readall().decode("utf-8") + else: + params = urlencode(params) + headers = {"Content-Type" : "application/x-www-form-urlencoded;charset=utf-8"} + req = urllib2.Request(url=url, data=params, headers=headers) + resp = urllib2.urlopen(req).read() + return json.loads(resp) + +def toTimeString(value): + return time.strftime("%Y-%m-%d_%H:%M:%S", time.localtime(int(value))) + +def toEpoch(value): + return int(time.mktime(time.strptime(value,"%Y-%m-%d_%H:%M:%S"))) + +def todayStamps(): + today = time.strftime("%Y-%m-%d") + today = int(time.mktime(time.strptime(today,"%Y-%m-%d"))) + return today, today+3600*24 + +# Global shortcut + +def getStationMinMaxTH(station=None, module=None): + authorization = ClientAuth() + devList = DeviceList(authorization) + if not station : station = devList.default_station + if module : + mname = module + else : + mname = devList.stationByName(station)['module_name'] + lastD = devList.lastData(station) + if mname == "*": + result = dict() + for m in lastD.keys(): + if time.time()-lastD[m]['When'] > 3600 : continue + r = devList.MinMaxTH(module=m) + result[m] = (r[0], lastD[m]['Temperature'], r[1]) + else: + if time.time()-lastD[mname]['When'] > 3600 : result = ["-", "-"] + else : result = [lastD[mname]['Temperature'], lastD[mname]['Humidity']] + result.extend(devList.MinMaxTH(station, mname)) + return result + +# auto-test when executed directly + +if __name__ == "__main__": + + from sys import exit, stdout, stderr + + if not _CLIENT_ID or not _CLIENT_SECRET or not _USERNAME or not _PASSWORD : + stderr.write("Library source missing identification arguments to check lnetatmo.py (user/password/etc...)") + exit(1) + + authorization = ClientAuth() # Test authentication method + user = User(authorization) # Test GETUSER + devList = DeviceList(authorization) # Test DEVICELIST + devList.MinMaxTH() # Test GETMEASURE + + # If we reach this line, all is OK + + # If launched interactively, display OK message + if stdout.isatty(): + print("lnetatmo.py : OK") + + exit(0) diff --git a/README.md b/README.md new file mode 100644 index 0000000..13202fc --- /dev/null +++ b/README.md @@ -0,0 +1,19 @@ +# Dashboard Data Collection Tools + +## Overview + +This is a collection of Python scripts to poll cluster scheduling data from the [Slurm Workload Manager](http://slurm.schedmd.com/) and a [Netatmo](http://netatmo.com) device, and then upload it to on instance of [InfluxDB](https://influxdb.com/). + +For example, the [zBox4 Dashboard](https://labs.cheleb.net/grafana/dashboard/db/zbox) uses these scripts. + +## Requirements + +- Execute the scripts on a host that can access Slurm data. +- Put your InfluxDB access data in the *userpass_influx* file. +- Put your Netatmo access data in the *userpass_netatmo* file. + +If you do not want to use the Netatmo part, simply comment out the code. + +## Contact + +Questions, comments, rants should be sent to [volker@cheleb.net](mailto:volker@cheleb.net). diff --git a/ticker.py b/ticker.py new file mode 100644 index 0000000..3b52b08 --- /dev/null +++ b/ticker.py @@ -0,0 +1,66 @@ +""" +Read, Parse, Post Stats to InfluxDB. +""" + +import Common.slurm as slurm +import Common.sensors as sensors +import Common.influx as influx +import time + + +# ############################################################################# +# Load Data from Slurm +# ############################################################################# + +# Number of Nodes Down +number_of_nodes_down = slurm.get_number_of_nodes_down() + +# CPU Allocations +cpu_allocations = slurm.get_cpu_allocations() + +# Jobs per Partition per State +njobs_by_partition_and_state = \ + slurm.get_number_of_jobs_by_partition_and_state() + +# ############################################################################# +# Load Data from Netatmo +# ############################################################################# +temperature, netatmo_epoch = sensors.get_netatmo_temperature() + +# ############################################################################# +# Build Data Post +# ############################################################################# +epoch = int(time.time()) +lines = [] + +# CPU Allocations +for partition, ncpus_allocated in cpu_allocations.iteritems(): + line = "ncpus_allocated,partition=%s value=%i %i" % \ + (partition, ncpus_allocated, epoch) + lines.append(line) + +# Number of Nodes Down +for partition, nnodes_down in number_of_nodes_down.iteritems(): + line = "nodes,state=down,partition=%s value=%i %i" % \ + (partition, nnodes_down, epoch) + lines.append(line) + +# Jobs per Partition per State +for partition, njobs_by_state in njobs_by_partition_and_state.iteritems(): + for state, njobs in njobs_by_state.iteritems(): + line = "jobs,state=%s,partition=%s value=%i %i" % \ + (state, partition, njobs, epoch) + lines.append(line) + +# Netatmo +line = "room_temperature,room=zbox_room value=%.2f %i" % \ + (temperature, netatmo_epoch) +lines.append(line) + +# Join +data = "\n".join(lines) + +# ############################################################################# +# Post Data +# ############################################################################# +influx.post_data(data) diff --git a/userpass_influx_example b/userpass_influx_example new file mode 100644 index 0000000..a530d0c --- /dev/null +++ b/userpass_influx_example @@ -0,0 +1 @@ +host,port,database,username,password diff --git a/userpass_netatmo_example b/userpass_netatmo_example new file mode 100644 index 0000000..f3d5438 --- /dev/null +++ b/userpass_netatmo_example @@ -0,0 +1,2 @@ +clientid,clientsecret,username,password + |