torchtester/TorchTester/logic.py

249 lines
8.5 KiB
Python
Raw Normal View History

2016-03-14 15:31:56 +00:00
import wx
import time
import datetime
import serial
import platform
import sqlite3
import initialization as ini
from decimal import Decimal
import locale
#EXITMETHOD = ["Unknown","Completed","Stop Button","Severe Leak"]
leak_template = """Test %s:
Actual Leak: %s sccm
Target Leak: %s
Delta: %s"""
flow_template = """Test %s:
Actual Flow: %s sccm
Target Flow: %s
Delta: %s"""
def connect_serial_and_open(s):
"""
This function will determine if thecomputer is running Windows
or Linux, and set the Serial port appropriately.
Then it will setup the serial port as 8-n-1 and 115200 baud.
This function then checks if the serial port is not open,
if it is not, it opens it and returns the serial port object.
"""
ser = serial.Serial()
# Set up 15200 8-N-1
ser.port = s.COMPORT
ser.baudrate = s.BAUDRATE
ser.bytesize = s.BYTESIZE
ser.parity = s.PARITY
ser.stopbits = s.STOPBITS
if not ser.isOpen():
ser.open()
return ser
#71E1011 T Start Streaming
def run_test(wxObj,settings):
ser = connect_serial_and_open(settings)
test = 0
for run,exit in read_serial(ser,wxObj):
test += 1
if exit == 1 or exit == 0:
if isinstance(run,dict):
runno = int(run['run_number'])
_runs=('A','B','C','D')
if runno < 4:
for k,v in enumerate(_runs):
if k == runno:
tol,delta = passes(run,settings,wxObj,runno)
delta = "%.2f" % delta
lbl = leak_template % (v, str(round(float(run['sccm']),2)),tol,delta)
wx.CallAfter(wxObj.leak1.SetLabel, lbl)
else:
for k,v in enumerate(_runs):
if k+4 == runno:
sccm = float(run['sccm'])
tol,delta = passes(run,settings,wxObj,runno)
sccm = intWithCommas(sccm)
tol = intWithCommas(tol)
delta= intWithCommas(delta)
lbl = flow_template % (v, sccm,tol,delta)
wx.CallAfter(wxObj.flow1.SetLabel, lbl)
else:
msg = None
if exit == 2 and run == 'SB':
msg = 'STOP BUTTON PRESSED'
elif exit == 3 and run == 'SL':
msg = 'SEVERE LEAK'
elif exit == 4 and run == 'MISC':
msg = 'BELOW PRESSURE'
if msg:
if test < 5:
if test ==1:
ini.clear(wxObj)
wx.CallAfter( getattr(wxObj, 'lead%s' % test).SetLabel, msg)
else:
wx.CallAfter( getattr(wxObj, 'flow%s' % test - 4).SetLabel, msg)
return None
def read_serial(ser=None,wxObj=None):
STOP = False
ExitMethod = 0
if ser and ser.isOpen() and wxObj:
buffer = ''
run_number = -1
while not STOP:
sent = False
buffer = buffer + ser.read(1)
last_list = buffer.partition('\r\n')
if last_list[1]:
last_received = last_list[0]
output = parse_line(last_received)
if output == "SB":
STOP = True
ExitMethod = 2
elif output == "SL":
STOP = True
ExitMethod = 3
elif output == "MISC":
STOP = True
ExitMethod = 4
elif output:
if run_number < 0:
ini.clear(wxObj)
run_number +=1
output['run_number'] = run_number
if run_number == 8:
STOP = True
ExitMethod = 1
yield output,ExitMethod
continue
# P12 is a dummy test to release pressure between 7 and 8, so we must ignore that.
if not output['program_number'] == "P12":
sent = True
yield output,ExitMethod
if output and not sent:
try:
yield output,ExitMethod
except GeneratorExit:
pass
buffer = last_list[2]
def parse_line(line_to_parse,last_program_number=None ):
# strip all whitespace from end of line
#if there remains anything, continue on.
if line_to_parse:
# actually store result as a list of strings, split by \t
try:
if "TTY Pressure Decay-L" in line_to_parse:
return "MISC"
result = line_to_parse.rstrip().split('\t')
# check if the list is equal to 3, and that "R" is the 2nd item
if len(result) == 3 and "R" in result[1]:
# If R is the second item, split Result by spaces.
l = result[2].split(' ')
# Remove all blank items in the list
l = [item for item in l if item]
# There will be a summary as the last item, if this is so,
# Skip the item and return None
if l[4] == "SB":
return "SB"
if l[4] == "SL":
return "SL"
if l[5] == "LNK":
return None
# Prepare a dictionary with the keys and values
# we yanked from the test
out_result = {}
out_result['port_number'] = l[0]
out_result['program_number'] = l[1]
out_result['run_datetime'] = datetime.datetime.strptime(
"%s %s" % (l[2], l[3]), "%H:%M:%S.%f %m/%d/%y")
out_result['evaluation'] = l[6]
out_result['sccm'] = l[8]
if out_result:
return out_result
except:
pass
def passes(run,settings,wxObj,runno):
GREEN = wx.Bitmap(settings.DIR + '\\green.png')
RED = wx.Bitmap(settings.DIR + '\\red.png')
for i in range(0,9):
if i > 9:
raise AttributeError('Not sure what this all means, but I dont think we can continute')
if i < 4:
tol,delta,pass_or_fail=withinTolerance(run,settings,run['run_number'])
color = GREEN if pass_or_fail else RED
wx.CallAfter(getattr(wxObj, 'imgLeak%s' % i +1).SetBitmap, (color))
else:
tol,delta,pass_or_fail=withinTolerance(run,settings,run['run_number'])
color = GREEN if pass_or_fail else RED
wx.CallAfter( getattr(wxObj,'imgFlow%s'% i -4).SetBitmap, (color))
return tol,delta
def withinTolerance(run,settings,runno):
tol = Decimal(settings.get_limits(runno,settings))
sccm = Decimal(run['sccm'])
passes = False
if runno < 4:
delta = sccm - tol
if sccm < tol:
passes = True
else:
delta = tol - sccm
if sccm > tol:
passes = True
delta = abs(delta)
rundate = run['run_datetime'].date()
runtime = str(run['run_datetime'].time())
if passes:
passes_ar = "A"
else:
passes_ar = "R"
results = [run['port_number'],
run['program_number'],
rundate,
runtime,
run['sccm'],
passes_ar]
conn = sqlite3.connect(settings.DB_FILE,detect_types=sqlite3.PARSE_DECLTYPES)
c = conn.cursor()
c.execute("INSERT INTO torch_record (port_number,program_number,test_date,test_time,measurement,result) VALUES(?,?,?,?,?,?)",results)
conn.commit()
c.close()
return tol,delta,passes
def intWithCommas(x):
x = int(x)
if type(x) not in [type(0), type(0L)]:
raise TypeError("Parameter must be an integer.")
if x < 0:
return '-' + intWithCommas(-x)
result = ''
while x >= 1000:
x, r = divmod(x, 1000)
result = ",%03d%s" % (r, result)
return "%d%s" % (x, result)