• home
  • about
  • 全ての投稿
  • ソフトウェア・ハードウェアの設定のまとめ
  • 分析関連のまとめ
  • ヘルスケア関連のまとめ
  • 生涯学習関連のまとめ

gcp cloud pub/sub

date: 2021-03-13 excerpt: gcp cloud pub/subについて

tag: cloud pub/subgcp


gcp cloud pub/subについて

概要

  • グローバルメッセージングサービス
  • キューイングサービスのように使えるので、リアルタイムに動かすのがきつい場合に、非同期を許容することで動作させることができる
    • Cloud Functions, Cloud Runとの連携も可能である

よく使うコマンド

topicの作成

$ gcloud pubsub topics create ${TOPIC}

メッセージの送信

$ gcloud pubsub topics publish ${TOPIC} --message "hello"

サブスクリプションの作成

$ gcloud pubsub subscriptions create --topic ${TOPIC} ${SUBSCRIPT}

メッセージのpull

$ gcloud pubsub subscriptions pull --auto-ack ${SUBSCRIPT}

ローカルエミュレータ

起動

$ gcloud beta emulators pubsub start --project=<starry-lattice-256603>
  • --projectは実際に存在するプロジェクト名である必要がある

環境変数の設定

$ $(gcloud beta emulators pubsub env-init)

pythonでの使用例

publisher

from google.cloud import pubsub_v1

project_id = "starry-lattice-256603"
topic_id = "test-topic"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)

for n in range(1, 10):
    data_str = f"Message number {n}"
    # Data must be a bytestring
    data = data_str.encode("utf-8")
    # When you publish a message, the client returns a future.
    future = publisher.publish(topic_path, data)
    print(future.result())

print(f"Published messages to {topic_path}.")

subscriber

from concurrent.futures import TimeoutError
from google.cloud import pubsub_v1

# TODO(developer)
project_id = "starry-lattice-256603"
subscription_id = "test-subscriber"
# Number of seconds the subscriber should listen for messages
timeout = 5.0

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(project_id, subscription_id)

def callback(message: pubsub_v1.subscriber.message.Message) -> None:
    print(f"Received {message}.")
    message.ack()

streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
print(f"Listening for messages on {subscription_path}..\n")

# Wrap subscriber in a 'with' block to automatically call close() when done.
with subscriber:
    try:
        streaming_pull_future.result()
    except TimeoutError:
        streaming_pull_future.cancel()  # Trigger the shutdown.
        streaming_pull_future.result()  # Block until the shutdown is complete.

Cloud Pub/SubからCloud Functionsをトリガーする

概要

  • Subscription側でメッセージを受け取ったらFunctionsを起動することが可能
  • CUIでもデプロイできるが、Pub/Subの管理画面から、Functionsをトリガーすることもできる
    • CUIでうまくいかなった際に、WebUIから操作したところ、トリガーできた

手順

  • WebUI
    • Pub/Subの管理画面
    • トピックを選択
    • Functionsに紐づけたいトピックのハンバーガーボタンから、Cloud Functionをトリガーを選択し、functionsを初期化

参考

  • エミュレータを使用したローカルでのアプリのテスト/GoogleCloud
  • How to use Google Pub/Sub emulator locally to test Google Pub/Sub on Apple Silicon Chip (M1)
  • クライアント ライブラリを使用して Pub/Sub でメッセージをパブリッシュおよび受信する/GoogleCloud


cloud pub/subgcp Share Tweet