コンテンツにスキップ

Numaflow でストリーミング処理を行う

Numaflow とは

Numaflow は Kubernetes ネイティブなツールで、大規模な並列ストリーム処理を実行するためのフレームワーク。 Numaflow パイプラインは Kubernetes カスタムリソースとして実装され、1 つ以上の source、データ処理、および sink の Vertex から構成される。

Argo Workflows と同様に YAML でパイプラインを定義できる。

利用シーン

  • リアルタイムデータ分析アプリケーション
  • 異常検出、モニタリング、アラートなどのイベント駆動型アプリケーション
  • データ計測およびデータ移動などのストリーミングアプリケーション
  • ストリーミング方式で実行されるワークフロー

特徴

  • Kubernetes ネイティブ:Kubernetes を知っていれば、Numaflow の使用方法も理解しやすい
  • 言語に依存しない:好きなプログラミング言語を利用できる
  • 一度だけのセマンティクス:ポッドが再スケジュールまたは再起動されても、入力要素は失われず、重複しない
  • バックプレッシャー付きの自動スケーリング:各 Vertex は必要に応じてゼロから自動的にスケーリング

データの整合性保証

  • at-least-onceの配信
  • ニアリアルタイムのデータソースに対して一度だけのセマンティクスを提供

用語

パイプライン

  • パイプラインはデータ処理のジョブを表す
  • 次の要素からパイプラインを構成する
    • Vertex のリスト:データ処理タスクを定義する
    • edge のリスト:Vertex 間の関係を記述する。edge は 1 つの Vertex から複数の Vertex に移動する可能性があり、v0.10 では複数の Vertex から 1 つの Vertex に移動することも可能。これは Join と Cycles を介して可能な多対一の関係。
pipeline.yaml
apiVersion: numaflow.numaproj.io/v1alpha1
kind: Pipeline
metadata:
  name: simple-pipeline
spec:
  vertices:
    - name: in
      source:
        generator:
          rpu: 5
          duration: 1s
    - name: cat
      udf:
        builtin:
          name: cat
    - name: out
      sink:
        log: {}
  edges:
    - from: in
      to: cat
    - from: cat
      to: out
pipelineの情報取得
kubectl get pipeline # or "pl" as a short name

Vertex

Vertex は Numaflow Pipeline の主要構成要素で、パイプライン仕様にリストとして定義され、各々がデータ処理タスクを表す。

vertexの情報取得
kubectl get vertex # または "vtx" を短縮名として使用

Vertex Types

  • Source: データを取り込むためのタイプ
  • Sink: 処理済みデータを転送するためのタイプ
  • UDF (User Defined Function): データ処理ロジックを定義するためのタイプ

Source

Source vertex は、source から Numaflow にデータを確実に読み込む役割を担う。Source vertex は、データを出力バッファに送る前に、データの変換やフォーマットが必要になることがある。watermark の追跡や遅延データの検出も行う。

次のような Source vertex がある。

  • Kafka
  • HTTP
  • Ticker
  • Nats
  • User Defined Source

ユーザー定義 source とは、プラットフォーム内蔵の source ではサポートされていないシステムからデータを読み込む必要がある場合に、ユーザーが Numaflow SDK を使って書き込むカスタム source のこと。 ユーザー定義 source は、正確に一度だけ読み取るための custom acknowledge 管理もサポートしている。

Sink

Sink はプラットフォームから出力された処理済みデータのエンドポイントとして機能する。 その後、外部システムやアプリケーションに送信される。 Sink の目的は、処理済みデータを最終目的地に配信すること。例えば、データベース、データウェアハウス、可視化ツール、アラートシステムなど。 source vertex とは異なり、パイプラインには複数の sink vertex を設定できる。

次のような Sink vertex がある。

  • Kafka
  • Log
  • Black Hole
  • User Defined Sink

ユーザー定義 sink とは、Numaflow SDK を使ってユーザーが作成できるカスタム sink のことで、処理したデータをシステムに出力したり、プラットフォーム内蔵の sink ではサポートされていない変換を行う必要がある場合に使用する。例えば、入力メッセージを処理した後、Elasticsearch をユーザー定義 sink として使用することで、処理したデータを保存し、データの検索や分析を行うことができる。

UDF(User Defined Functions)

pipeline は、source、sink、UDF(User Defined Functions)という複数の Vertex から構成される。

ユーザー定義関数(UDF)は、ユーザーがデータを変換するためにカスタムコードを実行できる Vertex である。UDF でのデータ処理はべき等である。 UDF は Vertex Pod のサイドカーコンテナとして実行され、受信したデータを処理する。 メイン・コンテナ(プラットフォーム・コード)とサイドカー・コンテナ(ユーザー・コード)間の通信は、Unix Domain Socket 上の gRPC を介して行われる。

ユーザーが実行できる処理には次の 2 種類がある。

  • map
  • reduce

map の例

公式が偶奇の判定を行う[例]を紹介している。パイプラインの概要は図の通り。

map

これを yaml で表すと次のようになる。condition はタグを指定して、そのタグが付いたデータのみを処理するように指定している。

pipeline.yaml
apiVersion: numaflow.numaproj.io/v1alpha1
kind: Pipeline
metadata:
  name: even-odd
spec:
  vertices:
    - name: in
      source:
        http: {}
    - name: even-or-odd
      scale:
        min: 1
      udf:
        container:
          # Tell the input number is even or odd, see https://github.com/numaproj/numaflow-go/tree/main/pkg/mapper/examples/even_odd
          image: quay.io/numaio/numaflow-go/map-even-odd:v0.5.0
    - name: even-sink
      scale:
        min: 1
      sink:
        # A simple log printing sink
        log: {}
    - name: odd-sink
      scale:
        min: 1
      sink:
        log: {}
    - name: number-sink
      scale:
        min: 1
      sink:
        log: {}
  edges:
    - from: in
      to: even-or-odd
    - from: even-or-odd
      to: even-sink
      conditions:
        tags:
          operator: or
          values:
            - even-tag
    - from: even-or-odd
      to: odd-sink
      conditions:
        tags:
          operator: or
          values:
            - odd-tag
    - from: even-or-odd
      to: number-sink
      conditions:
        tags:
          operator: or
          values:
            - odd-tag
            - even-tag

この UDF の実装はこちら

Auto Scaling

Numaflow は、Horizontal Pod Autoscaling と Vertical Pod Autoscaling の両方で scale out/in できる。

Horizontal Pod Autoscaling

Numaflow でサポートされている Horizontal Pod Autoscaling には次のようなものがある。

  • Numaflow Autoscaling
  • Kubernetes HPA
  • Third-party Autoscaling(KEDA など)

Numaflow Autoscaling

Numaflow は 0 から N までのオートスケーリング機能を提供しており、すべての UDF、sink、source vertex で利用可能。

example
apiVersion: numaflow.numaproj.io/v1alpha1
kind: Pipeline
metadata:
  name: my-pipeline
spec:
  vertices:
    - name: my-vertex
      scale:
        disabled: false # Optional, defaults to false.
        min: 0 # Optional, minimum replicas, defaults to 0.
        max: 20 # Optional, maximum replicas, defaults to 50.
        lookbackSeconds: 120 # Optional, defaults to 120.
        scaleUpCooldownSeconds: 90 # Optional, defaults to 90.
        scaleDownCooldownSeconds: 90 # Optional, defaults to 90.
        zeroReplicaSleepSeconds: 120 # Optional, defaults to 120.
        targetProcessingSeconds: 20 # Optional, defaults to 20.
        targetBufferAvailability: 50 # Optional, defaults to 50.
        replicasPerScale: 2 # Optional, defaults to 2.

Joins and Cycles

Numaflow パイプラインの edge は、複数の Vertex が 1 つの Vertex にメッセージを転送できるように定義できる。

Joins

Numaflow では、複数の Vertex から 1 つの Vertex にメッセージを転送できる。これは、Join と呼ばれる。 Join Vertex の導入により、ユーザーはパイプラインの冗長性を排除できる。多対一のデータフローをサポートし、同じジョブを実行する複数の Vertex を必要としない。

join を用いたパイプラインの例を 2 つ示す。

join example1 join example2

Cycles

Numaflow では、Vertex 間に循環を作成できる。これは、Cycle と呼ばれる。

cycle

Quick Start を試す

公式のドキュメントを参考に、Quick Start を進めてみた。 GKE Autopilot のクラスタを作成し、ドキュメントに従って各リソースを作成。

次のコマンドを実行して、https://localhost:8080/にアクセスすると、Numaflow の UI が表示される。 (補足: Cloud Shell で実行するとうまくできなかったので、開発マシンからコマンド実行した。)

gcloud container clusters get-credentials $CLUSTER_NAME --region asia-northeast1 --project $PROJECT \
 && kubectl port-forward --namespace numaflow-system $(kubectl get pod --namespace numaflow-system --selector="app.kubernetes.io/component=numaflow-ux,app.kubernetes.io/name=numaflow-ux,app.kubernetes.io/part-of=numaflow" --output jsonpath='{.items[0].metadata.name}') 8080:8443

numaflow

Web UI から yaml を編集してリソースを作成したり、編集、削除できる。 試しにinの部分の RPS を変更すると scale out して pod が増えることを確認できた。