Exactly Once MessagingΒΆ
In the default implementation of the Postgres LISTEN/NOTIFY protocol,
multiple processes listening to the same channel will result in each process acting upon
each notification sent through that channel. This behaviour is often undesirable, so
pgpubsub offers users the option to define channels which allow one, and only one,
listening process to act upon each notification. We can achieve this simply by defining
lock_notifications = True on our channel object. This is the desired notification
processing behaviour for our AuthorTriggerChannel, where we want to create exactly one
Post whenever an Author row is inserted:
from dataclasses import dataclass
from pgpubsub.channel import TriggerChannel
from pgpubsub.tests.models import Author
@dataclass
class AuthorTriggerChannel(TriggerChannel):
model = Author
lock_notifications = True
Note
When we change the value of lock_notifications on a trigger based
channel, we must perform a migrate command after the change.
Enabling lock_notifications on a channel has the following effect:
Whenever a notification is sent through that channel (either via the
pgpubsub.notifyfunction or thepgpubsub.triggers.Notifytrigger), apgpubsub.models.Notificationobject is inserted into the database. This stored notification contains the same JSON payload as the transient Postgres notification. Note that since Postgres notify events are atomic with respect to their transaction, the notification is sent if and only if aNotificationis stored.When a process listening to that channel detects an incoming Postgres notification, it fetches and obtains a lock upon any stored
Notificationobject with the same payload. This is achieved as follows:notification = ( Notification.objects.select_for_update( skip_locked=True).filter( channel=self.notification.channel, payload=self.notification.payload, ).first() )
The fact that
select_for_updatein the above applies a lock onnotificationensures that no other process listening to the same channel can retrieve this notification object. Moreover, the use ofskip_locked=Truemeans that any process which cannot obtain the lock does not wait for the lock to release. This allows other processes to freely skip this notification and poll for others, whilst the one which did obtain the lock continues carries on to pass its notification into the listener callback. If the callback then successfully completes, the storedNotificationis removed from the database.