Skip to content

Commit

Permalink
corrections, added Thread Channels, example/test
Browse files Browse the repository at this point in the history
- some testing under PHPunit works best with `@runInSeparateProcess`
- skip channel test under PHPunit long stall before segfault, no segfault under Windows though
- update `Thread` class to properly handle multi arguments, docblock comment about cancel
- these additions in reference to amphp/ext-uv#46
  • Loading branch information
TheTechsTech committed Mar 19, 2023
1 parent 6296fc4 commit d5784ce
Show file tree
Hide file tree
Showing 6 changed files with 134 additions and 8 deletions.
41 changes: 41 additions & 0 deletions Threads/TChannel.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
<?php

declare(strict_types=1);

namespace Async\Threads;

/**
* @codeCoverageIgnore
*/
final class TChannel
{
/** @var array<resource,resource> */
protected $resource = [];

public function __destruct()
{
\fclose($this->resource[1]);
\fclose($this->resource[0]);
$this->resource = null;
}

public function __construct()
{
$this->resource = \stream_socket_pair((\stripos(\PHP_OS, "win") === 0 ? \STREAM_PF_INET : \STREAM_PF_UNIX),
\STREAM_SOCK_STREAM,
\STREAM_IPPROTO_IP
);
}

public function send(string $message)
{
$result = \fwrite($this->resource[1], $message . "\n", \strlen($message . "\n"));
\usleep(1);
return $result;
}

public function recv()
{
return \trim(\fgets($this->resource[0]));
}
}
5 changes: 2 additions & 3 deletions Threads/TWorker.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,6 @@

use Async\Threads\Thread;

/**
* @codeCoverageIgnore
*/
final class TWorker
{
protected ?Thread $threads = null;
Expand All @@ -28,6 +25,8 @@ public function __construct(Thread $thread, $tid)

/**
* This method will sends a cancellation request to the thread.
* - WILL SKIP `then` callback handlers, _immediately_ execute `catch` handlers.
* - WILL NOT stop a thread execution, `uv_cancel` not implemented.
*
* @return void
*/
Expand Down
9 changes: 8 additions & 1 deletion Threads/Thread.php
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,12 @@ public function create($tid, callable $task, ...$args): ?TWorker
$lock = \uv_mutex_init();
$isCancelled = false;
try {
$result = $task(...$args);
$arguments = \reset($args);
if (!\is_array($arguments))
$result = $task($arguments);
else
$result = $task(...$arguments);

if ($thread->isCancelled($tid)) {
$isCancelled = true;
\uv_mutex_lock($lock);
Expand Down Expand Up @@ -198,6 +203,8 @@ public function create($tid, callable $task, ...$args): ?TWorker

/**
* This method will sends a cancellation request to the thread.
* - WILL SKIP `then` callback handlers, _immediately_ execute `catch` handlers.
* - WILL NOT stop a thread execution, `uv_cancel` not implemented.
*
* @param string|int $tid Thread ID
* @return void
Expand Down
34 changes: 34 additions & 0 deletions examples/mutli_thread_channels.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
<?php

include 'vendor/autoload.php';

use Async\Threads\Thread;

$thread = new Thread();
[$_read, $_write] = \stream_socket_pair((\stripos(\PHP_OS, "win") === 0 ? \STREAM_PF_INET : \STREAM_PF_UNIX),
\STREAM_SOCK_STREAM,
\STREAM_IPPROTO_IP
);

$thread->create_ex(function ($write) {
echo "[queue1] ";
$result = \fwrite($write, "Thread 1\n");
\usleep(1);
return $result;
}, $_write)->then(function (int $output) {
print "Thread 1 returned: " . $output . PHP_EOL;
})->catch(function (\Throwable $e) {
print $e->getMessage() . PHP_EOL;
});

$t2 = $thread->create_ex(function ($read) {
echo "[queue2] ";
echo "Thread 2 Got " . \fgets($read);
return 'finish';
}, $_read)->then(function (string $output) {
print "Thread 2 returned: " . $output . PHP_EOL;
})->catch(function (\Throwable $exception) {
print $exception->getMessage() . PHP_EOL;
});

$thread->join();
8 changes: 4 additions & 4 deletions tests/ThreadTestMulti.php
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
<?php

namespace TQueue\Tests;
namespace Async\Tests;

use Async\Threads\Thread;
use PHPUnit\Framework\TestCase;
Expand All @@ -13,10 +13,11 @@ protected function setUp(): void
$this->markTestSkipped('Test skipped "uv_loop_new" and "PHP ZTS" missing. currently buggy - zend_mm_heap corrupted');
}

/**
* @runInSeparateProcess
*/
public function testIt_can_handle_multi()
{
if ('\\' === \DIRECTORY_SEPARATOR)
$this->markTestSkipped('buggy');
$thread = new Thread();
$counter = 0;
$t5 = $thread->create_ex(function () {
Expand All @@ -29,7 +30,6 @@ public function testIt_can_handle_multi()
});

$t6 = $thread->create_ex(function () {
usleep(50);
return 4;
})->then(function (int $result) use (&$counter) {
$counter += $result;
Expand Down
45 changes: 45 additions & 0 deletions tests/ThreadTestMultiChannel.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
<?php

namespace Async\Tests;

use Async\Threads\Thread;
use Async\Threads\TChannel;
use PHPUnit\Framework\TestCase;

class ThreadTestMultiChannel extends TestCase
{
protected function setUp(): void
{
if (!\ZEND_THREAD_SAFE && !\function_exists('uv_loop_new'))
$this->markTestSkipped('Test skipped "uv_loop_new" and "PHP ZTS" missing. currently buggy - zend_mm_heap corrupted');
}

/**
* @runInSeparateProcess
*/
public function testIt_can_handle_multi_channel()
{
$this->markTestSkipped('buggy, long stall before segmentation fault');

$thread = new Thread();
$channel = new TChannel();

$thread->create_ex(function ($write) {
return $write->send('Thread 1');
}, $channel)->then(function (int $result) {
$this->assertEquals(9, $result);
})->catch(function (\Throwable $e) {
print $e->getMessage() . PHP_EOL;
});

$t2 = $thread->create_ex(function ($read) {
return $read->recv();
}, $channel)->then(function (string $result) {
$this->assertEquals('Thread 1', $result);
})->catch(function (\Throwable $exception) {
print $exception->getMessage() . PHP_EOL;
});

$thread->join();
}
}

0 comments on commit d5784ce

Please sign in to comment.