"""Clean up aircraft ``Type`` strings in a SQLite ``Aircraft`` table.

The input database is copied to the output path.  A reader (this process)
scans every row of ``Aircraft`` in the input database, computes a cleaned-up
``Type`` value, and hands changed rows to a writer process over a queue.  The
writer applies them to the copy with batched ``UPDATE`` statements.
"""
import multiprocessing
import os
import re
import shutil
import sqlite3
import sys
import time


def error(msg, *args):
    """Write ``msg % args`` plus a newline to stderr and flush it."""
    sys.stderr.write((msg % args) + '\n')
    sys.stderr.flush()


def usage_message(prog=None):
    """Return the one-line usage string.

    Args:
        prog: Program name to show; defaults to the basename of
            ``sys.argv[0]``.
    """
    if prog is None:
        prog = os.path.basename(sys.argv[0])
    return 'usage: %s <db_in> <db_out>' % prog


def print_usage():
    """Print the command-line usage string to stdout."""
    print(usage_message())


def process_changes(db_path, queue, batch_size=100000):
    """Writer side: apply queued record updates to the database at db_path.

    Items are read from ``queue`` until the string ``'DONE'`` arrives.  Every
    other item must be a mapping with the keys ``'Type'``,
    ``'RegisteredOwners'`` and ``'ModeS'``; the row whose ``ModeS`` matches is
    updated with the other two values.

    Args:
        db_path: Path of the SQLite database to update.
        queue: Object whose ``get()`` blocks and returns the next item.
        batch_size: Number of updates per transaction (and per progress
            message).
    """
    sql = 'UPDATE Aircraft SET Type = ?, RegisteredOwners = ? where ModeS = ?'
    # isolation_level=None: transactions are opened explicitly with BEGIN.
    db = sqlite3.connect(db_path, isolation_level=None)
    try:
        db.execute('BEGIN')
        num_executes = 0
        start_time = time.time()
        # iter(callable, sentinel) keeps calling queue.get() until it
        # returns a value equal to 'DONE'.
        for rec in iter(queue.get, 'DONE'):
            db.execute(
                sql, (rec['Type'], rec['RegisteredOwners'], rec['ModeS']))
            num_executes += 1
            if num_executes == batch_size:
                db.commit()
                elapsed = time.time() - start_time
                # Guard against a zero interval on coarse clocks.
                rate = num_executes / elapsed if elapsed > 0 else float('inf')
                print('Writer: Processing %.1f records/sec' % rate)
                db.execute('BEGIN')
                num_executes = 0
                start_time = time.time()
        print('Writer: Finished')
        db.commit()
    finally:
        # Always release the connection, even if an update failed.
        db.close()


# Matches a trailing " - <city>, <two-letter state>" suffix,
# e.g. " - ATLANTA, GA".
CITY_STATE_CLEAN_RE = re.compile(r' +- +[a-zA-Z0-9 ]+, [A-Za-z]{2}$')

# DO title-case these tokens (they would otherwise be left alone for being
# three characters or shorter).
TITLE_CASE = [
    'AIR', 'CO', 'OF', 'AND', 'INC', 'ONE', 'TWO', 'FLI', 'HI', 'SAN'
]

# DO NOT title-case these tokens.
NOT_TITLE_CASE = [
    'TIS-B'
]

# Tokens containing a digit are never title-cased.
TITLE_CASE_EXCEPTION_RE = re.compile('[0-9]')

# (pattern, replacement) pairs applied in order by do_re_substitutions().
RE_SUBSTITUTIONS = [
    # "mcdonnell" -> "McDonnell"
    [re.compile(r'\bmcdonnell\b', re.IGNORECASE), 'McDonnell'],
    # "AS.350-B-1" -> "AS 350 B1"
    [re.compile(r'\bAS.?350.?B.?1'), 'AS 350 B1'],
    [re.compile(r'\bAS.?350.?B.?2'), 'AS 350 B2'],
    [re.compile(r'\bAS.?350.?B.?3'), 'AS 350 B3'],
    # "AS.350" -> "AS 350"
    [re.compile(r'\bAS.?350'), 'AS 350'],
    # "Notar" -> "NOTAR"
    [re.compile(r'\bnotar\b', re.IGNORECASE), 'NOTAR']
]


def do_re_substitutions(s):
    """Return ``s`` after applying every RE_SUBSTITUTIONS entry in order."""
    new_s = s
    for regex, replacement in RE_SUBSTITUTIONS:
        new_s = regex.sub(replacement, new_s)
    return new_s


def contains_upper_and_lower(s):
    """Return True if ``s`` has at least one upper- and one lower-case char."""
    return any(c.isupper() for c in s) and any(c.islower() for c in s)


def title_case(s):
    """Title-case a single token unless one of the exceptions applies.

    The token is returned unchanged when it is already mixed-case, contains a
    digit, is listed in NOT_TITLE_CASE, or is three characters or shorter and
    not listed in TITLE_CASE.
    """
    if (contains_upper_and_lower(s)
            or TITLE_CASE_EXCEPTION_RE.search(s)
            or s in NOT_TITLE_CASE
            or (len(s) <= 3 and s not in TITLE_CASE)):
        return s
    return s.title()


def fix_type(s):
    """Return a cleaned-up aircraft type string; ``None`` passes through.

    Runs of spaces are collapsed, each space-separated token goes through
    title_case(), and the RE_SUBSTITUTIONS rewrites are applied to the
    result.
    """
    if s is not None:
        # Splitting on ' ' and dropping empties coalesces multiple spaces.
        tokens = [title_case(p) for p in s.split(' ') if p]
        s = do_re_substitutions(' '.join(tokens))
    return s


def fix_registered_owners(s):
    """Return a cleaned-up registered-owner string; ``None`` passes through.

    A trailing " - <city>, <state>" suffix is removed, runs of spaces are
    collapsed, and each token goes through title_case().
    """
    if s is not None:
        # Remove " - <city>, <state>" suffix.
        s = CITY_STATE_CLEAN_RE.sub('', s)
        # Coalesce multiple spaces.
        tokens = [title_case(p) for p in s.split(' ') if p]
        s = ' '.join(tokens)
    return s


def fix_db(db_in_path, db_out_path):
    """Read Aircraft rows from db_in_path and queue fixes for db_out_path.

    A writer process running process_changes() is started against
    ``db_out_path``; this process scans ``Aircraft`` in ``db_in_path`` and
    queues one update per row whose cleaned-up values differ.

    Args:
        db_in_path: Path of the SQLite database to read.
        db_out_path: Path of the SQLite database the writer updates.
    """
    queue = multiprocessing.Queue()
    worker = multiprocessing.Process(
        target=process_changes, args=(db_out_path, queue))
    db = sqlite3.connect(db_in_path)
    db.row_factory = sqlite3.Row
    worker.start()
    try:
        try:
            num_records = 0
            num_fixed_records = 0
            for row in db.execute('select * from Aircraft'):
                num_records += 1
                ac_type = row['Type']
                ac_regd_owners = row['RegisteredOwners']
                new_ac_type = fix_type(ac_type)
                # Owner clean-up (fix_registered_owners) is not applied
                # here; the owner value is passed through unchanged.
                new_ac_regd_owners = ac_regd_owners
                if (new_ac_type != ac_type
                        or new_ac_regd_owners != ac_regd_owners):
                    num_fixed_records += 1
                    queue.put({'ModeS': row['ModeS'],
                               'Type': new_ac_type,
                               'RegisteredOwners': new_ac_regd_owners},
                              True, 10)
                if num_records % 100000 == 0:
                    print('Reader: Processed %s records, fixed %s'
                          % (num_records, num_fixed_records))
        finally:
            # Always send the sentinel: without it the writer would block
            # on queue.get() forever if the scan above raised.
            queue.put('DONE')
        print('Reader: Finished')
    finally:
        worker.join()
        db.close()


def main(args):
    """Command-line entry point.

    Args:
        args: Exactly two items: the input database path and the output
            database path.  Anything else prints the usage string and exits
            with status 1.
    """
    if len(args) != 2:
        print_usage()
        sys.exit(1)
    db_in, db_out = args
    print('Copying %s to %s...' % (db_in, db_out))
    shutil.copyfile(db_in, db_out)
    # If fix_db() fails the exception propagates; the output copy is left
    # in place and no cleanup is performed.
    fix_db(db_in, db_out)


if __name__ == '__main__':
    main(sys.argv[1:])