advisory-circular/cleanbasestation.py

179 lines
4.5 KiB
Python

import multiprocessing
import os
import re
import shutil
import sqlite3
import sys
import time
def error(msg, *args):
sys.stderr.write((msg % args) + '\n')
sys.stderr.flush()
def print_usage():
print('%s: <src.sqb> <dest.sqb>')
def process_changes(db_path, queue):
db = sqlite3.connect(db_path, isolation_level=None)
# db.execute('pragma journal_mode=wal')
db.execute('BEGIN')
rec = queue.get()
num_executes = 0
start_time = time.time()
while rec != 'DONE':
sql = 'UPDATE Aircraft SET Type = ?, RegisteredOwners = ? where ModeS = ?'
db.execute(sql, (rec['Type'], rec['RegisteredOwners'], rec['ModeS']))
num_executes += 1
if num_executes == 100000:
db.commit()
end_time = time.time()
print('Writer: Processing %.1f records/sec' % (num_executes / (end_time - start_time)))
db.execute('BEGIN')
num_executes = 0
start_time = time.time()
rec = queue.get()
print('Writer: Finished')
db.commit()
db.close()
CITY_STATE_CLEAN_RE = re.compile(r' +- +[a-zA-Z0-9 ]+, [A-Za-z]{2}$')
# DO title-case these tokens.
TITLE_CASE = [
'AIR',
'CO',
'OF',
'AND',
'INC',
'ONE',
'TWO',
'FLI',
'HI',
'SAN'
]
# DO NOT title-case these tokens.
NOT_TITLE_CASE = [
'TIS-B'
]
TITLE_CASE_EXCEPTION_RE = re.compile('[0-9]')
RE_SUBSTITUTIONS = [
# "mcdonnell" -> "McDonnell"
[re.compile(r'\bmcdonnell\b', re.IGNORECASE), 'McDonnell'],
# "AS.350-B-1" -> AS 350 B1"
[re.compile(r'\bAS.?350.?B.?1'), 'AS 350 B1'],
[re.compile(r'\bAS.?350.?B.?2'), 'AS 350 B2'],
[re.compile(r'\bAS.?350.?B.?3'), 'AS 350 B3'],
# "AS.350" -> "AS 350"
[re.compile(r'\bAS.?350'), 'AS 350'],
# "Notar" -> NOTAR
[re.compile(r'\bnotar\b', re.IGNORECASE), 'NOTAR']
]
def do_re_substitutions(s):
new_s = s
for regex, replacement in RE_SUBSTITUTIONS:
new_s = regex.sub(replacement, new_s)
# if s != new_s:
# print([s, new_s])
return new_s
def contains_upper_and_lower(s):
return any(c.isupper() for c in s) and any(c.islower() for c in s)
def title_case(s):
if (contains_upper_and_lower(s) or
TITLE_CASE_EXCEPTION_RE.search(s) or
s in NOT_TITLE_CASE or
(len(s) <= 3 and s not in TITLE_CASE)):
return s
else:
return s.title()
def fix_type(s):
orig_s = s
if s is not None:
tokens = [p for p in s.split(' ') if p]
tokens = [title_case(t) for t in tokens]
s = ' '.join(tokens)
s = do_re_substitutions(s)
# if s != orig_s:
# print([orig_s, s])
return s
def fix_registered_owners(s):
if s is not None:
# Remove " - <city>, <state abbrev>' suffix.
s = CITY_STATE_CLEAN_RE.sub('', s)
# Coalesce multiple spaces.
tokens = [p for p in s.split(' ') if p]
tokens = [title_case(t) for t in tokens]
s = ' '.join(tokens)
return s
def fix_db(db_in_path, db_out_path):
queue = multiprocessing.Queue()
worker = multiprocessing.Process(
target=process_changes,
args=(db_out_path, queue))
db = sqlite3.connect(db_in_path)
db.row_factory = sqlite3.Row
worker.start()
num_records = 0
num_fixed_records = 0
for row in db.execute('select * from Aircraft'):
num_records += 1
ac_type = row['Type']
ac_regd_owners = row['RegisteredOwners']
new_ac_type = fix_type(ac_type)
# new_ac_regd_owners = fix_registered_owners(ac_regd_owners)
new_ac_regd_owners = ac_regd_owners
if new_ac_type != ac_type or new_ac_regd_owners != ac_regd_owners:
# print('%s -> %s' % (ac_type, new_ac_type))
num_fixed_records += 1
queue.put({'ModeS': row['ModeS'],
'Type': new_ac_type,
'RegisteredOwners': new_ac_regd_owners},
True,
10)
if num_records % 100000 == 0:
print('Reader: Processed %s records, fixed %s' % (num_records, num_fixed_records))
queue.put('DONE')
print('Reader: Finished')
worker.join()
db.close()
def main(args):
if len(args) != 2:
print_usage()
sys.exit(1)
db_in = args[0]
db_out = args[1]
try:
print('Copying %s to %s...' % (db_in, db_out))
shutil.copyfile(db_in, db_out)
fix_db(db_in, db_out)
except:
#os.remove(db_out)
raise
if __name__ == '__main__':
main(sys.argv[1:])