add support for streaming replication protocol #880
base: master
Conversation
Codecov Report
@@            Coverage Diff             @@
##           master     #880      +/-   ##
==========================================
- Coverage   93.32%   93.22%    -0.11%
==========================================
  Files          12       12
  Lines        1574     1771     +197
  Branches      187      214      +27
==========================================
+ Hits         1469     1651     +182
- Misses         73       77       +4
- Partials       32       43      +11
Continue to review full report at Codecov.
@Pliner The current approach of using a session-scoped fixture leads to extensive thrashing of container startups and teardowns, which causes some test runs to exceed the current 15-minute time limit per CI job. The streaming replication protocol tests had to be parameterized to avoid a lot of boilerplate (duplicating each test for physical and logical replication cursors). The question is:
@asvetlov @jettify
@pianoyeg94 Sorry for the late reply. I've increased the timeout from 15 to 20 minutes: f543f8c#diff-b803fcb7f17ed9235f1e5cb1fcd2f5d3b2838429d4368ae4c57ce4436577f03fR26
@Pliner
👍 Also, I hope to have the capacity to review next week.
@Pliner All test runs were successful, except for one flaky test on Python 3.8, "test_execute_timeout" (an old test that just happened to fail by coincidence). Also, codecov's current settings require me to write tests that are useless and enforce an antipattern of over-testing.
P.S.:
What do these changes do?
Add support for the logical and physical streaming replication protocol.

This feature has been extensively battle-tested in a production environment over the last four months without any failures or throughput issues. It's currently used by one of our worker services running under uvloop, implementing the "Transactional Outbox / Transaction log tailing" pattern to provide guaranteed delivery of events/messages to RabbitMQ. This feature gives us the ability to reliably and atomically update business entities in our database and publish events related to those entities within a single transaction, which helps with maintaining data consistency across a distributed system. Unfortunately, RabbitMQ doesn't have plugins that provide such functionality, unlike, for example, Kafka with Debezium.

This is not the only scenario in which the streaming replication protocol can be used: off the top of my head, it could also be used to continuously index data from Postgres into an Elasticsearch cluster.
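To make the pattern above concrete, here is a minimal sketch of the write side of the Transactional Outbox approach using plain aiopg. The table names, columns, and DSN are hypothetical and invented for illustration; only the general shape (entity update and outbox insert committed in a single transaction, with the replication stream tailed separately to publish the event) reflects what is described above.

```python
import json

import aiopg

DSN = "dbname=appdb user=postgres host=127.0.0.1"  # hypothetical DSN


async def place_order(pool: aiopg.Pool, order_id: int, total: int) -> None:
    async with pool.acquire() as conn:
        async with conn.cursor() as cur:
            # aiopg connections run in autocommit mode, so the transaction
            # is opened and committed explicitly.
            await cur.execute("BEGIN")
            try:
                # 1. Update the business entity.
                await cur.execute(
                    "UPDATE orders SET status = 'placed', total = %s WHERE id = %s",
                    (total, order_id),
                )
                # 2. Record the event in the outbox table within the same
                #    transaction; a separate worker tails the WAL and publishes
                #    it to RabbitMQ.
                await cur.execute(
                    "INSERT INTO outbox_events (topic, payload) VALUES (%s, %s)",
                    ("order.placed", json.dumps({"order_id": order_id, "total": total})),
                )
                await cur.execute("COMMIT")
            except Exception:
                await cur.execute("ROLLBACK")
                raise
```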
Due to the nature of this feature, some additional configuration had to be applied dynamically to the Postgres container setups during unit testing:
a) configuring Postgres through command-line arguments and mounting a pg_hba.conf file to support logical and physical replication during testing;
b) mounting pre-built logical decoding plugins (one wal2json.so for each version of Postgres).
If this feature is considered for merging, I would be happy to extend the pull request by writing some example scripts and adding them to the "examples" directory, plus fully documenting the feature in the Sphinx docs.
Are there changes in behavior for the user?
If a user doesn't require this new functionality, nothing changes for them: the public API of aiopg remains fully backward-compatible. Internally, the new code is structured in such a way that it behaves almost like a plugin; only a small part of the current codebase had to be adjusted (just a few lines of code in the Connection's `_cursor()` factory method).

On the other hand, if a user would like to take advantage of this new functionality, all they have to do is pass psycopg2's `LogicalReplicationConnection` or `PhysicalReplicationConnection` type to aiopg's `connect()` function as the `connection_factory` parameter. After this, the user gets access to the whole method set provided by psycopg2's replication objects, plus an additional `message_stream()` asynchronous generator, which is very convenient to use. And of course, all cursor methods are fully asynchronous and non-blocking, compatible with both vanilla asyncio's selector-based event loops (tested on Unix and Windows) and uvloop.
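A minimal usage sketch of what this looks like, based on the description above. The `message_stream()` call, the awaitability of the replication-cursor methods, and the feedback handling are assumptions drawn from this PR and psycopg2's replication API rather than from released aiopg documentation; the DSN and slot name are invented for illustration.

```python
import asyncio

import aiopg
import psycopg2.extras

DSN = "dbname=appdb user=postgres host=127.0.0.1"  # hypothetical DSN


async def tail_wal() -> None:
    async with aiopg.connect(
        DSN, connection_factory=psycopg2.extras.LogicalReplicationConnection
    ) as conn:
        async with conn.cursor() as cur:
            # psycopg2's replication-cursor methods, driven asynchronously by aiopg.
            await cur.create_replication_slot("example_slot", output_plugin="wal2json")
            await cur.start_replication(slot_name="example_slot", decode=True)

            # message_stream() is the asynchronous generator added by this PR.
            async for msg in cur.message_stream():
                print(msg.payload)  # JSON change record produced by wal2json
                # Acknowledge progress so the server can recycle WAL segments.
                cur.send_feedback(flush_lsn=msg.data_start)


asyncio.run(tail_wal())
```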
Related issue number
#287
Checklist
- Add a news fragment to the CHANGES folder, named <issue_id>.<type> (e.g. 588.bugfix). If you don't have an issue_id, change it to the PR id after creating the PR.
- The type must be one of the following:
  - .feature: Signifying a new feature.
  - .bugfix: Signifying a bug fix.
  - .doc: Signifying a documentation improvement.
  - .removal: Signifying a deprecation or removal of public API.
  - .misc: A ticket has been closed, but it is not of interest to users.
- Use full sentences with correct case and punctuation, for example: "Fix issue with non-ascii contents in doctest text files."