#!/usr/bin/env python3 #### Compute and plot distance ratios for SA1-polling-place correspondences #### For more info, see https://abjago.net/booth-geography/ import sqlite3 import math import subprocess import argparse import sys import os.path # > pip install geographiclib matplotlib numpy from geographiclib.geodesic import Geodesic import matplotlib as mpl import matplotlib.pyplot as plt import numpy as np DBFILE = "dist_ratios.sqlite" parser = argparse.ArgumentParser("Compute SA1-polling-place geographic correlations") parser_imports = parser.add_argument_group( title="data-import files (all required unless --no-import)" ) parser_imports.add_argument( "--polling-places", help="columns: `...,PollingPlaceID,...,Latitude,Longitude`" ) parser_imports.add_argument( "--sa1s-dists", help="columns: `year,state_ab,div_nm,SA1_id,pp_id,pp_nm,votes`" ) parser_imports.add_argument("--sa1-centroids", help="columns: `X,Y,SA1_7DIG16``") parser.add_argument( "--no-import", action="store_true", help="database is already prepared; no need to run imports", ) parser.add_argument( "--db", default=DBFILE, help=f"database file (will be created if it does not exist; default is `{DBFILE}`)" ) parser.add_argument("--graph-dir", help="folder to output graphs to") arguments = parser.parse_args() if arguments.no_import: pass elif not arguments.polling_places: print("error: missing argument --polling-places", file=sys.stderr) sys.exit(1) elif not arguments.sa1s_dists: print("error: missing argument --sa1s-dists", file=sys.stderr) sys.exit(1) elif not arguments.sa1_centroids: print("error: missing argument --sa1-centroids", file=sys.stderr) sys.exit(1) else: print("importing...", file=sys.stderr) sqlite_setup = f'.mode csv \n\ .import "{os.path.abspath(arguments.polling_places)}" Places \n\ .import "{os.path.abspath(arguments.sa1s_dists)}" Correlation \n\ .import "{os.path.abspath(arguments.sa1_centroids)}" Centroids \n\ CREATE TABLE Combine AS SELECT pp_id, CAST(Latitude as REAL) AS PLat, CAST(Longitude as REAL) AS PLon, SA1_id, CAST(Y as REAL) AS SLat, CAST(X as REAL) AS SLon, votes \n\ FROM Correlation, Places, Centroids WHERE Correlation.pp_id = Places.PollingPlaceID AND Correlation.SA1_id = Centroids.SA1_7DIG16; \n\ ALTER TABLE Combine ADD COLUMN distance_km REAL; \n\ DROP TABLE CENTROIDS;\n\ DROP TABLE CORRELATION;\n\ VACUUM;\n\ .save {os.path.abspath(arguments.db)} \n\ .quit\n' # we duplicate subprocess.run(["sqlite3"], input=bytes(sqlite_setup, "utf8")) ### ok back to run-unconditional def distkm(PLat, PLon, SLat, SLon): resp = Geodesic.WGS84.Inverse(PLat, PLon, SLat, SLon) # response is in metres # convert to km and clamp to be between 0.1 and 5000 km return min(max(resp["s12"] / 1000.0, 0.1), 5000.0) con = sqlite3.connect(os.path.abspath(arguments.db)) ### Distance #### con.create_function("distkm", 4, distkm) if not arguments.no_import: print("calculating distances...", file=sys.stderr) # PLat < 0 excludes special teams which coded at (0, 0) con.execute("UPDATE Combine SET distance_km = distkm(PLat, PLon, SLat, SLon)") # this is the computationally expensive one, commit here just in case con.commit() #### Views for later analysis #### con.execute( "CREATE VIEW IF NOT EXISTS SA1Mins AS \ SELECT SA1_id, pp_id, distance_km AS min_dist from Combine \ GROUP BY SA1_id HAVING distance_km = min(distance_km)" ) con.execute( "CREATE VIEW IF NOT EXISTS Ratiod AS \ SELECT Combine.pp_id AS pp_id, SA1_id, votes, (distance_km/min_dist) AS dist_ratio \ FROM Combine LEFT JOIN SA1Mins USING (SA1_id) WHERE PLat < 0" ) con.execute( "CREATE VIEW IF NOT EXISTS Named AS \ SELECT Places.DivisionNm AS div_nm, Places.PollingPlaceNm AS pp_nm, pp_id, SA1_id, votes, dist_ratio \ FROM Ratiod LEFT JOIN Places ON ratiod.pp_id = Places.PollingPlaceID \ ORDER BY dist_ratio ASC" ) #### Analysis by Divisions #### con.execute("DROP TABLE IF EXISTS DivPart") con.execute( "CREATE TABLE IF NOT EXISTS DivPart AS \ SELECT div_nm, pp_nm, pp_id, SA1_id, votes, dist_ratio,\ SUM(votes) OVER (PARTITION BY div_nm ORDER BY dist_ratio ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) sv \ FROM Named \ ORDER By div_nm, sv" ) ## ^^ making this a table speeds up execution by a LOT divisions = [] for row in con.execute("SELECT DISTINCT div_nm, max(sv) FROM DivPart GROUP BY div_nm"): divisions.append((row[0], row[1])) print("Division\t<=1x\t<=3x\t<=10x\tTotal") for div_nm, maxv in divisions: # curve approximation one = con.execute( "SELECT max(sv) FROM DivPart WHERE dist_ratio = 1.0 AND div_nm = ?", (div_nm,) ).fetchone()[0] tri = con.execute( "SELECT max(sv) FROM DivPart WHERE dist_ratio <= 3.0 AND div_nm = ?", (div_nm,) ).fetchone()[0] ten = con.execute( "SELECT max(sv) FROM DivPart WHERE dist_ratio <= 10.0 AND div_nm = ?", (div_nm,) ).fetchone()[0] print(div_nm, one, tri, ten, maxv, sep="\t") # print a figure if arguments.graph_dir: dr = [] sv = [] mv = float(maxv) for r in con.execute("SELECT sv, dist_ratio FROM DivPart WHERE div_nm = ?", (div_nm,)): sv.append(r[0] / mv) dr.append(r[1]) plt.axhline(1.0, c="#eee") plt.axhline(3.0, c="#eee") plt.axhline(10.0, c="#eee") plt.axvline(one / mv, c="#eee") plt.axvline(tri / mv, c="#eee") plt.axvline(ten / mv, c="#eee") plt.scatter(sv, dr, zorder=999) plt.semilogy() plt.ylabel("distance ratio") plt.xlabel("fraction of cumulative ordinary votes cast") plt.title(f"{div_nm} ({one}, {tri}, {ten}, {maxv})") plt.savefig(f"{arguments.graph_dir}/{div_nm}.png") plt.clf() con.commit() con.close() if arguments.graph_dir: out = """