パターン 1. APIを介してデータを検索、登録を行いたい

本パターンでは、WebアプリクライアントがAPIを介してデータを検索、登録します。

クラウドアプリケーション内の構成は以下の通りです。

パターン01

No.

リソース名

概要

1

API Gateway

Webクライアントから受け取ったリクエストをカスタムLambdaに送信し、結果を返却します。

2

カスタム処理

Webクライアントからのリクエストを受けて、OpenSearchアクセスLambdaを実行します。

3

OpenSearchアクセス

myiot-rel-es-access-lambda
My-IoTデータストアに検索、登録等を行います。

4

OpenSearch

My-IoTデータストアです。エッジアプリから送信されたデータが蓄積されています。

CloudFormationテンプレート例

本パターンにおけるCloudFormationテンプレートを作成します。
各項目についての設定の詳細はAWSのドキュメントを参照してください。
※yml/yamlファイルの場合に、IoTストアでは!GetAttなど、短縮形の構文で組み込み関数は使用できないため、Fn::GetAttのように完全名関数の構文で記述する必要があります。

テンプレート作成する際の注意事項として以下のコメント種別で説明をします。

コメント種別

内容

+

利用目的に応じて開発者側で適切な値の設定が必要な箇所を示しています

!

My-IoTが提供する共通リソースに関する記載のため変更禁止の箇所を示しています

*

その他の補足説明を示しています

yaml形式の場合の例

下記のファイル名で作成します。

  • cloudformation.yaml

AWSTemplateFormatVersion: '2010-09-09'
Description: An AWS function.
Resources:
#************************************************************
# Lambda Function
# No.2 カスタム処理Lambdaのテンプレート例です
#
  sipSamplePattern01lm:
    Type: 'AWS::Lambda::Function'
    Properties:
      #!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
      # ソースコードの格納先はIoTストアで展開時に自動設定されるため記載しないでください
      # Code:
      #   S3Bucket: 
      #   S3Key: 
      #!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
      Description: ''
      Handler: lambda_function.lambda_handler
      #!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
      # ロールは変更しないでください
      #
      Role: 
        Fn::Sub: arn:aws:iam::${AWS::AccountId}:role/sip-sample-lambda-role
      #!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
      Runtime: python3.7
      MemorySize: 128
      Timeout: 30
      #************************************************************
      # Lambdaのファンクション名は導入時にIoTストアにて一意の名称に変換されます
      # テンプレート内では任意の名称で構いません
      #
      FunctionName: 'sip-sample-pattern-01-lm'
      #************************************************************
      Environment:
        Variables:
        #************************************************************
        # カスタムLambdaが参照する環境変数と値を定義します
        #
          #!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
          # 下記の項目は変更しないでください
          #
          # ES_ACCESS_LAMBDA: No.3 OpenSearchアクセスLambda関数名を指定しています
          # 
          ES_ACCESS_LAMBDA: myiot-rel-es-access-lambda
          #!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
#
# Lambda Function
#************************************************************

#************************************************************
# ApiGateway
# No.1 API Gatewayのテンプレート例です
#
  RestAPI:
    Type: AWS::ApiGateway::RestApi
    Properties:
      EndpointConfiguration:
        Types:
          - REGIONAL
      #++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
      # 下記の項目は作成するクラウドアプリケーションに応じて変更してください
      # 
      # Description: 説明文を記載してください
      # ※空文字はエラーとなります。不要の場合は「Description」を削除してください
      # Name: API名を記載してください
      #
      Description: 'sip-sample-pattern-01 RestApi'
      Name: 'sample-apigw'
      #++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

  RestAPIDeployment:
    Type: AWS::ApiGateway::Deployment
    Properties:
      Description: ''
      RestApiId: 
        Ref: RestAPI
    DependsOn:
      - RestAPIMethod
      - RestAPIResource

  RestAPIStage:
    Type: AWS::ApiGateway::Stage
    Properties:
      RestApiId: 
        Ref: RestAPI
      DeploymentId: 
        Ref: RestAPIDeployment
      #++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
      # 下記の項目は作成するクラウドアプリケーションに応じて変更してください
      # 
      # StageName: ステージ名を記載してください
      #
      StageName: 'sample-stage'
      #++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

  RestAPIResource:
    Type: AWS::ApiGateway::Resource
    Properties:
      PathPart: '{proxy+}'
      ParentId: 
        Fn::GetAtt: RestAPI.RootResourceId
      RestApiId: 
        Ref: RestAPI

  RestAPIMethod:
    Type: AWS::ApiGateway::Method
    Properties:
      #!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
      # 下記の項目は変更しないでください
      #
      # HttpMethod:        メソッド種別を指定しています
      # AuthorizationType: 認証種別を指定しています
      # Integration:       プロキシ統合に関する設定を指定しています
      # 
      HttpMethod: ANY
      AuthorizationType: NONE
      RequestParameters: 
        'method.request.path.proxy': true
      Integration:
        CacheKeyParameters: 
          - 'method.request.path.proxy'
        CacheNamespace: 
          Ref: RestAPIResource
        IntegrationHttpMethod: POST
        Type: AWS_PROXY
        Uri: 
          Fn::Sub: 'arn:aws:apigateway:${AWS::Region}:lambda:path/2015-03-31/functions/${sipSamplePattern01lm.Arn}/invocations'
      #!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
      ResourceId: 
        Ref: RestAPIResource
      RestApiId: 
        Ref: RestAPI

#
# ApiGateway
#************************************************************

#************************************************************
# LambdaPermission
#
  LambdaPermission:
    Type: 'AWS::Lambda::Permission'
    Properties:
      Action: 'lambda:InvokeFunction'
      Principal: 'apigateway.amazonaws.com'
      FunctionName: 
        Fn::GetAtt: sipSamplePattern01lm.Arn
#
# LambdaPermission
#************************************************************

カスタム処理ソースコード例(Python)

下記のファイル名で作成します。

  • lambda_function.py

import boto3
import copy
import json
import logging
import os
from datetime import datetime
from dateutil import tz

# Loggerオブジェクトを取得し、表示するレベルを設定します。
logger = logging.getLogger()
logger.setLevel(logging.INFO)

# ESアクセスLambda関数名
ES_ACCESS_LAMBDA = os.environ.get('ES_ACCESS_LAMBDA')
# テナントID
TENANT_ID = os.environ.get('TENANT_ID')
# コネクタID
CONNECTOR_IDS = os.environ.get('CONNECTOR_INFO')

# エラーコード&メッセージ
ERR_INVALID_ARGUMENT = 400
ERR_INTERNAL_ERROR = 500

ERR_MSG_INVALID_ARGUMENT = {'message': 'Invalid Argument.'}
ERR_MSG_INTERNAL_ERROR = {'message': 'Internal error.'}


def is_empty_str(val):
    """値が空文字列か、又はNoneかを調べます。

    Args:
        val (str/NonType): 調べる値

    Returns:
        bool: 値が空文字列か、又はNoneなら True を返す
    """

    return not val if isinstance(val, str) else val is None


def make_response(code, body):
    """実行結果をJSON形式で返します。

    Args:
        code (int): ステータスコード
        body (str): 実行結果

    Returns:
        [dict]: 実行結果
    """

    # ==============================================================================
    # APIで返却するレスポンスには、オリジン間リソース共有(CORS)を有効にするために
    # 「Access-Control-Allow-Origin」をヘッダーに設定しています。
    # ここでは、すべてのドメインからのアクセスを許可するように設定しています。
    # ==============================================================================
    response = {
        'statusCode': code,
        'headers': {
            'Access-Control-Allow-Origin': '*',
            'Content-Type': 'application/json'
        },
        'body': json.dumps(body),
        'isBase64Encoded': False
    }

    return response


def es_query_access(event):
    """OpenSearchアクセスAPIを呼び出し、データの検索を行います。

    Args:
        event (dict): イベントデータ

    Returns:
        dict: 処理結果を返します。
    """

    # テナントIDと日付からインデックス名を生成する(テナントID_YYYY.MM.DD)
    # 例: エッジアプリからMy-IoTデータストアに今日送信されたデータを取得する場合
    JST = tz.gettz('Asia/Tokyo')
    today = datetime.now(JST).strftime('%Y.%m.%d')
    index_name = '{}_{}'.format(TENANT_ID, today)

    # ==============================================================================
    # OpenSearchにアクセスする際に必要な引数を設定しています。
    # 詳細については【PF仕様書】共通リソースの利用方法(API仕様など)No.2 IoTデータへのアクセス
    # を参照してください。
    # ここでは操作内容や、登録データなどを設定しています。
    # ==============================================================================
    # 環境変数に設定されたコネクタを検索する条件生成
    connectors = json.loads(CONNECTOR_IDS)
    connector_query = []
    connector_query_item = {
        'match': {
            'connectorID': ''
        }
    }
    for connector in connectors:
        query_item = copy.deepcopy(connector_query_item)
        query_item['match']['connectorID'] = connector
        connector_query.append(query_item)

    payload = {
        'method': 'Get',
        'index': index_name,
        'query': {
            'query': {
                'bool': {
                    'should': connector_query
                }
            },
            'sort': [{'timestamp': 'desc'}]
        }
    }

    try:
        # ==============================================================================
        # No.3 OpenSearchアクセス
        # 環境変数で定義されているmyiot-rel-es-access-lambdaに
        # リクエストを実施し、結果を取得します。
        # ==============================================================================
        res = boto3.client('lambda').invoke(
            FunctionName=ES_ACCESS_LAMBDA,
            InvocationType='RequestResponse',
            Payload=json.dumps(payload)
        )

        response = json.loads(res['Payload'].read())

    except Exception:
        import traceback
        logger.error(traceback.format_exc())
        return make_response(ERR_INTERNAL_ERROR, ERR_MSG_INTERNAL_ERROR)

    # 処理結果を返します。
    return make_response(response['statusCode'], response['body'])


# Lambda関数の処理の入口
def lambda_handler(event, context):

    # OpenSearchアクセスAPIを呼び出し、データの検索を行います。
    return es_query_access(event)

OpenSearch検索結果の例(cURLコマンドを使用)

APIGatewayを利用したOpenSearchへの検索リクエスト及びレスポンス例を示します。
リクエストに必要なAPI URLは、クラウドアプリケーションをインストールしたグループの詳細画面から確認することができます。

API URLを確認する方法の詳細については、 My-IoT ユーザーマニュアル を参照してください。

リクエスト例

curl <エッジグループ詳細画面で確認できるApiUrl>/<任意のパス>

レスポンス例

{"result": {"took": 8, "timed_out": false, "_shards": {"total": 5, "successful": 5, "skipped": 0, 
 "failed": 0}, "hits": {"total": {"value": 1, "relation": "eq"}, "max_score": 1.0, 
 "hits": [{"_index": "sample-index", "_type": "_doc", "_id": "AAAABBBBCCC", "_score": 1.0, 
 "_source": {"Id": "1001", "temperature": 36.3, "time": 1626188400}}]}}}