|
import os |
|
from elasticsearch import Elasticsearch |
|
from elasticsearch_dsl import Search |
|
from models.product import Product |
|
from models.customer import Customer |
|
from models.order import Order |
|
|
|
# Elastic Cloud credentials come from the environment; each value is None
# when its variable is unset.
cloud_id = os.environ.get("ELASTICSEARCH_CLOUD_ID")
api_key = os.environ.get("ELASTICSEARCH_API_KEY")

# Shared client used by every helper in this module.
# NOTE(review): no ConnectionError is imported in the visible import header,
# so this name presumably resolves to the builtin ConnectionError -- confirm
# that is the exception type intended here. When the handler does run, `es`
# is left unbound and every helper below would fail with NameError.
try:
    es = Elasticsearch(cloud_id=cloud_id, api_key=api_key)
except ConnectionError as exc:
    print("Error:", exc)
|
|
|
|
|
def search_elasticsearch(index, query):
    """Run a ``multi_match`` text search and return the raw hits.

    Args:
        index: Name of the index to search.
        query: Text matched against the ``title``, ``description``,
            ``category`` and ``tags`` fields.

    Returns:
        The ``hits.hits`` entry of the search response.
    """
    searched_fields = ["title", "description", "category", "tags"]
    multi_match = {"query": query, "fields": searched_fields}
    request_body = {"query": {"multi_match": multi_match}}
    result = es.search(index=index, body=request_body)
    return result['hits']['hits']
|
|
|
|
|
def search_products_by_keywords(encoded_query, *, k: int = 4, num_candidates: int = 500):
    """Run a kNN search over product description vectors.

    Args:
        encoded_query: Vector sent as ``query_vector`` of the kNN clause on
            the ``DescriptionVector`` field. Presumably the embedding of the
            user's keywords -- confirm against the caller.
        k: Number of nearest neighbours requested. Defaults to 4, the value
            that was previously hard-coded.
        num_candidates: Value sent as the kNN clause's ``num_candidates``.
            Defaults to 500, the value that was previously hard-coded.

    Returns:
        The ``hits.hits`` entry of the search response. The request limits
        ``_source`` to ``ProductName``, ``Description``, ``Gender``,
        ``Price (INR)`` and ``PrimaryColor``.
    """
    returned_fields = ["ProductName", "Description", "Gender", "Price (INR)", "PrimaryColor"]
    body_query = {
        "_source": returned_fields,
        "knn": {
            "field": "DescriptionVector",
            "query_vector": encoded_query,
            "k": k,
            "num_candidates": num_candidates,
        },
    }
    response = es.search(index='products', body=body_query)
    return response['hits']['hits']
|
|
|
|
|
def search_products_by_filters(filters):
    """Search the ``products`` index using only filter clauses.

    Args:
        filters: Value placed verbatim under ``query.bool.filter`` of the
            request body.

    Returns:
        The ``hits.hits`` entry of the search response.
    """
    bool_clause = {"filter": filters}
    request_body = {"query": {"bool": bool_clause}}
    result = es.search(index='products', body=request_body)
    return result['hits']['hits']
|
|
|
|
|
def get_product_details(product_id):
    """Fetch one document from the ``products`` index by ID.

    Args:
        product_id: Document ID to look up.

    Returns:
        The ``_source`` of the fetched document.
    """
    return es.get(index='products', id=product_id)['_source']
|
|
|
|
|
def index_product(product: Product):
    """Write *product* to the ``products`` index.

    The document ID is ``product.product_id`` and the document body is
    ``product.dict()``.
    """
    doc_id = product.product_id
    document = product.dict()
    es.index(index="products", id=doc_id, body=document)
|
|
|
|
|
def index_customer(customer: Customer):
    """Write *customer* to the ``customers`` index.

    The document ID is ``customer.customer_id`` and the document body is
    ``customer.dict()``.
    """
    doc_id = customer.customer_id
    document = customer.dict()
    es.index(index="customers", id=doc_id, body=document)
|
|
|
|
|
def get_customer(customer_id: str):
    """Fetch one document from the ``customers`` index by ID.

    Args:
        customer_id: Document ID to look up.

    Returns:
        The ``_source`` of the fetched document.
    """
    return es.get(index="customers", id=customer_id)['_source']
|
|
|
|
|
def update_customer(customer_id: str, updates: dict):
    """Send a partial update for a document in the ``customers`` index.

    Args:
        customer_id: ID of the document to update.
        updates: Mapping sent as the ``doc`` section of the update request.
    """
    partial_doc = {"doc": updates}
    es.update(index="customers", id=customer_id, body=partial_doc)
|
|
|
|
|
def index_order(order: Order):
    """Write *order* to the ``orders`` index.

    The document ID is ``order.order_id`` and the document body is
    ``order.dict()``.
    """
    doc_id = order.order_id
    document = order.dict()
    es.index(index="orders", id=doc_id, body=document)
|
|
|
|
|
def track_order(order_id: str):
    """Fetch one document from the ``orders`` index by ID.

    Args:
        order_id: Document ID to look up.

    Returns:
        The ``_source`` of the fetched document.
    """
    return es.get(index="orders", id=order_id)['_source']
|
|
|
|
|
def get_order_details(order_id):
    """Look up an order and its detail rows by the ``Order ID`` field.

    Runs a ``match`` query on ``Order ID`` against the ``orders`` index and
    the ``order_details`` index.

    Args:
        order_id: Value matched against the ``Order ID`` field.

    Returns:
        A dict ``{"order": ..., "details": [...]}`` built from the first
        order hit and all returned detail hits when both searches report at
        least one hit; otherwise the string ``"Order not found"``.
    """
    id_match = {'Order ID': order_id}
    order_search = Search(using=es, index='orders').query('match', **id_match)
    details_search = Search(using=es, index='order_details').query('match', **id_match)

    # Both searches are always executed, the order lookup first.
    orders = order_search.execute()
    detail_rows = details_search.execute()

    if orders.hits.total.value <= 0 or detail_rows.hits.total.value <= 0:
        return "Order not found"

    # NOTE(review): neither search sets an explicit size, so only the hits
    # in the returned page are included -- confirm that orders with many
    # detail rows are not truncated.
    return {
        "order": orders.hits[0].to_dict(),
        "details": [row.to_dict() for row in detail_rows.hits],
    }
|
|
|
|
|
def add_to_cart(customer_id: str, product_id: str, quantity: int):
    """Append a line item to a customer's cart, creating the cart if needed.

    The cart is the document in the ``carts`` index whose ID is
    *customer_id*. A missing cart document is treated as an empty cart
    instead of an error, so the first add for a customer (or the first add
    after ``checkout`` deleted the cart) succeeds.

    Each call appends a new ``{"product_id", "quantity"}`` entry; entries
    for the same product are not merged.

    Args:
        customer_id: ID of the cart document.
        product_id: Product to add.
        quantity: Quantity stored on the new entry.

    Returns:
        The cart document as written back to the index.
    """
    # Function-scope import keeps this block self-contained; the package is
    # already a dependency of this module.
    from elasticsearch import NotFoundError

    try:
        cart = es.get(index="carts", id=customer_id)['_source']
    except NotFoundError:
        # No cart document exists yet for this customer: start from empty.
        cart = {}

    cart.setdefault('items', []).append(
        {"product_id": product_id, "quantity": quantity}
    )
    es.index(index="carts", id=customer_id, body=cart)
    return cart
|
|
|
|
|
def view_cart(customer_id: str):
    """Fetch a customer's cart from the ``carts`` index.

    Args:
        customer_id: ID of the cart document.

    Returns:
        The ``_source`` of the fetched cart document.
    """
    return es.get(index="carts", id=customer_id)['_source']
|
|
|
|
|
def remove_from_cart(customer_id: str, product_id: str):
    """Remove every entry for *product_id* from a customer's cart.

    The cart is the document in the ``carts`` index whose ID is
    *customer_id*. A cart without an ``items`` field (or with a null one) is
    treated as empty rather than raising ``KeyError``, matching how
    ``add_to_cart`` handles such carts.

    Args:
        customer_id: ID of the cart document.
        product_id: Product whose entries are dropped.

    Returns:
        The cart document as written back to the index.
    """
    cart = es.get(index="carts", id=customer_id)['_source']
    current_items = cart.get('items') or []
    cart['items'] = [
        item for item in current_items if item['product_id'] != product_id
    ]
    # The cart is written back even when nothing was removed.
    es.index(index="carts", id=customer_id, body=cart)
    return cart
|
|
|
|
|
def checkout(customer_id: str):
    """Turn a customer's cart into an order and delete the cart.

    Reads the cart document from the ``carts`` index, indexes a new order
    with status ``"processing"`` into the ``orders`` index, then deletes the
    cart document.

    Args:
        customer_id: ID of the cart document; also stored on the order.

    Returns:
        The order fields (``customer_id``, ``items``, ``status``) plus
        ``order_id``, the document ID generated for the new order, so the
        caller can look the order up afterwards.

    Raises:
        KeyError: If the cart document has no ``items`` field.
    """
    cart = es.get(index="carts", id=customer_id)['_source']
    # NOTE(review): a cart whose ``items`` list is empty still produces an
    # order -- confirm whether that should be rejected.
    order = {
        "customer_id": customer_id,
        "items": cart['items'],
        "status": "processing",
    }
    # No explicit id is passed, so the generated one is read from the
    # response instead of being discarded.
    created = es.index(index="orders", body=order)
    order_id = created['_id']
    es.delete(index="carts", id=customer_id)
    return {**order, "order_id": order_id}
|
|