php-fpm,以及现在比较火的workman,swoole,其都是使用的多进程的管理模式,使用manger进程,管理其子进程,今天通过纯php做了一个类似的小东西,这样以后再做守护脚本的时候,可以使用这种方式,好处就是,多进程,更能利用多核的优势,并且可以做平滑重启。
本代码实现的功能,可以自定义子进程数,比如你定3个子进程,运行会产生1个manger进程,3个work进程,通过linux信号,来实现,进程的平滑重启。
运行效果:
[zhangcunchao@localhost evente-member-client]$ php7 bin/timer/console.php queue
0
2017-09-08 17:59:54 work proccess start
2017-09-08 17:59:54 work proccess start
2017-09-08 17:59:54 work proccess start
2017-09-08 17:59:54 work proccess start
2017-09-08 17:59:54 work proccess start
[zhangcunchao@localhost evente-member-service]$ ps -ef|grep queue
541 15340 7098 0 18:30 pts/14 00:00:00 development-member-queue-manger
541 15377 15340 0 18:30 pts/14 00:00:00 development-member-queue-work
541 15378 15340 0 18:30 pts/14 00:00:00 development-member-queue-work
541 15379 15340 0 18:30 pts/14 00:00:00 development-member-queue-work
541 15380 15340 0 18:30 pts/14 00:00:00 development-member-queue-work
541 15381 15340 0 18:30 pts/14 00:00:00 development-member-queue-work
541 15863 6992 0 18:30 pts/9 00:00:00 grep --color=auto queue
主要代码
使用了Symfony 做cli任务
<?php
namespace App\Console\Timer;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Console\Input\InputArgument;
use App\Console\Timer\QueueTools;
/**
* 多进程守护队列服务
*/
class QueueCmd extends Command
{
const proccess_num = 5;//work子进程数
protected $proccess_name = 'member-queue';//进程名前缀,此设置主要为监控脚本提供方便
protected $open_debug = false;//开启debug,警告直接输出,不发邮件
protected $_pid;
protected $shm_key; //共享内存key `
protected $shm_id;
protected $max_job_limit= 1000; //单个子进程最多处理的任务数,达到此任务就重启此子进程;0 没有限制
protected $NOW_DIR;
protected $max_try_num = 3;//队列数据最大尝试次数//超过计入日志
protected $time_out = 20;//子进程任务获取单次循环最大等待时间
protected $email_title = 'php-timer告警';
public function __construct()
{
$this->NOW_DIR = dirname(__FILE__).'/';
$this->proccess_name = APP_ENV.'-'.$this->proccess_name;
parent::__construct();
}
protected function configure()
{
$this->setName('queue');
}
protected function execute(InputInterface $input, OutputInterface $output)
{
error_reporting(E_ALL & ~E_NOTICE);
//检查扩展
if(!extension_loaded('pcntl')){
$output->writeln("<error>需要安装pcntl扩展支持</error>");
exit();
}
$shell = "ps w -C php|grep '{$this->proccess_name}-manger'|wc -l";
$msg = system($shell);
if($msg > 0)
{
$output->writeln("<error>{$this->proccess_name}-manger 任务已经存在</error>");
exit();
}
if(self::proccess_num > 0){
//启动进程
for ($i=0; $i<self::proccess_num; $i++)
{
$this->parentProccess();
}
pcntl_signal(SIGUSR1, function(){
$this->sig_handler(SIGUSR1);
});
// 程序结束(terminate)信号, 与SIGKILL不同的是该信号可以被阻塞和 处理. 通常用来要求程序自己正常退出. shell命令kill缺省产生这 个信号.
pcntl_signal(SIGTERM, function(){
$this->sig_handler(SIGTERM);
});
//终止进程 终端线路挂断
pcntl_signal(SIGHUP, function(){
$this->sig_handler(SIGHUP);
});
cli_set_process_title($this->proccess_name . '-manger');
while (true) {
// do something
$pid = pcntl_waitpid(-1, $status, WUNTRACED | WNOHANG);
if($pid>0){
$s = pcntl_wifexited($status);
//todo非正常退出报警
$this->parentProccess($pid);
}
pcntl_signal_dispatch(); // 接收到信号时,调用注册的signalHandler()
usleep(100000);
}
}else{
$output->writeln("<error>proccess_num must > 0</error>");
}
}
function sig_handler($signo){
switch ($signo) {
case SIGUSR1:
foreach($this->_pid as $key){
posix_kill($key,SIGUSR1);
}
echo date('Y-m-d H:i:s')." [manage] [SIGUSR1] all work reload\n";
break;
case SIGTERM:
foreach($this->_pid as $key){
posix_kill($key,SIGUSR1);
}
echo date('Y-m-d H:i:s')." [manage] [SIGTERM] stop\n";
die();
break;
case SIGHUP:
echo date('Y-m-d H:i:s')." [manage] [SIGHUP] linux clinet exit\n";
break;
}
}
function sig_san_handler($signo){
switch ($signo) {
case SIGUSR1:
echo date('Y-m-d H:i:s')." [work] [SIGUSR1] work die\n";
die();
break;
case SIGTERM:
echo date('Y-m-d H:i:s')." [work] [SIGTERM] work die\n";
die();
break;
case SIGHUP:
echo date('Y-m-d H:i:s')." [work] [SIGHUP] linux clinet exit\n";
break;
}
}
//父进程
public function parentProccess($id=''){
if(!empty($id)){
unset($this->_pid[$id]);
}
$pid = pcntl_fork();
if($pid == -1)
{
return false;
}
else
{
if($pid)
{
#父进程获得子进程的pid,存入数组
$this->_pid[$pid] = $pid;
return true;
}
else
{
//开始发送,子进程执行完自己的任务后,退出。
$this->sanProccess();
exit;
}
}
}
//子进程
public function sanProccess(){
//信号绑定
cli_set_process_title($this->proccess_name . '-work');
echo date('Y-m-d H:i:s')." work proccess start\n";
pcntl_signal(SIGUSR1, function(){
$this->sig_san_handler(SIGUSR1);
});
pcntl_signal(SIGHUP, function(){
$this->sig_san_handler(SIGHUP);
});
pcntl_signal(SIGTERM, function(){
$this->sig_san_handler(SIGTERM);
});
$san_job_i = 0;
while(1){
//初始容器,避免循环影响
$job_data = array();
$job_key = '';
$jobs = [];
try{
//echo '1'.PHP_EOL;//用于排查进程是否异步
$jobs = QueueTools::pull_queue($this->time_out);
if(!empty($jobs) && !empty($jobs['key']) && !empty($jobs['data'])){
$job_data = $jobs['data'];
$job_key = $jobs['key'];
switch($job_key){
case QueueTools::MEMBER_QUEUE_LIST['member_open'] :
print_r($job_data);
break;
default:
app()->log->addError('未定义的队列事件',$jobs);
}
}
//print_r($this->cache->redis->get('wapwx_cookie'));
}catch (\Exception $e){
//统一重试,失败记录重试队列
if(!empty($job_data)&&!empty($job_key)){
if(isset($job_data['_job_error_num'])){
$job_data['_job_error_num']=$job_data['_job_error_num']+1;
}else{
$job_data['_job_error_num'] = 1;
}
}
//重试上限
if($this->max_try_num <= $job_data['_job_error_num']){
$mes = '本数据第'.$job_data['_job_error_num'].'次错误尝试'.PHP_EOL;
$mes .= '数据已达处理次数上限,重试结束,记入日志'.PHP_EOL;
unset($job_data['_job_error_num']);
if($this->open_debug){
print_r($mes);
}else{
app()->log->addError($this->email_title,[$mes,$jobs,$e->getCode(),$e->getMessage()]);
}
}else{
//统一重入队列
if(!empty($job_data)&&!empty($job_key)){
QueueTools::push_queue($job_key,$job_data);
}
}
}
// sleep(3);
//控制单个进程最大任务数
if($this->max_job_limit){
++$san_job_i;
//echo PHP_EOL;
if($this->max_job_limit <= $san_job_i){
echo date('Y-m-d H:i:s').' work job limit reload'.PHP_EOL;
exit();
}
}
pcntl_signal_dispatch();
}
}
}
App\Console\Timer\QueueTools 代码
<?php
namespace App\Console\Timer;
use App\Redis\Config\RedisKey;
/**
* 队列服务辅助工具
*/
class QueueTools
{
const MEMBER_QUEUE_LIST = [
'member_open'=>'member_open', //会员激活任务
];
//拉取队列任务
static function pull_queue($time_out=20)
{
$jobs = app()->redis->blpop(RedisKey::MEMBER_QUEUE_KEY,$time_out);
if(!empty($jobs) && !empty($jobs[1])){
$data = json_decode($jobs[1],true);
if(isset($data['key']) && !empty($data['key']) && isset($data['data']) && !empty($data['data'])){
return $data;
}else{
return [];
}
}else{
return [];
}
}
//push队列任务
static function push_queue($queueKey='',$data = [])
{
if(empty($data) || !is_array($data)){
throw new \Exception('队列任务必须数组',10002);
return false;
}
if(in_array($queueKey,self::MEMBER_QUEUE_LIST)){
$data = [
'key' => $queueKey,
'data' => $data,
];
return app()->redis->rpush(RedisKey::MEMBER_QUEUE_KEY,json_encode($data));
}
return false;
}
//会员激活任务
static function open_member_job($orgId=0,$userId=0)
{
if(empty($orgId) || empty($userId)){
return false;
}
$data = [
'orgId' => $orgId,
'userId' => $userId,
];
return self::push_queue(self::MEMBER_QUEUE_LIST['member_open'],$data);
}
}