Skip to content

Instantly share code, notes, and snippets.

@krakjoe
Last active January 30, 2020 12:33
Show Gist options
  • Save krakjoe/9384409 to your computer and use it in GitHub Desktop.
Save krakjoe/9384409 to your computer and use it in GitHub Desktop.
Pooling

Easy pthreads Pools

The final solution !!

Since the first version of pthreads, PHP has had the ability to initialize Worker threads for users. Onto those Worker threads are stacked objects of class Stackable for execution concurrently.

The objects stacked onto workers do not have their reference counts changed, pthreads forces the user to maintain the reference counts in userland, for the extremely good reason that this enables the programmer to keep control of memory usage; and so, execute indefinitely.

This is the cause of much heartache for newcomers to pthreads; if you do not maintain references properly you will, definitely, experience segmentation faults.

To change the base implementation of Worker and or Stackable, would be to remove all power from the user.

Version 1.0.0 of pthreads comes with a Pool class, this Pool class manages references and implements an easy to use garbage collection mechanism for the user, it therefore eliminates the need to treat Stackables in a special manner, it should be idiot proof.

I'm not calling anyone an idiot !

The Pool Class

How it works

The Pool class is not thread safe; it therefore cannot be used from multiple contexts, since the underlying Worker class has restrictions that make this necessary.

Construction

How to make one!

The Pool constructor has the following prototype:

public function __construct($size, string $class, $ctor = []);
  • $size - the maimum number of Workers that can be started by this Pool.
  • $class - the name of a Worker class implemented by the programmer.
  • $ctor - an array of arguments to pass to the constructor of each Worker upon initialization.

The following code shows how to construct a Pool:

/**
 * An instance of this class shall be passed to Workers on initialization
 */
class WebLogger extends Stackable {
    public function log() {
        /* ... */
    }
}

class WebWorker extends Worker {

    public function __construct(\WebLogger $logger) {
        $this->logger = $logger;
    }
    
    public function run() {
        /* ... */
    }
    
    protected $logger;
}

$pool = new Pool(8, \WebWorker::class, [new \WebLogger()]);

Note: No threads are created at this time, this only initializes the Pool.

Execution

How to use one!

The submit method has the following prototype:

public function submit(Stackable $task)
  • $task - the Stackable to execute in a pooled Worker

The following code shows usage:

class WebWork extends Stackable {
    public function run() {
        /* ... */
    }
}

$pool->submit(new WebWork());

The next available Worker is selected (round robin), or constructed, $task is added to $pool->work and stacked onto the Worker selected.

Collection

How to keep one!

We live in a universe where are only two things are possibly inifinite, and much to the dismay of every shared hosting provider, ever; neither of those two things are RAM or CPU time.

With this in mind, you will periodically want to collect Stackables that are complete from the Pool, possibly execute some kind of final action on these objects and then allow the Pool to release the resources for that Stackable.

This is achieved by calling the collect method, which has the following prototype:

public function collect(Callable $collector)
  • $collector - a function to detect if a Stackable is done executing

$collector is called for each of the Stackable objects submitted for execution, it should accept a Stackable for an argument and return positively if the passed argument has finished executing.

$collector implicitly has the following prototype:

function (Stackable $task)

The following code show usage:

$pool->collect(function(WebWork $task){
   return $task->isComplete();
});

Growing and Shrinking

How to bend one!

The ability to grow and shrink the Pool, that is to say, increase or decrease the number of Workers available, allows the programmer much flexibility to adjust to the environment during execution.

The resize method has the following prototype:

public function resize($size)
  • $size - the new maximum number of Workers to allow

The following code shows usage:

$pool->resize(4);

At this time, if the new $size is smaller than the old, the last Workers started will be shutdown, the new maximum is then adjusted.

Shutting Down

How to stop one!

Shutting down causes all workers to be shutdown, and in so doing, execute all Stackables submitted.

The shutdown method has the following prototype:

public function shutdown()

The following code shows usage:

$pool->shutdown();

The Pool can no longer be used, however, the Stackables submitted are not released at this time.

Destruction

How to destroy one!

Destruction of the Pool will both shutdown all Workers and release all Stackables.

Destruction (__destruct) is invoked when the Pool goes out of scope, or no more references to it remain, just as a normal variable

unset explicitly invokes __destruct:

unset($pool);

Code Listing

U can haz codez!

The following code is a useless, but complete code listing, using the examples in this document:

<?php
class WebWork extends Stackable {

    public function __construct() {
        $this->complete = false;
    }

    public function run() {
        $this->worker->logger->log(
            "%s executing in Thread #%lu", 
            __CLASS__, $this->worker->getThreadId());
        usleep(100);
        $this->complete = true;
    }

    public function isComplete() {
        return $this->complete;
    }

    protected $complete;
}

class WebWorker extends Worker {

    public function __construct(WebLogger $logger) {
        $this->logger = $logger;
    }

    protected $logger;
}

class WebLogger extends Stackable {

    protected function log($message, $args = []) {
        $args = func_get_args();    

        if (($message = array_shift($args))) {
            echo vsprintf(
                "{$message}\n", $args);
        }
    }
}

$logger = new WebLogger();
$pool = new Pool(8, WebWorker::class, [$logger]);

while (@$i++<10)
    $pool->submit(new WebWork());

usleep(2000000);

$logger->log("Shrink !!");

$pool->resize(1);
$pool->collect(function(WebWork $task){
    return $task->isComplete();
});

while (@$j++<10)
    $pool->submit(new WebWork());

$pool->shutdown();	

Will output something like:

[joe@fiji pthreads]$ php-zts src/s.php
WebWork executing in Thread #140130134910720
WebWork executing in Thread #140130126518016
WebWork executing in Thread #140130117801728
WebWork executing in Thread #140130108819200
WebWork executing in Thread #140130099836672
WebWork executing in Thread #140129750480640
WebWork executing in Thread #140129742087936
WebWork executing in Thread #140129733695232
WebWork executing in Thread #140130126518016
WebWork executing in Thread #140130134910720
Shrink !!
WebWork executing in Thread #140130134910720
WebWork executing in Thread #140130134910720
WebWork executing in Thread #140130134910720
WebWork executing in Thread #140130134910720
WebWork executing in Thread #140130134910720
WebWork executing in Thread #140130134910720
WebWork executing in Thread #140130134910720
WebWork executing in Thread #140130134910720
WebWork executing in Thread #140130134910720
WebWork executing in Thread #140130134910720
@JeremyHutchings
Copy link

We live in a universe where are only two things are possibly inifinite, and much to the dismay of every shared hosting provider, ever; neither of those two things are RAM or CPU time.

We live in a universe where there are only two things that are possibly infinite, and much to the dismay of every shared hosting provider, ever; neither of those two things are RAM or CPU time.

:)

@carbontwelve
Copy link

This looks similar to the pooling example from the repository; what is the difference between using return $task->isComplete(); here and return $work->isGarbage(); in the example? (and am I right in assuming they have the same function?)

@Steveb-p
Copy link

@carbontwelve isComplete() is a function implemented in PHP code for Pool specifically. Since pthreads 2.0.8 a special Collectable class exists which is a convenience class implementing isGarbage() and setGarbage() instead. So yes, they basically do the same.

@krakjoe, what is the optimal way to make sure that workers inside Pool have finished executing? I don't see any Pool's function which would do so? (Unless collect is supposed to do that somehow?)

@Shesharaopuli
Copy link

Hi Joe,

Thanks , your thought is really appreciated.After a long search about multithreading in PHP I came to this portion.
Could you please provide the steps for installing on windows( h'v already copied those files but Examples are not running saying Class 'Thread' not found ) and Linux as well.

please help on this.

Thanks,
Shesharao

@Omgan
Copy link

Omgan commented Dec 13, 2015

On Windows the installation is as follows:

Download the pthreads that matches your php version.
I found mine at: http://windows.php.net/downloads/pecl/releases/pthreads/
I was using PHP version 5.5 so downloaded latest version of PECL for 5.5 @ http://windows.php.net/downloads/pecl/releases/pthreads/2.0.9/

Extract the zip.
Move php_pthreads.dll to the php\ext\ directory.
Move pthreadVC2.dll to the php\ directory.
and alos in windows/system32

Open php\php.ini and add
extension=php_pthreads.dll
8 add this in all php.ini present in wamp installation folder.

you are done.

@mrwhy-orig
Copy link

Hi, thanks for this extension.
At the momement I have a little problem. May be, I'm missing something.
I try to create a thread in an object context. If I try the examples listed here, it works just fine, but not in my context.

I'm trying to achieve a parallel execution of requests in a socket server. So I want to start a thread for the exectution of lightweight but probably time consuming tasks (Timer).
The time I use threads in my code they behave like the normal procedural way and not like threads. Any Ideas?

Request -> Timer starts and echoing data to the console (20s)
Another Request -> Timer starts after the first Timer is ready.

I want to achive, that both Timer are running in parallel.

Thanks!
Björn

@ReSpawN
Copy link

ReSpawN commented Oct 25, 2016

@Steveb-p, dit you ever get an answer from Joe? He's been dealing with some personal stuff I guess.
I'm also in between documentations right now. A small portion of the 2.0 docs on PHP.net work fine but I also advised the stub.php inside examples.

@mohammedabualsoud
Copy link

Threading can only enabled on CLI script right?

@erlangparasu
Copy link

erlangparasu commented Jan 13, 2018

@krakjoe
How to use infinite $pool->submit($task) ?
May i use it inside while (true) { /* here */ } without call $pool->shutdown() ?

@sarkerzaman
Copy link

I have tried pthreads in Windows 10 with PHP version 7.0, 7.1.7, and 7.2.5 but not working. Please help, If anyone found it working with PHP 7.0+ in Windows 10.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment