netkeiba のデータをスクレイピングして LOD 化する(5)
まだ zenn.dev に記 事としてまとめることは出来ていないが,着々と自動化処理が作成できてきた.
一方で,エラーに関してもエッジケースが現れたのでその例外処理もプチプチやっていく.
自動化処理
dailyUpdate.py
をメインプログラムとして実行するのではなく,反復処理の一部として実行する.
すなわち,元日から大晦日まで一日ずつ 大晦日から元日まで一日ずつ1 dailyUpdate.py
に渡して 365 日処理ができるようにする.
firstDay = date(2021, 12, 31)
today = firstDay
loop_size = 365
for loop in range(loop_size):
print("now is ", today)
targetDate = sum([today.year * 10 ** 4, today.month * 10 ** 2, today.day])
updateDaily(targetDate) # int(YYYYMMDD) を渡す
today -= timedelta(days=1)
こまめにコミット
また,一年分も処理をすると得られるデータ数が膨大なものとなり,あとでコミットするのに支障をきたすかもしれない. そのため,一週間ごとにこまめにコミットさせる.
def makeCommands():
dt = datetime.now(timezone(timedelta(hours=9))).strftime("%Y-%m-%d %H:%M:%S")
git_add = ["git", "add", "."]
git_commit = ["git", "commit", "-m", f"Update: horse and race data || {dt}"]
git_gc = ["git", "gc", "--prune=all"]
git_push = ["git", "push"]
return [git_add, git_commit, git_gc, git_push]
def gitCommit():
for proc in makeCommands():
subprocess.run(proc, encoding="utf-8", stdout=subprocess.PIPE)
firstDay = date(2021, 12, 31)
today = firstDay
loop_size = 365
for loop in range(loop_size):
print("now is ", today)
targetDate = sum([today.year * 10 ** 4, today.month * 10 ** 2, today.day])
updateDaily(targetDate) # int(YYYYMMDD) を渡す
today -= timedelta(days=1)
# こまめにコミット
if loop % 7 == 6:
gitCommit()
ファイルの存在判定(重複リクエストをなくす)
さらに,処理を高速化する ために既存のファイルの有無によってリクエストを減らす.
horse_id
によってディレクトリ構造および HTTP リクエストが管理されているが,pathlib
によってこの ID を持つファイルへのパスオブジェクトを作成し,Path.exists()
で存在を確認する.
ファイルが無い場合に絞ってリクエストを行えば,処理が繰り返されるたびに 1 ループに必要な時間は減っていくことだろう.
from pathlib import Path
cwd = Path.cwd()
def getHorsePath(horse_id: str, dir="json") -> Path:
yyyy, xxxx, zz = horse_id[:4], horse_id[4:8], horse_id[8:]
filepath = cwd / "data" / "horse" / dir / "profile" / yyyy / xxxx / f"{zz}.json"
return filepath
def filteringDuplicated(horse_list: List[str]) -> List[str]:
unregistered_horse_id = [horse_id for horse_id in horse_list if not getHorsePath(horse_id).exists()]
return unregistered_horse_id
UnicodeDecodeError
Traceback (most recent call last):
File "python/temp.py", line 36, in <module>
updateDaily(targetDate)
File "/path/to/ML4Keiba/python/dailyUpdate.py", line 65, in main
processHorseData(horse_list=horse_list, limit=PARALLEL_LIMIT)
File "/path/to/ML4Keiba/python/getHorseProfile.py", line 503, in main
loop.run_until_complete(_run(horse_list, coro, limit))
File "/usr/lib/python3.8/asyncio/base_events.py", line 616, in run_until_complete
return future.result()
File "/path/to/ML4Keiba/python/getHorseProfile.py", line 491, in _run
responses = await tqdm.gather(*tasks) # wrapper for asyncio.gather
File "/path/to/ML4Keiba/.venv/lib/python3.8/site-packages/tqdm/asyncio.py", line 79, in gather
res = [await f for f in cls.as_completed(ifs, loop=loop, timeout=timeout,
File "/path/to/ML4Keiba/.venv/lib/python3.8/site-packages/tqdm/asyncio.py", line 79, in <listcomp>
res = [await f for f in cls.as_completed(ifs, loop=loop, timeout=timeout,
File "/usr/lib/python3.8/asyncio/tasks.py", line 619, in _wait_for_one
return f.result() # May raise f.exception().
File "/path/to/ML4Keiba/.venv/lib/python3.8/site-packages/tqdm/asyncio.py", line 76, in wrap_awaitable
return i, await f
File "/path/to/ML4Keiba/python/getHorseProfile.py", line 468, in _bound_fetch
return await _fetch(session, horse_id, coro)
File "/path/to/ML4Keiba/python/getHorseProfile.py", line 455, in _fetch
return await coro(horse_id, res_top, res_ped)
File "/path/to/ML4Keiba/python/getHorseProfile.py", line 407, in coroutine
content_top = await res_top.text(encoding=ENCODING)
File "/path/to/ML4Keiba/.venv/lib/python3.8/site-packages/aiohttp/client_reqrep.py", line 1085, in text
return self._body.decode( # type: ignore[no-any-return,union-attr]
UnicodeDecodeError: 'euc_jp' codec can't decode byte 0xf9 in position 26711: illegal multibyte sequence
リクエストをバイナリデータから euc-jp
で読み込むときに,「髙﨑」が悪さをして UnicodeDecodeError
が起きてしまった.
https://qiita.com/inoory/items/aafe79384dbfcc0802cf#エラーハンドラを指定したときのデコード結果
いろいろ試したところ,surrogateescape
だとうまくいくときとそうでない場合があるようだ.
一方で backslashreplace
だと常に成功するが,\\xe3\\x81\\xb2\\xe3\\x82\\x89\\xe3\\x82
のような表記がデータ上に残ってしまう.
現状,うまく回避する方法が(自分では)よくわかっていないため,try-except
構文でエラーが起きるたびそれを回避する方法を取ることとする.
content_top = await res_top.text(encoding=ENCODING, errors="surrogateescape")
content_ped = await res_ped.text(encoding=ENCODING, errors="surrogateescape")
try:
meta = getHorseMeta(BeautifulSoup(content_top, "lxml"))
except Exception as e:
print(e, file=sys.stderr)
content_top = await res_top.text(encoding=ENCODING, errors="backslashreplace")
meta = getHorseMeta(BeautifulSoup(content_top, "lxml"))
ped = getHorsePed(BeautifulSoup(content_ped, "lxml"))
codecs --- codec レジストリと基底クラス — Python 3.10.0b2 ドキュメント とか Python でサロゲートペア -- ほっけの逆襲 - 今川館 あたりが正解なんじゃあないかな~~と思いつつ,とりあえず現段階ではデータ収集を優先する.
あとになって余裕ができてからデータクレンジングを行えばよいだろう(ヨシッ!👈😹)
getId()
getId()
で与えられた文字列から ID を引っこ抜いていたが,以外にもロバストネスが低いことがわかったため,処理を冗長化した.
正規表現を使うのもアリではあるのだが,プロバイダ側で ID の表記ルールを変更されたら追従しにくいため,別のアプローチを取った.
def getId(url: str) -> str:
if type(url) is int:
return str(url)
if type(url) is not str:
return None
res = url.strip("/").split("/")[-1]
if len(res) < 1:
return None
id_ = res.split("=")[-1]
return id_
とりあえず本日までの進捗はこんなところだろうか.
データクレンジングの課題については適宜 Issue を建てて後から参照し て直すことになるだろう. 未来の自分にぶん投げて申し訳ない……😢
Footnotes
-
遡っていくほうが,既知の ID と衝突しやすい(重複リクエストを減らしやすい)と考え直した(が,そこまでの違いはないかも?) ↩