Kinesis Streams/Firehose/Analyticsを試す

(2017-02-20)

https://aws.amazon.com/jp/kinesis/

リアルタイムのストリーミングデータを扱うサービス群。 いまのところTokyoリージョンではKinesis Streamsしか使えない。

Kinesis Firehose

AWSのデータストアに送るストリーム。自分でデータを読む処理を書かなくてよく、スケーリングも勝手にやってくれるので簡単に使える。

https://aws.amazon.com/jp/kinesis/firehose/faqs/

Q: 送信先とは何ですか?
送信先はデータが配信されるデータストアです。Amazon Kinesis Firehose では、
現在送信先として Amazon S3、Amazon Redshift、Amazon Elasticsearch Service がサポートされています。

料金は取り込まれたデータ量による。

今回はS3に送ってみる。

firehose作成

圧縮方法を設定したり、Lambdaを噛ませたりすることができる。

firehose作成2

StatusがActiveになったらKinesis Agentで送ってみる。 CloudWatchとFirehoseにPutする権限が必要。Firehoseはkinesis:ではなくfirehose:なので注意。

$ sudo yum install –y aws-kinesis-agent

/etc/aws-kinesis/agent.jsonを編集する。リージョンごとのエンドポイントは ここ にある。

{
    "awsAccessKeyId": "",
    "awsSecretAccessKey": "",
    "firehose.endpoint": "https://firehose.us-east-1.amazonaws.com", 
    "flows": [
        {
            "filePattern": "/tmp/hoge.log", 
            "deliveryStream": "hogefugastream"
        }
    ] 
}
$ sudo service aws-kinesis-agent start
$ sudo chkconfig aws-kinesis-agent on
$ echo "aaa" >> /tmp/hoge.log
$ tail /var/log/aws-kinesis-agent/aws-kinesis-agent.log
com.amazon.kinesis.streaming.agent.Agent [INFO] Agent: Progress: 2 records parsed (168 bytes), 
and 2 records sent successfully to destinations. Uptime: 300044ms

S3に保存されているのを確認。

Kinesis Streams

用途を制限しないストリーム。データは保持期間の間、何度でも読むことができるので、 とりあえず必要なだけシャードを増やしてデータを入れておけばどうにかなる。 データを扱う側はそれぞれ独立に必要なタイミングで必要なだけpullするため、スケールするにあたってその先は別に考えることができ、 高負荷なシステムのlog aggregatorとして使われる。

料金

  • 時間単位のシャード速度: 1シャードは最大1000件/秒の1MB/秒の入力と2MB/秒の出力能力がある。
  • PUTペイロードユニット: 追加する25KBのチャンクの数。5KBでも1チャンク。
  • データ保持期間: デフォルトで24時間。7日まで延長可能。シャード時間ごとに課金。

による。

ストリーム作成時はシャード数を入れる。

streams作成

Firehoseと同じくKinesis Agentで送ってみる。 エンドポイントはここ

{
    "awsAccessKeyId": "",
    "awsSecretAccessKey": "",
    "kinesis.endpoint": "https://kinesis.us-east-1.amazonaws.com", 
    "flows": [
        {
            "filePattern": "/tmp/hoge.log", 
            "kinesisStream": "fugafugastream"
        }
    ] 
}

aws-cliでデータを取得する

まず、シャードイテレーターを取得する。有効時間は300秒。 TRIM_HORIZON で最も古い方からデータを取得していく。SequenceNumberを指定して途中から読むこともできる。

$ aws kinesis get-shard-iterator --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON --stream-name fugafugastream
{
    "ShardIterator": "AAAAAAAAAAFjKI0neNqY2N5HzGljYFCzoFqpQsdncdC6xE+ylnqvZpmusNfyViY3hBSS8WQXa67gvtkF0f2eKzxQ/Fd7SXZG8Inkb8l1UDF5t+jHgErA28gVSWyT4uYxTzzbnhm9AhcbztyQrjqehYcjEfpWIz5XmhY9K3Kjp0Crygy+OYNSS5PoQFcB1PZ7xMFE8zLTxJXLv1ANRu0Q+1m/JFxKQ3WS"
}

このシャードイテレータを使ってget-recordsする。データはBase64で入っているのでデコードして確認する。

$ aws kinesis get-records --shard-iterator AAAAAAAAAAFjKI0neNqY2N5HzGljYFCzoFqpQsdncdC6xE+ylnqvZpmusNfyViY3hBSS8WQXa67gvtkF0f2eKzxQ/Fd7SXZG8Inkb8l1UDF5t+jHgErA28gVSWyT4uYxTzzbnhm9AhcbztyQrjqehYcjEfpWIz5XmhY9K3Kjp0Crygy+OYNSS5PoQFcB1PZ7xMFE8zLTxJXLv1ANRu0Q+1m/JFxKQ3WS
{
    "Records": [
        {
            "Data": "YWFhCg==", 
            "PartitionKey": "999679.8130737302", 
            "ApproximateArrivalTimestamp": 1487082145.518, 
            "SequenceNumber": "49570460043263608661463102123405561406360875697772167170"
        }, 
        ...
    ], 
    "NextShardIterator": "AAAAAAAAAAE08GRdLF1d76L1wCyLIiuAgpSEkKZSkUEO0VdUt3EOfdm1oOSXA1Xc4+tJPkSmB8g5NaQqDPRS/67u5IXermTUiAj6g2lgvDCGCqWFcYMAxIwIKZjKluCPQjL9kRaUqfVAaElRoKjp4Gv7JmuBDjKpxsbF2yk4uJJDAcevqH/VVkala8UbdhTweGyFgf9VhP/ljzXlrqkZ8wbD0eFwtZ3x", 
    "MillisBehindLatest": 0
}

$ echo "YWFhCg==" | base64 -d
aaa

Kinesis Analytics

SourceとなるKinesis Streamsか、Firehoseを指定し、SQLを実行できる。そして必要なら次のストリームに入れることができる。

analytics作成

今回はSourceとしてjsonで株価のデータが入っているDemo streamを使う。 いくつかSQLテンプレートが用意されていて、その中のContinuous Filterを選択。 Streamに入ってきたものをTECHで絞って出力する。

-- ** Continuous Filter ** 
-- Performs a continuous filter based on a WHERE condition.
--          .----------.   .----------.   .----------.              
--          |  SOURCE  |   |  INSERT  |   |  DESTIN. |              
-- Source-->|  STREAM  |-->| & SELECT |-->|  STREAM  |-->Destination
--          |          |   |  (PUMP)  |   |          |              
--          '----------'   '----------'   '----------'               
-- STREAM (in-application): a continuously updated entity that you can SELECT from and INSERT into like a TABLE
-- PUMP: an entity used to continuously 'SELECT ... FROM' a source STREAM, and INSERT SQL results into an output STREAM
-- Create output stream, which can be used to send to a destination
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (ticker_symbol VARCHAR(4), sector VARCHAR(12), change REAL, price REAL);
-- Create pump to insert into output 
CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
-- Select all columns from source stream
SELECT STREAM ticker_symbol, sector, change, price
FROM "SOURCE_SQL_STREAM_001"
-- LIKE compares a string to a string pattern (_ matches all char, % matches substring)
-- SIMILAR TO compares string to a regex, may use ESCAPE
WHERE sector SIMILAR TO '%TECH%';

analytics実行