EventBridge Pipes の enrichment 機能を活用しよう – DynamoDB Streams の JSON を変換する

EventBridge Pipes の enrichment 機能を活用しよう – DynamoDB Streams の JSON を変換する
この記事をシェアする

本記事では、EventBridge Pipes の enrichment 機能(強化機能)を使用して、DynamoDB Streams から得られる DynamoDB 独自の JSON 形式を通常の JSON 形式に変換する方法を紹介します。

1. この記事の要約

  • DynamoDB Streams から得られるデータは、DynamoDB 独自の JSON 形式で出力される
  • EventBridge Pipes の enrichment 機能は、Lambda のコードを実行して、イベントを任意の形に変換できる
  • アプリケーションロジックと DynamoDB の仕様を分離できる
  • 変換処理を再利用可能な形で実装できる

2. はじめに

イベント駆動アプリではDynamoDB Streams+Lambdaの構成は頻繁に使われます。ただ、DynamoDBの癖の一つに、独特のデータ形式があると思います。(後述)

そこで、アプリコードがDynamoDBに依存せずに書ければ、保守性が向上すると考え、DynamoDBに依存する部分をEventBridge Pipesに分離してやろうというのがこの記事のアイデアになります。

2.1. DynamoDB Streams とは?

DynamoDB Streams は、DynamoDB テーブルの変更をリアルタイムでキャプチャする機能です。

主な特徴:

  • テーブルの変更(INSERT、UPDATE、DELETE)をリアルタイムで検知できる
  • 変更内容を DynamoDB 独自の JSON 形式で出力する
  • EventBridge Pipes などの AWS サービスと連携可能

2.2. EventBridge Pipes とは?

EventBridge Pipes は、AWS のサービス間でイベントを転送・変換するためのサービスです。

主な特徴:

  • サービス間のイベント転送を簡単に設定できる
  • enrichment 機能でイベントデータの変換が可能
  • フィルタリング機能で必要なイベントのみを処理できる

3. 実装例

3.1 アーキテクチャ

3.2 DynamoDB Streams のデータ形式

DynamoDB Streams から得られるデータは、以下のような形式になっています。

{
  "Records": [
    {
      "eventID": "1",
      "eventName": "INSERT",
      "dynamodb": {
        "NewImage": {
          "entityId": {
            "S": "RIDE#0196e2a5-62ed-70fb-ad78-bf493167a7e0"
          },
          "type": {
            "S": "RIDE_BOOKED"
          },
          "payload": {
            "M": {
              "userId": {
                "S": "USER#0196e2ec-42c3-76d0-b188-db2ab6b1e3b6"
              }
            }
          }
        }
      }
    }
  ]
}

3.3 変換後の JSON 形式

EventBridge Pipes の enrichment 機能を使用して、以下のような通常の JSON 形式に変換します

{
  "events": [
    {
      "entityId": "RIDE#0196e2a5-62ed-70fb-ad78-bf493167a7e0",
      "type": "RIDE_BOOKED",
      "payload": {
        "userId": "USER#0196e2ec-42c3-76d0-b188-db2ab6b1e3b6"
      }
    }
  ]
}

3.4 変換処理の実装

変換処理は、TypeScript のコードで簡単に実装しました。

このコードを Lambda にデプロイして、EventBridge Pipes の enrichment 機能で実行します。

import { AttributeValue } from "@aws-sdk/client-dynamodb";
import { unmarshall } from "@aws-sdk/util-dynamodb";
import { DynamoDBRecord } from "aws-lambda";

type MyEvent = {
  events: Record<string, unknown>[];
};

export const handler = async (
  event: DynamoDBRecord[]  // 注意:DynamoDB StreamsとLambdaイベントソースマッピングを統合した時と型が異なる
): Promise<MyEvent | {}> => {
  return {
    events: event
      .map((record) => {
        if (record.dynamodb?.NewImage) {
          // TypeScriptの場合、aws-sdkのunmarshallで一発変換できる
          return unmarshall(
            // aws-lambdaとaws-sdkで型の互換性がないようなので、型アサーションが必要
            record.dynamodb.NewImage as Record<string, AttributeValue>
          );
        } else {
          return undefined;
        }
      })
      .filter((v) => v !== undefined),  // NewImageがない場合は、ターゲットを起動しないようにする
  };
};

4. 他の方法との比較

4.1 コンシューマ側のアプリで変換する場合との比較

コンシューマ側のアプリで変換する方法もありますが、その方法に比べて Pipes の enrichment 機能には次のような特徴があります。

  • 🙆 アプリケーションロジックと DynamoDB の仕様を分離できる
  • 🙆 変換処理を再利用可能な形で実装できる
  • 🙅 AWS リソースが増える

アプリケーション側がレガシーアプリで手を加えにくいときなど、Pipes の enrichment 機能で役割を分離するのは特に有用かもしれませんね。

4.2 そもそもメッセージングに DynamoDB Streams を使わない

これは DynamoDB Streams を使う意義の問題になるかと思います。

  • 🙆 DynamoDB Streams を使うことで、DynamoDB とメッセージングとの間に不整合が起こらない
    • DynamoDB への書き込みと、メッセージングへの書き込みがアトミックになります

DB 書き込みとメッセージングの間に不整合が起こる問題や、それを解決する「アウトボックスパターン」については、次の AWS のドキュメントに詳しく書かれています。

さいごに

  • EventBridge Pipes の enrichment 機能を使用して、DynamoDB Streams の JSON を変換する方法を実装した
  • アプリケーションロジックと DynamoDB の仕様を分離でき、よりクリーンな設計を実現できた
  • 変換処理を再利用可能な形で実装でき、メンテナンス性が向上した

実装例(CDKコード)は以下の GitHub リポジトリで公開しています

この記事をシェアする
著者:酒井亮太郎
シナモロール