前回の記事では、構想を練るところまでやった。
苦節一ヶ月、ついに実現し毎日自動でデータを集め、それが BigQuery にロードされるようになったので、苦労した点をつらつらと書いていく。
目次
- そもそもプロダクト(Pub/Sub)の役割がわかっていなかった
- CloudEvent ってなに?
- イベント駆動の Cloud Functions (Gen 2) はどうやってデプロイするの?
- BigQuery にはユニーク制約が無い!
- BigQuery は何でも入れればいいわけではない
1. Pub/Sub の役割
Cloud Scheduler については、まぁすぐに理解できた。 単に Cron を設定してその時間通りに Pub/Sub を起動するだけだからだ。
が、「Pub/Sub 起動する」ってなに???となってしまった。。。
Pub/Sub は、メッセージを生成するサービスを、それらのメッセージを処理するサービスと切り離す、非同期のスケーラブルなメッセージング サービスです。
これだけをはじめて読んだときは「???」としかならないと思うんだが、次の記事を読みつつ自分で手を動かしてみることで理解することができた:
【図解付き】Cloud Pub/Sub に概要や使い方についてわかりやすく解説 - KIYONO Engineer Blog
https://laboratory.kiyono-co.jp/69/gcp/
メッセージングサービスがないと……
Pub/Sub のようなプロダクトがない場合、データの「送信元」「受信先」がすべて1対1対応になってしまい、複雑かつ情報ロストのリスクが増えてしまう。 例の通り4:4の構成だと、4 * 4 = 16 通りの経路が生じるし、n 個の構成物があれば n^2 に増大してしまう。。。
メッセージングサービスがあると……
間に Pub/Sub を挟むことで、メッセージングサービスに管理を集中させることができる。 経路は一箇所にまとめられるし、Pub/Sub は機能したかだけを気にしていればよい。
ML4Keiba では
- Pub/Sub でトピックを作成する
- ただ作成するだけ
- 処理を仲介するインスタンスを定義するイメージが近い
- このトピックを指定して Cloud Functions 関数をデプロイ
- 購読する(Subscript する)のがこの関数
- 言い換えれば、指定トピックのサブスクライバーの一つとしてこの関数を指定する
- Cloud Scheduler から毎日定時に事前設定したメッセージを Pub/Sub のトピックに向けてパブリッシュする
こうして、Pub/Sub 上の指定トピックが受け取ったペイロードを、そのままサブスクライバーたちに送信することができた (メッセージのパブリッシュイベントを通じて CF 関数を発火させることができた)
勘違いしたこと
誤り → 「Pub/Sub は Cloud Scheduler からしか起動できない!」
正解 → 「Pub/Sub は 多様な Publisher から起動できる」
Cloud Scheduler はあくまで数ある Publisher の一つでしかなく、Google がお手軽 Cron として提供しているだけである。
Pub/Sub 環境は、Google Cloud Console、Cloud Shell、クライアント ライブラリ、REST API のいずれかを使用して設定できます。
2. CloudEvent とは
Q. Pub/Sub でメッセージを送れるのはわかったけど、Cloud Functions でどうやってそれを購読すればいいの? (Pub/Sub のサブスクライバーってどうすれば設定できるの?)
A. CloudEvent オブジェクトを引数として受け入れる CloudEvent 関数を作成する
mypy
で型付ける時にどないすんねん 💢
とブチ切れたが、別途 cloudevents
を入れてやればよかった。
オブジェクトにはプロパティとして attributes
と data
が含まれており、その data
内には、Pub/Sub がトリガーしたイベントでは(第2世代 Cloud Functions の場合)以下のようなオブジェクトが含まれている:
{
"message": {
// 注)data は base64 で encode されている → デコードしてやる必要あり
"data": string,
"attributes": {
string: string,
...
},
"messageId": string,
"publishTime": string,
"orderingKey": string
}
}
イベント ドリブン関数を作成する | Google Cloud Functions に関するドキュメントでも説明されているように、 Cloud Function (Gen 2) においてはすべて「CloudEvent 関数」を使用する。
Pub/Sub トリガーを CloudEvent 関数で使う場合、データのペイロードは MessagePublishedData
になる。
Cloud Pub/Sub のチュートリアル(第 2 世代) | Google Cloud Functions に関するドキュメントにもある通り、cloud_event.data["message"]["data"]
という形でアクセスする。
得られる文字列は base64
でエンコードされた文字列であることにも留意すること。
サンプルコード
- Python
- TypeScript
import base64
import functions_framework
from cloudevents.http import CloudEvent
# Triggered from a message on a Cloud Pub/Sub topic.
@functions_framework.cloud_event
def subscribe(cloud_event: CloudEvent) -> None:
# Your code here
# Access the CloudEvent data payload via cloud_event.data
# base64 encoded string
data: str = cloud_event.data["message"]["data"]
byte_string: bytes = base64.b64decode(data)
decoded_txt: str = byte_string.decode()
print(decoded_txt)
return
import functions, { CloudEvent } from '@google-cloud/functions-framework'
import { MessagePublishedData } from '@google/events/cloud/pubsub/v1/MessagePublishedData'
// Triggered from a message on a Cloud Pub/Sub topic.
functions.cloudEvent<MessagePublishedData>('subscribe', (cloudEvent) => {
// Your code here
// Access the CloudEvent data payload via cloudEvent.data
// base64 encoded string
data: string = cloud_event.data["message"]["data"]
byte_string = Buffer.from(data, 'base64')
decoded_txt = byte_string.toString()
concole.log(decoded_txt)
return
})
つまり、Pub/Sub でトリガーした Cloud Functions 関数は、subscribe()
以下を起点として実行すればよい。
cf. Cloud Pub/Sub のチュートリアル(第 2 世代) | Google Cloud Functions に関するドキュメント