Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(diagnostics): support compressed .mcstats streams, add tests for processing stream. #273

Merged
merged 8 commits into from
Dec 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/extension.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,10 @@ export function activate(context: vscode.ExtensionContext) {
async () => {
const fileUri = await vscode.window.showOpenDialog({
canSelectMany: false,
openLabel: 'Select diagnostics capture to replay',
openLabel: 'Open',
filters: {
'MC Stats files': ['mcstats'],
'All files': ['*'],
'MC Stats Files': ['mcstats'],
'All Files': ['*'],
},
});
if (!fileUri || fileUri.length === 0) {
Expand Down
1 change: 0 additions & 1 deletion src/panels/home-view-provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,6 @@ export class HomeViewProvider implements vscode.WebviewViewProvider {

private _refreshProfilerCaptures(capturesBasePath: string, newCaptureFileName?: string) {
if (!capturesBasePath) {
console.error('Captures path is invalid.');
return;
}
fs.readdir(capturesBasePath, (err, files) => {
Expand Down
3 changes: 3 additions & 0 deletions src/panels/minecraft-diagnostics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,9 @@ export class MinecraftDiagnosticsPanel {
};
this._panel.webview.postMessage(message);
},
onNotification: (message: string) => {
window.showInformationMessage(message);
},
};

this._statsTracker.addStatListener(this._statsCallback);
Expand Down
72 changes: 72 additions & 0 deletions src/stats/replay-stats-provider.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
import { describe, it, expect } from 'vitest';
import { ReplayStatsProvider } from './replay-stats-provider';
import { StatData, StatsListener } from './stats-provider';
import path from 'path';

describe('ReplayStatsProvider', () => {
it('should load base64-gzip encoded replay data and trigger events', async () => {
const replayFilePath = path.resolve('./test/diagnostics-replay-compressed.mcstats');
const replay = new ReplayStatsProvider(replayFilePath);
let statCount = 0;
let statsCallback: StatsListener = {
onStatUpdated: (stat: StatData) => {
statCount++;
expect(stat).toBeDefined();
},
};
replay.addStatListener(statsCallback);
let results = await replay.start();
expect(results.statLinesRead).toBe(3);
expect(results.statEventsSent).toBe(3);
expect(statCount).toBeGreaterThan(0); // no idea how many are in there
});

it('should load uncompressed replay and trigger events', async () => {
const replayFilePath = path.resolve('./test/diagnostics-replay-uncompressed.mcstats');
const replay = new ReplayStatsProvider(replayFilePath);
let statCount = 0;
let statsCallback: StatsListener = {
onStatUpdated: (stat: StatData) => {
statCount++;
expect(stat).toBeDefined();
},
};
replay.addStatListener(statsCallback);
let results = await replay.start();
expect(results.statLinesRead).toBe(3);
expect(results.statEventsSent).toBe(3);
expect(statCount).toBeGreaterThan(0);
});

it('should load no-header uncompressed replay and trigger events', async () => {
const replayFilePath = path.resolve('./test/diagnostics-replay-uncompressed-no-header.mcstats');
const replay = new ReplayStatsProvider(replayFilePath);
let statCount = 0;
let statsCallback: StatsListener = {
onStatUpdated: (stat: StatData) => {
statCount++;
expect(stat).toBeDefined();
},
};
replay.addStatListener(statsCallback);
let results = await replay.start();
expect(results.statLinesRead).toBe(3);
expect(results.statEventsSent).toBe(3);
expect(statCount).toBeGreaterThan(0);
});

it('should fire notification on invalid file read', async () => {
const replayFilePath = './not-a-real-file.mcstats';
const replay = new ReplayStatsProvider(replayFilePath);
let notification = '';
let statsCallback: StatsListener = {
onNotification: (message: string) => {
notification = message;
},
};
replay.addStatListener(statsCallback);
let results = await replay.start();
expect(results.statLinesRead).toBe(0);
expect(notification).toBe('Failed to read replay file.');
});
});
129 changes: 110 additions & 19 deletions src/stats/replay-stats-provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,38 @@
import * as fs from 'fs';
import * as readline from 'readline';
import * as path from 'path';
import * as zlib from 'zlib';
import { StatMessageModel, StatsProvider, StatsListener } from './stats-provider';

interface ReplayStatMessageHeader {
encoding?: string;
}

export class ReplayResults {
statLinesRead: number = 0;
statEventsSent: number = 0;
}

export class ReplayStatsProvider extends StatsProvider {
private _replayFilePath: string;
private _replayStreamReader: readline.Interface | null;
private _simTickFreqency: number;
private _simTickPeriod: number;
private _simTickCurrent: number;
private _simTimeoutId: NodeJS.Timeout | null;
private _replayHeader: ReplayStatMessageHeader | undefined;
private _base64Gzipped: boolean;
private _pendingStats: StatMessageModel[];
private _replayResults: ReplayResults;
private _onComplete: ((results: ReplayResults) => void) | undefined;

// resume stream when lines drop below this threshold
private static readonly PENDING_STATS_BUFFER_MIN = 256;
// pause stream when lines exceed this threshold
private static readonly PENDING_STATS_BUFFER_MAX = ReplayStatsProvider.PENDING_STATS_BUFFER_MIN * 2;
// supported encodings
private readonly ENCODING_BASE64_GZIP = 'base64-gzip';
private readonly ENCODING_UTF8 = 'utf8';

// ticks per second (frequency)
private readonly MILLIS_PER_SECOND = 1000;
Expand All @@ -33,38 +50,53 @@ export class ReplayStatsProvider extends StatsProvider {
this._simTickPeriod = this._calcSimPeriod(this._simTickFreqency);
this._simTickCurrent = 0;
this._simTimeoutId = null;
this._base64Gzipped = false;
this._pendingStats = [];
this._replayResults = new ReplayResults();
this._onComplete = undefined;
}

public override start() {
public override start(): Promise<ReplayResults> {
this.stop();

const fileStream = fs.createReadStream(this._replayFilePath);
this._replayStreamReader = readline.createInterface({
input: fileStream,
crlfDelay: Infinity,
});

this._replayStreamReader.on('line', line => this._onReadNextStatMessage(line));
this._replayStreamReader.on('close', () => this._onCloseStream());
this._replayStreamReader.on('line', line => this._onReadNextLineFromReplayStream(line));
this._replayStreamReader.on('close', () => this._onCloseReplayStream());
this._replayStreamReader.on('error', () => this._errorCloseReplayStream('Failed to read replay file.'));

// begin simulation
this._simTimeoutId = setTimeout(() => this._updateSim(), this._simTickPeriod);
this._fireSpeedChanged();
this._firePauseChanged();

return new Promise<ReplayResults>(resolve => {
this._onComplete = resolve;
});
}

public override stop() {
this._fireStopped();
if (this._simTimeoutId) {
clearTimeout(this._simTimeoutId);
}
this._replayStreamReader?.close();
if (this._onComplete) {
this._onComplete(this._replayResults);
this._onComplete = undefined;
}
if (this._replayStreamReader) {
this._replayStreamReader.close();
this._replayStreamReader = null;
}
this._simTickFreqency = this.DEFAULT_SPEED;
this._simTickPeriod = this._calcSimPeriod(this._simTickFreqency);
this._simTickCurrent = 0;
this._simTimeoutId = null;
this._base64Gzipped = false;
this._pendingStats = [];
this._firePauseChanged();
}

public override pause() {
Expand Down Expand Up @@ -127,6 +159,7 @@ export class ReplayStatsProvider extends StatsProvider {
} else if (nextStatsMessage.tick === this._simTickCurrent) {
// process and remove the message, then increment sim tick
this.setStats(nextStatsMessage);
this._replayResults.statEventsSent++;
this._pendingStats.shift();
this._simTickCurrent++;
}
Expand All @@ -138,37 +171,95 @@ export class ReplayStatsProvider extends StatsProvider {
// schedule next update as long as we have pending data to process or there's still a stream to read
if (this._replayStreamReader || this._pendingStats.length > 0) {
this._simTimeoutId = setTimeout(() => this._updateSim(), this._simTickPeriod);
} else {
// no more data to process
this.stop();
}
}

private _onReadNextStatMessage(line: string) {
const statsMessageJson = JSON.parse(line);
// seed sim tick with first message
if (this._simTickCurrent === 0) {
this._simTickCurrent = statsMessageJson.tick;
private _onReadNextLineFromReplayStream(rawLine: string) {
if (this._replayHeader === undefined) {
try {
const headerJson = JSON.parse(rawLine);
if (headerJson.tick) {
this._replayHeader = {}; // no header, fall through to process this line as stat data
} else {
// first line was header, set encoding and return
this._replayHeader = headerJson as ReplayStatMessageHeader;
const encoding = this._replayHeader.encoding ?? this.ENCODING_UTF8;
this._base64Gzipped = encoding === this.ENCODING_BASE64_GZIP;
return;
}
} catch (error) {
this._errorCloseReplayStream('Failed to parse replay header.');
return;
}
}

let decodedLine = rawLine;
if (this._base64Gzipped) {
try {
const buffer = Buffer.from(rawLine, 'base64');
decodedLine = zlib.gunzipSync(buffer).toString('utf-8');
} catch (error) {
this._errorCloseReplayStream('Failed to decode replay data.');
return;
}
}

try {
const jsonLine = JSON.parse(decodedLine);
const statMessage = jsonLine as StatMessageModel;
// seed sim tick with first message
if (this._simTickCurrent === 0) {
this._simTickCurrent = statMessage.tick;
}
this._replayResults.statLinesRead++;
// add stats messages to queue
this._pendingStats.push(statMessage);
// pause stream reader if we've got enough data for now
if (this._pendingStats.length > ReplayStatsProvider.PENDING_STATS_BUFFER_MAX) {
this._replayStreamReader?.pause();
}
} catch (error) {
this._errorCloseReplayStream('Failed to process replay data.');
}
// add stats messages to queue
this._pendingStats.push(statsMessageJson as StatMessageModel);
// pause stream reader if we've got enough data for now
if (this._pendingStats.length > ReplayStatsProvider.PENDING_STATS_BUFFER_MAX) {
this._replayStreamReader?.pause();
}

private _errorCloseReplayStream(message: string) {
if (this._replayStreamReader) {
this._replayStreamReader.close();
this._replayStreamReader = null;
}
this._fireNotification(message);
}

private _onCloseStream() {
private _onCloseReplayStream() {
this._replayStreamReader = null;
}

private _fireSpeedChanged() {
this._statListeners.forEach((listener: StatsListener) => {
listener.onSpeedUpdated(this._simTickFreqency);
listener.onSpeedUpdated?.(this._simTickFreqency);
});
}

private _firePauseChanged() {
this._statListeners.forEach((listener: StatsListener) => {
// paused if no timeout id
listener.onPauseUpdated(this._simTimeoutId == null);
listener.onPauseUpdated?.(this._simTimeoutId == null);
});
}

private _fireStopped() {
this._statListeners.forEach((listener: StatsListener) => {
listener.onStopped?.();
});
}

private _fireNotification(message: string) {
this._statListeners.forEach((listener: StatsListener) => {
listener.onNotification?.(message);
});
}

Expand Down
10 changes: 6 additions & 4 deletions src/stats/stats-provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,11 @@ export interface StatMessageModel {
}

export interface StatsListener {
onStatUpdated: (stat: StatData) => void;
onSpeedUpdated: (speed: number) => void;
onPauseUpdated: (paused: boolean) => void;
onStatUpdated?: (stat: StatData) => void;
onSpeedUpdated?: (speed: number) => void;
onPauseUpdated?: (paused: boolean) => void;
onStopped?: () => void;
onNotification?: (message: string) => void;
}

export class StatsProvider {
Expand Down Expand Up @@ -89,7 +91,7 @@ export class StatsProvider {
values: stat.values ?? [],
tick: tick,
};
listener.onStatUpdated(statData);
listener.onStatUpdated?.(statData);

if (stat.children) {
stat.children.forEach((child: StatDataModel) => {
Expand Down
4 changes: 4 additions & 0 deletions test/diagnostics-replay-compressed.mcstats
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
{"encoding":"base64-gzip"}
H4sIAAAAAAAACq2RMQvCMBCF/0q5ORRaULSbQ3EVlDqIyJEebTBeS3JVRPzvJg4iiIPS8fE+vntwNxCjj1BM5rNsMs0VyLUnKGAtKOWZWHJQ4EPwUOxuwHiKbYtcWzrobuBQKNCtsbUjfmcaYnIonUuEvHyBShYj11Ce0Q4Ub2T5/r6/qxdRIRtrMVmGuAkinyxs3+J4vhGXVdnfsi06j5cRVjny3eA0rVAfU6oHjWI6TpvQxj+kT+w3+bueImPIf1IP7xkEAUoCAAA=
H4sIAAAAAAAACq2SzWrDMBCEX8XoLEzkn6TxLYfQa6ElPRQTtvJiq5ElI61dQsi7V84hGIyhLbkIlhl9O1rthZGSJ1bk2yeRr1PO6NwhK9grAe0HNJQwznwoPCs+LsxAO6oNmErjUdreBIEz2ShdOTRTT40GHZB1EaGnBdPekKJzEAfQPY49RFJeyyu/Ow5glNYQPYfyLYB8tNNdA4/jPTDZQfwb9g7Ow/cDUjn0tncSX0CeYqx6CaSsieugjv8Q32x/g0/xHt2A7jguTThaZeqlqBoH1DfjlJjlK8EzscnL+6q1SjrrUVpTeTZtJZ3qaEbYrNZ8vc1+ff/Lfs4YQohkxZM0Txc500fjOBiFfj6aH/azqd4/AwAA
H4sIAAAAAAAACq2RQQuCQBCF/4rMWQSlorx1kK5BUYeQGNZBl7YxdsdCxP/e2iGE6FB4fLyPbx5MB6LVBdL5ahnPF7MQpL0RpLATlOxOLAmE4HxwkJ46YLwObYVcGDqrumFfhKAqbQpLPGZKYrIotQ2EnHyBMhYtrS/vaBoabsRJ3ud9+CYOyNoYDDY+7r3IBWtzq3A634TLDvHfsiNah48JVllydWMVbVFdIioahaJrjkrfDn+IXthv8rGeBkaT+6SeqVlmQ0oCAAA=
3 changes: 3 additions & 0 deletions test/diagnostics-replay-uncompressed-no-header.mcstats

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions test/diagnostics-replay-uncompressed.mcstats

Large diffs are not rendered by default.

Loading