-
Notifications
You must be signed in to change notification settings - Fork 2
/
compassion.js
500 lines (453 loc) · 19.9 KB
/
compassion.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
const url = require('url')
const stream = require('stream')
const Reactor = require('reactor')
const { Timer, Calendar } = require('happenstance')
const Kibitzer = require('kibitz')
const Keyify = require('keyify')
const ua = require('./ua')
const discover = require('./discover')
const embark = require('./embark')
// Node.js API.
const assert = require('assert')
// Exceptions that report nested exceptions that you can catch by type.
const Interrupt = require('interrupt')
// An `async`/`await` in-process message queue.
const { Queue } = require('avenue')
// Like SQL COALESCE, return the first defined value.
const { coalesce } = require('extant')
// An `async`/`await` map of future values.
const Cubbyhole = require('cubbyhole')
const Staccato = require('staccato')
const { Player, Recorder } = require('transcript')
// Shared record writer used by `Chaperon._snapshot` to frame each serialized
// snapshot object before it is written to the response stream. The function
// given to `Recorder.create` always returns '0' — presumably a placeholder
// checksum; confirm against the `transcript` documentation.
const recorder = Recorder.create(() => '0')
const Verbatim = require('verbatim')
// Whenever you see this, you want to refactor it so that initialization takes
// place in the constructor, with the constructor accepting the first entry,
// snapshot and backlog of broadcasts. Don't do it. Think of this as a state
// machine and the initial state is waiting for an entry. Construction is not
// about initializing the application, it's about hooking up all the message
// plumbing which is already a challenge. You don't want to introduce what is
// essentially a factory object or function.
// It's in for a penny, in for a pound. That is why there is an initialize
// function which you're going to hate because you're going to imagine the
// GitHub Issues filling up with Redditors clucking like wet hens about
// immutability, but this code is righteous and pure.
//
class Compassion {
    // Adapts a single application (the `consumer`) to the Paxos log of its
    // `kibitzer`. Every log entry is handed, in log order, to the matching
    // application method (`bootstrap`, `join`, `arrive`, `depart`,
    // `acclimated` or `entry`), and messages given to `enqueue` are published
    // through the kibitzer once this participant has arrived.

    static Error = Interrupt.create('Compassion.Error', {
        SNAPSHOT_STREAM_ERROR: 'error occurred while processing snapshot stream'
    })

    // Construct a `Compassion`.
    //
    // * `destructible` — owner of every background strand started here.
    // * `id` — identifier of this participant.
    // * `kibitzer` — source of the Paxos log and sink for published messages.
    // * `ua` — user agent used to fetch the snapshot stream from the leader.
    // * `consumer` — the application; `consumer.initialize(this)` is called
    //   before the constructor returns.
    //
    // `entry` is accepted for interface compatibility but is not used.
    //
    constructor (destructible, { id, entry, kibitzer, ua, consumer }) {
        // A bouquet of `Promise`s monitored by this `Compassion` instance.
        this.destructible = destructible
        this.id = id
        this._ua = ua
        // The Paxos event log.
        this.log = new Queue
        // Network events received by this `Compassion` instance.
        this.events = new Queue
        // Network events consumed by this `Compassion` instance.
        this.consumed = new Queue
        // Whether or not this `Compassion` instance has destructed.
        this.destroyed = false
        // Current Paxos government.
        this._government = null
        // Prevent calling application snapshot until after arrival.
        this.snapshots = new Cubbyhole
        // Mark ourselves as destroyed on destruction.
        this.destructible.destruct(() => this.destroyed = true)
        // Outbound messages.
        this._messages = new Queue().shifter().paired
        // **TODO** Let the messages swim to exit. Why? We're leaving. The log
        // is supposed to just truncate anyway.

        // Construct our application passing ourselves as the first argument.
        this.application = consumer
        this.application.initialize(this)
        this._kibitzer = kibitzer

        // Feed every Paxos log entry through `_entry`, one at a time.
        const log = kibitzer.paxos.log.shifter().async
        destructible.destruct(() => log.destroy())
        this.destructible.ephemeral('consume', async () => {
            for await (const entry of log) {
                await this._entry(entry)
            }
        })
    }

    // Describe this participant under the current government: its `id` and
    // the promise under which it arrived. Requires `_government` to be set,
    // so it throws if called before the first government entry.
    //
    _self () {
        return { id: this.id, arrived: this._government.arrived.promise[this.id] }
    }

    // Queue the user message `body` for publication through the kibitzer.
    // Messages are only drained once this participant has arrived.
    //
    enqueue (body) {
        this._messages.queue.push({
            module: 'compassion',
            method: 'entry',
            from: this._self(),
            body: body
        })
    }

    // Fetch the application snapshot from the current leader and hand it to
    // `application.join` as a shifter of deserialized objects terminated by
    // `null`. Resolves once the application has joined and both snapshot
    // strands have finished.
    //
    async _join (entry) {
        const subDestructible = this.destructible.ephemeral('snapshot')
        const leader = this._government.properties[this._government.majority[0]].url
        const promise = this._government.promise
        const incoming = await this._ua.stream(leader, './snapshot', { promise })
        const staccato = new Staccato(incoming)
        const snapshot = new Queue().shifter().paired
        // TODO destructible.error() ?
        // Collect stream errors and report them once the stream has closed.
        subDestructible.ephemeral('stream', async () => {
            const errors = []
            incoming.on('error', errors.push.bind(errors))
            await new Promise(resolve => incoming.once('close', resolve))
            Compassion.Error.assert(errors.length == 0, 'SNAPSHOT_STREAM_ERROR', errors)
        })
        // TODO CRC32 or FNV.
        // Split the byte stream into records and queue each decoded object.
        subDestructible.ephemeral('snapshot', async () => {
            const player = new Player(function () { return '0' })
            for await (const buffer of staccato.readable) {
                for (const message of player.split(buffer)) {
                    snapshot.queue.push(Verbatim.deserialize(message.parts))
                }
            }
            snapshot.queue.push(null)
            subDestructible.destroy()
        })
        await this.application.join({
            method: 'join',
            self: this._self(),
            shifter: snapshot.shifter,
            entry: entry.body,
            government: this._government
        })
        await subDestructible.done
    }

    // Process one Paxos log entry. The entry is pushed onto `events` and
    // `log` before the application sees it and onto `consumed` afterwards.
    // `queue` and `broadcast` are unused; they are kept so the signature is
    // unchanged.
    //
    async _entry (entry, queue, broadcast) {
        Compassion.Error.assert(entry != null, 'entry eos')
        this.events.push({ type: 'entry', id: this.id, entry })
        this.log.push(entry)
        if (entry.method == 'government') {
            this._government = entry.government
            // TODO What about a government that is neither an arrival or
            // departure? What if in the future we rebalance and change the
            // leader somehow. Governments already do change after arrival as
            // members enter the government.
            if (entry.body.arrive) {
                const arrival = entry.body.arrive
                if (entry.body.promise == '1/0') {
                    // The very first government: we founded the island.
                    await this.application.bootstrap({
                        self: this._self(),
                        government: this._government
                    })
                } else if (arrival.id == this.id) {
                    // We are the one arriving: load state from the leader.
                    await this._join(entry)
                }
                if (arrival.id == this.id) {
                    // Now that we have arrived, start publishing our messages.
                    this.destructible.durable('enqueue', async () => {
                        for await (const message of this._messages.shifter) {
                            this._kibitzer.publish(message)
                        }
                    })
                    this.destructible.destruct(() => this._messages.shifter.destroy())
                }
                await this.application.arrive({
                    self: this._self(),
                    arrival: entry.body,
                    government: this._government
                })
                // Snapshots for this promise may now be served.
                this.snapshots.resolve(entry.promise, true)
            } else if (entry.body.departed) {
                this.snapshots.remove(entry.body.departed.promise)
                await this.application.depart({
                    self: this._self(),
                    method: 'depart',
                    body: entry.body,
                    government: this._government
                })
            } else {
                // TODO Here is where we would do an optional government change.
            }
            if (entry.body.acclimate != null) {
                await this.application.acclimated({
                    promise: entry.body.promise,
                    self: this._self(),
                    method: 'acclimated',
                    body: entry.body,
                    government: this._government
                })
            }
            this._kibitzer.paxos.acclimate()
        } else {
            // Bombs on a flush!
            // Paxos body, Islander body, Compassion body, user body.
            assert(entry.body.body.body)
            // Reminder that if you ever want to do queued instead async then the
            // queue should be external and a property of the object the
            // compassion operates.
            //
            const envelope = entry.body.body
            assert.equal(envelope.method, 'entry', 'expected entry method')
            await this.application.entry({
                promise: entry.promise,
                self: this._self(),
                method: 'entry',
                from: envelope.from,
                government: this._government,
                entry: envelope.body
            })
        }
        this.consumed.push(entry)
    }
}
// Hosts the applications of this process. For each application it creates a
// `Kibitzer` and a `Compassion`, it runs the scheduler that decides whether to
// bootstrap, join, embark or retry, and it implements the HTTP handlers for
// the `/compassion/:application/...` routes.
//
class Chaperon {
    constructor () {
        this.destructible = null
        this._applications = {}
        this._republic = Date.now()
        this._population = []
    }

    // Wire everything up once the HTTP server is listening.
    //
    // * `destructible` — owner of every strand started here.
    // * `{ address, port }` — the bound server address; used to build ids.
    // * `applications` — map of application name to application object. Note
    //   that this object is adopted as the registry and each value is
    //   replaced with `{ application, compassion, kibitzer, destructible }`.
    // * `census` — async iterable of population lists (arrays of locations).
    //
    _bound (destructible, { address, port }, { applications, census }) {
        this.destructible = destructible
        this._createdAt = Date.now()
        this._applications = applications
        this._calender = new Calendar

        // Scheduler plumbing shared by all applications. It is created once,
        // outside the per-application loop, so that several applications do
        // not end up with duplicate timers, listeners and chaperon loops.
        this._events = new Queue
        this._calender.on('data', data => this._events.push(data.body))
        const timer = new Timer(this._calender)
        destructible.destruct(() => timer.destroy())
        destructible.destruct(() => this._calender.clear())
        destructible.destruct(() => this._events.push(null))
        destructible.durable('chaperon', this._chaperon(this._events.shifter()))

        // TODO For now, we can crash restart on unrecoverable.
        for (const application in applications) {
            const consumer = applications[application]
            const id = `${application}/${address}:${port}`
            const scheduleKey = Keyify.stringify({ application, id })
            const subDestructible = destructible.durable(`application.${application}`)
            subDestructible.destruct(() => this._calender.unschedule(scheduleKey))
            const kibitzer = new Kibitzer(destructible.durable('kibitz'), {
                id: id,
                // TODO Make configurable.
                ping: 1000,
                timeout: 3000,
                ua: {
                    send: async envelope => {
                        return ua.json(envelope.to.url, './kibitz', envelope)
                    }
                }
            })
            const compassion = new Compassion(subDestructible.durable('compassion'), {
                id: id,
                ua: ua,
                kibitzer: kibitzer,
                consumer: consumer
            })
            // Terminate every queue of the kibitzer when the application goes.
            subDestructible.destruct(() => {
                kibitzer.paxos.log.push(null)
                kibitzer.paxos.pinged.push(null)
                kibitzer.paxos.outbox.push(null)
                kibitzer.played.push(null)
                kibitzer.islander.outbox.push(null)
            })
            this._applications[application] = {
                application: consumer,
                compassion: compassion,
                kibitzer: kibitzer,
                // Kept so the chaperon can shut the application down when
                // consensus is unrecoverable.
                destructible: subDestructible
            }
            const properties = consumer.properties || {}
            this._calender.schedule(Date.now(), scheduleKey, {
                // TODO Configure location for proxies and such.
                name: 'discover', application, id, properties
            })
        }

        // Track the population; every change re-runs all scheduled events.
        destructible.durable('census', async () => {
            for await (const population of census) {
                this._population = population
                this._events.push({ name: 'census' })
            }
        })
    }

    // Scheduler loop. Consumes calendar events (`discover`, `embark`) and
    // `census` notifications from `events`, polls the population for the
    // state of each islander and acts on the decision made by `discover` or
    // `embark`.
    //
    async _chaperon (events) {
        for await (const event of events) {
            if (event.name == 'census') {
                // Population changed: run everything on the calendar now.
                for (const { key, body } of this._calender.calendar()) {
                    this._calender.schedule(Date.now(), key, body)
                }
                continue
            }
            const { application, id } = event
            const islanders = []
            for (const location of this._population) {
                const islander = await ua.json(location, `./compassion/${application}/paxos`)
                if (islander != null) {
                    islanders.push({
                        id: islander.id,
                        government: islander.government,
                        cookie: islander.cookie,
                        url: url.resolve(location, `./compassion/${application}/`),
                        // NOTE(review): this stamps every islander with this
                        // process's own creation time rather than a value
                        // reported by the remote — confirm that is intended.
                        createdAt: this._createdAt
                    })
                }
            }
            // Complete when every member of a non-empty population answered.
            const complete = islanders.length == this._population.length && islanders.length != 0
            const scheduleKey = Keyify.stringify({ application, id })
            let action = null
            switch (event.name) {
            case 'discover':
                action = discover(id, islanders, complete)
                break
            case 'embark':
                action = embark(islanders, event.republic, complete)
                break
            case 'recoverable':
                // NOTE(review): no `recoverable` function is defined or
                // required in the visible part of this file and nothing
                // visible schedules this event; reaching this branch would
                // throw a `ReferenceError`.
                action = recoverable(id, islanders)
                break
            }
            switch (action.action) {
            case 'bootstrap': {
                    // **TODO** Remove defensive copy.
                    const properties = { ...event.properties, url: action.url }
                    this._applications[application].kibitzer.bootstrap(Date.now(), properties)
                }
                break
            case 'join': {
                    this._applications[application].kibitzer.join(action.republic)
                    const properties = { ...event.properties, url: action.url }
                    this._calender.schedule(Date.now(), scheduleKey, {
                        name: 'embark',
                        application: event.application,
                        id: event.id,
                        url: action.url,
                        republic: action.republic,
                        properties: properties
                    })
                }
                break
            case 'embark': {
                    // Schedule a subsequent embarkation. Once our Paxos object
                    // starts receiving message, the embarkation is cleared and
                    // replaced with a recoverable check.
                    this._calender.schedule(Date.now() + 5000 /* this._ping.chaperon */, scheduleKey, event)
                    // If this fails, we don't care. Embarkation is designed to be
                    // asynchronous in the macro. You send a message. Maybe it gets
                    // there, maybe it doesn't. You'll know when the Paxos object
                    // starts working. Until then, keep sending embarkation
                    // requests. The leader knows how to deal with duplicate or
                    // in-process requests.
                    await ua.json(action.url, './embark', {
                        republic: event.republic,
                        id: this._applications[application].kibitzer.paxos.id,
                        cookie: this._applications[application].kibitzer.paxos.cookie,
                        properties: event.properties
                    })
                }
                break
            case 'retry': {
                    // Event bodies carry no `key`, so reschedule under the key
                    // derived from the application and id; this is the same
                    // key that is unscheduled when the application is
                    // destroyed.
                    this._calender.schedule(Date.now() + 5000 /* this._ping.chaperon */, scheduleKey, event)
                }
                break
            case 'unrecoverable': {
                    // TODO Okay, we restart these things if we cannot recover
                    // consensus.
                    // `application` is the application's name, so look up the
                    // destructible stored for it by `_bound`.
                    this._applications[application].destructible.destroy()
                }
                break
            }
        }
    }

    // Handler for `GET /`. Intentionally empty.
    async index () {
    }

    // Return the registry entry for `application` or throw the status code
    // `404` when there is no such application.
    //
    _getApplication404 (application) {
        if (! this._applications[application]) {
            throw 404
        }
        return this._applications[application]
    }

    // Report the Paxos state of an application: its id, current government,
    // cookie (`null` when there is none) and the creation time of this
    // process. Returns `null` when the application is unknown.
    //
    async paxos ({ params }) {
        const application = this._applications[params.application]
        if (application != null) {
            return {
                id: application.kibitzer.paxos.id,
                government: application.kibitzer.paxos.government,
                cookie: coalesce(application.kibitzer.paxos.cookie),
                // The registry entry has no `createdAt`; the creation time is
                // recorded on the chaperon itself by `_bound`.
                createdAt: this._createdAt
            }
        }
        return null
    }

    // Forward an embarkation request to the application's kibitzer.
    async embark ({ params: { application }, body: { id, republic, cookie, properties } }) {
        return this._getApplication404(application).kibitzer.embark(republic, id, cookie, properties)
    }

    // Forward a kibitz request to the application's kibitzer.
    async kibitz ({ params: { application }, body }) {
        return this._getApplication404(application).kibitzer.request(body)
    }

    // Stream the application's snapshot for `promise` as the response body.
    // Waits until the arrival for `promise` has been processed, then pipes
    // every object the application pushes onto the snapshot queue through
    // `recorder` into a `PassThrough` that is sent as an octet stream.
    //
    async _snapshot ({ params, body: { promise } }, reply) {
        const got = this._getApplication404(params.application)
        const application = got.application
        await got.compassion.snapshots.get(promise)
        const snapshot = new Queue().shifter().paired
        const subDestructible = this.destructible.ephemeral(`snapshot.send.${params.application}.${promise.replace('/', '.')}`)
        const through = new stream.PassThrough({ emitClose: true })
        // Collect stream errors and report them once the stream has closed.
        subDestructible.durable('stream', async () => {
            const errors = []
            through.on('error', errors.push.bind(errors))
            await new Promise(resolve => through.once('close', resolve))
            Compassion.Error.assert(errors.length == 0, 'SNAPSHOT_STREAM_ERROR', errors)
        })
        // Serialize queued objects onto the stream until the queue ends.
        subDestructible.durable('send', async () => {
            const staccato = new Staccato(through)
            for await (const object of snapshot.shifter) {
                await staccato.writable.write([ recorder([ Verbatim.serialize(object) ]) ])
            }
            await staccato.writable.end()
            through.destroy()
            subDestructible.destroy()
        })
        subDestructible.durable('snapshot', async () => {
            // Await the application so that a rejected snapshot is reported
            // through the destructible instead of as an unhandled rejection.
            await application.snapshot({ promise, queue: snapshot.queue })
            subDestructible.destroy()
        })
        reply.code(200)
        reply.header('content-type', 'application/octet-stream')
        reply.send(through)
        await subDestructible.done
        return null
    }
}
exports.listen = async function (destructible, options) {
const compassion = new Chaperon(destructible, options)
const reactor = new Reactor([{
path: '/',
method: 'get',
f: compassion.index.bind(compassion)
}, {
path: '/compassion/:application/paxos',
method: 'get',
f: compassion.paxos.bind(compassion)
}, {
path: '/compassion/:application/embark',
method: 'post',
f: compassion.embark.bind(compassion)
}, {
path: '/compassion/:application/kibitz',
method: 'post',
f: compassion.kibitz.bind(compassion)
}, {
path: '/compassion/:application/snapshot',
method: 'post',
raw: true,
f: compassion._snapshot.bind(compassion)
}])
await reactor.fastify.listen(options.bind)
reactor.on('reply', ({ code, stack, url, path }) => {
if (stack != null) {
console.log(code, url, path, stack)
} else if (Math.floor(code / 100) != 2) {
console.log(code, url, path)
}
})
destructible.destruct(() => destructible.ephemeral('close', () => reactor.fastify.close()))
const address = reactor.fastify.server.address()
compassion._bound(destructible, address, options)
return address
}