Source code for nukescripts.hardwareinfo

# Copyright (c) 2014 The Foundry Visionmongers Ltd.  All Rights Reserved.

# Warning: the code below is extracted from an internal test tool (Tests/Common/FnSysInfo.py) and is not recommended for
# use in your own scripts.

import os, sys, glob, math
import xml.sax.saxutils

#Helpers

#Temporary until we fix system profiler.
[docs]def RunCmdWithTimeout(cmd, timeout): import subprocess, time p = subprocess.Popen(cmd, stderr=subprocess.PIPE, stdout=subprocess.PIPE, shell=True) timerStep = 0.5 runTime = 0 while True: if p.poll() is not None: break runTime += timerStep if runTime > timeout: p.terminate() return -1 sys.exit(1) time.sleep(timerStep) return 0
#End temporary
[docs]def ConvertUnits(s, token, multiplier): if s.find(token) != -1: realSpeed = s[:s.find(token)] realSpeedFlt = float(realSpeed) return str(int(realSpeedFlt * multiplier))
[docs]def ConvertSpeedUnitsToMhZ(s): p = ConvertUnits(s, "GHz", 1000) if p != None: return p return str(int(math.ceil(float(s))))
[docs]def ConvertMemSizeToKb(s): p = ConvertUnits(s, "MB", 1000) if p != None: return p p = ConvertUnits(s, "GB", 1000000) if p != None: return p p = ConvertUnits(s, "Gb", 1000000) if p != None: return p p = ConvertUnits(s, "kB", 1) if p != None: return p p = ConvertUnits(s, "KB", 1) if p != None: return p return s
#Standard Identifiers gCPUSpeed = "CPUSpeed_Mhz" gMachineName = "MachineName" gNumCPUs = "NumCPUs" gNumCores = "NumCores" gRAM = "RAM_Kb" gCPUType = "CPUType" gOS = "OS" gBusSpeed = "BusSpeed" gL2Cache = "L2Cache_Kb" gOSVersion = "OSVersion" gKernelVersion = "KernelVersion"
[docs]class HardwareInfo: def __init__(self): self._standardDict = {} self.gLogStr = "" if sys.platform.lower() == "darwin": self.SafeCall(self.initMac) elif (sys.platform.lower() == "linux") | (sys.platform.lower() == "linux2"): self.SafeCall(self.initLinux) elif (sys.platform.lower().find("win") == 0): self.SafeCall(self.initWin) def SafeCall(self, f, *positional, **keyword): try: return f(*positional, **keyword) except Exception as e: self.gLogStr += "Function: calling " + f.__name__ + "\n" self.gLogStr += "\tDescription:" + str(f.__doc__) + "\n" self.gLogStr += "\tError:" + str(e) + "\n" def testCatCommand(self, info): if not os.path.exists(info): self.gLogStr += "Failed to find " + info return False return True def parseProcInfo(self, info): success = self.testCatCommand(info) if not success: return ([{}], False) import subprocess cpuinfo = subprocess.getoutput("cat " + info) return self.parseProcInfoStr(cpuinfo) def parseProcInfoStr(self, cpuinfo): import copy values = [] currentDict = {} for i in cpuinfo.split("\n"): if i.find(":") == -1: values.append(copy.deepcopy(currentDict)) currentDict = {} continue tokens = i.split(":") name = tokens[0] value = ":".join(tokens[1:]) currentDict[name.strip()] = value.strip() if len(currentDict) != 0: values.append(copy.deepcopy(currentDict)) return (values, True) def parseCPUInfo(self): (itemDict, success) = self.parseProcInfo("/proc/cpuinfo") if success: mapping = [["cpu MHz", gCPUSpeed], ["model name", gCPUType], ["cache size", gL2Cache]] self.MapDictionaries(itemDict, self._standardDict, mapping, "/proc/cpuinfo") self._standardDict[gL2Cache] = ConvertMemSizeToKb(self._standardDict[gL2Cache]) self._standardDict[gCPUSpeed] = ConvertSpeedUnitsToMhZ(self._standardDict[gCPUSpeed]) self._standardDict[gNumCores] = len(itemDict) def ParseProcVersion(self): if self.testCatCommand("/etc/redhat-release"): import subprocess osVersion = subprocess.getoutput("cat /etc/redhat-release") self._standardDict[gOSVersion] = osVersion elif self.testCatCommand("/etc/lsb-release"): import subprocess lsbInfo = subprocess.getoutput("cat /etc/lsb-release") for i in lsbInfo.split("\n"): if i.find("DISTRIB_DESCRIPTION") != -1: self._standardDict[gOSVersion] = i.split("=")[1].replace("\"","") elif self.testCatCommand("/proc/version"): import subprocess osVersion = subprocess.getoutput("cat /proc/version") start = osVersion.find(" #1 ") osVersionShort = osVersion[:start] token = osVersionShort.split("(")[-1] self._standardDict[gOSVersion] = token.replace(")", "") def parseProcSimple(self, file, entry): import subprocess if os.path.exists(file): self._standardDict[entry] = subprocess.getoutput("cat " + file) else: self.gLogStr += file + " doesn't exist.\n" self._standardDict[entry] = "Unknown" def parseMemInfo(self): (itemDict, success) = self.parseProcInfo("/proc/meminfo") if success: self.MapDictionaries(itemDict, self._standardDict, [["MemTotal", gRAM]], "/proc/meminfo") self._standardDict[gRAM] = ConvertMemSizeToKb(self._standardDict[gRAM]) def initLinux(self): self._standardDict[gOS] = "linux" self.SafeCall(self.parseCPUInfo) self.SafeCall(self.ParseProcVersion) self.SafeCall(self.parseProcSimple, "/proc/sys/kernel/osrelease", gKernelVersion) self.SafeCall(self.parseProcSimple, "/proc/sys/kernel/hostname", gMachineName) self.SafeCall(self.parseMemInfo) def getRegistryNumSubKeys(self, key, subkey): try: import winreg key = getattr(_winreg, key) handle = winreg.OpenKey(key, subkey) return winreg.QueryInfoKey(handle)[0] except: self.gLogStr += "Failed to find " + str(key) + ", " + str(subkey) + "\n" return "Unknown" def getRegistryValue(self, key, subkey, value): try: import winreg key = getattr(_winreg, key) handle = winreg.OpenKey(key, subkey) (result, type) = winreg.QueryValueEx(handle, value) return result except: self.gLogStr += "Failed to find " + str(key) + ", " + str(subkey) + ", " + str(value) + "\n" return "Unknown" def getWindowsRam(self): try: from psutil import virtual_memory mem = virtual_memory() memGb = mem.total * 0.000000001 return str(memGb) + " Gb" except: return "unable to import psutil - memory information disabled" def RunCmdWin(cmd): import subprocess process = subprocess.Popen(cmd, stderr=subprocess.STDOUT, stdin=subprocess.PIPE, stdout=subprocess.PIPE, shell=True) process.stdin.close() output = "" while line != "": line = process.stdout.readline() output += line return output def getWindowsL2Cache(self): output = self.RunCmdWin("wmic cpu get L2CacheSize") return output.split("\n")[-1] def getWindowsOSVersion(self): def get(key): return self.getRegistryValue("HKEY_LOCAL_MACHINE", "SOFTWARE\\Microsoft\\Windows NT\\CurrentVersion", key) os = get("ProductName") sp = get("CSDVersion") build = get("CurrentBuildNumber") return "%s %s (build %s)" % (os, sp, build) def getWindowsMachineName(self): return self.getRegistryValue("HKEY_LOCAL_MACHINE", "SYSTEM\\CurrentControlSet\\Control\\ComputerName\\ComputerName", "ComputerName") def initWin(self): import ctypes self._standardDict[gOS] = "win" self._standardDict[gCPUType] = self.getRegistryValue("HKEY_LOCAL_MACHINE", "HARDWARE\\DESCRIPTION\\System\\CentralProcessor\\0", "ProcessorNameString") self._standardDict[gCPUSpeed] = self.getRegistryValue("HKEY_LOCAL_MACHINE", "HARDWARE\\DESCRIPTION\\System\\CentralProcessor\\0", "~MHz") self._standardDict[gNumCores] = self.getRegistryNumSubKeys("HKEY_LOCAL_MACHINE", "HARDWARE\\DESCRIPTION\\System\\CentralProcessor") self._standardDict[gRAM] = self.SafeCall(self.getWindowsRam) self._standardDict[gRAM] = ConvertMemSizeToKb(self._standardDict[gRAM]) self._standardDict[gOSVersion] = self.SafeCall(self.getWindowsOSVersion) self._standardDict[gL2Cache] = self.SafeCall(self.getWindowsL2Cache) self._standardDict[gMachineName] = self.SafeCall(self.getWindowsMachineName) def MapDictionaries(self, originalDict, addTo, mapping, name): for i in mapping: try: addTo[i[1]] = originalDict[0][i[0]] except: self.gLogStr += "Failed to find key: " + i[0] + " in " + name + "\n" addTo[i[1]] = "Unknown"
[docs] def initMac(self): """Initialises the object for mac - relies on system_profiler being in the path""" self._standardDict[gOS] = "mac" import tempfile (handle, tmpPList) = tempfile.mkstemp() try: os.close(handle) status = RunCmdWithTimeout("system_profiler -xml > " + tmpPList, 300.0) self.SafeCall(self.initMacFromFile, tmpPList) finally: os.remove(tmpPList)
def initMacHardware(self, itemDicts): mapping = [["current_processor_speed", gCPUSpeed], ["physical_memory", gRAM], ["number_processors", gNumCores], ["cpu_type", gCPUType], ["l2_cache", gL2Cache]] self.MapDictionaries(itemDicts, self._standardDict, mapping, "SPHardwareDataType") if self._standardDict[gL2Cache] == "Unknown": fixMacChangingThingsMapping = [["l2_cache_share", gL2Cache]] self.MapDictionaries(itemDicts, self._standardDict, fixMacChangingThingsMapping, "SPHardwareDataType") self._standardDict[gCPUSpeed] = ConvertSpeedUnitsToMhZ(self._standardDict[gCPUSpeed]) self._standardDict[gL2Cache] = ConvertMemSizeToKb(self._standardDict[gL2Cache]) self._standardDict[gRAM] = ConvertMemSizeToKb(self._standardDict[gRAM]) extendedMapping = [["machine_model", gMachineName]] self.MapDictionaries(itemDicts, self._extendedDict, extendedMapping, "SPHardwareDataType") def initMacSoftware(self, itemDicts): mapping = [["os_version", gOSVersion], ["kernel_version", gKernelVersion], ["local_host_name", gMachineName]] self.MapDictionaries(itemDicts, self._standardDict, mapping, "SPSoftwareDataType")
[docs] def printDict(self, pDict, handle, indentLevel = 1): """ outputs the elements from pDict in XML structure, nesting when an element contains another dict """ for key in sorted(pDict): elem = pDict[key] handle.write(" " * indentLevel) if type(elem) == dict: #nest handle.write("<%s>\n" % key) self.printDict(elem, handle, indentLevel +1) handle.write(" " * indentLevel) handle.write("</%s>" % key) else: #bottom level, just write the value of the key if type(elem) == str or type(elem) == str: elem = elem.encode('UTF-8') #if isinstance(elem, str): # elem = elem.replace(u"\u001B", "") try: for badch in "<>&": #make sure anything with these is escaped, so XML doesn't screw up if badch in str(elem) and not str(elem).startswith("<![CDATA"): elem = "<![CDATA[\n%s\n]]>" % elem for c in str(elem): if ord(c) > 128: raise UnicodeDecodeError handle.write("<%(key)s>%(elem)s</%(key)s>" % { "key": xml.sax.saxutils.escape(key), "elem": elem }) except UnicodeDecodeError as e: handle.write("<%(key)s>[--- unicode decoding problem ---]</%(key)s>" % {"key": key}) handle.write("\n")
def printXML(self, file, indentLevel = 1): self.printDict(self._standardDict, file, indentLevel);
[docs]def PrintMachineInfoToFile(file, indentLevel = 1): info = HardwareInfo() info.printXML(file, indentLevel)