You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 

383 lines
17 KiB

# -*- coding: utf-8 -*-
################################################################################
#
# Cybrosys Technologies Pvt. Ltd.
#
# Copyright (C) 2023-TODAY Cybrosys Technologies(<https://www.cybrosys.com>).
# Author: Unnimaya C O (odoo@cybrosys.com)
#
# You can modify it under the terms of the GNU AFFERO
# GENERAL PUBLIC LICENSE (AGPL v3), Version 3.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU AFFERO GENERAL PUBLIC LICENSE (AGPL v3) for more details.
#
# You should have received a copy of the GNU AFFERO GENERAL PUBLIC LICENSE
# (AGPL v3) along with this program.
# If not, see <http://www.gnu.org/licenses/>.
#
################################################################################
import json
import requests
from odoo import fields, models, _
from odoo.exceptions import ValidationError
class ResCompany(models.Model):
"""Inherits Res Company for including Pipedrive credential fields"""
_inherit = 'res.company'
api_key = fields.Char(string='Token',
help="It is used to connect with Pipedrive"
)
import_product = fields.Boolean(string="Import Product",
help="Check if you want to Import Products "
"from Pipedrive")
import_contact = fields.Boolean(string="Import Contact",
help="Check if you want to Import Contacts "
"from Pipedrive")
import_lead = fields.Boolean(string="Import Lead",
help="Check if you want to Import Leads from"
" Pipedrive")
export_product = fields.Boolean(string="Export Product",
help="Check if you want to Export Products "
"to Pipedrive")
export_contact = fields.Boolean(string="Export Contact",
help="Check if you want to Export Contacts"
" to Pipedrive")
export_lead = fields.Boolean(string="Export Lead",
help="Check if you want to Export Leads to"
" Pipedrive")
pipedrive_reference = fields.Char(string='Pipedrive Id',
help="Pipedrive Id of the Company")
product_webhook = fields.Boolean(string='Product Webhook',
help='True if update webhook for '
'products is created')
contact_webhook = fields.Boolean(string='Contact Webhook',
help='True if update webhook for '
'contact is created')
def calculate_total_tax_percentage(self, product):
"""Method for calculating total tax"""
total_percentage_tax = 0.0
# Percentage Taxes
for tax in product.taxes_id.filtered(
lambda t: t.amount_type == 'percent'):
total_percentage_tax += tax.amount
# Group Taxes
for tax in product.taxes_id.filtered(
lambda t: t.amount_type == 'group'):
for child_tax in tax.children_tax_ids.filtered(
lambda t: t.amount_type == 'percent'):
total_percentage_tax += child_tax.amount
# Fixed Taxes
for tax in product.taxes_id.filtered(
lambda t: t.amount_type == 'fixed'):
total_percentage_tax += (tax.amount / product.list_price) * 100
# Division Taxes
for tax in product.taxes_id.filtered(
lambda t: t.amount_type == 'division'):
total_percentage_tax += (product.list_price / tax.factor) * 100
return total_percentage_tax
def action_execute(self):
"""For executing Import and Export between Odoo and Pipedrive"""
if not self.api_key:
raise ValidationError(_('Please Enter an API Key'))
if self.import_product:
self.get_products()
if self.import_contact:
self.get_contacts()
if self.import_lead:
self.get_leads()
if self.export_product:
self.export_products_to_pipedrive()
if self.export_contact:
self.export_contacts_to_pipedrive()
if self.export_lead:
self.export_leads_to_pipedrive()
def get_products(self):
"""Receives Products from Pipedrive"""
response = requests.get(url='https://api.pipedrive.com/v1/products',
params={
'api_token': self.api_key,
}, timeout=10)
if not response.json()['success']:
raise ValidationError(
response.json()['error'] + '. ' + response.json()[
'error_info'])
if response.json()['data']:
self.create_product_category()
for data in response.json()['data']:
pipedrive_reference = self.env['product.template'].search(
[]).mapped(
'pipedrive_reference')
if str(data['id']) not in pipedrive_reference:
if not self.product_webhook:
self.create_webhook(
'deleted',
'/delete_pipedrive_product',
'product')
self.create_webhook(
"updated",
'/update_pipedrive_product',
'product')
uom_id = 1
if data['unit']:
for rec in self.env['uom.uom'].search([]).mapped(
'name'):
if rec.lower() == data['unit'].lower():
uom_id = self.env['uom.uom'].search(
[('name', '=', rec)]).id
if data['prices'][0]['price']:
currency = self.env['res.currency'].search(
[('name', '=', data['prices'][0]['currency']),
('active', 'in', [True, False])])
if not currency.active:
currency.active = True
product = self.env['product.template'].create({
'name': data['name'],
'description': data['description'],
'uom_id': uom_id,
'uom_po_id': uom_id,
'list_price': data['prices'][0]['price'],
'standard_price': data['prices'][0]['cost'],
'taxes_id': False,
'pipedrive_reference': data['id'],
'categ_id': self.env['product.category'].search([(
'pipedrive_reference', '=', data['category'])]).id
if data['category'] else 1
})
product.taxes_id.unlink()
if data['tax'] != 0:
tax = self.env['account.tax'].search(
[('amount_type', '=', 'percent'),
('type_tax_use', '=', 'sale'), ('amount',
'=',
data['tax'])])
if not tax:
tax = self.env['account.tax'].create({
'name': 'Tax ' + str(data['tax']) + '%',
'amount_type': 'percent',
'type_tax_use': 'sale',
'amount': data['tax']
})
product.write({
"taxes_id": [(4, tax.id)]
})
def create_product_category(self):
"""Returns product category from category_id"""
response = requests.get(
url='https://api.pipedrive.com/v1/productFields',
params={
'api_token': self.api_key,
}, timeout=10)
if not response.json()['success']:
raise ValidationError(
response.json()['error'] + '. ' + response.json()[
'error_info'])
for rec in response.json()['data']:
if rec['key'] == 'category':
for item in rec['options']:
category = self.env['product.category'].search(
[('name', '=', item['label'])])
if not category:
self.env['product.category'].create(
{
'name': item['label'],
'pipedrive_reference': item['id']
}
)
else:
category.write({
'pipedrive_reference': item['id']
})
def get_contacts(self):
"""Receives contacts from Pipedrive"""
response = requests.get(url='https://api.pipedrive.com/v1/persons',
params={
'api_token': self.api_key,
}, timeout=10)
if not response.json()['success']:
raise ValidationError(
response.json()['error'] + '. ' + response.json()[
'error_info'])
if response.json()['data']:
for data in response.json()['data']:
partner_id = self.env['res.partner'].search([]).mapped(
'pipedrive_reference')
if str(data['id']) not in partner_id:
self.env['res.partner'].create({
'name': data['name'],
'phone': data['phone'][0]['value'],
'email': data['email'][0]['value'],
'pipedrive_reference': data['id']
})
if not self.contact_webhook:
self.create_webhook(
'updated', '/update_pipedrive_contact',
'person')
self.create_webhook(
'deleted', '/delete_pipedrive_contact',
'person')
def get_leads(self):
"""Receives leads from Pipedrive"""
response = requests.get(url='https://api.pipedrive.com/v1/leads',
params={
'api_token': self.api_key,
}, timeout=10)
if not response.json()['success']:
raise ValidationError(
response.json()['error'] + '. ' + response.json()[
'error_info'])
if response.json()['data']:
for data in response.json()['data']:
lead_id = self.env['crm.lead'].search([]).mapped(
'pipedrive_reference')
expected_revenue = 0
if data['value']:
currency = self.env['res.currency'].search(
[('name', '=', data['value']['currency']),
('active', 'in', [True, False])])
if not currency.active:
currency.active = True
expected_revenue = currency.compute(
data['value']['amount'], self.env.company.currency_id)
if str(data['id']) not in lead_id:
self.env['crm.lead'].create({
'name': data['title'],
'type': 'opportunity',
'expected_revenue': expected_revenue,
'pipedrive_reference': data['id']
})
def export_products_to_pipedrive(self):
"""Export Products from Odoo to Pipedrive"""
for product in self.env['product.template'].search(
[('pipedrive_reference', '=', False)]):
data = {
'name': product.name,
'unit': product.uom_id.name,
'tax': self.calculate_total_tax_percentage(product),
'prices': [{
'price': product.list_price,
'currency': self.env.company.currency_id.name
}]
}
response = requests.post(
url='https://api.pipedrive.com/v1/products',
params={
'api_token': self.api_key,
}, json=data, timeout=10)
if not response.json()['success']:
raise ValidationError(
response.json()['error'] + '. ' + response.json()[
'error_info'])
product.write(
{'pipedrive_reference': response.json()['data']['id']})
if not self.product_webhook:
self.create_webhook(
'updated', '/update_pipedrive_product',
'product')
self.create_webhook(
'deleted', '/delete_pipedrive_product',
'product')
def export_contacts_to_pipedrive(self):
"""Export Contacts from Odoo to Pipedrive"""
for partner in self.env['res.partner'].search(
[('pipedrive_reference', '=', False)]):
self.create_contact(partner)
def create_contact(self, partner):
"""Create Persons in Pipedrive"""
data = {
'name': partner.name,
'email': partner.email,
'phone': partner.phone
}
response = requests.post(
url='https://api.pipedrive.com/v1/persons',
params={
'api_token': self.api_key,
}, json=data, timeout=10)
if not response.json()['success']:
raise ValidationError(
response.json()['error'] + '. ' + response.json()[
'error_info'])
partner.sudo().write(
{'pipedrive_reference': response.json()['data']['id']})
if not self.contact_webhook:
self.create_webhook(
'updated', '/update_pipedrive_contact',
'person')
self.create_webhook(
'deleted', '/delete_pipedrive_contact',
'person')
return response.json()['data']['id']
def create_webhook(self, event_action, url, event_object):
"""Method for creating contact webhook in Pipedrive"""
payload = json.dumps({
"subscription_url": self.env['ir.config_parameter'].get_param(
'web.base.url') + url,
"event_action": event_action,
"event_object": event_object
})
headers = {
'Content-Type': 'application/json',
'Accept': 'application/json'
}
params = {
'api_token': self.api_key,
}
response = requests.request("POST",
"https://api.pipedrive.com/v1/"
"webhooks",
headers=headers, data=payload,
params=params,
timeout=10)
if not response.json()['success']:
if 'error' in response.json().keys():
raise ValidationError(
response.json()['error'])
if 'message' in response.json().keys():
raise ValidationError(
response.json()['message'])
if event_object == 'person':
self.contact_webhook = True
elif event_object == 'product':
self.product_webhook = True
def export_leads_to_pipedrive(self):
"""Export Leads from Odoo to Pipedrive"""
for lead in self.env['crm.lead'].search(
[('pipedrive_reference', '=', False),
('partner_id', '!=', False)]):
if not lead.partner_id.pipedrive_reference:
self.create_contact(lead.partner_id)
data = {
'title': lead.name,
'person_id': int(lead.partner_id.pipedrive_reference),
'value': {
'amount': lead.expected_revenue,
'currency': self.env.company.currency_id.name
}
}
response = requests.post(
url='https://api.pipedrive.com/v1/leads',
params={
'api_token': self.api_key,
}, json=data, timeout=10)
if not response.json()['success']:
raise ValidationError(
response.json()['error'] + '. ' + response.json()[
'error_info'])
lead.write(
{'pipedrive_reference': response.json()['data']['id']})