魏长东

weichangdong

PHP Swoole: a first try

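I have always found Swoole fascinating, and I finally sat down and dug into it myself. It really is impressive. Until now, when I needed a queue, the simple version was a while (true) loop that kept polling the store (MySQL or Redis) and handled new data whenever it showed up. The slightly more advanced version had PHP fork a child process for each task, with the child calling exit as soon as it finished.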
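For comparison, the fork-per-task approach might look roughly like the sketch below. It is only an illustration under some assumptions: an in-memory array stands in for the MySQL/Redis queue, the names process_with_fork and $handler are made up for this example, and it needs the pcntl extension under the CLI.

```php
<?php
// Minimal sketch of fork-per-task processing (pcntl extension, CLI only).
// Assumption: $tasks is an in-memory stand-in for a MySQL/Redis queue.

/**
 * Fork one child per task; each child handles its task and exits.
 * Returns the number of children that exited with status 0.
 */
function process_with_fork(array $tasks, callable $handler): int
{
    $pids = [];
    foreach ($tasks as $task) {
        $pid = pcntl_fork();
        if ($pid === -1) {
            throw new RuntimeException('fork failed');
        }
        if ($pid === 0) {
            // Child: handle exactly one task, then exit.
            $handler($task);
            exit(0);
        }
        $pids[] = $pid; // Parent: remember the child so it can be reaped.
    }

    $ok = 0;
    foreach ($pids as $pid) {
        // Block until this child has finished, then inspect its exit status.
        pcntl_waitpid($pid, $status);
        if (pcntl_wifexited($status) && pcntl_wexitstatus($status) === 0) {
            $ok++;
        }
    }
    return $ok;
}

$done = process_with_fork([1, 2, 3], function ($task) {
    echo "child " . getmypid() . " handled task $task\n";
});
echo "$done children finished\n";
```

The parent here waits for every child before returning, which keeps the sketch short. A long-running worker would instead cap the number of live children and reap them as they exit, which is what the Swoole version does.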

With Swoole, though, it turns out to be very simple to implement.

<?php
declare(ticks = 1);
class wcd{
    private $maxProcesses = 10;
    private $child = 0; // number of child processes currently running
    private $masterRedis;
    private $redis_task_wing = 'wcd:test'; // queue of pending tasks

    public function __construct(){
        // install signal handler for dead kids
        pcntl_signal(SIGCHLD, array($this, "sig_handler"));
        set_time_limit(0);
        ini_set('default_socket_timeout', -1); // no socket timeout for the queue worker; avoids the Redis error "read error on connection"
        $this->masterRedis = $this->redis_client();
    }

    private function redis_client(){
        $rds = new Redis();
        $rds->connect('10.103.1.162',6379);
        $rds->select(6);
        return $rds;
    }

    public function process(swoole_process $worker){ // first handler
        $GLOBALS['worker'] = $worker;
        swoole_event_add($worker->pipe, function($pipe) {
            $worker = $GLOBALS['worker'];
            $recv = $worker->read();            // read the task data sent by the master

            sleep(rand(1, 3));
            echo "From Master: $recv\n";
            $worker->exit(0);
        });
        exit;
    }

    public function test(){
        for ($i = 0; $i < 100; $i++){
            $data = [
                'abc' => $i,
                'timestamp' => time().rand(100,999)
            ];
            $this->masterRedis->lpush($this->redis_task_wing, json_encode($data));
        }
        exit;
    }

    public function run(){
        while (1){
//            echo "\t now we de have $this->child child processes\n";
            if ($this->child < $this->maxProcesses){
                $rds = $this->redis_client();
                $data_pop = $rds->brpop($this->redis_task_wing, 3); // blocks for up to 3 seconds while the queue is empty
                if (!$data_pop){
                    continue;
                }
                echo "\t Starting new child | now we de have $this->child child processes\n";
                $this->child++;
                $process = new swoole_process([$this, 'process']);
                $process->write(json_encode($data_pop));
                $pid = $process->start();
            }
        }
    }

    public function sig_handler($signo) { // public so the signal dispatcher can call it from any scope
        echo "Recive: $signo \r\n";
        switch ($signo) {
            case SIGCHLD:
                while($ret = swoole_process::wait(false)) {
                    echo "PID={$ret['pid']}\n";
                    $this->child--;
                }
        }
    }
}
$cl = new wcd;
$cl->test(); // enqueues 100 sample tasks and exits; call $cl->run() instead to consume them

Here is the output. While it was running I checked with ps and saw 10 processes, so presumably 9 of them were child processes.

From Master: ["wcd:test","{\"abc\":70,\"timestamp\":\"1475547206982\"}"]
PHP Notice:  Unknown: send of 6 bytes failed with errno=32 Broken pipe in Unknown on line 0
From Master: ["wcd:test","{\"abc\":72,\"timestamp\":\"1475547206509\"}"]
PHP Notice:  Unknown: send of 6 bytes failed with errno=32 Broken pipe in Unknown on line 0
From Master: ["wcd:test","{\"abc\":73,\"timestamp\":\"1475547206815\"}"]
PHP Notice:  Unknown: send of 6 bytes failed with errno=32 Broken pipe in Unknown on line 0
From Master: ["wcd:test","{\"abc\":76,\"timestamp\":\"1475547206812\"}"]
PHP Notice:  Unknown: send of 6 bytes failed with errno=32 Broken pipe in Unknown on line 0
Recive: 17 
PID=22454
	 Starting new child | now we de have 9 child processes
Recive: 17 
PID=22451
	 Starting new child | now we de have 9 child processes
Recive: 17 
PID=22450
	 Starting new child | now we de have 9 child processes
Recive: 17 
PID=22447
Recive: 17 
	 Starting new child | now we de have 9 child processes
From Master: ["wcd:test","{\"abc\":74,\"timestamp\":\"1475547206283\"}"]
PHP Notice:  Unknown: send of 6 bytes failed with errno=32 Broken pipe in Unknown on line 0
Recive: 17 
PID=22452
	 Starting new child | now we de have 9 child processes
From Master: ["wcd:test","{\"abc\":78,\"timestamp\":\"1475547206595\"}"]
PHP Notice:  Unknown: send of 6 bytes failed with errno=32 Broken pipe in Unknown on line 0
From Master: ["wcd:test","{\"abc\":79,\"timestamp\":\"1475547206459\"}"]
PHP Notice:  Unknown: send of 6 bytes failed with errno=32 Broken pipe in Unknown on line 0
From Master: ["wcd:test","{\"abc\":75,\"timestamp\":\"1475547206664\"}"]
PHP Notice:  Unknown: send of 6 bytes failed with errno=32 Broken pipe in Unknown on line 0
Recive: 17 
PID=22457
	 Starti