-
Notifications
You must be signed in to change notification settings - Fork 129
/
uROSimpleEventRepository.pas
137 lines (117 loc) · 4.14 KB
/
uROSimpleEventRepository.pas
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
unit uROSimpleEventRepository;
interface
uses
uROEventRepository, uROClient, uROTypes, uROClientIntf,
uROHTTPWebsocketServer, uROSessions, Classes, SyncObjs;
type
// Event repository that implements IROEventRepository by writing event data
// directly to the connections of an Indy HTTP/websocket server instead of
// keeping it per session. Session registration is a no-op in this class and
// GetEventData is not supported (it asserts).
TROSimpleWebsocketEventRepository = class(TInterfacedObject,
IROEventRepository)
private
// Message instance handed to the event writers created by GetEventWriter.
FMessage: TROMessage;
// Server whose IndyServer.Contexts list is walked by StoreEventData.
FROServer: TROIndyHTTPWebsocketServer;
// Running counter incremented (interlocked) by StoreEventData; its negated
// value is appended to every event that is sent.
FEventCount: Integer;
protected
{IROEventRepository}
// AddSession/RemoveSession: empty implementations, nothing is tracked.
procedure AddSession(aSessionID : TGUID); overload;
procedure AddSession(aSessionID : TGUID; aEventSinkId: AnsiString); overload;
procedure RemoveSession(aSessionID : TGUID); overload;
procedure RemoveSession(aSessionID : TGUID; aEventSinkId: AnsiString); overload;
// Appends an event-number trailer to Data and writes Data to every websocket
// connection of ROServer. The session/exclude arguments are not referenced
// by the implementation in this unit.
procedure StoreEventData(SourceSessionID : TGUID; Data : Binary;
const ExcludeSender: Boolean;
const ExcludeSessionList: Boolean;
const SessionList: String); overload;
// Same as above; EventSinkId is not used, the call is forwarded to the
// overload without it.
procedure StoreEventData(SourceSessionID : TGUID; Data : Binary;
const ExcludeSender: Boolean;
const ExcludeSessionList: Boolean;
const SessionList: String;
const EventSinkId: AnsiString); overload;
// Not supported: the implementation asserts and returns -1.
function GetEventData(SessionID : TGUID; var TargetStream : Binary) : integer;
public
// Creates an event writer (bound to Message and this repository) for the
// class that FindEventWriterClass returns for IID.
function GetEventWriter(const IID: TGUID): IROEventWriter;
// Message passed to newly created event writers.
property Message : TROMessage read FMessage write FMessage;
// Server used by StoreEventData; must be assigned before events are stored,
// since StoreEventData dereferences it unconditionally.
property ROServer: TROIndyHTTPWebsocketServer read FROServer write FROServer;
end;
implementation
uses
IdContext, IdIOHandlerWebsocket, Windows;
{ TROSimpleWebsocketEventRepository }
{ IROEventRepository.AddSession (session only).
  Deliberately empty: this repository holds no per-session state. }
procedure TROSimpleWebsocketEventRepository.AddSession(aSessionID: TGUID);
begin
  { intentionally left blank - sessions are not tracked }
end;
{ IROEventRepository.AddSession (session + event sink).
  Deliberately empty: neither the session nor the event sink id is recorded. }
procedure TROSimpleWebsocketEventRepository.AddSession(
  aSessionID: TGUID; aEventSinkId: AnsiString);
begin
  { intentionally left blank - sessions are not tracked }
end;
{ IROEventRepository.RemoveSession (session + event sink).
  Deliberately empty: nothing was registered, so nothing has to be removed. }
procedure TROSimpleWebsocketEventRepository.RemoveSession(
  aSessionID: TGUID; aEventSinkId: AnsiString);
begin
  { intentionally left blank - sessions are not tracked }
end;
{ IROEventRepository.RemoveSession (session only).
  Deliberately empty: nothing was registered, so nothing has to be removed. }
procedure TROSimpleWebsocketEventRepository.RemoveSession(aSessionID: TGUID);
begin
  { intentionally left blank - sessions are not tracked }
end;
{ Creates an event writer for the event interface identified by IID.

  IID    - GUID passed to FindEventWriterClass to look up the writer class.
  Result - a new writer constructed with this repository's Message and the
           repository itself, or nil when FindEventWriterClass returns no
           class for IID. }
function TROSimpleWebsocketEventRepository.GetEventWriter(
  const IID: TGUID): IROEventWriter;
var
  lEventWriterClass: TROEventWriterClass;
begin
  // An interface-typed Result is not guaranteed to start out as nil, so it is
  // cleared explicitly; otherwise the early exit below could hand a stale
  // reference back to the caller.
  Result := nil;

  lEventWriterClass := FindEventWriterClass(IID);
  if not Assigned(lEventWriterClass) then
    Exit;

  Result := lEventWriterClass.Create(fMessage, Self) as IROEventWriter;
end;
{ IROEventRepository.GetEventData is not supported by this repository.
  The call always trips an assertion; when assertions are compiled out the
  function returns -1 and leaves TargetStream untouched. }
function TROSimpleWebsocketEventRepository.GetEventData(
  SessionID: TGUID; var TargetStream: Binary): integer;
begin
  Assert(False);
  Result := -1;
end;
{ StoreEventData overload that also receives an event sink id.
  EventSinkId is not used here: the call is forwarded unchanged to the
  overload without that parameter. }
procedure TROSimpleWebsocketEventRepository.StoreEventData(
  SourceSessionID: TGUID;
  Data: Binary;
  const ExcludeSender, ExcludeSessionList: Boolean;
  const SessionList: String;
  const EventSinkId: AnsiString);
begin
  StoreEventData(SourceSessionID,
                 Data,
                 ExcludeSender,
                 ExcludeSessionList,
                 SessionList);
end;
{ Sends an event to every websocket connection of ROServer.

  Data - event payload. It is modified: the C_ROWSNR marker followed by the
         (negative) event number is appended at the end and the position is
         reset to 0 before sending.
  SourceSessionID, ExcludeSender, ExcludeSessionList, SessionList - not
         referenced by this implementation; the event goes to all websocket
         connections.

  Nothing is appended or sent when the server has no connections. The context
  list stays locked for the whole broadcast.

  NOTE(review): an exception raised while handling one connection (the "as"
  cast or the write) leaves the loop, so later connections are not served. }
procedure TROSimpleWebsocketEventRepository.StoreEventData(SourceSessionID: TGUID;
  Data: Binary; const ExcludeSender, ExcludeSessionList: Boolean;
  const SessionList: String);
var
  i, iEventCount, iEventNr: Integer;
  LContext: TIdContext;
  l: TList;
  ws: TIdIOHandlerWebsocket;
begin
  l := ROServer.IndyServer.Contexts.LockList;
  try
    if l.Count <= 0 then Exit;

    // Check the raw counter for wrap-around BEFORE negating it: negating
    // Low(Integer) overflows, and a counter value of 0 would yield event
    // number 0. In both cases restart the sequence at 1.
    iEventCount := InterlockedIncrement(FEventCount);
    if iEventCount <= 0 then
    begin
      InterlockedExchange(FEventCount, 0);
      iEventCount := InterlockedIncrement(FEventCount);
    end;
    iEventNr := -iEventCount; //negative = event, positive is normal RO message
    Assert(iEventNr < 0);

    // append the trailer (marker + event number) and rewind for sending
    Data.Position := Data.Size;
    Data.Write(C_ROWSNR, Length(C_ROWSNR));
    Data.Write(iEventNr, SizeOf(iEventNr));
    Data.Position := 0;

    //direct write to ALL connections
    for i := 0 to l.Count - 1 do
    begin
      LContext := TIdContext(l.Items[i]);
      ws := (LContext.Connection.IOHandler as TIdIOHandlerWebsocket);
      // skip connections that are not in websocket mode
      if not ws.IsWebsocket then Continue;

      // hold the handler's lock for the duration of the write
      ws.Lock;
      try
        ws.Write(Data, wdtBinary);
      finally
        ws.Unlock;
      end;
    end;
  finally
    ROServer.IndyServer.Contexts.UnlockList;
  end;
end;
end.