-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathfmbench_rag_setup.py
437 lines (387 loc) · 19.6 KB
/
fmbench_rag_setup.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
import os
import json
import boto3
import logging
from pathlib import Path
from dotenv import load_dotenv
from botocore.config import Config
from pydantic import BaseModel, Field
from langchain.schema import Document
from colorama import init, Fore, Style
from typing import List, Dict, Any, Optional, Union
from langchain_aws import ChatBedrockConverse
from langchain_community.vectorstores import FAISS
from langchain.chains import create_retrieval_chain
from langchain_core.prompts import ChatPromptTemplate
from langchain_aws.embeddings.bedrock import BedrockEmbeddings
from langchain.chains.combine_documents import create_stuff_documents_chain
from botocore.session import get_session
from botocore.credentials import RefreshableCredentials
from langchain.text_splitter import RecursiveCharacterTextSplitter
# ----------------------------
# Setup Logging with Colorama
# ----------------------------
init(autoreset=True)
class ColoredFormatter(logging.Formatter):
def format(self, record):
msg = record.msg
if isinstance(msg, list):
formatted_messages = []
for m in msg:
cname = m.__class__.__name__
if cname == 'HumanMessage':
formatted = f"{Fore.GREEN}[Human] {m.content}"
elif cname == 'AIMessage':
formatted = f"{Fore.BLUE}[AI] {m.content}"
elif cname == 'ToolMessage':
formatted = f"{Fore.YELLOW}[Tool] {m.content}"
else:
formatted = str(m)
formatted_messages.append(formatted)
record.msg = "\n".join(formatted_messages)
return super().format(record)
# Create logger
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
# Clear existing handlers to avoid duplicates
if logger.handlers:
logger.handlers.clear()
# Custom formatter with all requested fields separated by commas
formatter = logging.Formatter(
"%(asctime)s.%(msecs)03d,%(levelname)s,p%(process)d,%(filename)s,%(lineno)d,%(message)s",
datefmt="%Y-%m-%d %H:%M:%S"
)
# Add handler with the custom formatter
handler = logging.StreamHandler()
handler.setFormatter(formatter)
logger.addHandler(handler)
# ----------------------------
# Load Environment Variables
# ----------------------------
env_path = Path(__file__).resolve().parent.parent / ".env"
try:
load_dotenv(dotenv_path=env_path)
except Exception as e:
logging.error("Error loading .env file: " + str(e))
class FMBenchRagSetup(BaseModel):
"""
Pydantic model for FMBench RAG Setup that encapsulates the entire configuration and setup process
"""
region: str = Field(default="us-east-1", description="AWS region to use for Amazon Bedrock")
data_file_path: Path = Field(default=Path("data/documents_1.json"), description="Path to the documents data file")
response_model_id: str = Field(default="us.anthropic.claude-3-5-haiku-20241022-v1:0", description="Bedrock model ID to use") #us.amazon.nova-pro-v1:0" us.anthropic.claude-3-5-haiku-20241022-v1:0
embedding_model_id: str = Field(default="amazon.titan-embed-text-v1", description="Amazon Bedrock embedding model to use")
retriever_k: int = Field(default=10, description="Number of documents to retrieve")
vector_db_path: Optional[str] = Field(default=os.path.join("indexes", "fmbench_index"), description="Path to load/save FAISS vector database")
bedrock_role_arn: Optional[str] = Field(default=None, description="ARN of the IAM role to assume for Bedrock cross-account access")
# These will be initialized in the setup method
bedrock_client: Optional[Any] = Field(default=None, exclude=True)
llm: Optional[Any] = Field(default=None, exclude=True)
documents: List[Document] = Field(default_factory=list, exclude=True)
vectorstore: Optional[Any] = Field(default=None, exclude=True)
retriever: Optional[Any] = Field(default=None, exclude=True)
rag_chain: Optional[Any] = Field(default=None, exclude=True)
# Configure logger
logger: logging.Logger = Field(default_factory=lambda: logging.getLogger(__name__), exclude=True)
class Config:
arbitrary_types_allowed = True
def __init__(self, **data):
super().__init__(**data)
self.setup_logger()
# Initialize Bedrock client if not provided
if self.bedrock_client is None:
self.bedrock_client = self._create_bedrock_client()
self.logger.info("Bedrock client initialized")
def _create_bedrock_client(self):
"""Create a Bedrock client, optionally with cross-account role assumption"""
config = Config(
retries = {
'max_attempts': 10,
'mode': 'adaptive'
}
)
# If a role ARN is provided, use cross-account access
if self.bedrock_role_arn:
self.logger.info(f"Initializing Bedrock client with cross-account role: {self.bedrock_role_arn}")
def get_credentials():
sts_client = boto3.client('sts')
assumed_role = sts_client.assume_role(
RoleArn=self.bedrock_role_arn,
RoleSessionName='bedrock-cross-account-session',
# Don't set DurationSeconds when role chaining
)
return {
'access_key': assumed_role['Credentials']['AccessKeyId'],
'secret_key': assumed_role['Credentials']['SecretAccessKey'],
'token': assumed_role['Credentials']['SessionToken'],
'expiry_time': assumed_role['Credentials']['Expiration'].isoformat()
}
session = get_session()
refresh_creds = RefreshableCredentials.create_from_metadata(
metadata=get_credentials(),
refresh_using=get_credentials,
method='sts-assume-role'
)
# Create a new session with refreshable credentials
session._credentials = refresh_creds
boto3_session = boto3.Session(botocore_session=session)
return boto3_session.client("bedrock-runtime", region_name=self.region, config=config)
else:
self.logger.info(f"Initializing Bedrock client for region: {self.region}")
return boto3.client("bedrock-runtime", region_name=self.region, config=config)
def setup_logger(self):
"""Set up the logger with proper formatting"""
# Clear existing handlers to avoid duplicates
if self.logger.handlers:
self.logger.handlers.clear()
# Custom formatter with all requested fields separated by commas
formatter = logging.Formatter(
"%(asctime)s.%(msecs)03d,%(levelname)s,p%(process)d,%(filename)s,%(lineno)d,%(message)s",
datefmt="%Y-%m-%d %H:%M:%S"
)
# Add handler with the custom formatter
handler = logging.StreamHandler()
handler.setFormatter(formatter)
self.logger.addHandler(handler)
self.logger.setLevel(logging.INFO)
return self.logger
def setup(self):
"""Set up the RAG system with all components"""
# Initialize the LLM
self.llm = ChatBedrockConverse(
client=self.bedrock_client,
model=self.response_model_id
)
# Initialize embeddings model
embeddings_model = BedrockEmbeddings(
client=self.bedrock_client,
model_id=self.embedding_model_id
)
# Check if we should load an existing vector store
if self.vector_db_path and os.path.exists(self.vector_db_path):
self.logger.info(f"Loading vector store from {self.vector_db_path}")
self.vectorstore = FAISS.load_local(self.vector_db_path, embeddings_model, allow_dangerous_deserialization=True)
self.logger.info(f"Successfully loaded vector store from {self.vector_db_path}")
else:
self.logger.info(f"vector store path {self.vector_db_path} does not exist")
# Load documents and create vector store from scratch
documents_data = json.loads(self.data_file_path.read_text())
self.logger.info(f"Loaded {len(documents_data)} documents from {self.data_file_path}")
# Create text splitter
# Create specialized text splitter based on content type
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=4000, # Increased chunk size for better context
chunk_overlap=400, # More overlap to maintain context across chunks
separators=[
# Headers (preserve full sections)
"\n# ", "\n## ", "\n### ", "\n#### ",
# Lists and model information
"\n- ", "\n* ", "\n1. ",
# Tables often containing model data
"\n|", "|\n",
# YAML and code blocks
"\n---\n", "\n```", "```\n",
# Paragraphs and other breaks
"\n\n", "\n", " ",
# Fallback
""
],
keep_separator=True,
strip_whitespace=False,
length_function=len,
is_separator_regex=False
)
# Convert to Document objects and split into chunks
docs = []
for doc in documents_data:
content = doc["content"]
# Initialize metadata with file information
metadata = {
"filename": doc["filename"],
"path": doc["path"],
"directory": doc["directory"],
"extension": doc["extension"]
}
# Add any additional metadata if present
if "metadata" in doc:
metadata.update(doc["metadata"])
# Add content type detection
# Detect content type
if content.strip().startswith('---') or '.yaml' in metadata.get('source', '').lower() or '.yml' in metadata.get('source', '').lower():
metadata['content_type'] = 'yaml'
# Preserve indentation for YAML
content = '\n'.join(line for line in content.splitlines())
elif '```' in content:
metadata['content_type'] = 'markdown_with_code'
# Count code blocks
code_blocks = content.count('```')
metadata['code_blocks_count'] = code_blocks // 2 # Divide by 2 since each block has opening and closing
elif any(heading.startswith('#') for heading in content.splitlines()):
metadata['content_type'] = 'markdown_with_headers'
# Extract heading level information
max_heading_level = max(
(len(line.split()[0]) for line in content.splitlines() if line.startswith('#')),
default=0
)
metadata['max_heading_level'] = max_heading_level
else:
metadata['content_type'] = 'markdown'
docs.append(Document(
page_content=content,
metadata=metadata
))
self.documents = text_splitter.split_documents(docs)
# Create vector store
self.logger.info(f"Creating new vector store with {len(self.documents)} documents")
self.vectorstore = FAISS.from_documents(
documents=self.documents,
embedding=embeddings_model
)
# Save vector store if path is specified
if self.vector_db_path:
os.makedirs(os.path.dirname(os.path.abspath(self.vector_db_path)), exist_ok=True)
self.logger.info(f"Saving vector store to {self.vector_db_path}")
self.vectorstore.save_local(self.vector_db_path)
# Create retriever
self.retriever = self.vectorstore.as_retriever(
search_kwargs={'k': self.retriever_k}
)
# Create prompt template
system_prompt = (
"You are a friendly and helpful AI assistant that answers questions about the "
"Foundation Model Benchmarking Tool (FMBench). "
"Use the provided context to answer the question concisely. Do not use your prior knowledge to answer any question."
"If you don't know, say I do not know the answer to this question, please consult FMBench documentation.\n\n"
"When responding, consider the content type:\n\n"
"1. For YAML configuration (content_type: yaml):\n"
" - Preserve proper YAML indentation and structure\n"
" - Use --- as document separators when appropriate\n"
" - Include necessary comments to explain configuration options\n"
" - Follow YAML best practices for complex nested structures\n\n"
" - Always include YAML text between ```yaml and ```\n\n"
"2. For markdown with code (content_type: markdown_with_code):\n"
" - Keep code blocks intact with proper language tags\n"
" - Ensure code formatting and indentation is preserved\n"
" - Provide explanatory text around code examples\n\n"
"3. For markdown with headers (content_type: markdown_with_headers):\n"
" - Maintain the hierarchical structure of the content\n"
" - Use appropriate header levels in responses\n"
" - Preserve the logical flow of documentation\n\n"
"4. For plain markdown (content_type: markdown):\n"
" - Format response with clear paragraph structure\n"
" - Use appropriate markdown formatting\n\n"
"5. Remember to always include citations i.e. links to the original content that you have in the metadata in your final response\n\n"
"Context: {context}\n\n"
"Remember to validate syntax in your responses and maintain proper formatting "
"based on the content type. Use appropriate data types and structures."
)
prompt = ChatPromptTemplate.from_messages([
("system", system_prompt),
("human", "{input}")
])
# Create the chain
qa_chain = create_stuff_documents_chain(self.llm, prompt)
self.rag_chain = create_retrieval_chain(self.retriever, qa_chain)
self.logger.info("RAG setup complete")
return self
def create_index(self):
"""Create a vector index from documents and save it to the specified path"""
if not self.vector_db_path:
raise ValueError("vector_db_path must be set to create and save an index")
# Initialize embeddings model
embeddings_model = BedrockEmbeddings(
client=self.bedrock_client,
model_id=self.embedding_model_id
)
# Load documents
documents_data = json.loads(self.data_file_path.read_text())
self.logger.info(f"Loaded {len(documents_data)} documents from {self.data_file_path}")
# Create text splitter
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=4000, # Increased chunk size for better context
chunk_overlap=400, # More overlap to maintain context across chunks
separators=[
# Headers (preserve full sections)
"\n# ", "\n## ", "\n### ", "\n#### ",
# Lists and model information
"\n- ", "\n* ", "\n1. ",
# Tables often containing model data
"\n|", "|\n",
# YAML and code blocks
"\n---\n", "\n```", "```\n",
# Paragraphs and other breaks
"\n\n", "\n", " ",
# Fallback
""
],
keep_separator=True,
strip_whitespace=False,
length_function=len,
is_separator_regex=False
)
# Convert to Document objects with content type detection
docs = []
for doc in documents_data:
content = doc["content"]
# Initialize metadata with file information
metadata = {
"filename": doc["filename"],
"path": doc["path"],
"directory": doc["directory"],
"extension": doc["extension"]
}
# Add any additional metadata if present
if "metadata" in doc:
metadata.update(doc["metadata"])
# Add content type detection
# Detect content type
if content.strip().startswith('---') or '.yaml' in metadata.get('source', '').lower() or '.yml' in metadata.get('source', '').lower():
metadata['content_type'] = 'yaml'
# Preserve indentation for YAML
content = '\n'.join(line for line in content.splitlines())
elif '```' in content:
metadata['content_type'] = 'markdown_with_code'
# Count code blocks
code_blocks = content.count('```')
metadata['code_blocks_count'] = code_blocks // 2 # Divide by 2 since each block has opening and closing
elif any(heading.startswith('#') for heading in content.splitlines()):
metadata['content_type'] = 'markdown_with_headers'
# Extract heading level information
max_heading_level = max(
(len(line.split()[0]) for line in content.splitlines() if line.startswith('#')),
default=0
)
metadata['max_heading_level'] = max_heading_level
else:
metadata['content_type'] = 'markdown'
docs.append(Document(
page_content=content,
metadata=metadata
))
self.documents = text_splitter.split_documents(docs)
# Create vector store and save it
self.logger.info(f"Creating vector store with {len(self.documents)} documents")
self.vectorstore = FAISS.from_documents(
documents=self.documents,
embedding=embeddings_model
)
# Create directory if it doesn't exist
os.makedirs(os.path.dirname(os.path.abspath(self.vector_db_path)), exist_ok=True)
self.logger.info(f"Saving vector store to {self.vector_db_path}")
self.vectorstore.save_local(self.vector_db_path)
self.logger.info(f"Vector index created and saved to {self.vector_db_path}")
return self
def query(self, question: str) -> Dict[str, Any]:
"""Run a query through the RAG system"""
if not self.rag_chain:
self.logger.warning("RAG chain not initialized, running setup first")
self.setup()
self.logger.info(f"Processing query: {question}")
result = self.rag_chain.invoke({"input": question})
self.logger.info(f"\n\nresult={result}\n\n")
# Build citations from document paths instead of URLs
citations = "Source(s): " + "\n".join([d.metadata['path'] for d in result['context']])
self.logger.info(f"citations={citations}")
answer = f"{result['answer']}\n\n{citations}"
self.logger.info(f"answer={answer}")
return answer