Skip to content

Commit

Permalink
Merge pull request #6 from mcg-web/abstract-promise
Browse files Browse the repository at this point in the history
Abstract Promise
  • Loading branch information
mcg-web authored Nov 15, 2016
2 parents a82efd7 + 28306ac commit 8f6b811
Show file tree
Hide file tree
Showing 7 changed files with 165 additions and 128 deletions.
1 change: 1 addition & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ cache:
before_install:
- if [[ "$TRAVIS_PHP_VERSION" != "5.6" && "$TRAVIS_PHP_VERSION" != "hhvm" ]]; then phpenv config-rm xdebug.ini || true; fi
- composer selfupdate
- composer require "guzzlehttp/promises"

install: composer update --prefer-dist --no-interaction

Expand Down
8 changes: 5 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ data sources such as databases or web services via batching and caching.

## Requirements

This library require [React/Promise](https://github.com/reactphp/promise) and PHP >= 5.5 to works.
This library require PHP >= 5.5 to works.

## Getting Started

Expand All @@ -31,8 +31,9 @@ Create loaders by providing a batch loading instance.
use Overblog\DataLoader\DataLoader;

$myBatchGetUsers = function ($keys) { /* ... */ };
$promiseFactory = new MyPromiseFactory();

$userLoader = new DataLoader($myBatchGetUsers);
$userLoader = new DataLoader($myBatchGetUsers, $promiseFactory);
```

A batch loading callable / callback accepts an Array of keys, and returns a Promise which
Expand Down Expand Up @@ -122,11 +123,12 @@ Each `DataLoaderPHP` instance contains a unique memoized cache. Use caution when
used in long-lived applications or those which serve many users with different
access permissions and consider creating a new instance per web request.

##### `new DataLoader(callable $batchLoadFn [, Option $options])`
##### `new DataLoader(callable $batchLoadFn, PromiseFactoryInterface $promiseFactory [, Option $options])`

Create a new `DataLoaderPHP` given a batch loading instance and options.

- *$batchLoadFn*: A callable / callback which accepts an Array of keys, and returns a Promise which resolves to an Array of values.
- *$promiseFactory*: Any object that implements `McGWeb\PromiseFactory\PromiseFactoryInterface`. (see [McGWeb/Promise-Factory](https://github.com/mcg-web/promise-factory))
- *$options*: An optional object of options:

- *batch*: Default `true`. Set to `false` to disable batching, instead
Expand Down
4 changes: 3 additions & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
"name": "overblog/dataloader-php",
"type": "library",
"license": "MIT",
"description": "DataLoaderPhp is a generic utility to be used as part of your application's data fetching layer to provide a simplified and consistent API over various remote data sources such as databases or web services via batching and caching.",
"keywords": ["dataLoader", "caching", "batching"],
"autoload": {
"psr-4": {
"Overblog\\DataLoader\\": "src/"
Expand All @@ -14,7 +16,7 @@
},
"require": {
"php": "^5.5|^7.0",
"react/promise": "^2.4"
"mcg-web/promise-factory": "^0.2"
},
"require-dev": {
"phpunit/phpunit": "^4.1|^5.1"
Expand Down
129 changes: 60 additions & 69 deletions src/DataLoader.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

namespace Overblog\DataLoader;

use React\Promise\Promise;
use McGWeb\PromiseFactory\PromiseFactoryInterface;

class DataLoader
{
Expand All @@ -31,7 +31,7 @@ class DataLoader
private $promiseCache;

/**
* @var Promise[]
* @var array
*/
private $queue = [];

Expand All @@ -40,9 +40,15 @@ class DataLoader
*/
private static $instances = [];

public function __construct(callable $batchLoadFn, Option $options = null)
/**
* @var PromiseFactoryInterface
*/
private $promiseFactory;

public function __construct(callable $batchLoadFn, PromiseFactoryInterface $promiseFactory, Option $options = null)
{
$this->batchLoadFn = $batchLoadFn;
$this->promiseFactory = $promiseFactory;
$this->options = $options ?: new Option();
$this->promiseCache = $this->options->getCacheMap();
self::$instances[] = $this;
Expand All @@ -53,7 +59,7 @@ public function __construct(callable $batchLoadFn, Option $options = null)
*
* @param string $key
*
* @return Promise
* @return mixed return a Promise
*/
public function load($key)
{
Expand All @@ -70,33 +76,34 @@ public function load($key)
return $cachedPromise;
}
}
$promise = null;

// Otherwise, produce a new Promise for this value.
$promise = new Promise(
function ($resolve, $reject) use (&$promise, $key, $shouldBatch) {
$this->queue[] = [
'key' => $key,
'resolve' => $resolve,
'reject' => $reject,
'promise' => &$promise,
];

// Determine if a dispatch of this queue should be scheduled.
// A single dispatch should be scheduled per queue at the time when the
// queue changes from "empty" to "full".
if (count($this->queue) === 1) {
if (!$shouldBatch) {
// Otherwise dispatch the (queue of one) immediately.
$this->dispatchQueue();
}
}
},
function (callable $resolve, callable $reject) {
$promise = $this->getPromiseFactory()->create(
$resolve,
$reject,
function () {
// Cancel/abort any running operations like network connections, streams etc.

$reject(new \RuntimeException('DataLoader destroyed before promise complete.'));
});
throw new \RuntimeException('DataLoader destroyed before promise complete.');
}
);

$this->queue[] = [
'key' => $key,
'resolve' => $resolve,
'reject' => $reject,
'promise' => $promise,
];

// Determine if a dispatch of this queue should be scheduled.
// A single dispatch should be scheduled per queue at the time when the
// queue changes from "empty" to "full".
if (count($this->queue) === 1) {
if (!$shouldBatch) {
// Otherwise dispatch the (queue of one) immediately.
$this->dispatchQueue();
}
}
// If caching, cache this promise.
if ($shouldCache) {
$this->promiseCache->set($cacheKey, $promise);
Expand All @@ -118,14 +125,14 @@ function (callable $resolve, callable $reject) {
* ]);
* @param array $keys
*
* @return Promise
* @return mixed return a Promise
*/
public function loadMany($keys)
{
if (!is_array($keys) && !$keys instanceof \Traversable) {
throw new \InvalidArgumentException(sprintf('The "%s" method must be called with Array<key> but got: %s.', __METHOD__, gettype($keys)));
}
return \React\Promise\all(array_map(
return $this->getPromiseFactory()->createAll(array_map(
function ($key) {
return $this->load($key);
},
Expand Down Expand Up @@ -178,7 +185,7 @@ public function prime($key, $value)
if (!$this->promiseCache->has($cacheKey)) {
// Cache a rejected promise if the value is an Error, in order to match
// the behavior of load(key).
$promise = $value instanceof \Exception ? \React\Promise\reject($value) : \React\Promise\resolve($value);
$promise = $value instanceof \Exception ? $this->getPromiseFactory()->createReject($value) : $this->getPromiseFactory()->createResolve($value);

$this->promiseCache->set($cacheKey, $promise);
}
Expand All @@ -191,13 +198,12 @@ public function __destruct()
if ($this->needProcess()) {
foreach ($this->queue as $data) {
try {
/** @var Promise $promise */
$promise = $data['promise'];
$promise->cancel();
$this->getPromiseFactory()->cancel($data['promise']);
} catch (\Exception $e) {
// no need to do nothing if cancel failed
}
}
$this->await();
}
foreach (self::$instances as $i => $instance) {
if ($this !== $instance) {
Expand All @@ -215,10 +221,16 @@ protected function needProcess()
protected function process()
{
if ($this->needProcess()) {
$this->getPromiseFactory()->await();
$this->dispatchQueue();
}
}

protected function getPromiseFactory()
{
return $this->promiseFactory;
}

/**
* @param $promise
* @param bool $unwrap controls whether or not the value of the promise is returned for a fulfilled promise or if an exception is thrown if the promise is rejected
Expand All @@ -227,48 +239,28 @@ protected function process()
*/
public static function await($promise = null, $unwrap = true)
{
self::awaitInstances();

if (null === $promise) {
return null;
}
$resolvedValue = null;
$exception = null;

if (!is_callable([$promise, 'then'])) {
throw new \InvalidArgumentException(sprintf('The "%s" method must be called with a Promise ("then" method).', __METHOD__));
}

$promise->then(function ($values) use (&$resolvedValue) {
$resolvedValue = $values;
}, function ($reason) use (&$exception) {
$exception = $reason;
});
if ($exception instanceof \Exception) {
if (!$unwrap) {
return $exception;
}
throw $exception;
if (empty(self::$instances)) {
throw new \RuntimeException('Found no active DataLoader instance.');
}
self::awaitInstances();

return $resolvedValue;
return self::$instances[0]->getPromiseFactory()->await($promise, $unwrap);
}

private static function awaitInstances()
{
$dataLoaders = self::$instances;
if (!empty($dataLoaders)) {
$wait = true;

while ($wait) {
foreach ($dataLoaders as $dataLoader) {
if (!$dataLoader || !$dataLoader->needProcess()) {
$wait = false;
continue;
}
$wait = true;
$dataLoader->process();

$wait = true;

while ($wait) {
foreach ($dataLoaders as $dataLoader) {
if (!$dataLoader || !$dataLoader->needProcess()) {
$wait = false;
continue;
}
$wait = true;
$dataLoader->process();
}
}
}
Expand Down Expand Up @@ -322,7 +314,6 @@ private function dispatchQueueBatch(array $queue)

// Call the provided batchLoadFn for this loader with the loader queue's keys.
$batchLoadFn = $this->batchLoadFn;
/** @var Promise $batchPromise */
$batchPromise = $batchLoadFn($keys);

// Assert the expected response from batchLoadFn
Expand Down Expand Up @@ -374,7 +365,7 @@ function ($values) use ($keys, $queue) {
/**
* Do not cache individual loads if the entire batch dispatch fails,
* but still reject each request so they do not hang.
* @param Promise[] $queue
* @param array $queue
* @param \Exception $error
*/
private function failedDispatch($queue, \Exception $error)
Expand Down
25 changes: 19 additions & 6 deletions tests/AbuseTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

use Overblog\DataLoader\DataLoader;

class AbuseTest extends \PHPUnit_Framework_TestCase
class AbuseTest extends TestCase
{
/**
* @group provides-descriptive-error-messages-for-api-abuse
Expand Down Expand Up @@ -75,7 +75,7 @@ public function testBatchFunctionMustReturnAPromiseNotAValue()
public function testBatchFunctionMustReturnAPromiseOfAnArrayNotNull()
{
DataLoader::await(self::idLoader(function () {
return \React\Promise\resolve();
return self::$promiseFactory->createResolve(null);
})->load(1));
}

Expand All @@ -87,20 +87,33 @@ public function testBatchFunctionMustReturnAPromiseOfAnArrayNotNull()
public function testBatchFunctionMustPromiseAnArrayOfCorrectLength()
{
DataLoader::await(self::idLoader(function () {
return \React\Promise\resolve([]);
return self::$promiseFactory->createResolve([]);
})->load(1));
}

/**
* @group provides-descriptive-error-messages-for-api-abuse
* @expectedException \InvalidArgumentException
* @expectedExceptionMessage The "Overblog\DataLoader\DataLoader::await" method must be called with a Promise ("then" method).
* @expectedExceptionMessage ::await" method must be called with a Promise ("then" method).
* @runInSeparateProcess
*/
public function testAwaitPromiseMustHaveAThenMethod()
{
self::idLoader();
DataLoader::await([]);
}

/**
* @group provides-descriptive-error-messages-for-api-abuse
* @expectedException \RuntimeException
* @expectedExceptionMessage Found no active DataLoader instance.
* @runInSeparateProcess
*/
public function testAwaitWithoutNoInstance()
{
DataLoader::await();
}

/**
* @param callable $batchLoadFn
* @return DataLoader
Expand All @@ -109,10 +122,10 @@ private static function idLoader(callable $batchLoadFn = null)
{
if (null === $batchLoadFn) {
$batchLoadFn = function ($keys) {
return \React\Promise\all($keys);
return self::$promiseFactory->createAll($keys);
};
}

return new DataLoader($batchLoadFn);
return new DataLoader($batchLoadFn, self::$promiseFactory);
}
}
Loading

0 comments on commit 8f6b811

Please sign in to comment.