NorikraとFluentdで流れてきたログをリアルタイムに集計する

(2017-06-10)

NorikraはTD社のtagomoris氏が作った、 スキーマレスのストリーミングデータを処理するOSS。

モチベーションとしてはfluentdでElasticsearchにログを送って可視化していたのだけど、 流量が増えてきてピーク帯に耐えられなくなってしまったため、前もって集計してから送ることで流量を減らそうというもの。

Norikraを立ち上げてクエリを実行する

公式で紹介されているDockerイメージがあったのでこれで動かしてみる。

$ docker run -e "TZ=Asia/Tokyo" -p 26578:26578 -p 26571:26571 -v `pwd`:/var/tmp/norikra:rw -d myfinder/docker-norikra norikra start --stats /var/tmp/norikra/stats.json -l /var/tmp/norikra 

ほかのオプションとして-Xms-XmxでJVMのヒープメモリの量を設定したり、Experimentalではあるけど--shutoffでヒープメモリが一杯になる前に弾いて OutOfMemoryを防ぐことができる。 また、Norikraのコアエンジンで使われているOSSの CEP (Complex event processing)エンジン、 Esper のパフォーマンスチューニングとして--micro--smallなどを渡すこともできるけど試していない。

公式サイトの例に従ってクライアントからデータを入れてクエリを実行してみる。

まずはtargetをopenする。targetというのはスキーマレスのイベントストリームのこと。 ここで定義したフィールドは必須になる。

$ norikra-client target open www path:string status:integer referer:string agent:string userid:integer
$ norikra-client target list
TARGET	AUTO_FIELD
www	true

次にクエリを追加する。一見普通のSQLのように見えるけど、EsperのクエリであるEPL(Event Processing Language)。 ただしSELECTしか使えないのも含めてクエリにいくらかの制限がある。

このクエリではwin:time_batchで10秒のWindowを定義し、eventをgroup byして、その数をeventとして出力する。

$ norikra-client query add www.toppageviews 'SELECT count(*) AS cnt FROM www.win:time_batch(10 sec) WHERE path="/" AND status=200'

eventを流す。

$ echo '{"path":"/", "status":200, "referer":"", "agent":"MSIE", "userid":3}' | norikra-client event send www
$ echo '{"path":"/login", "status":301, "referer":"/", "agent":"MSIE", "userid":3}' | norikra-client event send www
$ echo '{"path":"/", "status":200, "referer":"", "agent":"MSIE", "userid":3}' | norikra-client event send www
$ echo '{"path":"/", "status":200, "referer":"", "agent":"MSIE", "userid":3}' | norikra-client event send www

クエリの値をfetchする。送るのが遅くてgroup byされなかったけどこんな感じ。 eventがこなかったはじめのWindowは0が出力されるが、それ以降のWindowでは出力されない。

$ norikra-client event fetch www.toppageviews
{"time":"2017/06/07 20:58:13","cnt":0}
{"time":"2017/06/07 21:42:33","cnt":1}
{"time":"2017/06/07 21:42:43","cnt":0}
{"time":"2017/06/07 21:43:13","cnt":1}
{"time":"2017/06/07 21:43:23","cnt":0}
{"time":"2017/06/07 21:43:33","cnt":1}

あとWeb-uiが用意されていて、クエリを追加したり、targetやクエリの一覧、メモリの使用量やサーバーログなどが取得できる。デフォルトでは26578ポート。

web-ui

クエリ(EPL)

Windowなし

上の例ではtime_batchでWindowを定義したけど、定義しないクエリを追加してみる。 以下のようなクエリを登録し、再びeventを流してfetchすると流した分が全てとれる。 ただし、このようなクエリはfetchされないと大量のoutput eventが溜まる可能性がある。

SELECT path, status AS cnt FROM www WHERE path="/" AND status=200
$ echo '{"path":"/", "status":200, "referer":"", "agent":"MSIE", "userid":3}' | norikra-client event send www
$ echo '{"path":"/", "status":200, "referer":"", "agent":"MSIE", "userid":3}' | norikra-client event send www
$ norikra-client event fetch www.toppageviews-nowin
{"time":"2017/06/07 23:06:12","cnt":200,"path":"/"}
{"time":"2017/06/07 23:09:10","cnt":200,"path":"/"}

win:time_batch

10 secのように秒以外にもmsecminhour、どう使うか想像できないけどyearまで指定でき、 10 minutes 30 secondsみたいに組み合わせることもできる

また、第二引数にミリ秒を渡すと出力するタイミングを指定できる。

SELECT count(*) AS cnt FROM www.win:time_batch(1min, 0L) WHERE path="/" AND status=200
$ echo '{"path":"/", "status":200, "referer":"", "agent":"MSIE", "userid":3}' | norikra-client event send www
$ echo '{"path":"/", "status":200, "referer":"", "agent":"MSIE", "userid":3}' | norikra-client event send www
$ norikra-client event fetch www.toppageviews-tb-opts
{"time":"2017/06/08 00:43:00","cnt":1}

win:ext_timed_batch

来た時間ではなくフィールドのUNIXミリ秒を参照するWindow。時系列順にソートされている必要があって、 tagomoris氏いわくおすすめしないとのこと。

SELECT count(*) AS cnt FROM www.win:ext_timed_batch(timestamp, 1 min) WHERE path="/" AND status=200
$ echo '{"path":"/", "status":200, "referer":"", "agent":"MSIE", "userid":3, "timestamp":1496852100000 }' | norikra-client event send www
$ echo '{"path":"/", "status":200, "referer":"", "agent":"MSIE", "userid":3, "timestamp":1496852200000 }' | norikra-client event send www
$ norikra-client event fetch www.toppageviews-ext_timed
{"time":"2017/06/08 01:19:02","cnt":2}

win:length_batch

event数のWindow。毎回渡した数ずつ集計できると思いきや、数が集まらなければfetchできず、 それ以上集まったらfetchできるようだ。使いづらいような気がする。

SELECT avg(userid) as nosense FROM www.win:length_batch(2) WHERE path="/" AND status=200
$ echo '{"path":"/", "status":200, "referer":"", "agent":"MSIE", "userid":3}' | norikra-client event send www
$ norikra-client event fetch www.length-lenbat

$ echo '{"path":"/", "status":200, "referer":"", "agent":"MSIE", "userid":1}' | norikra-client event send www
$ norikra-client event fetch www.length-lenbat

$ echo '{"path":"/", "status":200, "referer":"", "agent":"MSIE", "userid":2}' | norikra-client event send www
$ echo '{"path":"/", "status":200, "referer":"", "agent":"MSIE", "userid":1}' | norikra-client event send www
$ echo '{"path":"/", "status":200, "referer":"", "agent":"MSIE", "userid":3}' | norikra-client event send www
$ norikra-client event fetch www.length-lenbat
{"time":"2017/06/08 20:42:20","nosense":2.0}

win:length

こっちは渡した数スライドして集計するもの。Windowなしのときと同様、大量に溜まる可能性がある。

SELECT avg(userid) as nosense FROM www.win:length(2) WHERE path="/" AND status=200
$ echo '{"path":"/", "status":200, "referer":"", "agent":"MSIE", "userid":3}' | norikra-client event send www
$ echo '{"path":"/", "status":200, "referer":"", "agent":"MSIE", "userid":1}' | norikra-client event send www
$ echo '{"path":"/", "status":200, "referer":"", "agent":"MSIE", "userid":5}' | norikra-client event send www
$ echo '{"path":"/", "status":200, "referer":"", "agent":"MSIE", "userid":4}' | norikra-client event send www
$ norikra-client event fetch www.length-len
{"time":"2017/06/08 20:58:11","nosense":3.0}
{"time":"2017/06/08 20:58:22","nosense":2.0}
{"time":"2017/06/08 20:58:32","nosense":3.0}
{"time":"2017/06/08 20:58:45","nosense":4.5}

他にもいろいろあるし、JOINやサブクエリも使える。

NorikraでログをJOINする - sambaiz-net

fluentdとやり取りする

fluent-plugin-norikraでNorikraサーバーにeventを送り、 eventを受け取ってファイルに出力する。

c4.large(2コア,メモリ3.75GiB)でDockerでNorikraを立ち上げ、以下の設定でtd-agentを実行した。 auto_fieldは来たeventのフィールドを自動でtargetに登録するかの設定で、 true(デフォルト)にするとどんなフィールドが来ているかNorikra上で確認することができる。 falseにしてもクエリで使う分は自動で登録される。

<source>
  @type dummy
  dummy {"hello":"world"}
  tag event.dummy
  rate 1000
</source>
   
<match event.*>
  @type   norikra
  norikra localhost:26571
  
  remove_tag_prefix event # event.*の部分が
  target_map_tag    yes   # targetになる

  <default>
    auto_field false 
  </default>
</match>

<source>
  @type   norikra
  norikra localhost:26571
  <fetch>
    method   event
    # norikra-client query add dummy_count_1sec 'SELECT COUNT(*) AS count FROM dummy.win:time_batch(1 sec)'
    target   dummy_count_1sec
    tag      string data.dummy_count_1sec
 #  tag      field FIELDNAME : tag by value with specified field name in output event
    interval 1m
  </fetch>
</source>

<match data.*>
  @type file
  path /var/log/td-agent/dummy_count
  time_slice_format %Y%m%d%H
  time_slice_wait 10s
  time_format %Y%m%dT%H%M%S%z
  compress gzip
  symlink_path /var/log/td-agent/dummy_count
</match>

Norikraのスループットは以下の要素が影響する。

number of targets
number of queries
how complex queries are
how complex UDFs are

で、目安としてはこんな感じらしい。

10 queries
2,000 events per seconds
5% usage of 4core CPU

1target、単純な1クエリなら秒間10000送ってみても問題なかった。 あまり現実的なケースではないけど限界を目指してみる。

$ tail -f dummy_count
20170609T212717+0900	data.dummy_count_1sec	{"count":10000}
20170609T212718+0900	data.dummy_count_1sec	{"count":10000}
20170609T212719+0900	data.dummy_count_1sec	{"count":10000}
20170609T212720+0900	data.dummy_count_1sec	{"count":10000}
20170609T212721+0900	data.dummy_count_1sec	{"count":10000}
20170609T212722+0900	data.dummy_count_1sec	{"count":10000}
20170609T212723+0900	data.dummy_count_1sec	{"count":10000}
20170609T212724+0900	data.dummy_count_1sec	{"count":10000}
20170609T212725+0900	data.dummy_count_1sec	{"count":10000}
20170609T212726+0900	data.dummy_count_1sec	{"count":10000}
 PID USER      PR  NI  VIRT  RES  SHR S %CPU %MEM    TIME+  COMMAND
 8256 root      20   0 1878m 249m  19m S 29.3  6.6   6:46.94 java
 9812 root      20   0  296m  68m 6288 S 20.0  1.8   2:38.08 ruby  

秒間40000送ってみるとカウントがおかしい。 dummyの方の限界かと思ってnorikraを外してみたらおおよそ数が合ったので Norikraサーバーかやり取りの部分で処理が追いついていないようだ。 一旦rateを下げてみたところ20000あたりを境目にこうなってしまった。

$ tail -f dummy_count
20170609T222018+0900	data.dummy_count_1sec	{"count":31248}
20170609T222019+0900	data.dummy_count_1sec	{"count":27468}
20170609T222020+0900	data.dummy_count_1sec	{"count":35309}
20170609T222021+0900	data.dummy_count_1sec	{"count":31944}
20170609T222022+0900	data.dummy_count_1sec	{"count":22805}
20170609T222023+0900	data.dummy_count_1sec	{"count":30716}
20170609T222024+0900	data.dummy_count_1sec	{"count":33617}
20170609T222025+0900	data.dummy_count_1sec	{"count":28740}
20170609T222026+0900	data.dummy_count_1sec	{"count":32058}
20170609T222027+0900	data.dummy_count_1sec	{"count":27253}

CPUの使用量をみてみると、ほぼ限界まで使用されていた。 fluentdはrubyのGIL (Global Interpreter Lock = GVL(Giant VM Lock))のため同時に1ネイティブスレッドしか動かせず、1コアしかCPUを使えないが、 jrubyで動くNorikraは残りのコアを使うことができる。 今回はtargetもクエリも一つだし、データ量も小さいためかメモリにはまだ余裕があった。 ログのサイズやウィンドウサイズが大きければメモリを使う量が増えるため、実際のログをしばらく 流してどちらが問題になりそうか確認するべき。

 PID USER      PR  NI  VIRT  RES  SHR S %CPU %MEM    TIME+  COMMAND
11378 root      20   0  350m 111m 6336 S 96.1  3.0   1:53.03 ruby
8256 root      20   0 1892m 642m  19m S 84.2 17.1  34:36.38 java   
HEAP MEMORY USED: 244MB (55.8%), COMMITTED: 437MB, MAX: 437MB
NON-HEAP MEMORY USED: 51MB (23.8%), COMMITTED: 81MB, MAX: 214MB

1Gbps、1Mevent/sを超えるような高トラフィックではStormなどのフレームワークを使えとのこと。