Package nukescripts :: Module hardwareinfo
[hide private]
[frames] | no frames]

Source Code for Module nukescripts.hardwareinfo

  1  # Copyright (c) 2014 The Foundry Visionmongers Ltd.  All Rights Reserved. 
  2   
  3  # Warning: the code below is extracted from an internal test tool (Tests/Common/FnSysInfo.py) and is not recommended for 
  4  # use in your own scripts. 
  5   
  6  import os, sys, glob, math 
  7  import xml.sax.saxutils 
  8   
  9  #Helpers 
 10   
 11  #Temporary until we fix system profiler. 
12 -def RunCmdWithTimeout(cmd, timeout):
13 import subprocess, time 14 p = subprocess.Popen(cmd, stderr=subprocess.PIPE, stdout=subprocess.PIPE, shell=True) 15 timerStep = 0.5 16 runTime = 0 17 while True: 18 if p.poll() is not None: 19 break 20 runTime += timerStep 21 if runTime > timeout: 22 p.terminate() 23 return -1 24 sys.exit(1) 25 time.sleep(timerStep) 26 return 0
27 #End temporary 28
29 -def ConvertUnits(s, token, multiplier):
30 if s.find(token) != -1: 31 realSpeed = s[:s.find(token)] 32 realSpeedFlt = float(realSpeed) 33 return str(int(realSpeedFlt * multiplier))
34
35 -def ConvertSpeedUnitsToMhZ(s):
36 p = ConvertUnits(s, "GHz", 1000) 37 if p != None: 38 return p 39 return str(int(math.ceil(float(s))))
40
41 -def ConvertMemSizeToKb(s):
42 p = ConvertUnits(s, "MB", 1000) 43 if p != None: 44 return p 45 p = ConvertUnits(s, "GB", 1000000) 46 if p != None: 47 return p 48 p = ConvertUnits(s, "Gb", 1000000) 49 if p != None: 50 return p 51 p = ConvertUnits(s, "kB", 1) 52 if p != None: 53 return p 54 p = ConvertUnits(s, "KB", 1) 55 if p != None: 56 return p 57 return s
58 59 60 #Standard Identifiers 61 gCPUSpeed = "CPUSpeed_Mhz" 62 gMachineName = "MachineName" 63 gNumCPUs = "NumCPUs" 64 gNumCores = "NumCores" 65 gRAM = "RAM_Kb" 66 gCPUType = "CPUType" 67 gOS = "OS" 68 gBusSpeed = "BusSpeed" 69 gL2Cache = "L2Cache_Kb" 70 gOSVersion = "OSVersion" 71 gKernelVersion = "KernelVersion" 72
73 -class HardwareInfo:
74 - def __init__(self):
75 self._standardDict = {} 76 self.gLogStr = "" 77 if sys.platform.lower() == "darwin": 78 self.SafeCall(self.initMac) 79 elif (sys.platform.lower() == "linux") | (sys.platform.lower() == "linux2"): 80 self.SafeCall(self.initLinux) 81 elif (sys.platform.lower().find("win") == 0): 82 self.SafeCall(self.initWin)
83
84 - def SafeCall(self, f, *positional, **keyword):
85 try: 86 return apply(f, positional, keyword) 87 except Exception, e: 88 self.gLogStr += "Function: calling " + f.__name__ + "\n" 89 self.gLogStr += "\tDescription:" + str(f.__doc__) + "\n" 90 self.gLogStr += "\tError:" + str(e) + "\n"
91
92 - def testCatCommand(self, info):
93 if not os.path.exists(info): 94 self.gLogStr += "Failed to find " + info 95 return False 96 return True
97
98 - def parseProcInfo(self, info):
99 success = self.testCatCommand(info) 100 if not success: 101 return ([{}], False) 102 import commands 103 cpuinfo = commands.getoutput("cat " + info) 104 return self.parseProcInfoStr(cpuinfo)
105
106 - def parseProcInfoStr(self, cpuinfo):
107 import copy 108 values = [] 109 currentDict = {} 110 for i in cpuinfo.split("\n"): 111 if i.find(":") == -1: 112 values.append(copy.deepcopy(currentDict)) 113 currentDict = {} 114 continue 115 tokens = i.split(":") 116 name = tokens[0] 117 value = ":".join(tokens[1:]) 118 currentDict[name.strip()] = value.strip() 119 if len(currentDict) != 0: 120 values.append(copy.deepcopy(currentDict)) 121 return (values, True)
122
123 - def parseCPUInfo(self):
124 (itemDict, success) = self.parseProcInfo("/proc/cpuinfo") 125 if success: 126 mapping = [["cpu MHz", gCPUSpeed], 127 ["model name", gCPUType], 128 ["cache size", gL2Cache]] 129 self.MapDictionaries(itemDict, self._standardDict, mapping, "/proc/cpuinfo") 130 self._standardDict[gL2Cache] = ConvertMemSizeToKb(self._standardDict[gL2Cache]) 131 self._standardDict[gCPUSpeed] = ConvertSpeedUnitsToMhZ(self._standardDict[gCPUSpeed]) 132 self._standardDict[gNumCores] = len(itemDict)
133
134 - def ParseProcVersion(self):
135 if self.testCatCommand("/etc/redhat-release"): 136 import commands 137 osVersion = commands.getoutput("cat /etc/redhat-release") 138 self._standardDict[gOSVersion] = osVersion 139 elif self.testCatCommand("/etc/lsb-release"): 140 import commands 141 lsbInfo = commands.getoutput("cat /etc/lsb-release") 142 for i in lsbInfo.split("\n"): 143 if i.find("DISTRIB_DESCRIPTION") != -1: 144 self._standardDict[gOSVersion] = i.split("=")[1].replace("\"","") 145 elif self.testCatCommand("/proc/version"): 146 import commands 147 osVersion = commands.getoutput("cat /proc/version") 148 start = osVersion.find(" #1 ") 149 osVersionShort = osVersion[:start] 150 token = osVersionShort.split("(")[-1] 151 self._standardDict[gOSVersion] = token.replace(")", "")
152
153 - def parseProcSimple(self, file, entry):
154 import commands 155 if os.path.exists(file): 156 self._standardDict[entry] = commands.getoutput("cat " + file) 157 else: 158 self.gLogStr += file + " doesn't exist.\n" 159 self._standardDict[entry] = "Unknown"
160
161 - def parseMemInfo(self):
162 (itemDict, success) = self.parseProcInfo("/proc/meminfo") 163 if success: 164 self.MapDictionaries(itemDict, self._standardDict, [["MemTotal", gRAM]], "/proc/meminfo") 165 self._standardDict[gRAM] = ConvertMemSizeToKb(self._standardDict[gRAM])
166
167 - def initLinux(self):
168 self._standardDict[gOS] = "linux" 169 self.SafeCall(self.parseCPUInfo) 170 self.SafeCall(self.ParseProcVersion) 171 self.SafeCall(self.parseProcSimple, "/proc/sys/kernel/osrelease", gKernelVersion) 172 self.SafeCall(self.parseProcSimple, "/proc/sys/kernel/hostname", gMachineName) 173 self.SafeCall(self.parseMemInfo)
174
175 - def getRegistryNumSubKeys(self, key, subkey):
176 try: 177 import _winreg 178 key = getattr(_winreg, key) 179 handle = _winreg.OpenKey(key, subkey) 180 return _winreg.QueryInfoKey(handle)[0] 181 except: 182 self.gLogStr += "Failed to find " + str(key) + ", " + str(subkey) + ", " + str(value) + "\n" 183 return "Unknown"
184
185 - def getRegistryValue(self, key, subkey, value):
186 try: 187 import _winreg 188 key = getattr(_winreg, key) 189 handle = _winreg.OpenKey(key, subkey) 190 (result, type) = _winreg.QueryValueEx(handle, value) 191 return result 192 except: 193 self.gLogStr += "Failed to find " + str(key) + ", " + str(subkey) + ", " + str(value) + "\n" 194 return "Unknown"
195
196 - def getWindowsRam(self):
197 try: 198 from psutil import virtual_memory 199 200 mem = virtual_memory() 201 memGb = mem.total * 0.000000001 202 203 return str(memGb) + " Gb" 204 except: 205 return "unable to import psutil - memory information disabled"
206
207 - def RunCmdWin(cmd):
208 import subprocess 209 process = subprocess.Popen(cmd, stderr=subprocess.STDOUT, stdin=subprocess.PIPE, stdout=subprocess.PIPE, shell=True) 210 process.stdin.close() 211 output = "" 212 while line != "": 213 line = process.stdout.readline() 214 output += line 215 return output
216
217 - def getWindowsL2Cache(self):
218 output = RunCmdWin("wmic cpu get L2CacheSize") 219 return output.split("\n")[-1]
220
221 - def getWindowsOSVersion(self):
222 def get(key): 223 return self.getRegistryValue("HKEY_LOCAL_MACHINE", "SOFTWARE\\Microsoft\\Windows NT\\CurrentVersion", key)
224 os = get("ProductName") 225 sp = get("CSDVersion") 226 build = get("CurrentBuildNumber") 227 return "%s %s (build %s)" % (os, sp, build)
228
229 - def getWindowsMachineName(self):
230 return self.getRegistryValue("HKEY_LOCAL_MACHINE", 231 "SYSTEM\CurrentControlSet\Control\ComputerName\ComputerName", 232 "ComputerName")
233 - def initWin(self):
234 import ctypes 235 self._standardDict[gOS] = "win" 236 self._standardDict[gCPUType] = self.getRegistryValue("HKEY_LOCAL_MACHINE", "HARDWARE\\DESCRIPTION\\System\\CentralProcessor\\0", "ProcessorNameString") 237 self._standardDict[gCPUSpeed] = self.getRegistryValue("HKEY_LOCAL_MACHINE", "HARDWARE\\DESCRIPTION\\System\\CentralProcessor\\0", "~MHz") 238 self._standardDict[gNumCores] = self.getRegistryNumSubKeys("HKEY_LOCAL_MACHINE", "HARDWARE\\DESCRIPTION\\System\\CentralProcessor") 239 self._standardDict[gRAM] = self.SafeCall(self.getWindowsRam) 240 self._standardDict[gRAM] = ConvertMemSizeToKb(self._standardDict[gRAM]) 241 self._standardDict[gOSVersion] = self.SafeCall(self.getWindowsOSVersion) 242 self._standardDict[gL2Cache] = self.SafeCall(self.getWindowsL2Cache) 243 self._standardDict[gMachineName] = self.SafeCall(self.getWindowsMachineName)
244
245 - def MapDictionaries(self, originalDict, addTo, mapping, name):
246 for i in mapping: 247 try: 248 addTo[i[1]] = originalDict[0][i[0]] 249 except: 250 self.gLogStr += "Failed to find key: " + i[0] + " in " + name + "\n" 251 addTo[i[1]] = "Unknown"
252
253 - def initMac(self):
254 """Initialises the object for mac - relies on system_profiler being in the path""" 255 self._standardDict[gOS] = "mac" 256 import tempfile 257 (handle, tmpPList) = tempfile.mkstemp() 258 try: 259 os.close(handle) 260 status = RunCmdWithTimeout("system_profiler -xml > " + tmpPList, 300.0) 261 self.SafeCall(self.initMacFromFile, tmpPList) 262 finally: 263 os.remove(tmpPList)
264
265 - def initMacHardware(self, itemDicts):
266 mapping = [["current_processor_speed", gCPUSpeed], 267 ["physical_memory", gRAM], 268 ["number_processors", gNumCores], 269 ["cpu_type", gCPUType], 270 ["l2_cache", gL2Cache]] 271 272 self.MapDictionaries(itemDicts, self._standardDict, mapping, "SPHardwareDataType") 273 if self._standardDict[gL2Cache] == "Unknown": 274 fixMacChangingThingsMapping = [["l2_cache_share", gL2Cache]] 275 self.MapDictionaries(itemDicts, self._standardDict, fixMacChangingThingsMapping, "SPHardwareDataType") 276 self._standardDict[gCPUSpeed] = ConvertSpeedUnitsToMhZ(self._standardDict[gCPUSpeed]) 277 self._standardDict[gL2Cache] = ConvertMemSizeToKb(self._standardDict[gL2Cache]) 278 self._standardDict[gRAM] = ConvertMemSizeToKb(self._standardDict[gRAM]) 279 extendedMapping = [["boot_rom_version", gBootROMVersion], 280 ["machine_model", gMachineName]] 281 self.MapDictionaries(itemDicts, self._extendedDict, extendedMapping, "SPHardwareDataType")
282
283 - def initMacSoftware(self, itemDicts):
284 mapping = [["os_version", gOSVersion], 285 ["kernel_version", gKernelVersion], 286 ["local_host_name", gMachineName]] 287 self.MapDictionaries(itemDicts, self._standardDict, mapping, "SPSoftwareDataType")
288
289 - def printDict(self, pDict, handle, indentLevel = 1):
290 """ 291 outputs the elements from pDict in XML structure, nesting when an 292 element contains another dict 293 """ 294 for key in sorted(pDict): 295 elem = pDict[key] 296 handle.write(" " * indentLevel) 297 if type(elem) == dict: #nest 298 handle.write("<%s>\n" % key) 299 self.printDict(elem, handle, indentLevel +1, isEnv=True) 300 handle.write(" " * indentLevel) 301 handle.write("</%s>" % key) 302 else: #bottom level, just write the value of the key 303 if type(elem) == str or type(elem) == unicode: 304 elem = elem.encode('UTF-8') 305 #if isinstance(elem, str): 306 # elem = elem.replace(u"\u001B", "") 307 try: 308 for badch in u"<>&": #make sure anything with these is escaped, so XML doesn't screw up 309 if badch in unicode(elem) and not unicode(elem).startswith("<![CDATA"): 310 elem = "<![CDATA[\n%s\n]]>" % elem 311 for c in str(elem): 312 if ord(c) > 128: 313 raise UnicodeDecodeError 314 handle.write("<%(key)s>%(elem)s</%(key)s>" % { 315 "key": xml.sax.saxutils.escape(key), 316 "elem": elem 317 }) 318 except UnicodeDecodeError, e: 319 handle.write("<%(key)s>[--- unicode decoding problem ---]</%(key)s>" % {"key": key}) 320 handle.write("\n")
321
322 - def printXML(self, file, indentLevel = 1):
323 self.printDict(self._standardDict, file, indentLevel);
324
325 -def PrintMachineInfoToFile(file, indentLevel = 1):
326 info = HardwareInfo() 327 info.printXML(file, indentLevel)
328