You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 

406 lines
18 KiB

# -*- coding: utf-8 -*-
################################################################################
#
# Cybrosys Technologies Pvt. Ltd.
#
# Copyright (C) 2024-TODAY Cybrosys Technologies(<https://www.cybrosys.com>).
# Author: Subina P (odoo@cybrosys.com)
#
# You can modify it under the terms of the GNU AFFERO
# GENERAL PUBLIC LICENSE (AGPL v3), Version 3.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU AFFERO GENERAL PUBLIC LICENSE (AGPL v3) for more details.
#
# You should have received a copy of the GNU AFFERO GENERAL PUBLIC LICENSE
# (AGPL v3) along with this program.
# If not, see <http://www.gnu.org/licenses/>.
#
################################################################################
import json
import requests
from odoo import fields, models, _
from odoo.exceptions import ValidationError
class ResCompany(models.Model):
"""Inherits Res Company for including Pipedrive credential fields"""
_inherit = 'res.company'
api_key = fields.Char(string='Token',
help="It is used to connect with Pipedrive"
)
product_synced = fields.Boolean(string='Product Synced', readonly=True,
help='True if the products are synced between Odoo and Pipedrive')
contact_synced = fields.Boolean(string='Contact Synced', readonly=True,
help='True if the contacts are synced between Odoo and Pipedrive')
lead_synced = fields.Boolean(string='Lead Synced', readonly=True,
help='True if the leads are synced between Odoo and Pipedrive')
def calculate_total_tax_percentage(self, product):
"""Method for calculating total tax"""
total_percentage_tax = 0.0
# Percentage Taxes
for tax in product.taxes_id.filtered(
lambda t: t.amount_type == 'percent'):
total_percentage_tax += tax.amount
# Group Taxes
for tax in product.taxes_id.filtered(
lambda t: t.amount_type == 'group'):
for child_tax in tax.children_tax_ids.filtered(
lambda t: t.amount_type == 'percent'):
total_percentage_tax += child_tax.amount
# Fixed Taxes
for tax in product.taxes_id.filtered(
lambda t: t.amount_type == 'fixed'):
total_percentage_tax += (tax.amount / product.list_price) * 100
# Division Taxes
for tax in product.taxes_id.filtered(
lambda t: t.amount_type == 'division'):
total_percentage_tax += (product.list_price / tax.factor) * 100
return total_percentage_tax
def action_sync_products(self):
if not self.api_key:
raise ValidationError(_('Please Enter an API Key'))
self.get_products()
self.export_products_to_pipedrive()
self.create_webhook(
'deleted',
'/delete_pipedrive_product',
'product')
self.create_webhook(
"updated",
'/update_pipedrive_product',
'product')
self.create_webhook(
"added",
'/add_pipedrive_product',
'product')
self.product_synced = True
def action_sync_contacts(self):
if not self.api_key:
raise ValidationError(_('Please Enter an API Key'))
self.get_contacts()
self.export_contacts_to_pipedrive()
self.create_webhook(
'updated', '/update_pipedrive_contact',
'person')
self.create_webhook(
'deleted', '/delete_pipedrive_contact',
'person')
self.create_webhook(
'added', '/add_pipedrive_contact',
'person')
self.contact_synced = True
def action_sync_leads(self):
if not self.api_key:
raise ValidationError(_('Please Enter an API Key'))
self.get_leads()
self.export_leads_to_pipedrive()
self.lead_synced = True
def get_products(self):
"""Receives Products from Pipedrive"""
response = requests.get(url='https://api.pipedrive.com/v1/products',
params={
'api_token': self.api_key,
}, timeout=10)
if not response.json()['success']:
raise ValidationError(
response.json()['error'] + '. ' + response.json()[
'error_info'])
if response.json()['data']:
self.create_product_category()
for data in response.json()['data']:
if not self.env['pipedrive.record'].sudo().search(
[('pipedrive_reference', '=', str(data['id'])), ('record_type', '=', 'product')]):
uom_id = 1
if data['unit']:
for rec in self.env['uom.uom'].sudo().search([]).mapped(
'name'):
if rec.lower() == data['unit'].lower():
uom_id = self.env['uom.uom'].sudo().search(
[('name', '=', rec)]).id
if data['prices'][0]['price']:
currency = self.env['res.currency'].sudo().search(
[('name', '=', data['prices'][0]['currency']),
('active', 'in', [True, False])])
if not currency.active:
currency.active = True
product = self.env['product.template'].sudo().create({
'name': data['name'],
'description': data['description'],
'uom_id': uom_id,
'uom_po_id': uom_id,
'list_price': data['prices'][0]['price'],
'standard_price': data['prices'][0]['cost'],
'taxes_id': False,
'categ_id': self.env['pipedrive.record'].sudo().search(
[(
'pipedrive_reference', '=',
data['category'])])[0].odoo_ref
if self.env['pipedrive.record'].sudo().search(
[(
'pipedrive_reference', '=',
data['category'])]) else 1
})
if product:
self.env['pipedrive.record'].sudo().create({
'pipedrive_reference': data['id'],
'record_type': 'product',
'odoo_ref': product.id
})
product.taxes_id.unlink()
if data['tax'] != 0:
tax = self.env['account.tax'].sudo().search(
[('amount_type', '=', 'percent'),
('type_tax_use', '=', 'sale'), ('amount',
'=',
data['tax'])])
if not tax:
tax = self.env['account.tax'].sudo().create({
'name': 'Tax ' + str(data['tax']) + '%',
'amount_type': 'percent',
'type_tax_use': 'sale',
'amount': data['tax']
})
product.sudo().write({
"taxes_id": [(4, tax.id)]
})
def create_product_category(self):
"""Returns product category from category_id"""
response = requests.get(
url='https://api.pipedrive.com/v1/productFields',
params={
'api_token': self.api_key,
}, timeout=10)
if not response.json()['success']:
raise ValidationError(
response.json()['error'] + '. ' + response.json()[
'error_info'])
for rec in response.json()['data']:
if rec['key'] == 'category':
for item in rec['options']:
category = self.env['product.category'].sudo().search(
[('name', '=', item['label'])])
if not category:
category = self.env['product.category'].sudo().create(
{
'name': item['label']
}
)
self.env['pipedrive.record'].sudo().create({
'pipedrive_reference': item['id'],
'record_type': 'categ',
'odoo_ref': category[0].id
})
elif not self.env['pipedrive.record'].sudo().search(
[('record_type', '=', 'categ'), ('odoo_ref', '=', category[0].id)]):
self.env['pipedrive.record'].sudo().create({
'pipedrive_reference': item['id'],
'record_type': 'categ',
'odoo_ref': category[0].id
})
def get_contacts(self):
"""Receives contacts from Pipedrive"""
response = requests.get(url='https://api.pipedrive.com/v1/persons',
params={
'api_token': self.api_key,
}, timeout=10)
if not response.json()['success']:
raise ValidationError(
response.json()['error'] + '. ' + response.json()[
'error_info'])
if response.json()['data']:
for data in response.json()['data']:
if not self.env['pipedrive.record'].sudo().search(
[('pipedrive_reference', '=', str(data['id'])), ('record_type', '=', 'partner')]):
partner = self.env['res.partner'].sudo().create({
'name': data['name'],
'phone': data['phone'][0]['value'],
'email': data['email'][0]['value']
})
if partner:
self.env['pipedrive.record'].sudo().create({
'pipedrive_reference': data['id'],
'record_type': 'partner',
'odoo_ref': partner.id
})
def get_leads(self):
"""Receives leads from Pipedrive"""
response = requests.get(url='https://api.pipedrive.com/v1/leads',
params={
'api_token': self.api_key,
}, timeout=10)
if not response.json()['success']:
raise ValidationError(
response.json()['error'] + '. ' + response.json()[
'error_info'])
if response.json()['data']:
for data in response.json()['data']:
expected_revenue = 0
if data['value']:
currency = self.env['res.currency'].sudo().search(
[('name', '=', data['value']['currency']),
('active', 'in', [True, False])])
if not currency.active:
currency.active = True
expected_revenue = currency.compute(
data['value']['amount'], self.env.company.currency_id)
if not self.env['pipedrive.record'].sudo().search(
[('pipedrive_reference', '=', str(data['id'])), ('record_type', '=', 'lead')]):
lead = self.env['crm.lead'].sudo().create({
'name': data['title'],
'type': 'opportunity',
'expected_revenue': expected_revenue
})
self.env['pipedrive.record'].sudo().create({
'pipedrive_reference': data['id'],
'record_type': 'lead',
'odoo_ref': lead.id
})
def export_products_to_pipedrive(self):
"""Export Products from Odoo to Pipedrive"""
pipedrive_products = self.env['pipedrive.record'].sudo().search(
[('record_type', '=', 'product')]).mapped('odoo_ref')
for product in self.env['product.template'].sudo().search(
[('id', 'not in', pipedrive_products)]):
data = {
'name': product.name,
'unit': product.uom_id.name,
'tax': self.calculate_total_tax_percentage(product),
'prices': [{
'price': product.list_price,
'currency': self.env.company.currency_id.name
}]
}
response = requests.post(
url='https://api.pipedrive.com/v1/products',
params={
'api_token': self.api_key,
}, json=data, timeout=10)
if not response.json()['success']:
raise ValidationError(
response.json()['error'] + '. ' + response.json()[
'error_info'])
self.env['pipedrive.record'].sudo().create({
'pipedrive_reference': response.json()['data']['id'],
'record_type': 'product',
'odoo_ref': product.id
})
def export_contacts_to_pipedrive(self):
"""Export Contacts from Odoo to Pipedrive"""
pipedrive_partner = self.env['pipedrive.record'].sudo().search(
[('record_type', '=', 'partner')]).mapped('odoo_ref')
for partner in self.env['res.partner'].sudo().search(
[('id', 'not in', pipedrive_partner)]):
self.create_contact(partner)
def create_contact(self, partner):
"""Create Persons in Pipedrive"""
data = {
'name': partner.name,
'email': partner.email,
'phone': partner.phone
}
response = requests.post(
url='https://api.pipedrive.com/v1/persons',
params={
'api_token': self.api_key,
}, json=data, timeout=10)
if 'success' in response.json(
).keys() and not response.json()['success'] and 'error' in \
response.json(
).keys():
raise ValidationError(
response.json()['error'])
self.env['pipedrive.record'].sudo().create({
'pipedrive_reference': response.json()['data']['id'],
'record_type': 'partner',
'odoo_ref': partner.id
})
def create_webhook(self, event_action, url, event_object):
"""Method for creating contact webhook in Pipedrive"""
payload = json.dumps({
"subscription_url": self.env['ir.config_parameter'].get_param(
'web.base.url') + url,
"event_action": event_action,
"event_object": event_object
})
headers = {
'Content-Type': 'application/json',
'Accept': 'application/json'
}
params = {
'api_token': self.api_key,
}
existing_webhooks_response = requests.get("https://api.pipedrive.com/v1/webhooks", params=params)
webhook_found = False
if existing_webhooks_response.status_code == 200:
existing_webhooks = existing_webhooks_response.json().get('data', [])
for webhook in existing_webhooks:
if (webhook.get('event_action') == event_action and
webhook.get('event_object') == event_object and
webhook.get(
'subscription_url') == self.env['ir.config_parameter'].get_param(
'web.base.url') + url):
webhook_found = True
if not webhook_found:
response = requests.request("POST",
"https://api.pipedrive.com/v1/"
"webhooks",
headers=headers, data=payload,
params=params,
timeout=10)
if not response.json()['success']:
if 'error' in response.json().keys():
raise ValidationError(
response.json()['error'])
if 'message' in response.json().keys():
raise ValidationError(
response.json()['message'])
def export_leads_to_pipedrive(self):
"""Export Leads from Odoo to Pipedrive"""
pipedrive_lead = self.env['pipedrive.record'].sudo().search(
[('record_type', '=', 'lead')]).mapped('odoo_ref')
for lead in self.env['crm.lead'].sudo().search(
[('id', 'not in', pipedrive_lead)]):
if lead.partner_id:
if not self.env['pipedrive.record'].sudo().search(
[('record_type', '=', 'partner'), ('odoo_ref', '=', lead.partner_id.id)]):
self.create_contact(lead.partner_id)
data = {
'title': lead.name,
'person_id': int(self.env['pipedrive.record'].sudo().search(
[('record_type', '=', 'partner'), ('odoo_ref', '=', lead.partner_id.id)]).pipedrive_reference),
'value': {
'amount': lead.expected_revenue,
'currency': self.env.company.currency_id.name
}
}
response = requests.post(
url='https://api.pipedrive.com/v1/leads',
params={
'api_token': self.api_key,
}, json=data, timeout=10)
if not response.json()['success']:
raise ValidationError(
response.json()['error'] + '. ' + response.json()[
'error_info'])
self.env['pipedrive.record'].sudo().create({
'pipedrive_reference': response.json()['data']['id'],
'record_type': 'lead',
'odoo_ref': lead.id
})