api: put functions in a class
This commit is contained in:
parent
2eb84bd746
commit
c54f4f0fe7
285
scaruffi/api.py
285
scaruffi/api.py
|
@ -1,3 +1,4 @@
|
||||||
|
import logging
|
||||||
import re
|
import re
|
||||||
from dataclasses import dataclass
|
from dataclasses import dataclass
|
||||||
|
|
||||||
|
@ -7,8 +8,6 @@ import requests
|
||||||
import scaruffi.log
|
import scaruffi.log
|
||||||
|
|
||||||
|
|
||||||
LOG = None
|
|
||||||
|
|
||||||
SITE_URL = "https://scaruffi.com"
|
SITE_URL = "https://scaruffi.com"
|
||||||
GENERAL_INDEX = SITE_URL + "/music/groups.html"
|
GENERAL_INDEX = SITE_URL + "/music/groups.html"
|
||||||
RATINGS_DECADES = SITE_URL + "/ratings/{:02}.html"
|
RATINGS_DECADES = SITE_URL + "/ratings/{:02}.html"
|
||||||
|
@ -21,163 +20,153 @@ class Release:
|
||||||
year: int = 0 # Usually the release year, not the recording year.
|
year: int = 0 # Usually the release year, not the recording year.
|
||||||
|
|
||||||
|
|
||||||
def setup_logging(*args, **kwargs):
|
class ScaruffiApi:
|
||||||
global LOG
|
|
||||||
LOG = scaruffi.log.get_logger(*args, **kwargs)
|
|
||||||
|
|
||||||
|
def __init__(self, log_level=logging.WARNING):
|
||||||
|
self.log = scaruffi.log.get_logger("scaruffi", level=log_level)
|
||||||
|
|
||||||
def _get_page(url):
|
def _get_soup(self, url):
|
||||||
LOG.debug(f"GET {url}")
|
html = self._get_page(url)
|
||||||
try:
|
if not html:
|
||||||
response = requests.get(url)
|
|
||||||
except requests.exceptions.RequestException as exc:
|
|
||||||
LOG.error(f"An exception occured during HTTP GET: {exc}")
|
|
||||||
return None
|
|
||||||
sc = response.status_code
|
|
||||||
if sc != 200:
|
|
||||||
LOG.error(f"Server returned HTTP response {sc} to {url}.")
|
|
||||||
return None
|
|
||||||
return response.text
|
|
||||||
|
|
||||||
|
|
||||||
def _get_soup(url):
|
|
||||||
html = _get_page(url)
|
|
||||||
if not html:
|
|
||||||
return None
|
|
||||||
return BeautifulSoup(html, "html5lib")
|
|
||||||
|
|
||||||
|
|
||||||
def get_musicians(offset=0, limit=20):
|
|
||||||
"""Get a list of musicians, or None on error."""
|
|
||||||
soup = _get_soup(GENERAL_INDEX)
|
|
||||||
if not soup:
|
|
||||||
return None
|
|
||||||
# Semantic Web? Just find the fattest table.
|
|
||||||
mu_table = max(soup.find_all("table"), key=lambda t: len(t.text))
|
|
||||||
musicians = [a_tag.text for a_tag in mu_table.find_all("a")]
|
|
||||||
return musicians[offset : offset + limit]
|
|
||||||
|
|
||||||
|
|
||||||
def get_ratings(decade):
|
|
||||||
"""Get a dict of ratings to a release list for this decade.
|
|
||||||
|
|
||||||
The decade must be an integer in the [0, 99] range, or a full year
|
|
||||||
(1960 for example). Returns None on error.
|
|
||||||
"""
|
|
||||||
if 1900 <= decade:
|
|
||||||
decade %= 100
|
|
||||||
if not (0 <= decade < 100 and decade % 10 == 0):
|
|
||||||
LOG.error(f"Invalid decade value: {decade}.")
|
|
||||||
return None
|
|
||||||
soup = _get_soup(RATINGS_DECADES.format(decade))
|
|
||||||
if not soup:
|
|
||||||
return None
|
|
||||||
ratings_table = max(soup.find_all("table"), key=lambda t: len(t.text))
|
|
||||||
num_lists = len(ratings_table("ul"))
|
|
||||||
if num_lists == 1:
|
|
||||||
return _get_ratings_from_unique_list(ratings_table.ul)
|
|
||||||
else:
|
|
||||||
return _get_ratings_from_lists(ratings_table("ul"))
|
|
||||||
|
|
||||||
|
|
||||||
def _get_ratings_from_unique_list(messy_list):
|
|
||||||
"""Get ratings from decades where one list contains all ratings."""
|
|
||||||
ratings = {}
|
|
||||||
current_key = None
|
|
||||||
for tag in messy_list:
|
|
||||||
if isinstance(tag, NavigableString):
|
|
||||||
continue
|
|
||||||
# Get an entry for the current rating.
|
|
||||||
if tag.name == "li":
|
|
||||||
release = _parse_release(tag.text)
|
|
||||||
if not current_key:
|
|
||||||
LOG.critical(f"Found release {release} without rating.")
|
|
||||||
return None
|
|
||||||
ratings[current_key].append(release)
|
|
||||||
# Detect a new rating list.
|
|
||||||
# Do it after getting entries in tag due to bad HTML.
|
|
||||||
text = tag.text.strip()
|
|
||||||
if text:
|
|
||||||
rating = _match_rating(text.split()[-1])
|
|
||||||
if rating is not None:
|
|
||||||
current_key = rating
|
|
||||||
ratings[current_key] = []
|
|
||||||
return ratings
|
|
||||||
|
|
||||||
|
|
||||||
def _get_ratings_from_lists(lists):
|
|
||||||
"""Get ratings from several lists, one per rating."""
|
|
||||||
ratings = {}
|
|
||||||
for ul in lists:
|
|
||||||
rating_tag = ul.span
|
|
||||||
if rating_tag:
|
|
||||||
rating = _match_rating(rating_tag.text)
|
|
||||||
if rating is None:
|
|
||||||
LOG.critical("Failed to find rating tag in list.")
|
|
||||||
return None
|
return None
|
||||||
releases = [_parse_release(li.text) for li in ul("li")]
|
return BeautifulSoup(html, "html5lib")
|
||||||
ratings[rating] = releases
|
|
||||||
return ratings
|
|
||||||
|
|
||||||
|
def _get_page(self, url):
|
||||||
|
self.log.debug(f"GET {url}")
|
||||||
|
try:
|
||||||
|
response = requests.get(url)
|
||||||
|
except requests.exceptions.RequestException as exc:
|
||||||
|
self.log.error(f"An exception occured during HTTP GET: {exc}")
|
||||||
|
return None
|
||||||
|
sc = response.status_code
|
||||||
|
if sc != 200:
|
||||||
|
self.log.error(f"Server returned HTTP response {sc} to {url}.")
|
||||||
|
return None
|
||||||
|
return response.text
|
||||||
|
|
||||||
RATING_RE = re.compile(r"\s*(\d(.\d)?)/10\s*")
|
def get_musicians(self, offset=0, limit=20):
|
||||||
|
"""Get a list of musicians, or None on error."""
|
||||||
|
soup = self._get_soup(GENERAL_INDEX)
|
||||||
|
if not soup:
|
||||||
|
return None
|
||||||
|
# Semantic Web? Just find the fattest table.
|
||||||
|
mu_table = max(soup.find_all("table"), key=lambda t: len(t.text))
|
||||||
|
musicians = [a_tag.text for a_tag in mu_table.find_all("a")]
|
||||||
|
return musicians[offset : offset + limit]
|
||||||
|
|
||||||
|
def get_ratings(self, decade):
|
||||||
|
"""Get a dict of ratings to a release list for this decade.
|
||||||
|
|
||||||
def _match_rating(text):
|
The decade must be an integer in the [0, 99] range, or a full year
|
||||||
"""Try to match text as a rating and return the rating, or None."""
|
(1960 for example). Returns None on error.
|
||||||
if not text.strip():
|
"""
|
||||||
return None
|
if 1900 <= decade:
|
||||||
match = RATING_RE.match(text.strip())
|
decade %= 100
|
||||||
if match:
|
if not (0 <= decade < 100 and decade % 10 == 0):
|
||||||
return float(match.group(1))
|
self.log.error(f"Invalid decade value: {decade}.")
|
||||||
|
return None
|
||||||
|
soup = self._get_soup(RATINGS_DECADES.format(decade))
|
||||||
|
if not soup:
|
||||||
|
return None
|
||||||
|
ratings_table = max(soup.find_all("table"), key=lambda t: len(t.text))
|
||||||
|
num_lists = len(ratings_table("ul"))
|
||||||
|
if num_lists == 1:
|
||||||
|
return self._get_ratings_from_unique_list(ratings_table.ul)
|
||||||
|
else:
|
||||||
|
return self._get_ratings_from_lists(ratings_table("ul"))
|
||||||
|
|
||||||
|
def _get_ratings_from_unique_list(self, messy_list):
|
||||||
|
"""Get ratings from decades where one list contains all ratings."""
|
||||||
|
ratings = {}
|
||||||
|
current_key = None
|
||||||
|
for tag in messy_list:
|
||||||
|
if isinstance(tag, NavigableString):
|
||||||
|
continue
|
||||||
|
# Get an entry for the current rating.
|
||||||
|
if tag.name == "li":
|
||||||
|
release = self._parse_release(tag.text)
|
||||||
|
if not current_key:
|
||||||
|
self.log.critical(f"Release {release} without rating.")
|
||||||
|
return None
|
||||||
|
ratings[current_key].append(release)
|
||||||
|
# Detect a new rating list.
|
||||||
|
# Do it after getting entries in tag due to bad HTML.
|
||||||
|
text = tag.text.strip()
|
||||||
|
if text:
|
||||||
|
rating = self._match_rating(text.split()[-1])
|
||||||
|
if rating is not None:
|
||||||
|
current_key = rating
|
||||||
|
ratings[current_key] = []
|
||||||
|
return ratings
|
||||||
|
|
||||||
def _parse_release(entry):
|
def _get_ratings_from_lists(self, lists):
|
||||||
"""Fill a release fields using entry, as well as we can."""
|
"""Get ratings from several lists, one per rating."""
|
||||||
entry = entry.strip("\r\n :") # Remove bogus spaces and colons.
|
ratings = {}
|
||||||
parts = entry.split(": ")
|
for ul in lists:
|
||||||
if len(parts) == 1:
|
rating_tag = ul.span
|
||||||
LOG.info(f"No colon in {entry}, using both as artist and title.")
|
if rating_tag:
|
||||||
title_and_year = _parse_release_title_year(entry)
|
rating = self._match_rating(rating_tag.text)
|
||||||
if not title_and_year:
|
if rating is None:
|
||||||
return Release(title=entry)
|
self.log.critical("Failed to find rating tag in list.")
|
||||||
title, year = title_and_year
|
return None
|
||||||
artist = title
|
releases = [self._parse_release(li.text) for li in ul("li")]
|
||||||
else:
|
ratings[rating] = releases
|
||||||
# Usual case is 2 parts ("artist: title"), but in case one of them
|
return ratings
|
||||||
# contains ": " as well, assume that it is part of the title, not the
|
|
||||||
# artist name.
|
|
||||||
artist = parts[0]
|
|
||||||
title_and_year_str = parts[1].strip()
|
|
||||||
if len(parts) > 2:
|
|
||||||
title_and_year_str += ": " + ": ".join(parts[2:])
|
|
||||||
title_and_year = _parse_release_title_year(title_and_year_str)
|
|
||||||
if not title_and_year:
|
|
||||||
return Release(artist=artist, title=title_and_year_str)
|
|
||||||
title, year = title_and_year
|
|
||||||
return Release(artist=artist, title=title, year=year)
|
|
||||||
|
|
||||||
|
RATING_RE = re.compile(r"\s*(\d(.\d)?)/10\s*")
|
||||||
|
|
||||||
RATING_TITLE_AND_YEAR_RE = re.compile(r"(.+?)\s?\((\d{4})(?:-\d+)?\)")
|
def _match_rating(self, text):
|
||||||
|
"""Try to match text as a rating and return the rating, or None."""
|
||||||
|
if not text.strip():
|
||||||
|
return None
|
||||||
|
match = self.RATING_RE.match(text.strip())
|
||||||
|
if match:
|
||||||
|
return float(match.group(1))
|
||||||
|
|
||||||
|
def _parse_release(self, entry):
|
||||||
|
"""Fill a release fields using entry, as well as we can."""
|
||||||
|
entry = entry.strip("\r\n :") # Remove bogus spaces and colons.
|
||||||
|
parts = entry.split(": ")
|
||||||
|
if len(parts) == 1:
|
||||||
|
self.log.info(f"No colon in {entry}, using both as artist & title.")
|
||||||
|
title_and_year = self._parse_release_title_year(entry)
|
||||||
|
if not title_and_year:
|
||||||
|
return Release(title=entry)
|
||||||
|
title, year = title_and_year
|
||||||
|
artist = title
|
||||||
|
else:
|
||||||
|
# Usual case is 2 parts ("artist: title"), but in case one of them
|
||||||
|
# contains ": " as well, assume that it is part of the title, not
|
||||||
|
# the artist name.
|
||||||
|
artist = parts[0]
|
||||||
|
title_and_year_str = parts[1].strip()
|
||||||
|
if len(parts) > 2:
|
||||||
|
title_and_year_str += ": " + ": ".join(parts[2:])
|
||||||
|
title_and_year = self._parse_release_title_year(title_and_year_str)
|
||||||
|
if not title_and_year:
|
||||||
|
return Release(artist=artist, title=title_and_year_str)
|
||||||
|
title, year = title_and_year
|
||||||
|
return Release(artist=artist, title=title, year=year)
|
||||||
|
|
||||||
def _parse_release_title_year(title_and_year):
|
RATING_TITLE_AND_YEAR_RE = re.compile(r"(.+?)\s?\((\d{4})(?:-\d+)?\)")
|
||||||
"""Parse title and year in the approximate "title (year)" format.
|
|
||||||
|
|
||||||
In some instances, the year is actually a range of years, in the YYYY-YY
|
def _parse_release_title_year(self, title_year):
|
||||||
format. Sometimes there is no space between title and year."""
|
"""Parse title and year in the approximate "title (year)" format.
|
||||||
match = RATING_TITLE_AND_YEAR_RE.match(title_and_year)
|
|
||||||
if not match:
|
In some instances, the year is actually a range of years, in the YYYY-YY
|
||||||
LOG.error(f"Failed to split title and year in \"{title_and_year}\".")
|
format. Sometimes there is no space between title and year."""
|
||||||
return None
|
match = self.RATING_TITLE_AND_YEAR_RE.match(title_year)
|
||||||
groups = match.groups()
|
if not match:
|
||||||
if len(groups) != 2 or None in groups:
|
self.log.error(f"Failed to split title/year in \"{title_year}\".")
|
||||||
LOG.error(f"Failed to parse title and year in \"{title_and_year}\".")
|
return None
|
||||||
return None
|
groups = match.groups()
|
||||||
title, year = groups
|
if len(groups) != 2 or None in groups:
|
||||||
try:
|
self.log.error(f"Failed to parse title/year in \"{title_year}\".")
|
||||||
year = int(year)
|
return None
|
||||||
except ValueError:
|
title, year = groups
|
||||||
LOG.error(f"Failed to parse year string \"{year}\" as an integer.")
|
try:
|
||||||
year = 0
|
year = int(year)
|
||||||
return title, year
|
except ValueError:
|
||||||
|
self.log.error(f"Failed to parse \"{year}\" as an integer.")
|
||||||
|
year = 0
|
||||||
|
return title, year
|
||||||
|
|
|
@ -1,21 +1,25 @@
|
||||||
|
import logging
|
||||||
import unittest
|
import unittest
|
||||||
|
|
||||||
from scaruffi import api
|
from scaruffi.api import ScaruffiApi
|
||||||
|
|
||||||
|
|
||||||
class TestScaruffi(unittest.TestCase):
|
class TestScaruffi(unittest.TestCase):
|
||||||
|
|
||||||
def setUpClass():
|
def setUp(self):
|
||||||
api.setup_logging("test")
|
self.api = ScaruffiApi()
|
||||||
|
|
||||||
|
def tearDown(self):
|
||||||
|
self.api = None
|
||||||
|
|
||||||
def test_get_musicians(self):
|
def test_get_musicians(self):
|
||||||
musicians = api.get_musicians()
|
musicians = self.api.get_musicians()
|
||||||
self.assertEqual(len(musicians), 20)
|
self.assertEqual(len(musicians), 20)
|
||||||
|
|
||||||
def test_get_ratings(self):
|
def test_get_ratings(self):
|
||||||
self.assertIsNotNone(api.get_ratings(1960))
|
self.assertIsNotNone(self.api.get_ratings(1960))
|
||||||
self.assertIsNotNone(api.get_ratings(1970))
|
self.assertIsNotNone(self.api.get_ratings(1970))
|
||||||
self.assertIsNotNone(api.get_ratings(1980))
|
self.assertIsNotNone(self.api.get_ratings(1980))
|
||||||
self.assertIsNotNone(api.get_ratings(1990))
|
self.assertIsNotNone(self.api.get_ratings(1990))
|
||||||
self.assertIsNotNone(api.get_ratings(2000))
|
self.assertIsNotNone(self.api.get_ratings(2000))
|
||||||
self.assertIsNotNone(api.get_ratings(2010))
|
self.assertIsNotNone(self.api.get_ratings(2010))
|
||||||
|
|
Loading…
Reference in a new issue