Amazon SQS のメッセージを処理するアプリケーションを本番で運用するために考えるべき基本的な 5 つのこと

Amazon SQS は可用性やスケーラビリティの高いメッセジキューサービスであり、AWS の代表的なサービスの 1 つと言えるでしょう。ところが、本番の運用に耐えられるアプリケーションにしようと思うと考えることが意外に多いものです。本エントリーでは簡単なサンプルアプリケーションをベースに、本番で運用するために考慮すべき点について見ていきます。題材として扱うのが SQS なだけで、SQS 以外を使ったアプリケーションにも応用できる内容もあるでしょう。
なお、SQS には Standard queueFIFO queue がありますが、Standard queue を使う前提とします。

アジェンダは次のとおりです。

サンプルアプリケーション

本エントリーでは次のようなアプリケーションを例にします。

  1. S3 にオブジェクトが PUT される
    • 簡単のため、新規作成のみで上書きされることはないものとする
  2. S3 notification によって SQS にメッセージが入る
  3. SQS のメッセージを処理する複数の worker が S3 オブジェクトを処理する
    • S3 オブジェクトに対応する MySQL のレコードを作成する

具体的には、次のようなコードを想定しています。これを改善していきます。

require 'active_record'
require 'aws-sdk-s3'
require 'aws-sdk-sqs'

ActiveRecord::Base.establish_connection(
  adapter:  'mysql2',
  host:     ENV['MYSQL_HOST'],
  port:     ENV['MYSQL_PORT'],
  username: ENV['MYSQL_USERNAME'],
  password: ENV['MYSQL_PASSWORD'],
  database: ENV['MYSQL_DATABASE'],
)

# This table is created by the following DSL:
#   create_table(:s3_objects) do |t|
#     t.string :bucket
#     t.string :key
#     t.boolean :processed
#     t.index [:bucket, :key], name: "ux_bucket_key", unique: true
#   end
class S3Object < ActiveRecord::Base; end

class S3ObjectProcessor
  QUEUE_URL = ENV['QUEUE_URL']

  def run
    @stop = false
    trap(:SIGTERM) { @stop = true }

    poller = Aws::SQS::QueuePoller.new(QUEUE_URL)
    poller.before_request do |stats|
      throw :stop_polling if @stop
    end
    poller.poll do |msg, stats|
      JSON.parse(msg.body)['Records'].each do |record|
        bucket = record.dig('s3', 'bucket', 'name')
        key = record.dig('s3', 'object', 'key')
        s3_object = S3Object.create_or_find_by!(bucket: bucket, key: key)
        unless s3_object.processed?
          process(s3_object)
          s3_object.update!(processed: true)
        end
      rescue
      end
    end
  end

  def process(s3_object)
    Tempfile.open('s3_object') do |f|
      Aws::S3::Client.new.get_object(bucket: s3_object.bucket, key: s3_object.key) do |chunk, _|
        f.write(chunk)
      end
      f.flush
      f.rewind

      f.each do |line|
        # Do something
      end
    end
  end
end

S3ObjectProcessor.new.run

1. ログ

これは SQS を使う場合に限らないですが、本番環境では自分たちが想定していなかったトラブルに見舞われることが多々あるものです。ログはそんな不測の事態が起きた場合に原因を調査するのに非常に重要な役割を果たします。サンプルコードの run メソッドは次のようになっていました。ログの観点ではどう修正すべきだと思いますか?

def run
  @stop = false
  trap(:SIGTERM) { @stop = true }

  poller = Aws::SQS::QueuePoller.new(QUEUE)
  poller.before_request do |stats|
    throw :stop_polling if @stop
  end
  poller.poll do |msg, stats|
    JSON.parse(msg.body)['Records'].each do |record|
      bucket = record.dig('s3', 'bucket', 'name')
      key = record.dig('s3', 'object', 'key')
      s3_object = S3Object.create_or_find_by!(bucket: bucket, key: key)
      unless s3_object.processed?
        process(s3_object)
        s3_object.update!(processed: true)
      end
    rescue
    end
  end
end

僕であれば例えば次のように修正します。

def run
  logger = Logger.new($stdout)

  @stop = false
  signals = Queue.new
  trap(:SIGTERM) { signals << :SIGTERM }
  Thread.new do
    sig = signals.pop
    logger.info "Received #{sig}, shutting down gracefully"
    @stop = true
  end

  poller = Aws::SQS::QueuePoller.new(QUEUE)
  poller.before_request do |stats|
    throw :stop_polling if @stop
  end
  poller.poll do |msg, stats|
    JSON.parse(msg.body)['Records'].each do |record|
      bucket = record.dig('s3', 'bucket', 'name')
      key = record.dig('s3', 'object', 'key')
      s3_object = S3Object.create_or_find_by!(bucket: bucket, key: key)
      if s3_object.processed?
        logger.info "Skip processing s3://#{bucket}/#{key} because it has already been processed"
        next
      end

      logger.info "Start processing s3://#{bucket}/#{key}"
      process(s3_object)
      s3_object.update!(processed: true)
      logger.info "Finished processing s3://#{bucket}/#{key}"
    rescue => e
      logger.error "Failed to process s3://#{bucket}/#{key}: #{e} (#{e.class})"
      logger.error e.backtrace.join("\n")
    end
  end
end

これらのログがあれば、「何故か意図したとおりに処理されていないメッセージがある」という問題が発生した時に次のようなことがわかります。

  • Skip processing というログがあれば、メッセージが正常に処理されていないのに s3_object.processed? が true になるケースがあると言える
  • Failed to process というログがあれば、エラーが原因で処理されなかったことがわかる
  • Finished processing というログがあれば正常に処理されているはずなので、process(s3_object) の実装の問題か他の問題と言える
  • Start processing というログがあるが、Finished processing というログがないのであれば process(target) の中でプロセスが突然死、再処理もされていないことがわかる

もちろんログより素晴らしい仕組みが導入されているのであればそれを使えば良いです。
ログ管理サービスを利用していると、ログを吐き過ぎることで利用料金が膨れ上がる可能性もあるので注意が必要ですが、コストインパクトがほとんどない処理であれば上記ぐらいのログは吐いた方が良いでしょう。

2. At-least-once delivery と visibility timeout

SQS を利用する場合、最低限次の 3 点は抑えるべきでしょう。

  • メッセージの順番が保証されない
  • At-least-once delivery なので同じメッセージが複数の worker によって処理される可能性がある
  • visibility timeout が経過するまでにメッセージが削除されないと再度取得可能な状態になる

今回のサンプルアプリケーションでは複数 worker でメッセージを処理する想定なので、メッセージの順番が保証されないことは気にしていません。もし複数の wokrer で処理しつつも特定の基準においてある程度順番を保証したい場合(例えば同じユーザが PUT したオブジェクトはその順番で処理したい場合)は SQS ではなく Kafka などが選択肢として挙がるでしょう。

At-least-once delivery は、全く同じ message ID のメッセージが重複して処理される可能性があるということですが、もし worker 側で同じ message ID のメッセージを重複処理しないよう実装したとしても、メッセージを送る側が SQS にメッセージが送れているにも関わらず、ネットワークの不具合等でリトライして全く同じ内容を送るケースもあることは注意が必要です。

At-least-once delivery のことを考えると処理は冪等にしなければなりません。冪等性をどう担保するかはアプリケーションの特性次第なので、ここでは完全な冪等性を担保するのではなく、同じ処理が不必要に行われることを避けるために排他制御することを考えます。

サンプルコードに戻ってみると、メッセージの処理部分は次のようになっていました。これで排他制御できているでしょうか?

poller.poll do |msg, stats|
  JSON.parse(msg.body)['Records'].each do |record|
    bucket = record.dig('s3', 'bucket', 'name')
    key = record.dig('s3', 'object', 'key')
    s3_object = S3Object.create_or_find_by!(bucket: bucket, key: key)
    unless s3_object.processed?
      process(s3_object)
      s3_object.update!(processed: true)
    end
  rescue
  end
end

答えは No です。というのも、s3_object.processed? の結果が S3Object.create_or_find_by! を実行した時のものだからです。排他制御するなら次のように lock を取る必要があります。

poller.poll do |msg, stats|
  JSON.parse(msg.body)['Records'].each do |record|
    bucket = record.dig('s3', 'bucket', 'name')
    key = record.dig('s3', 'object', 'key')
    begin
      S3Object.create!(bucket: bucket, key: key)
    rescue ActiveRecord::RecordNotUnique
    end

    S3Object.transaction do
      s3_object = S3Object.lock.find_by!(bucket: bucket, key: key)
      unless s3_object.processed?
        process(s3_object)
        s3_object.update!(processed: true)
      end
    end
  rescue
  end
end

もし process が lock wait timeout 以内に終わらないような重い処理の場合、ちゃんと排他制御しようと思うと次のようにしなければなりません。

poller.poll(skip_delete: true) do |msg, stats|
  skip_deletion = false
  JSON.parse(msg.body)['Records'].each do |record|
    bucket = record.dig('s3', 'bucket', 'name')
    key = record.dig('s3', 'object', 'key')
    begin
      S3Object.create!(bucket: bucket, key: key)
    rescue ActiveRecord::RecordNotUnique
    end

    skip_process = false
    s3_object = nil
    S3Object.transaction do
      s3_object = S3Object.lock.find_by!(bucket: bucket, key: key)
      if s3_object.processed?
        skip_process = true
      elsif s3_object.processing?
        skip_process = true
        skip_deletion = true
      else
        s3_object.processing!
      end
    end

    unless skip_process
      process(s3_object)
      s3_object.processed!
    end
  rescue
  end
  poller.delete_message(msg) unless skip_deletion
end

ここで、S3Object の定義を次のように変えています。

# This table is created by the following DSL:
#   create_table(:s3_objects) do |t|
#     t.string :bucket
#     t.string :key
#     t.integer :status, limit: 2, default: 0
#     t.index [:bucket, :key], name: "idx_key", unique: true
#   end
class S3Object < ActiveRecord::Base
  enum status: { unprocessed: 0, processing: 1, processed: 2 }
end

ポイントは「処理済み」だけではなく「処理中」の状態も定義し、「処理中」であればメッセージを削除しないことです。メッセージを削除してしまうと、処理中のプロセスが何らかの理由で異常終了した場合に問題になります。
この説明でピンと来たかもしれませんが、上記のコードではまだ不十分です。visibility timeout のことが考えられていないからです。もし処理中のプロセスが異常終了した場合、visibility timeout が経過しても処理されません。

visibility timeout も考慮するなら次のようにする必要があります。

poller.poll(skip_delete: true) do |msg, stats|
  skip_deletion = false
  JSON.parse(msg.body)['Records'].each do |record|
    # snip

    skip_process = false
    s3_object = nil
    S3Object.transaction do
      s3_object = S3Object.lock.find_by!(bucket: bucket, key: key)
      if s3_object.processed?
        skip_process = true
      elsif s3_object.processing? && Time.now - s3_object.start_processing_at < PROCESS_TIMEOUT
        skip_process = true
        skip_deletion = true
      else
        s3_object.update!(status: :processing, start_processing_at: Time.now)
      end
    end

    # snip
  rescue
  end
  poller.delete_message(msg) unless skip_deletion
end

s3_objects テーブルに start_processing_at カラムを追加し、processing になっていても一定時間 (PROCESS_TIMEOUT) 経過していれば再処理できるようにしたのがポイントです。処理内容によっては再処理の際に中途半端に処理されてしまったデータを破棄するといったことも必要になります。

もし process にかかる時間が読めず、visibility timeout を経過しても処理しきれないケースがあるならばもっと複雑になってきます。そのような場合は visibility timeout を延長しつつ、start_processing_at の代わりに heartbeat 的な情報を保持するカラムを追加することになるでしょう。具体的な実装は「完成版」のコードを参照してください。

3. デプロイ

アプリケーションに変更があるとデプロイする必要があります。デプロイの方法によって考慮することは変わってきますが、今回の実装ではデプロイに伴う終了時には SIGTERM を受け取ることを想定しています。例えば ECS のタスクであれば、デプロイ時に SIGTERM を受け取り、stopTimeout または ECS_CONTAINER_STOP_TIMEOUT が経過しても終了していない場合は SIGKILL が送られます。よって、timeout 以内に何らかの形で安全に終了させることが望ましいです。

サンプルアプリケーションの場合、愚直にやるなら例えば 1 行処理するごとに @stop の値を見て、true であればすぐに処理を終了するということが考えられます。

def process(s3_object)
  Tempfile.open('s3_object') do |f|
    Aws::S3::Client.new.get_object(bucket: s3_object.bucket, key: s3_object.key) do |chunk, _|
      f.write(chunk)
    end
    f.flush
    f.rewind

    f.each do |line|
      if @stop
        s3_object.unprocessed!
        break
      end
      # Do something
    end
  end
end

ただ、これだけでは不十分で、途中で処理を中断した場合はメッセージを削除しないようにしなければなりません。具体的なコードは「完成版」のコードを参照してください。

4. 異常系

異常系を全部列挙するのは不可能ですが、サンプルアプリケーションのようなものであれば次のような典型的な異常系については対策を考えて実装するべきでしょう。このような典型的なエラーで手動による復旧作業が必要となるのであれば問題です。

  • SQS のメッセージが重複して処理された場合
    • 複数の worker で同時に同じメッセージを処理することに起因するエラーも含む
    • これについては 2 で考慮済み(at-least-one delivery に対する対策)
  • ネットワークエラー
    • MySQL の failover 等も含む
  • 不正なデータが PUT された場合
    • e.g. ArgumentError: invalid byte sequence in UTF-8
  • プロセスの突然死
    • Segmentation fault, OOM Killer に殺される等
    • これについては 2 で考慮済み(visibility timeout が経過すれば再処理される)

これらの観点に立つと、2 で修正したコードにはエラーが発生してもメッセージを削除してしまうというクリティカルな問題が存在します。

poller.poll(skip_delete: true) do |msg, stats|
  skip_deletion = false
  JSON.parse(msg.body)['Records'].each do |record|
    # snip
  rescue
  end
  poller.delete_message(msg) unless skip_deletion
end

少なくともメッセージを削除しないように修正すべきでしょう。

poller.poll(skip_delete: true) do |msg, stats|
  skip_deletion = false
  JSON.parse(msg.body)['Records'].each do |record|
    # snip
  rescue
    skip_deletion = true
  end
  poller.delete_message(msg) unless skip_deletion
end

高頻度で起きるエラーに関しては visibility timeout に頼るのではなく retriable 等を使ってリトライ処理すると良いでしょう。retriable を使う場合でも、もちろんリトライ時にはログを吐くようにした方が良いです。例えば、処理が詰まっている場合に頻繁にリトライが発生していないか確認することができます。

def process
  Retriable.retriable(on: RETRIABLE_ERRORS, on_retry: method(:log_retries)) do
    # Do something
  end
end

private

def log_retries(exception, try, elapsed_time, interval)
  return unless @logger
  @logger.warn("Failed to ... (#{try}th try): #{exception} (#{exception.class})")
end

5. 監視

サンプルアプリケーションの場合、最低限次のようなことを監視すべきでしょう

  • 想定外のアプリケーションエラーが起きていないか
  • メッセージが遅延なく処理されているか
  • 処理されていない S3 オブジェクトがないか

順に見ていきます。

想定外のアプリケーションエラーが起きていないか

想定外のアプリケーションエラーが起きていないかを監視する方法として、次のような選択肢が考えられます。

  • アプリケーションログを監視する
  • エラーが起きた場合にどこかに通知する処理を書く

アプリケーションログを監視する場合、例えば fluentd でログを収集しているのであれば、特定のログにマッチした場合に Slack に通知するといったことができるでしょう。Papertrail 等のログ管理サービスを使っているのであれば、特定のパターンにマッチするログがあれば通知する仕組みを備えている場合があるので、それを利用すると良いでしょう。もちろん、エラーが起きたらちゃんとログを吐くことが前提です。

エラーが起きた場合にどこかに通知する処理を書くのも難しくないでしょう。例えば Slack に通知するのであればエラーを rescue した際に Slack に通知する処理を書くだけです。stacktrace も忘れず含めるようにしましょう。Rollbar 等のサービスを使えばより管理しやすいでしょう。もし Rollbar を Rails アプリケーションで使う場合、Rack アプリケーションや ActiveJob 等で使うのであれば明示的に通知する処理を書かなくてもエラーが通知されるようになっていますが、rails runner で実行する場合などは明示的に通知する処理を書かないといけません。なので、初めて書くアプリケーションで動作確認する場合はエラーが通知されることもしっかり確認すべきです。

メッセージが遅延なく処理されているか

メッセージが遅延なく処理されているかについては例えば SQS のメトリクスである ApproximateNumberOfMessagesVisible や ApproximateAgeOfOldestMessage を使うことができます。ApproximateAgeOfOldestMessage は非常にわかりにくい指標なので、解説は次のエントリーを参照してください。
Amazon SQS の ApproximateAgeOfOldestMessage とは何なのか?

処理されていない S3 オブジェクトがないか

処理されていない S3 オブジェクトがあるケースとして次の 2 つのケースが考えられます。

  • S3 または SQS の障害等で、PUT された S3 オブジェクトに対応するメッセージを SQS から取得できなかった
  • アプリケーションコードの想定外のエラーで未処理なのに SQS のメッセージを削除してしまった

前者については把握するのが非常に難しいです。例えば、処理した S3 オブジェクトを別の key や bucket に変更したり、メタデータを付与したりして、S3 オブジェクトを見るだけで未処理か処理済みかを判別できるようにし、定期的に未処理のオブジェクトがあれば通知する仕組みを作るのが現実的な解でしょう。
後者については、例えば s3_objects テーブルにレコードが作成されているのにいつまで経っても processed になっていないレコードがあれば通知する仕組みを作ることになるでしょう。

完成版

監視の仕組みについては別途 CloudWatch Alarm の設定や定期ジョブが必要になるので割愛しますが、上記を踏まえると最初に出てきたサンプルアプリケーションは次のようになります。リファクタリングの余地はありますが、ここまでやって本番環境で動かすコードとしては及第点と言えるでしょう。

require 'active_record'
require 'aws-sdk-s3'
require 'aws-sdk-sqs'
require 'rollbar'

ActiveRecord::Base.establish_connection(
  adapter:  'mysql2',
  host:     ENV['MYSQL_HOST'],
  port:     ENV['MYSQL_PORT'],
  username: ENV['MYSQL_USERNAME'],
  password: ENV['MYSQL_PASSWORD'],
  database: ENV['MYSQL_DATABASE'],
)

Rollbar.configure do |config|
  config.access_token = ENV['ROLLBAR_ACCESS_TOKEN']
  config.environment = ENV['ROLLBAR_ENV'] || 'development'
end

# This table is created by the following DSL:
#   create_table(:s3_objects) do |t|
#     t.string :bucket
#     t.string :key
#     t.integer :status, limit: 2, default: 0
#     t.timestamps
#     t.index [:bucket, :key], name: "ux_bucket_key", unique: true
#   end
class S3Object < ActiveRecord::Base
  enum status: { unprocessed: 0, processing: 1, processed: 2 }

  def self.uri(bucket, key)
    "s3://#{bucket}/#{key}"
  end

  def uri
    self.class.uri(bucket, key)
  end
end

class S3ObjectProcessor
  QUEUE_URL = ENV['QUEUE_URL']
  PROCESS_TIMEOUT = 60
  VISIBILITY_TIMEOUT = PROCESS_TIMEOUT * 2
  LINES_PER_HEARTBEAT = 10_000

  def initialize
    @logger = Logger.new($stdout)
  end

  def run
    @stop = false
    signals = Queue.new
    trap(:SIGTERM) { signals << :SIGTERM }
    Thread.new do
      sig = signals.pop
      @logger.info "Received #{sig}, shutting down gracefully"
      @stop = true
    end

    poller.poll(skip_delete: true, visibility_timeout: VISIBILITY_TIMEOUT) do |msg, stats|
      skip_deletion = false
      s3_object_uris = []
      records = JSON.parse(msg.body)['Records']
      if records.nil?
        @logger.warn 'Received a message without the field Records'
      else
        records.each do |record|
          bucket = record.dig('s3', 'bucket', 'name')
          key = record.dig('s3', 'object', 'key')
          if process_record(msg, bucket, key)
            s3_object_uris << S3Object.uri(bucket, key)
          else
            skip_deletion = true
          end
        rescue => e
          skip_deletion = true
          @logger.error "Failed to process #{S3Object.uri(bucket, key)}: #{e} (#{e.class})"
          @logger.error e.backtrace.join("\n")
          Rollbar.error(e, bucket: bucket, key: key)
        end
      end

      unless skip_deletion
        poller.delete_message(msg)
        @logger.info "Deleted the message for #{s3_object_uris.join(', ')}"
      end
    rescue => e
      @logger.error "Unknown error: #{e} (#{e.class})"
      @logger.error e.backtrace.join("\n")
      Rollbar.error(e)
    end
  end

  private

  def poller
    @poller ||= Aws::SQS::QueuePoller.new(QUEUE_URL).tap do |p|
      p.before_request do |stats|
        throw :stop_polling if @stop
      end
    end
  end

  def process_record(msg, bucket, key)
    begin
      S3Object.create!(bucket: bucket, key: key)
    rescue ActiveRecord::RecordNotUnique
    end

    skip_process = false
    delete_message = true
    s3_object = nil
    S3Object.transaction do
      s3_object = S3Object.lock.find_by!(bucket: bucket, key: key)
      if s3_object.processed?
        skip_process = true
      elsif s3_object.processing? && Time.now - s3_object.updated_at < PROCESS_TIMEOUT
        skip_process = true
        delete_message = false
      else
        s3_object.update!(status: :processing, updated_at: Time.now)
      end
    end

    if skip_process
      @logger.info "Skip processing #{s3_object.uri} because it has already been processed"
      return delete_message
    end

    @logger.info "Start processing #{s3_object.uri}"
    process_s3_object(s3_object) do
      if @stop
        s3_object.unprocessed!
        @logger.info "Changed the status to 'unprocessed' for #{s3_object.uri}"
        return false
      end

      s3_object.touch
      poller.change_message_visibility_timeout(msg, VISIBILITY_TIMEOUT)
      @logger.info "Updated the message visibility timeout for #{s3_object.uri}"
    end

    s3_object.processed!
    @logger.info "Finished processing #{s3_object.uri}"
  end

  def process_s3_object(s3_object)
    Tempfile.open('s3_object') do |f|
      Aws::S3::Client.new.get_object(bucket: s3_object.bucket, key: s3_object.key) do |chunk, _|
        f.write(chunk)
      end
      f.flush
      f.rewind

      f.each.with_index(1) do |line, i|
        yield if (i % LINES_PER_HEARTBEAT).zero?

        # Do something
      rescue ArgumentError => e
        if e.message != 'invalid byte sequence in UTF-8'
          raise
        end
        @logger.warn "Skip the line at #{i} due to invalid byte sequence"
      end
    end
  end
end

S3ObjectProcessor.new.run

最初のコードと比べるとかなり複雑になりました。これに加えて監視のためのコードも必要です。ここまでしても、実際に本番で運用すると不測の事態が起きてトラブルが起きる発生する可能性があると思います。上記のコードはあくまで僕が想像力を働かせながら書いただけのコードなので。要件によってはここまでやる必要はないでしょうが、少なくともここまで考えた上で最終的にどういう実装にするか考えるべきです。