#linedevday New stream processing platform with Apache Flink の参加レポ@LINE DEVELOPER DAY 2016

公開日: 

LINE DEVELOPER DAY 2016に今年も今年も参加してきたのでそのメモです。

sponcer link

Data Labs 荻林裕憲様

B 6 new stream processing platform with apache flink from LINE Corporation

Data Labsについて

  • ユーザー数や売上などを社内横断でサポートする部署です
  • 大きくやることは2つある
    • 1:データという切り口からサービスをサポートする
      • データ分析の専門家もいるので、分析結果をサービスにフィードバックする
    • 2:分析のための新技術の開発
  • データラボができるまではそれぞれのサービスでデータが作られていたが、この部署ができることで統合的に分析できるようになった
  • サービスとの連携を考えるプランナー、分析を行うアナライザー、開発を行うエンジニアの3種類の役職がある

Data Platform

  • LINEやGamesなどのサービスからData Lakeとよばれるものにfluetndなどを使って蓄積されている
  • 分析ツールについては多くはOSSを使用している

LINE LIVEでのストリーミング処理について

  • LINE Liveは動画配信サービスです
  • 視聴者数については数秒おきに集計される
    • 集計後結果をAPIごしにサービスに戻しています
  • 解決すべき課題としては、リアルタイム集計、ユニークユーザー数、耐障害性があった
  • ユニークユーザー数に関してはアプリをとじてひらいても視聴者数は1になるので、覚えておくためにメモリを多く使用する

1st version

  • Norikraというストリーミング処理ツールを使った
    • SQLで処理を書ける
    • もともとfluentdで収集してNorikraで集計していたのでそのまま採用した
  • 番組ごとのユニークユーザー数を集計するのが以下のようなSQLでかける SELECT broadcast_id, count(distinct userky) as num_uu FROM linelive_viewerlog.win:time(3 day) GROUP BY type, broadcast_id OUTPUT LAST EVERY 10 seconds

win:time(3 day)について

  • バッチと違ってずっとデータが流れ続けるので、どのデータに対してアウトプットをだすかを定義する必要があるので、今回は3日として定義して集計している
    • ユニーク数単位だとどんどんメモリに溜まっていく
    • サービス開始当初はあまり長時間の再生は考慮されていなかった
    • 1週間や1ヶ月の番組ができたらここの指定日を変えてメモリも増やさないといけない

採用した方法

  • リアルタイムはNorikraで問題なかったが、ユニークユーザ数のカウントについては大量のメモリを搭載した。
  • 耐障害性としては2つのNorikraで運用することにした

障害事例

  • 乃木坂46が46時間の生放送をやった
    • この時Norkra JVM Heapは右肩上がりに上がって120Gの上限まで使い切りました
    • 金曜の夜からheapが上昇して日曜の昼に上限に達した
    • out of memoryにはならなかったが処理が遅延して実質的に停止しているような状態だった
  • 2台構成でやっていたが、2台めはさらにメモリを搭載していたのでそちらに切り替えることで復旧した

2nd versionでの改善

  • LINE Liveは公式アカウントのみの配信だったが、個人も配信できるようになり、ハートもプレゼントできるようになった
    • 流石に1サーバでの処理は無理
    • さらに拡張性も必要になった
  • 分散ストリーミング処理でさがすことになった。StormやSpark streamingが有名だが、Apache Flinkを採用した
  • OSSの分散処理ストリーミング
  • ストリーム処理用とバッチ処理用のAPIがある
  • そのAPIのうえにCEPや機械学習がのります
  • 障害回復が自動的に行なってくれる
    • 無造作に再起動されようが、再投入される
  • 高いスループットだった
    • 1サーバで秒間数万メッセージできた
    • さらに分散もできるので性能的には十分だった
  • 様々なサービスのログを扱うが、もしうまくいったら様々な処理をのせることになるので、その処理を書くのにコストが低いのがいい!
    • →Flinkは低コストでAPIがかけた
  • Windowを柔軟に書けるし、ログの到着が遅くてもログの時間をベースに取り込んでくれる
  • FlinkはjavaやScalaで簡単にかける
  • timeWindowに、耐障害性の確保を指定できる

アーキテクチャ

  • jobmanagerとtaskmanagerがあり、TaskManagerがkafkaなどから読み取ってHDFSやRedisに書き込む
    • taskmanagerは計算の状態を定期的にストレージに保存する
    • いまのwindowの様子がどうなのか、ログをどこまで読み込んだかなどを書き込む
    • jobmanagerはtaskmanagerを監視していておちたら、処理をキャンセルしてtaskmanagerを再投入する
  • 途中で落ちてもoffset以降のログを書き込むので、ログは1回しか書き込まないようになっている。
    • これをプログラマーが意識せずとも実装できるのでとてもいい!

トラブル

  • メモリ食いすぎて病患2000メッセージしか処理できなかった
    • jsonのパースにscalaの標準ライブラリをつかっていたせいだった
  • windowにたまったデータをいつ書き込むかを制御できるクラスがあるが、うまく期待した動作にならなかった
    • 例えば10秒毎に生成したいのに書き込まれなかったり、別のイベントで書き込んだりしていた
    • ソースをよんで、改良を加えることで対応しました
  • taskmanagerが状態を保存するのにスナップショットをとるが、その間にとまってしまうという事象があった
    • 原因はいまだ不明
    • kafkaの0.9以降のAPIで発生するので、古いkafkaのAPIを使うことで対処している

HyperLogLog

  • ユニークユーザー数のカウントはFlinkを導入することでユニークユーザ数で大量メモリを使ってしまい、解決できなかったので他の方法を考えた結果、HyperLogLogを使って集計することにした
  • 文字列を2進数にして、戦闘の0の数を数えて覚えておき、固定の入力があれば書き込む
    • 連続した0の数がいくつのときに4になるかを想定できる
    • 入力を幾つかのグループにわけてやった
  • 正確な値は集計できないが後ほどバッチで実行することでカバーすることにした

検証

  • FlinkとHyperLogLogでやって乃木坂の3日分のログを投入してやってみたところ、120G使っていたものが、80mbで処理できるようになった
    • 誤差も評価したが、誤差1%以下で処理できていた
    • 全体で7分かからないくらいで38万件?くらい処理できた
  • 7月に切り替えて現在この状態で稼働しています
  • クリック数のリアルタイムレポートにも使われている
  • アクセスログの集計などもやっている
  • ステータスごとの集計もやっている
  • 多いと秒間15万-20万くらい集まる リアルタイムで集計するのが厳しい
    • 今まではサンプリングでやっていた
    • これだと500と400の以上応答を見落としてしまう
    • Flinkに置き換えることで全量の集計ができるようになりました

メリデリ

いいところ

  • 障害に強い
  • 開発が活発
  • アップグレード頻度も頻繁
  • MLも活発で投稿数が結構あり、さらにレスポンスがすぐもらえる
    • またバグと判明した場合2〜3日で治ったりしたこともあった
  • meatupなども開催されています

デメリット

  • ジョブの改修、クラスタアップグレードなどは、内部状態に制限がある
    • 引き継げないパターンがいくつかあるので手動でやっています
  • 稼働中のジョブの並列度を後からあげることもできません
    • 停止して、覚えてるジョブをまっさらにして上げ直さないと並列度を上げられない
    • 今後回収予定はある模様
  • ジョブの間のIsolatinoはタスクマネージャの中で別スレッドで動く。
    • 場合によっては不安定になることがある
    • やーんの上で実行すると対応できる

まとめ

  • ログのストリーム処理で、途中の状態をどうやって復旧するのかが難しい課題だが、それをApache Flinkを使って解決できた
  • ユニークユーザー数は100%の正確さをあきらめることで大量のログを処理できるようになった
  • 大量のログを即時性を盛って生成できるようになったので今後も使っていきます

FAQ

  • HperLogLogで性格には測れなくてバッチであとで計算するという話だが、実際バッチ処理するとどれくらい時間かかるか
    • 結構多段にテーブルを使ってその中に数字があるので難しいが、大体そのジョブ全体で1時間半から2時間くらいで計算している
    • 同じことをバッチでやるとしてもかなりコンパクトに処理ができると思います

@mogmetの所感

問題を解決する上で解決がそのミドルウェアで難しい場合は他のミドルウェアを随時組み合わせて改善していくというチャレンジの精神を感じ取れるセッションだと思いました。

違った視点ですぐに舵を切り替えることも大事ですね。

  • このエントリーをはてなブックマークに追加
  • Pocket
PAGE TOP ↑