Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Outbox prisma adapter #231

Open
wants to merge 28 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
0353847
AP-5046 WIP prisma adapter.
kamilwylegala Sep 9, 2024
14fbfba
WIP prisma adapter.
kamilwylegala Sep 9, 2024
d55405e
Working test.
kamilwylegala Sep 11, 2024
e93bdad
WIP
kamilwylegala Sep 11, 2024
963014a
Working test for saving outbox entries.
kamilwylegala Nov 26, 2024
7444fab
failing test for updating.
kamilwylegala Nov 26, 2024
9c971fc
Bulk update + insert.
kamilwylegala Nov 27, 2024
5ad155f
Failed entries handling.
kamilwylegala Nov 27, 2024
d9e3d41
fetching entries up to the retry count limit.
kamilwylegala Nov 27, 2024
c61533d
Narrowed down types.
kamilwylegala Nov 27, 2024
552e0b6
lint fix
kamilwylegala Nov 27, 2024
f143384
Use generated db client from test dir.
kamilwylegala Nov 27, 2024
432875b
Build includes building test prisma client.
kamilwylegala Nov 27, 2024
ccf29a6
Fixed import.
kamilwylegala Nov 27, 2024
3f39f92
prisma main dependency.
kamilwylegala Nov 27, 2024
0b7d718
prisma client dev dependency.
kamilwylegala Nov 27, 2024
c2108e3
Build before lint.
kamilwylegala Nov 28, 2024
a7a9cb0
Ignore db client in biome.
kamilwylegala Nov 28, 2024
1b3c72c
Peer prisma.
kamilwylegala Nov 28, 2024
77b1d26
inferred type
kamilwylegala Nov 28, 2024
856ee17
debugging ci
kamilwylegala Nov 28, 2024
45d7310
keep prisma outside test folder in root
kamilwylegala Nov 28, 2024
00ca6d0
Fixed import in spec.
kamilwylegala Nov 28, 2024
33b954b
temp ts ignore.
kamilwylegala Nov 28, 2024
829c9ac
wait for db.
kamilwylegala Nov 28, 2024
1f39dbf
wait for db.
kamilwylegala Nov 28, 2024
8b818c1
wait for db.
kamilwylegala Nov 28, 2024
ab58167
Redundant docker start.
kamilwylegala Nov 28, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions packages/outbox-prisma-adapter/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# outbox-prisma-adapter

This package provides a Prisma adapter for the Outbox pattern.

### Development

#### Tests

To run the tests, you need to have a PostgreSQL database running. You can use the following command to start a PostgreSQL database using Docker:

```sh
docker-compose up -d
```

Then update Prisma client:
```sh
npx prisma generate --schema=./test/schema.prisma
```
10 changes: 10 additions & 0 deletions packages/outbox-prisma-adapter/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
services:

postgres:
image: postgres:16.2
environment:
POSTGRES_USER: prisma
POSTGRES_PASSWORD: prisma
POSTGRES_DB: prisma
ports:
- 5432:5432
1 change: 1 addition & 0 deletions packages/outbox-prisma-adapter/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export * from './lib/outbox-prisma-adapter'
67 changes: 67 additions & 0 deletions packages/outbox-prisma-adapter/lib/outbox-prisma-adapter.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
import type { OutboxAccumulator, OutboxEntry } from '@message-queue-toolkit/outbox-core'
import type { OutboxStorage } from '@message-queue-toolkit/outbox-core/dist/lib/storage'
import { type CommonEventDefinition, getMessageType } from '@message-queue-toolkit/schemas'
import type { PrismaClient } from '@prisma/client'

export class OutboxPrismaAdapter<SupportedEvents extends CommonEventDefinition[]>
implements OutboxStorage<SupportedEvents>
{
constructor(
private readonly prisma: PrismaClient,
private readonly modelName: string,
) {}

createEntry(
outboxEntry: OutboxEntry<SupportedEvents[number]>,
): Promise<OutboxEntry<SupportedEvents[number]>> {
const prismaModel: PrismaClient[typeof this.modelName] = this.prisma[this.modelName]

const messageType = getMessageType(outboxEntry.event)
return prismaModel.create({
data: {
id: outboxEntry.id,
type: messageType,
created: outboxEntry.created,
updated: outboxEntry.updated,
data: outboxEntry.data,
status: outboxEntry.status,
},
})
}

async flush(outboxAccumulator: OutboxAccumulator<SupportedEvents>): Promise<void> {
const entries = await outboxAccumulator.getEntries()

const prismaModel: PrismaClient[typeof this.modelName] = this.prisma[this.modelName]

for (const entry of entries) {
await prismaModel.upsert({
Copy link
Owner

@kibertoad kibertoad Nov 26, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

upserts often have suboptimal performance with plenty of locking, can we somehow simplify this to be bulk inserts and bulk updates?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Definitely. I'm working on making tests green. Once I cover all cases, Bulk update/insert would do the job.

Actually upsert has some weird behavior, maybe let's switch to bulk inserts/updates right away.

where: {
id: entry.id,
},
update: {
status: 'SUCCESS',
updated: new Date(),
},
create: {
id: entry.id,
type: getMessageType(entry.event),
created: entry.created,
updated: new Date(),
data: entry.data,
status: 'SUCCESS',
},
})
}
}

getEntries(maxRetryCount: number): Promise<OutboxEntry<SupportedEvents[number]>[]> {
return this.prisma[this.modelName].findMany({
where: {
retryCount: {
lte: maxRetryCount,
},
},
})
}
}
66 changes: 66 additions & 0 deletions packages/outbox-prisma-adapter/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
{
"name": "@message-queue-toolkit/outbox-prisma-adapter",
"version": "0.1.0",
"private": false,
"license": "MIT",
"description": "OutboxStorage implementation for @message-queue-toolkit/outbox-core package.",
"maintainers": [
{
"name": "Igor Savin",
"email": "[email protected]"
}
],
"main": "dist/index.js",
"types": "dist/index.d.ts",
"scripts": {
"build": "del-cli dist && tsc",
"build:release": "del-cli dist && del-cli coverage && npm run lint && tsc --project tsconfig.release.json",
"test": "vitest",
"test:coverage": "npm test -- --coverage",
"test:ci": "npm run docker:start:dev && npm run test:coverage && npm run docker:stop:dev",
"lint": "biome check . && tsc --project tsconfig.json --noEmit",
"lint:fix": "biome check --write .",
"docker:start:dev": "docker compose up -d",
"docker:stop:dev": "docker compose down",
"prepublishOnly": "npm run build:release"
},
"peerDependencies": {
"@message-queue-toolkit/core": ">=14.0.0",
"@message-queue-toolkit/outbox-core": ">=0.1.0",
"@message-queue-toolkit/schemas": ">=4.0.0",
"@prisma/client": "^5.19.1"
},
"devDependencies": {
"@biomejs/biome": "1.8.3",
"@kibertoad/biome-config": "^1.2.1",
"@types/node": "^22.0.0",
"@vitest/coverage-v8": "^2.0.4",
"del-cli": "^5.1.0",
"prisma": "^5.19.1",
"typescript": "^5.5.3",
"uuidv7": "^1.0.2",
"vitest": "^2.0.4",
"zod": "^3.23.8"
},
"homepage": "https://github.com/kibertoad/message-queue-toolkit",
"repository": {
"type": "git",
"url": "git://github.com/kibertoad/message-queue-toolkit.git"
},
"keywords": [
"message",
"queue",
"queues",
"abstract",
"common",
"utils",
"notification",
"outbox",
"pattern"
],
"files": [
"README.md",
"LICENSE",
"dist/*"
]
}
212 changes: 212 additions & 0 deletions packages/outbox-prisma-adapter/test/outbox-prisma-adapter.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,212 @@
import { InMemoryOutboxAccumulator, type OutboxEntry } from '@message-queue-toolkit/outbox-core'
import {
type CommonEventDefinition,
enrichMessageSchemaWithBase,
} from '@message-queue-toolkit/schemas'
import { PrismaClient } from '@prisma/client'
import { uuidv7 } from 'uuidv7'
import { afterAll, beforeAll, describe, expect, it } from 'vitest'
import { z } from 'zod'
import { OutboxPrismaAdapter } from '../lib/outbox-prisma-adapter'

const events = {
created: {
...enrichMessageSchemaWithBase(
'entity.created',
z.object({
message: z.string(),
}),
),
},
} satisfies Record<string, CommonEventDefinition>

type SupportedEvents = (typeof events)[keyof typeof events][]

describe('outbox-prisma-adapter', () => {
let prisma: PrismaClient
let outboxPrismaAdapter: OutboxPrismaAdapter<SupportedEvents>

beforeAll(async () => {
prisma = new PrismaClient()

outboxPrismaAdapter = new OutboxPrismaAdapter<SupportedEvents>(prisma, 'OutboxEntry')

await prisma.$queryRaw`create schema if not exists prisma;`
await prisma.$queryRaw`
CREATE TABLE prisma.outbox_entry (
id UUID PRIMARY KEY,
type TEXT NOT NULL,
created TIMESTAMP NOT NULL,
updated TIMESTAMP,
retry_count INT NOT NULL DEFAULT 0,
data JSONB NOT NULL,
status TEXT NOT NULL
)
`
})

afterAll(async () => {
await prisma.$queryRaw`DROP TABLE prisma.outbox_entry;`
await prisma.$queryRaw`DROP SCHEMA prisma;`
await prisma.$disconnect()
})

it('creates entry in DB via outbox storage implementation', async () => {
await outboxPrismaAdapter.createEntry({
id: uuidv7(),
event: events.created,
status: 'CREATED',
data: {
id: uuidv7(),
payload: {
message: 'TEST EVENT',
},
metadata: {},
timestamp: new Date().toISOString(),
},
retryCount: 0,
created: new Date(),
} satisfies OutboxEntry<SupportedEvents[number]>)

const entries = await outboxPrismaAdapter.getEntries(10)

expect(entries).toEqual([
{
id: expect.any(String),
type: 'entity.created',
created: expect.any(Date),
updated: expect.any(Date),
retryCount: 0,
data: {
id: expect.any(String),
payload: {
message: 'TEST EVENT',
},
metadata: {},
timestamp: expect.any(String),
},
status: 'CREATED',
},
])
})

it('should insert successful entries from accumulator', async () => {
const accumulator = new InMemoryOutboxAccumulator<SupportedEvents>()

const entry1 = {
id: uuidv7(),
event: events.created,
status: 'CREATED',
data: {
id: uuidv7(),
payload: {
message: 'TEST EVENT',
},
metadata: {},
timestamp: new Date().toISOString(),
},
retryCount: 0,
created: new Date(),
} satisfies OutboxEntry<SupportedEvents[number]>
accumulator.add(entry1)

const entry2 = {
id: uuidv7(),
event: events.created,
status: 'CREATED',
data: {
id: uuidv7(),
payload: {
message: 'TEST EVENT 2',
},
metadata: {},
timestamp: new Date().toISOString(),
},
retryCount: 0,
created: new Date(),
} satisfies OutboxEntry<SupportedEvents[number]>
accumulator.add(entry2)

await outboxPrismaAdapter.flush(accumulator)

const entriesAfterFlush = await outboxPrismaAdapter.getEntries(10)

expect(entriesAfterFlush).toMatchObject([
{
id: entry1.id,
status: 'SUCCESS',
},
{
id: entry2.id,
status: 'SUCCESS',
},
])
})

it("should update successful entries' status to 'SUCCESS'", async () => {
const accumulator = new InMemoryOutboxAccumulator<SupportedEvents>()

const entry1 = {
id: uuidv7(),
event: events.created,
status: 'CREATED',
data: {
id: uuidv7(),
payload: {
message: 'TEST EVENT',
},
metadata: {},
timestamp: new Date().toISOString(),
},
retryCount: 0,
created: new Date(),
} satisfies OutboxEntry<SupportedEvents[number]>
accumulator.add(entry1)

const entry2 = {
id: uuidv7(),
event: events.created,
status: 'CREATED',
data: {
id: uuidv7(),
payload: {
message: 'TEST EVENT 2',
},
metadata: {},
timestamp: new Date().toISOString(),
},
retryCount: 0,
created: new Date(),
} satisfies OutboxEntry<SupportedEvents[number]>
accumulator.add(entry2)

await outboxPrismaAdapter.createEntry(entry1)
await outboxPrismaAdapter.createEntry(entry2)

const beforeFlush = await outboxPrismaAdapter.getEntries(10)
expect(beforeFlush).toMatchObject([
{
id: entry1.id,
status: 'CREATED',
},
{
id: entry2.id,
status: 'CREATED',
},
])

outboxPrismaAdapter.flush(accumulator)

const afterFlush = await outboxPrismaAdapter.getEntries(10)
expect(afterFlush).toMatchObject([
{
id: entry1.id,
status: 'SUCCESS',
},
{
id: entry2.id,
status: 'SUCCESS',
},
])
})
})
Loading
Loading