Skip to main content

netkeiba のデータをスクレイピングして LOD 化する(6)

Kiai
@Ningensei848

ML4Keiba に関する昨年の記事でも書いたように、ローカルだけでなくクラウド側にデータを保存し、それらをクラウド上の DB におさめて分析あるいはサービス提供できるようにしたいと考えている。

そんな折、以下の書籍を読む機会があり、そこでおおよその方向性がつかめたのでそれをまとめる。

実践的データ基盤への処方箋〜 ビジネス価値創出のためのデータ・システム・ヒトのノウハウ 単行本(ソフトカバー) – 2021/12/11

https://amzn.to/3DMKKQS

一言で言えば、 Cloud Functions x GCS x BigQuery で DWH をつくる構想 といったところだろうか。

データレイクとデータウェアハウス(DWH)

まず、データレイクとは "Data Lake" であり、データソースという水源を一箇所に貯めておく湖(lake)のようなものであるらしい。 整っている必要は必ずしもなく、乱雑なデータをボンボコ入れていく場所を用意して必要になるかもしれないデータをすべて集積することに価値をおいている。 ロギングデータだったり複数部署から集めたデータだったり、たくさんあるし分析には必要だろうが、そのままでは使えないデータを一時的に(といいつつ半永久的に)保存しておくことになる。

データウェアハウス(DWH: Data Ware House)とは、データレイクに集めたデータを分析できる状態に整理して堆積したものであるようだ。 "Warehouse" で「倉庫」の意味であり、後に使うことを想定して収める際に整頓するという現実世界でのイメージと沿うだろうか。 また、「堆積」という語を使ったがこれには垂直方向のデータ圧縮すなわち「列指向圧縮」を意味している。 テキストや数値データをバイナリに変換し、Aggregation しやすいようにクエリ発行も専用のチューニングが施されているらしい。

Google: Cloud Storage と BigQuery

御存知の通り、GCS こと Google Cloud Storage は Google が誇る堅牢なクラウドストレージであり、前述したデータレイクとして利用する。 ここに貯めたデータから必要なものを抽出し大規模分析に活用するのが、データウェアハウスとしての BigQuery である。

AWS こと Amazon Web Services の S3:Simple Storage Service をストレージとして採用する場合もあるだろう。 (曰く、GCS は癖があるとか、AWS のほうがサポートが手厚い[?]からメインは AWS に置きたいらしい) その場合でも、BigQuery からデータを読み取って利用することができる。 もっとも、AWS にも同様の機能を持つAmazon Redshift があるらしいのだが……機能としてどちらのほうが優れているのだろうか? 知名度的には BigQuery に軍配が上がりそうだ。

データの読み込み

BigQuery は手元のファイルを直接読み込ませることにも対応しているが、今後サービス提供まで見据えてオペレーショナル DB も持つことを考えると、一旦はすべてを GCS に集積しておく方がいいだろう。 それを踏まえると、どうやって BigQuery から GCS のデータを読み取ればよいのだろうか。

Cloud Storage からの CSV データの読み込み | BigQuery | Google Cloud を見ると、Node.js および Python で BigQuery のクライアントライブラリが提供されており、適切に初期設定を施した後、GCS の URI を指定すればあとはよしなにやってくれるようだ。 しかし、これだと単一ファイルしかロードできず、フォルダ内の膨大なファイル群を処理するには面倒……と思っていたところ、*(アスタリスク)を用いることで指定フォルダのサブフォルダまで再帰的に探ってまとめてロードしてくれるらしい。

cf. Cloud Storage の URI でのワイルドカードの使用

だが、定期的に GCS へ更新データを入れた際に毎回フォルダ全体をアップロードしていては非効率だし重複データで溢れてしまう。 最初だけこの方法を用いて、更新データについては一つ一つ処理する必要がありそう……と思ったのも束の間、Cloud Functions にはGCS のイベントトリガーが実装されており、この内 google.cloud.storage.object.v1.finalized (ファイナライズ / 作成:オブジェクトの作成または上書き) の際に実行されるようにすれば良いことがわかった。

贅沢を言えば、このイベントで受け取ったファイルが「新規作成」なのか「既存ファイルの更新」なのかは判定したいところだ。 既存のものを BigQuery にロードしてしまうと、重複データが入り込んでしまい後顧の憂いとなりうる(杞憂かもしれないが)。

"Cloud Storage トリガの Cloud Function (第 2 世代) では、トリガの情報 (Cloud Storage にアップロードされたオブジェクトのパスやファイル名、サイズ等) が CloudEvent 形式 で渡され"1 てくるところまでは判明した。 しかし、このイベントで渡されるデータの中には「そのオブジェクトが更新されたものであるか」という確たる証明手段が含まれていない。 オブジェクトのプロパティに関する仕様説明を読むと、おそらく timeCreatedupdated 2 の一致を見ればよいはずだが、ごく短時間の間に GCS 側で metadata が更新されないとも限らない(更新されると updated の時刻がズレて更新ファイル判定されることになってしまう)。

「GCS にファイルをアップロードしてからファイナライズイベントが発生するまでの短時間では GCS がファイルオブジェクトのメタデータを更新することはない」という前提の元、 CloudEvent で返されるオブジェクトのプロパティ timeCreatedupdated とを単純に文字列として比較し、一致していれば新規ファイル、そうでなければ更新ファイルという判定方法を採用する。

読み込みに関する注意点

GCS ↔ Cloud Functions ↔ BigQuery という接続・自動化が実現しそうな目処がたった。

が、その前にさらに CSV / JSON データの内容について制限があるようだ。

cf. CSV データの読み込みの詳細 | BigQuery | Google Cloud

cf. JSON データの読み込みの詳細 | BigQuery | Google Cloud

関係ありそうなものを抜粋すると以下の通り:

CSV データまたは JSON データを読み込む場合、DATE 列の値に区切りとしてダッシュ(-)を使用し、YYYY-MM-DD(年-月-日)の形式にする必要があります。

JSON または CSV データを読み込む場合、TIMESTAMP 列のタイムスタンプ値の日付部分の区切りにはダッシュ(-)を使用し、日付は YYYY-MM-DD(年-月-日)の形式にする必要があります。タイムスタンプの時間部分 hh:mm:ss(時-分-秒)には、区切りとしてコロン(:)を使用します。

これらの制限を念頭に、ETL のうち "T" の部分を再度実装を見直す必要がありそうだ。


さらに、パーティショニングという概念にもぶつかった。 未だによくわかっていないが、取り敢えず大きすぎるテーブル一つで頑張るよりは複数に分けて(partition)やることで、

  1. データの管理や照会が簡単になる
  2. クエリのパフォーマンスを高める
  3. クエリによって読み取られるバイト数を減らしてコストを抑える

といったメリットが得られるらしい。

BigQuery の場合は時間ごとに区切ることが一般的だが、その粒度によっては「一日ごと」だったり「年ごと」だったりする。 本案件のような場合、前者だと小さすぎるし後者だと大きすぎるので、「月ごと」の分割を採用するのが良いと考えている。

参考:「列ベースの時間パーティショニング」の採用判断基準

cf. https://cloud.google.com/bigquery/docs/partitioned-tables#when_to_use_partitioning

次のシナリオでは、テーブルのパーティショニングを検討してください。

  • テーブルの一部のみをスキャンすることで、クエリのパフォーマンスを向上させる必要がある。
  • テーブル オペレーションが割り当てを超えており、テーブル オペレーションの範囲を特定のパーティション列の値に設定できる。
  • クエリを実行する前にクエリ費用を把握する必要がある

BigQuery では、パーティション分割テーブルにクエリを実行する前のクエリ費用の見積もりが提供されます。 パーティション分割テーブルをプルーニングすることでクエリ費用を見積もり、続いてクエリ ドライランを発行してクエリ費用を見積もります。


次のような場合は、テーブルをパーティショニングするのではなく、クラスタリングを検討してください。

  • パーティショニングで許容されるよりも、細かい粒度が必要。
  • 通常、クエリによって複数列に対するフィルタまたは集計が使用されている。
  • 1 つの列または列グループの値のカーディナリティが大きい。
  • クエリを実行する前に厳密な費用見積もりが必要ない場合。

このような場合、テーブル クラスタリングでは、ユーザー定義の並べ替えプロパティに基づいて特定の列のデータをクラスタリングすることで、クエリを高速化できます。

さらなる自動化を目指して

GCS にデータを集積すれば、それを検知して BigQuery に堆積してくれるところまでは実現できそうな見通しが立てられた。 次は、GCS へのデータ集積を自動化、もとい定期実行できるようにしたい。

GitHub Actions

まず考えたのは、これまで通り GitHub Actions で定期実行する方法だ。 時間的な上限があるとはいえ、それを超えない限りはいつまでも無料なのが最大の強みである。

さて、ファイル システムからオブジェクトをアップロードする | Cloud Storage | Google Cloudを見る限り、bucket.blob.upload_from_filename(source_file_name) あるいは bucket.blob.upload_from_string(data, content_type) を使用して、一つずつ地道にアップロードするしかないようだ。 (フォルダごとまとめてアップロード!とかはできない)

どうしてもまとめてアップロードしたい場合には、gsutil--recursive オプションを使えばフォルダごとアップロードできる。

gsutil がどのようにして料金が発生しているのか知る由もないが、JSON API でリクエスト(これはクラス A オペレーションなのでもっとも高めの料金となる)すると請求額がデカくなりそうでこわいという思いがある。 (まぁ 1000 回やってようやく $0.005 ~ $0.01 とかなので、たかが知れているといえばそうなのだが、ある程度は工夫して回数を減らしたいところである)

GCE

やっすい仮想マシンを借りてそこで Cron を仕込めば、時間上限もなく上述の gsutil によるアップロードがかんたんに実現できる。 マシンパワーこそ心配だが、リージョンを揃えればデータ転送の利用料金を0に抑えられるかもしれない

……と思ったけどやはりマシンパワーがかなり心もとない気がする。 予期しない変なところで地雷を踏みそうな、そういう嫌な予感しかしないのであんまり積極的に採用したくはない、最終手段として残しておくべきか……?

Cloud Scheduler x Pub/Sub trigger x Cloud Functions

横断的にサービスを利用する分、最も金が掛かりそうに感じるのがこのアプローチだ。 GCP において、Cron のように定期実行するアプローチは今の所これくらいしか提供されていない。 (あるいは提供されているとしても、この方法よりは割高であるようだ)

Cloud Functions の関数を作成する | Google Cloud Functions に関するドキュメントというページには関数の作成方法が Python でやる場合も、Node.js でやる場合も詳しく説明されている。

また、ヒントとコツなんていう記事も存在している。 これに従って実装するとパフォーマンスは上がり、料金は抑えられるだろう。

ローカル環境で関数が作成できたら、次はデプロイだ。 Cloud Functions の関数をデプロイする | Google Cloud Functions に関するドキュメントを読むと、gcloud CLI を使って長ったらしいコマンドを実行する必要がありそうなのがわかる。

これでは面倒だ。 いちいちオプションを打ちたくはないし、typo ばかり増えてミスに泣かされるだろう。 (gcloud CLI で実行した命令は Ctrl+C を受け付けない)

そこで、タスクランナーの出番だ。 python で書いているから、この場合は pyptoject.toml に書くのがいいだろう。 poetry run task deploy-function あたりにしておけば確実だ。 細かなオプションについては、後々考えることとする(環境変数はどうするのとか)。

SchedulerPub/Sub については、Pub/Sub を使用して Cloud ファンクションをトリガーする | Cloud Scheduler のドキュメント | Google Cloudを見ながらポチポチすればよい。


これで、Cloud Scheduler (Cron) ↔ Pub/Sub (Event Trigger) ↔ Cloud Functions (Extraction and Translation) ↔ GCS という形で定期的な自動実行が実現できそうな見通しが立てられたことになる。

先述の  Load 自動化部分と組み合わせれば、ETL のワークフローすべてが自動化できたことになる。 BigQuery を使いこなして、座りしままに餅を食いたいねぇ……

まとめ

書籍から得た語彙の確認からはじめて、BigQuery  と Cloud Storage の連携方法、Cloud Storage に自動で定期的にデータを集積する方法について、一から検討を重ねつつドキュメントを読み込んで全体設計を検討した。 サンプルコードには一切触れなかったが、参照先ドキュメントには豊富な事例がたくさんあるのでそれに倣うといいだろう。

次は、手元のデータ構造を見直しつつ、Cloud Functions の関数作成を行なって実装を完成に近づけよう。

Footnotes

  1. Cloud Storage トリガで Cloud Functions(2nd gen)を動かしてみた - G-gen Tech Blog

  2. どちらも datetime 型としてフォーマットされた文字列である