ECS から Slack へ送られるエラーログの集約方法を考える – サーバーワークスエンジニアブログ

こんにちは、末廣です。
Slack の通知、たくさん来ると憂鬱ですよね?連絡に限らず、障害、トラブルが起こったときに自動エラー通知がたくさん来ても「もういいわ!」「いっぺんに通知してくれ」って思うこともあるかもしれません。ましてやエラーなるものは連続で起こったりすることもあるものです。

この導入として今回は「エラーが大量発生した時は集約してSlack通知、通常時は即時通知」という要件を、ECS コンテナから出たログを例として、CloudWatch Logs →うまく集約 → Slack 通知する方法を考えてみました。

非バースト時・バースト時の両方でログを見逃さず、Slack のレートリミットも回避できる仕組みを目指した検証内容をご紹介します。

要件整理

「集約する」といっても単発エラーが発生したときにある程度すぐ通知が出なければそもそもの意味がありません。従って以下条件を満たすことを目標にします。

  • 通常時:エラーが発生したら Slack へ即座に通知
  • バースト時(短時間に大量発生):一定期間分を集約して Slack へまとめて通知

AWS の構成

簡単に構成例を考えます。

  1. ECS から CloudWatch へログを出力
  2. CloudWatch Logs からサブスクリプションフィルターを使い、エラーログをトリガーに Lambda を起動
  3. 集約
  4. Lambda から SNS、Chatbot を使い Slack へ送信

3 部分が肝で、どのように集約をするかを考えていきます。

準備

その前に上記構成の準備をします。

docs.aws.amazon.com

タスク定義にて、awslogs ログドライバーを指定し CloudWatch Logsへログを送信できるように設定します。

今回検証したログの出力例を示すとこんな感じです。

{
    "timestamp": "2025-08-29T04:49:31.377+0000",
    "level": "ERROR",
    "service": "user-service",
    "message": "User validation failed: email already exists",
    "error": "ValidationException",
    "request_id": "req_1756442971",
    "user_email": "duplicate@example.com"
}
{
    "timestamp": "2025-08-29T05:14:26.651+0000",
    "level": "INFO",
    "service": "user-service",
    "message": "User profile retrieved successfully",
    "request_id": "req_1756444466",
    "user_id": "user_24",
    "endpoint": "/api/users/profile",
    "method": "GET",
    "response_time_ms": 45
}

ここから、{$.level="ERROR" || $.level="WARN" || $.level="CRITICAL"} のように、通知処理をしたいログをフィルターし、Lambda に送信します。

カスタム通知で SNS、 Chatbot (Amazon Q Developer in chat applications)経由で Slack へ通知を送信できるようにします。

blog.serverworks.co.jp

こちらのブログの boto3 での送信方法を参考に実装します。

集約方法を考える

このままサブスクリプションフィルターから Lambda 起動 → 通知とするとエラーがでる度に Slack にメッセージが届きます。もちろんこれでも問題はありませんが、連続する大量のエラーが発生すると人間慌ててしまいますし、大量の同じメッセージを遡ったりしなければなりません。

また、Slack の レートリミットにも引っかかる可能性もあります。Posting messages は 1 per second と記載あります。

docs.slack.dev

SQS を使う

集約、貯める、、、、というば SQS でメッセージをキューに一旦保存する方法があります。

SQS トリガー

集約用 Lambda から SQS にメッセージを保存しておき、さらに SQS をトリガーにして送信用の Lambda を起動します。

docs.aws.amazon.com

関数を呼び出す前に、Lambda は、バッチ処理ウィンドウの期限が切れる、呼び出しペイロードサイズのクォータに到達する、または設定された最大バッチサイズに到達するまで、標準キューからのメッセージのポーリングを継続します。

とあるようにバッチサイズとバッチウィンドウを設定することで、送信用 Lambda を起動するタイミングを調節できます。
例えば

  • バッチサイズ: 10
  • バッチウィンドウ: 30秒

で設定した時の挙動を具体的に考えてみます(呼び出しペイロードサイズのクォータは考慮なし)

メッセージがゆっくり届く場合
時刻 SQSに届いたメッセージ数 Lambda起動タイミング Lambdaに渡される件数
0秒 1件 まだ起動しない
10秒 2件目 まだ起動しない
20秒 3件目 まだ起動しない
30秒 4件目 ここで起動(ウィンドウ満了) 4件(0秒, 10秒, 20秒, 30秒分)
メッセージが一気に届く場合
時刻 SQSに届いたメッセージ数 Lambda起動タイミング Lambdaに渡される件数
0秒 1件 まだ起動しない
1秒 2件目 まだ起動しない
9秒 10件目 ここで即時起動 10件

つまり、バッチサイズを 100くらいにまとめ、バッチウィンドウを 60秒程度にすることで、ある程度の即時通知と(60秒程度の遅延は発生)と集約を満たすことはできます。

若干の問題点

Slack で表示した集約したメッセージの例です。

バーストメッセージ
問題点1

一旦バーストしているメッセージを一つのメッセージに羅列していますが、単体で他 ECS サービスから発生しているエラーメッセージが混在してしまっています。

メッセージの混在

これを避けるためには、SQS をサービス毎に作成するか、FIFO キューの MessageGroupIdを使うなどの方法が考えられます。

一方、サービス毎に SQS を作成するのは管理リソースが増えますし、FIFO キューはバッチの集約を 10にすることが限界です。

docs.aws.amazon.com

各バッチで関数に送信されるレコードの最大数。標準キューの場合、最大 10,000 レコードまで可能です。FIFO キューの場合、最大値は 10 です。バッチサイズが 10 を超える場合は、バッチウィンドウ (MaximumBatchingWindowInSeconds) も 1 秒以上に設定する必要があります。

以下のように送信 Lambda で制御すれば回避できます。

records = event.get('Records', [])
service_messages = defaultdict(list)


for idx, record in enumerate(records):
    try: 
        service_messages[service].append(formatted)


for service, messages in service_messages.items():
    try:
        resp = sns.publish(
                TopicArn=SNS_TOPIC_ARN,
                Message=json.dumps(notification)
            )
問題点2

また、SQS から Lambda を起動するときに最大同時実行数を設定できるのですが、最小値は 2〜しか設定できません。

最大同時実行数

何度か検証してみましたが、以下条件で 1分間に 60件ほどエラーを起こすと、Lambda コンテナが並行で2つ起動してしまい、それぞれ約半分の 30件ほどにまとめられてしまいます。

  • バッチサイズ: 100
  • バッチウィンドウ: 60秒

↓ 60秒の間に2回メッセージが届く

SQSバッチ集約ログ(user-service, 33件)
サービス: user-service
LogGroup: /ecs/user-service
コンテナ: user-service/user-service-api/27500ee9163548f6981f9cc749966021
発生時刻: 2025-09-11 23:42:53
メッセージ: バーストエラー
⋮

SQSバッチ集約ログ(user-service, 27件)
サービス: user-service
LogGroup: /ecs/user-service
コンテナ: user-service/user-service-api/27500ee9163548f6981f9cc749966021
発生時刻: 2025-09-11 23:42:59
メッセージ: バーストエラー
⋮

つまり、バッチウィンドウ(1分)をトリガーとして Lambda は起動していますが、そこから Lambda インスタンス自体は 2つ動いているため、それぞれで SNS を呼び出すことで 2回通知が来ます。大きな問題ではありませんが少し気になるところではあります。

加えてここで振り分けられるレコードは FIFO キューではないため、タイムスタンプや ECS サービスに関係なくバラバラに散りばめられてしまいます。

問題点3

それはシステムがよくない、という話であまりないかもしれませんが、結構な頻度でエラーが出るものやサブスクリプションフィルターで引っかかるものが多い場合、バッチウィンドウを大きく取りたいかもしれません。そうすると「即時通知」という要件を満たしづらくなります。

一旦まとめ

Lambda → SQS → Lambda を工夫することで以下は達成できました。

  • 通常時:エラーが発生したら Slack へ「ある程度」即座に通知
    • → 最低バッチウィンドウ時間 + Lambda が起動、送信するまではかかってしまう
  • バースト時(短時間に大量発生):一定期間分を「いい感じに」集約して Slack へまとめて通知
    • → Lambda の同時実行数により並行して起動してしまう
    • 複数サービスの異なるログがキューに格納されてしまう(コードロジックで解決)

厳格にこの辺りを行うためには DynamoDB を用いることで可能にはなります。

おまけ:より厳格な集約を考えてみる

Dynamo で一回保存する

図のようなイメージになります

Dynamo に保存
テーブル例

以下のようにテーブル設計をします

aggregation_key timestamp message logGroup logStream level created_at
user-service 1717224001 エラーメッセージ /aws/ecs/user stream-abc ERROR 2024-06-01T12:00:01Z

これらを 送信用の Lambda が取得し、通知処理を行います。

集約とトリガー

図の通り集約とトリガーも少し変わってきます。

SQS 自体に複数のエラーメッセージは送信しません。代わりに以下のようなメッセージを Lambda が SQS へ一度だけ送信します。

{
  "aggregation_key": "user-service",
  "period_start": 1717224000,
  "period_seconds": 60,
  "trigger_time": 1717224060
}

SQS には遅延キューという仕組みがあり、キューに格納されたメッセージを指定した秒数間受信するまでに遅延させることができます。

docs.aws.amazon.com

これを 60秒に設定し、その間に発生したエラーログは DynamoDB に格納していきます。

遅延キュー

遅延時間経過後、格納した「1つの」メッセージによって Lambda が起動します。メッセージが一つであるため、Lambda インスタンスが複数並行で動くことはないはずです。
起動された Lambda では、DynamoDB から timestamp が period_start ~ trigger_time までの期間に一致するものを取り出すことで 60秒間の集約を行う仕組みとなっています。

取り出し
即時通知

この状態でも遅延時間の影響は発生します。これも DynamoDB の条件付き書き込みの力を借りて解決します。

docs.aws.amazon.com

具体的には condition-expressionaggregation_key を指定し、通知フラグによって同じ値がなければ書き込む〜という条件にします。以下が例になります。

aggregation_key timestamp notified message logGroup logStream level created_at
notified#user-service#1717224000 0 true (なし) (なし) (なし) (なし) 2024-06-01T12:00:01Z
即時通知

フラグ条件として、「同じサービス」かつ「集約期間内の時間(period_start = now – (now % 60))」で集約範囲内であれば既に存在するエラーログである、というものとして即時通知するかしないかを Lambda 内で判定します。これらを組み合わせたフロー図を作ると

フロー図

少々複雑なロジックを使う必要がありますが、なんとか要件を満たせました。

まとめ

本記事では、ECS で発生したエラーログ集約・通知アーキテクチャについて、SQS のバッチサイズやバッチウィンドウを活用した実装方法を紹介しました。
また、微妙に手が届かない箇所を DynamoDB を活用してどうにかする方法も考えてみました。

みなさんの現場で「どこまでやるべきか」「どこから工夫が必要か」を考えるヒントになれば幸いです。

付録(Lambda コードサンプル)

※ AI 君と相談して作ったものです、動作保証はないのでご注意ください

サブスクリプションフィルターから起動される Lambda コード
import os
import json
import boto3
import gzip
import base64
import traceback

sqs = boto3.client('sqs')
SQS_URL = os.environ['SQS_URL']

def handler(event, context):
    try:
        cw_data = event['awslogs']['data']
        payload = gzip.decompress(base64.b64decode(cw_data))
        logs = json.loads(payload)
        print(f"Decoded CloudWatch Logs data. logGroup: {logs.get('logGroup')}")
    except Exception as e:
        print(f"Failed to decode CloudWatch Logs event: {e}")
        print(traceback.format_exc())
        return {'status': 'decode_error', 'error': str(e)}

    log_group = logs.get('logGroup', '')
    log_stream = logs.get('logStream', '')

    sent_count = 0
    error_count = 0

    for log_event in logs.get('logEvents', []):
        try:
            message = log_event.get('message', '')
            timestamp = log_event.get('timestamp', 0)
            
            try:
                log = json.loads(message)
            except Exception:
                log = {}
            
            log['logGroup'] = log_group
            log['logStream'] = log_stream
            log['timestamp'] = timestamp
            
            log['message'] = log.get('message', message)
            
            if 'service' not in log:
                
                if log_group.startswith('/ecs/'):
                    log['service'] = log_group.split('/ecs/')[-1]
                else:
                    log['service'] = 'unknown'

            
            sqs.send_message(
                QueueUrl=SQS_URL,
                MessageBody=json.dumps(log)
            )
            sent_count += 1
        except Exception as e:
            print(f"Failed to send log to SQS: {e}")
            print(traceback.format_exc())
            error_count += 1

    print(f"=== Lambda handler completed: sent={sent_count}, error={error_count} ===")
    return {
        'status': 'completed',
        'sent_count': sent_count,
        'error_count': error_count
    }
SQS から呼び出される Lambda コード
import os
import json
import boto3
from datetime import datetime
from collections import defaultdict

sns = boto3.client('sns')
SNS_TOPIC_ARN = os.environ.get('SNS_TOPIC_ARN')

def handler(event, context):
    print("=== Simple SQS Batch Lambda invoked ===")
    records = event.get('Records', [])
    service_messages = defaultdict(list)

    for idx, record in enumerate(records):
        try:
            body = json.loads(record['body'])
            service = body.get('service', 'unknown')
            log_group = body.get('logGroup', 'unknown')
            log_stream = body.get('logStream', 'unknown')
            msg = body.get('message', str(body))
            ts = body.get('timestamp')
            if ts:
                if ts > 1e12:
                    ts = int(ts / 1000)
                dt = datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S')
            else:
                dt = "unknown"
            formatted = (
                f"サービス: {service}\n"
                f"LogGroup: {log_group}\n"
                f"コンテナ: {log_stream}\n"
                f"発生時刻: {dt}\n"
                f"メッセージ: {msg}"
            )
            service_messages[service].append(formatted)
        except Exception as e:
            print(f"Failed to parse record[{idx}] as JSON: {e}")

    for service, messages in service_messages.items():
        aggregated_message = "\n\n".join(messages)
        notification = {
            "version": "1.0",
            "source": "custom",
            "content": {
                "title": f":package: SQSバッチ集約ログ({service}, {len(messages)}件)",
                "description": (
                    f"{aggregated_message}\n\n"
                    f"---\n"
                    f"集約件数: {len(messages)}件\n"
                    f"この通知はSQSバッチ集約(最大100件/最大60秒)で送信されています。"
                )
            }
        }
        try:
            resp = sns.publish(
                TopicArn=SNS_TOPIC_ARN,
                Message=json.dumps(notification)
            )
            print(f"SNS publish response for {service}: {resp}")
        except Exception as e:
            print(f"Failed to publish to SNS for {service}: {e}")

    return {"status": "sent", "service_count": len(service_messages)}
サブスクリプションフィルターから起動される DynamoDB を使った Lambda コード
import os
import json
import boto3
import time
import gzip
import base64
import traceback
from datetime import datetime
from boto3.dynamodb.conditions import Key
from botocore.exceptions import ClientError

dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table(os.environ['DYNAMODB_TABLE'])
sqs = boto3.client('sqs')
sns = boto3.client('sns')
SQS_URL = os.environ['SQS_URL']
SNS_TOPIC_ARN = os.environ.get('SNS_TOPIC_ARN')
AGGREGATION_KEY = "all"
AGGREGATION_PERIOD_SECONDS = 60  

def put_item_with_retry(table, item, max_retries=3, condition_expression=None):
    """DynamoDBへのリトライ機能付きput_item(条件付き書き込み対応)"""
    for attempt in range(max_retries):
        try:
            if condition_expression:
                table.put_item(Item=item, ConditionExpression=condition_expression)
            else:
                table.put_item(Item=item)
            print(f"Successfully put item on attempt {attempt + 1}: {item.get('aggregation_key', 'unknown')}")
            return True
        except ClientError as e:
            error_code = e.response['Error']['Code']
            print(f"DynamoDB put_item attempt {attempt + 1} failed with {error_code}: {e}")

            if error_code in ['ConditionalCheckFailedException']:
                
                raise e
            elif attempt == max_retries - 1:
                print(f"Failed to put item after {max_retries} attempts: {e}")
                return False
            else:
                
                wait_time = (2 ** attempt) + (time.time() % 1)  
                print(f"Retrying in {wait_time:.2f} seconds...")
                time.sleep(wait_time)
        except Exception as e:
            print(f"Unexpected error on attempt {attempt + 1}: {e}")
            if attempt == max_retries - 1:
                print(f"Failed to put item after {max_retries} attempts: {e}")
                return False
            time.sleep(2 ** attempt)
    return False

def send_sns_with_retry(topic_arn, message, max_retries=3):
    """SNS送信のリトライ機能付き関数"""
    for attempt in range(max_retries):
        try:
            response = sns.publish(TopicArn=topic_arn, Message=json.dumps(message))
            print(f"SNS message sent successfully on attempt {attempt + 1}: {response['MessageId']}")
            return True
        except Exception as e:
            print(f"SNS send attempt {attempt + 1} failed: {e}")
            if attempt == max_retries - 1:
                print(f"Failed to send SNS after {max_retries} attempts: {e}")
                return False
            time.sleep(2 ** attempt)
    return False

def send_sqs_with_retry(queue_url, message_body, delay_seconds=0, max_retries=3):
    """SQS送信のリトライ機能付き関数"""
    for attempt in range(max_retries):
        try:
            response = sqs.send_message(
                QueueUrl=queue_url,
                MessageBody=message_body,
                DelaySeconds=delay_seconds
            )
            print(f"SQS message sent successfully on attempt {attempt + 1}: {response['MessageId']}")
            return True
        except Exception as e:
            print(f"SQS send attempt {attempt + 1} failed: {e}")
            if attempt == max_retries - 1:
                print(f"Failed to send SQS after {max_retries} attempts: {e}")
                return False
            time.sleep(2 ** attempt)
    return False

def handler(event, context):
    """
    CloudWatch Logsサブスクリプションフィルターからのイベントを処理
    エラーハンドリングとリトライ機能を強化
    """
    print(f"=== Lambda handler started at {datetime.now().isoformat()} ===")
    print(f"Received event: {json.dumps(event)[:500]}...")  

    try:
        
        cw_data = event['awslogs']['data']
        payload = gzip.decompress(base64.b64decode(cw_data))
        logs = json.loads(payload)
        print(f"Successfully decoded CloudWatch Logs data. Log group: {logs.get('logGroup')}")
    except Exception as e:
        error_msg = f"Failed to decode CloudWatch Logs event: {e}"
        print(error_msg)
        print(traceback.format_exc())
        return {'status': 'decode_error', 'error': str(e)}

    log_group = logs.get('logGroup', 'unknown')
    log_stream = logs.get('logStream', 'unknown')
    now = int(time.time())
    processed_count = 0
    error_count = 0

    print(f"Processing {len(logs.get('logEvents', []))} log events from {log_group}/{log_stream}")

    for log_event in logs.get('logEvents', []):
        try:
            message = log_event.get('message', '')
            timestamp = log_event.get('timestamp', now * 1000) // 1000  

            try:
                log = json.loads(message)
                print(f"Successfully parsed JSON log: {log.get('level', 'unknown')} level")
            except Exception:
                print(f"Skip non-JSON message: {message[:100]}...")
                continue  

            
            aggregation_key = log_group

            
            period_start = now - (now % AGGREGATION_PERIOD_SECONDS)
            
            notified_flag = f"notified#{aggregation_key}#{period_start}"

            try:
                
                success = put_item_with_retry(
                    table,
                    {
                        'aggregation_key': notified_flag,
                        'timestamp': 0,
                        'notified': True
                    },
                    condition_expression='attribute_not_exists(aggregation_key)'
                )

                if not success:
                    print(f"Failed to put notified flag for {aggregation_key}")
                    error_count += 1
                    continue

                
                log_item = {
                    'aggregation_key': aggregation_key,
                    'timestamp': timestamp,
                    'message': log.get('message', message),
                    'logGroup': log_group,
                    'logStream': log_stream,
                    'level': log.get('level', 'ERROR'),
                    'created_at': datetime.now().isoformat()
                }

                success = put_item_with_retry(table, log_item)
                if not success:
                    print(f"Failed to put log item for {aggregation_key}")
                    error_count += 1
                    continue

                print(f"Successfully stored 1st error: aggregation_key={aggregation_key}, timestamp={timestamp}")

                
                if SNS_TOPIC_ARN:
                    custom_notification = {
                        "version": "1.0",
                        "source": "custom",
                        "content": {
                            "title": f":rotating_light: Dynamo方式: 1件目エラー発生",
                            "description": f"""
📍 **サービス情報**
- LogGroup: {log_group}
- LogStream: {log_stream}
- 発生時刻: {datetime.fromtimestamp(timestamp).isoformat()}

🔍 **エラー詳細**
{log.get('message', message)}

🏷️ **追加情報**
- レベル: {log.get('level', 'ERROR')}
- 期間: {period_start} - {period_start + AGGREGATION_PERIOD_SECONDS}秒
""".strip()
                        }
                    }

                    if send_sns_with_retry(SNS_TOPIC_ARN, custom_notification):
                        print(f"Successfully sent immediate SNS notification for {aggregation_key}")
                    else:
                        print(f"Failed to send immediate SNS notification for {aggregation_key}")
                        error_count += 1

                
                sqs_message = {
                    'aggregation_key': aggregation_key,
                    'period_start': period_start,
                    'period_seconds': AGGREGATION_PERIOD_SECONDS,
                    'trigger_time': now
                }

                if send_sqs_with_retry(SQS_URL, json.dumps(sqs_message), AGGREGATION_PERIOD_SECONDS):
                    print(f"Successfully sent SQS trigger message for {aggregation_key}")
                else:
                    print(f"Failed to send SQS trigger message for {aggregation_key}")
                    error_count += 1

            except ClientError as e:
                if e.response['Error']['Code'] == 'ConditionalCheckFailedException':
                    
                    log_item = {
                        'aggregation_key': aggregation_key,
                        'timestamp': timestamp,
                        'message': log.get('message', message),
                        'logGroup': log_group,
                        'logStream': log_stream,
                        'level': log.get('level', 'ERROR'),
                        'created_at': datetime.now().isoformat()
                    }

                    if put_item_with_retry(table, log_item):
                        print(f"Successfully stored subsequent error: aggregation_key={aggregation_key}, timestamp={timestamp}")
                    else:
                        print(f"Failed to store subsequent error for {aggregation_key}")
                        error_count += 1
                else:
                    print(f"Unexpected DynamoDB error for {aggregation_key}: {e}")
                    error_count += 1
            except Exception as e:
                print(f"Unexpected error processing log event for {aggregation_key}: {e}")
                print(traceback.format_exc())
                error_count += 1

            processed_count += 1

        except Exception as e:
            print(f"Error processing individual log event: {e}")
            print(traceback.format_exc())
            error_count += 1

    result = {
        'status': 'completed',
        'processed_count': processed_count,
        'error_count': error_count,
        'log_group': log_group,
        'timestamp': datetime.now().isoformat()
    }

    print(f"=== Lambda handler completed: {result} ===")
    return result
SQS から呼び出される DynamoDB を使った Lambda コード
import os
import json
import boto3
from boto3.dynamodb.conditions import Key

dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table(os.environ['DYNAMODB_TABLE'])
sns = boto3.client('sns')
SNS_TOPIC_ARN = os.environ['SNS_TOPIC_ARN']


def handler(event, context):
    print("=== Dynamo Aggregator Lambda invoked ===")
    print(f"event: {json.dumps(event)[:1000]}")  

    
    for idx, record in enumerate(event.get('Records', [])):
        print(f"Processing record[{idx}]: {record['body']}")
        body = json.loads(record['body'])
        period_start = body.get('period_start')
        period_seconds = body.get('period_seconds', 60)
        period_end = period_start + period_seconds

        
        aggregation_key = body.get('aggregation_key', 'unknown')

        
        log_group = None
        log_stream = None

        
        
        
        log_group = body.get('logGroup', '不明')
        log_stream = body.get('logStream', '不明')

        print(f"Querying DynamoDB: aggregation_key={aggregation_key}, period_start={period_start}, period_end={period_end}")
        
        resp = table.query(
            KeyConditionExpression=Key('aggregation_key').eq(aggregation_key) & Key('timestamp').between(period_start, period_end)
        )
        logs = resp.get('Items', [])
        print(f"Found {len(logs)} logs in DynamoDB for this period.")
        if logs:
            
            
            messages_with_meta = [
                f"[logGroup]: {log.get('logGroup', '不明')}\n[logStream]: {log.get('logStream', '不明')}\n---\n{log.get('message', '')}"
                for log in logs
            ]
            message_with_meta = "\n\n".join(messages_with_meta)
            custom_notification = {
                "version": "1.0",
                "source": "custom",
                "content": {
                    "title": f":cyclone:Dynamo方式集約ログ({len(logs)}件)",
                    "description": (
                        f"{message_with_meta}\n\n"
                        f":ラベルタグ: **追加情報**\n"
                        f"- 集約件数: {len(logs)}件\n"
                        f"- 期間: {period_start} - {period_end}秒 "
                        f"(この期間内のエラーを集約しています)"
                    )
                }
            }
            print(f"Publishing to SNS: {SNS_TOPIC_ARN}")
            print(f"Aggregated message:\n{message_with_meta}")
            resp = sns.publish(
                TopicArn=SNS_TOPIC_ARN,
                Message=json.dumps(custom_notification)
            )
            print(f"SNS publish response: {resp}")
        else:
            print("No logs to aggregate for this period.")
    print("=== Dynamo Aggregator Lambda finished ===")
    return {'status': 'aggregated'}

末廣 満希(執筆記事の一覧)

2022年新卒入社です。ここに何かかっこいい一言を書くことができるエンジニアになれるように頑張ります。




Source link

関連記事

コメント

この記事へのコメントはありません。