コンテンツにスキップ

Pub/Sub を触ってみた

概要

公式ドキュメントから

データを取り込んで配布するためのストリーミング分析とデータ統合パイプラインに使用されます。これは、サービスの統合を目的としたメッセージング指向のミドルウェア、または、タスクを同時に読み込むキューとしても使用されます。

サブスクリプションタイプは push、pull、BigQuery の 3 種類。それぞれのユースケースについて書かれたドキュメントがある。

push 型の Subscription にどの ServiceAccount を紐づけるかを設定できる。この SA の権限で Cloud Run 等の認証設定を行う。確認応答期限がタイムアウトのようなものでサブスクライバーからこの期間内に応答がなければ再試行ポリシーに則って再送する。再試行ポリシーはすぐに再送と指数バックオフの再送を選べる。メッセージはpublish_timeの情報を持っており、再送されてもこの値は変わらない。

デフォルトでは最低 1 回の配信保証となっており場合によっては 2 回以上配信されることもある。1 回限りの配信を有効にするオプションもプレビューとして提供されている。

Pull 型の Subscription

Pull 型には StreamingPull と単項 Pull の2 通りがある。これらは ack を返すことで処理の完了したことを示す。 StraimingPull は比較的スループットが高く、レイテンシを最小にしたい場合に選択される。単項 Pull は処理するメッセージの数を制限したい場合に選択される。メモリやリソースに不安がある場合は流量を制限できる。

扱うメッセージ処理には 2 種類あり、非同期 pull と同期 pull がある。リソースの状況に制約がある場合は単項 Pull API と同期モードで処理を行う。

StreamingPull flow control

flow control を使うと StreamingPull で一度に受信できるメッセージ数をコントロールできる。

def receive_messages_with_flow_control(
    project_id: str, subscription_id: str, timeout: Optional[float] = None
) -> None:
    """Receives messages from a pull subscription with flow control."""
    # [START pubsub_subscriber_flow_settings]
    from concurrent.futures import TimeoutError
    from google.cloud import pubsub_v1

    # TODO(developer)
    # project_id = "your-project-id"
    # subscription_id = "your-subscription-id"
    # Number of seconds the subscriber should listen for messages
    # timeout = 5.0

    subscriber = pubsub_v1.SubscriberClient()
    subscription_path = subscriber.subscription_path(project_id, subscription_id)

    def callback(message: pubsub_v1.subscriber.message.Message) -> None:
        print(f"Received {message.data!r}.")
        message.ack()

    # Limit the subscriber to only have ten outstanding messages at a time.
    flow_control = pubsub_v1.types.FlowControl(max_messages=10)

    streaming_pull_future = subscriber.subscribe(
        subscription_path, callback=callback, flow_control=flow_control
    )
    print(f"Listening for messages on {subscription_path}..\n")

    # Wrap subscriber in a 'with' block to automatically call close() when done.
    with subscriber:
        try:
            # When `timeout` is not set, result() will block indefinitely,
            # unless an exception is encountered first.
            streaming_pull_future.result(timeout=timeout)
        except TimeoutError:
            streaming_pull_future.cancel()  # Trigger the shutdown.
            streaming_pull_future.result()  # Block until the shutdown is complete.
    # [END pubsub_subscriber_flow_settings]

別プロジェクトの Topic と Subscription を関連付ける

プロジェクトをまたぐ Topic と Subscription を紐づけるには Subscription 作成時に Topic を指定し、Pub/Sub Subscriberの権限が必要になる。

1
2
3
4
5
gcloud pubsub subscriptions create project_a_subscription \
    --topic=project_b_topic \
    --topic-project=project_b \
    --push-auth-service-account=foo@project_a.iam.gserviceaccount.com \
    --push-endpoint=https://example.com

Subscription の有効期限

Subscription に有効期限を設けることができる。一時的に作成して消すのを忘れそうな時などに使う。

システムテスト

公式からシステムテストを行う例が紹介されている。出力されたログを確認することでメッセージが配信されていることを確認している。

Pub/Sub の指標を使った Compute Engine のスケール

こちらで紹介されてるように、Pub/Sub の指標を使って Compute Engine のスケール設定をできる。マネージドインスタンスグループを作成する際にスケールの指標を Pub/Sub の指標にすることで、メッセージの数に応じて Compute Engine のスケールできる。 Pub/Sub の指標以外にも CPU 使用率などの指標利用できる。複数の指標でスケールすることもでき、その場合は最大の数になるようにスケールする。