179 lines
4.5 KiB
Python
179 lines
4.5 KiB
Python
import multiprocessing
|
|
import os
|
|
import re
|
|
import shutil
|
|
import sqlite3
|
|
import sys
|
|
import time
|
|
|
|
|
|
def error(msg, *args):
    """Write ``msg % args`` plus a newline to stderr and flush immediately."""
    print(msg % args, file=sys.stderr, flush=True)
|
|
|
|
|
|
def print_usage():
    """Print a one-line usage summary prefixed with the program name."""
    # The format string was previously printed raw, leaving a literal "%s".
    print('%s: <src.sqb> <dest.sqb>' % sys.argv[0])
|
|
|
|
|
|
def process_changes(db_path, queue, batch_size=100000):
    """Apply queued Aircraft updates to the database at ``db_path``.

    Records are read from ``queue`` until the string ``'DONE'`` arrives.
    Each record is a mapping with the keys ``'Type'``, ``'RegisteredOwners'``
    and ``'ModeS'``; the row whose ``ModeS`` matches is updated with the
    other two values.

    Args:
        db_path: Path of the SQLite database to update.
        queue: Object whose ``get()`` returns the next record, or ``'DONE'``
            to finish.
        batch_size: Number of updates per transaction. After every
            ``batch_size`` updates the transaction is committed and the
            throughput is printed. A value that is never reached (e.g. 0)
            keeps everything in a single transaction.
    """
    sql = 'UPDATE Aircraft SET Type = ?, RegisteredOwners = ? where ModeS = ?'
    # isolation_level=None: transactions are controlled explicitly with BEGIN.
    db = sqlite3.connect(db_path, isolation_level=None)
    try:
        db.execute('BEGIN')
        num_executes = 0
        start_time = time.time()
        # iter() with a sentinel keeps calling queue.get() until 'DONE'.
        for rec in iter(queue.get, 'DONE'):
            db.execute(sql, (rec['Type'], rec['RegisteredOwners'], rec['ModeS']))
            num_executes += 1
            if num_executes == batch_size:
                db.commit()
                elapsed = time.time() - start_time
                # Guard against a zero interval on coarse clocks.
                rate = num_executes / elapsed if elapsed > 0 else float('inf')
                print('Writer: Processing %.1f records/sec' % rate)
                db.execute('BEGIN')
                num_executes = 0
                start_time = time.time()
        print('Writer: Finished')
        db.commit()
    finally:
        # Always release the connection, even if an update fails.
        db.close()
|
|
|
|
|
|
# Matches a trailing " - <city>, <two-letter code>" suffix (e.g. " - DALLAS, TX").
# fix_registered_owners() removes this suffix from owner names.
CITY_STATE_CLEAN_RE = re.compile(r' +- +[a-zA-Z0-9 ]+, [A-Za-z]{2}$')
|
|
|
|
|
|
# Short tokens (three characters or fewer) that title_case() should still
# convert; other tokens of that length are left as-is.
TITLE_CASE = [
    'AIR', 'CO', 'OF', 'AND', 'INC',
    'ONE', 'TWO', 'FLI', 'HI', 'SAN',
]
|
|
|
|
# Tokens that title_case() must return unchanged.
NOT_TITLE_CASE = ['TIS-B']
|
|
|
|
|
|
# Any token containing a digit is left unchanged by title_case().
TITLE_CASE_EXCEPTION_RE = re.compile('[0-9]')
|
|
|
|
|
|
# [compiled pattern, replacement] pairs, applied in this order by
# do_re_substitutions().
RE_SUBSTITUTIONS = [
    # "mcdonnell" (any casing) -> "McDonnell"
    [re.compile(r'\bmcdonnell\b', re.IGNORECASE), 'McDonnell'],

    # "AS.350-B-1" -> "AS 350 B1", and likewise for B2 / B3
    [re.compile(r'\bAS.?350.?B.?1'), 'AS 350 B1'],
    [re.compile(r'\bAS.?350.?B.?2'), 'AS 350 B2'],
    [re.compile(r'\bAS.?350.?B.?3'), 'AS 350 B3'],

    # "AS.350" -> "AS 350"
    [re.compile(r'\bAS.?350'), 'AS 350'],

    # "Notar" (any casing) -> "NOTAR"
    [re.compile(r'\bnotar\b', re.IGNORECASE), 'NOTAR'],
]
|
|
|
|
def do_re_substitutions(s):
    """Return ``s`` after applying each RE_SUBSTITUTIONS rule in list order."""
    for pattern, repl in RE_SUBSTITUTIONS:
        s = pattern.sub(repl, s)
    return s
|
|
|
|
def contains_upper_and_lower(s):
    """Return True if ``s`` has at least one uppercase and one lowercase character."""
    seen_upper = False
    seen_lower = False
    for ch in s:
        if ch.isupper():
            seen_upper = True
        elif ch.islower():
            seen_lower = True
        if seen_upper and seen_lower:
            return True
    return False
|
|
|
|
|
|
def title_case(s):
    """Return ``s.title()`` unless the token is exempt, in which case return ``s``.

    A token is exempt when it already mixes upper and lower case, contains a
    digit, is listed in NOT_TITLE_CASE, or is three characters or fewer and
    not listed in TITLE_CASE.
    """
    if contains_upper_and_lower(s):
        return s
    if TITLE_CASE_EXCEPTION_RE.search(s):
        return s
    if s in NOT_TITLE_CASE:
        return s
    if len(s) <= 3 and s not in TITLE_CASE:
        return s
    return s.title()
|
|
|
|
|
|
def fix_type(s):
    """Normalize an aircraft type string; ``None`` is returned unchanged.

    The string is split on single spaces (empty pieces are dropped, which
    collapses runs of spaces), each token goes through title_case(), the
    tokens are re-joined with one space, and do_re_substitutions() is applied
    to the result.
    """
    if s is None:
        return s
    words = [title_case(word) for word in s.split(' ') if word]
    return do_re_substitutions(' '.join(words))
|
|
|
|
|
|
def fix_registered_owners(s):
    """Normalize a registered-owners string; ``None`` is returned unchanged.

    Strips a trailing " - <city>, <state abbrev>" suffix, collapses runs of
    spaces, and passes each remaining token through title_case().
    """
    if s is None:
        return s
    without_suffix = CITY_STATE_CLEAN_RE.sub('', s)
    # Dropping empty pieces coalesces multiple spaces into one.
    words = [title_case(word) for word in without_suffix.split(' ') if word]
    return ' '.join(words)
|
|
|
|
|
|
def fix_db(db_in_path, db_out_path):
    """Read Aircraft rows from ``db_in_path`` and write fixes to ``db_out_path``.

    A separate process running process_changes() performs the writes. This
    function reads every row of the Aircraft table, runs fix_type() on the
    ``Type`` column, and queues an update for each row whose value changed.
    The string ``'DONE'`` is queued last to tell the writer to stop.

    Args:
        db_in_path: Path of the SQLite database to read.
        db_out_path: Path of the SQLite database the writer updates.

    Raises:
        RuntimeError: If the writer process exits with a non-zero status.
    """
    queue = multiprocessing.Queue()
    worker = multiprocessing.Process(
        target=process_changes,
        args=(db_out_path, queue))
    db = sqlite3.connect(db_in_path)
    try:
        db.row_factory = sqlite3.Row
        worker.start()
        try:
            try:
                num_records = 0
                num_fixed_records = 0
                for row in db.execute('select * from Aircraft'):
                    num_records += 1
                    ac_type = row['Type']
                    ac_regd_owners = row['RegisteredOwners']
                    new_ac_type = fix_type(ac_type)
                    # Owner clean-up (fix_registered_owners) is currently
                    # disabled; the value is forwarded unchanged.
                    new_ac_regd_owners = ac_regd_owners
                    if new_ac_type != ac_type or new_ac_regd_owners != ac_regd_owners:
                        num_fixed_records += 1
                        queue.put({'ModeS': row['ModeS'],
                                   'Type': new_ac_type,
                                   'RegisteredOwners': new_ac_regd_owners},
                                  True,
                                  10)
                    if num_records % 100000 == 0:
                        print('Reader: Processed %s records, fixed %s' % (num_records, num_fixed_records))
            finally:
                # Always send the sentinel: without it the writer would block
                # on queue.get() forever if the reader loop raised.
                queue.put('DONE')
            print('Reader: Finished')
        finally:
            worker.join()
    finally:
        db.close()

    if worker.exitcode != 0:
        # The consumer is gone, so anything still buffered can never be
        # delivered; do not let the queue's feeder thread block interpreter exit.
        queue.cancel_join_thread()
        raise RuntimeError('Writer process exited with code %s' % worker.exitcode)
|
|
|
|
|
|
def main(args):
    """Copy the source database to the destination, then fix the copy.

    Args:
        args: Command-line arguments without the program name. Must contain
            exactly two items: the source path and the destination path.
            Otherwise the usage line is printed and the process exits with
            status 1.
    """
    if len(args) != 2:
        print_usage()
        sys.exit(1)

    db_in, db_out = args

    print('Copying %s to %s...' % (db_in, db_out))
    shutil.copyfile(db_in, db_out)
    # Errors propagate to the caller unchanged. The destination file is left
    # on disk on failure (the previous handler only re-raised as well).
    fix_db(db_in, db_out)
|
|
|
|
|
|
# Run only when executed as a script, not on import; child processes started
# by multiprocessing may import this module again.
if __name__ == '__main__':
    main(sys.argv[1:])
|