You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
556 lines
24 KiB
556 lines
24 KiB
# -*- coding: utf-8 -*-
|
|
###############################################################################
|
|
#
|
|
# Cybrosys Technologies Pvt. Ltd.
|
|
#
|
|
# Copyright (C) 2024-TODAY Cybrosys Technologies(<https://www.cybrosys.com>)
|
|
# Author: Sruthi Renjith (odoo@cybrosys.com)
|
|
#
|
|
# You can modify it under the terms of the GNU AFFERO
|
|
# GENERAL PUBLIC LICENSE (AGPL v3), Version 3.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU AFFERO GENERAL PUBLIC LICENSE (AGPL v3) for more details.
|
|
#
|
|
# You should have received a copy of the GNU AFFERO GENERAL PUBLIC LICENSE
|
|
# (AGPL v3) along with this program.
|
|
# If not, see <http://www.gnu.org/licenses/>.
|
|
#
|
|
###############################################################################
|
|
import base64
import io
import logging
import os
import re

import pytesseract
import spacy
from pdf2image import convert_from_bytes
from PIL import Image, ImageOps

from odoo import api, fields, models, _
from odoo.exceptions import ValidationError
|
|
|
|
|
|
class OCRDataTemplate(models.TransientModel):
    """Class to read document and extract the text from JPG, JPEG, PNG and
    PDF files."""

    _name = "ocr.data.template"
    _description = "Data Retrieving Template"
    # Records are displayed by the name of the uploaded document.
    _rec_name = "file_name"

    # Uploaded document; the only required field of the wizard.
    image = fields.Binary(
        string="Document", required=True, help="Upload .jpg, .jpeg, .png or .pdf files"
    )
    # File name of the upload; its extension is what action_get_data
    # inspects to choose between image and PDF handling.
    file_name = fields.Char(string="Document Name", help="Name of document")
    # Copy of ``image`` kept in sync by the ``image`` onchange handler.
    # NOTE(review): shares the label "Document" with ``image``.
    image2 = fields.Image(string="Document", help="Uploaded document", store=True)
    # Set to True by action_get_data once text has been extracted.
    flag = fields.Boolean(
        string="Flag", default=False, help="Flag to check document read or not"
    )
    # OCR output; read-only in the UI and written by action_get_data.
    data = fields.Text(string="Data", readonly=True, help="Content from the document")
    # Target model; the domain restricts the choice to the six models that
    # action_process_data has a branch for.
    model_name_id = fields.Many2one(
        "ir.model",
        string="Model",
        domain="[('model', 'in', ['res.partner', 'account.move', "
        "'hr.employee', 'hr.expense', 'sale.order', "
        "'purchase.order'])]",
        help="Model to which the data want to map",
    )
    # Fields of the selected model that the user wants filled from the text;
    # emptied again whenever ``model_name_id`` changes.
    model_field_ids = fields.Many2many(
        "ir.model.fields",
        string="Fields",
        domain="[('model_id', '=', model_name_id)]",
        help="Fields names to map data",
    )
|
|
|
|
def data_segmentation(self, img):
    """
    Binarise an image and collect the cropped regions handed to the OCR step.

    The image is converted to grayscale, thresholded to pure black/white,
    inverted, and then cropped again and again to the box reported by
    ``getbbox()`` until no box is left.

    :param img: PIL image of the uploaded document
    :return: list of cropped PIL images, in the order they were produced
    """
    cutoff = 176
    # Everything brighter than the cutoff becomes white, the rest black.
    binary = ImageOps.grayscale(img).point(
        lambda level: 255 if level > cutoff else 0, "1"
    )
    remaining = ImageOps.invert(binary.convert("RGB"))
    pieces = []
    while True:
        box = remaining.getbbox()
        if not box:
            break
        piece = remaining.crop(box)
        width, height = piece.size
        if width > 0 and height > 0:
            pieces.append(piece)
        # NOTE(review): the bounding box is passed where ImageOps.crop takes
        # a border specification - confirm this shrinking step is intended.
        remaining = ImageOps.crop(remaining, box)
    return pieces
|
|
|
|
def action_get_data(self):
    """
    Run OCR on the uploaded document and store the text in ``data``.

    Supported uploads are .jpg, .jpeg, .png and .pdf; the extension of
    ``file_name`` is matched case-insensitively. Images are segmented
    directly. PDF pages are rendered, resized and stacked vertically into
    one image before segmentation. On success ``data`` receives the
    recognised text and ``flag`` is set to True.

    :raises ValidationError: when the file type is not supported, the
        upload cannot be decoded as an image/PDF, or OCR fails
    """
    self.ensure_one()
    _logger = logging.getLogger(__name__)

    # ``file_name`` may be empty, and users upload "SCAN.PDF" as often as
    # "scan.pdf"; normalise before comparing.
    extension = os.path.splitext(self.file_name or "")[1].lower()
    if extension not in (".jpg", ".jpeg", ".png", ".pdf"):
        # Previously an unsupported type silently produced empty data.
        raise ValidationError(_("Upload .jpg, .jpeg, .png or .pdf files"))

    try:
        # Read the payload from the field itself rather than from the
        # attachment's filestore path: that works with every attachment
        # storage backend and cannot pick up the ``image2`` attachment by
        # accident. bin_size=False guarantees the real content is returned.
        binary_data = base64.b64decode(self.with_context(bin_size=False).image)
        if extension == ".pdf":
            pages = [
                page.resize((2400, 1800))
                for page in convert_from_bytes(binary_data)
            ]
            # Size the canvas from the pages that are actually pasted;
            # using the pre-resize dimensions clipped part of each page.
            canvas_width = max(page.width for page in pages)
            canvas_height = sum(page.height for page in pages)
            document_image = Image.new("RGB", (canvas_width, canvas_height))
            y_offset = 0
            for page in pages:
                document_image.paste(page, (0, y_offset))
                y_offset += page.height
        else:
            document_image = Image.open(io.BytesIO(binary_data))
        segmented_data = self.data_segmentation(document_image)
    except Exception as err:
        # No record is unlinked here: the raised error rolls the request
        # back anyway, and the old "delete the newest wizard" lookup could
        # target a record unrelated to this one.
        _logger.warning(
            "OCR: could not load document %r", self.file_name, exc_info=True
        )
        raise ValidationError(_("Cannot identify data")) from err

    # Only the first segment is recognised, as before: it is the crop of
    # the whole content area.
    text = ""
    if segmented_data:
        try:
            text = pytesseract.image_to_string(segmented_data[0]) + "\n"
        except Exception as err:
            _logger.warning(
                "OCR: could not convert document %r",
                self.file_name,
                exc_info=True,
            )
            raise ValidationError(_("Data cannot be read")) from err

    # Assigning retrieved data into text field
    self.data = text
    self.flag = True
|
|
|
|
@api.onchange("model_name_id")
def onchange_model_name_id(self):
    """
    Clear the selected fields whenever the target model changes.

    The previously chosen fields belong to the old model, so they are
    dropped. The value is assigned on the onchange pseudo-record instead of
    going through ``write()``, which is how onchange handlers are meant to
    push values back to the form.
    """
    self.model_field_ids = [(6, 0, [])]
|
|
|
|
def find_person_name(self):
    """
    Return the first PERSON entity that spaCy finds in the extracted text.

    :return: the entity text, or an empty string when ``data`` is empty or
        contains no PERSON entity
    """
    # ``data`` is falsy until a document has been read; spaCy only accepts
    # strings, so return before paying for the model load.
    if not self.data:
        return ""
    nlp = spacy.load("en_core_web_sm")
    doc = nlp(self.data)
    return next(
        (entity.text for entity in doc.ents if entity.label_ == "PERSON"), ""
    )
|
|
|
|
def get_order_line(self, text):
    """
    Extract product lines from OCR text using regular expressions.

    A product is a line of the form ``[CODE] Product name`` (the bracketed
    code is optional) that is directly followed by a ``HSN/SAC Code: <n>``
    line. Quantities are the decimal numbers found after a
    ``Quantity Unit`` heading, prices the decimal numbers found after an
    ``Amount`` heading.

    :param text: OCR text of the document; may be empty or False
    :return: list of dicts with a ``product`` key (the product name) and,
        when the number of quantities / prices found equals the number of
        products, ``quantity`` and / or ``price`` keys holding the matched
        number as a string
    """
    if not text:
        return []

    # The bracketed reference is one optional unit. With the bracket, the
    # lazy group and the whitespace all optional on their own, the first
    # group always captured a single character and the rest of the
    # reference leaked into the name.
    product_regex = r"(?:\[(.+?)\]\s*)?(.+)\n(?:HSN/SAC Code):\s+(\d+)"
    # NOTE(review): both patterns below capture up to the end of the text,
    # so numbers from later sections are counted as well.
    quantity_regex = r"Quantity Unit\n([\d.\s\S]+)"
    unit_price_regex = r"Amount\n([\d.\s\S]+)"
    decimal_regex = r"\d+\.\d+"

    quantity_match = re.search(quantity_regex, text)
    quantities = (
        re.findall(decimal_regex, quantity_match.group(1))
        if quantity_match
        else []
    )
    price_match = re.search(unit_price_regex, text)
    unit_prices = (
        re.findall(decimal_regex, price_match.group(1)) if price_match else []
    )

    # Keep only the name: callers use "product" as a product name, not as
    # the (code, name, hsn) tuple that re.findall yields.
    products = [
        name.strip() for _code, name, _hsn in re.findall(product_regex, text)
    ]

    # Columns are only paired with products when their counts line up;
    # otherwise there is no reliable way to tell which value belongs where.
    if len(products) == len(quantities) == len(unit_prices):
        return [
            {"product": product, "quantity": quantity, "price": price}
            for product, quantity, price in zip(products, quantities, unit_prices)
        ]
    if len(products) == len(quantities):
        return [
            {"product": product, "quantity": quantity}
            for product, quantity in zip(products, quantities)
        ]
    if len(products) == len(unit_prices):
        return [
            {"product": product, "price": price}
            for product, price in zip(products, unit_prices)
        ]
    return [{"product": product} for product in products]
|
|
|
|
def action_process_data(self):
    """
    Create a record of the chosen model from the extracted text.

    The work is dispatched on the technical name of ``model_name_id``
    (``ir.model.model``). The human-readable ``name`` is translatable, so
    comparing it with English labels stops working in any other language.

    :return: a window action opening the created record, or None when
        nothing was created
    :raises ValidationError: when the selected fields cannot be filled
        from the text or the record already exists
    """
    self.ensure_one()
    model = self.model_name_id.model
    selected = set(self.model_field_ids.mapped("name"))

    if model == "res.partner":
        return self._ocr_create_partner(selected)
    if model == "account.move":
        return self._ocr_create_vendor_bill(selected)
    if model == "hr.employee":
        return self._ocr_create_employee(selected)
    if model == "hr.expense":
        return self._ocr_create_expense(selected)
    if model == "sale.order":
        return self._ocr_create_order(
            selected,
            order_model="sale.order",
            line_model="sale.order.line",
            qty_field="product_uom_qty",
            title="Sale order",
            view_xmlid="sale.view_order_form",
        )
    if model == "purchase.order":
        return self._ocr_create_order(
            selected,
            order_model="purchase.order",
            line_model="purchase.order.line",
            # On purchase lines ``product_uom_qty`` is computed from
            # ``product_qty``; the latter is the field to write.
            qty_field="product_qty",
            title="Purchase order",
            view_xmlid="purchase.purchase_order_form",
        )
    return None


def _ocr_form_action(self, title, res_model, res_id, view_xmlid):
    """Return the window action that opens one record in its form view."""
    return {
        "name": title,
        "type": "ir.actions.act_window",
        "view_type": "form",
        "view_mode": "form",
        "res_model": res_model,
        "res_id": res_id,
        "view_id": self.env.ref(view_xmlid).id,
        "target": "current",
    }


def _ocr_extract_identity(
    self, selected, name_fields, phone_fields, email_fields, not_found_error
):
    """
    Pull person name, phone number and e-mail address out of ``data``.

    Each value is only looked up when one of its fields is in ``selected``.

    :param selected: set of technical names of the chosen fields
    :param name_fields: field names that request the person name
    :param phone_fields: field names that request the phone number
    :param email_fields: field names that request the e-mail address
    :param not_found_error: translated message used when a name field was
        chosen but no person name is present in the text
    :return: tuple ``(person, phone_number, email_address)``; values that
        were not requested or not found are empty strings
    :raises ValidationError: when a requested name is missing, or when no
        name field was chosen and a chosen field cannot be mapped
    """
    phone_pattern = r"\(\d{3}\) \d{3}-\d{4}|\d{3}-\d{3}-\d{4}|\+\d{1}-\d{3}-\d{3}-\d{4}|\d{11}|P \+\d{3} \d{6}"
    email_pattern = r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}"
    # ``data`` is falsy when nothing was extracted; the regexes need a str.
    text = self.data or ""
    person = phone_number = email_address = ""

    wants_name = bool(selected & name_fields)
    if wants_name:
        person = self.find_person_name() if text else ""
        if not person:
            raise ValidationError(not_found_error)
    if selected & phone_fields:
        phones = re.findall(phone_pattern, text)
        if phones:
            phone_number = phones[0]
    if selected & email_fields:
        emails = re.findall(email_pattern, text)
        if emails:
            email_address = emails[0]

    unmapped = selected - name_fields - phone_fields - email_fields
    if not wants_name and unmapped:
        raise ValidationError(_("No data to map into the field"))
    return person, phone_number, email_address


def _ocr_create_partner(self, selected):
    """Create a res.partner from the name, phone and e-mail in the text."""
    person, phone_number, email_address = self._ocr_extract_identity(
        selected,
        name_fields={"name", "display_name"},
        phone_fields={"phone"},
        email_fields={"email"},
        not_found_error=_("Partner name cannot find"),
    )
    if not person:
        raise ValidationError(_("Name field is not chosen to create partner"))
    partners = self.env["res.partner"]
    if partners.search([("name", "=", person)], limit=1):
        raise ValidationError(_("Partner already exist"))
    partner_record = partners.create(
        {"name": person, "email": email_address, "phone": phone_number}
    )
    return self._ocr_form_action(
        "Partner", "res.partner", partner_record.id, "base.view_partner_form"
    )


def _ocr_create_employee(self, selected):
    """Create an hr.employee from the name, phone and e-mail in the text."""
    person, phone_number, email_address = self._ocr_extract_identity(
        selected,
        name_fields={"name", "display_name", "emergency_contact"},
        phone_fields={"work_phone", "phone", "emergency_phone"},
        email_fields={"private_email", "work_email"},
        not_found_error=_("Employee name cannot find"),
    )
    if not person:
        raise ValidationError(_("Name field is not chosen to create employee"))
    employees = self.env["hr.employee"]
    if employees.search([("name", "=", person)], limit=1):
        raise ValidationError(_("Employee already exist"))
    employee_record = employees.create(
        {
            "name": person,
            "work_email": email_address,
            "work_phone": phone_number,
        }
    )
    return self._ocr_form_action(
        "Employee", "hr.employee", employee_record.id, "hr.view_employee_form"
    )


def _ocr_create_vendor_bill(self, selected):
    """Re-read the uploaded image at double size and create a vendor bill."""
    if "invoice_vendor_bill_id" not in selected:
        raise ValidationError(_("No data to map into the field"))
    try:
        # Decode the field content directly (bin_size=False returns the
        # payload, not a size label) instead of opening the attachment's
        # file on disk.
        binary_data = base64.b64decode(self.with_context(bin_size=False).image)
        img = Image.open(io.BytesIO(binary_data))
        # Resizing the image to improve the clarity
        resized_img = img.resize(
            (img.width * 2, img.height * 2), resample=Image.BICUBIC
        )
        text = pytesseract.image_to_string(resized_img)
    except Exception as err:
        raise ValidationError(_("Can't create vendor bill")) from err
    # The bill itself is built by the digitize.bill model from the text.
    bill_record = self.env["digitize.bill"].create_record(text)
    return self._ocr_form_action(
        "Bill", "account.move", bill_record.id, "account.view_move_form"
    )


def _ocr_create_expense(self, selected):
    """Create an hr.expense on the generic "BILL EXPENSE" product."""
    if not selected & {"name", "product_id"}:
        raise ValidationError(
            _("Can't create an expense without description or category")
        )
    products = self.env["product.product"]
    product = products.search(
        [("name", "=", "BILL EXPENSE")], limit=1
    ) or products.create({"name": "BILL EXPENSE"})
    expense_record = self.env["hr.expense"].create({"product_id": product.id})
    return self._ocr_form_action(
        "Expense", "hr.expense", expense_record.id, "hr_expense.hr_expense_view_form"
    )


def _ocr_create_order(
    self, selected, order_model, line_model, qty_field, title, view_xmlid
):
    """
    Create a sale or purchase order with the lines found in the text.

    :param selected: set of technical names of the chosen fields
    :param order_model: "sale.order" or "purchase.order"
    :param line_model: matching order line model
    :param qty_field: name of the writable quantity field on the line model
    :param title: window title of the returned action
    :param view_xmlid: XML id of the order form view
    :return: window action for the new order, or None when no field was
        chosen at all
    :raises ValidationError: when ``order_line`` was not chosen but other
        fields were, or when no person name is found for the partner
    """
    if "order_line" not in selected:
        if selected:
            raise ValidationError(_("No data to map into the field"))
        return None

    text = self.data or ""
    person = self.find_person_name() if text else ""
    if not person:
        # Without a name there is no partner to put on the order.
        raise ValidationError(_("Partner name cannot find"))
    # ``partner_id`` on orders points to res.partner, so the partner has to
    # be looked up and created there; an hr.employee id is meaningless in
    # that column.
    partners = self.env["res.partner"]
    partner = partners.search(
        [("name", "=", person)], limit=1
    ) or partners.create({"name": person})
    order = self.env[order_model].create({"partner_id": partner.id})

    products = self.env["product.product"]
    for item in self.get_order_line(text):
        product = products.search(
            [("name", "=", item["product"])], limit=1
        ) or products.create({"name": item["product"]})
        self.env[line_model].create(
            {
                "order_id": order.id,
                "product_id": product.id,
                # Columns that could not be matched default to 0.
                qty_field: item.get("quantity", 0),
                "price_unit": item.get("price", 0),
            }
        )
    return self._ocr_form_action(title, order_model, order.id, view_xmlid)
|
|
|
|
@api.onchange("image")
def _onchange_image(self):
    """
    Mirror the uploaded document into ``image2``.

    The value is assigned on the onchange pseudo-record instead of going
    through ``write()``, which is how onchange handlers are meant to push
    values back to the form.
    """
    self.image2 = self.image
|
|
|