import wx
import time
import datetime
import serial
import platform
import sqlite3
import initialization as ini
from decimal import Decimal
import locale

#EXITMETHOD = ["Unknown","Completed","Stop Button","Severe Leak"]
leak_template = """Test %s: Actual Leak: %s sccm Target Leak: %s Delta: %s"""
flow_template = """Test %s: Actual Flow: %s sccm Target Flow: %s Delta: %s"""

# Letter shown for runs 0-3 (leak tests) and again for runs 4-7 (flow tests).
_RUN_LETTERS = ('A', 'B', 'C', 'D')


def connect_serial_and_open(s):
    """
    Create a serial port configured from the settings object and open it.

    The port name, baud rate, byte size, parity and stop bits are read from
    ``s.COMPORT``, ``s.BAUDRATE``, ``s.BYTESIZE``, ``s.PARITY`` and
    ``s.STOPBITS``.  The port is opened if it is not open already and the
    ``serial.Serial`` object is returned.
    """
    ser = serial.Serial()
    # All line parameters come from the settings object.
    ser.port = s.COMPORT
    ser.baudrate = s.BAUDRATE
    ser.bytesize = s.BYTESIZE
    ser.parity = s.PARITY
    ser.stopbits = s.STOPBITS
    if not ser.isOpen():
        ser.open()
    return ser


#71E1011 T Start Streaming
def run_test(wxObj, settings):
    """
    Read results from the serial port and show them on the GUI.

    Each record produced by ``read_serial`` is either a parsed result dict
    (exit code 0 or 1), which is evaluated and displayed, or one of the abort
    markers 'SB', 'SL' or 'MISC' (exit codes 2, 3 and 4), for which a message
    is written to the label of the test that was running.

    wxObj    -- GUI object owning the label and indicator widgets.
    settings -- settings object (serial parameters, limits, DB file, DIR).

    Always returns None.  The serial port opened here is closed on the way
    out, including when an exception propagates.
    """
    ser = connect_serial_and_open(settings)
    test = 0
    try:
        for run, exit_method in read_serial(ser, wxObj):
            test += 1
            if exit_method in (0, 1):
                if isinstance(run, dict):
                    _show_result(run, settings, wxObj)
                continue

            msg = None
            if exit_method == 2 and run == 'SB':
                msg = 'STOP BUTTON PRESSED'
            elif exit_method == 3 and run == 'SL':
                msg = 'SEVERE LEAK'
            elif exit_method == 4 and run == 'MISC':
                msg = 'BELOW PRESSURE'
            if not msg:
                continue

            if test < 5:
                if test == 1:
                    ini.clear(wxObj)
                # NOTE(review): results are written to wxObj.leak1, so 'lead'
                # may be a typo for 'leak' -- confirm against the widget
                # names before changing it.
                label = getattr(wxObj, 'lead%s' % test)
            else:
                # The parentheses matter: '%' binds tighter than '-', so the
                # unparenthesised form tried to subtract 4 from a string.
                label = getattr(wxObj, 'flow%s' % (test - 4))
            wx.CallAfter(label.SetLabel, msg)
    finally:
        ser.close()
    return None


def _show_result(run, settings, wxObj):
    """Evaluate one parsed result and write its summary to the leak/flow label."""
    runno = int(run['run_number'])
    if 0 <= runno < 4:
        tol, delta = passes(run, settings, wxObj, runno)
        lbl = leak_template % (_RUN_LETTERS[runno],
                               str(round(float(run['sccm']), 2)),
                               tol,
                               "%.2f" % delta)
        wx.CallAfter(wxObj.leak1.SetLabel, lbl)
    elif 4 <= runno < 8:
        sccm = float(run['sccm'])
        tol, delta = passes(run, settings, wxObj, runno)
        lbl = flow_template % (_RUN_LETTERS[runno - 4],
                               intWithCommas(sccm),
                               intWithCommas(tol),
                               intWithCommas(delta))
        wx.CallAfter(wxObj.flow1.SetLabel, lbl)
    # Any other run number (e.g. the closing record numbered 8) is not shown.


def read_serial(ser=None, wxObj=None):
    """
    Generator yielding ``(output, exit_method)`` for every parsed line.

    Bytes are read one at a time and accumulated until a '\\r\\n' terminator
    is seen; the completed line is handed to ``parse_line``.

    output is the dict returned by ``parse_line`` (with 'run_number' added)
    or one of the strings 'SB', 'SL', 'MISC'.  exit_method is 0 while the
    sequence is running, 1 for the record numbered 8 (which ends the
    sequence), 2 for 'SB', 3 for 'SL' and 4 for 'MISC'.

    Nothing is yielded unless ``ser`` is an open port and ``wxObj`` is truthy.
    """
    STOP = False
    ExitMethod = 0
    if ser and ser.isOpen() and wxObj:
        buffer = ''
        run_number = -1
        while not STOP:
            sent = False
            buffer = buffer + ser.read(1)
            head, sep, tail = buffer.partition('\r\n')
            if sep:
                output = parse_line(head)
                if output == "SB":
                    STOP = True
                    ExitMethod = 2
                elif output == "SL":
                    STOP = True
                    ExitMethod = 3
                elif output == "MISC":
                    STOP = True
                    ExitMethod = 4
                elif output:
                    if run_number < 0:
                        # First result of a sequence: reset the display.
                        ini.clear(wxObj)
                    run_number += 1
                    output['run_number'] = run_number
                    if run_number == 8:
                        STOP = True
                        ExitMethod = 1
                        yield output, ExitMethod
                        continue
                    # P12 is a dummy test to release pressure between 7 and 8,
                    # so we must ignore that.
                    # NOTE(review): a P12 record still consumes a run number
                    # and is still yielded by the fallback below, so it is
                    # not actually ignored -- confirm the intended behaviour.
                    if not output['program_number'] == "P12":
                        sent = True
                        yield output, ExitMethod
                # Abort markers (and anything not yielded above) go out here.
                if output and not sent:
                    try:
                        yield output, ExitMethod
                    except GeneratorExit:
                        pass
                # Keep whatever followed the terminator for the next line.
                buffer = tail


def parse_line(line_to_parse, last_program_number=None):
    """
    Parse one line received from the tester.

    Returns
      * 'MISC' when the line contains "TTY Pressure Decay-L",
      * 'SB' or 'SL' when the fifth result field is that marker,
      * a dict with the keys 'port_number', 'program_number',
        'run_datetime', 'evaluation' and 'sccm' for a result line,
      * None for empty input, for lines that are not three tab-separated
        fields with "R" in the second one, for lines whose sixth result
        field is "LNK", and for lines that cannot be parsed.

    ``last_program_number`` is accepted but not used.
    """
    if not line_to_parse:
        return None
    try:
        if "TTY Pressure Decay-L" in line_to_parse:
            return "MISC"
        # Trailing whitespace is dropped, then the line is split on tabs.
        result = line_to_parse.rstrip().split('\t')
        # Only lines of exactly three fields with "R" in the second are results.
        if len(result) != 3 or "R" not in result[1]:
            return None
        # The third field is space separated; runs of spaces yield empty
        # items, which are discarded.
        fields = [item for item in result[2].split(' ') if item]
        if fields[4] == "SB":
            return "SB"
        if fields[4] == "SL":
            return "SL"
        if fields[5] == "LNK":
            return None
        return {
            'port_number': fields[0],
            'program_number': fields[1],
            'run_datetime': datetime.datetime.strptime(
                "%s %s" % (fields[2], fields[3]), "%H:%M:%S.%f %m/%d/%y"),
            'evaluation': fields[6],
            'sccm': fields[8],
        }
    except (IndexError, ValueError, TypeError, AttributeError):
        # Short or malformed line (missing fields, bad timestamp, unexpected
        # input type): treat it as "nothing to report".
        return None


def passes(run, settings, wxObj, runno):
    """
    Evaluate one run and set its pass/fail indicator bitmap.

    The run is checked (and recorded) once by ``withinTolerance``; the
    indicator belonging to ``runno`` is then set to green.png on pass or
    red.png on fail, both loaded from ``settings.DIR``.

    Returns ``(tol, delta)`` as produced by ``withinTolerance``.
    Raises AttributeError when ``runno`` is outside 0..9.
    """
    if runno < 0 or runno > 9:
        raise AttributeError('Not sure what this all means, but I dont think we can continute')

    tol, delta, pass_or_fail = withinTolerance(run, settings, run['run_number'])
    if pass_or_fail:
        bitmap = wx.Bitmap(settings.DIR + '\\green.png')
    else:
        bitmap = wx.Bitmap(settings.DIR + '\\red.png')

    if runno < 4:
        indicator = getattr(wxObj, 'imgLeak%s' % (runno + 1))
    else:
        # NOTE(review): assumes the flow indicators are numbered from 1 like
        # the leak indicators (run 4 -> imgFlow1) -- confirm against the GUI
        # definition.
        indicator = getattr(wxObj, 'imgFlow%s' % (runno - 3))
    wx.CallAfter(indicator.SetBitmap, bitmap)
    return tol, delta


def withinTolerance(run, settings, runno):
    """
    Compare a measurement with its limit and record the outcome.

    The limit comes from ``settings.get_limits(runno, settings)``.  Runs
    numbered below 4 pass when the measurement is strictly below the limit;
    all other runs pass when it is strictly above.  One row is inserted into
    the ``torch_record`` table of ``settings.DB_FILE`` with result "A" (pass)
    or "R" (fail).

    Returns ``(tol, delta, passed)``: the limit as a Decimal, the absolute
    difference between measurement and limit, and the pass flag.
    """
    tol = Decimal(settings.get_limits(runno, settings))
    sccm = Decimal(run['sccm'])
    if runno < 4:
        passed = sccm < tol
    else:
        passed = sccm > tol
    delta = abs(sccm - tol)

    stamp = run['run_datetime']
    results = [run['port_number'],
               run['program_number'],
               stamp.date(),
               str(stamp.time()),
               run['sccm'],
               "A" if passed else "R"]

    conn = sqlite3.connect(settings.DB_FILE, detect_types=sqlite3.PARSE_DECLTYPES)
    try:
        c = conn.cursor()
        c.execute("INSERT INTO torch_record (port_number,program_number,"
                  "test_date,test_time,measurement,result) "
                  "VALUES(?,?,?,?,?,?)", results)
        conn.commit()
        c.close()
    finally:
        # Release the database handle even when the insert fails.
        conn.close()
    return tol, delta, passed


def intWithCommas(x):
    """
    Format a number as an integer with thousands separators.

    ``x`` is truncated with ``int()`` first, so floats and Decimals are
    accepted; anything ``int()`` rejects raises TypeError or ValueError.
    Example: 1234567.8 -> '1,234,567', -1000 -> '-1,000'.
    """
    x = int(x)
    if x < 0:
        return '-' + intWithCommas(-x)
    result = ''
    while x >= 1000:
        x, r = divmod(x, 1000)
        result = ",%03d%s" % (r, result)
    return "%d%s" % (x, result)