php-fpm,以及现在比较火的workman,swoole,其都是使用的多进程的管理模式,使用manger进程,管理其子进程,今天通过纯php做了一个类似的小东西,这样以后再做守护脚本的时候,可以使用这种方式,好处就是,多进程,更能利用多核的优势,并且可以做平滑重启。

本代码实现的功能,可以自定义子进程数,比如你定3个子进程,运行会产生1个manger进程,3个work进程,通过linux信号,来实现,进程的平滑重启。

运行效果:

  1. [zhangcunchao@localhost evente-member-client]$ php7 bin/timer/console.php queue
  2. 0
  3. 2017-09-08 17:59:54 work proccess start
  4. 2017-09-08 17:59:54 work proccess start
  5. 2017-09-08 17:59:54 work proccess start
  6. 2017-09-08 17:59:54 work proccess start
  7. 2017-09-08 17:59:54 work proccess start
  1. [zhangcunchao@localhost evente-member-service]$ ps -ef|grep queue
  2. 541 15340 7098 0 18:30 pts/14 00:00:00 development-member-queue-manger
  3. 541 15377 15340 0 18:30 pts/14 00:00:00 development-member-queue-work
  4. 541 15378 15340 0 18:30 pts/14 00:00:00 development-member-queue-work
  5. 541 15379 15340 0 18:30 pts/14 00:00:00 development-member-queue-work
  6. 541 15380 15340 0 18:30 pts/14 00:00:00 development-member-queue-work
  7. 541 15381 15340 0 18:30 pts/14 00:00:00 development-member-queue-work
  8. 541 15863 6992 0 18:30 pts/9 00:00:00 grep --color=auto queue

主要代码

使用了Symfony 做cli任务

  1. <?php
  2. namespace App\Console\Timer;
  3. use Symfony\Component\Console\Command\Command;
  4. use Symfony\Component\Console\Input\InputInterface;
  5. use Symfony\Component\Console\Output\OutputInterface;
  6. use Symfony\Component\Console\Input\InputArgument;
  7. use App\Console\Timer\QueueTools;
  8. /**
  9. * 多进程守护队列服务
  10. */
  11. class QueueCmd extends Command
  12. {
  13. const proccess_num = 5;//work子进程数
  14. protected $proccess_name = 'member-queue';//进程名前缀,此设置主要为监控脚本提供方便
  15. protected $open_debug = false;//开启debug,警告直接输出,不发邮件
  16. protected $_pid;
  17. protected $shm_key; //共享内存key `
  18. protected $shm_id;
  19. protected $max_job_limit= 1000; //单个子进程最多处理的任务数,达到此任务就重启此子进程;0 没有限制
  20. protected $NOW_DIR;
  21. protected $max_try_num = 3;//队列数据最大尝试次数//超过计入日志
  22. protected $time_out = 20;//子进程任务获取单次循环最大等待时间
  23. protected $email_title = 'php-timer告警';
  24. public function __construct()
  25. {
  26. $this->NOW_DIR = dirname(__FILE__).'/';
  27. $this->proccess_name = APP_ENV.'-'.$this->proccess_name;
  28. parent::__construct();
  29. }
  30. protected function configure()
  31. {
  32. $this->setName('queue');
  33. }
  34. protected function execute(InputInterface $input, OutputInterface $output)
  35. {
  36. error_reporting(E_ALL & ~E_NOTICE);
  37. //检查扩展
  38. if(!extension_loaded('pcntl')){
  39. $output->writeln("<error>需要安装pcntl扩展支持</error>");
  40. exit();
  41. }
  42. $shell = "ps w -C php|grep '{$this->proccess_name}-manger'|wc -l";
  43. $msg = system($shell);
  44. if($msg > 0)
  45. {
  46. $output->writeln("<error>{$this->proccess_name}-manger 任务已经存在</error>");
  47. exit();
  48. }
  49. if(self::proccess_num > 0){
  50. //启动进程
  51. for ($i=0; $i<self::proccess_num; $i++)
  52. {
  53. $this->parentProccess();
  54. }
  55. pcntl_signal(SIGUSR1, function(){
  56. $this->sig_handler(SIGUSR1);
  57. });
  58. // 程序结束(terminate)信号, 与SIGKILL不同的是该信号可以被阻塞和 处理. 通常用来要求程序自己正常退出. shell命令kill缺省产生这 个信号.
  59. pcntl_signal(SIGTERM, function(){
  60. $this->sig_handler(SIGTERM);
  61. });
  62. //终止进程 终端线路挂断
  63. pcntl_signal(SIGHUP, function(){
  64. $this->sig_handler(SIGHUP);
  65. });
  66. cli_set_process_title($this->proccess_name . '-manger');
  67. while (true) {
  68. // do something
  69. $pid = pcntl_waitpid(-1, $status, WUNTRACED | WNOHANG);
  70. if($pid>0){
  71. $s = pcntl_wifexited($status);
  72. //todo非正常退出报警
  73. $this->parentProccess($pid);
  74. }
  75. pcntl_signal_dispatch(); // 接收到信号时,调用注册的signalHandler()
  76. usleep(100000);
  77. }
  78. }else{
  79. $output->writeln("<error>proccess_num must > 0</error>");
  80. }
  81. }
  82. function sig_handler($signo){
  83. switch ($signo) {
  84. case SIGUSR1:
  85. foreach($this->_pid as $key){
  86. posix_kill($key,SIGUSR1);
  87. }
  88. echo date('Y-m-d H:i:s')." [manage] [SIGUSR1] all work reload\n";
  89. break;
  90. case SIGTERM:
  91. foreach($this->_pid as $key){
  92. posix_kill($key,SIGUSR1);
  93. }
  94. echo date('Y-m-d H:i:s')." [manage] [SIGTERM] stop\n";
  95. die();
  96. break;
  97. case SIGHUP:
  98. echo date('Y-m-d H:i:s')." [manage] [SIGHUP] linux clinet exit\n";
  99. break;
  100. }
  101. }
  102. function sig_san_handler($signo){
  103. switch ($signo) {
  104. case SIGUSR1:
  105. echo date('Y-m-d H:i:s')." [work] [SIGUSR1] work die\n";
  106. die();
  107. break;
  108. case SIGTERM:
  109. echo date('Y-m-d H:i:s')." [work] [SIGTERM] work die\n";
  110. die();
  111. break;
  112. case SIGHUP:
  113. echo date('Y-m-d H:i:s')." [work] [SIGHUP] linux clinet exit\n";
  114. break;
  115. }
  116. }
  117. //父进程
  118. public function parentProccess($id=''){
  119. if(!empty($id)){
  120. unset($this->_pid[$id]);
  121. }
  122. $pid = pcntl_fork();
  123. if($pid == -1)
  124. {
  125. return false;
  126. }
  127. else
  128. {
  129. if($pid)
  130. {
  131. #父进程获得子进程的pid,存入数组
  132. $this->_pid[$pid] = $pid;
  133. return true;
  134. }
  135. else
  136. {
  137. //开始发送,子进程执行完自己的任务后,退出。
  138. $this->sanProccess();
  139. exit;
  140. }
  141. }
  142. }
  143. //子进程
  144. public function sanProccess(){
  145. //信号绑定
  146. cli_set_process_title($this->proccess_name . '-work');
  147. echo date('Y-m-d H:i:s')." work proccess start\n";
  148. pcntl_signal(SIGUSR1, function(){
  149. $this->sig_san_handler(SIGUSR1);
  150. });
  151. pcntl_signal(SIGHUP, function(){
  152. $this->sig_san_handler(SIGHUP);
  153. });
  154. pcntl_signal(SIGTERM, function(){
  155. $this->sig_san_handler(SIGTERM);
  156. });
  157. $san_job_i = 0;
  158. while(1){
  159. //初始容器,避免循环影响
  160. $job_data = array();
  161. $job_key = '';
  162. $jobs = [];
  163. try{
  164. //echo '1'.PHP_EOL;//用于排查进程是否异步
  165. $jobs = QueueTools::pull_queue($this->time_out);
  166. if(!empty($jobs) && !empty($jobs['key']) && !empty($jobs['data'])){
  167. $job_data = $jobs['data'];
  168. $job_key = $jobs['key'];
  169. switch($job_key){
  170. case QueueTools::MEMBER_QUEUE_LIST['member_open'] :
  171. print_r($job_data);
  172. break;
  173. default:
  174. app()->log->addError('未定义的队列事件',$jobs);
  175. }
  176. }
  177. //print_r($this->cache->redis->get('wapwx_cookie'));
  178. }catch (\Exception $e){
  179. //统一重试,失败记录重试队列
  180. if(!empty($job_data)&&!empty($job_key)){
  181. if(isset($job_data['_job_error_num'])){
  182. $job_data['_job_error_num']=$job_data['_job_error_num']+1;
  183. }else{
  184. $job_data['_job_error_num'] = 1;
  185. }
  186. }
  187. //重试上限
  188. if($this->max_try_num <= $job_data['_job_error_num']){
  189. $mes = '本数据第'.$job_data['_job_error_num'].'次错误尝试'.PHP_EOL;
  190. $mes .= '数据已达处理次数上限,重试结束,记入日志'.PHP_EOL;
  191. unset($job_data['_job_error_num']);
  192. if($this->open_debug){
  193. print_r($mes);
  194. }else{
  195. app()->log->addError($this->email_title,[$mes,$jobs,$e->getCode(),$e->getMessage()]);
  196. }
  197. }else{
  198. //统一重入队列
  199. if(!empty($job_data)&&!empty($job_key)){
  200. QueueTools::push_queue($job_key,$job_data);
  201. }
  202. }
  203. }
  204. // sleep(3);
  205. //控制单个进程最大任务数
  206. if($this->max_job_limit){
  207. ++$san_job_i;
  208. //echo PHP_EOL;
  209. if($this->max_job_limit <= $san_job_i){
  210. echo date('Y-m-d H:i:s').' work job limit reload'.PHP_EOL;
  211. exit();
  212. }
  213. }
  214. pcntl_signal_dispatch();
  215. }
  216. }
  217. }

App\Console\Timer\QueueTools 代码

  1. <?php
  2. namespace App\Console\Timer;
  3. use App\Redis\Config\RedisKey;
  4. /**
  5. * 队列服务辅助工具
  6. */
  7. class QueueTools
  8. {
  9. const MEMBER_QUEUE_LIST = [
  10. 'member_open'=>'member_open', //会员激活任务
  11. ];
  12. //拉取队列任务
  13. static function pull_queue($time_out=20)
  14. {
  15. $jobs = app()->redis->blpop(RedisKey::MEMBER_QUEUE_KEY,$time_out);
  16. if(!empty($jobs) && !empty($jobs[1])){
  17. $data = json_decode($jobs[1],true);
  18. if(isset($data['key']) && !empty($data['key']) && isset($data['data']) && !empty($data['data'])){
  19. return $data;
  20. }else{
  21. return [];
  22. }
  23. }else{
  24. return [];
  25. }
  26. }
  27. //push队列任务
  28. static function push_queue($queueKey='',$data = [])
  29. {
  30. if(empty($data) || !is_array($data)){
  31. throw new \Exception('队列任务必须数组',10002);
  32. return false;
  33. }
  34. if(in_array($queueKey,self::MEMBER_QUEUE_LIST)){
  35. $data = [
  36. 'key' => $queueKey,
  37. 'data' => $data,
  38. ];
  39. return app()->redis->rpush(RedisKey::MEMBER_QUEUE_KEY,json_encode($data));
  40. }
  41. return false;
  42. }
  43. //会员激活任务
  44. static function open_member_job($orgId=0,$userId=0)
  45. {
  46. if(empty($orgId) || empty($userId)){
  47. return false;
  48. }
  49. $data = [
  50. 'orgId' => $orgId,
  51. 'userId' => $userId,
  52. ];
  53. return self::push_queue(self::MEMBER_QUEUE_LIST['member_open'],$data);
  54. }
  55. }