Skip to content

Commit

Permalink
Nested transactions
Browse files Browse the repository at this point in the history
  • Loading branch information
trowski committed May 7, 2023
1 parent ef4fc6f commit 8a7d4ac
Show file tree
Hide file tree
Showing 8 changed files with 222 additions and 52 deletions.
16 changes: 13 additions & 3 deletions src/Internal/ConnectionProcessor.php
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,11 @@ public function useDatabase(string $database): Future
});
}

/** @see 14.6.4 COM_QUERY */
/**
* @see 14.6.4 COM_QUERY
*
* @return Future<MysqlConnectionResult|MysqlCommandResult>
*/
public function query(string $query): Future
{
return $this->startCommand(function () use ($query): void {
Expand All @@ -334,7 +338,11 @@ public function query(string $query): Future
});
}

/** @see 14.7.4 COM_STMT_PREPARE */
/**
* @see 14.7.4 COM_STMT_PREPARE
*
* @return Future<MysqlConnectionStatement>
*/
public function prepare(string $query): Future
{
return $this->startCommand(function () use ($query): void {
Expand Down Expand Up @@ -738,7 +746,9 @@ private function parseOk(string $packet): void
private function handleOk(string $packet): void
{
$this->parseOk($packet);
$this->dequeueDeferred()->complete(new MysqlCommandResult($this->metadata->affectedRows, $this->metadata->insertId));
$this->dequeueDeferred()->complete(
new MysqlCommandResult($this->metadata->affectedRows, $this->metadata->insertId),
);
$this->ready();
}

Expand Down
22 changes: 22 additions & 0 deletions src/Internal/MysqlNestedTransaction.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
<?php declare(strict_types=1);

namespace Amp\Mysql\Internal;

use Amp\Mysql\MysqlResult;
use Amp\Mysql\MysqlStatement;
use Amp\Mysql\MysqlTransaction;
use Amp\Sql\Common\NestedTransaction;

/**
* @internal
* @extends NestedTransaction<MysqlResult, MysqlStatement, MysqlTransaction>
*/
class MysqlNestedTransaction extends NestedTransaction implements MysqlTransaction
{
use MysqlTransactionDelegate;

protected function getTransaction(): MysqlTransaction
{
return $this->transaction;
}
}
37 changes: 3 additions & 34 deletions src/Internal/MysqlPooledTransaction.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,48 +6,17 @@
use Amp\Mysql\MysqlStatement;
use Amp\Mysql\MysqlTransaction;
use Amp\Sql\Common\PooledTransaction;
use Amp\Sql\Result;
use Amp\Sql\Statement;

/**
* @internal
* @extends PooledTransaction<MysqlResult, MysqlStatement, MysqlTransaction>
*/
final class MysqlPooledTransaction extends PooledTransaction implements MysqlTransaction
{
protected function createStatement(Statement $statement, \Closure $release): MysqlStatement
{
\assert($statement instanceof MysqlStatement);
return new MysqlPooledStatement($statement, $release);
}

protected function createResult(Result $result, \Closure $release): MysqlResult
{
\assert($result instanceof MysqlResult);
return new MysqlPooledResult($result, $release);
}

/**
* Changes return type to this library's Result type.
*/
public function query(string $sql): MysqlResult
{
return parent::query($sql);
}

/**
* Changes return type to this library's Statement type.
*/
public function prepare(string $sql): MysqlStatement
{
return parent::prepare($sql);
}
use MysqlTransactionDelegate;

/**
* Changes return type to this library's Result type.
*/
public function execute(string $sql, array $params = []): MysqlResult
protected function getTransaction(): MysqlTransaction
{
return parent::execute($sql, $params);
return $this->transaction;
}
}
51 changes: 51 additions & 0 deletions src/Internal/MysqlTransactionDelegate.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
<?php declare(strict_types=1);

namespace Amp\Mysql\Internal;

use Amp\Mysql\MysqlResult;
use Amp\Mysql\MysqlStatement;
use Amp\Mysql\MysqlTransaction;
use Amp\Sql\Result;
use Amp\Sql\Statement;

/** @internal */
trait MysqlTransactionDelegate
{
abstract protected function getTransaction(): MysqlTransaction;

protected function createStatement(Statement $statement, \Closure $release): MysqlStatement
{
\assert($statement instanceof MysqlStatement);
return new MysqlPooledStatement($statement, $release);
}

protected function createResult(Result $result, \Closure $release): MysqlResult
{
\assert($result instanceof MysqlResult);
return new MysqlPooledResult($result, $release);
}

/**
* Changes return type to this library's Result type.
*/
public function query(string $sql): MysqlResult
{
return parent::query($sql);
}

/**
* Changes return type to this library's Statement type.
*/
public function prepare(string $sql): MysqlStatement
{
return parent::prepare($sql);
}

/**
* Changes return type to this library's Result type.
*/
public function execute(string $sql, array $params = []): MysqlResult
{
return parent::execute($sql, $params);
}
}
55 changes: 55 additions & 0 deletions src/MysqlNestableTransaction.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
<?php declare(strict_types=1);

namespace Amp\Mysql;

use Amp\Sql\Common\NestableTransaction;
use Amp\Sql\Transaction;
use Amp\Sql\TransactionIsolation;
use Amp\Sql\TransactionIsolationLevel;

/**
* @extends NestableTransaction<MysqlResult, MysqlStatement, MysqlTransaction>
*/
class MysqlNestableTransaction extends NestableTransaction implements MysqlLink
{
protected function createNestedTransaction(
Transaction $transaction,
\Closure $release,
string $identifier,
): Transaction {
return new Internal\MysqlNestedTransaction($transaction, $release, $identifier);
}

/**
* Changes return type to this library's Transaction type.
*/
public function beginTransaction(
TransactionIsolation $isolation = TransactionIsolationLevel::Committed
): MysqlTransaction {
return parent::beginTransaction($isolation);
}

/**
* Changes return type to this library's Result type.
*/
public function query(string $sql): MysqlResult
{
return parent::query($sql);
}

/**
* Changes return type to this library's Statement type.
*/
public function prepare(string $sql): MysqlStatement
{
return parent::prepare($sql);
}

/**
* Changes return type to this library's Result type.
*/
public function execute(string $sql, array $params = []): MysqlResult
{
return parent::execute($sql, $params);
}
}
3 changes: 1 addition & 2 deletions src/SocketMysqlConnection.php
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,7 @@ private function __construct(Internal\ConnectionProcessor $processor)

$busy = &$this->busy;
$this->release = static function () use (&$busy): void {
\assert($busy instanceof DeferredFuture);
$busy->complete();
$busy?->complete();
$busy = null;
};
}
Expand Down
58 changes: 45 additions & 13 deletions test/LinkTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,16 @@

namespace Amp\Mysql\Test;

use Amp\Future;
use Amp\Mysql\MysqlColumnDefinition;
use Amp\Mysql\MysqlDataType;
use Amp\Mysql\MysqlLink;
use Amp\Mysql\MysqlResult;
use Amp\Sql\QueryError;
use Amp\Sql\Result;
use Amp\Sql\SqlException;
use function Amp\async;
use function Amp\delay;

abstract class LinkTest extends MysqlTestCase
{
Expand Down Expand Up @@ -113,7 +116,7 @@ public function testNextResultBeforeConsumption()
$result->getNextResult();
}

public function testQueryWithUnconsumedTupleResult()
public function testQueryWithUnconsumedTupleResult(): void
{
$db = $this->getLink();

Expand All @@ -128,7 +131,7 @@ public function testQueryWithUnconsumedTupleResult()
$this->assertInstanceOf(Result::class, $result);
}

public function testUnconsumedMultiResult()
public function testUnconsumedMultiResult(): void
{
$db = $this->getLink(true);

Expand All @@ -145,7 +148,34 @@ public function testUnconsumedMultiResult()
self::assertSame([['a' => 5, 'b' => 6]], $got);
}

public function testPrepared()
public function testSimultaneousQuery(): void
{
$db = $this->getLink(true);

$future1 = async(function () use ($db): void {
$result = $db->query("SELECT a FROM main");
$got = [];
foreach ($result as $row) {
$got[] = $row['a'];
delay(0.1);
}
self::assertSame(\range(1, \count($got)), $got);
});

$future2 = async(function () use ($db): void {
$result = $db->query("SELECT b FROM main");
$got = [];
foreach ($result as $row) {
$got[] = $row['b'];
delay(0.1);
}
self::assertSame(\range(2, \count($got) + 1), $got);
});

Future\await([$future1, $future2]);
}

public function testPrepared(): void
{
$db = $this->getLink(true);

Expand Down Expand Up @@ -231,7 +261,7 @@ public function testPrepared()
$this->assertInstanceOf(MysqlResult::class, $result);
}

public function testPrepareWithInvalidQuery()
public function testPrepareWithInvalidQuery(): void
{
$this->expectException(QueryError::class);
$this->expectExceptionMessage('You have an error in your SQL syntax');
Expand All @@ -243,7 +273,7 @@ public function testPrepareWithInvalidQuery()
$statement->execute(); // Some implementations do not throw until execute() is called.
}

public function testBindWithInvalidParamId()
public function testBindWithInvalidParamId(): void
{
$this->expectException(\Error::class);
$this->expectExceptionMessage('Parameter 1 is not defined for this prepared statement');
Expand All @@ -257,7 +287,7 @@ public function testBindWithInvalidParamId()
$statement->execute(); // Some implementations do not throw until execute() is called.
}

public function testBindWithInvalidParamName()
public function testBindWithInvalidParamName(): void
{
$this->expectException(\Error::class);
$this->expectExceptionMessage('Named parameter :b is not defined for this prepared statement');
Expand All @@ -271,7 +301,7 @@ public function testBindWithInvalidParamName()
$statement->execute(); // Some implementations do not throw until execute() is called.
}

public function testStatementExecuteWithTooFewParams()
public function testStatementExecuteWithTooFewParams(): void
{
$this->expectException(\Error::class);
$this->expectExceptionMessage('Parameter 1 missing for executing prepared statement');
Expand All @@ -282,7 +312,7 @@ public function testStatementExecuteWithTooFewParams()
$stmt->execute([1]);
}

public function testExecute()
public function testExecute(): void
{
$db = $this->getLink();

Expand All @@ -303,7 +333,7 @@ public function testExecute()
$this->assertInstanceOf(MysqlResult::class, $result);
}

public function testExecuteWithInvalidQuery()
public function testExecuteWithInvalidQuery(): void
{
$this->expectException(QueryError::class);
$this->expectExceptionMessage('You have an error in your SQL syntax');
Expand All @@ -315,7 +345,7 @@ public function testExecuteWithInvalidQuery()
$db->close();
}

public function testExecuteWithTooFewParams()
public function testExecuteWithTooFewParams(): void
{
$this->expectException(\Error::class);
$this->expectExceptionMessage('Parameter 1 missing for executing prepared statement');
Expand All @@ -327,7 +357,7 @@ public function testExecuteWithTooFewParams()
$db->close();
}

public function testPreparedWithNegativeValue()
public function testPreparedWithNegativeValue(): void
{
$db = $this->getLink();

Expand All @@ -345,7 +375,7 @@ public function testPreparedWithNegativeValue()
$db->close();
}

public function testTransaction()
public function testTransaction(): void
{
$db = $this->getLink();

Expand All @@ -356,6 +386,8 @@ public function testTransaction()
$this->assertInstanceOf(MysqlResult::class, $result);
$this->assertGreaterThan(5, $result->getLastInsertId());

$statement->close();

$result = $transaction->query("SELECT * FROM main WHERE a = 6");

$got = [];
Expand Down Expand Up @@ -390,7 +422,7 @@ public function testTransaction()
/**
* @depends testTransaction
*/
public function testInsertSelect()
public function testInsertSelect(): void
{
$db = $this->getLink();

Expand Down
Loading

0 comments on commit 8a7d4ac

Please sign in to comment.