yasutomogのブログ

Software Engineerの雑記

Lumen(Laravel)のQueue管理クラス(DB)の拡張方法

概要

  • Lumenを使用したWebアプリを運用している中で、Queue(DB)を使用
  • DBは、Azure SQL Databaseを使用
  • Azure SQL Databaseでは一時的に負荷が上がると、強制的に接続を切断しDB側の再構成が走る
    DB接続をキャッシュしたり、コネクションプーリングなどを使用している場合、古いDB接続が利用されるとDB接続エラーが発生する docs.microsoft.com
  • LumenのQueue(DB)では、DB接続をキャッシュして使用しているため、デフォルトのまま使用しているとDB負荷が上がったときにDB接続エラーが連続して発生するようになる yasutomo.hatenablog.com

前提

  • lumen:5.5.7
  • PHP:7.1
  • php artisan queue:work のコマンド実行時

詳細

Illuminate\Queue\DatabaseQueueを継承したクラスを作成

<?php

namespace App\Libs;

use Illuminate\Queue\DatabaseQueue;

class CustomDatabaseQueue extends DatabaseQueue
{

    // DB再接続時のwait秒
    const RETRY_WAIT_SEC = 10;

    // DB再接続用のSQL STATEコード
    const RETRY_SQL_STATE = ["08S01", "HY000"];

    /**
     * DBへの再接続処理を加えたoverride関数
     *
     * @see DatabaseQueue::pop()
     * @param  string  $queue
     * @return \Illuminate\Contracts\Queue\Job|null
     * @throws \Exception|\Throwable
     */
    public function pop($queue = null)
    {
        $queue = $this->getQueue($queue);

        try {

            try {

                // DB負荷が高くなるとAzure SQL Databaseは、強制的に切断しDB再構築する
                // Lumenの標準実装では、DB接続インスタンスを保持して使いまわしているため
                // DB再構築後も、切断されたDB接続インスタンスを使用しようとするため
                // キュー管理テーブルを参照するタイミングでDB接続エラーが発生し続ける
                $this->database->beginTransaction();

            } catch (\Throwable $e) {

                \Log::info($e);

                if (property_exists($e, 'errorInfo')) {

                    $sqlstate = $e->errorInfo[0];
                    $sqlerrcd = $e->errorInfo[1];

                    \Log::info("SQLSTATE::" . $sqlstate);
                    \Log::info("SQLERRCD::" . $sqlerrcd);

                    // DBのコネクションプーリングを使用していない場合でも
                    // 連続して接続エラーが発生するケースがある(Azureサポート)
                    // MSサイトから再接続までには最低でも5秒は間隔を空けることを推奨しているので
                    // 余裕をもって10秒後に再接続する処理
                    // https://docs.microsoft.com/ja-jp/azure/sql-database/sql-database-develop-error-messages

                    if (in_array($sqlstate, self::RETRY_SQL_STATE)) {

                        \Log::info('キュー管理テーブル参照時にDB接続エラーが発生したので、10秒後にDB再接続します。');
                        sleep(10);
                        $this->database->reconnect();
                        $this->database->beginTransaction();
                        \Log::info('キュー管理テーブルの再接続に成功しました。');

                    } else {

                        throw $e;

                    }

                } else {

                    throw $e;

                }

            }

            if ($job = $this->getNextAvailableJob($queue)) {
                return $this->marshalJob($queue, $job);
            }

            $this->database->commit();
        } catch (\Throwable $e) {
            $this->database->rollBack();

            throw $e;
        }
    }
}

Illuminate\Queue\Connectors\DatabaseConnectorを継承したクラスを作成

<?php

namespace App\Libs;

use Illuminate\Queue\Connectors\DatabaseConnector;

/**
 * キュー管理用のDBコネクタクラス
 *
 * Class CustomDatabaseConnector
 * @package App\Libs
 */
class CustomDatabaseConnector extends DatabaseConnector
{
    /**
     * returnするクラスを変更したoverride関数
     *
     * @see DatabaseConnector::connect()
     * @param  array  $config
     * @return \Illuminate\Contracts\Queue\Queue
     */
    public function connect(array $config)
    {
        return new CustomDatabaseQueue(
            $this->connections->connection($config['connection'] ?? null),
            $config['table'],
            $config['queue'],
            $config['retry_after'] ?? 60
        );
    }
}

App\Providers\AppServiceProviderの改修

<?php

namespace App\Providers;

use Illuminate\Support\ServiceProvider;
use App\Libs\CustomDatabaseConnector;

class AppServiceProvider extends ServiceProvider
{
    /**
     * Register any application services.
     *
     * @return void
     */
    public function register()
    {
        // キュー管理のDBコネクタ登録
        $queueManager = $this->app['queue'];
        $queueManager->addConnector('customdatabase', function () {
            return new CustomDatabaseConnector($this->app['db']);
        });
    }
}

api/config/queue.php

<?php

return [

    /*
    |--------------------------------------------------------------------------
    | Default Queue Driver
    |--------------------------------------------------------------------------
    |
    | The Laravel queue API supports a variety of back-ends via an unified
    | API, giving you convenient access to each back-end using the same
    | syntax for each one. Here you may set the default queue driver.
    |
    | Supported: "null", "sync", "database", "beanstalkd", "sqs", "redis"
    |
    */

    'default' => env('QUEUE_DRIVER', 'customdatabase'),

    /*
    |--------------------------------------------------------------------------
    | Queue Connections
    |--------------------------------------------------------------------------
    |
    | Here you may configure the connection information for each server that
    | is used by your application. A default configuration has been added
    | for each back-end shipped with Laravel. You are free to add more.
    |
    */

    'connections' => [

        'customdatabase' => [
            'connection' => 'hoge',
            'driver' => 'customdatabase',
            'table' => 'QueueJobs',
            'queue' => 'default',
            'retry_after' => 60,
        ]

    ],

];

artisanコマンドを変更

  • php artisan queue:work customdatabase