fastlane / services /elasticsearch.py
Hugo Guarin
Update space
c169262
raw
history blame
3.92 kB
import os
from elasticsearch import Elasticsearch
from elasticsearch_dsl import Search
from models.product import Product
from models.customer import Customer
from models.order import Order
# Elastic Cloud credentials come from the environment; os.getenv yields None
# for a variable that is not set.
cloud_id = os.getenv("ELASTICSEARCH_CLOUD_ID")
api_key = os.getenv("ELASTICSEARCH_API_KEY")

# Shared client used by every helper in this module.
try:
    es = Elasticsearch(cloud_id=cloud_id, api_key=api_key)
except ConnectionError as e:
    # Best-effort start-up: report the problem but keep the module importable.
    # NOTE(review): this names the builtin ConnectionError, since nothing else
    # called ConnectionError is imported here -- confirm that is the exception
    # the client constructor is expected to raise.
    print("Error:", e)
    # Bind the name even on failure so `es` always exists at module level;
    # helpers then fail on a None client instead of an unbound global.
    es = None
def search_elasticsearch(index, query):
    """Run a ``multi_match`` query against *index* and return the raw hits.

    The query text is matched against the ``title``, ``description``,
    ``category`` and ``tags`` fields. The return value is the ``hits.hits``
    entry of the search response.
    """
    searched_fields = ["title", "description", "category", "tags"]
    multi_match = {"query": query, "fields": searched_fields}
    request_body = {"query": {"multi_match": multi_match}}

    outer_hits = es.search(index=index, body=request_body)['hits']
    return outer_hits['hits']
def search_products_by_keywords(encoded_query, *, k=4, num_candidates=500):
    """Run a kNN search on the ``products`` index and return the raw hits.

    Despite the function name, the argument is not keyword text: it is sent
    as the kNN ``query_vector`` and compared with the ``DescriptionVector``
    field.

    Args:
        encoded_query: Query vector for the kNN clause. Presumably the
            embedding of the user's search text -- confirm with the caller.
        k: Value of the kNN ``k`` option. Defaults to 4, the value that was
            previously hard-coded.
        num_candidates: Value of the kNN ``num_candidates`` option. Defaults
            to 500, the value that was previously hard-coded.

    Returns:
        The ``hits.hits`` entry of the search response. ``_source`` is
        restricted to ProductName, Description, Gender, Price (INR) and
        PrimaryColor.
    """
    body_query = {
        "_source": ["ProductName", "Description", "Gender", "Price (INR)", "PrimaryColor"],
        "knn": {
            "field": "DescriptionVector",
            "query_vector": encoded_query,
            "k": k,
            "num_candidates": num_candidates,
        },
    }
    response = es.search(index='products', body=body_query)
    return response['hits']['hits']
def search_products_by_filters(filters):
    """Search the ``products`` index using *filters* as a bool ``filter`` clause.

    *filters* is passed through unchanged as the value of ``query.bool.filter``.
    The return value is the ``hits.hits`` entry of the search response.
    """
    bool_clause = {"filter": filters}
    request_body = {"query": {"bool": bool_clause}}

    result = es.search(index='products', body=request_body)
    return result['hits']['hits']
def get_product_details(product_id):
    """Fetch the document stored under *product_id* in the ``products`` index.

    Returns the ``_source`` of the fetched document. Errors from the client's
    ``get`` call are not handled here and propagate to the caller.
    """
    product_doc = es.get(index='products', id=product_id)
    source = product_doc['_source']
    return source
def index_product(product: Product):
    """Write *product* to the ``products`` index, keyed by its ``product_id``.

    The document body is whatever ``product.dict()`` returns. Nothing is
    returned.
    """
    doc_id = product.product_id
    payload = product.dict()
    es.index(index="products", id=doc_id, body=payload)
def index_customer(customer: Customer):
    """Write *customer* to the ``customers`` index, keyed by its ``customer_id``.

    The document body is whatever ``customer.dict()`` returns. Nothing is
    returned.
    """
    doc_id = customer.customer_id
    payload = customer.dict()
    es.index(index="customers", id=doc_id, body=payload)
def get_customer(customer_id: str):
    """Fetch the document stored under *customer_id* in the ``customers`` index.

    Returns the ``_source`` of the fetched document. Errors from the client's
    ``get`` call are not handled here and propagate to the caller.
    """
    customer_doc = es.get(index="customers", id=customer_id)
    source = customer_doc['_source']
    return source
def update_customer(customer_id: str, updates: dict):
    """Apply a partial update to a document in the ``customers`` index.

    *updates* is sent as the ``doc`` of the update request for the document
    with id *customer_id*. Nothing is returned.
    """
    partial_update = {"doc": updates}
    es.update(index="customers", id=customer_id, body=partial_update)
def index_order(order: Order):
    """Write *order* to the ``orders`` index, keyed by its ``order_id``.

    The document body is whatever ``order.dict()`` returns. Nothing is
    returned.
    """
    doc_id = order.order_id
    payload = order.dict()
    es.index(index="orders", id=doc_id, body=payload)
def track_order(order_id: str):
    """Fetch the document stored under *order_id* in the ``orders`` index.

    Returns the ``_source`` of the fetched document. Errors from the client's
    ``get`` call are not handled here and propagate to the caller.
    """
    order_doc = es.get(index="orders", id=order_id)
    source = order_doc['_source']
    return source
def get_order_details(order_id):
    """Look up an order and its detail rows by their ``Order ID`` field.

    One ``match`` query is run against the ``orders`` index and one against
    the ``order_details`` index. When both report at least one hit, the result
    is a dict with:

    * ``"order"``   -- the first ``orders`` hit as a dict;
    * ``"details"`` -- the hits returned by the ``order_details`` search,
      each as a dict.

    Otherwise the string ``"Order not found"`` is returned.
    """
    # Build both searches first, then execute them in the same order.
    pending = [
        Search(using=es, index=index_name).query('match', **{'Order ID': order_id})
        for index_name in ('orders', 'order_details')
    ]
    order_response, details_response = [search.execute() for search in pending]

    both_found = (
        order_response.hits.total.value > 0
        and details_response.hits.total.value > 0
    )
    if not both_found:
        return "Order not found"

    first_order = order_response.hits[0].to_dict()
    detail_rows = [hit.to_dict() for hit in details_response.hits]
    return {"order": first_order, "details": detail_rows}
def add_to_cart(customer_id: str, product_id: str, quantity: int):
    """Append an item to the customer's cart and store the cart.

    The cart is the document in the ``carts`` index whose id is
    *customer_id*. If no such document exists yet, an empty cart is started
    instead of letting the client's not-found error escape, so the first add
    for a customer succeeds.

    Args:
        customer_id: Id of the cart document.
        product_id: Stored as ``product_id`` in the new item entry.
        quantity: Stored as ``quantity`` in the new item entry.

    Returns:
        The cart dict as written back to the index, including the new item.
        Repeated adds of the same product append separate entries.
    """
    # Local import keeps this function self-contained; the elasticsearch
    # package is already a dependency of this module.
    from elasticsearch import NotFoundError

    try:
        cart = es.get(index="carts", id=customer_id)['_source']
    except NotFoundError:
        # No cart document for this customer yet: start a fresh one.
        cart = {}

    cart.setdefault('items', []).append(
        {"product_id": product_id, "quantity": quantity}
    )
    es.index(index="carts", id=customer_id, body=cart)
    return cart
def view_cart(customer_id: str):
    """Fetch the document stored under *customer_id* in the ``carts`` index.

    Returns the ``_source`` of the fetched document. Errors from the client's
    ``get`` call are not handled here and propagate to the caller.
    """
    cart_doc = es.get(index="carts", id=customer_id)
    source = cart_doc['_source']
    return source
def remove_from_cart(customer_id: str, product_id: str):
    """Remove every entry for *product_id* from the customer's cart.

    The cart is the document in the ``carts`` index whose id is
    *customer_id*. A cart document without an ``items`` key is treated as an
    empty cart, matching how ``add_to_cart`` tolerates a missing key, instead
    of raising ``KeyError``.

    Args:
        customer_id: Id of the cart document.
        product_id: Entries whose ``product_id`` equals this value are dropped.

    Returns:
        The cart dict as written back to the index.
    """
    cart = es.get(index="carts", id=customer_id)['_source']
    cart['items'] = [
        item for item in cart.get('items', [])
        if item['product_id'] != product_id
    ]
    es.index(index="carts", id=customer_id, body=cart)
    return cart
def checkout(customer_id: str):
    """Turn the customer's cart into an order and delete the cart.

    Reads the ``carts`` document with id *customer_id*, indexes a new
    document in ``orders`` (no explicit id is supplied) holding the customer
    id, the cart's items and the status ``"processing"``, then deletes the
    cart document. Returns the order dict that was indexed.
    """
    cart_source = es.get(index="carts", id=customer_id)['_source']
    cart_items = cart_source['items']

    order = {
        "customer_id": customer_id,
        "items": cart_items,
        "status": "processing",
    }

    # Store the order before removing the cart it was built from.
    es.index(index="orders", body=order)
    es.delete(index="carts", id=customer_id)
    return order