(1)任务队列,用于将业务逻辑中可以异步处理的操作放入队列中,在其他线程中处理后出队
(2)队列中使用了分布式锁和其他逻辑,保证入队和出队的一致性
(3)这个队列和普通队列不一样,入队时的id是用来区分重复入队的,队列里面只会有一条记录,同一个id后入的覆盖前入的,而不是追加, 如果需求要求重复入队当做不用的任务,请使用不同的id区分
先看入队的代码分析:首先当然是对参数的合法性检测,接着就用到上面加锁机制的内容了,就是开始加锁,入队时我这里选择当前时间戳作为score,接着就是入队了,使用的是zset数据结构的add()方法,入队完成后,就对该任务解锁,即完成了一个入队的操作。
/**
* 入队一个 Task
* @param [type] $name 队列名称
* @param [type] $id 任务id(或者其数组)
* @param integer $timeout 入队超时时间(秒)
* @param integer $afterInterval [description]
* @return [type] [description]
*/
public function enqueue($name, $id, $timeout = 10, $afterInterval = 0) {
//合法性检测
if (empty($name) || empty($id) || $timeout <= 0)
return false;
//加锁
if (!$this->_redis->lock->lock("Queue:{$name}", $timeout)) {
Logger::get('queue')->error("enqueue faild becouse of lock failure: name = $name, id = $id");
return false;
}
//入队时以当前时间戳作为 score
$score = microtime(true) + $afterInterval;
//入队
foreach ((array) $id as $item) {
//先判断下是否已经存在该id了
if (false === $this->_redis->zset->getScore("Queue:$name", $item)) {
$this->_redis->zset->add("Queue:$name", $score, $item);
}
}
//解锁
$this->_redis->lock->unlock("Queue:$name");
return true;
}接着来看一下出队的代码分析:出队一个Task,需要指定它的$id 和 $score,如果$score与队列中的匹配则出队,否则认为该Task已被重新入队过,当前操作按失败处理。首先和对参数进行合法性检测,接着又用到加锁的功能了,然后及时出队了,先使用getScore()从Redis里获取到该id的score,然后将传入的$score和Redis里存储的score进行对比,如果两者相等就进行出队操作,也就是使用zset里的delete()方法删掉该任务id,最后当前就是解锁了。这就是出队的代码分析。
/**
* 出队一个Task,需要指定$id 和 $score
* 如果$score 与队列中的匹配则出队,否则认为该Task已被重新入队过,当前操作按失败处理
*
* @param [type] $name 队列名称
* @param [type] $id 任务标识
* @param [type] $score 任务对应score,从队列中获取任务时会返回一个score,只有$score和队列中的值匹配时Task才会被出队
* @param integer $timeout 超时时间(秒)
* @return [type] Task是否成功,返回false可能是redis操作失败,也有可能是$score与队列中的值不匹配(这表示该Task自从获取到本地之后被其他线程入队过)
*/
public function dequeue($name, $id, $score, $timeout = 10) {
//合法性检测
if (empty($name) || empty($id) || empty($score))
return false;
//加锁
if (!$this->_redis->lock->lock("Queue:$name", $timeout)) {
Logger:get('queue')->error("dequeue faild becouse of lock lailure:name=$name, id = $id");
return false;
}
//出队
//先取出redis的score
$serverScore = $this->_redis->zset->getScore("Queue:$name", $id);
$result = false;
//先判断传进来的score和redis的score是否是一样
if ($serverScore == $score) {
//删掉该$id
$result = (float) $this->_redis->zset->delete("Queue:$name", $id);
if ($result == false) {
Logger::get('queue')->error("dequeue faild because of redis delete failure: name =$name, id = $id");
}
}
//解锁
$this->_redis->lock->unlock("Queue:$name");
return $result;
}学过数据结构这门课的朋友都应该知道,队列操作还有弹出顶部某个值的方法等等,这里处理入队出队操作,我还实现了 获取队列顶部若干个Task 并将其出队的方法,想了解的朋友可以看这段代码,假如看不太明白就留言,这里我不再对其进行分析了。
/**
* 获取队列顶部若干个Task 并将其出队
* @param [type] $name 队列名称
* @param integer $count 数量
* @param integer $timeout 超时时间
* @return [type] 返回数组[0=>['id'=> , 'score'=> ], 1=>['id'=> , 'score'=> ], 2=>['id'=> , 'score'=> ]]
*/
public function pop($name, $count = 1, $timeout = 10) {
//合法性检测
if (empty($name) || $count <= 0)
return [];
//加锁
if (!$this->_redis->lock->lock("Queue:$name")) {
Log::get('queue')->error("pop faild because of pop failure: name = $name, count = $count");
return false;
}
//取出若干的Task
$result = [];
$array = $this->_redis->zset->getByScore("Queue:$name", false, microtime(true), true, false, [0, $count]);
//将其放在$result数组里 并 删除掉redis对应的id
foreach ($array as $id => $score) {
$result[] = ['id' => $id, 'score' => $score];
$this->_redis->zset->delete("Queue:$name", $id);
}
//解锁
$this->_redis->lock->unlock("Queue:$name");
return $count == 1 ? (empty($result) ? false : $result[0]) : $result;
}以上就是用Redis实现任务队列的整一套思路和代码实现的总结和分享,这里我附上正一个实现类的代码,代码里我基本上对每一行进行了注释,方便大家快速看懂并且能模拟应用。想要深入了解的请看整个类的代码:
/**
* 任务队列
*
*/
class RedisQueue {
private $_redis;
public function __construct($param = null) {
$this->_redis = RedisFactory::get($param);
}
/**
* 入队一个 Task
* @param [type] $name 队列名称
* @param [type] $id 任务id(或者其数组)
* @param integer $timeout 入队超时时间(秒)
* @param integer $afterInterval [description]
* @return [type] [description]
*/
public function enqueue($name, $id, $timeout = 10, $afterInterval = 0) {
//合法性检测
if (empty($name) || empty($id) || $timeout <= 0)
return false;
//加锁
if (!$this->_redis->lock->lock("Queue:{$name}", $timeout)) {
Logger::get('queue')->error("enqueue faild becouse of lock failure: name = $name, id = $id");
return false;
}
//入队时以当前时间戳作为 score
$score = microtime(true) + $afterInterval;
//入队
foreach ((array) $id as $item) {
//先判断下是否已经存在该id了
if (false === $this->_redis->zset->getScore("Queue:$name", $item)) {
$this->_redis->zset->add("Queue:$name", $score, $item);
}
}
//解锁
$this->_redis->lock->unlock("Queue:$name");
return true;
}
/**
* 出队一个Task,需要指定$id 和 $score
* 如果$score 与队列中的匹配则出队,否则认为该Task已被重新入队过,当前操作按失败处理
*
* @param [type] $name 队列名称
* @param [type] $id 任务标识
* @param [type] $score 任务对应score,从队列中获取任务时会返回一个score,只有$score和队列中的值匹配时Task才会被出队
* @param integer $timeout 超时时间(秒)
* @return [type] Task是否成功,返回false可能是redis操作失败,也有可能是$score与队列中的值不匹配(这表示该Task自从获取到本地之后被其他线程入队过)
*/
public function dequeue($name, $id, $score, $timeout = 10) {
//合法性检测
if (empty($name) || empty($id) || empty($score))
return false;
//加锁
if (!$this->_redis->lock->lock("Queue:$name", $timeout)) {
Logger:get('queue')->error("dequeue faild becouse of lock lailure:name=$name, id = $id");
return false;
}
//出队
//先取出redis的score
$serverScore = $this->_redis->zset->getScore("Queue:$name", $id);
$result = false;
//先判断传进来的score和redis的score是否是一样
if ($serverScore == $score) {
//删掉该$id
$result = (float) $this->_redis->zset->delete("Queue:$name", $id);
if ($result == false) {
Logger::get('queue')->error("dequeue faild because of redis delete failure: name =$name, id = $id");
}
}
//解锁
$this->_redis->lock->unlock("Queue:$name");
return $result;
}
/**
* 获取队列顶部若干个Task 并将其出队
* @param [type] $name 队列名称
* @param integer $count 数量
* @param integer $timeout 超时时间
* @return [type] 返回数组[0=>['id'=> , 'score'=> ], 1=>['id'=> , 'score'=> ], 2=>['id'=> , 'score'=> ]]
*/
public function pop($name, $count = 1, $timeout = 10) {
//合法性检测
if (empty($name) || $count <= 0)
return [];
//加锁
if (!$this->_redis->lock->lock("Queue:$name")) {
Logger::get('queue')->error("pop faild because of pop failure: name = $name, count = $count");
return false;
}
//取出若干的Task
$result = [];
$array = $this->_redis->zset->getByScore("Queue:$name", false, microtime(true), true, false, [0, $count]);
//将其放在$result数组里 并 删除掉redis对应的id
foreach ($array as $id => $score) {
$result[] = ['id' => $id, 'score' => $score];
$this->_redis->zset->delete("Queue:$name", $id);
}
//解锁
$this->_redis->lock->unlock("Queue:$name");
return $count == 1 ? (empty($result) ? false : $result[0]) : $result;
}
/**
* 获取队列顶部的若干个Task
* @param [type] $name 队列名称
* @param integer $count 数量
* @return [type] 返回数组[0=>['id'=> , 'score'=> ], 1=>['id'=> , 'score'=> ], 2=>['id'=> , 'score'=> ]]
*/
public function top($name, $count = 1) {
//合法性检测
if (empty($name) || $count < 1)
return [];
//取错若干个Task
$result = [];
$array = $this->_redis->zset->getByScore("Queue:$name", false, microtime(true), true, false, [0, $count]);
//将Task存放在数组里
foreach ($array as $id => $score) {
$result[] = ['id' => $id, 'score' => $score];
}
//返回数组
return $count == 1 ? (empty($result) ? false : $result[0]) : $result;
}
} 到此,任务队列基本讲解完毕,你可以写一个shell脚本,让服务器定时运行某些程序,实现入队出队等操作,这里我就不在将其与实际应用结合起来去实现了,大家理解好这两大功能的实现思路即可。
好了,本次总结和分享到此完毕。最后我附上任务队列完整类的代码:
/**
* 任务队列
*/
class RedisQueue {
private $_redis;
public function __construct($param = null) {
$this->_redis = RedisFactory::get($param);
}
/**
* 入队一个 Task
* @param [type] $name 队列名称
* @param [type] $id 任务id(或者其数组)
* @param integer $timeout 入队超时时间(秒)
* @param integer $afterInterval [description]
* @return [type] [description]
*/
public function enqueue($name, $id, $timeout = 10, $afterInterval = 0) {
//合法性检测
if (empty($name) || empty($id) || $timeout <= 0)
return false;
//加锁
if (!$this->_redis->lock->lock("Queue:{$name}", $timeout)) {
Logger::get('queue')->error("enqueue faild becouse of lock failure: name = $name, id = $id");
return false;
}
//入队时以当前时间戳作为 score
$score = microtime(true) + $afterInterval;
//入队
foreach ((array) $id as $item) {
//先判断下是否已经存在该id了
if (false === $this->_redis->zset->getScore("Queue:$name", $item)) {
$this->_redis->zset->add("Queue:$name", $score, $item);
}
}
//解锁
$this->_redis->lock->unlock("Queue:$name");
return true;
}
/**
* 出队一个Task,需要指定$id 和 $score
* 如果$score 与队列中的匹配则出队,否则认为该Task已被重新入队过,当前操作按失败处理
*
* @param [type] $name 队列名称
* @param [type] $id 任务标识
* @param [type] $score 任务对应score,从队列中获取任务时会返回一个score,只有$score和队列中的值匹配时Task才会被出队
* @param integer $timeout 超时时间(秒)
* @return [type] Task是否成功,返回false可能是redis操作失败,也有可能是$score与队列中的值不匹配(这表示该Task自从获取到本地之后被其他线程入队过)
*/
public function dequeue($name, $id, $score, $timeout = 10) {
//合法性检测
if (empty($name) || empty($id) || empty($score))
return false;
//加锁
if (!$this->_redis->lock->lock("Queue:$name", $timeout)) {
Logger:get('queue')->error("dequeue faild becouse of lock lailure:name=$name, id = $id");
return false;
}
//出队
//先取出redis的score
$serverScore = $this->_redis->zset->getScore("Queue:$name", $id);
$result = false;
//先判断传进来的score和redis的score是否是一样
if ($serverScore == $score) {
//删掉该$id
$result = (float) $this->_redis->zset->delete("Queue:$name", $id);
if ($result == false) {
Logger::get('queue')->error("dequeue faild because of redis delete failure: name =$name, id = $id");
}
}
//解锁
$this->_redis->lock->unlock("Queue:$name");
return $result;
}
/**
* 获取队列顶部若干个Task 并将其出队
* @param [type] $name 队列名称
* @param integer $count 数量
* @param integer $timeout 超时时间
* @return [type] 返回数组[0=>['id'=> , 'score'=> ], 1=>['id'=> , 'score'=> ], 2=>['id'=> , 'score'=> ]]
*/
public function pop($name, $count = 1, $timeout = 10) {
//合法性检测
if (empty($name) || $count <= 0)
return [];
//加锁
if (!$this->_redis->lock->lock("Queue:$name")) {
Logger::get('queue')->error("pop faild because of pop failure: name = $name, count = $count");
return false;
}
//取出若干的Task
$result = [];
$array = $this->_redis->zset->getByScore("Queue:$name", false, microtime(true), true, false, [0, $count]);
//将其放在$result数组里 并 删除掉redis对应的id
foreach ($array as $id => $score) {
$result[] = ['id' => $id, 'score' => $score];
$this->_redis->zset->delete("Queue:$name", $id);
}
//解锁
$this->_redis->lock->unlock("Queue:$name");
return $count == 1 ? (empty($result) ? false : $result[0]) : $result;
}
/**
* 获取队列顶部的若干个Task
* @param [type] $name 队列名称
* @param integer $count 数量
* @return [type] 返回数组[0=>['id'=> , 'score'=> ], 1=>['id'=> , 'score'=> ], 2=>['id'=> , 'score'=> ]]
*/
public function top($name, $count = 1) {
//合法性检测
if (empty($name) || $count < 1)
return [];
//取错若干个Task
$result = [];
$array = $this->_redis->zset->getByScore("Queue:$name", false, microtime(true), true, false, [0, $count]);
//将Task存放在数组里
foreach ($array as $id => $score) {
$result[] = ['id' => $id, 'score' => $score];
}
//返回数组
return $count == 1 ? (empty($result) ? false : $result[0]) : $result;
}
}
