Amazon SQS を使ったアプリケーションを本番で運用する際に考慮すべき基本的な 5 つのこと
Amazon SQS は可用性やスケーラビリティの高いメッセジキューサービスであり、AWS の代表的なサービスの 1 つと言えるでしょう。ところが、本番の運用に耐えられるアプリケーションにしようと思うと考えることが意外に多いものです。本エントリーでは簡単なサンプルアプリケーションをベースに、本番で運用するために考慮すべき点・注意点について見ていきます。題材として扱うのが SQS なだけで、SQS 以外を使ったアプリケーションにも応用できる内容もあるでしょう。
なお、SQS には Standard queue と FIFO queue がありますが、Standard queue を使う前提とします。
アジェンダは次のとおりです。
サンプルアプリケーション
本エントリーでは次のようなアプリケーションを例にします。
- S3 にオブジェクトが PUT される
- 簡単のため、新規作成のみで上書きされることはないものとする
- S3 notification によって SQS にメッセージが入る
- SQS のメッセージを処理する複数の worker が S3 オブジェクトを処理する
- S3 オブジェクトに対応する MySQL のレコードを作成する
具体的には、次のようなコードを想定しています。これを改善していきます。
require 'active_record'
require 'aws-sdk-s3'
require 'aws-sdk-sqs'
ActiveRecord::Base.establish_connection(
adapter: 'mysql2',
host: ENV['MYSQL_HOST'],
port: ENV['MYSQL_PORT'],
username: ENV['MYSQL_USERNAME'],
password: ENV['MYSQL_PASSWORD'],
database: ENV['MYSQL_DATABASE'],
)
# This table is created by the following DSL:
# create_table(:s3_objects) do |t|
# t.string :bucket
# t.string :key
# t.boolean :processed
# t.index [:bucket, :key], name: "ux_bucket_key", unique: true
# end
class S3Object < ActiveRecord::Base; end
class S3ObjectProcessor
QUEUE_URL = ENV['QUEUE_URL']
def run
@stop = false
trap(:SIGTERM) { @stop = true }
poller = Aws::SQS::QueuePoller.new(QUEUE_URL)
poller.before_request do |stats|
throw :stop_polling if @stop
end
poller.poll do |msg, stats|
JSON.parse(msg.body)['Records'].each do |record|
bucket = record.dig('s3', 'bucket', 'name')
key = record.dig('s3', 'object', 'key')
s3_object = S3Object.create_or_find_by!(bucket: bucket, key: key)
unless s3_object.processed?
process(s3_object)
s3_object.update!(processed: true)
end
rescue
end
end
end
def process(s3_object)
Tempfile.open('s3_object') do |f|
Aws::S3::Client.new.get_object(bucket: s3_object.bucket, key: s3_object.key) do |chunk, _|
f.write(chunk)
end
f.flush
f.rewind
f.each do |line|
# Do something
end
end
end
end
S3ObjectProcessor.new.run
1. ログ
これは SQS を使う場合に限らないですが、本番環境では自分たちが想定していなかったトラブルに見舞われることが多々あるものです。ログはそんな不測の事態が起きた場合に原因を調査するのに非常に重要な役割を果たします。サンプルコードの run
メソッドは次のようになっていました。ログの観点ではどう修正すべきだと思いますか?
def run
@stop = false
trap(:SIGTERM) { @stop = true }
poller = Aws::SQS::QueuePoller.new(QUEUE)
poller.before_request do |stats|
throw :stop_polling if @stop
end
poller.poll do |msg, stats|
JSON.parse(msg.body)['Records'].each do |record|
bucket = record.dig('s3', 'bucket', 'name')
key = record.dig('s3', 'object', 'key')
s3_object = S3Object.create_or_find_by!(bucket: bucket, key: key)
unless s3_object.processed?
process(s3_object)
s3_object.update!(processed: true)
end
rescue
end
end
end
僕であれば例えば次のように修正します。
def run
logger = Logger.new($stdout)
@stop = false
signals = Queue.new
trap(:SIGTERM) { signals << :SIGTERM }
Thread.new do
sig = signals.pop
logger.info "Received #{sig}, shutting down gracefully"
@stop = true
end
poller = Aws::SQS::QueuePoller.new(QUEUE)
poller.before_request do |stats|
throw :stop_polling if @stop
end
poller.poll do |msg, stats|
JSON.parse(msg.body)['Records'].each do |record|
bucket = record.dig('s3', 'bucket', 'name')
key = record.dig('s3', 'object', 'key')
s3_object = S3Object.create_or_find_by!(bucket: bucket, key: key)
if s3_object.processed?
logger.info "Skip processing s3://#{bucket}/#{key} because it has already been processed"
next
end
logger.info "Start processing s3://#{bucket}/#{key}"
process(s3_object)
s3_object.update!(processed: true)
logger.info "Finished processing s3://#{bucket}/#{key}"
rescue => e
logger.error "Failed to process s3://#{bucket}/#{key}: #{e} (#{e.class})"
logger.error e.backtrace.join("\n")
end
end
end
これらのログがあれば、「何故か意図したとおりに処理されていないメッセージがある」という問題が発生した時に次のようなことがわかります。
- Skip processing というログがあれば、メッセージが正常に処理されていないのに
s3_object.processed?
が true になるケースがあると言える - Failed to process というログがあれば、エラーが原因で処理されなかったことがわかる
- Finished processing というログがあれば正常に処理されているはずなので、
process(s3_object)
の実装の問題か他の問題と言える - Start processing というログがあるが、Finished processing というログがないのであれば
process(target)
の中でプロセスが突然死、再処理もされていないことがわかる
もちろんログより素晴らしい仕組みが導入されているのであればそれを使えば良いです。
ログ管理サービスを利用していると、ログを吐き過ぎることで利用料金が膨れ上がる可能性もあるので注意が必要ですが、コストインパクトがほとんどない処理であれば上記ぐらいのログは吐いた方が良いでしょう。
2. At-least-once delivery と visibility timeout
SQS を利用する場合、最低限次の 3 点は抑えるべきでしょう。
- メッセージの順番が保証されない
- At-least-once delivery なので同じメッセージが複数の worker によって処理される可能性がある
- visibility timeout が経過するまでにメッセージが削除されないと再度取得可能な状態になる
今回のサンプルアプリケーションでは複数 worker でメッセージを処理する想定なので、メッセージの順番が保証されないことは気にしていません。もし複数の wokrer で処理しつつも特定の基準においてある程度順番を保証したい場合(例えば同じユーザが PUT したオブジェクトはその順番で処理したい場合)は SQS ではなく Kafka などが選択肢として挙がるでしょう。
At-least-once delivery は、全く同じ message ID のメッセージが重複して処理される可能性があるということですが、もし worker 側で同じ message ID のメッセージを重複処理しないよう実装したとしても、メッセージを送る側が SQS にメッセージが送れているにも関わらず、ネットワークの不具合等でリトライして全く同じ内容を送るケースもあることは注意が必要です。
At-least-once delivery のことを考えると処理は冪等にしなければなりません。冪等性をどう担保するかはアプリケーションの特性次第なので、ここでは完全な冪等性を担保するのではなく、同じ処理が不必要に行われることを避けるために排他制御することを考えます。
サンプルコードに戻ってみると、メッセージの処理部分は次のようになっていました。これで排他制御できているでしょうか?
poller.poll do |msg, stats|
JSON.parse(msg.body)['Records'].each do |record|
bucket = record.dig('s3', 'bucket', 'name')
key = record.dig('s3', 'object', 'key')
s3_object = S3Object.create_or_find_by!(bucket: bucket, key: key)
unless s3_object.processed?
process(s3_object)
s3_object.update!(processed: true)
end
rescue
end
end
答えは No です。というのも、s3_object.processed?
の結果が S3Object.create_or_find_by!
を実行した時のものだからです。排他制御するなら次のように lock を取る必要があります。
poller.poll do |msg, stats|
JSON.parse(msg.body)['Records'].each do |record|
bucket = record.dig('s3', 'bucket', 'name')
key = record.dig('s3', 'object', 'key')
begin
S3Object.create!(bucket: bucket, key: key)
rescue ActiveRecord::RecordNotUnique
end
S3Object.transaction do
s3_object = S3Object.lock.find_by!(bucket: bucket, key: key)
unless s3_object.processed?
process(s3_object)
s3_object.update!(processed: true)
end
end
rescue
end
end
もし process
が lock wait timeout 以内に終わらないような重い処理の場合、ちゃんと排他制御しようと思うと次のようにしなければなりません。
poller.poll(skip_delete: true) do |msg, stats|
skip_deletion = false
JSON.parse(msg.body)['Records'].each do |record|
bucket = record.dig('s3', 'bucket', 'name')
key = record.dig('s3', 'object', 'key')
begin
S3Object.create!(bucket: bucket, key: key)
rescue ActiveRecord::RecordNotUnique
end
skip_process = false
s3_object = nil
S3Object.transaction do
s3_object = S3Object.lock.find_by!(bucket: bucket, key: key)
if s3_object.processed?
skip_process = true
elsif s3_object.processing?
skip_process = true
skip_deletion = true
else
s3_object.processing!
end
end
unless skip_process
process(s3_object)
s3_object.processed!
end
rescue
end
poller.delete_message(msg) unless skip_deletion
end
ここで、S3Object
の定義を次のように変えています。
# This table is created by the following DSL:
# create_table(:s3_objects) do |t|
# t.string :bucket
# t.string :key
# t.integer :status, limit: 2, default: 0
# t.index [:bucket, :key], name: "idx_key", unique: true
# end
class S3Object < ActiveRecord::Base
enum status: { unprocessed: 0, processing: 1, processed: 2 }
end
ポイントは「処理済み」だけではなく「処理中」の状態も定義し、「処理中」であればメッセージを削除しないことです。メッセージを削除してしまうと、処理中のプロセスが何らかの理由で異常終了した場合に問題になります。
この説明でピンと来たかもしれませんが、上記のコードではまだ不十分です。visibility timeout のことが考えられていないからです。もし処理中のプロセスが異常終了した場合、visibility timeout が経過しても処理されません。
visibility timeout も考慮するなら次のようにする必要があります。
poller.poll(skip_delete: true) do |msg, stats|
skip_deletion = false
JSON.parse(msg.body)['Records'].each do |record|
# snip
skip_process = false
s3_object = nil
S3Object.transaction do
s3_object = S3Object.lock.find_by!(bucket: bucket, key: key)
if s3_object.processed?
skip_process = true
elsif s3_object.processing? && Time.now - s3_object.start_processing_at < PROCESS_TIMEOUT
skip_process = true
skip_deletion = true
else
s3_object.update!(status: :processing, start_processing_at: Time.now)
end
end
# snip
rescue
end
poller.delete_message(msg) unless skip_deletion
end
s3_objects テーブルに start_processing_at
カラムを追加し、processing になっていても一定時間 (PROCESS_TIMEOUT
) 経過していれば再処理できるようにしたのがポイントです。処理内容によっては再処理の際に中途半端に処理されてしまったデータを破棄するといったことも必要になります。
もし process
にかかる時間が読めず、visibility timeout を経過しても処理しきれないケースがあるならばもっと複雑になってきます。そのような場合は visibility timeout を延長しつつ、start_processing_at
の代わりに heartbeat 的な情報を保持するカラムを追加することになるでしょう。具体的な実装は「完成版」のコードを参照してください。
3. デプロイ
アプリケーションに変更があるとデプロイする必要があります。デプロイの方法によって考慮することは変わってきますが、今回の実装ではデプロイに伴う終了時には SIGTERM を受け取ることを想定しています。例えば ECS のタスクであれば、デプロイ時に SIGTERM を受け取り、stopTimeout
または ECS_CONTAINER_STOP_TIMEOUT
が経過しても終了していない場合は SIGKILL が送られます。よって、timeout 以内に何らかの形で安全に終了させることが望ましいです。
サンプルアプリケーションの場合、愚直にやるなら例えば 1 行処理するごとに @stop
の値を見て、true であればすぐに処理を終了するということが考えられます。
def process(s3_object)
Tempfile.open('s3_object') do |f|
Aws::S3::Client.new.get_object(bucket: s3_object.bucket, key: s3_object.key) do |chunk, _|
f.write(chunk)
end
f.flush
f.rewind
f.each do |line|
if @stop
s3_object.unprocessed!
break
end
# Do something
end
end
end
ただ、これだけでは不十分で、途中で処理を中断した場合はメッセージを削除しないようにしなければなりません。具体的なコードは「完成版」のコードを参照してください。
4. 異常系
異常系を全部列挙するのは不可能ですが、サンプルアプリケーションのようなものであれば次のような典型的な異常系については対策を考えて実装するべきでしょう。このような典型的なエラーで手動による復旧作業が必要となるのであれば問題です。
- SQS のメッセージが重複して処理された場合
- 複数の worker で同時に同じメッセージを処理することに起因するエラーも含む
- これについては 2 で考慮済み(at-least-one delivery に対する対策)
- ネットワークエラー
- MySQL の failover 等も含む
- 不正なデータが PUT された場合
- e.g. ArgumentError: invalid byte sequence in UTF-8
- プロセスの突然死
- Segmentation fault, OOM Killer に殺される等
- これについては 2 で考慮済み(visibility timeout が経過すれば再処理される)
これらの観点に立つと、2 で修正したコードにはエラーが発生してもメッセージを削除してしまうというクリティカルな問題が存在します。
poller.poll(skip_delete: true) do |msg, stats|
skip_deletion = false
JSON.parse(msg.body)['Records'].each do |record|
# snip
rescue
end
poller.delete_message(msg) unless skip_deletion
end
少なくともメッセージを削除しないように修正すべきでしょう。
poller.poll(skip_delete: true) do |msg, stats|
skip_deletion = false
JSON.parse(msg.body)['Records'].each do |record|
# snip
rescue
skip_deletion = true
end
poller.delete_message(msg) unless skip_deletion
end
高頻度で起きるエラーに関しては visibility timeout に頼るのではなく retriable 等を使ってリトライ処理すると良いでしょう。retriable を使う場合でも、もちろんリトライ時にはログを吐くようにした方が良いです。例えば、処理が詰まっている場合に頻繁にリトライが発生していないか確認することができます。
def process
Retriable.retriable(on: RETRIABLE_ERRORS, on_retry: method(:log_retries)) do
# Do something
end
end
private
def log_retries(exception, try, elapsed_time, interval)
return unless @logger
@logger.warn("Failed to ... (#{try}th try): #{exception} (#{exception.class})")
end
5. 監視
サンプルアプリケーションの場合、最低限次のようなことを監視すべきでしょう
- 想定外のアプリケーションエラーが起きていないか
- メッセージが遅延なく処理されているか
- 処理されていない S3 オブジェクトがないか
順に見ていきます。
想定外のアプリケーションエラーが起きていないか
想定外のアプリケーションエラーが起きていないかを監視する方法として、次のような選択肢が考えられます。
- アプリケーションログを監視する
- エラーが起きた場合にどこかに通知する処理を書く
アプリケーションログを監視する場合、例えば fluentd でログを収集しているのであれば、特定のログにマッチした場合に Slack に通知するといったことができるでしょう。Papertrail 等のログ管理サービスを使っているのであれば、特定のパターンにマッチするログがあれば通知する仕組みを備えている場合があるので、それを利用すると良いでしょう。もちろん、エラーが起きたらちゃんとログを吐くことが前提です。
エラーが起きた場合にどこかに通知する処理を書くのも難しくないでしょう。例えば Slack に通知するのであればエラーを rescue した際に Slack に通知する処理を書くだけです。stacktrace も忘れず含めるようにしましょう。Rollbar 等のサービスを使えばより管理しやすいでしょう。もし Rollbar を Rails アプリケーションで使う場合、Rack アプリケーションや ActiveJob 等で使うのであれば明示的に通知する処理を書かなくてもエラーが通知されるようになっていますが、rails runner
で実行する場合などは明示的に通知する処理を書かないといけません。なので、初めて書くアプリケーションで動作確認する場合はエラーが通知されることもしっかり確認すべきです。
メッセージが遅延なく処理されているか
メッセージが遅延なく処理されているかについては例えば SQS のメトリクスである ApproximateNumberOfMessagesVisible や ApproximateAgeOfOldestMessage を使うことができます。ApproximateAgeOfOldestMessage は非常にわかりにくい指標なので、解説は次のエントリーを参照してください。
Amazon SQS の ApproximateAgeOfOldestMessage とは何なのか?
処理されていない S3 オブジェクトがないか
処理されていない S3 オブジェクトがあるケースとして次の 2 つのケースが考えられます。
- S3 または SQS の障害等で、PUT された S3 オブジェクトに対応するメッセージを SQS から取得できなかった
- アプリケーションコードの想定外のエラーで未処理なのに SQS のメッセージを削除してしまった
前者については把握するのが非常に難しいです。例えば、処理した S3 オブジェクトを別の key や bucket に変更したり、メタデータを付与したりして、S3 オブジェクトを見るだけで未処理か処理済みかを判別できるようにし、定期的に未処理のオブジェクトがあれば通知する仕組みを作るのが現実的な解でしょう。
後者については、例えば s3_objects テーブルにレコードが作成されているのにいつまで経っても processed になっていないレコードがあれば通知する仕組みを作ることになるでしょう。
完成版
監視の仕組みについては別途 CloudWatch Alarm の設定や定期ジョブが必要になるので割愛しますが、上記を踏まえると最初に出てきたサンプルアプリケーションは次のようになります。リファクタリングの余地はありますが、ここまでやって本番環境で動かすコードとしては及第点と言えるでしょう。
require 'active_record'
require 'aws-sdk-s3'
require 'aws-sdk-sqs'
require 'rollbar'
ActiveRecord::Base.establish_connection(
adapter: 'mysql2',
host: ENV['MYSQL_HOST'],
port: ENV['MYSQL_PORT'],
username: ENV['MYSQL_USERNAME'],
password: ENV['MYSQL_PASSWORD'],
database: ENV['MYSQL_DATABASE'],
)
Rollbar.configure do |config|
config.access_token = ENV['ROLLBAR_ACCESS_TOKEN']
config.environment = ENV['ROLLBAR_ENV'] || 'development'
end
# This table is created by the following DSL:
# create_table(:s3_objects) do |t|
# t.string :bucket
# t.string :key
# t.integer :status, limit: 2, default: 0
# t.timestamps
# t.index [:bucket, :key], name: "ux_bucket_key", unique: true
# end
class S3Object < ActiveRecord::Base
enum status: { unprocessed: 0, processing: 1, processed: 2 }
def self.uri(bucket, key)
"s3://#{bucket}/#{key}"
end
def uri
self.class.uri(bucket, key)
end
end
class S3ObjectProcessor
QUEUE_URL = ENV['QUEUE_URL']
PROCESS_TIMEOUT = 60
VISIBILITY_TIMEOUT = PROCESS_TIMEOUT * 2
LINES_PER_HEARTBEAT = 10_000
def initialize
@logger = Logger.new($stdout)
end
def run
@stop = false
signals = Queue.new
trap(:SIGTERM) { signals << :SIGTERM }
Thread.new do
sig = signals.pop
@logger.info "Received #{sig}, shutting down gracefully"
@stop = true
end
poller.poll(skip_delete: true, visibility_timeout: VISIBILITY_TIMEOUT) do |msg, stats|
skip_deletion = false
s3_object_uris = []
records = JSON.parse(msg.body)['Records']
if records.nil?
@logger.warn 'Received a message without the field Records'
else
records.each do |record|
bucket = record.dig('s3', 'bucket', 'name')
key = record.dig('s3', 'object', 'key')
if process_record(msg, bucket, key)
s3_object_uris << S3Object.uri(bucket, key)
else
skip_deletion = true
end
rescue => e
skip_deletion = true
@logger.error "Failed to process #{S3Object.uri(bucket, key)}: #{e} (#{e.class})"
@logger.error e.backtrace.join("\n")
Rollbar.error(e, bucket: bucket, key: key)
end
end
unless skip_deletion
poller.delete_message(msg)
@logger.info "Deleted the message for #{s3_object_uris.join(', ')}"
end
rescue => e
@logger.error "Unknown error: #{e} (#{e.class})"
@logger.error e.backtrace.join("\n")
Rollbar.error(e)
end
end
private
def poller
@poller ||= Aws::SQS::QueuePoller.new(QUEUE_URL).tap do |p|
p.before_request do |stats|
throw :stop_polling if @stop
end
end
end
def process_record(msg, bucket, key)
begin
S3Object.create!(bucket: bucket, key: key)
rescue ActiveRecord::RecordNotUnique
end
skip_process = false
delete_message = true
s3_object = nil
S3Object.transaction do
s3_object = S3Object.lock.find_by!(bucket: bucket, key: key)
if s3_object.processed?
skip_process = true
elsif s3_object.processing? && Time.now - s3_object.updated_at < PROCESS_TIMEOUT
skip_process = true
delete_message = false
else
s3_object.update!(status: :processing, updated_at: Time.now)
end
end
if skip_process
@logger.info "Skip processing #{s3_object.uri} because it has already been processed"
return delete_message
end
@logger.info "Start processing #{s3_object.uri}"
process_s3_object(s3_object) do
if @stop
s3_object.unprocessed!
@logger.info "Changed the status to 'unprocessed' for #{s3_object.uri}"
return false
end
s3_object.touch
poller.change_message_visibility_timeout(msg, VISIBILITY_TIMEOUT)
@logger.info "Updated the message visibility timeout for #{s3_object.uri}"
end
s3_object.processed!
@logger.info "Finished processing #{s3_object.uri}"
end
def process_s3_object(s3_object)
Tempfile.open('s3_object') do |f|
Aws::S3::Client.new.get_object(bucket: s3_object.bucket, key: s3_object.key) do |chunk, _|
f.write(chunk)
end
f.flush
f.rewind
f.each.with_index(1) do |line, i|
yield if (i % LINES_PER_HEARTBEAT).zero?
# Do something
rescue ArgumentError => e
if e.message != 'invalid byte sequence in UTF-8'
raise
end
@logger.warn "Skip the line at #{i} due to invalid byte sequence"
end
end
end
end
S3ObjectProcessor.new.run
最初のコードと比べるとかなり複雑になりました。これに加えて監視のためのコードも必要です。ここまでしても、実際に本番で運用すると不測の事態が起きてトラブルが起きる発生する可能性があると思います。上記のコードはあくまで僕が想像力を働かせながら書いただけのコードなので。要件によってはここまでやる必要はないでしょうが、少なくともここまで考えた上で最終的にどういう実装にするか考えるべきです。