パターン 5. 蓄積したデータを条件にイベント登録したい

本パターンでは、My-IoTデータストアに蓄積されたデータを定期的にチェックし、条件を満たしたときにイベント登録を行います。

パターン6と組み合わせてシステムパッケージを作成することで、イベントによるクラウドアプリ間の連携処理を行うことができます。

クラウドアプリケーション内の構成は以下の通りです。

パターン05

No.

リソース名

概要

1

定期実行

No.2のカスタム処理を定期的に実行するLambda

2

カスタム処理

My-IoTデータストアに蓄積したデータの検索、取得したデータの解析及び、イベント発行を行います

3

OpenSearchアクセス

myiot-rel-es-access-lambda
My-IoTデータストアへ検索、登録等を行います

4

OpenSearch

My-IoTデータストアです。エッジアプリから送信されたデータが蓄積されています

5

イベントアクセス

myiot-rel-event-access-lambda
DynamoDBへアクセスし、イベントデータの登録を行います

6

DynamoDB

イベントデータが格納されているデータベースです
イベントデータが登録されると、DynamoDBストリームによってイベント制御Lambdaが実行されます

CloudFormationテンプレート例

本パターンにおけるCloudFormationテンプレートを作成します。
各項目についての設定の詳細はAWSのドキュメントを参照してください。
※yml/yamlファイルの場合に、IoTストアでは!GetAttなど、短縮形の構文で組み込み関数は使用できないため、Fn::GetAttのように完全名関数の構文で記述する必要があります。

テンプレート作成する際の注意事項として以下のコメント種別で説明をします。

コメント種別

内容

+

利用目的に応じて開発者側で適切な値の設定が必要な箇所を示しています

!

My-IoTが提供する共通リソースに関する記載のため変更禁止の箇所を示しています

*

その他の補足説明を示しています

yaml形式の場合の例

下記のファイル名で作成します。

  • cloudformation.yaml

AWSTemplateFormatVersion: '2010-09-09'
Description: An AWS function.
Resources:
#************************************************************
# Lambda Function
# No.2 カスタム処理Lambdaのテンプレート例です
#
  sipSamplePattern05lm:
    Type: 'AWS::Lambda::Function'
    Properties:
      #!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
      # ソースコードの格納先はIoTストアで展開時に自動設定されるため記載しないでください
      # Code:
      #   S3Bucket: 
      #   S3Key: 
      #!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
      Description: ''
      Handler: lambda_function.lambda_handler
      #!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
      # ロールは変更しないでください
      #
      Role: 
        Fn::Sub: arn:aws:iam::${AWS::AccountId}:role/sip-sample-lambda-role
      #!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
      Runtime: python3.7
      MemorySize: 128
      Timeout: 30
      #************************************************************
      # Lambdaのファンクション名は導入時にIoTストアにて一意の名称に変換されます
      # テンプレート内では任意の名称で構いません
      #
      FunctionName: 'sip-sample-pattern-05-lm'
      #************************************************************
      Environment:
        Variables:
        #************************************************************
        # カスタムLambdaが参照する環境変数と値を定義します
        #
          #!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
          # 下記の項目は変更しないでください
          #
          # ES_ACCESS_LAMBDA: No.2 OpenSearchアクセスLambda関数名を指定しています
          # EVENT_ACCESS_LAMBDA: No.5 イベントアクセスLambda関数名を指定しています
          # 
          ES_ACCESS_LAMBDA: myiot-rel-es-access-lambda
          EVENT_ACCESS_LAMBDA: myiot-rel-event-access-lambda
          #!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
          #++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
          # 下記の項目は作成するクラウドアプリケーションに応じて変更してください
          # 
          # THRESHOLD: イベント登録を判断するための閾値を設定してください
          #
          THRESHOLD: '100'
          #++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
#
# Lambda Function
#************************************************************

#************************************************************
# Lambda Permission
# No.2 カスタム処理のLambda Permissionのテンプレート例です
#
  LambdaPermission:
    Type: 'AWS::Lambda::Permission'
    Properties:
      Action: 'lambda:InvokeFunction'
      Principal: 'events.amazonaws.com'
      FunctionName: 
        Ref: sipSamplePattern05lm
      SourceArn: 
        Fn::GetAtt: Schedule.Arn
#
# Lambda Permission
#************************************************************

#************************************************************
# EventBridge
# No.1 定期実行のテンプレート例です
#
  Schedule: 
    Type: 'AWS::Events::Rule'
    Properties:
      Description: ''
      #++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
      # 定期実行される間隔を記載してください
      # 最短1分間隔で登録可能です
      #
      ScheduleExpression: 'cron(0/5 * * * ? *)'
      #++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
      Targets:
        - Arn: 
            Fn::GetAtt: sipSamplePattern05lm.Arn
          Id: sipSamplePattern05
#
# EventBridge
#************************************************************

コネクタ例

このサンプルアプリケーションでは、下記のようなコネクタに従ってデータが蓄積されていることを想定しています。

{
    "$schema": "http://json-schema.org/draft-07/schema",
    "type": "object",
    "required": ["connectorID", "edgeID", "timestamp", "light"],
    "properties": {
         "connectorID": {
            "type": "string"
        },
         "edgeID": {
            "type": "string"
        },
         "timestamp": {
            "type": "string"
        },
        "light": {
            "type": "number"
        },
    },
    "additionalProperties": false
}

カスタム処理ソースコード例(Python)

下記のファイル名で作成します。

  • lambda_function.py

import boto3
import copy
import json
import logging
import os
from datetime import datetime
from dateutil import tz

# Loggerオブジェクトを取得し、表示するレベルを設定します。
logger = logging.getLogger()
logger.setLevel(logging.INFO)

# ESアクセスLambda関数名
ES_ACCESS_LAMBDA = os.environ.get('ES_ACCESS_LAMBDA')
# イベントアクセスLambda関数名
EVENT_ACCESS_LAMBDA = os.environ.get('EVENT_ACCESS_LAMBDA')
# テナントID
TENANT_ID = os.environ.get('TENANT_ID')
# イベントキー
EVENT_KEY = os.environ.get('EVENT_KEY')
# コネクタID
CONNECTOR_IDS = os.environ.get('CONNECTOR_INFO')
# イベント発行の際の閾値
THRESHOLD = int(os.environ.get('THRESHOLD'))

# Lambdaクライアントの生成
client = boto3.client('lambda')


def event_access(edgeId):
    """イベントデータを登録します
    Args:
        edgeId (str): イベント連携先へ渡すエッジID
    Returns:
        なし
    """
    try:
        # イベント連携先へ渡すペイロードを生成します
        detail = {
            'eventName': 'light_alert',
            'edgeId': edgeId
        }

        # ==============================================================================
        # イベントデータを登録する際に必要な引数を設定しています。
        # 詳細については【PF仕様書】共通リソースの利用方法(API仕様など)
        # No.6 イベントデータの登録 を参照してください。
        # ここでは操作内容や、イベント連携先へ渡すペイロードなどを設定しています。
        # ==============================================================================
        payload = {
            'method': 'register',
            'tenantId': TENANT_ID,
            'eventKey': EVENT_KEY,
            'detail': detail
        }

        # ==============================================================================
        # No.5 イベントアクセス
        # 環境変数で定義されているイベントアクセスLambda関数に
        # リクエストを実施し、結果を取得します。
        # ==============================================================================
        client.invoke(
            FunctionName=EVENT_ACCESS_LAMBDA,
            InvocationType='RequestResponse',
            Payload=json.dumps(payload)
        )
    except Exception:
        import traceback
        logger.error(traceback.format_exc())


def data_check(data):
    """条件を満たしたとき、イベント情報を登録します
    Args:
        data: 検索結果
    Returns:
        なし
    """
    try:
        # データを取得します。
        result_data = data['aggregations']['aggs_edgeId']['buckets'][0]['avg_light']['value']
        # edgeIdを取得します。
        edgeId = data['aggregations']['aggs_edgeId']['buckets'][0]['key']

        # 閾値未満か確認します。
        if result_data < THRESHOLD:
            # イベントアクセスLambdaを呼び出して登録します
            event_access(edgeId)

    except Exception:
        import traceback
        logger.error(traceback.format_exc())


def es_query_access():
    """OpenSearchアクセスAPIを呼び出し、データの検索を行います。

    Returns:
        dict: 処理結果を返します。
    """

    # テナントIDと日付からインデックス名を生成する(テナントID_YYYY.MM.DD)
    # 例: エッジアプリからMy-IoTデータストアに今日送信されたデータを取得する場合
    JST = tz.gettz('Asia/Tokyo')
    dt = datetime.now(JST)
    today = dt.strftime('%Y.%m.%d')
    index_name = '{}_{}'.format(TENANT_ID, today)

    # 現在時刻から5分前のdatetimeを生成する
    gte_time = dt - dt.datetime.timedelta(minutes=-5)

    # ==============================================================================
    # OpenSearchにリクエストする検索文を設定します。
    # このサンプルでは、サンプルのコネクタに従ってデータが登録されていることを
    # 前提としています
    #
    # サンプルクエリは同一のエッジIDにおける5分間の光度の平均値を取得するものです。
    # ==============================================================================
    # 環境変数に設定されたコネクタを検索する条件生成
    connectors = json.loads(CONNECTOR_IDS)
    connector_query = []
    connector_query_item = {
        'match': {
            'connectorID': ''
        }
    }
    for connector in connectors:
        query_item = copy.deepcopy(connector_query_item)
        query_item['match']['connectorID'] = connector
        connector_query.append(query_item)

    body = {
        'query': {
            'query': {
                'bool': {
                    'should': connector_query
                }
            },
            'range': {
                'timestamp': {
                    'gte': gte_time.isoformat(timespec='milliseconds'),
                    'lte': dt.isoformat(timespec='milliseconds')
                }
            }
        },
        'size': 0,
        'aggs': {
            'aggs_edgeId': {
                'terms': {
                    'field': 'edgeID.keyword',
                    'size': 1
                },
                'aggs': {
                    'avg_light': {
                        'avg': {
                            'field': 'light'
                        }
                    }
                }
            }
        }
    }

    try:
        # ==============================================================================
        # No.2 OpenSearchアクセス
        # 環境変数で定義されているESアクセスLambda関数に
        # リクエストを実施し、結果を取得します。
        # ==============================================================================
        res = boto3.client('lambda').invoke(
            FunctionName=ES_ACCESS_LAMBDA,
            InvocationType='RequestResponse',
            Payload=json.dumps(payload)
        )

        response = json.loads(res['Payload'].read())

        # 検索結果が1件以上の場合、データありと判断しデータチェック処理を行います。
        if response['body']['result']['hits']['total']['value'] > 0:
            data_check(response['body']['result'])
        else:
            return

    except Exception:
        import traceback
        logger.error(traceback.format_exc())
    return


# Lambda関数の処理の入口
def lambda_handler(event, context):

    # OpenSearchに蓄積されたデータをチェックし
    # 条件を満たしたときDynamoDBにイベント情報を登録する
    return es_query_access()