"""Normalise the capitalisation of aircraft type names in an SQLite database.

The input database is copied to the output path; a reader (this process)
scans the ``Aircraft`` table of the input and a separate writer process
applies the corrected values to the copy.
"""

import multiprocessing
import os
import re
import shutil
import sqlite3
import sys
import time

# Sentinel the reader puts on the queue to tell the writer to stop.
_DONE = 'DONE'


def error(msg, *args):
    """Write ``msg % args`` followed by a newline to stderr and flush it."""
    sys.stderr.write((msg % args) + '\n')
    sys.stderr.flush()


def print_usage():
    """Print the command-line usage (the two arguments ``main`` expects)."""
    print('Usage: %s DB_IN DB_OUT' % sys.argv[0])


def process_changes(db_path, queue, batch_size=10000):
    """Apply queued updates to the ``Aircraft`` table of ``db_path``.

    Runs as the writer. Each queue item is a mapping with the keys
    ``'ModeS'``, ``'Type'`` and ``'RegisteredOwners'``; the string ``'DONE'``
    ends the loop. Updates are committed every ``batch_size`` statements and
    once more when the sentinel arrives.

    Args:
        db_path: Path of the SQLite database to update.
        queue: Object whose blocking ``get()`` yields the items above.
        batch_size: Number of UPDATE statements per transaction.
    """
    sql = 'UPDATE Aircraft SET Type = ?, RegisteredOwners = ? where ModeS = ?'
    # isolation_level=None: transactions are opened explicitly with BEGIN.
    db = sqlite3.connect(db_path, isolation_level=None)
    try:
        db.execute('BEGIN')
        num_executes = 0
        start_time = time.time()
        # iter(callable, sentinel) keeps calling queue.get() until it
        # returns the sentinel.
        for rec in iter(queue.get, _DONE):
            db.execute(sql, (rec['Type'], rec['RegisteredOwners'],
                             rec['ModeS']))
            num_executes += 1
            if num_executes == batch_size:
                db.commit()
                elapsed = time.time() - start_time
                # A batch can complete within the clock's resolution.
                if elapsed > 0:
                    rate = num_executes / elapsed
                else:
                    rate = float('inf')
                print('Writer: Processing %.1f records/sec' % rate)
                db.execute('BEGIN')
                num_executes = 0
                start_time = time.time()
        print('Writer: Finished')
        db.commit()
    finally:
        # Always release the connection, even when an update fails.
        db.close()


# Matches a trailing " - CITY, ST" suffix (two-letter code after the comma).
CITY_STATE_CLEAN_RE = re.compile(r' +- +[a-zA-Z0-9 ]+, [A-Za-z]{2}$')

# Words of three characters or fewer that should still be title-cased.
TITLE_CASE = [
    'AIR', 'CO', 'OF', 'AND', 'INC', 'ONE', 'TWO', 'FLI', 'HI', 'SAN'
]

# Words that must never be title-cased.
NOT_TITLE_CASE = [
    'TIS-B'
]

# Any word containing a digit is left untouched.
TITLE_CASE_EXCEPTION_RE = re.compile('[0-9]')


def contains_upper_and_lower(s):
    """Return True if ``s`` has at least one upper- and one lower-case char."""
    return any(c.isupper() for c in s) and any(c.islower() for c in s)


def title_case(s):
    """Return ``s`` title-cased unless one of the exceptions applies.

    A word is returned unchanged when it already mixes upper and lower case,
    contains a digit, is listed in ``NOT_TITLE_CASE``, or is at most three
    characters long and not listed in ``TITLE_CASE``.
    """
    keep_as_is = (
        contains_upper_and_lower(s)
        or TITLE_CASE_EXCEPTION_RE.search(s)
        or s in NOT_TITLE_CASE
        or (len(s) <= 3 and s not in TITLE_CASE)
    )
    # TODO: MCDONNELL -> McDonnell
    return s if keep_as_is else s.title()


def _title_case_words(s):
    """Split ``s`` on spaces, drop empty tokens, title-case and re-join."""
    return ' '.join(title_case(word) for word in s.split(' ') if word)


def fix_type(s):
    """Return the aircraft type ``s`` with each word passed to ``title_case``.

    Runs of spaces collapse to a single space. ``None`` is returned as is.
    """
    if s is None:
        return s
    return _title_case_words(s)


def fix_registered_owners(s):
    """Return the owner string ``s`` cleaned up and title-cased.

    A trailing " - CITY, ST" suffix is removed, runs of spaces collapse to a
    single space and each word is passed to ``title_case``. ``None`` is
    returned as is.
    """
    if s is None:
        return s
    # Remove the trailing " - CITY, ST" suffix before splitting into words.
    return _title_case_words(CITY_STATE_CLEAN_RE.sub('', s))


def fix_db(db_in_path, db_out_path):
    """Read ``Aircraft`` rows from ``db_in_path`` and fix them in ``db_out_path``.

    Every row's ``Type`` is passed through ``fix_type``; rows whose value
    changes are sent over a queue to a writer process running
    ``process_changes`` against ``db_out_path``.

    Raises:
        RuntimeError: If the writer process exits with a non-zero exit code.
    """
    queue = multiprocessing.Queue()
    worker = multiprocessing.Process(
        target=process_changes, args=(db_out_path, queue))
    db = sqlite3.connect(db_in_path)
    try:
        db.row_factory = sqlite3.Row
        worker.start()
        try:
            num_records = 0
            num_fixed_records = 0
            for row in db.execute('select * from Aircraft'):
                num_records += 1
                ac_type = row['Type']
                ac_regd_owners = row['RegisteredOwners']
                new_ac_type = fix_type(ac_type)
                # Owner clean-up (fix_registered_owners) is currently
                # disabled; the value is passed through unchanged.
                new_ac_regd_owners = ac_regd_owners
                if (new_ac_type != ac_type
                        or new_ac_regd_owners != ac_regd_owners):
                    num_fixed_records += 1
                    queue.put({'ModeS': row['ModeS'],
                               'Type': new_ac_type,
                               'RegisteredOwners': new_ac_regd_owners},
                              True, 10)
                if num_records % 10000 == 0:
                    print('Reader: Processed %s records, fixed %s' %
                          (num_records, num_fixed_records))
            print('Reader: Finished')
        finally:
            # Always release the writer: without the sentinel it would block
            # on queue.get() forever and keep this process from exiting.
            queue.put(_DONE)
            worker.join()
    finally:
        db.close()

    if worker.exitcode != 0:
        # Items the dead writer never consumed may still be buffered; do not
        # let interpreter exit wait for them to be flushed.
        queue.cancel_join_thread()
        raise RuntimeError(
            'Writer process exited with code %s' % worker.exitcode)


def main(args):
    """Copy ``args[0]`` to ``args[1]`` and fix the copy.

    Exits with status 1 after printing the usage when ``args`` does not hold
    exactly two paths. On failure the exception propagates and the partially
    processed output file is left in place.
    """
    if len(args) != 2:
        print_usage()
        sys.exit(1)
    db_in, db_out = args
    print('Copying %s to %s...' % (db_in, db_out))
    shutil.copyfile(db_in, db_out)
    fix_db(db_in, db_out)


if __name__ == '__main__':
    main(sys.argv[1:])