aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVolker Hoffmann <volker@cheleb.net>2015-08-25 19:06:55 +0200
committerVolker Hoffmann <volker@cheleb.net>2015-08-25 19:06:55 +0200
commit8b1be28eab48457806a051ad7807c9ef0b9368d7 (patch)
treee023b58f8c8eb1e1c5326cbe7e3c47ede5edf404
initial commit
-rw-r--r--.gitignore3
-rw-r--r--Common/__init__.py0
-rw-r--r--Common/influx.py34
-rw-r--r--Common/sensors.py37
-rw-r--r--Common/slurm.py158
-rw-r--r--External/README.md3
-rw-r--r--External/__init__.py0
-rw-r--r--External/lnetatmo.py305
-rw-r--r--README.md19
-rw-r--r--ticker.py66
-rw-r--r--userpass_influx_example1
-rw-r--r--userpass_netatmo_example2
12 files changed, 628 insertions, 0 deletions
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..5abbc39
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,3 @@
+*.pyc
+userpass_influx*
+userpass_netatmo
diff --git a/Common/__init__.py b/Common/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/Common/__init__.py
diff --git a/Common/influx.py b/Common/influx.py
new file mode 100644
index 0000000..73d4371
--- /dev/null
+++ b/Common/influx.py
@@ -0,0 +1,34 @@
+"""
+InfluxDB Interaction.
+"""
+
+import requests
+
+
+def post_data(data):
+ """
+ Post Data to Server.
+
+ @param: data - String of data to post.
+ """
+
+ # Load Userpass CSV File (InfluxDB)
+ # Format: server,port,db,user,pass
+ with open('userpass_influx', 'r') as f:
+ line = f.readline()
+ line = line.strip().split(',')
+ host = line[0]
+ port = line[1]
+ db = line[2]
+ user = line[3]
+ pswd = line[4]
+
+ # Post
+ url = "http://%s:8086/write?db=%s&precision=s" % (host, db)
+ auth = requests.auth.HTTPBasicAuth("%s" % user, "%s" % pswd)
+ r = requests.post("%s" % url, auth=auth, data="%s" % data)
+
+ # Debug
+ # print r.status_code
+ # print r.headers
+ # print r.content
diff --git a/Common/sensors.py b/Common/sensors.py
new file mode 100644
index 0000000..9f588ff
--- /dev/null
+++ b/Common/sensors.py
@@ -0,0 +1,37 @@
+"""
+Get Environmental Sensor Data.
+"""
+
+import External.lnetatmo as lnetatmo
+
+
+def get_netatmo_temperature(station='zBox Room'):
+ """
+ Get Netatmo Temperature Data.
+
+ @param: station - Name of sensor to poll [String]
+ @return: temperature - Temperature (Celsius) [Float]
+ @return: epoch - Unix Timestamp (Seconds) [Float]
+ """
+
+ # Load Credentials
+ with open('userpass_netatmo', 'r') as f:
+ line = f.readline()
+ line = line.strip().split(',')
+ clientId = line[0]
+ clientSecret = line[1]
+ username = line[2]
+ password = line[3]
+
+ # Auth to Netatmo API
+ authorization = lnetatmo.ClientAuth(clientId = clientId, \
+ clientSecret = clientSecret, \
+ username = username, \
+ password = password)
+ devList = lnetatmo.DeviceList(authorization)
+
+ # Get Data
+ temperature = devList.lastData()[station]['Temperature']
+ epoch = devList.lastData()[station]['When']
+
+ return temperature, epoch
diff --git a/Common/slurm.py b/Common/slurm.py
new file mode 100644
index 0000000..004cd63
--- /dev/null
+++ b/Common/slurm.py
@@ -0,0 +1,158 @@
+"""
+Slurm Polling Functions.
+"""
+
+import subprocess as sp
+
+
+def get_number_of_nodes_down():
+ """
+ Get Down/Drained Nodes for Partitions.
+
+ Slurm Command:
+ sinfo --format=%o --list-reasons --noheader --partition=zbox
+
+ @return: number_of_nodes_down [Integer]
+ """
+
+ number_of_nodes_down = {}
+ partitions = [ 'zbox', 'serial', 'debug', 'tasna', 'vesta' ]
+ for partition in partitions:
+ cmd = [ 'sinfo', '--format=%o', \
+ '--list-reasons', '--noheader', \
+ "--partition=%s" % partition ]
+ p = sp.Popen(cmd, stdout=sp.PIPE)
+ p.wait()
+ data, _ = p.communicate()
+ if len(data) == 0:
+ count = 0
+ else:
+ count = len(data.strip().split('\n'))
+ number_of_nodes_down["%s" % partition] = count
+
+ # Aggregate CPU Counts
+ cpu_partitions = [ 'zbox', 'serial', 'debug' ]
+ running_sum = 0
+ for partition in cpu_partitions:
+ running_sum += number_of_nodes_down["%s" % partition]
+ number_of_nodes_down['cpu'] = running_sum
+
+ # Aggregate GPU Counts
+ gpu_partitions = [ 'tasna', 'vesta' ]
+ running_sum = 0
+ for partition in gpu_partitions:
+ running_sum += number_of_nodes_down["%s" % partition]
+ number_of_nodes_down['gpu'] = running_sum
+
+ # Fake Data
+ # number_of_nodes_down = \
+ # {'cpu': 15,
+ # 'debug': 3,
+ # 'gpu': 1,
+ # 'serial': 2,
+ # 'tasna': 1,
+ # 'vesta': 0,
+ # 'zbox': 10}
+
+ return number_of_nodes_down
+
+
+def get_cpu_allocations():
+ """
+ Get CPU Allocations per Partition. Also return sum over all CPU partitions.
+
+ Slurm Command:
+ squeue --format=%C --partition zbox --noheader
+
+ @return: number_of_allocated_cpus [Dict {'partition': ncpus}]
+ """
+
+ # Old, Really Slow Command:
+ # sacct --format=partition,alloccpus --allocations \
+ # --state=RUNNING --allusers --noheader --parsable2
+
+ partitions = [ 'zbox', 'serial', 'debug' ]
+ number_of_allocated_cpus = {}
+ for partition in partitions:
+ cmd = [ 'squeue', \
+ '--format=%C', '--noheader', "--partition=%s" % partition ]
+ p = sp.Popen(cmd, stdout=sp.PIPE)
+ p.wait()
+ data, _ = p.communicate()
+ running_sum = 0
+ if len(data) > 0:
+ for line in data.strip().split('\n'):
+ running_sum += int(line)
+ number_of_allocated_cpus["%s" % partition] = running_sum
+ number_of_allocated_cpus['cpu'] = sum(number_of_allocated_cpus.values())
+
+ # Fake Data
+ # number_of_allocated_cpus = { 'zbox': 768, 'serial': 12, 'debug': 198 }
+
+ # Return
+ return number_of_allocated_cpus
+
+
+def get_number_of_jobs_by_partition_and_state():
+ """
+ Get Number of Jobs by State and Partition.
+
+ Slurm Command:
+ squeue --noheader --format=%T:%P --partition=zbox --state=pending
+
+ @return number_of_jobs_by_partition - [Dict {'zbox': {'running': 1}}
+ """
+
+ partitions = [ 'zbox', 'serial', 'debug', 'tasna', 'vesta' ]
+ states = [ 'pending', 'running' ]
+
+ # Get Raw Data
+ number_of_jobs_by_partition = {}
+ for partition in partitions:
+ number_of_jobs_by_state = {}
+ for state in states:
+ cmd = [ 'squeue', '--noheader', '--format=%T:%P', \
+ "--partition=%s" % partition, \
+ "--state=%s" % state ]
+ p = sp.Popen(cmd, stdout=sp.PIPE)
+ p.wait()
+ data, _ = p.communicate()
+ if len(data) == 0:
+ count = 0
+ else:
+ count = len(data.strip().split('\n'))
+ number_of_jobs_by_state["%s" % state] = count
+ number_of_jobs_by_partition["%s" % partition] = \
+ number_of_jobs_by_state
+
+ # Aggregate CPU Counts
+ cpu_partitions = [ 'zbox', 'serial', 'debug' ]
+ number_of_jobs_by_state = {}
+ for state in states:
+ total = 0
+ for partition in cpu_partitions:
+ total += number_of_jobs_by_partition[partition][state]
+ number_of_jobs_by_state[state] = total
+ number_of_jobs_by_partition['cpu'] = number_of_jobs_by_state
+
+ # Aggregate GPU Counts
+ gpu_partitions = [ 'vesta', 'tasna' ]
+ number_of_jobs_by_state = {}
+ for state in states:
+ total = 0
+ for partition in gpu_partitions:
+ total += number_of_jobs_by_partition[partition][state]
+ number_of_jobs_by_state[state] = total
+ number_of_jobs_by_partition['gpu'] = number_of_jobs_by_state
+
+ # Fake Data
+ # number_of_jobs_by_partition = \
+ # {'cpu': {'pending': 0, 'running': 465},
+ # 'debug': {'pending': 0, 'running': 0},
+ # 'gpu': {'pending': 0, 'running': 62},
+ # 'serial': {'pending': 0, 'running': 457},
+ # 'tasna': {'pending': 0, 'running': 31},
+ # 'vesta': {'pending': 0, 'running': 31},
+ # 'zbox': {'pending': 0, 'running': 8}}
+
+ return number_of_jobs_by_partition
diff --git a/External/README.md b/External/README.md
new file mode 100644
index 0000000..2710327
--- /dev/null
+++ b/External/README.md
@@ -0,0 +1,3 @@
+# External Packages
+
+- lnetatmo.py -- Netatmo API (https://github.com/philippelt/netatmo-api-python)
diff --git a/External/__init__.py b/External/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/External/__init__.py
diff --git a/External/lnetatmo.py b/External/lnetatmo.py
new file mode 100644
index 0000000..155669c
--- /dev/null
+++ b/External/lnetatmo.py
@@ -0,0 +1,305 @@
+# Published Jan 2013
+# Revised Jan 2014 (to add new modules data)
+# Author : Philippe Larduinat, philippelt@users.sourceforge.net
+# Public domain source code
+
+# This API provides access to the Netatmo (Internet weather station) devices
+# This package can be used with Python2 or Python3 applications and do not
+# require anything else than standard libraries
+
+# PythonAPI Netatmo REST data access
+# coding=utf-8
+
+from sys import version_info
+import json, time
+
+# HTTP libraries depends upon Python 2 or 3
+if version_info.major == 3 :
+ import urllib.parse, urllib.request
+else:
+ from urllib import urlencode
+ import urllib2
+
+######################## USER SPECIFIC INFORMATION ######################
+
+# To be able to have a program accessing your netatmo data, you have to register your program as
+# a Netatmo app in your Netatmo account. All you have to do is to give it a name (whatever) and you will be
+# returned a client_id and secret that your app has to supply to access netatmo servers.
+
+_CLIENT_ID = "" # Your client ID from Netatmo app registration at http://dev.netatmo.com/dev/listapps
+_CLIENT_SECRET = "" # Your client app secret ' '
+_USERNAME = "" # Your netatmo account username
+_PASSWORD = "" # Your netatmo account password
+
+#########################################################################
+
+
+# Common definitions
+
+_BASE_URL = "https://api.netatmo.net/"
+_AUTH_REQ = _BASE_URL + "oauth2/token"
+_GETUSER_REQ = _BASE_URL + "api/getuser"
+_DEVICELIST_REQ = _BASE_URL + "api/devicelist"
+_GETMEASURE_REQ = _BASE_URL + "api/getmeasure"
+
+
+class ClientAuth:
+ "Request authentication and keep access token available through token method. Renew it automatically if necessary"
+
+ def __init__(self, clientId=_CLIENT_ID,
+ clientSecret=_CLIENT_SECRET,
+ username=_USERNAME,
+ password=_PASSWORD):
+
+ postParams = {
+ "grant_type" : "password",
+ "client_id" : clientId,
+ "client_secret" : clientSecret,
+ "username" : username,
+ "password" : password,
+ "scope" : "read_station"
+ }
+ resp = postRequest(_AUTH_REQ, postParams)
+
+ self._clientId = clientId
+ self._clientSecret = clientSecret
+ self._accessToken = resp['access_token']
+ self.refreshToken = resp['refresh_token']
+ self._scope = resp['scope']
+ self.expiration = int(resp['expire_in'] + time.time())
+
+ @property
+ def accessToken(self):
+
+ if self.expiration < time.time(): # Token should be renewed
+
+ postParams = {
+ "grant_type" : "refresh_token",
+ "refresh_token" : self.refreshToken,
+ "client_id" : self._clientId,
+ "client_secret" : self._clientSecret
+ }
+ resp = postRequest(_AUTH_REQ, postParams)
+
+ self._accessToken = resp['access_token']
+ self.refreshToken = resp['refresh_token']
+ self.expiration = int(resp['expire_in'] + time.time())
+
+ return self._accessToken
+
+class User:
+
+ def __init__(self, authData):
+
+ postParams = {
+ "access_token" : authData.accessToken
+ }
+ resp = postRequest(_GETUSER_REQ, postParams)
+ self.rawData = resp['body']
+ self.id = self.rawData['_id']
+ self.devList = self.rawData['devices']
+ self.ownerMail = self.rawData['mail']
+
+class DeviceList:
+
+ def __init__(self, authData):
+
+ self.getAuthToken = authData.accessToken
+ postParams = {
+ "access_token" : self.getAuthToken,
+ "app_type" : "app_station"
+ }
+ resp = postRequest(_DEVICELIST_REQ, postParams)
+ self.rawData = resp['body']
+ self.stations = { d['_id'] : d for d in self.rawData['devices'] }
+ self.modules = { m['_id'] : m for m in self.rawData['modules'] }
+ self.default_station = list(self.stations.values())[0]['station_name']
+
+ def modulesNamesList(self, station=None):
+ res = [m['module_name'] for m in self.modules.values()]
+ res.append(self.stationByName(station)['module_name'])
+ return res
+
+ def stationByName(self, station=None):
+ if not station : station = self.default_station
+ for i,s in self.stations.items():
+ if s['station_name'] == station : return self.stations[i]
+ return None
+
+ def stationById(self, sid):
+ return None if sid not in self.stations else self.stations[sid]
+
+ def moduleByName(self, module, station=None):
+ s = None
+ if station :
+ s = self.stationByName(station)
+ if not s : return None
+ for m in self.modules:
+ mod = self.modules[m]
+ if mod['module_name'] == module :
+ if not s or mod['main_device'] == s['_id'] : return mod
+ return None
+
+ def moduleById(self, mid, sid=None):
+ s = self.stationById(sid) if sid else None
+ if mid in self.modules :
+ return self.modules[mid] if not s or self.modules[mid]['main_device'] == s['_id'] else None
+
+ def lastData(self, station=None, exclude=0):
+ s = self.stationByName(station)
+ if not s : return None
+ lastD = dict()
+ # Define oldest acceptable sensor measure event
+ limit = (time.time() - exclude) if exclude else 0
+ ds = s['dashboard_data']
+ if ds['time_utc'] > limit :
+ lastD[s['module_name']] = ds.copy()
+ lastD[s['module_name']]['When'] = lastD[s['module_name']].pop("time_utc")
+ lastD[s['module_name']]['wifi_status'] = s['wifi_status']
+ for mId in s["modules"]:
+ ds = self.modules[mId]['dashboard_data']
+ if ds['time_utc'] > limit :
+ mod = self.modules[mId]
+ lastD[mod['module_name']] = ds.copy()
+ lastD[mod['module_name']]['When'] = lastD[mod['module_name']].pop("time_utc")
+ # For potential use, add battery and radio coverage information to module data if present
+ for i in ('battery_vp', 'rf_status') :
+ if i in mod : lastD[mod['module_name']][i] = mod[i]
+ return lastD
+
+ def checkNotUpdated(self, station=None, delay=3600):
+ res = self.lastData(station)
+ ret = []
+ for mn,v in res.items():
+ if time.time()-v['When'] > delay : ret.append(mn)
+ return ret if ret else None
+
+ def checkUpdated(self, station=None, delay=3600):
+ res = self.lastData(station)
+ ret = []
+ for mn,v in res.items():
+ if time.time()-v['When'] < delay : ret.append(mn)
+ return ret if ret else None
+
+ def getMeasure(self, device_id, scale, mtype, module_id=None, date_begin=None, date_end=None, limit=None, optimize=False, real_time=False):
+ postParams = { "access_token" : self.getAuthToken }
+ postParams['device_id'] = device_id
+ if module_id : postParams['module_id'] = module_id
+ postParams['scale'] = scale
+ postParams['type'] = mtype
+ if date_begin : postParams['date_begin'] = date_begin
+ if date_end : postParams['date_end'] = date_end
+ if limit : postParams['limit'] = limit
+ postParams['optimize'] = "true" if optimize else "false"
+ postParams['real_time'] = "true" if real_time else "false"
+ return postRequest(_GETMEASURE_REQ, postParams)
+
+ def MinMaxTH(self, station=None, module=None, frame="last24"):
+ if not station : station = self.default_station
+ s = self.stationByName(station)
+ if not s :
+ s = self.stationById(station)
+ if not s : return None
+ if frame == "last24":
+ end = time.time()
+ start = end - 24*3600 # 24 hours ago
+ elif frame == "day":
+ start, end = todayStamps()
+ if module and module != s['module_name']:
+ m = self.moduleByName(module, s['station_name'])
+ if not m :
+ m = self.moduleById(s['_id'], module)
+ if not m : return None
+ # retrieve module's data
+ resp = self.getMeasure(
+ device_id = s['_id'],
+ module_id = m['_id'],
+ scale = "max",
+ mtype = "Temperature,Humidity",
+ date_begin = start,
+ date_end = end)
+ else : # retrieve station's data
+ resp = self.getMeasure(
+ device_id = s['_id'],
+ scale = "max",
+ mtype = "Temperature,Humidity",
+ date_begin = start,
+ date_end = end)
+ if resp:
+ T = [v[0] for v in resp['body'].values()]
+ H = [v[1] for v in resp['body'].values()]
+ return min(T), max(T), min(H), max(H)
+ else:
+ return None
+
+# Utilities routines
+
+def postRequest(url, params):
+ if version_info.major == 3:
+ req = urllib.request.Request(url)
+ req.add_header("Content-Type","application/x-www-form-urlencoded;charset=utf-8")
+ params = urllib.parse.urlencode(params).encode('utf-8')
+ resp = urllib.request.urlopen(req, params).readall().decode("utf-8")
+ else:
+ params = urlencode(params)
+ headers = {"Content-Type" : "application/x-www-form-urlencoded;charset=utf-8"}
+ req = urllib2.Request(url=url, data=params, headers=headers)
+ resp = urllib2.urlopen(req).read()
+ return json.loads(resp)
+
+def toTimeString(value):
+ return time.strftime("%Y-%m-%d_%H:%M:%S", time.localtime(int(value)))
+
+def toEpoch(value):
+ return int(time.mktime(time.strptime(value,"%Y-%m-%d_%H:%M:%S")))
+
+def todayStamps():
+ today = time.strftime("%Y-%m-%d")
+ today = int(time.mktime(time.strptime(today,"%Y-%m-%d")))
+ return today, today+3600*24
+
+# Global shortcut
+
+def getStationMinMaxTH(station=None, module=None):
+ authorization = ClientAuth()
+ devList = DeviceList(authorization)
+ if not station : station = devList.default_station
+ if module :
+ mname = module
+ else :
+ mname = devList.stationByName(station)['module_name']
+ lastD = devList.lastData(station)
+ if mname == "*":
+ result = dict()
+ for m in lastD.keys():
+ if time.time()-lastD[m]['When'] > 3600 : continue
+ r = devList.MinMaxTH(module=m)
+ result[m] = (r[0], lastD[m]['Temperature'], r[1])
+ else:
+ if time.time()-lastD[mname]['When'] > 3600 : result = ["-", "-"]
+ else : result = [lastD[mname]['Temperature'], lastD[mname]['Humidity']]
+ result.extend(devList.MinMaxTH(station, mname))
+ return result
+
+# auto-test when executed directly
+
+if __name__ == "__main__":
+
+ from sys import exit, stdout, stderr
+
+ if not _CLIENT_ID or not _CLIENT_SECRET or not _USERNAME or not _PASSWORD :
+ stderr.write("Library source missing identification arguments to check lnetatmo.py (user/password/etc...)")
+ exit(1)
+
+ authorization = ClientAuth() # Test authentication method
+ user = User(authorization) # Test GETUSER
+ devList = DeviceList(authorization) # Test DEVICELIST
+ devList.MinMaxTH() # Test GETMEASURE
+
+ # If we reach this line, all is OK
+
+ # If launched interactively, display OK message
+ if stdout.isatty():
+ print("lnetatmo.py : OK")
+
+ exit(0)
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..13202fc
--- /dev/null
+++ b/README.md
@@ -0,0 +1,19 @@
+# Dashboard Data Collection Tools
+
+## Overview
+
+This is a collection of Python scripts to poll cluster scheduling data from the [Slurm Workload Manager](http://slurm.schedmd.com/) and a [Netatmo](http://netatmo.com) device, and then upload it to on instance of [InfluxDB](https://influxdb.com/).
+
+For example, the [zBox4 Dashboard](https://labs.cheleb.net/grafana/dashboard/db/zbox) uses these scripts.
+
+## Requirements
+
+- Execute the scripts on a host that can access Slurm data.
+- Put your InfluxDB access data in the *userpass_influx* file.
+- Put your Netatmo access data in the *userpass_netatmo* file.
+
+If you do not want to use the Netatmo part, simply comment out the code.
+
+## Contact
+
+Questions, comments, rants should be sent to [volker@cheleb.net](mailto:volker@cheleb.net).
diff --git a/ticker.py b/ticker.py
new file mode 100644
index 0000000..3b52b08
--- /dev/null
+++ b/ticker.py
@@ -0,0 +1,66 @@
+"""
+Read, Parse, Post Stats to InfluxDB.
+"""
+
+import Common.slurm as slurm
+import Common.sensors as sensors
+import Common.influx as influx
+import time
+
+
+# #############################################################################
+# Load Data from Slurm
+# #############################################################################
+
+# Number of Nodes Down
+number_of_nodes_down = slurm.get_number_of_nodes_down()
+
+# CPU Allocations
+cpu_allocations = slurm.get_cpu_allocations()
+
+# Jobs per Partition per State
+njobs_by_partition_and_state = \
+ slurm.get_number_of_jobs_by_partition_and_state()
+
+# #############################################################################
+# Load Data from Netatmo
+# #############################################################################
+temperature, netatmo_epoch = sensors.get_netatmo_temperature()
+
+# #############################################################################
+# Build Data Post
+# #############################################################################
+epoch = int(time.time())
+lines = []
+
+# CPU Allocations
+for partition, ncpus_allocated in cpu_allocations.iteritems():
+ line = "ncpus_allocated,partition=%s value=%i %i" % \
+ (partition, ncpus_allocated, epoch)
+ lines.append(line)
+
+# Number of Nodes Down
+for partition, nnodes_down in number_of_nodes_down.iteritems():
+ line = "nodes,state=down,partition=%s value=%i %i" % \
+ (partition, nnodes_down, epoch)
+ lines.append(line)
+
+# Jobs per Partition per State
+for partition, njobs_by_state in njobs_by_partition_and_state.iteritems():
+ for state, njobs in njobs_by_state.iteritems():
+ line = "jobs,state=%s,partition=%s value=%i %i" % \
+ (state, partition, njobs, epoch)
+ lines.append(line)
+
+# Netatmo
+line = "room_temperature,room=zbox_room value=%.2f %i" % \
+ (temperature, netatmo_epoch)
+lines.append(line)
+
+# Join
+data = "\n".join(lines)
+
+# #############################################################################
+# Post Data
+# #############################################################################
+influx.post_data(data)
diff --git a/userpass_influx_example b/userpass_influx_example
new file mode 100644
index 0000000..a530d0c
--- /dev/null
+++ b/userpass_influx_example
@@ -0,0 +1 @@
+host,port,database,username,password
diff --git a/userpass_netatmo_example b/userpass_netatmo_example
new file mode 100644
index 0000000..f3d5438
--- /dev/null
+++ b/userpass_netatmo_example
@@ -0,0 +1,2 @@
+clientid,clientsecret,username,password
+