Introduction
This article shows you how to maintain your own GeoIP database and to implement an API around it.
The Data
The data is publicly and freely available. It is provided by the five Regional Internet Registries (RIRs).
Each RIR regularly updates a big file containing the information we need. These files comply with the RIR statistics exchange format, which is parsable (with some tweaks) as a CSV with the pipe (|
) as the separator.
For each RIR, here are the files: (beware, they are a bit heavy)
We are interested in entries like this one (from the RIPE NCC file):
ripencc|FR|ipv4|2.0.0.0|1048576|20100712|allocated|...
This line tells us that the IPv4 block 2.0.0.0/12
is assigned to France (FR
). We know it is a /12
because the fifth column tells us how many IP addresses are in the block (1048576). The following formula gives you the CIDR mask: -log2(1048576) + 32 = 12
.
(IPv6 entries give the CIDR mask directly because of the astronomical numbers of IPv6 addresses you can have in one “little” block, e.g., 18,446,744,073,709,551,616 for a /64
.)
However, the number of addresses is useful here because we are only interested in the lowest IP address and the highest IP address. That is, we have:
- Lowest: 2.0.0.0
- Highest: 2.0.0.0 + 1048576 – 1 = 2.15.255.255
(Remember: IPv4 addresses are just signed 32-bit integers.)
So, we now know that any requested address within the range 2.0.0.0-2.15.255.255
is located in France.
We can build a list containing the lowest and the highest address for each block. The standard ipaddress
module is pretty handy for adding IP address and integers together.
Here is a simple parser in Python:
import csv
import ipaddress
import math
def size_to_cidr_mask(c):
""" c = 2^(32-m), m being the CIDR mask """
return int(-math.log2(c) + 32)
def parse_rir_file(filename):
with open(filename) as f:
rows = csv.reader(f, delimiter='|')
for r in rows:
try:
rir, country_code, ip_version, ip, mask, *_ = r
except ValueError:
continue
if ip == '*':
continue
if ip_version == 'ipv4':
length = int(mask)
addr = ipaddress.ip_address(ip)
yield {
'ip_low': addr,
'ip_high': addr + length - 1,
'rir': rir,
'country': country_code,
'range': ip+'/'+str(size_to_cidr_mask(length)),
}
The function parse_rir_file
returns an iterator for one RIR file ingested. We can merge all of them to have only one sequence containing the blocks for the entire World:
import itertools as it
data = list(it.chain(
parse_rir_file('delegated-ripencc-extended-latest'),
parse_rir_file('delegated-arin-extended-latest'),
parse_rir_file('delegated-apnic-extended-latest'),
parse_rir_file('delegated-afrinic-extended-latest'),
parse_rir_file('delegated-lacnic-extended-latest')
))
This may take a while depending on your hardware… (It takes several seconds on my laptop with an SSD) If you are curious, you can now count how many IPv4 blocks are in use 😉
Lookup
We have built our data, and now we have to sort it. We also need to build an index list on the ip_low
s (keys
), so we can perform a lookup on it and then retrieve the entire entry from data
.
data.sort(key=lambda r: r['ip_low'])
keys = [r['ip_low'] for r in data]
A naive approach consists in simply comparing each low
and high
against the requested IP address until low <= ip <= high
. However, our list is pretty huge! The worst case is when a non-assigned IP address is requested: we would walk through the entire list for nothing.
def naive_lookup(keys, target):
last_v = None
for k,v in enumerate(keys):
if last_v is None:
last_v = v
if last_v <= target < v:
return k
last_v = v
return None
We are not looking for a specific entry, but for a range in which the requested address fits. For this kind of work, a bisection method suits our needs. Python provides the bisect
module for this.
def lookup(ip):
ip = ipaddress.ip_address(ip)
if not ip.is_global or ip.is_multicast: # Check bogon return None
i = bisect.bisect_right(keys, ip)
entry = data[i-1]
assert(entry['ip_low'] <= ip <= entry['ip_high'])
return entry
That is, for the requested IP address, the operation bisect_right(keys, ip)
traverses the list efficiently to see where the address fits. However, we must ensure the address belongs to a block, hence the assert
.
API
Last step: make an API and host it. Here is a simple one using the microframework Bottle. The rir
module contains the functions we have defined above.
from bottle import Bottle, route, response
import ipaddress
import rir
app = Bottle()
def valid_ip(ip):
try:
ipaddress.ip_address(ip)
return True
except ValueError:
return False
@app.route('/<ip>')
def lookup_ip(ip):
if not valid_ip(ip):
response.status = 400
return {'error': 'Not a valid IPv4: %s' % ip}
entry = rir.lookup(ip)
if entry is None:
return {
'ip': ip,
'bogon': True,
}
return {
'ip': ip,
'ip_low': str(entry['ip_low']),
'ip_high': str(entry['ip_high']),
'rir': entry['rir'],
'country': entry['country'],
'range': entry['range'],
}
Example:
curl -s localhost:8080/83.167.62.189 | jq
{ "ip_low": "83.167.32.0", "range": "83.167.32.0/19", "rir": "ripencc", "country": "FR", "ip": "83.167.62.189", "ip_high": "83.167.63.255" }
暂无评论内容