EventBridge Pipes の enrichment 機能を活用しよう – DynamoDB Streams の JSON を変換する

目次
本記事では、EventBridge Pipes の enrichment 機能(強化機能)を使用して、DynamoDB Streams から得られる DynamoDB 独自の JSON 形式を通常の JSON 形式に変換する方法を紹介します。
1. この記事の要約
- DynamoDB Streams から得られるデータは、DynamoDB 独自の JSON 形式で出力される
- EventBridge Pipes の enrichment 機能は、Lambda のコードを実行して、イベントを任意の形に変換できる
- アプリケーションロジックと DynamoDB の仕様を分離できる
- 変換処理を再利用可能な形で実装できる
2. はじめに
イベント駆動アプリではDynamoDB Streams+Lambdaの構成は頻繁に使われます。ただ、DynamoDBの癖の一つに、独特のデータ形式があると思います。(後述)
そこで、アプリコードがDynamoDBに依存せずに書ければ、保守性が向上すると考え、DynamoDBに依存する部分をEventBridge Pipesに分離してやろうというのがこの記事のアイデアになります。
2.1. DynamoDB Streams とは?
DynamoDB Streams は、DynamoDB テーブルの変更をリアルタイムでキャプチャする機能です。
主な特徴:
- テーブルの変更(INSERT、UPDATE、DELETE)をリアルタイムで検知できる
- 変更内容を DynamoDB 独自の JSON 形式で出力する
- EventBridge Pipes などの AWS サービスと連携可能
2.2. EventBridge Pipes とは?
EventBridge Pipes は、AWS のサービス間でイベントを転送・変換するためのサービスです。
主な特徴:
- サービス間のイベント転送を簡単に設定できる
- enrichment 機能でイベントデータの変換が可能
- フィルタリング機能で必要なイベントのみを処理できる
3. 実装例
3.1 アーキテクチャ

3.2 DynamoDB Streams のデータ形式
DynamoDB Streams から得られるデータは、以下のような形式になっています。
{
"Records": [
{
"eventID": "1",
"eventName": "INSERT",
"dynamodb": {
"NewImage": {
"entityId": {
"S": "RIDE#0196e2a5-62ed-70fb-ad78-bf493167a7e0"
},
"type": {
"S": "RIDE_BOOKED"
},
"payload": {
"M": {
"userId": {
"S": "USER#0196e2ec-42c3-76d0-b188-db2ab6b1e3b6"
}
}
}
}
}
}
]
}
3.3 変換後の JSON 形式
EventBridge Pipes の enrichment 機能を使用して、以下のような通常の JSON 形式に変換します
{
"events": [
{
"entityId": "RIDE#0196e2a5-62ed-70fb-ad78-bf493167a7e0",
"type": "RIDE_BOOKED",
"payload": {
"userId": "USER#0196e2ec-42c3-76d0-b188-db2ab6b1e3b6"
}
}
]
}
3.4 変換処理の実装
変換処理は、TypeScript のコードで簡単に実装しました。
このコードを Lambda にデプロイして、EventBridge Pipes の enrichment 機能で実行します。
import { AttributeValue } from "@aws-sdk/client-dynamodb";
import { unmarshall } from "@aws-sdk/util-dynamodb";
import { DynamoDBRecord } from "aws-lambda";
type MyEvent = {
events: Record<string, unknown>[];
};
export const handler = async (
event: DynamoDBRecord[] // 注意:DynamoDB StreamsとLambdaイベントソースマッピングを統合した時と型が異なる
): Promise<MyEvent | {}> => {
return {
events: event
.map((record) => {
if (record.dynamodb?.NewImage) {
// TypeScriptの場合、aws-sdkのunmarshallで一発変換できる
return unmarshall(
// aws-lambdaとaws-sdkで型の互換性がないようなので、型アサーションが必要
record.dynamodb.NewImage as Record<string, AttributeValue>
);
} else {
return undefined;
}
})
.filter((v) => v !== undefined), // NewImageがない場合は、ターゲットを起動しないようにする
};
};
4. 他の方法との比較
4.1 コンシューマ側のアプリで変換する場合との比較
コンシューマ側のアプリで変換する方法もありますが、その方法に比べて Pipes の enrichment 機能には次のような特徴があります。
- 🙆 アプリケーションロジックと DynamoDB の仕様を分離できる
- 🙆 変換処理を再利用可能な形で実装できる
- 🙅 AWS リソースが増える
アプリケーション側がレガシーアプリで手を加えにくいときなど、Pipes の enrichment 機能で役割を分離するのは特に有用かもしれませんね。
4.2 そもそもメッセージングに DynamoDB Streams を使わない
これは DynamoDB Streams を使う意義の問題になるかと思います。
- 🙆 DynamoDB Streams を使うことで、DynamoDB とメッセージングとの間に不整合が起こらない
- DynamoDB への書き込みと、メッセージングへの書き込みがアトミックになります
DB 書き込みとメッセージングの間に不整合が起こる問題や、それを解決する「アウトボックスパターン」については、次の AWS のドキュメントに詳しく書かれています。
さいごに
- EventBridge Pipes の enrichment 機能を使用して、DynamoDB Streams の JSON を変換する方法を実装した
- アプリケーションロジックと DynamoDB の仕様を分離でき、よりクリーンな設計を実現できた
- 変換処理を再利用可能な形で実装でき、メンテナンス性が向上した
実装例(CDKコード)は以下の GitHub リポジトリで公開しています