-
Notifications
You must be signed in to change notification settings - Fork 1
/
arize.py
155 lines (124 loc) · 5.8 KB
/
arize.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
from opentelemetry import trace as trace_api
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk import trace as trace_sdk
from opentelemetry.sdk.trace.export import (
SimpleSpanProcessor,
)
import os
from langchain_openai import ChatOpenAI
from langchain.prompts import (
ChatPromptTemplate,
HumanMessagePromptTemplate,
SystemMessagePromptTemplate,
)
from langchain.output_parsers import (
ResponseSchema,
StructuredOutputParser,
BooleanOutputParser,
)
import json
from datetime import datetime
from pathlib import Path
from openinference.instrumentation.langchain import LangChainInstrumentor
from phoenix.otel import register
def save_call_to_json(
    call_text: str, response_data: dict, filename: str = "emergency_calls.json"
) -> None:
    """Append a classified emergency call to a JSON log file.

    Args:
        call_text: Raw transcript of the call.
        response_data: Parsed classification; must contain the keys
            "priority", "department", "summary", and "confidence".
        filename: Path of the JSON log file (stores a list of entries).

    Returns:
        None. On a malformed ``response_data`` the call is logged to
        stdout and skipped rather than raising.
    """
    # Build the log entry with a timestamp. Catch only the error this
    # block can actually raise from caller data (a missing key) instead
    # of a bare except, and return early so we never fall through to
    # reference an undefined call_entry below.
    try:
        call_entry = {
            "timestamp": datetime.now().isoformat(),
            "call_text": call_text,
            "priority": response_data["priority"],
            "department": response_data["department"],
            "summary": response_data["summary"],
            "confidence": response_data["confidence"],
        }
    except KeyError as err:
        print(f"Cannot log call: classification is missing field {err}")
        return
    # Load the existing log, tolerating a missing or corrupt file.
    file_path = Path(filename)
    if file_path.exists():
        try:
            with open(file_path, "r") as f:
                calls = json.load(f)
        except json.JSONDecodeError:
            calls = []
    else:
        calls = []
    # Append the new entry and rewrite the whole log.
    calls.append(call_entry)
    with open(file_path, "w") as f:
        json.dump(calls, f, indent=2)
# --- Tracing / observability setup -----------------------------------------
# Earlier manual OpenTelemetry wiring (OTLP exporter + SimpleSpanProcessor),
# superseded by the phoenix.otel.register() call below; kept for reference.
# COLLECTOR_HOST = os.getenv("COLLECTOR_HOST", "localhost")
# endpoint = f"http://{COLLECTOR_HOST}:6006/v1"
# tracer_provider = trace_sdk.TracerProvider()
# tracer_provider.add_span_processor(
#     SimpleSpanProcessor(OTLPSpanExporter(f"{endpoint}/traces"))
# )
# trace_api.set_tracer_provider(tracer_provider)
# tracer = trace_api.get_tracer(__name__)
# Register an Arize Phoenix tracer that exports spans to a local Phoenix
# instance. NOTE(review): the endpoint is hard-coded to localhost — consider
# reusing the COLLECTOR_HOST env var from the commented-out block above.
tracer_provider = register(
    project_name="911evaluator",  # Default is 'default'
    endpoint="http://localhost:6006/v1/traces",
)
# Auto-instrument all LangChain calls so prompts/LLM invocations are traced.
LangChainInstrumentor().instrument(tracer_provider=tracer_provider)
# tracer_provider = register(
#     project_name="my-llm-app",  # Default is 'default'
#     endpoint="http://localhost:6006/v1/traces",
# )
# Schema for the structured model output: the parser below requires all four
# fields to be present in every model response.
response_schemas = [
    ResponseSchema(name="priority", description="The priority of the call"),
    ResponseSchema(name="summary", description="A summary of the call"),
    ResponseSchema(
        name="department", description="The department the call dispatched to"
    ),
    ResponseSchema(name="confidence", description="The confidence in the priority"),
]
# Parser that converts the model's raw text into a dict matching the schema.
output_parser = StructuredOutputParser.from_response_schemas(response_schemas)
# Formatting guidance to inject into the system prompt's
# {format_instructions} placeholder so the model emits parseable JSON.
format_instructions = output_parser.get_format_instructions()
# Human turn: the raw call transcript, filled in per request via {call_text}.
user_prompt = HumanMessagePromptTemplate.from_template("{call_text}")
# System turn: triage instructions defining the RED/ORANGE/GREEN priority
# levels and the EMS/FIRDEPT/POLICEDEPT departments, plus the JSON shape the
# model must respond with. {format_instructions} is filled from the
# structured output parser at invocation time.
system_prompt = SystemMessagePromptTemplate.from_template("""
You are an operator for a emergency call line, you simply need to determine, rapidly, if the text you see qualifies as 1 of three possible options.
you will provide will be in the form of a JSON, such as (priority: 'RED' ) (summary: 'car suspicious hoodie' ) ( confidence: '80' )
1: RED - the words you are hearing qualify as needing rapid emergency services, and the operator will need to read text that says "This is an emergency CODE RED" - for example, if the person is bleeding, saying words like "please help" "coming after me" "knife" "gun" - things of that nature, that raise an immediate alarm.
2: ORANGE - the words you are hearing qualify as in the middle, where a sense of urgency is involved, but might not qualify as needing an ambulance deployed to the location immediately. This might happen if the person is saying "umm" or "wait" or if you are unsure if it's an immediate emergency, or if it could be something non-serious.
3: GREEN - the nature of the call doesn't sound immediately dangerous, as in, the person calling is reporting a missing cat, or issuing a noise complaint about their neighbor, which doesn't require any type of immediate response. Example might be, the person is calling about a suspicious person, or car in their neighborhood.
In addition, you will provide the department this call will be dispatched to.
1: EMS - the words you are hearing relate to people or living animals' physical health, such as bleeding out , injury, and coma.
2: FIRDEPT - the words you are hearing relate to the fire hazard in public space and require fire department to operate.
3: POLICEDEPT - the words you are hearing relate to the violence, community safety, and everything concerning policing.
An example output you will provide will be in the form of a JSON, such as
(priority: 'GREEN' ) (summary: 'cat tree lost' ) (department: 'POLICEDEPT') ( confidence: '60' )
Respond in JSON format with these fields:
{{
"priority": "PRIORITY_LEVEL",
"summary": "summary of event",
"confidence": "confidence of event",
"department": "DEPARTMENT"
}}
{format_instructions}""")
# Full two-message chat prompt used by process_call().
prompt_911 = ChatPromptTemplate.from_messages([system_prompt, user_prompt])
# Initialize the language model
# NOTE(review): the variable is named llm_streaming but streaming=False —
# confirm whether streaming was meant to be enabled.
llm_streaming = ChatOpenAI(
    model_name="gpt-4o",
    api_key=os.environ["OPENAI_API_KEY"],  # raises KeyError if the env var is unset
    temperature=1.0,  # NOTE(review): high temperature for a triage classifier — confirm intended
    streaming=False,
)
def process_call(call_text: str) -> dict:
    """Classify an emergency-call transcript and persist the result.

    Formats the 911 triage prompt with *call_text*, invokes the model,
    parses the structured response, and appends it to the JSON call log.

    Args:
        call_text: Raw transcript of the caller's words.

    Returns:
        dict with the keys "priority", "summary", "department", and
        "confidence" as produced by ``output_parser``.
    """
    # Ask the model for the structured triage assessment.
    # (Leftover "whatev" debug prints removed.)
    response = llm_streaming.invoke(
        prompt_911.format_messages(
            call_text=call_text, format_instructions=format_instructions
        )
    )
    # Parse the model output against the response schema; raises if the
    # model reply is not in the requested JSON shape.
    parsed_response = output_parser.parse(response.content)
    # Persist the classified call for audit/history.
    save_call_to_json(call_text, parsed_response)
    return parsed_response