Skip to main content

netkeiba のデータをスクレイピングして LOD 化する(5)

Kiai

まだ zenn.dev に記事としてまとめることは出来ていないが,着々と自動化処理が作成できてきた.

一方で,エラーに関してもエッジケースが現れたのでその例外処理もプチプチやっていく.

自動化処理

dailyUpdate.py をメインプログラムとして実行するのではなく,反復処理の一部として実行する. すなわち,元日から大晦日まで一日ずつ 大晦日から元日まで一日ずつ1 dailyUpdate.py に渡して 365 日処理ができるようにする.

firstDay = date(2021, 12, 31)

today = firstDay

loop_size = 365

for loop in range(loop_size):
print("now is ", today)
targetDate = sum([today.year * 10 ** 4, today.month * 10 ** 2, today.day])
updateDaily(targetDate) # int(YYYYMMDD) を渡す
today -= timedelta(days=1)

こまめにコミット

また,一年分も処理をすると得られるデータ数が膨大なものとなり,あとでコミットするのに支障をきたすかもしれない. そのため,一週間ごとにこまめにコミットさせる.

def makeCommands():
dt = datetime.now(timezone(timedelta(hours=9))).strftime("%Y-%m-%d %H:%M:%S")
git_add = ["git", "add", "."]
git_commit = ["git", "commit", "-m", f"Update: horse and race data || {dt}"]
git_gc = ["git", "gc", "--prune=all"]
git_push = ["git", "push"]

return [git_add, git_commit, git_gc, git_push]


def gitCommit():
for proc in makeCommands():
subprocess.run(proc, encoding="utf-8", stdout=subprocess.PIPE)
firstDay = date(2021, 12, 31)

today = firstDay

loop_size = 365

for loop in range(loop_size):
print("now is ", today)
targetDate = sum([today.year * 10 ** 4, today.month * 10 ** 2, today.day])
updateDaily(targetDate) # int(YYYYMMDD) を渡す
today -= timedelta(days=1)
# こまめにコミット
if loop % 7 == 6:
gitCommit()

ファイルの存在判定(重複リクエストをなくす)

さらに,処理を高速化するために既存のファイルの有無によってリクエストを減らす. horse_id によってディレクトリ構造および HTTP リクエストが管理されているが,pathlib によってこの ID を持つファイルへのパスオブジェクトを作成し,Path.exists() で存在を確認する. ファイルが無い場合に絞ってリクエストを行えば,処理が繰り返されるたびに 1 ループに必要な時間は減っていくことだろう.

filter.py
from pathlib import Path

cwd = Path.cwd()


def getHorsePath(horse_id: str, dir="json") -> Path:

yyyy, xxxx, zz = horse_id[:4], horse_id[4:8], horse_id[8:]
filepath = cwd / "data" / "horse" / dir / "profile" / yyyy / xxxx / f"{zz}.json"

return filepath

def filteringDuplicated(horse_list: List[str]) -> List[str]:

unregistered_horse_id = [horse_id for horse_id in horse_list if not getHorsePath(horse_id).exists()]
return unregistered_horse_id

UnicodeDecodeError

UnicodeDecodeError: 'euc_jp' codec can't decode byte 0xf9
Traceback (most recent call last):
File "python/temp.py", line 36, in <module>
updateDaily(targetDate)
File "/path/to/ML4Keiba/python/dailyUpdate.py", line 65, in main
processHorseData(horse_list=horse_list, limit=PARALLEL_LIMIT)
File "/path/to/ML4Keiba/python/getHorseProfile.py", line 503, in main
loop.run_until_complete(_run(horse_list, coro, limit))
File "/usr/lib/python3.8/asyncio/base_events.py", line 616, in run_until_complete
return future.result()
File "/path/to/ML4Keiba/python/getHorseProfile.py", line 491, in _run
responses = await tqdm.gather(*tasks) # wrapper for asyncio.gather
File "/path/to/ML4Keiba/.venv/lib/python3.8/site-packages/tqdm/asyncio.py", line 79, in gather
res = [await f for f in cls.as_completed(ifs, loop=loop, timeout=timeout,
File "/path/to/ML4Keiba/.venv/lib/python3.8/site-packages/tqdm/asyncio.py", line 79, in <listcomp>
res = [await f for f in cls.as_completed(ifs, loop=loop, timeout=timeout,
File "/usr/lib/python3.8/asyncio/tasks.py", line 619, in _wait_for_one
return f.result() # May raise f.exception().
File "/path/to/ML4Keiba/.venv/lib/python3.8/site-packages/tqdm/asyncio.py", line 76, in wrap_awaitable
return i, await f
File "/path/to/ML4Keiba/python/getHorseProfile.py", line 468, in _bound_fetch
return await _fetch(session, horse_id, coro)
File "/path/to/ML4Keiba/python/getHorseProfile.py", line 455, in _fetch
return await coro(horse_id, res_top, res_ped)
File "/path/to/ML4Keiba/python/getHorseProfile.py", line 407, in coroutine
content_top = await res_top.text(encoding=ENCODING)
File "/path/to/ML4Keiba/.venv/lib/python3.8/site-packages/aiohttp/client_reqrep.py", line 1085, in text
return self._body.decode( # type: ignore[no-any-return,union-attr]
UnicodeDecodeError: 'euc_jp' codec can't decode byte 0xf9 in position 26711: illegal multibyte sequence

リクエストをバイナリデータから euc-jp で読み込むときに,「髙﨑」が悪さをして UnicodeDecodeError が起きてしまった.

https://qiita.com/inoory/items/aafe79384dbfcc0802cf#エラーハンドラを指定したときのデコード結果

いろいろ試したところ,surrogateescape だとうまくいくときとそうでない場合があるようだ. 一方で backslashreplace だと常に成功するが,\\xe3\\x81\\xb2\\xe3\\x82\\x89\\xe3\\x82 のような表記がデータ上に残ってしまう.

現状,うまく回避する方法が(自分では)よくわかっていないため,try-except 構文でエラーが起きるたびそれを回避する方法を取ることとする.

content_top = await res_top.text(encoding=ENCODING, errors="surrogateescape")
content_ped = await res_ped.text(encoding=ENCODING, errors="surrogateescape")

try:
meta = getHorseMeta(BeautifulSoup(content_top, "lxml"))
except Exception as e:
print(e, file=sys.stderr)
content_top = await res_top.text(encoding=ENCODING, errors="backslashreplace")
meta = getHorseMeta(BeautifulSoup(content_top, "lxml"))

ped = getHorsePed(BeautifulSoup(content_ped, "lxml"))

codecs --- codec レジストリと基底クラス — Python 3.10.0b2 ドキュメント とか Python でサロゲートペア -- ほっけの逆襲 - 今川館 あたりが正解なんじゃあないかな~~と思いつつ,とりあえず現段階ではデータ収集を優先する.

あとになって余裕ができてからデータクレンジングを行えばよいだろう(ヨシッ!👈😹)

getId()

getId() で与えられた文字列から ID を引っこ抜いていたが,以外にもロバストネスが低いことがわかったため,処理を冗長化した.

正規表現を使うのもアリではあるのだが,プロバイダ側で ID の表記ルールを変更されたら追従しにくいため,別のアプローチを取った.

def getId(url: str) -> str:

if type(url) is int:
return str(url)

if type(url) is not str:
return None

res = url.strip("/").split("/")[-1]
if len(res) < 1:
return None

id_ = res.split("=")[-1]

return id_

とりあえず本日までの進捗はこんなところだろうか.

データクレンジングの課題については適宜 Issue を建てて後から参照して直すことになるだろう. 未来の自分にぶん投げて申し訳ない……😢


  1. 遡っていくほうが,既知の ID と衝突しやすい(重複リクエストを減らしやすい)と考え直した(が,そこまでの違いはないかも?)