让我们一起爱米兰
站内搜搜:
移动设备
请扫描二维码
或访问
m.milan100.com
您所在的位置 -> 米兰百分百 -> redis -> php+redis实现任务队列

php+redis实现任务队列

点击数:1578 发表时间:2016-01-06 17:58:51 作者: 来源链接:
分享到:
分享到微信

    (1)任务队列,用于将业务逻辑中可以异步处理的操作放入队列中,在其他线程中处理后出队

    (2)队列中使用了分布式锁和其他逻辑,保证入队和出队的一致性

    (3)这个队列和普通队列不一样,入队时的id是用来区分重复入队的,队列里面只会有一条记录,同一个id后入的覆盖前入的,而不是追加, 如果需求要求重复入队当做不用的任务,请使用不同的id区分

    先看入队的代码分析:首先当然是对参数的合法性检测,接着就用到上面加锁机制的内容了,就是开始加锁,入队时我这里选择当前时间戳作为score,接着就是入队了,使用的是zset数据结构的add()方法,入队完成后,就对该任务解锁,即完成了一个入队的操作。

/**
 * 入队一个 Task
 * @param [type] $name 队列名称
 * @param [type] $id 任务id(或者其数组)
 * @param integer $timeout 入队超时时间(秒)
 * @param integer $afterInterval [description]
 * @return [type] [description]
 */
public function enqueue($name, $id, $timeout = 10, $afterInterval = 0) {
    //合法性检测
    if (empty($name) || empty($id) || $timeout <= 0)
        return false;
    //加锁
    if (!$this->_redis->lock->lock("Queue:{$name}", $timeout)) {
        Logger::get('queue')->error("enqueue faild becouse of lock failure: name = $name, id = $id");
        return false;
    }
    //入队时以当前时间戳作为 score
    $score = microtime(true) + $afterInterval;
    //入队
    foreach ((array) $id as $item) {
    //先判断下是否已经存在该id了
        if (false === $this->_redis->zset->getScore("Queue:$name", $item)) {
            $this->_redis->zset->add("Queue:$name", $score, $item);
        }
    }
    //解锁
    $this->_redis->lock->unlock("Queue:$name");
    return true;
}

    接着来看一下出队的代码分析:出队一个Task,需要指定它的$id 和 $score,如果$score与队列中的匹配则出队,否则认为该Task已被重新入队过,当前操作按失败处理。首先和对参数进行合法性检测,接着又用到加锁的功能了,然后及时出队了,先使用getScore()从Redis里获取到该id的score,然后将传入的$score和Redis里存储的score进行对比,如果两者相等就进行出队操作,也就是使用zset里的delete()方法删掉该任务id,最后当前就是解锁了。这就是出队的代码分析。

/**
 * 出队一个Task,需要指定$id 和 $score
 * 如果$score 与队列中的匹配则出队,否则认为该Task已被重新入队过,当前操作按失败处理
 *
 * @param [type] $name 队列名称
 * @param [type] $id 任务标识
 * @param [type] $score 任务对应score,从队列中获取任务时会返回一个score,只有$score和队列中的值匹配时Task才会被出队
 * @param integer $timeout 超时时间(秒)
 * @return [type] Task是否成功,返回false可能是redis操作失败,也有可能是$score与队列中的值不匹配(这表示该Task自从获取到本地之后被其他线程入队过)
 */
public function dequeue($name, $id, $score, $timeout = 10) {
    //合法性检测
    if (empty($name) || empty($id) || empty($score))
        return false;
    //加锁
    if (!$this->_redis->lock->lock("Queue:$name", $timeout)) {
        Logger:get('queue')->error("dequeue faild becouse of lock lailure:name=$name, id = $id");
        return false;
    }
    
    //出队
    //先取出redis的score
    $serverScore = $this->_redis->zset->getScore("Queue:$name", $id);
    $result = false;
    //先判断传进来的score和redis的score是否是一样
    if ($serverScore == $score) {
    //删掉该$id
        $result = (float) $this->_redis->zset->delete("Queue:$name", $id);
        if ($result == false) {
            Logger::get('queue')->error("dequeue faild because of redis delete failure: name =$name, id = $id");
        }
    }
    //解锁
    $this->_redis->lock->unlock("Queue:$name");
    return $result;
}

    学过数据结构这门课的朋友都应该知道,队列操作还有弹出顶部某个值的方法等等,这里处理入队出队操作,我还实现了 获取队列顶部若干个Task 并将其出队的方法,想了解的朋友可以看这段代码,假如看不太明白就留言,这里我不再对其进行分析了。

/**
 * 获取队列顶部若干个Task 并将其出队
 * @param [type] $name 队列名称
 * @param integer $count 数量
 * @param integer $timeout 超时时间
 * @return [type] 返回数组[0=>['id'=> , 'score'=> ], 1=>['id'=> , 'score'=> ], 2=>['id'=> , 'score'=> ]]
 */
public function pop($name, $count = 1, $timeout = 10) {
    //合法性检测
    if (empty($name) || $count <= 0)
        return [];
    //加锁
    if (!$this->_redis->lock->lock("Queue:$name")) {
        Log::get('queue')->error("pop faild because of pop failure: name = $name, count = $count");
        return false;
    }
    //取出若干的Task
    $result = [];
    $array = $this->_redis->zset->getByScore("Queue:$name", false, microtime(true), true, false, [0, $count]);
    //将其放在$result数组里 并 删除掉redis对应的id
    foreach ($array as $id => $score) {
        $result[] = ['id' => $id, 'score' => $score];
        $this->_redis->zset->delete("Queue:$name", $id);
    }
    //解锁
    $this->_redis->lock->unlock("Queue:$name");
    return $count == 1 ? (empty($result) ? false : $result[0]) : $result;
}

 以上就是用Redis实现任务队列的整一套思路和代码实现的总结和分享,这里我附上正一个实现类的代码,代码里我基本上对每一行进行了注释,方便大家快速看懂并且能模拟应用。想要深入了解的请看整个类的代码:

/**
 * 任务队列
 *
 */
class RedisQueue {

    private $_redis;

    public function __construct($param = null) {
        $this->_redis = RedisFactory::get($param);
    }

    /**
     * 入队一个 Task
     * @param [type] $name 队列名称
     * @param [type] $id 任务id(或者其数组)
     * @param integer $timeout 入队超时时间(秒)
     * @param integer $afterInterval [description]
     * @return [type] [description]
     */
    public function enqueue($name, $id, $timeout = 10, $afterInterval = 0) {
        //合法性检测
        if (empty($name) || empty($id) || $timeout <= 0)
            return false;
        
        //加锁
        if (!$this->_redis->lock->lock("Queue:{$name}", $timeout)) {
            Logger::get('queue')->error("enqueue faild becouse of lock failure: name = $name, id = $id");
            return false;
        }
        
        //入队时以当前时间戳作为 score
        $score = microtime(true) + $afterInterval;
        
        //入队
        foreach ((array) $id as $item) {
            //先判断下是否已经存在该id了
            if (false === $this->_redis->zset->getScore("Queue:$name", $item)) {
                $this->_redis->zset->add("Queue:$name", $score, $item);
            }
        }
        
        //解锁
        $this->_redis->lock->unlock("Queue:$name");
        return true;
    }

    /**
     * 出队一个Task,需要指定$id 和 $score
     * 如果$score 与队列中的匹配则出队,否则认为该Task已被重新入队过,当前操作按失败处理
     *
     * @param [type] $name 队列名称
     * @param [type] $id 任务标识
     * @param [type] $score 任务对应score,从队列中获取任务时会返回一个score,只有$score和队列中的值匹配时Task才会被出队
     * @param integer $timeout 超时时间(秒)
     * @return [type] Task是否成功,返回false可能是redis操作失败,也有可能是$score与队列中的值不匹配(这表示该Task自从获取到本地之后被其他线程入队过)
     */
    public function dequeue($name, $id, $score, $timeout = 10) {
        //合法性检测
        if (empty($name) || empty($id) || empty($score))
            return false;
        
        //加锁
        if (!$this->_redis->lock->lock("Queue:$name", $timeout)) {
            Logger:get('queue')->error("dequeue faild becouse of lock lailure:name=$name, id = $id");
            return false;
        }
        
        //出队
        //先取出redis的score
        $serverScore = $this->_redis->zset->getScore("Queue:$name", $id);
        $result = false;
        
        //先判断传进来的score和redis的score是否是一样
        if ($serverScore == $score) {
        //删掉该$id
            $result = (float) $this->_redis->zset->delete("Queue:$name", $id);
            if ($result == false) {
                Logger::get('queue')->error("dequeue faild because of redis delete failure: name =$name, id = $id");
            }
        }
        
        //解锁
        $this->_redis->lock->unlock("Queue:$name");
        return $result;
    }

    /**
     * 获取队列顶部若干个Task 并将其出队
     * @param [type] $name 队列名称
     * @param integer $count 数量
     * @param integer $timeout 超时时间
     * @return [type] 返回数组[0=>['id'=> , 'score'=> ], 1=>['id'=> , 'score'=> ], 2=>['id'=> , 'score'=> ]]
     */
    public function pop($name, $count = 1, $timeout = 10) {
        //合法性检测
        if (empty($name) || $count <= 0)
            return [];
        
        //加锁
        if (!$this->_redis->lock->lock("Queue:$name")) {
            Logger::get('queue')->error("pop faild because of pop failure: name = $name, count = $count");
            return false;
        }
        
        //取出若干的Task
        $result = [];
        $array = $this->_redis->zset->getByScore("Queue:$name", false, microtime(true), true, false, [0, $count]);
        
        //将其放在$result数组里 并 删除掉redis对应的id
        foreach ($array as $id => $score) {
            $result[] = ['id' => $id, 'score' => $score];
            $this->_redis->zset->delete("Queue:$name", $id);
        }
        
        //解锁
        $this->_redis->lock->unlock("Queue:$name");
        return $count == 1 ? (empty($result) ? false : $result[0]) : $result;
    }

    /**
     * 获取队列顶部的若干个Task
     * @param [type] $name 队列名称
     * @param integer $count 数量
     * @return [type] 返回数组[0=>['id'=> , 'score'=> ], 1=>['id'=> , 'score'=> ], 2=>['id'=> , 'score'=> ]]
     */
    public function top($name, $count = 1) {
        //合法性检测
        if (empty($name) || $count < 1)
            return [];
        
        //取错若干个Task
        $result = [];
        $array = $this->_redis->zset->getByScore("Queue:$name", false, microtime(true), true, false, [0, $count]);
        
        //将Task存放在数组里
        foreach ($array as $id => $score) {
            $result[] = ['id' => $id, 'score' => $score];
        }
        
        //返回数组
        return $count == 1 ? (empty($result) ? false : $result[0]) : $result;
    }

}


  到此,任务队列基本讲解完毕,你可以写一个shell脚本,让服务器定时运行某些程序,实现入队出队等操作,这里我就不在将其与实际应用结合起来去实现了,大家理解好这两大功能的实现思路即可。
  好了,本次总结和分享到此完毕。最后我附上任务队列完整类的代码:

/**
 * 任务队列
 */
class RedisQueue {

    private $_redis;

    public function __construct($param = null) {
        $this->_redis = RedisFactory::get($param);
    }

    /**
     * 入队一个 Task
     * @param [type] $name 队列名称
     * @param [type] $id 任务id(或者其数组)
     * @param integer $timeout 入队超时时间(秒)
     * @param integer $afterInterval [description]
     * @return [type] [description]
     */
    public function enqueue($name, $id, $timeout = 10, $afterInterval = 0) {
//合法性检测
        if (empty($name) || empty($id) || $timeout <= 0)
            return false;
//加锁
        if (!$this->_redis->lock->lock("Queue:{$name}", $timeout)) {
            Logger::get('queue')->error("enqueue faild becouse of lock failure: name = $name, id = $id");
            return false;
        }
//入队时以当前时间戳作为 score
        $score = microtime(true) + $afterInterval;
//入队
        foreach ((array) $id as $item) {
//先判断下是否已经存在该id了
            if (false === $this->_redis->zset->getScore("Queue:$name", $item)) {
                $this->_redis->zset->add("Queue:$name", $score, $item);
            }
        }
//解锁
        $this->_redis->lock->unlock("Queue:$name");
        return true;
    }

    /**
     * 出队一个Task,需要指定$id 和 $score
     * 如果$score 与队列中的匹配则出队,否则认为该Task已被重新入队过,当前操作按失败处理
     *
     * @param [type] $name 队列名称
     * @param [type] $id 任务标识
     * @param [type] $score 任务对应score,从队列中获取任务时会返回一个score,只有$score和队列中的值匹配时Task才会被出队
     * @param integer $timeout 超时时间(秒)
     * @return [type] Task是否成功,返回false可能是redis操作失败,也有可能是$score与队列中的值不匹配(这表示该Task自从获取到本地之后被其他线程入队过)
     */
    public function dequeue($name, $id, $score, $timeout = 10) {
//合法性检测
        if (empty($name) || empty($id) || empty($score))
            return false;
//加锁
        if (!$this->_redis->lock->lock("Queue:$name", $timeout)) {
            Logger:get('queue')->error("dequeue faild becouse of lock lailure:name=$name, id = $id");
            return false;
        }
//出队
//先取出redis的score
        $serverScore = $this->_redis->zset->getScore("Queue:$name", $id);
        $result = false;
//先判断传进来的score和redis的score是否是一样
        if ($serverScore == $score) {
//删掉该$id
            $result = (float) $this->_redis->zset->delete("Queue:$name", $id);
            if ($result == false) {
                Logger::get('queue')->error("dequeue faild because of redis delete failure: name =$name, id = $id");
            }
        }
//解锁
        $this->_redis->lock->unlock("Queue:$name");
        return $result;
    }

    /**
     * 获取队列顶部若干个Task 并将其出队
     * @param [type] $name 队列名称
     * @param integer $count 数量
     * @param integer $timeout 超时时间
     * @return [type] 返回数组[0=>['id'=> , 'score'=> ], 1=>['id'=> , 'score'=> ], 2=>['id'=> , 'score'=> ]]
     */
    public function pop($name, $count = 1, $timeout = 10) {
//合法性检测
        if (empty($name) || $count <= 0)
            return [];
//加锁
        if (!$this->_redis->lock->lock("Queue:$name")) {
            Logger::get('queue')->error("pop faild because of pop failure: name = $name, count = $count");
            return false;
        }
//取出若干的Task
        $result = [];
        $array = $this->_redis->zset->getByScore("Queue:$name", false, microtime(true), true, false, [0, $count]);
//将其放在$result数组里 并 删除掉redis对应的id
        foreach ($array as $id => $score) {
            $result[] = ['id' => $id, 'score' => $score];
            $this->_redis->zset->delete("Queue:$name", $id);
        }
//解锁
        $this->_redis->lock->unlock("Queue:$name");
        return $count == 1 ? (empty($result) ? false : $result[0]) : $result;
    }

    /**
     * 获取队列顶部的若干个Task
     * @param [type] $name 队列名称
     * @param integer $count 数量
     * @return [type] 返回数组[0=>['id'=> , 'score'=> ], 1=>['id'=> , 'score'=> ], 2=>['id'=> , 'score'=> ]]
     */
    public function top($name, $count = 1) {
//合法性检测
        if (empty($name) || $count < 1)
            return [];
//取错若干个Task
        $result = [];
        $array = $this->_redis->zset->getByScore("Queue:$name", false, microtime(true), true, false, [0, $count]);
//将Task存放在数组里
        foreach ($array as $id => $score) {
            $result[] = ['id' => $id, 'score' => $score];
        }
//返回数组
        return $count == 1 ? (empty($result) ? false : $result[0]) : $result;
    }

}


16
很 好
1
一 般
5
差 劲
热门新闻
相关文章
上一篇: Redis Cluster架构优化
下一篇: redis持久化保存到磁盘与灾难恢复 rdb和aof的区别
评论区
匿名

返回首页 | 收藏本页 | 回到顶部
Copyright 2010. 米兰百分百 Powered By Bridge.
京ICP备15050557号