You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
383 lines
17 KiB
383 lines
17 KiB
# -*- coding: utf-8 -*-
|
|
################################################################################
|
|
#
|
|
# Cybrosys Technologies Pvt. Ltd.
|
|
#
|
|
# Copyright (C) 2023-TODAY Cybrosys Technologies(<https://www.cybrosys.com>).
|
|
# Author: Unnimaya C O (odoo@cybrosys.com)
|
|
#
|
|
# You can modify it under the terms of the GNU AFFERO
|
|
# GENERAL PUBLIC LICENSE (AGPL v3), Version 3.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU AFFERO GENERAL PUBLIC LICENSE (AGPL v3) for more details.
|
|
#
|
|
# You should have received a copy of the GNU AFFERO GENERAL PUBLIC LICENSE
|
|
# (AGPL v3) along with this program.
|
|
# If not, see <http://www.gnu.org/licenses/>.
|
|
#
|
|
################################################################################
|
|
import json
|
|
import requests
|
|
from odoo import fields, models, _
|
|
from odoo.exceptions import ValidationError
|
|
|
|
|
|
class ResCompany(models.Model):
|
|
"""Inherits Res Company for including Pipedrive credential fields"""
|
|
_inherit = 'res.company'
|
|
|
|
api_key = fields.Char(string='Token',
|
|
help="It is used to connect with Pipedrive"
|
|
)
|
|
import_product = fields.Boolean(string="Import Product",
|
|
help="Check if you want to Import Products "
|
|
"from Pipedrive")
|
|
import_contact = fields.Boolean(string="Import Contact",
|
|
help="Check if you want to Import Contacts "
|
|
"from Pipedrive")
|
|
import_lead = fields.Boolean(string="Import Lead",
|
|
help="Check if you want to Import Leads from"
|
|
" Pipedrive")
|
|
export_product = fields.Boolean(string="Export Product",
|
|
help="Check if you want to Export Products "
|
|
"to Pipedrive")
|
|
export_contact = fields.Boolean(string="Export Contact",
|
|
help="Check if you want to Export Contacts"
|
|
" to Pipedrive")
|
|
export_lead = fields.Boolean(string="Export Lead",
|
|
help="Check if you want to Export Leads to"
|
|
" Pipedrive")
|
|
pipedrive_reference = fields.Char(string='Pipedrive Id',
|
|
help="Pipedrive Id of the Company")
|
|
product_webhook = fields.Boolean(string='Product Webhook',
|
|
help='True if update webhook for '
|
|
'products is created')
|
|
contact_webhook = fields.Boolean(string='Contact Webhook',
|
|
help='True if update webhook for '
|
|
'contact is created')
|
|
|
|
def calculate_total_tax_percentage(self, product):
|
|
"""Method for calculating total tax"""
|
|
total_percentage_tax = 0.0
|
|
# Percentage Taxes
|
|
for tax in product.taxes_id.filtered(
|
|
lambda t: t.amount_type == 'percent'):
|
|
total_percentage_tax += tax.amount
|
|
# Group Taxes
|
|
for tax in product.taxes_id.filtered(
|
|
lambda t: t.amount_type == 'group'):
|
|
for child_tax in tax.children_tax_ids.filtered(
|
|
lambda t: t.amount_type == 'percent'):
|
|
total_percentage_tax += child_tax.amount
|
|
# Fixed Taxes
|
|
for tax in product.taxes_id.filtered(
|
|
lambda t: t.amount_type == 'fixed'):
|
|
total_percentage_tax += (tax.amount / product.list_price) * 100
|
|
# Division Taxes
|
|
for tax in product.taxes_id.filtered(
|
|
lambda t: t.amount_type == 'division'):
|
|
total_percentage_tax += (product.list_price / tax.factor) * 100
|
|
return total_percentage_tax
|
|
|
|
def action_execute(self):
|
|
"""For executing Import and Export between Odoo and Pipedrive"""
|
|
if not self.api_key:
|
|
raise ValidationError(_('Please Enter an API Key'))
|
|
if self.import_product:
|
|
self.get_products()
|
|
if self.import_contact:
|
|
self.get_contacts()
|
|
if self.import_lead:
|
|
self.get_leads()
|
|
if self.export_product:
|
|
self.export_products_to_pipedrive()
|
|
if self.export_contact:
|
|
self.export_contacts_to_pipedrive()
|
|
if self.export_lead:
|
|
self.export_leads_to_pipedrive()
|
|
|
|
def get_products(self):
|
|
"""Receives Products from Pipedrive"""
|
|
response = requests.get(url='https://api.pipedrive.com/v1/products',
|
|
params={
|
|
'api_token': self.api_key,
|
|
}, timeout=10)
|
|
if not response.json()['success']:
|
|
raise ValidationError(
|
|
response.json()['error'] + '. ' + response.json()[
|
|
'error_info'])
|
|
if response.json()['data']:
|
|
self.create_product_category()
|
|
for data in response.json()['data']:
|
|
pipedrive_reference = self.env['product.template'].search(
|
|
[]).mapped(
|
|
'pipedrive_reference')
|
|
if str(data['id']) not in pipedrive_reference:
|
|
if not self.product_webhook:
|
|
self.create_webhook(
|
|
'deleted',
|
|
'/delete_pipedrive_product',
|
|
'product')
|
|
self.create_webhook(
|
|
"updated",
|
|
'/update_pipedrive_product',
|
|
'product')
|
|
uom_id = 1
|
|
if data['unit']:
|
|
for rec in self.env['uom.uom'].search([]).mapped(
|
|
'name'):
|
|
if rec.lower() == data['unit'].lower():
|
|
uom_id = self.env['uom.uom'].search(
|
|
[('name', '=', rec)]).id
|
|
if data['prices'][0]['price']:
|
|
currency = self.env['res.currency'].search(
|
|
[('name', '=', data['prices'][0]['currency']),
|
|
('active', 'in', [True, False])])
|
|
if not currency.active:
|
|
currency.active = True
|
|
product = self.env['product.template'].create({
|
|
'name': data['name'],
|
|
'description': data['description'],
|
|
'uom_id': uom_id,
|
|
'uom_po_id': uom_id,
|
|
'list_price': data['prices'][0]['price'],
|
|
'standard_price': data['prices'][0]['cost'],
|
|
'taxes_id': False,
|
|
'pipedrive_reference': data['id'],
|
|
'categ_id': self.env['product.category'].search([(
|
|
'pipedrive_reference', '=', data['category'])]).id
|
|
if data['category'] else 1
|
|
})
|
|
product.taxes_id.unlink()
|
|
if data['tax'] != 0:
|
|
tax = self.env['account.tax'].search(
|
|
[('amount_type', '=', 'percent'),
|
|
('type_tax_use', '=', 'sale'), ('amount',
|
|
'=',
|
|
data['tax'])])
|
|
if not tax:
|
|
tax = self.env['account.tax'].create({
|
|
'name': 'Tax ' + str(data['tax']) + '%',
|
|
'amount_type': 'percent',
|
|
'type_tax_use': 'sale',
|
|
'amount': data['tax']
|
|
})
|
|
product.write({
|
|
"taxes_id": [(4, tax.id)]
|
|
})
|
|
|
|
def create_product_category(self):
|
|
"""Returns product category from category_id"""
|
|
response = requests.get(
|
|
url='https://api.pipedrive.com/v1/productFields',
|
|
params={
|
|
'api_token': self.api_key,
|
|
}, timeout=10)
|
|
if not response.json()['success']:
|
|
raise ValidationError(
|
|
response.json()['error'] + '. ' + response.json()[
|
|
'error_info'])
|
|
for rec in response.json()['data']:
|
|
if rec['key'] == 'category':
|
|
for item in rec['options']:
|
|
category = self.env['product.category'].search(
|
|
[('name', '=', item['label'])])
|
|
if not category:
|
|
self.env['product.category'].create(
|
|
{
|
|
'name': item['label'],
|
|
'pipedrive_reference': item['id']
|
|
}
|
|
)
|
|
else:
|
|
category.write({
|
|
'pipedrive_reference': item['id']
|
|
})
|
|
|
|
def get_contacts(self):
|
|
"""Receives contacts from Pipedrive"""
|
|
response = requests.get(url='https://api.pipedrive.com/v1/persons',
|
|
params={
|
|
'api_token': self.api_key,
|
|
}, timeout=10)
|
|
if not response.json()['success']:
|
|
raise ValidationError(
|
|
response.json()['error'] + '. ' + response.json()[
|
|
'error_info'])
|
|
if response.json()['data']:
|
|
for data in response.json()['data']:
|
|
partner_id = self.env['res.partner'].search([]).mapped(
|
|
'pipedrive_reference')
|
|
if str(data['id']) not in partner_id:
|
|
self.env['res.partner'].create({
|
|
'name': data['name'],
|
|
'phone': data['phone'][0]['value'],
|
|
'email': data['email'][0]['value'],
|
|
'pipedrive_reference': data['id']
|
|
})
|
|
if not self.contact_webhook:
|
|
self.create_webhook(
|
|
'updated', '/update_pipedrive_contact',
|
|
'person')
|
|
self.create_webhook(
|
|
'deleted', '/delete_pipedrive_contact',
|
|
'person')
|
|
|
|
def get_leads(self):
|
|
"""Receives leads from Pipedrive"""
|
|
response = requests.get(url='https://api.pipedrive.com/v1/leads',
|
|
params={
|
|
'api_token': self.api_key,
|
|
}, timeout=10)
|
|
if not response.json()['success']:
|
|
raise ValidationError(
|
|
response.json()['error'] + '. ' + response.json()[
|
|
'error_info'])
|
|
if response.json()['data']:
|
|
for data in response.json()['data']:
|
|
lead_id = self.env['crm.lead'].search([]).mapped(
|
|
'pipedrive_reference')
|
|
expected_revenue = 0
|
|
if data['value']:
|
|
currency = self.env['res.currency'].search(
|
|
[('name', '=', data['value']['currency']),
|
|
('active', 'in', [True, False])])
|
|
if not currency.active:
|
|
currency.active = True
|
|
expected_revenue = currency.compute(
|
|
data['value']['amount'], self.env.company.currency_id)
|
|
if str(data['id']) not in lead_id:
|
|
self.env['crm.lead'].create({
|
|
'name': data['title'],
|
|
'type': 'opportunity',
|
|
'expected_revenue': expected_revenue,
|
|
'pipedrive_reference': data['id']
|
|
})
|
|
|
|
def export_products_to_pipedrive(self):
|
|
"""Export Products from Odoo to Pipedrive"""
|
|
for product in self.env['product.template'].search(
|
|
[('pipedrive_reference', '=', False)]):
|
|
data = {
|
|
'name': product.name,
|
|
'unit': product.uom_id.name,
|
|
'tax': self.calculate_total_tax_percentage(product),
|
|
'prices': [{
|
|
'price': product.list_price,
|
|
'currency': self.env.company.currency_id.name
|
|
}]
|
|
}
|
|
response = requests.post(
|
|
url='https://api.pipedrive.com/v1/products',
|
|
params={
|
|
'api_token': self.api_key,
|
|
}, json=data, timeout=10)
|
|
if not response.json()['success']:
|
|
raise ValidationError(
|
|
response.json()['error'] + '. ' + response.json()[
|
|
'error_info'])
|
|
product.write(
|
|
{'pipedrive_reference': response.json()['data']['id']})
|
|
if not self.product_webhook:
|
|
self.create_webhook(
|
|
'updated', '/update_pipedrive_product',
|
|
'product')
|
|
self.create_webhook(
|
|
'deleted', '/delete_pipedrive_product',
|
|
'product')
|
|
|
|
def export_contacts_to_pipedrive(self):
|
|
"""Export Contacts from Odoo to Pipedrive"""
|
|
for partner in self.env['res.partner'].search(
|
|
[('pipedrive_reference', '=', False)]):
|
|
self.create_contact(partner)
|
|
|
|
def create_contact(self, partner):
|
|
"""Create Persons in Pipedrive"""
|
|
data = {
|
|
'name': partner.name,
|
|
'email': partner.email,
|
|
'phone': partner.phone
|
|
}
|
|
response = requests.post(
|
|
url='https://api.pipedrive.com/v1/persons',
|
|
params={
|
|
'api_token': self.api_key,
|
|
}, json=data, timeout=10)
|
|
if not response.json()['success']:
|
|
raise ValidationError(
|
|
response.json()['error'] + '. ' + response.json()[
|
|
'error_info'])
|
|
partner.sudo().write(
|
|
{'pipedrive_reference': response.json()['data']['id']})
|
|
if not self.contact_webhook:
|
|
self.create_webhook(
|
|
'updated', '/update_pipedrive_contact',
|
|
'person')
|
|
self.create_webhook(
|
|
'deleted', '/delete_pipedrive_contact',
|
|
'person')
|
|
return response.json()['data']['id']
|
|
|
|
def create_webhook(self, event_action, url, event_object):
|
|
"""Method for creating contact webhook in Pipedrive"""
|
|
payload = json.dumps({
|
|
"subscription_url": self.env['ir.config_parameter'].get_param(
|
|
'web.base.url') + url,
|
|
"event_action": event_action,
|
|
"event_object": event_object
|
|
})
|
|
headers = {
|
|
'Content-Type': 'application/json',
|
|
'Accept': 'application/json'
|
|
}
|
|
params = {
|
|
'api_token': self.api_key,
|
|
}
|
|
response = requests.request("POST",
|
|
"https://api.pipedrive.com/v1/"
|
|
"webhooks",
|
|
headers=headers, data=payload,
|
|
params=params,
|
|
timeout=10)
|
|
if not response.json()['success']:
|
|
if 'error' in response.json().keys():
|
|
raise ValidationError(
|
|
response.json()['error'])
|
|
if 'message' in response.json().keys():
|
|
raise ValidationError(
|
|
response.json()['message'])
|
|
if event_object == 'person':
|
|
self.contact_webhook = True
|
|
elif event_object == 'product':
|
|
self.product_webhook = True
|
|
|
|
def export_leads_to_pipedrive(self):
|
|
"""Export Leads from Odoo to Pipedrive"""
|
|
for lead in self.env['crm.lead'].search(
|
|
[('pipedrive_reference', '=', False),
|
|
('partner_id', '!=', False)]):
|
|
if not lead.partner_id.pipedrive_reference:
|
|
self.create_contact(lead.partner_id)
|
|
data = {
|
|
'title': lead.name,
|
|
'person_id': int(lead.partner_id.pipedrive_reference),
|
|
'value': {
|
|
'amount': lead.expected_revenue,
|
|
'currency': self.env.company.currency_id.name
|
|
}
|
|
}
|
|
response = requests.post(
|
|
url='https://api.pipedrive.com/v1/leads',
|
|
params={
|
|
'api_token': self.api_key,
|
|
}, json=data, timeout=10)
|
|
if not response.json()['success']:
|
|
raise ValidationError(
|
|
response.json()['error'] + '. ' + response.json()[
|
|
'error_info'])
|
|
lead.write(
|
|
{'pipedrive_reference': response.json()['data']['id']})
|
|
|