Skip to content

Commit 36f1893

Browse files
committed
Async event dispatcher pkg.
0 parents  commit 36f1893

23 files changed

+1316
-0
lines changed

.gitignore

+6
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
*~
2+
/composer.lock
3+
/composer.phar
4+
/phpunit.xml
5+
/vendor/
6+
/.idea/

.travis.yml

+21
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
sudo: false
2+
3+
git:
4+
depth: 1
5+
6+
language: php
7+
8+
php:
9+
- '5.6'
10+
- '7.0'
11+
12+
cache:
13+
directories:
14+
- $HOME/.composer/cache
15+
16+
install:
17+
- composer self-update
18+
- composer install --prefer-source --ignore-platform-reqs
19+
20+
script:
21+
- vendor/bin/phpunit --exclude-group=functional

AsyncListener.php

+66
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
<?php
2+
3+
namespace Enqueue\AsyncEventDispatcher;
4+
5+
use Enqueue\Client\Message;
6+
use Enqueue\Client\ProducerInterface;
7+
use Symfony\Component\EventDispatcher\Event;
8+
9+
class AsyncListener
10+
{
11+
/**
12+
* @var ProducerInterface
13+
*/
14+
private $producer;
15+
16+
/**
17+
* @var Registry
18+
*/
19+
private $registry;
20+
21+
/**
22+
* @var bool
23+
*/
24+
private $syncMode;
25+
26+
/**
27+
* @param ProducerInterface $producer
28+
* @param Registry $registry
29+
*/
30+
public function __construct(ProducerInterface $producer, Registry $registry)
31+
{
32+
$this->producer = $producer;
33+
$this->registry = $registry;
34+
}
35+
36+
public function resetSyncMode()
37+
{
38+
$this->syncMode = [];
39+
}
40+
41+
/**
42+
* @param string $eventName
43+
*/
44+
public function syncMode($eventName)
45+
{
46+
$this->syncMode[$eventName] = true;
47+
}
48+
49+
/**
50+
* @param Event $event
51+
* @param string $eventName
52+
*/
53+
public function onEvent(Event $event = null, $eventName)
54+
{
55+
if (false == isset($this->syncMode[$eventName])) {
56+
$transformerName = $this->registry->getTransformerNameForEvent($eventName);
57+
58+
$message = $this->registry->getTransformer($transformerName)->toMessage($eventName, $event);
59+
$message->setScope(Message::SCOPE_APP);
60+
$message->setProperty('event_name', $eventName);
61+
$message->setProperty('transformer_name', $transformerName);
62+
63+
$this->producer->sendEvent('event.'.$eventName, $message);
64+
}
65+
}
66+
}

AsyncProcessor.php

+50
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
<?php
2+
3+
namespace Enqueue\AsyncEventDispatcher;
4+
5+
use Enqueue\Consumption\Result;
6+
use Enqueue\Psr\PsrContext;
7+
use Enqueue\Psr\PsrMessage;
8+
use Enqueue\Psr\PsrProcessor;
9+
10+
class AsyncProcessor implements PsrProcessor
11+
{
12+
/**
13+
* @var Registry
14+
*/
15+
private $registry;
16+
17+
/**
18+
* @var ProxyEventDispatcher
19+
*/
20+
private $eventDispatcher;
21+
22+
/**
23+
* @param Registry $registry
24+
* @param ProxyEventDispatcher $eventDispatcher
25+
*/
26+
public function __construct(Registry $registry, ProxyEventDispatcher $eventDispatcher)
27+
{
28+
$this->registry = $registry;
29+
$this->eventDispatcher = $eventDispatcher;
30+
}
31+
32+
/**
33+
* {@inheritdoc}
34+
*/
35+
public function process(PsrMessage $message, PsrContext $context)
36+
{
37+
if (false == $eventName = $message->getProperty('event_name')) {
38+
return Result::reject('The message is missing "event_name" property');
39+
}
40+
if (false == $transformerName = $message->getProperty('transformer_name')) {
41+
return Result::reject('The message is missing "transformer_name" property');
42+
}
43+
44+
$event = $this->registry->getTransformer($transformerName)->toEvent($eventName, $message);
45+
46+
$this->eventDispatcher->dispatchAsyncListenersOnly($eventName, $event);
47+
48+
return self::ACK;
49+
}
50+
}

ContainerAwareRegistry.php

+82
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
<?php
2+
3+
namespace Enqueue\AsyncEventDispatcher;
4+
5+
use Symfony\Component\DependencyInjection\ContainerAwareInterface;
6+
use Symfony\Component\DependencyInjection\ContainerAwareTrait;
7+
8+
class ContainerAwareRegistry implements Registry, ContainerAwareInterface
9+
{
10+
use ContainerAwareTrait;
11+
12+
/**
13+
* @var string[]
14+
*/
15+
private $eventsMap;
16+
17+
/**
18+
* @var string[]
19+
*/
20+
private $transformersMap;
21+
22+
/**
23+
* @param string[] $eventsMap [eventName => transformerName]
24+
* @param string[] $transformersMap [transformerName => transformerServiceId]
25+
*/
26+
public function __construct(array $eventsMap, array $transformersMap)
27+
{
28+
$this->eventsMap = $eventsMap;
29+
$this->transformersMap = $transformersMap;
30+
}
31+
32+
/**
33+
* {@inheritdoc}
34+
*/
35+
public function getTransformerNameForEvent($eventName)
36+
{
37+
$transformerName = null;
38+
if (array_key_exists($eventName, $this->eventsMap)) {
39+
$transformerName = $this->eventsMap[$eventName];
40+
} else {
41+
foreach ($this->eventsMap as $eventNamePattern => $name) {
42+
if ('/' != $eventNamePattern[0]) {
43+
continue;
44+
}
45+
46+
if (preg_match($eventNamePattern, $eventName)) {
47+
$transformerName = $name;
48+
49+
break;
50+
}
51+
}
52+
}
53+
54+
if (empty($transformerName)) {
55+
throw new \LogicException(sprintf('There is no transformer registered for the given event %s', $eventName));
56+
}
57+
58+
return $transformerName;
59+
}
60+
61+
/**
62+
* {@inheritdoc}
63+
*/
64+
public function getTransformer($name)
65+
{
66+
if (false == array_key_exists($name, $this->transformersMap)) {
67+
throw new \LogicException(sprintf('There is no transformer named %s', $name));
68+
}
69+
70+
$transformer = $this->container->get($this->transformersMap[$name]);
71+
72+
if (false == $transformer instanceof EventTransformer) {
73+
throw new \LogicException(sprintf(
74+
'The container must return instance of %s but got %s',
75+
EventTransformer::class,
76+
is_object($transformer) ? get_class($transformer) : gettype($transformer)
77+
));
78+
}
79+
80+
return $transformer;
81+
}
82+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
<?php
2+
3+
namespace Enqueue\AsyncEventDispatcher\DependencyInjection;
4+
5+
use Enqueue\AsyncEventDispatcher\OldProxyEventDispatcher;
6+
use Symfony\Component\Config\FileLocator;
7+
use Symfony\Component\DependencyInjection\ContainerBuilder;
8+
use Symfony\Component\DependencyInjection\Definition;
9+
use Symfony\Component\DependencyInjection\Extension\Extension;
10+
use Symfony\Component\DependencyInjection\Loader\YamlFileLoader;
11+
use Symfony\Component\DependencyInjection\Reference;
12+
use Symfony\Component\HttpKernel\Kernel;
13+
14+
class AsyncEventDispatcherExtension extends Extension
15+
{
16+
/**
17+
* {@inheritdoc}
18+
*/
19+
public function load(array $configs, ContainerBuilder $container)
20+
{
21+
$loader = new YamlFileLoader($container, new FileLocator(__DIR__.'/../Resources/config'));
22+
$loader->load('services.yml');
23+
24+
if (version_compare(Kernel::VERSION, '3.3', '<')) {
25+
$container->setDefinition('enqueue.events.async_processor', new Definition(OldProxyEventDispatcher::class, [
26+
new Reference('service_container'),
27+
new Reference('enqueue.events.registry'),
28+
new Reference('enqueue.events.event_dispatcher'),
29+
]));
30+
}
31+
}
32+
}
+99
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
<?php
2+
3+
namespace Enqueue\AsyncEventDispatcher\DependencyInjection;
4+
5+
use Symfony\Component\DependencyInjection\Compiler\CompilerPassInterface;
6+
use Symfony\Component\DependencyInjection\ContainerBuilder;
7+
use Symfony\Component\EventDispatcher\DependencyInjection\RegisterListenersPass;
8+
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
9+
10+
class AsyncEventsPass implements CompilerPassInterface
11+
{
12+
/**
13+
* {@inheritdoc}
14+
*/
15+
public function process(ContainerBuilder $container)
16+
{
17+
if (false == $container->hasDefinition('enqueue.events.async_listener')) {
18+
return;
19+
}
20+
21+
if (false == $container->hasDefinition('enqueue.events.registry')) {
22+
return;
23+
}
24+
25+
$registeredToEvent = [];
26+
foreach ($container->findTaggedServiceIds('kernel.event_listener') as $serviceId => $tagAttributes) {
27+
foreach ($tagAttributes as $tagAttribute) {
28+
if (false == isset($tagAttribute['async'])) {
29+
continue;
30+
}
31+
32+
$event = $tagAttribute['event'];
33+
34+
$service = $container->getDefinition($serviceId);
35+
36+
$service->clearTag('kernel.event_listener');
37+
$service->addTag('enqueue.async_event_listener', $tagAttribute);
38+
39+
if (false == isset($registeredToEvent[$event])) {
40+
$container->getDefinition('enqueue.events.async_listener')
41+
->addTag('kernel.event_listener', [
42+
'event' => $event,
43+
'method' => 'onEvent',
44+
])
45+
;
46+
47+
$container->getDefinition('enqueue.events.async_processor')
48+
->addTag('enqueue.client.processor', [
49+
'topicName' => 'event.'.$event,
50+
])
51+
;
52+
53+
$registeredToEvent[$event] = true;
54+
}
55+
}
56+
}
57+
58+
foreach ($container->findTaggedServiceIds('kernel.event_subscriber') as $serviceId => $tagAttributes) {
59+
foreach ($tagAttributes as $tagAttribute) {
60+
if (false == isset($tagAttribute['async'])) {
61+
continue;
62+
}
63+
64+
$service = $container->getDefinition($serviceId);
65+
$service->clearTag('kernel.event_subscriber');
66+
$service->addTag('enqueue.async_event_subscriber', $tagAttribute);
67+
68+
/** @var EventSubscriberInterface $serviceClass */
69+
$serviceClass = $service->getClass();
70+
71+
foreach ($serviceClass::getSubscribedEvents() as $event => $data) {
72+
if (false == isset($registeredToEvent[$event])) {
73+
$container->getDefinition('enqueue.events.async_listener')
74+
->addTag('kernel.event_listener', [
75+
'event' => $event,
76+
'method' => 'onEvent',
77+
])
78+
;
79+
80+
$container->getDefinition('enqueue.events.async_processor')
81+
->addTag('enqueue.client.processor', [
82+
'topicName' => 'event.'.$event,
83+
])
84+
;
85+
86+
$registeredToEvent[$event] = true;
87+
}
88+
}
89+
}
90+
}
91+
92+
$registerListenersPass = new RegisterListenersPass(
93+
'enqueue.events.event_dispatcher',
94+
'enqueue.async_event_listener',
95+
'enqueue.async_event_subscriber'
96+
);
97+
$registerListenersPass->process($container);
98+
}
99+
}

0 commit comments

Comments
 (0)