php-fpm,以及现在比较火的workman,swoole,其都是使用的多进程的管理模式,使用manger进程,管理其子进程,今天通过纯php做了一个类似的小东西,这样以后再做守护脚本的时候,可以使用这种方式,好处就是,多进程,更能利用多核的优势,并且可以做平滑重启。
本代码实现的功能,可以自定义子进程数,比如你定3个子进程,运行会产生1个manger进程,3个work进程,通过linux信号,来实现,进程的平滑重启。
运行效果:
[zhangcunchao@localhost evente-member-client]$ php7 bin/timer/console.php queue02017-09-08 17:59:54 work proccess start2017-09-08 17:59:54 work proccess start2017-09-08 17:59:54 work proccess start2017-09-08 17:59:54 work proccess start2017-09-08 17:59:54 work proccess start
[zhangcunchao@localhost evente-member-service]$ ps -ef|grep queue541 15340 7098 0 18:30 pts/14 00:00:00 development-member-queue-manger541 15377 15340 0 18:30 pts/14 00:00:00 development-member-queue-work541 15378 15340 0 18:30 pts/14 00:00:00 development-member-queue-work541 15379 15340 0 18:30 pts/14 00:00:00 development-member-queue-work541 15380 15340 0 18:30 pts/14 00:00:00 development-member-queue-work541 15381 15340 0 18:30 pts/14 00:00:00 development-member-queue-work541 15863 6992 0 18:30 pts/9 00:00:00 grep --color=auto queue
主要代码
使用了Symfony 做cli任务
<?phpnamespace App\Console\Timer;use Symfony\Component\Console\Command\Command;use Symfony\Component\Console\Input\InputInterface;use Symfony\Component\Console\Output\OutputInterface;use Symfony\Component\Console\Input\InputArgument;use App\Console\Timer\QueueTools;/*** 多进程守护队列服务*/class QueueCmd extends Command{const proccess_num = 5;//work子进程数protected $proccess_name = 'member-queue';//进程名前缀,此设置主要为监控脚本提供方便protected $open_debug = false;//开启debug,警告直接输出,不发邮件protected $_pid;protected $shm_key; //共享内存key `protected $shm_id;protected $max_job_limit= 1000; //单个子进程最多处理的任务数,达到此任务就重启此子进程;0 没有限制protected $NOW_DIR;protected $max_try_num = 3;//队列数据最大尝试次数//超过计入日志protected $time_out = 20;//子进程任务获取单次循环最大等待时间protected $email_title = 'php-timer告警';public function __construct(){$this->NOW_DIR = dirname(__FILE__).'/';$this->proccess_name = APP_ENV.'-'.$this->proccess_name;parent::__construct();}protected function configure(){$this->setName('queue');}protected function execute(InputInterface $input, OutputInterface $output){error_reporting(E_ALL & ~E_NOTICE);//检查扩展if(!extension_loaded('pcntl')){$output->writeln("<error>需要安装pcntl扩展支持</error>");exit();}$shell = "ps w -C php|grep '{$this->proccess_name}-manger'|wc -l";$msg = system($shell);if($msg > 0){$output->writeln("<error>{$this->proccess_name}-manger 任务已经存在</error>");exit();}if(self::proccess_num > 0){//启动进程for ($i=0; $i<self::proccess_num; $i++){$this->parentProccess();}pcntl_signal(SIGUSR1, function(){$this->sig_handler(SIGUSR1);});// 程序结束(terminate)信号, 与SIGKILL不同的是该信号可以被阻塞和 处理. 通常用来要求程序自己正常退出. shell命令kill缺省产生这 个信号.pcntl_signal(SIGTERM, function(){$this->sig_handler(SIGTERM);});//终止进程 终端线路挂断pcntl_signal(SIGHUP, function(){$this->sig_handler(SIGHUP);});cli_set_process_title($this->proccess_name . '-manger');while (true) {// do something$pid = pcntl_waitpid(-1, $status, WUNTRACED | WNOHANG);if($pid>0){$s = pcntl_wifexited($status);//todo非正常退出报警$this->parentProccess($pid);}pcntl_signal_dispatch(); // 接收到信号时,调用注册的signalHandler()usleep(100000);}}else{$output->writeln("<error>proccess_num must > 0</error>");}}function sig_handler($signo){switch ($signo) {case SIGUSR1:foreach($this->_pid as $key){posix_kill($key,SIGUSR1);}echo date('Y-m-d H:i:s')." [manage] [SIGUSR1] all work reload\n";break;case SIGTERM:foreach($this->_pid as $key){posix_kill($key,SIGUSR1);}echo date('Y-m-d H:i:s')." [manage] [SIGTERM] stop\n";die();break;case SIGHUP:echo date('Y-m-d H:i:s')." [manage] [SIGHUP] linux clinet exit\n";break;}}function sig_san_handler($signo){switch ($signo) {case SIGUSR1:echo date('Y-m-d H:i:s')." [work] [SIGUSR1] work die\n";die();break;case SIGTERM:echo date('Y-m-d H:i:s')." [work] [SIGTERM] work die\n";die();break;case SIGHUP:echo date('Y-m-d H:i:s')." [work] [SIGHUP] linux clinet exit\n";break;}}//父进程public function parentProccess($id=''){if(!empty($id)){unset($this->_pid[$id]);}$pid = pcntl_fork();if($pid == -1){return false;}else{if($pid){#父进程获得子进程的pid,存入数组$this->_pid[$pid] = $pid;return true;}else{//开始发送,子进程执行完自己的任务后,退出。$this->sanProccess();exit;}}}//子进程public function sanProccess(){//信号绑定cli_set_process_title($this->proccess_name . '-work');echo date('Y-m-d H:i:s')." work proccess start\n";pcntl_signal(SIGUSR1, function(){$this->sig_san_handler(SIGUSR1);});pcntl_signal(SIGHUP, function(){$this->sig_san_handler(SIGHUP);});pcntl_signal(SIGTERM, function(){$this->sig_san_handler(SIGTERM);});$san_job_i = 0;while(1){//初始容器,避免循环影响$job_data = array();$job_key = '';$jobs = [];try{//echo '1'.PHP_EOL;//用于排查进程是否异步$jobs = QueueTools::pull_queue($this->time_out);if(!empty($jobs) && !empty($jobs['key']) && !empty($jobs['data'])){$job_data = $jobs['data'];$job_key = $jobs['key'];switch($job_key){case QueueTools::MEMBER_QUEUE_LIST['member_open'] :print_r($job_data);break;default:app()->log->addError('未定义的队列事件',$jobs);}}//print_r($this->cache->redis->get('wapwx_cookie'));}catch (\Exception $e){//统一重试,失败记录重试队列if(!empty($job_data)&&!empty($job_key)){if(isset($job_data['_job_error_num'])){$job_data['_job_error_num']=$job_data['_job_error_num']+1;}else{$job_data['_job_error_num'] = 1;}}//重试上限if($this->max_try_num <= $job_data['_job_error_num']){$mes = '本数据第'.$job_data['_job_error_num'].'次错误尝试'.PHP_EOL;$mes .= '数据已达处理次数上限,重试结束,记入日志'.PHP_EOL;unset($job_data['_job_error_num']);if($this->open_debug){print_r($mes);}else{app()->log->addError($this->email_title,[$mes,$jobs,$e->getCode(),$e->getMessage()]);}}else{//统一重入队列if(!empty($job_data)&&!empty($job_key)){QueueTools::push_queue($job_key,$job_data);}}}// sleep(3);//控制单个进程最大任务数if($this->max_job_limit){++$san_job_i;//echo PHP_EOL;if($this->max_job_limit <= $san_job_i){echo date('Y-m-d H:i:s').' work job limit reload'.PHP_EOL;exit();}}pcntl_signal_dispatch();}}}
App\Console\Timer\QueueTools 代码
<?phpnamespace App\Console\Timer;use App\Redis\Config\RedisKey;/*** 队列服务辅助工具*/class QueueTools{const MEMBER_QUEUE_LIST = ['member_open'=>'member_open', //会员激活任务];//拉取队列任务static function pull_queue($time_out=20){$jobs = app()->redis->blpop(RedisKey::MEMBER_QUEUE_KEY,$time_out);if(!empty($jobs) && !empty($jobs[1])){$data = json_decode($jobs[1],true);if(isset($data['key']) && !empty($data['key']) && isset($data['data']) && !empty($data['data'])){return $data;}else{return [];}}else{return [];}}//push队列任务static function push_queue($queueKey='',$data = []){if(empty($data) || !is_array($data)){throw new \Exception('队列任务必须数组',10002);return false;}if(in_array($queueKey,self::MEMBER_QUEUE_LIST)){$data = ['key' => $queueKey,'data' => $data,];return app()->redis->rpush(RedisKey::MEMBER_QUEUE_KEY,json_encode($data));}return false;}//会员激活任务static function open_member_job($orgId=0,$userId=0){if(empty($orgId) || empty($userId)){return false;}$data = ['orgId' => $orgId,'userId' => $userId,];return self::push_queue(self::MEMBER_QUEUE_LIST['member_open'],$data);}}