あるけみーの小屋

趣味の話とか色々

Raspberry Pi工作日記③~AWS編(完結編)~

前回のあらすじ

CO2濃度の値が取れた。

fieldpaddy.hatenablog.com

本日のお品書き

  • AWSアカウント作るよ
  • センサーデータをAWSに送るよ

AWSアカウント作成

必要事項を入力して淡々と進める。

 

ほい完了。

 

今回久々に個人アカウントを作成しましたが、IAM Identity Centerなるものが使えるようになっていたのでこちらを有効にしてやってみました。IAM ユーザを作らなくて良くなりました。

 

あとは、

  • MFA認証の設定
  • Budgetの設定
  • Cost Anomalry Detectionの設定

だけ最低限設定。本当はCloudTrailとかSecuriyHubとか諸々やった方が良いんだけどね。

 

センサーデータをAWSに送信

さて、AWSアカウントの準備ができたのでセンサーデータをAWSに送信していく。

IoT Coreというサービスを使っていくのだが、実は先駆者がいるのでこちらを参考にしてみる。

 

 

dev.classmethod.jp

 

まずはIoT Coreで検索してページ遷移。

 

管理の「すべてのデバイス」>「モノ」を選択し、「モノを作成」

 

1つのモノを作成。

 

名前だけ入力し、あとはデフォルト。

 

証明書は自動生成

 

ポリシーは特にアタッチせずにそのまま

 

作成されたデバイス証明書等をすべてダウンロードしておきます。

 

 

ポリシーを作成します。

 

ポリシーアクションとポリシーリソースはここでは「*」にしますが、実運用では制限をかけた方が良さそう。

 

作成したポリシーを先ほど作った証明書にアタッチ。

 

この時点で、証明書を持っているモノからデータを送信できる状態になりました。

 

github.com

実際にデータを送るために、こちらのgitをラズパイ上でcloneします。

git clone https://github.com/aws/aws-iot-device-sdk-python.git

 

AWSIoTPythonSDKというライブラリをインストールしておきます。

pip3 install AWSIoTPythonSDK

 

その後、basicPubSub.pyをendpointなどのオプションをつけて実行する。

python3 basicPubSub.py --endpoint xxxxxxxx.iot.ap-northeast-1.amazonaws.com --rootCA ~/cert/AmazonRootCA1.pem --cert xxxxx-certificate.pem.crt --key xxxxxx-private.pem.key 

 

すると、

2024-04-06 16:33:35,125 - AWSIoTPythonSDK.core.protocol.internal.clients - DEBUG - Initializing MQTT layer...
2024-04-06 16:33:35,131 - AWSIoTPythonSDK.core.protocol.internal.clients - DEBUG - Registering internal event callbacks to MQTT layer...
2024-04-06 16:33:35,134 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - MqttCore initialized
2024-04-06 16:33:35,136 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Client id: basicPubSub
2024-04-06 16:33:35,139 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Protocol version: MQTTv3.1.1
2024-04-06 16:33:35,141 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Authentication type: TLSv1.2 certificate based Mutual Auth.
2024-04-06 16:33:35,143 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Configuring endpoint...
2024-04-06 16:33:35,146 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Configuring certificates and ciphers...
2024-04-06 16:33:35,149 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Configuring reconnect back off timing...
2024-04-06 16:33:35,151 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Base quiet time: 1.000000 sec
2024-04-06 16:33:35,153 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Max quiet time: 32.000000 sec
2024-04-06 16:33:35,155 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Stable connection time: 20.000000 sec
2024-04-06 16:33:35,157 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Configuring offline requests queueing: max queue size: -1
2024-04-06 16:33:35,172 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Configuring offline requests queue draining interval: 0.500000 sec
2024-04-06 16:33:35,175 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Configuring connect/disconnect time out: 10.000000 sec
2024-04-06 16:33:35,177 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Configuring MQTT operation time out: 5.000000 sec
2024-04-06 16:33:35,179 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Performing sync connect...
2024-04-06 16:33:35,181 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Performing async connect...
2024-04-06 16:33:35,183 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Keep-alive: 600.000000 sec
2024-04-06 16:33:35,187 - AWSIoTPythonSDK.core.protocol.internal.workers - DEBUG - Event consuming thread started
2024-04-06 16:33:35,189 - AWSIoTPythonSDK.core.protocol.mqtt_core - DEBUG - Passing in general notification callbacks to internal client...
2024-04-06 16:33:35,191 - AWSIoTPythonSDK.core.protocol.internal.clients - DEBUG - Filling in fixed event callbacks: CONNACK, DISCONNECT, MESSAGE
2024-04-06 16:33:35,413 - AWSIoTPythonSDK.core.protocol.internal.clients - DEBUG - Starting network I/O thread...
2024-04-06 16:33:35,476 - AWSIoTPythonSDK.core.protocol.internal.workers - DEBUG - Produced [connack] event
2024-04-06 16:33:35,477 - AWSIoTPythonSDK.core.protocol.internal.workers - DEBUG - Dispatching [connack] event
2024-04-06 16:33:35,482 - AWSIoTPythonSDK.core.protocol.internal.workers - DEBUG - No need for recovery
2024-04-06 16:33:35,484 - AWSIoTPythonSDK.core.protocol.internal.clients - DEBUG - Invoking custom event callback...
2024-04-06 16:33:35,487 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Performing sync subscribe...
2024-04-06 16:33:35,489 - AWSIoTPythonSDK.core.protocol.internal.workers - DEBUG - Adding a new subscription record: sdk/test/Python qos: 1
2024-04-06 16:33:35,492 - AWSIoTPythonSDK.core.protocol.internal.clients - DEBUG - Filling in custom suback event callback...
2024-04-06 16:33:35,561 - AWSIoTPythonSDK.core.protocol.internal.workers - DEBUG - Produced [suback] event
2024-04-06 16:33:35,563 - AWSIoTPythonSDK.core.protocol.internal.workers - DEBUG - Dispatching [suback] event
2024-04-06 16:33:35,566 - AWSIoTPythonSDK.core.protocol.internal.clients - DEBUG - Invoking custom event callback...
2024-04-06 16:33:35,569 - AWSIoTPythonSDK.core.protocol.internal.clients - DEBUG - This custom event callback is for pub/sub/unsub, removing it after invocation...
2024-04-06 16:33:37,572 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Performing sync publish...
2024-04-06 16:33:37,579 - AWSIoTPythonSDK.core.protocol.internal.clients - DEBUG - Filling in custom puback (QoS>0) event callback...
2024-04-06 16:33:37,598 - AWSIoTPythonSDK.core.protocol.internal.workers - DEBUG - Produced [puback] event
2024-04-06 16:33:37,600 - AWSIoTPythonSDK.core.protocol.internal.workers - DEBUG - Dispatching [puback] event
2024-04-06 16:33:37,605 - AWSIoTPythonSDK.core.protocol.internal.clients - DEBUG - Invoking custom event callback...
2024-04-06 16:33:37,608 - AWSIoTPythonSDK.core.protocol.internal.clients - DEBUG - This custom event callback is for pub/sub/unsub, removing it after invocation...
2024-04-06 16:33:37,626 - AWSIoTPythonSDK.core.protocol.internal.workers - DEBUG - Produced [message] event
2024-04-06 16:33:37,629 - AWSIoTPythonSDK.core.protocol.internal.workers - DEBUG - Dispatching [message] event
Received a new message: 
b'{"message": "Hello World!", "sequence": 0}'
from topic: 
sdk/test/Python
--------------

といった形でAWSにテストメッセージが送信されるようになりました。

 

AWSコンソール上でも確認してみます。

MQTTテストクライアントからトピックを指定して、サブスクライブしておきます。

 

この状態で、先ほどのbasicPubSub.pyを実行すると

AWS側に送信されていることが確認できました。

 

これをベースにプログラムを書き換えました

from AWSIoTPythonSDK.MQTTLib import AWSIoTMQTTClient
import logging
import time
import argparse
import json
import mh_z19 #ライブラリをインポート

AllowedActions = ['both', 'publish', 'subscribe']

# Custom MQTT message callback
def customCallback(client, userdata, message):
    print("Received a new message: ")
    print(message.payload)
    print("from topic: ")
    print(message.topic)
    print("--------------\n\n")


# Read in command-line parameters
parser = argparse.ArgumentParser()
parser.add_argument("-e", "--endpoint", action="store", required=True, dest="host", help="Your AWS IoT custom endpoint")
parser.add_argument("-r", "--rootCA", action="store", required=True, dest="rootCAPath", help="Root CA file path")
parser.add_argument("-c", "--cert", action="store", dest="certificatePath", help="Certificate file path")
parser.add_argument("-k", "--key", action="store", dest="privateKeyPath", help="Private key file path")
parser.add_argument("-p", "--port", action="store", dest="port", type=int, help="Port number override")
parser.add_argument("-w", "--websocket", action="store_true", dest="useWebsocket", default=False,
                    help="Use MQTT over WebSocket")
parser.add_argument("-id", "--clientId", action="store", dest="clientId", default="basicPubSub",
                    help="Targeted client id")
parser.add_argument("-t", "--topic", action="store", dest="topic", default="co2/value", help="Targeted topic")  #デフォルトの送信先Topicを変更
parser.add_argument("-m", "--mode", action="store", dest="mode", default="both",
                    help="Operation modes: %s"%str(AllowedActions))
parser.add_argument("-M", "--message", action="store", dest="message", default="Hello World!",
                    help="Message to publish")

args = parser.parse_args()
host = args.host
rootCAPath = args.rootCAPath
certificatePath = args.certificatePath
privateKeyPath = args.privateKeyPath
port = args.port
useWebsocket = args.useWebsocket
clientId = args.clientId
topic = args.topic

if args.mode not in AllowedActions:
    parser.error("Unknown --mode option %s. Must be one of %s" % (args.mode, str(AllowedActions)))
    exit(2)

if args.useWebsocket and args.certificatePath and args.privateKeyPath:
    parser.error("X.509 cert authentication and WebSocket are mutual exclusive. Please pick one.")
    exit(2)

if not args.useWebsocket and (not args.certificatePath or not args.privateKeyPath):
    parser.error("Missing credentials for authentication.")
    exit(2)

# Port defaults
if args.useWebsocket and not args.port:  # When no port override for WebSocket, default to 443
    port = 443
if not args.useWebsocket and not args.port:  # When no port override for non-WebSocket, default to 8883
    port = 8883

# Configure logging
logger = logging.getLogger("AWSIoTPythonSDK.core")
logger.setLevel(logging.DEBUG)
streamHandler = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
streamHandler.setFormatter(formatter)
logger.addHandler(streamHandler)

# Init AWSIoTMQTTClient
myAWSIoTMQTTClient = None
if useWebsocket:
    myAWSIoTMQTTClient = AWSIoTMQTTClient(clientId, useWebsocket=True)
    myAWSIoTMQTTClient.configureEndpoint(host, port)
    myAWSIoTMQTTClient.configureCredentials(rootCAPath)
else:
    myAWSIoTMQTTClient = AWSIoTMQTTClient(clientId)
    myAWSIoTMQTTClient.configureEndpoint(host, port)
    myAWSIoTMQTTClient.configureCredentials(rootCAPath, privateKeyPath, certificatePath)

# AWSIoTMQTTClient connection configuration
myAWSIoTMQTTClient.configureAutoReconnectBackoffTime(1, 32, 20)
myAWSIoTMQTTClient.configureOfflinePublishQueueing(-1)  # Infinite offline Publish queueing
myAWSIoTMQTTClient.configureDrainingFrequency(2)  # Draining: 2 Hz
myAWSIoTMQTTClient.configureConnectDisconnectTimeout(10)  # 10 sec
myAWSIoTMQTTClient.configureMQTTOperationTimeout(5)  # 5 sec

# Connect and subscribe to AWS IoT
myAWSIoTMQTTClient.connect()
if args.mode == 'both' or args.mode == 'subscribe':
    myAWSIoTMQTTClient.subscribe(topic, 1, customCallback)
time.sleep(2)

# Publish to the same topic in a loop forever
loopCount = 0
while True:
    if args.mode == 'both' or args.mode == 'publish':
        message = {}
        messageJson = json.dumps(mh_z19.read())  #修正
        myAWSIoTMQTTClient.publish(topic, messageJson, 1)
        if args.mode == 'publish':
            print('Published topic %s: %s\n' % (topic, messageJson))
        loopCount += 1
    time.sleep(10) #適宜送信間隔を修正

 

主に太字の箇所のみ修正しています。これで同じようにトピックでサブスクライブしてプログラムを動かしてみます。

 

良い感じに値が取れました!

 

さて、この値をCloudWatchに送信できるようにします。

 

まずIoT Core上でルールを作成します。

名前を設定

 

SQLステートメントはこんな感じで設定。FROM句にIoT Coreの送信先トピック名を記載。FROMの値は必ずシングルクォートを入れて文字列になるようにしましょう(1敗)



 

ルールアクションではCloudWatch Logsに送信できるように、ロググループとIAMロールを作成。

 

あとは確認して作成。

※上記のSQLステートメントは間違えた時の値が入っています。FROM句には文字列を入れましょう。

 

それではプログラムを動かして、CloudWatchにログが来ているかを確認。

いっぱいきた。

 

中身も問題なし。

 

値をグラフ化する

ログで値を取れたので、今度は可視化してみる。

ログからメトリクスフィルターを設定する。

 

こんな感じでフィルターパターンを設定

 

 

メトリクスの詳細はこんな感じ。デフォルトを400にしたのは、センサーで取得できる下限値が400のため。

 

グラフで表示できるようになった。

 

異常時にメールを飛ばすようにする

この調子でアラームの設定もしてみる。

 

10秒おきに取得するようにし、1000を超える値が6連続(=1分)発生したらアラーム状態になるようにした。

 

SNSトピックを一緒に作った場合、アラーム作成後はConfirmメールが届くので確認すること。

 

こんな感じでアラームの設定ができた。

なお、通知はアラーム状態になったタイミングで来るので、すでにアラーム状態であった場合メールは来ない。

 

というわけで元々やりたかったことは完了!

 

今回の進捗報告

  • AWSアカウント作成
  • AWSにセンサーデータ送信
  • CO2濃度が高くなった時にメール通知されるようにした

というわけで本来の目的は達成したので、このシリーズは完。

細かいチューニングはどっかでまとめるかも。