パターン 6. IoT Coreを介してエッジ側にメッセージを送信したい

本パターンでは、イベントデータの連携先クラウドアプリとして、受け取ったイベント情報を基にIoT Coreを介してエッジ側にメッセージを送信します。

パターン5と組み合わせてシステムパッケージを作成することで、イベントによるクラウドアプリ間の連携処理を行うことができます。

クラウドアプリケーション内の構成は以下の通りです。

パターン06

No.

リソース名

概要

1

DynamoDB

イベントのトリガーになるデータベースです。

2

イベント制御Lambda

DynamoDBに登録されたイベントデータを、カスタムLambdaに通知して実行します。

3

カスタム処理

イベント制御Lambdaからイベント情報を受け取り、イベントに基づいてIoT Coreへのメッセージ送信Lambdaを呼び出します。

4

MQTT Publish

カスタムLambdaからメッセージを受け取り、MQTT Publishを実行します。

5

IoT Core

エッジ側にメッセージを送信します。

CloudFormationテンプレート例

本パターンにおけるCloudFormationテンプレートを作成します。
各項目についての設定の詳細はAWSのドキュメントを参照してください。
※yml/yamlファイルの場合に、IoTストアでは!GetAttなど、短縮形の構文で組み込み関数は使用できないため、Fn::GetAttのように完全名関数の構文で記述する必要があります。

テンプレート作成する際の注意事項として以下のコメント種別で説明をします。

コメント種別

内容

+

利用目的に応じて開発者側で適切な値の設定が必要な箇所を示しています

!

My-IoTが提供する共通リソースに関する記載のため変更禁止の箇所を示しています

*

その他の補足説明を示しています

yaml形式の場合の例

下記のファイル名で作成します。

  • cloudformation.yaml

AWSTemplateFormatVersion: '2010-09-09'
Description: An AWS function.
Resources:
#************************************************************
# Lambda Function
# No.3 カスタム処理Lambdaのテンプレート例です
#
  sipsampleprocessorderlm:
    Type: 'AWS::Lambda::Function'
    Properties:
      #!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
      # ソースコードの格納先はIoTストアで展開時に自動設定されるため記載しないでください
      # Code:
      #   S3Bucket: 
      #   S3Key: 
      #!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
      Description: ''
      Handler: lambda_function.lambda_handler
      #!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
      # ロールは変更しないでください
      #
      Role: 
        Fn::Sub: arn:aws:iam::${AWS::AccountId}:role/sip-sample-lambda-role
      #!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
      Runtime: python3.7
      MemorySize: 128
      Timeout: 10
      #************************************************************
      # Lambdaのファンクション名は導入時にIoTストアにて一意の名称に変換されます
      # テンプレート内では任意の名称で構いません
      #
      FunctionName: 'sip-sample-pattern-06-lm'
      #************************************************************
      Environment:
        Variables:
        #************************************************************
        # カスタムLambdaが参照する環境変数と値を定義します
        #
          #!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
          # 下記の項目は変更しないでください
          #
          # MQTT_LAMBDA: No.4 MQTT PublishのLambda関数名を指定しています
          # EVENT_ACCESS_LAMBDA: No.2 イベントアクセスLambda関数名を指定しています
          # 
          MQTT_LAMBDA: myiot-rel-publish-mqtt-lambda
          EVENT_ACCESS_LAMBDA: myiot-rel-event-access-lambda
          #!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
#
# Lambda Function
#************************************************************

イベント例

このサンプルアプリケーションでは、イベントによって下記のようなペイロードが渡されることを想定しています。

{
    "eventName": "イベント名",
    "edgeId": "エッジID",
}

カスタム処理ソースコード例(Python)

下記のファイル名で作成します。

  • lambda_function.py

import boto3
import json
import logging
import os

# Loggerオブジェクトを取得し、表示するレベルを設定します。
logger = logging.getLogger()
logger.setLevel(logging.INFO)

# IoT Coreへのメッセージ送信関数名
MQTT_LAMBDA = os.environ.get('MQTT_LAMBDA')
# イベントアクセスLambda関数名
EVENT_ACCESS_LAMBDA = os.environ.get('EVENT_ACCESS_LAMBDA')
# テナントID
TENANT_ID = os.environ.get('TENANT_ID')

# Lambdaクライアントの生成
client = boto3.client('lambda')


def is_empty_str(val):
    """値が空文字列か、又はNoneかを調べます。

    Args:
        val (str/NonType): 調べる値

    Returns:
        bool: 値が空文字列か、又はNoneなら True を返す
    """

    return not val if isinstance(val, str) else val is None


def is_empty_check(val):
    """値が空か、またははNoneかを調べます。
       list,dict型に対応しています。

    Args:
        val: 調べる値

    Returns:
        bool: 値が空か、又はNoneなら True を返す
    """

    res = False

    if is_empty_str(val):
        res = True
    elif isinstance(val, (list, dict)):
        if not val:
            res = True

    return res


def mqtt_lambda_publish(event):
    """MQTT Publish APIを呼び出し、メッセージを送信します。

    Args:
        event (dict): メッセージ送信情報
    Returns:
        なし
    """
    try:
        # qosレベルを設定します。
        qos_level = 1

        # ==============================================================================
        # MQTT Publishする際に必要な引数を設定しています。
        # 詳細については【PF仕様書】共通リソースの利用方法(API仕様など)
        # No.8 IoT Coreへのメッセージ送信を参照してください。
        # ここではイベントとして受け取った情報から
        # 送信先や、送信内容(メッセージ)などを設定しています。
        # ==============================================================================
        payload = {
            'tenantId': TENANT_ID,
            'edgeId': event['detail']['edgeId'],
            'payload': {
                'message': event['detail']['eventName']
            },
            'qos': qos_level
        }

        # ==============================================================================
        # No.5 IoT Coreへのメッセージ送信
        # 環境変数で定義されているmyiot-rel-publish-mqtt-lambdaに
        # リクエストを実施します。
        # ==============================================================================
        response = client.invoke(
            FunctionName=MQTT_LAMBDA,
            InvocationType='RequestResponse',
            Payload=json.dumps(payload)
        )
    except Exception as e:
        logger.error(e)


def event_close(eventId):
    """イベントデータの登録Lambdaを呼び出し、イベントをクローズします。

    Args:
        eventId (str): クローズ対象のイベントID
    Returns:
        なし
    """
    try:
        # ==============================================================================
        # イベントをクローズする際に必要な引数を設定しています。
        # 詳細については【PF仕様書】共通リソースの利用方法(API仕様など)
        # No.6 イベントデータの登録 を参照してください。
        # ここでは操作内容や、クローズ対象のイベントIDなどを設定しています。
        # ==============================================================================
        payload = {
            'method': 'close',
            'tenantId': TENANT_ID,
            'eventId': eventId,
        }

        # ==============================================================================
        # No.2 イベントアクセス
        # 環境変数で定義されているイベントアクセスLambda関数に
        # リクエストを実施し、イベントをクローズします。
        # ==============================================================================
        client.invoke(
            FunctionName=EVENT_ACCESS_LAMBDA,
            InvocationType='RequestResponse',
            Payload=json.dumps(payload)
        )
    except Exception as e:
        logger.error(e)


# Lambda関数の処理の入口
def lambda_handler(event, context):

    try:
        # 入力キーのチェックを行います。必須の引数がない場合は処理を終了します。
        if (event.get('detail') is None) or (event['detail'].get('edgeId') is None):
            logger.error('Required tag does not exist.')
            return

        # edgeIdが文字列型であるかをチェックします。
        if is_empty_check(event['detail']['edgeId']):
            logger.error('No Value specified for edgeId.')
            return

        # イベントIDを取得します
        eventId = event.get('eventId')

        # タイムアウトイベントで呼び出されたかどうかを判別します
        isTimeout = event.get('isTimeOut')
        if isTimeout is True:
            # ==============================================================================
            # タイムアウトイベントで呼び出された時の処理を記載してください
            # ==============================================================================
            logger.error('Timeout previous event.')
        else:
            # MQTT送信を行います。
            mqtt_lambda_publish(event)

        # 最後にイベントクローズ処理を行います。
        event_close(eventId)

    except Exception as e:
        logger.error(e)

エッジでのメッセージ受信

クラウドアプリケーションから送信したメッセージをエッジで受信するには、Node-REDフローでMy-IoT専用ノードを使用します。 My-IoT専用ノードの詳細は、My-IoT専用ノードを参照してください。