Something Like Threading – PHP Process Forking and Interprocess Communication

7th June, 2007

UPDATE Wednesday December 29 14:27 GMT+11: I’ve added another post that follows this up with a fix for the blocking behaviour of PHP sockets: Interprocess Communication in PHP with Non-Blocking Sockets which is intended to complement but not replace this original post.

I recently wrote a little application that dumps a file across a forwarded port. It was tricky, but very convenient because you can do things like report status of the upload process (I could also have done this by parsing the output of something like scp, but I kind of liked having direct access to the copy process/stats).

When I first wrote it, I didn’t know what I was doing and had never written socket code before. It was a big procedural mess. Naturally I was keen to separate out my socket class into its own package but this presented a problem: the controlling process needed to check the status but how could I decouple the process that instantiated the socket class from the socket code itself? I didn’t want to hard code the status reporting into the socket code and I wasn’t too hot on the idea of passing in some kind of status reporting ‘callback object’ to receive messages during the process.

I figured what I really needed was a new thread. Strictly speaking a fork gives you a second process rather than a thread, but it’s close enough and serves my purposes. You can fork in PHP using the pcntl_fork function:

http://php.net/pcntl_fork

When I first started out, I thought that this would be really easy. Think again fool! It did turn out to be quite easy, but just not obvious. For starters, I read this article on PHP forking (ed: the original article was located at http://www.van-steenbeek.net/?q=php_pcntl_fork which has been taken offline, although the author was good enough to organise an archive) and the man page at php.net.

As you can see from the PHP manual, in order to use the features in this article you will need to compile PHP with the following options:

--enable-pcntl
--enable-sockets
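
If you are not sure how your PHP binary was built, a quick runtime check can save some head scratching. This is only a minimal sketch; the variable names are mine and nothing here comes from the original application:

```php
<?php
// Extensions this article relies on.
$required = ['pcntl', 'sockets'];

// Collect the ones that are not loaded in the running PHP binary.
$missing = array_values(array_filter($required, function ($ext) {
    return !extension_loaded($ext);
}));

if ($missing) {
    echo "Missing extensions: " . implode(', ', $missing) . "\n";
} else {
    echo "pcntl and sockets are both available\n";
}
```

Run it with the same command line PHP binary you plan to use for forking, since the web server build often has a different set of extensions.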

OO and Me

I’m a nut on object orientation. To me it makes loads of sense so I always write stuff with objects. The rest of this article uses objects but the issues I’m discussing apply equally well in a procedural arena.

The First Barrier – Copy On Write

When you fork a process, the script splits in two (like Lorraine in that episode of Astro Boy when she gets into the robot fighting league). As “Jeff” on the pcntl_fork man page describes (in the user contributed notes), this operation is not expensive because it uses a “copy-on-write” model. This means that, until one of the processes writes to a variable, the memory holding that variable is shared between the two processes. Great! Inexpensive.

But this causes some problems. Consider the following piece of code (which was my first attempt):

<?php
    class ForkMaster
    {
        private $up_to;

        public function __construct()
        {
            $this->up_to = 0;
        }

        public function countToOneThousand()
        {
            $pid = pcntl_fork();

            if($pid < 0)
                throw new Exception('No fork for you!');

            else if(!$pid)
            {
                //THIS IS THE CHILD PROCESS
                for($i = 1;$i <= 1000;$i++)
                    $this->up_to = $i;
            }

            else if($pid)
            {
                //THIS IS THE PARENT PROCESS
                while($this->up_to < 1000)
                    echo($this->up_to."\n");
            }
        }
    }

    $mast = new ForkMaster();
    $mast->countToOneThousand();

If you run that code with PHP on the command line, it will swamp your screen with 0’s. Oh if only it were that easy!!

See the problem is that as soon as the child process writes to the $this->up_to variable, it gets its own copy. So when the parent process checks if $this->up_to < 1000, it’s always checking its own copy, which is always 0 and never changes.
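
The same effect can be seen without any classes at all. Here is a stripped-down sketch (the variable name $counter is just for illustration) where the parent waits for the child to finish before looking at its own copy, so it doesn't loop forever:

```php
<?php
// Both processes start with the same value.
$counter = 0;

$pid = pcntl_fork();
if ($pid === -1) {
    throw new RuntimeException('Could not fork');
}

if ($pid === 0) {
    // CHILD: this write lands in the child's private copy of $counter.
    $counter = 1000;
    exit(0);
}

// PARENT: wait until the child has finished, then inspect our own copy.
pcntl_waitpid($pid, $status);
echo "Parent still sees: $counter\n";
```

The parent reports 0 even though the child has already set its copy to 1000 and exited.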

Interprocess Communication using Sockets

So it turns out there is a really convenient way to communicate between processes using the PHP socket_create_pair() function!

You can read about it here:

http://php.net/socket_create_pair
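
To get a feel for what socket_create_pair() hands you before any forking is involved, here is a minimal single-process sketch: two connected sockets, where whatever is written to one end can be read from the other. The variable names are my own:

```php
<?php
$pair = [];

// Create two connected Unix domain stream sockets.
if (socket_create_pair(AF_UNIX, SOCK_STREAM, 0, $pair) === false) {
    throw new RuntimeException(
        'socket_create_pair failed: ' . socket_strerror(socket_last_error())
    );
}

// Write five bytes into one end ...
socket_write($pair[0], "hello", 5);

// ... and read them back out of the other end.
$received = socket_read($pair[1], 5, PHP_BINARY_READ);

socket_close($pair[0]);
socket_close($pair[1]);

echo $received . "\n";
```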

There’s even an example on there doing exactly what I was after! So after reading that I was sold. The only thing left to do was to put it into a nice, reusable object pattern. The code I came up with is below, and I’ll explain each part of it for you in detail:

<?php
    class Threader
    {
        private $pid;
        private $socket;
        private $up_to;
        private static $inst = NULL;
        const MESSAGE_LENGTH = 10;

        private function __construct()
        {
            $this->pid = FALSE;
            $this->socket  = NULL;
            $this->up_to  = 0;
        }

        public static function current()
        {
            if(self::$inst == NULL)
                self::$inst = new Threader();

            return self::$inst;
        }

        public function countToOneMillion()
        {
            $sockets = array();

            if (socket_create_pair(AF_UNIX, SOCK_STREAM, 0,$sockets) === false)
                echo "socket_create_pair failed. Reason: ".socket_strerror(socket_last_error());

            $this->socket = $sockets[1];
            $this->pid = pcntl_fork();

            if($this->pid == -1)
                throw new Exception('Could not fork');

            else if(!$this->pid)
            {
                //THIS IS THE CHILD PROCESS
                if(socket_write($sockets[0],str_pad("RUNNING",
                                Threader::MESSAGE_LENGTH),
                                Threader::MESSAGE_LENGTH)
                                === false)
                    throw new Exception("socket_write() failed. Reason: ".
                              socket_strerror(socket_last_error($sockets[0])));

                for($i = 0;$i < 100000;$i++)
                {
                    if (socket_write($sockets[0], str_pad("UPTO $i\n", Threader::MESSAGE_LENGTH),
                        Threader::MESSAGE_LENGTH) === false)
                        throw new Exception("socket_write() failed. Reason: ".
                                  socket_strerror (socket_last_error($sockets[0])));
                }

                if(socket_write($sockets[0],str_pad("STOP", Threader::MESSAGE_LENGTH),
                   Threader::MESSAGE_LENGTH) === false)
                    throw new Exception("socket_write() failed.  Reason: ".
                              socket_strerror(socket_last_error($sockets[0])));

                socket_close($sockets[0]);
            }
        }

        public function pid()
        {
            return $this->pid;
        }

        public function running()
        {
            if($this->pid)
            {
                $ret = FALSE;
                //THIS IS THE PARENT PROCESS
                if(($data = socket_read($this->socket, Threader::MESSAGE_LENGTH, PHP_BINARY_READ))
                   !== false)
                {
                    $data = trim($data);

                    if(strpos($data,"RUNNING")!==FALSE)
                        $ret = true;

                    else if(strpos($data,"STOP")!==FALSE)
                    {
                        $ret = false;
                        socket_close($this->socket);
                    }

                    else
                    {
                        // Store the count as an integer so the caller can do arithmetic on it
                        $this->up_to = (int) str_replace('UPTO ','',
                                                         $data);
                        $ret = true;
                    }
                }

                return $ret;
            }
        }

        public function upTo()
        {
            if($this->pid)
                return $this->up_to;
        }
    }

    Threader::current()->countToOneMillion();

    if(Threader::current()->pid())
    {
        $old = 0;

        while($data = Threader::current()->running())
        {
            $up_to = Threader::current()->upTo();

            if(($up_to - $old)>1000)
            {
                echo($up_to."\n");
                $old = $up_to;
            }
        }

        echo("All done!\n");
    }

What??

Okay let’s take a look at the various parts of that code.

<?php
    class Threader
    {
        private $pid;
        private $socket;
        private $up_to;
        private static $inst = NULL;
        const MESSAGE_LENGTH = 10;

        private function __construct()
        {
            $this->pid = FALSE;
            $this->socket  = NULL;
            $this->up_to  = 0;
        }

So this is pretty standard. I’m defining a class called Threader. Interesting to note that, for reasons you’ll see later on, I’ve chosen to implement this class using the Singleton pattern, so I’ve got:

private static $inst = NULL;

The other thing to notice is that the constructor is private. That’s because in a Singleton pattern you never instantiate a class directly. You do it with a factory method like this:

public static function current()
{
    if(self::$inst == NULL)
        self::$inst = new Threader();

    return self::$inst;
}

This means that when you call:

$thread = Threader::current();

you will always get the same instance of the Threader object that is instantiated the first time you call it. Of course this does not hold true in the case of forked processes (I had hoped that it would but I wasn’t so lucky). Now we get down to the nitty gritty.

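The countToOneMillion() method is the one that we want to fork because it’s going to take so long and we want to have status updates. (Despite the name, the loop in the listing stops at 100,000, which also means every “UPTO n” message fits within the 10 byte MESSAGE_LENGTH.) So the first thing to do is create the sockets we’ll use for interprocess communication: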

$sockets = array();

if (socket_create_pair(AF_UNIX, SOCK_STREAM, 0, $sockets) === false)
    echo "socket_create_pair failed. Reason: ".socket_strerror(socket_last_error());

$this->socket = $sockets[1];

This is straight off the socket_create_pair man page. The only tricky bit here is that we are taking the second of the two sockets we created and storing it in a member variable. Now comes the fork we’ve all been waiting for:

$this->pid = pcntl_fork();

if($this->pid == -1)
    throw new Exception('Could not fork');

else if(!$this->pid)
{
    //THIS IS THE CHILD PROCESS
    if(socket_write($sockets[0],str_pad("RUNNING", Threader::MESSAGE_LENGTH),
                    Threader::MESSAGE_LENGTH) === false)
        throw new Exception("socket_write() failed. Reason: ".
                             socket_strerror(socket_last_error($sockets[0])));

    for($i = 0;$i < 100000;$i++)
    {
        if (socket_write($sockets[0],str_pad("UPTO $i\n", Threader::MESSAGE_LENGTH),
            Threader::MESSAGE_LENGTH) === false)
            throw new Exception("socket_write() failed. Reason: ".
                                socket_strerror(socket_last_error($sockets[0])));
    }

    if(socket_write($sockets[0],str_pad("STOP", Threader::MESSAGE_LENGTH),
                    Threader::MESSAGE_LENGTH) === false)
        throw new Exception("socket_write() failed. Reason: ".
                            socket_strerror(socket_last_error($sockets[0])));

    socket_close($sockets[0]);
}

I won’t go too much into explaining the fork code because the PHP manual pages as well as the article I linked above do a good job of that. I’ll just draw your attention to two things: firstly when I fork, I assign the $pid returned to a member variable of the Threader instance. Luckily for me, both processes carry on from that same assignment, so the Threader instance in the child process ends up holding 0 and the instance in the parent holds the child’s process ID.

This means that where the code checks if(!$this->pid) that code is only executed in the child process. This is where this example differs significantly from other examples in the manual and tutorial above. Because we set the $pid as a member variable, we are now free to go about our merry way. As the child process counts up to one million, it reports its status by writing to the first of the pair of sockets we created.
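
One thing the listing leaves implicit is what happens to the child once it has finished: it simply falls out of the if block and carries on with the rest of the script. In a bigger program you would probably want the child to exit explicitly and the parent to collect it. A minimal sketch, assuming nothing beyond the pcntl functions themselves (the exit code 7 is arbitrary):

```php
<?php
$pid = pcntl_fork();
if ($pid === -1) {
    throw new RuntimeException('Could not fork');
}

if ($pid === 0) {
    // CHILD: do the work here, then stop so we never run the parent's code.
    exit(7);
}

// PARENT: block until the child has finished and pick up its exit code.
pcntl_waitpid($pid, $status);
$exitCode = pcntl_wifexited($status) ? pcntl_wexitstatus($status) : null;

echo "Child $pid exited with code $exitCode\n";
```

Waiting on the child also stops it hanging around as a zombie process until the parent exits.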

Curious Interlude

Although you can’t write to variables owned by other processes, open resources are inherited across the fork. This means that file descriptors, sockets and database connections that were open before the fork are shared between the two processes. This is why we can share sockets between the two processes. There is an interesting article explaining this here:

http://www.hudzilla.org/phpbook/read.php/16_1_4
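
Putting the fork and the socket pair together in the smallest form I can think of looks something like the sketch below. It is not the Threader class, just an illustration with made-up variable names; each process closes the end of the pair it doesn't use:

```php
<?php
$pair = [];
if (socket_create_pair(AF_UNIX, SOCK_STREAM, 0, $pair) === false) {
    throw new RuntimeException(socket_strerror(socket_last_error()));
}

$pid = pcntl_fork();
if ($pid === -1) {
    throw new RuntimeException('Could not fork');
}

if ($pid === 0) {
    // CHILD: the sockets were open before the fork, so we can use them here.
    socket_close($pair[1]);
    socket_write($pair[0], "ping", 4);
    socket_close($pair[0]);
    exit(0);
}

// PARENT: read what the child wrote into the other end of the pair.
socket_close($pair[0]);
$message = socket_read($pair[1], 4, PHP_BINARY_READ);
socket_close($pair[1]);
pcntl_waitpid($pid, $status);

echo "Parent received: $message\n";
```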

Back Into It …

So now we have a child process which is going to happily sit there and count to one million, and keep us up to date by writing status messages (in a pre-determined format) to a socket. So we just have to listen to the other end of the socket! Here is the code that will do that for us:

public function running()
{
    if($this->pid)
    {
        $ret = FALSE;
        //THIS IS THE PARENT PROCESS
        if(($data = socket_read($this->socket, Threader::MESSAGE_LENGTH, PHP_BINARY_READ)) !== false)
        {
            $data = trim($data);

            if(strpos($data,"RUNNING")!==FALSE)
                $ret = true;

            else if(strpos($data,"STOP")!==FALSE)
            {
                $ret = false;
                socket_close($this->socket);
            }

            else
            {
                // Store the count as an integer so the caller can do arithmetic on it
                $this->up_to = (int) str_replace('UPTO ','',$data);
                $ret = true;
            }
        }

        return $ret;
    }
}

The first thing it does is check that $this->pid evaluates to true with if($this->pid). As you’ll recall, because we assigned the $pid returned from pcntl_fork to a member variable of the Threader instance, we can now use that to check if we are in the parent or child process at any time. As it states in the PHP manual, a $pid > 0 means we are in the parent process. Convenient!!

We then call the socket_read function to get the data that was written using the socket_write function from our other process. One not so convenient part of this is that you have to provide a length for the messages, which I’ve included as a class constant of the Threader class. Because the child pads every message to exactly that length, the parent can pull them off the socket one at a time, in order, and we won’t miss any messages. (Strictly, a stream socket is allowed to hand back fewer bytes than you asked for, so this relies on these small writes arriving whole.)
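
If you would rather not rely on each socket_read() call returning a complete message, one option is to keep reading until the full length has arrived. The helper below, readExactly(), is hypothetical and not part of the Threader class; the demo deliberately writes one padded message in two pieces to show that it is reassembled:

```php
<?php
// Hypothetical helper: read exactly $length bytes, or return null if the
// socket errors out or the other end closes before that many arrive.
function readExactly($socket, int $length): ?string
{
    $buffer = '';

    while (strlen($buffer) < $length) {
        $chunk = socket_read($socket, $length - strlen($buffer), PHP_BINARY_READ);

        if ($chunk === false || $chunk === '') {
            return null;
        }

        $buffer .= $chunk;
    }

    return $buffer;
}

$pair = [];
if (socket_create_pair(AF_UNIX, SOCK_STREAM, 0, $pair) === false) {
    throw new RuntimeException(socket_strerror(socket_last_error()));
}

// One 10 byte message, written in two halves.
$padded = str_pad("UPTO 42", 10);
socket_write($pair[0], substr($padded, 0, 5), 5);
socket_write($pair[0], substr($padded, 5), 5);

$message = trim(readExactly($pair[1], 10));

socket_close($pair[0]);
socket_close($pair[1]);

echo $message . "\n";
```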

I then just use a few simple if/else statements to check what the message was. If it was an “UPTO” message, which means that it was updating the status of the progress in counting up to one million, then I set the $up_to member variable on the object. Now …

The Code that Calls It

This is what I’ve been building up to the whole time, and chiefly why I just wasted a day doing this when I could have just as easily passed a callback object. Because I’ve used a Singleton pattern and because the pid is stored in that Singleton, the code inside the Threader can be completely decoupled from the code that calls it. So:

Threader::current()->countToOneMillion();

if(Threader::current()->pid())
{
    $old = 0;

    while(Threader::current()->running())
    {
        $up_to = Threader::current()->upTo();

        if(($up_to - $old)>1000)
        {
            echo($up_to."\n");
            $old = $up_to;
        }
    }

    echo("All done!\n");
}

First thing to note here is that I’m accessing the Threader class using its current() method rather than instantiating one using the ‘new’ keyword. As I mentioned above, this is because the Threader class implements a Singleton pattern. So when we check Threader::current()->pid(), we ensure that the code will only execute in the parent process. The child process is busy ticking away the whole time since we called countToOneMillion.

The real magic happens when we call:

while(Threader::current()->running())

If you’ll remember, that’s where we read the status messages from the socket that are being written by the child process. This method also updates the $up_to variable in the Threader instance. We can then echo out what number we are up to.

Now something screwy happened here. I didn’t want to echo out a million numbers, so I put a sleep(30) call in so that the status would only update once every 30 seconds. However it just hung. Never went anywhere. If anyone knows why I’d love to hear it. So instead I just put in some code that only echoes out the number it’s up to if it’s more than 1000 greater than the previous update.
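
One plausible explanation, assuming the sleep(30) sat inside the while loop: each call to running() reads a single 10 byte message, so the parent would be draining one message every 30 seconds while the child blocks in socket_write() as soon as the socket buffer fills up. A way around that is to keep reading as fast as the messages arrive and throttle only the output. The sketch below uses a hypothetical helper, makeThrottle(), which is not part of the original code, and feeds it simulated timestamps in place of microtime(true):

```php
<?php
// Hypothetical helper: returns a closure that answers "is a report due?"
// with true at most once per $interval seconds.
function makeThrottle(float $interval): callable
{
    $last = null;

    return function (float $now) use (&$last, $interval): bool {
        if ($last === null || ($now - $last) >= $interval) {
            $last = $now;
            return true;
        }

        return false;
    };
}

// Simulated timestamps (in seconds) stand in for microtime(true) here.
$reportDue = makeThrottle(30.0);

foreach ([0.0, 10.0, 29.9, 30.0, 45.0, 61.0] as $now) {
    if ($reportDue($now)) {
        echo "report at {$now}s\n";
    }
}
```

In the real loop you would call $reportDue(microtime(true)) after each running() call and only echo the value of upTo() when it returns true. With the simulated timestamps above, reports fire at 0, 30 and 61 seconds.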

Conclusion

The actual result above is identical to what could be achieved by simply echo’ing out the values in the countToOneMillion method. However the significant thing is that you can have a class which performs some time consuming task, then in the meantime you can do something else like send someone an email or SMS updating the progress.

The other significant thing is that the class performing the time consuming function doesn’t need to know anything about the environment in which it is being called. In terms of creating a reusable class that you can drop into any project this is highly desirable.
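
One caveat on doing something else in the meantime: socket_read() in running() blocks until a message arrives, so the parent spends its time waiting inside that call. If you want to squeeze other work in between updates, you could first ask whether a message is waiting using socket_select() with a zero timeout. The helper name messageWaiting() below is my own invention and this is only a sketch of the idea:

```php
<?php
// Hypothetical helper: true if there is something to read on $socket
// right now, without blocking.
function messageWaiting($socket): bool
{
    $read = [$socket];
    $write = null;
    $except = null;

    // A timeout of 0 seconds makes socket_select() return immediately.
    $ready = socket_select($read, $write, $except, 0);

    return $ready !== false && $ready > 0;
}

$pair = [];
if (socket_create_pair(AF_UNIX, SOCK_STREAM, 0, $pair) === false) {
    throw new RuntimeException(socket_strerror(socket_last_error()));
}

$before = messageWaiting($pair[1]);   // nothing has been written yet
socket_write($pair[0], str_pad("RUNNING", 10), 10);
$after = messageWaiting($pair[1]);    // one message is now queued

var_dump($before, $after);

socket_close($pair[0]);
socket_close($pair[1]);
```

The parent could then call running() only when messageWaiting() says there is something to read, and get on with the email or SMS sending the rest of the time.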

Happy forking!

Comments Archive

This post originally appeared on an older version of the Working Software website which implemented its own comment mechanism. This new version of our website/blog uses Disqus for comments (see below) however I wanted to preserve the comments made on the previous post here:

For the 30 sec issue: I would bet that PHP simple timed out. See here: http://www.php.net/set_time_limit

Peter Nagy (2008-06-01)

OOPS!! Don’t know how that got in there 🙂 fixed now. thanks for stopping by!

Iain Dooley (2007-06-11)

Hi. Thanks for the article with the link for my site. I’m sorry to see you’ve got the link wrong, though 🙂 Your link is: http://www.van-steenbeek.net/%EE%9B%8Fq=php_pcntl_fork And it should read: http://www.van-steenbeek.net/?q=php_pcntl_fork Again, thanks. Keep up the good work!

FST777 (2007-06-11)

Wow, good job, that’s some really interesting code, thanks for publishing.

dave (2007-06-08)

good to see you got it working, and even better that you wrote about it 😀 – ecoleman

Eric Coleman (2007-06-08)
