import requests
import json
import hashlib
import hmac
import time
from urllib.parse import quote

# OpenSearch query examples.
# NOTE(review): the imports above are not used by these example dicts;
# presumably they serve request signing/sending elsewhere — confirm.
OPENSEARCH_HOST = "http://opensearch-cn-hangzhou.aliyuncs.com"
APP_NAME = "product_search"

# Basic keyword search: first 10 hits, sorted by price descending.
# The sort is written as a `&&sort=` clause of the query string, the same way
# the filtered example below does it, instead of as a separate request key.
query_params = {
    "query": "query=title:'wireless headphones'&&sort=-price",  # descending by price
    "index_name": APP_NAME,
    "format": "json",
    "start": 0,
    "hit": 10,
}

# With filters: price range plus newest-first ordering.
query_with_filter = {
    "query": (
        "query=title:'wireless headphones'"
        "&&filter=price>50 AND price<200"
        "&&sort=-created_at"
    ),
    "index_name": APP_NAME,
    "format": "json",
}

# With aggregation: count matching documents per category.
query_with_agg = {
    "query": (
        "query=title:'wireless headphones'"
        "&&aggregate=group_key:category,"
        "agg_fun:count()"
    ),
    "index_name": APP_NAME,
    "format": "json",
}
import json
from dashscope import TextEmbedding


def embed_and_push(
    products: list[dict],
    opensearch_client,
    *,
    app_name: str = "product_search",
    table_name: str = "main",
    batch_size: int = 25,
) -> None:
    """Generate embeddings for products and push them to OpenSearch.

    Each product's title and description are joined with a space and embedded
    with DashScope ``text-embedding-v3`` (``dimension=1024``), at most
    ``batch_size`` texts per call. Every product is then pushed as an ``ADD``
    command, with the vector stored JSON-encoded in ``vector_embedding``.

    Args:
        products: Dicts with ``id``, ``title``, ``description``, ``category``
            and ``price`` keys.
        opensearch_client: Object exposing ``push(app_name=, table_name=, docs=)``.
        app_name: OpenSearch application to push to.
        table_name: Table within the application.
        batch_size: Texts per embedding call (the service limit is noted as 25).

    Raises:
        ValueError: If ``batch_size`` is less than 1.
        RuntimeError: If an embedding call fails or the number of returned
            vectors does not match the number of products.
    """
    if batch_size < 1:
        raise ValueError(f"batch_size must be >= 1, got {batch_size}")
    if not products:
        # Nothing to embed; avoid sending an empty push request.
        return

    # Combine title + description. The space keeps the last word of the title
    # from being glued onto the first word of the description.
    texts = [f"{p['title']} {p['description']}" for p in products]

    # Batch embed (max 25 per call by default).
    all_embeddings = []
    for start in range(0, len(texts), batch_size):
        batch = texts[start:start + batch_size]
        response = TextEmbedding.call(
            model="text-embedding-v3",
            input=batch,
            dimension=1024,
        )
        if response.status_code != 200:
            raise RuntimeError(
                f"Embedding failed for products {start}..{start + len(batch) - 1}: "
                f"{response.message}"
            )
        items = response.output["embeddings"]
        # NOTE(review): DashScope is expected to tag each item with `text_index`
        # (its position in `batch`) — confirm. Sorting on it keeps vectors
        # aligned with their texts; if the key is absent, the stable sort
        # preserves the returned order.
        items = sorted(items, key=lambda item: item.get("text_index", 0))
        all_embeddings.extend(item["embedding"] for item in items)

    # zip() below would silently drop products if fewer vectors came back.
    if len(all_embeddings) != len(products):
        raise RuntimeError(
            f"Got {len(all_embeddings)} embeddings for {len(products)} products"
        )

    # Prepare documents for OpenSearch.
    docs = [
        {
            "cmd": "ADD",
            "fields": {
                "id": product["id"],
                "title": product["title"],
                "description": product["description"],
                "category": product["category"],
                "price": product["price"],
                "vector_embedding": json.dumps(embedding),
            },
        }
        for product, embedding in zip(products, all_embeddings)
    ]

    # Push to OpenSearch (batch API).
    opensearch_client.push(app_name=app_name, table_name=table_name, docs=docs)
    print(f"Pushed {len(docs)} documents with embeddings")


# Example usage
products = [
    {
        "id": 1,
        "title": "Sony WH-1000XM5 Wireless Headphones",
        "description": "Industry-leading noise cancelling with Auto NC Optimizer",
        "category": "electronics",
        "price": 349.99,
    },
    {
        "id": 2,
        "title": "Apple AirPods Pro 2nd Generation",
        "description": "Active noise cancellation with adaptive transparency",
        "category": "electronics",
        "price": 249.99,
    },
]

# embed_and_push(products, opensearch_client)
def vector_search(query_text: str, top_k: int = 10) -> list[dict]:
    """Perform vector similarity search in OpenSearch.

    Embeds ``query_text`` with ``get_embedding`` and sends a ``vector_query``
    against the ``vector_embedding`` field of the ``product_search`` index.
    ``get_embedding`` and ``opensearch_client`` must be defined at module level.

    Args:
        query_text: Natural-language query to embed.
        top_k: Number of nearest neighbours to return (must be >= 1).

    Returns:
        One dict per hit with ``id``, ``title``, ``score`` and ``price``.

    Raises:
        ValueError: If ``top_k`` is less than 1.
    """
    if top_k < 1:
        raise ValueError(f"top_k must be >= 1, got {top_k}")

    # Step 1: Embed the query.
    query_embedding = get_embedding(query_text)

    # Step 2: Build the vector query. ef_search (the search-time candidate-list
    # size in HNSW-style indexes) should be at least top_k, or fewer than top_k
    # neighbours can come back; 200 stays the floor for recall.
    vector_query = {
        "vector_query": {
            "vector_embedding": {
                "vector": query_embedding,
                "top_k": top_k,
                "ef_search": max(200, top_k),
            }
        },
        "index_name": "product_search",
        "format": "json",
    }

    # Step 3: Execute and parse results.
    response = opensearch_client.search(vector_query)
    results = json.loads(response)
    return [
        {
            "id": hit["fields"]["id"],
            "title": hit["fields"]["title"],
            "score": hit["score"],
            "price": hit["fields"]["price"],
        }
        for hit in results["result"]["items"]
    ]


# Semantic match: "wireless earbuds" can surface products described as
# "bluetooth headphones" without shared keywords. The "under 300 dollars" part
# is NOT enforced: this vector query has no price filter.
results = vector_search("wireless earbuds under 300 dollars")
for r in results:
    print(f" {r['title']} (score: {r['score']:.4f}, ${r['price']})")
User Query
|
v
[LLM Query Understanding] --> intent detection, query expansion, entity extraction
|
v
[Hybrid Search] --> keyword + vector retrieval
|
v
[LLM Re-ranking] --> re-score results based on semantic relevance
|
v
[LLM Answer Generation] --> generate a natural language answer from top results
|
v
Final Response (ranked results + generated answer)
# AI Search with query understanding enabled
def ai_search(query_text: str) -> dict:
    """Use OpenSearch AI Search with LLM-powered query understanding.

    Enables query understanding (intent detection, expansion, entity
    extraction), ``ops-rerank-v1`` re-ranking of the top 20 candidates, and
    ``qwen-plus`` answer generation, and asks for 10 hits. Relies on a
    module-level ``opensearch_client`` exposing ``ai_search``.

    Returns:
        A dict with ``answer``, ``expanded_query``, ``detected_intent`` (each
        ``""`` when absent from the response) and ``results``: one entry per
        hit with ``title``, ``score`` and ``rerank_score`` (``None`` if absent).
    """
    assistant_prompt = (
        "You are a helpful product search assistant. "
        "Answer the user's question based on the search results provided. "
        "If no results match, say so clearly."
    )
    understanding = dict(
        enabled=True,
        intent_detection=True,
        query_expansion=True,
        entity_extraction=True,
    )
    rerank = dict(enabled=True, model="ops-rerank-v1", top_k=20)
    answering = dict(
        enabled=True,
        model="qwen-plus",
        max_tokens=500,
        system_prompt=assistant_prompt,
    )

    request_body = {
        "query": query_text,
        "index_name": "product_search",
        "ai_search": {
            "query_understanding": understanding,
            "reranking": rerank,
            "answer_generation": answering,
        },
        "format": "json",
        "hit": 10,
    }

    payload = json.loads(opensearch_client.ai_search(request_body))

    summary = {
        "answer": payload.get("ai_answer", ""),
        "expanded_query": payload.get("expanded_query", ""),
        "detected_intent": payload.get("intent", ""),
    }
    ranked = []
    for item in payload["result"]["items"]:
        ranked.append(
            {
                "title": item["fields"]["title"],
                "score": item["score"],
                "rerank_score": item.get("rerank_score"),
            }
        )
    summary["results"] = ranked
    return summary
# Example response from AI Search: a generated answer, the expanded query,
# the detected intent, and the re-ranked hits.
response = dict(
    ai_answer=(
        "Based on the available products, I recommend the Sony WH-1000XM5 "
        "($349.99) for the best noise cancellation, or the AirPods Pro 2 "
        "($249.99) for a more portable option. Both feature active noise "
        "cancellation and are highly rated for long flights."
    ),
    expanded_query="comfortable headphones long flights noise cancelling",
    intent="product_recommendation",
    result=dict(
        items=[
            dict(
                fields=dict(title="Sony WH-1000XM5", price=349.99),
                score=0.95,
                rerank_score=0.98,
            ),
            dict(
                fields=dict(title="AirPods Pro 2nd Gen", price=249.99),
                score=0.87,
                rerank_score=0.91,
            ),
        ]
    ),
)
import json
import os

from openai import OpenAI

# Using DashScope-compatible API. The key is read from DASHSCOPE_API_KEY when
# set so that a real key never has to be committed; the placeholder remains
# the fallback.
client = OpenAI(
    api_key=os.environ.get("DASHSCOPE_API_KEY", "your-dashscope-api-key"),
    base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
)


def rewrite_query(user_query: str) -> dict:
    """Use LLM to rewrite and expand the search query.

    Calls ``qwen-plus`` in JSON mode (``temperature=0.1``) and returns the
    parsed JSON. The prompt asks for ``rewritten_query``, ``keywords`` and
    ``filters``, but the model output is not validated here.

    Raises:
        json.JSONDecodeError: If the model's content is not valid JSON.
    """
    response = client.chat.completions.create(
        model="qwen-plus",
        messages=[
            {
                "role": "system",
                "content": (
                    "You are a search query optimizer. Given a user query, output JSON with:\n"
                    "- rewritten_query: improved search query\n"
                    "- keywords: list of important keywords to match\n"
                    "- filters: extracted filters (category, price_min, price_max, brand)\n"
                    "Output ONLY valid JSON, no explanation."
                ),
            },
            {"role": "user", "content": user_query},
        ],
        response_format={"type": "json_object"},
        temperature=0.1,
    )
    return json.loads(response.choices[0].message.content)


# Example
result = rewrite_query("cheap sony headphones for running")
# {
#   "rewritten_query": "Sony sports headphones water resistant",
#   "keywords": ["Sony", "headphones", "sports", "running", "workout"],
#   "filters": {
#     "brand": "Sony",
#     "category": "sports_audio",
#     "price_max": 150
#   }
# }
import hashlib
import json
import random
import time


def search_with_ab_test(query: str, user_id: str) -> dict:
    """Execute search with A/B test tracking.

    Variant "A" (control) runs ``keyword_search(query)``; variant "B"
    (treatment) runs ``hybrid_search(query, alpha=0.4)``. One JSON log line per
    search is written through ``search_logger``. ``keyword_search``,
    ``hybrid_search`` and ``search_logger`` must be defined at module level.

    Returns:
        ``{"results": <search results>, "variant": "A" | "B"}``.
    """
    # Deterministic variant assignment based on user_id. The built-in hash()
    # of a str is salted per process (PYTHONHASHSEED), so it would move the
    # same user between buckets across restarts and workers; a content digest
    # is stable everywhere.
    digest = hashlib.sha256(user_id.encode("utf-8")).digest()
    variant = "A" if digest[0] % 2 == 0 else "B"

    if variant == "A":
        # Control: keyword only
        results = keyword_search(query)
    else:
        # Treatment: hybrid search.
        # NOTE(review): assumes a hybrid_search that accepts `alpha` and returns
        # items with a top-level "id" — confirm against the implementation in use.
        results = hybrid_search(query, alpha=0.4)

    # Log for analysis
    log_entry = {
        "timestamp": time.time(),
        "user_id": user_id,
        "query": query,
        "variant": variant,
        "result_ids": [r["id"] for r in results],
        "result_count": len(results),
    }
    search_logger.info(json.dumps(log_entry))

    return {"results": results, "variant": variant}
# Function Compute trigger: runs when OpenSearch receives a new document.
# This is deployed as an Alibaba Cloud Function Compute function.
import json
import logging

from dashscope import TextEmbedding

logger = logging.getLogger()


def handler(event, context):
    """
    Triggered by OpenSearch document update.
    Generates embedding and updates the document.

    ``event`` is decoded with ``json.loads`` and must contain ``id``;
    ``title`` and ``description`` are optional. ``context`` is unused.
    Relies on a module-level ``opensearch_client`` exposing ``push``.

    Returns:
        ``{"status": "ok", "doc_id": ...}`` after the vector is pushed,
        ``{"status": "skipped", "doc_id": ...}`` when there is no text to embed,
        ``{"status": "error"}`` when the embedding call fails.
    """
    payload = json.loads(event)
    doc_id = payload["id"]
    # `or ""` also covers fields that are present but null.
    title = payload.get("title") or ""
    description = payload.get("description") or ""

    # Embed "title description": the space keeps the two fields from fusing
    # into a single word at the boundary.
    text = f"{title} {description}".strip()
    if not text:
        # NOTE(review): if the trigger also fires for the vector-only UPDATE
        # pushed below, that event carries no title/description; skipping it
        # avoids embedding an empty string — confirm the trigger configuration.
        logger.info("No text to embed for doc %s; skipping", doc_id)
        return {"status": "skipped", "doc_id": doc_id}

    response = TextEmbedding.call(
        model="text-embedding-v3",
        input=text,
        dimension=1024,
    )
    if response.status_code != 200:
        logger.error("Embedding failed for doc %s: %s", doc_id, response.message)
        return {"status": "error"}

    embedding = response.output["embeddings"][0]["embedding"]

    # Update document in OpenSearch with the new embedding (only id + vector).
    update_doc = {
        "cmd": "UPDATE",
        "fields": {
            "id": doc_id,
            "vector_embedding": json.dumps(embedding),
        },
    }
    opensearch_client.push(app_name="product_search", table_name="main", docs=[update_doc])

    logger.info("Updated embedding for doc %s", doc_id)
    return {"status": "ok", "doc_id": doc_id}
Client (browser/app)
|
v
Flask API ---------> DashScope (embeddings + LLM)
|
v
OpenSearch (hybrid keyword + vector search)
^
|
DTS (real-time sync from RDS)
^
|
RDS MySQL (source of truth for product data)
"""
app.py
Product search API with hybrid retrieval and LLM query understanding.
"""importjsonimporttimeimportloggingfromflaskimportFlask,request,jsonifyfromdashscopeimportTextEmbeddingfromopenaiimportOpenAIapp=Flask(__name__)logging.basicConfig(level=logging.INFO)logger=logging.getLogger(__name__)# DashScope client for LLM featuresllm_client=OpenAI(api_key="your-dashscope-api-key",base_url="https://dashscope.aliyuncs.com/compatible-mode/v1")defget_embedding(text:str)->list[float]:"""Generate embedding for search query."""response=TextEmbedding.call(model="text-embedding-v3",input=text,dimension=1024)returnresponse.output["embeddings"][0]["embedding"]defunderstand_query(user_query:str)->dict:"""LLM-powered query understanding."""response=llm_client.chat.completions.create(model="qwen-plus",messages=[{"role":"system","content":("You are a product search query analyzer. ""Given a user query, output JSON with:\n""- search_query: optimized search terms\n""- filters: {category, brand, price_min, price_max} (omit if not specified)\n""- intent: 'product_search' | 'comparison' | 'recommendation'\n""Output ONLY valid JSON.")},{"role":"user","content":user_query}],response_format={"type":"json_object"},temperature=0.1)returnjson.loads(response.choices[0].message.content)defbuild_filter_string(filters:dict)->str:"""Convert extracted filters to OpenSearch filter syntax."""parts=[]if"category"infilters:parts.append(f"category=\"{filters['category']}\"")if"brand"infilters:parts.append(f"brand=\"{filters['brand']}\"")if"price_min"infilters:parts.append(f"price>={filters['price_min']}")if"price_max"infilters:parts.append(f"price<={filters['price_max']}")return" AND ".join(parts)ifpartselse""defhybrid_search(search_query:str,filters:str="",top_k:int=10)->list[dict]:"""Execute hybrid keyword + vector search."""query_embedding=get_embedding(search_query)# Build querykeyword_part=f"query=title:'{search_query}'^3 OR 
description:'{search_query}'"iffilters:keyword_part+=f"&&filter={filters}"keyword_part+="&&sort=-rating"search_params={"query":keyword_part,"vector_query":{"vector_embedding":{"vector":query_embedding,"top_k":top_k*2}},"rank_model":{"type":"rrf","parameters":{"k":60}},"index_name":"product_search","format":"json","hit":top_k}response=opensearch_client.search(search_params)returnjson.loads(response)["result"]["items"]defgenerate_answer(query:str,results:list[dict])->str:"""Generate natural language answer from search results."""ifnotresults:return"No products found matching your query."context="\n".join([f"- {r['fields']['title']}: {r['fields']['description']} "f"(${r['fields']['price']}, rating: {r['fields']['rating']})"forrinresults[:5]])response=llm_client.chat.completions.create(model="qwen-plus",messages=[{"role":"system","content":("You are a product search assistant. ""Based on the search results, give a brief, helpful answer. ""Mention specific products with prices. ""Keep it under 100 words.")},{"role":"user","content":f"Query: {query}\n\nSearch results:\n{context}"}],temperature=0.3,max_tokens=200)returnresponse.choices[0].message.content@app.route("/api/search",methods=["GET"])defsearch():"""Main search endpoint."""query=request.args.get("q","").strip()ifnotquery:returnjsonify({"error":"Missing query parameter 'q'"}),400page=int(request.args.get("page",1))page_size=min(int(request.args.get("size",10)),50)use_ai=request.args.get("ai","false").lower()=="true"start_time=time.time()try:# Step 1: Query understanding (if AI enabled)filters_str=""search_query=queryintent="product_search"ifuse_ai:understood=understand_query(query)search_query=understood.get("search_query",query)intent=understood.get("intent","product_search")filters_str=build_filter_string(understood.get("filters",{}))# Step 2: Hybrid searchraw_results=hybrid_search(search_query=search_query,filters=filters_str,top_k=page_size)# Step 3: Format 
resultsproducts=[{"id":hit["fields"]["id"],"title":hit["fields"]["title"],"description":hit["fields"]["description"],"category":hit["fields"]["category"],"brand":hit["fields"]["brand"],"price":hit["fields"]["price"],"rating":hit["fields"]["rating"],"stock_status":hit["fields"]["stock_status"],"image_url":hit["fields"]["image_url"],"score":hit["score"]}forhitinraw_results]# Step 4: Generate answer (if AI enabled)answer=Noneifuse_ai:answer=generate_answer(query,raw_results)elapsed=time.time()-start_timereturnjsonify({"query":query,"search_query":search_query,"intent":intent,"total":len(products),"products":products,"ai_answer":answer,"elapsed_ms":round(elapsed*1000,2)})exceptExceptionase:logger.error(f"Search error: {e}",exc_info=True)returnjsonify({"error":"Search failed","detail":str(e)}),500@app.route("/api/health",methods=["GET"])defhealth():"""Health check endpoint."""returnjsonify({"status":"ok"})if__name__=="__main__":app.run(host="0.0.0.0",port=5000,debug=False)
# Basic keyword search
curl "http://localhost:5000/api/search?q=wireless+headphones"

# Search with AI understanding
curl "http://localhost:5000/api/search?q=cheap+sony+headphones+for+running&ai=true"

# Example response with AI enabled (abbreviated: only the first of the
# 5 products is shown)
# {
#   "query": "cheap sony headphones for running",
#   "search_query": "Sony sports headphones water resistant",
#   "intent": "product_search",
#   "total": 5,
#   "products": [
#     {
#       "id": 42,
#       "title": "Sony WF-SP800N Wireless Sports Earbuds",
#       "description": "IP55 water resistant, noise cancelling...",
#       "category": "electronics",
#       "brand": "Sony",
#       "price": 128.00,
#       "rating": 4.3,
#       "score": 0.89
#     }
#   ],
#   "ai_answer": "For Sony running headphones on a budget, I recommend
#     the Sony WF-SP800N ($128) — they are IP55 water resistant with
#     active noise cancellation. If you want an even cheaper option,
#     the Sony WI-SP510 ($58) offers 15-hour battery life with an
#     IPX5 rating for sweat resistance.",
#   "elapsed_ms": 342.7
# }

# Limit the number of results per page (the API caps size at 50)
curl "http://localhost:5000/api/search?q=laptop+stand&size=5"

# Health check
curl "http://localhost:5000/api/health"