ロコガイド テックブログ

「地域のくらしを、かしこく、たのしく」する、株式会社ロコガイドの技術部ブログです。主にトクバイ・ロコナビのサービス開発について発信しています。

「地域のくらしを、かしこく、たのしく」する、株式会社ロコガイドの技術部ブログです。
主にトクバイ・ロコナビのサービス開発について発信しています。

gzipで圧縮された大きなログファイルを省メモリで処理できるようにした話

f:id:fukajun:20200619141038p:plain

どーも、 fukajun です。持っている低温調理器の主な使いみちは、自家製ヨーグルト(2kg)と鶏ハム(2kg)の製造です。最近は、ロコガイドのインフラをみつつ、社内のデータ基盤の整備なんかをやっています。
今回は、社内で開発したログ変換処理のメモリ使用量を削減した話を書きたいと思います。

背景

ロコガイドでは、トクバイやロコナビなどサービス上の行動データやアクセスログなどをS3に蓄積し、Redshift Spectrumを利用した分析に活用しています。
そのためにfluentdから送信されてくるjsonl.gz形式のログファイルを、Redshift Spectrumから参照するのに効率が良いフォーマットsnappy.parquet形式へ変換する処理が日々動いています。 変換処理自体は、データ処理周り(PyArrow,Pandasなど)がこなれてそうなPythonで書いています。
ログデータのサイズは、目的によって大小様々ですが、具体的には、圧縮後のサイズで1MB程度のものや300MBを超えるものまであります。

起きていた問題

そんな中で、ログファイルのフォーマット変換処理に大きくメモリー消費する状態が続いていました。
これにより、ログの変換タスクが滞留してしまい後続の変換タスクが遅延したり、メモリ不足で変換処理が強制停止されたりする問題が起きていました。

何が起きていたのか?

まずは原因の把握ということで、各処理ごとにメモリー使用量を計測してみました。
結果、全体の処理の中でもgzファイルを読み込む部分で、メモリの消費が非常に大きくなることがわかりました。
この読み込みでは、JSONをパースするために、解凍後のgzファイルの全内容を一度にメモリに載せていました。
解凍した全内容を読み込むこと自体は、想定通りでしたが、当初の想定よりもサイズの大きなログファイルが存在 していたことが問題を引き起こしていました。

問題を改善した方法

結論から書くと、1つのgzファイルを分割して少しずつ読み込む方法に切り替えることでこの問題への対応を行いました。
この対応は、次の2つのメソッドを組み合わせることで行うことができるようになりました

  • GzipFile.read()
  • str.splitlines()

GzipFile.read()について

元のコードでもgzファイルの内容をメモリ上に解凍するために利用していたメソッドですが、 一度リファレンスを読み直したところ、読み込みサイズを引数で指定することでそのサイズに近いバイト数ずつ読み込める事がわかりました。
この動作を利用することで、gzファイルの内容を少しずつ読み出すことができ、大きく使用するメモリを減らすことができました。
もともとgzファイルを部分的に読むことなんてできないのでは?という思い込みがあり考えてもみませんでしたが、リファレンスを当たることの大事さを改めて感じますね。

全て一度に読み込んでいたとき

Line #    Mem usage    Increment   Line Contents
================================================
    14     12.2 MiB     12.2 MiB   @profile
    15                             def run():
    16     12.2 MiB      0.0 MiB     file_path = sys.argv[1]
    17     12.2 MiB      0.0 MiB     with gzip.open(file_path) as f:
    18     12.2 MiB      0.0 MiB       while True:
    19   1905.6 MiB   1893.5 MiB         cont = f.read()
    20   1905.6 MiB      0.0 MiB         if cont:
    21   1905.7 MiB      0.0 MiB           print(len(cont))
    22                                   else:
    23    133.5 MiB      0.0 MiB           break

少しずつ読み込むように変更したとき

Line #    Mem usage    Increment   Line Contents
================================================
    14     12.2 MiB     12.2 MiB   @profile
    15                             def run():
    16     12.2 MiB      0.0 MiB     raw_read_size  = 1024 * 1024 * 50
    17     12.2 MiB      0.0 MiB     file_path = sys.argv[1]
    18     12.2 MiB      0.0 MiB     with gzip.open(file_path) as f:
    19     12.2 MiB      0.0 MiB       while True:
    20    112.5 MiB     50.1 MiB         cont = f.read(raw_read_size)
    21    112.5 MiB      0.0 MiB         if cont:
    22    112.5 MiB      0.0 MiB           print(len(cont))
    23                                   else:
    24     84.7 MiB      0.0 MiB           break

str.readlines()について

ここまでの対応で、メモリの使用量をかなり抑えることができるようになりました。
しかし、GzipFile.readでサイズを指定したことにより、行末まで読み出されていない不完全な行についてはJSONとして正しくパースできません。
この対策として、strのsplitlinesメソッドを利用しました。
単に行単位で分割しているだけです。
分割して読み込んだ場合、不完全な可能性のある行は、最後の1行のみなので行ごとに分割した上で、最終行以外を正常な行としてパースするようにします。
最終行に関しては、次の分割結果の最初の行と結合して正常なデータにします。
あとは繰り返しです。

動くようになった

以上で、gzファイルで圧縮されたファイルを、少しずつ解凍して読み込みつつ、JSONLのログデータを正しい内容で最後まで処理できるようになりました。
余談ですが、実はこのブログの執筆中に、該当部分の周辺で不必要に複雑な処理を行っていることがわかり、ロジックを見直すきっかけになりました。普段の業務ではついつい忘れがちですが、プログラミング言語以外でロジックを整理してみるって大切ですね。
今回の実装を簡略化したコードを載せておきます。イメージをつかめれば幸いです。

#!/usr/bin/env python

import gzip
import sys
import json


def run():
  raw_read_size = 1024 * 1024 * 50
  file_path = sys.argv[1]

  line_count = 0
  last_line = ''
  with gzip.open(file_path) as f:

    while True:
      cont = f.read(raw_read_size)
      if not cont:
        cont = b''

      text = last_line + cont.decode('UTF-8', 'replace')
      loaded_lines = text.splitlines(True)

      if len(loaded_lines) >= 2:
        lines = loaded_lines[0:-1]
        last_line = loaded_lines[-1]
      else:
        lines = loaded_lines

      for line in lines:
        json.loads(line)

      line_count += len(lines)
      print(line_count)

      if not cont:
        break

  print(line_count)

run()

最後に

ちょっとした変更でメモリ使用量を抑えつつ処理できるように改善できました。
今回はメソッドのオプションを発見することが処理の実現に繋がりました。
困ったとき、先入観を捨ててリファレンスやライブラリコードを読み返してみてはいががでしょうか?
解決のきっかけになるかもしれません。
この記事が同様の問題で悩んでいる人の助けになれば幸いです。