Thinkphp的消息队列是基于queue的,先检查是否安装了queue
如果没有安装则使用下方代码进行安装 根据实际情况决定是否使用指定版本号

composer require topthink/think-queue:v1.1.6

配置队列链接信息queue.php文件

fastadmin 文件位置application/extra/queue.php
原生think的基本都位于config目录下

配置内容如下

return [
    //'connector' => 'Sync'
    'connector' => 'redis',         // 队列驱动使用 redis 推荐, 可选 database
    'host' => '127.0.0.1',          // redis 主机地址
    'password' => 'xiaotao',             // redis 密码
    'port' => 6379,                     // redis 端口
    'select' => 2,                   // redis db 库, 建议显示指定 1-15 的数字均可,如果缓存驱动是 redis,避免和缓存驱动 select 冲突
    'timeout' => 0,                     // redis 超时时间
    'persistent' => false,              // redis 持续性,连接复用
];

生产端 任意位置 执行下方代码调用

use think\Queue;
use app\common\job\Test;//改为消息队列任务的实际位置
 /**
     * 给用户发送消息
     * 生产端,将消息加入队列
     */
    public function sendMsg()
    {
        //消息内容
        $msgData = [
            'user_id' => 1,
            'time' => date('Y-m-d H:i:s'),
            'msg' => 'welcome to badianboke.com'
        ];
        //队列名称
        $queueName = 'badianbokeSendMsg';
        //加入队列
        Queue::push(Test::class, $msgData, $queueName);

    }

     public function register()
    {
        //1、用户注册成功
        //User::register(); //模拟用户注册成功

        //2、给用户发送消息
        $this->sendMsg();

        $this->success('注册成功');
    }

消息队列任务代码如下

<?php

namespace app\common\job;
use think\Db;
use think\Exception;
use think\Log;
use think\queue\Job;

class Test
{
    public function fire(Job $job, $data){
        try {
            //TODO
            Log::info('开始发送消息:' . json_encode($data));
            //1、给用户发送消息
            $flag = $this->insertMsg($data);

            if ($flag){
                //2、发送完成后 删除job
                $job->delete();
            }else{
                //任务轮询4次后删除
                if ($job->attempts() > 3) {
                    // 第1种处理方式:重新发布任务,该任务延迟10秒后再执行
                    //$job->release(10);
                    // 第2种处理方式:原任务的基础上1分钟执行一次并增加尝试次数
                    //$job->failed();
                    // 第3种处理方式:删除任务
                    $job->delete();
                }
            }



        } catch (Exception $e) {
            // 队列执行失败
            Log::error('发送消息队列执行失败:' . json_encode($data));
        }
    }

    // 消息队列执行失败后会自动执行该方法
    public function failed($data)
    {
        Log::error('消息队列达到最大重复执行次数后失败:' . json_encode($data));
    }

    public function insertMsg($data){
       $result = Db::name('msg')->insert([
            'user_id' => $data['user_id'],
            'msg' => $data['msg'],
            'time' => $data['time'],
        ]);
        return $result == 1;
    }
}

这样就定义完了消息队列
消息队列开启方式如下

php think queue:listen //监听 开发环境用
php think queue:work //只执行一次
nohup php think queue:work --daemon //守护进程,多次执行

指定消息队列任务 badianbokeSendMsg 实际队列任务名称

php think queue:listen --queue badianbokeSendMsg//监听 开发环境用
php think queue:work  --queue badianbokeSendMsg//只执行一次
nohup php think queue:work  --queue badianbokeSendMsg --daemon //守护进程,多次执行
Last modification:May 13, 2024
反正也没人会打赏