You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							406 lines
						
					
					
						
							18 KiB
						
					
					
				
			
		
		
		
			
			
			
		
		
	
	
							406 lines
						
					
					
						
							18 KiB
						
					
					
				
								# -*- coding: utf-8 -*-
							 | 
						|
								################################################################################
							 | 
						|
								#
							 | 
						|
								#    Cybrosys Technologies Pvt. Ltd.
							 | 
						|
								#
							 | 
						|
								#    Copyright (C) 2024-TODAY Cybrosys Technologies(<https://www.cybrosys.com>).
							 | 
						|
								#    Author: Subina P (odoo@cybrosys.com)
							 | 
						|
								#
							 | 
						|
								#    You can modify it under the terms of the GNU AFFERO
							 | 
						|
								#    GENERAL PUBLIC LICENSE (AGPL v3), Version 3.
							 | 
						|
								#
							 | 
						|
								#    This program is distributed in the hope that it will be useful,
							 | 
						|
								#    but WITHOUT ANY WARRANTY; without even the implied warranty of
							 | 
						|
								#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
							 | 
						|
								#    GNU AFFERO GENERAL PUBLIC LICENSE (AGPL v3) for more details.
							 | 
						|
								#
							 | 
						|
								#    You should have received a copy of the GNU AFFERO GENERAL PUBLIC LICENSE
							 | 
						|
								#    (AGPL v3) along with this program.
							 | 
						|
								#    If not, see <http://www.gnu.org/licenses/>.
							 | 
						|
								#
							 | 
						|
								################################################################################
							 | 
						|
								import json
							 | 
						|
								import requests
							 | 
						|
								from odoo import fields, models, _
							 | 
						|
								from odoo.exceptions import ValidationError
							 | 
						|
								
							 | 
						|
								
							 | 
						|
								class ResCompany(models.Model):
								    """Extend ``res.company`` with the Pipedrive token and sync flags."""
								    _inherit = 'res.company'

								    # Token used to authenticate against Pipedrive.
								    api_key = fields.Char(
								        help="It is used to connect with Pipedrive",
								        string='Token',
								    )
								    # Read-only status flags, one per synchronised record family.
								    product_synced = fields.Boolean(
								        help='True if the products are synced between Odoo and Pipedrive',
								        readonly=True,
								        string='Product Synced',
								    )
								    contact_synced = fields.Boolean(
								        help='True if the contacts are synced between Odoo and Pipedrive',
								        readonly=True,
								        string='Contact Synced',
								    )
								    lead_synced = fields.Boolean(
								        help='True if the leads are synced between Odoo and Pipedrive',
								        readonly=True,
								        string='Lead Synced',
								    )
							 | 
						|
								
							 | 
						|
								    def calculate_total_tax_percentage(self, product):
								        """Return the combined tax of ``product`` expressed as a percentage.

								        The customer taxes in ``product.taxes_id`` are folded into a single
								        number, by ``amount_type``:

								        * ``percent``  - ``amount`` is added as is;
								        * ``group``    - the ``percent`` children are added;
								        * ``fixed``    - ``amount`` is converted to a share of ``list_price``;
								        * ``division`` - ``list_price / factor * 100`` is added.

								        :param product: record exposing ``taxes_id`` and ``list_price``
								        :return: the summed percentage as a float
								        """
								        taxes = product.taxes_id
								        price = product.list_price
								        total = 0.0
								        # Percentage taxes
								        for tax in taxes.filtered(lambda t: t.amount_type == 'percent'):
								            total += tax.amount
								        # Group taxes: only their percentage children contribute
								        for group in taxes.filtered(lambda t: t.amount_type == 'group'):
								            for child in group.children_tax_ids.filtered(
								                    lambda t: t.amount_type == 'percent'):
								                total += child.amount
								        # Fixed taxes: a fixed amount has no percentage equivalent on a
								        # zero-priced product, so skip it instead of dividing by zero.
								        if price:
								            for tax in taxes.filtered(lambda t: t.amount_type == 'fixed'):
								                total += (tax.amount / price) * 100
								        # Division taxes
								        # NOTE(review): relies on a ``factor`` attribute on the tax record;
								        # confirm that field exists and that this formula is the intended one.
								        for tax in taxes.filtered(lambda t: t.amount_type == 'division'):
								            factor = tax.factor
								            if factor:
								                total += (price / factor) * 100
								        return total
							 | 
						|
								
							 | 
						|
								    def action_sync_products(self):
								        """Run the two-way product sync and register the product webhooks.

								        Imports products from Pipedrive, exports the unlinked Odoo products,
								        registers one webhook per product event and finally sets
								        ``product_synced``. Raises ``ValidationError`` when no API key is set.
								        """
								        if not self.api_key:
								            raise ValidationError(_('Please Enter an API Key'))
								        self.get_products()
								        self.export_products_to_pipedrive()
								        # (event action, controller route), registered in this order.
								        product_hooks = (
								            ('deleted', '/delete_pipedrive_product'),
								            ("updated", '/update_pipedrive_product'),
								            ("added", '/add_pipedrive_product'),
								        )
								        for event_action, route in product_hooks:
								            self.create_webhook(event_action, route, 'product')
								        self.product_synced = True
							 | 
						|
								
							 | 
						|
								    def action_sync_contacts(self):
								        """Run the two-way contact sync and register the person webhooks.

								        Imports persons from Pipedrive, exports the unlinked Odoo partners,
								        registers one webhook per person event and finally sets
								        ``contact_synced``. Raises ``ValidationError`` when no API key is set.
								        """
								        if not self.api_key:
								            raise ValidationError(_('Please Enter an API Key'))
								        self.get_contacts()
								        self.export_contacts_to_pipedrive()
								        # (event action, controller route), registered in this order.
								        person_hooks = (
								            ('updated', '/update_pipedrive_contact'),
								            ('deleted', '/delete_pipedrive_contact'),
								            ('added', '/add_pipedrive_contact'),
								        )
								        for event_action, route in person_hooks:
								            self.create_webhook(event_action, route, 'person')
								        self.contact_synced = True
							 | 
						|
								
							 | 
						|
								    def action_sync_leads(self):
								        """Run the two-way lead sync and set ``lead_synced``.

								        Raises ``ValidationError`` when no API key is set on the company.
								        """
								        if self.api_key:
								            self.get_leads()
								            self.export_leads_to_pipedrive()
								            self.lead_synced = True
								            return
								        raise ValidationError(_('Please Enter an API Key'))
							 | 
						|
								
							 | 
						|
								    def get_products(self):
								        """Import Pipedrive products that have no Odoo counterpart yet.

								        Sends one GET to ``/v1/products``. For every returned product without
								        a ``pipedrive.record`` link of type ``product`` it creates a
								        ``product.template``, records the link and attaches a matching sale
								        tax (creating the tax when none exists).

								        NOTE(review): only the items of this single response are processed;
								        confirm whether further result pages have to be requested.

								        :raises ValidationError: when Pipedrive reports ``success`` as false
								        """
								        payload = requests.get(url='https://api.pipedrive.com/v1/products',
								                               params={
								                                   'api_token': self.api_key,
								                               }, timeout=10).json()
								        if not payload['success']:
								            details = [str(part) for part in
								                       (payload.get('error'), payload.get('error_info'))
								                       if part]
								            raise ValidationError(
								                '. '.join(details) or _('Pipedrive request failed'))
								        items = payload['data']
								        if not items:
								            return
								        self.create_product_category()
								        links = self.env['pipedrive.record'].sudo()
								        currencies = self.env['res.currency'].sudo()
								        taxes = self.env['account.tax'].sudo()
								        # Case-insensitive UoM lookup, built once instead of per product.
								        uom_by_name = {uom.name.lower(): uom.id
								                       for uom in self.env['uom.uom'].sudo().search([])}
								        for data in items:
								            if links.search(
								                    [('pipedrive_reference', '=', str(data['id'])),
								                     ('record_type', '=', 'product')]):
								                continue
								            # Database id 1 is the fallback unit when the Pipedrive unit
								            # is empty or unknown (presumably "Units"; verify).
								            uom_id = 1
								            if data.get('unit'):
								                uom_id = uom_by_name.get(data['unit'].lower(), uom_id)
								            # A product may come without any price entry.
								            price = (data.get('prices') or [{}])[0]
								            if price.get('price'):
								                # The domain on ``active`` also finds archived currencies.
								                currency = currencies.search(
								                    [('name', '=', price.get('currency')),
								                     ('active', 'in', [True, False])])
								                if currency and not currency.active:
								                    currency.active = True
								            # Restrict to category links: other record types can share
								            # the same Pipedrive reference value.
								            category_link = links.browse()
								            if data.get('category'):
								                category_link = links.search(
								                    [('pipedrive_reference', '=', str(data['category'])),
								                     ('record_type', '=', 'categ')], limit=1)
								            product = self.env['product.template'].sudo().create({
								                'name': data['name'],
								                'description': data.get('description'),
								                'uom_id': uom_id,
								                'uom_po_id': uom_id,
								                'list_price': price.get('price'),
								                'standard_price': price.get('cost'),
								                # Start without taxes; the Pipedrive rate is added below.
								                'taxes_id': False,
								                'categ_id': category_link.odoo_ref if category_link else 1,
								            })
								            links.create({
								                'pipedrive_reference': data['id'],
								                'record_type': 'product',
								                'odoo_ref': product.id
								            })
								            # ``taxes_id`` is already empty; calling ``unlink()`` on it would
								            # delete the tax records themselves, so nothing is removed here.
								            tax_rate = data.get('tax')
								            if not tax_rate:
								                continue
								            tax = taxes.search(
								                [('amount_type', '=', 'percent'),
								                 ('type_tax_use', '=', 'sale'),
								                 ('amount', '=', tax_rate)], limit=1)
								            if not tax:
								                tax = taxes.create({
								                    'name': 'Tax ' + str(tax_rate) + '%',
								                    'amount_type': 'percent',
								                    'type_tax_use': 'sale',
								                    'amount': tax_rate
								                })
								            product.write({
								                "taxes_id": [(4, tax.id)]
								            })
							 | 
						|
								
							 | 
						|
								    def create_product_category(self):
								        """Mirror the Pipedrive product category options in Odoo.

								        Reads ``/v1/productFields`` and, for every option of the field whose
								        key is ``category``, makes sure a ``product.category`` with that label
								        exists and is linked through a ``pipedrive.record`` of type ``categ``.
								        Nothing is returned.

								        :raises ValidationError: when Pipedrive reports ``success`` as false
								        """
								        payload = requests.get(
								            url='https://api.pipedrive.com/v1/productFields',
								            params={
								                'api_token': self.api_key,
								            }, timeout=10).json()
								        if not payload['success']:
								            details = [str(part) for part in
								                       (payload.get('error'), payload.get('error_info'))
								                       if part]
								            raise ValidationError(
								                '. '.join(details) or _('Pipedrive request failed'))
								        categories = self.env['product.category'].sudo()
								        links = self.env['pipedrive.record'].sudo()
								        for field in payload['data'] or []:
								            if field['key'] != 'category':
								                continue
								            for option in field.get('options') or []:
								                category = categories.search(
								                    [('name', '=', option['label'])], limit=1)
								                if not category:
								                    category = categories.create({
								                        'name': option['label']
								                    })
								                # One link per Odoo category; a freshly created category
								                # has none, so it always gets one here.
								                if not links.search(
								                        [('record_type', '=', 'categ'),
								                         ('odoo_ref', '=', category.id)]):
								                    links.create({
								                        'pipedrive_reference': option['id'],
								                        'record_type': 'categ',
								                        'odoo_ref': category.id
								                    })
							 | 
						|
								
							 | 
						|
								    def get_contacts(self):
								        """Import Pipedrive persons that have no Odoo partner yet.

								        Sends one GET to ``/v1/persons``. Every returned person without a
								        ``pipedrive.record`` link of type ``partner`` becomes a
								        ``res.partner`` carrying the first phone and e-mail entry, and the
								        link is recorded.

								        :raises ValidationError: when Pipedrive reports ``success`` as false
								        """
								        payload = requests.get(url='https://api.pipedrive.com/v1/persons',
								                               params={
								                                   'api_token': self.api_key,
								                               }, timeout=10).json()
								        if not payload['success']:
								            details = [str(part) for part in
								                       (payload.get('error'), payload.get('error_info'))
								                       if part]
								            raise ValidationError(
								                '. '.join(details) or _('Pipedrive request failed'))
								        links = self.env['pipedrive.record'].sudo()
								        for data in payload['data'] or []:
								            if links.search(
								                    [('pipedrive_reference', '=', str(data['id'])),
								                     ('record_type', '=', 'partner')]):
								                continue
								            # A person may carry no phone or e-mail entry at all; fall back
								            # to an empty entry instead of indexing into an empty list.
								            phone = (data.get('phone') or [{}])[0]
								            email = (data.get('email') or [{}])[0]
								            partner = self.env['res.partner'].sudo().create({
								                'name': data['name'],
								                'phone': phone.get('value'),
								                'email': email.get('value')
								            })
								            links.create({
								                'pipedrive_reference': data['id'],
								                'record_type': 'partner',
								                'odoo_ref': partner.id
								            })
							 | 
						|
								
							 | 
						|
								    def get_leads(self):
								        """Import Pipedrive leads that have no Odoo opportunity yet.

								        Sends one GET to ``/v1/leads``. Every returned lead without a
								        ``pipedrive.record`` link of type ``lead`` becomes a ``crm.lead``
								        opportunity whose expected revenue is the Pipedrive value converted
								        to the current company currency, and the link is recorded.

								        :raises ValidationError: when Pipedrive reports ``success`` as false
								        """
								        payload = requests.get(url='https://api.pipedrive.com/v1/leads',
								                               params={
								                                   'api_token': self.api_key,
								                               }, timeout=10).json()
								        if not payload['success']:
								            details = [str(part) for part in
								                       (payload.get('error'), payload.get('error_info'))
								                       if part]
								            raise ValidationError(
								                '. '.join(details) or _('Pipedrive request failed'))
								        links = self.env['pipedrive.record'].sudo()
								        currencies = self.env['res.currency'].sudo()
								        company = self.env.company
								        for data in payload['data'] or []:
								            # Skip known leads first so they trigger no currency changes.
								            if links.search(
								                    [('pipedrive_reference', '=', str(data['id'])),
								                     ('record_type', '=', 'lead')]):
								                continue
								            expected_revenue = 0
								            value = data.get('value')
								            if value:
								                # The domain on ``active`` also finds archived currencies.
								                currency = currencies.search(
								                    [('name', '=', value['currency']),
								                     ('active', 'in', [True, False])], limit=1)
								                if currency:
								                    if not currency.active:
								                        currency.active = True
								                    # ``_convert`` replaces the removed ``compute`` helper.
								                    expected_revenue = currency._convert(
								                        value['amount'], company.currency_id, company,
								                        fields.Date.today())
								                else:
								                    # Unknown currency: keep the amount unconverted.
								                    expected_revenue = value['amount']
								            lead = self.env['crm.lead'].sudo().create({
								                'name': data['title'],
								                'type': 'opportunity',
								                'expected_revenue': expected_revenue
								            })
								            links.create({
								                'pipedrive_reference': data['id'],
								                'record_type': 'lead',
								                'odoo_ref': lead.id
								            })
							 | 
						|
								
							 | 
						|
								    def export_products_to_pipedrive(self):
								        """Push every Odoo product without a Pipedrive link to ``/v1/products``.

								        Each created product is linked through a ``pipedrive.record`` of
								        type ``product``. Raises ``ValidationError`` with Pipedrive's error
								        text when a request is not successful.
								        """
								        linked_ids = self.env['pipedrive.record'].sudo().search(
								            [('record_type', '=', 'product')]).mapped('odoo_ref')
								        pending = self.env['product.template'].sudo().search(
								            [('id', 'not in', linked_ids)])
								        for product in pending:
								            price_entry = {
								                'price': product.list_price,
								                'currency': self.env.company.currency_id.name
								            }
								            body = {
								                'name': product.name,
								                'unit': product.uom_id.name,
								                'tax': self.calculate_total_tax_percentage(product),
								                'prices': [price_entry]
								            }
								            reply = requests.post(
								                url='https://api.pipedrive.com/v1/products',
								                params={'api_token': self.api_key},
								                json=body, timeout=10).json()
								            if not reply['success']:
								                raise ValidationError(
								                    reply['error'] + '. ' + reply['error_info'])
								            self.env['pipedrive.record'].sudo().create({
								                'pipedrive_reference': reply['data']['id'],
								                'record_type': 'product',
								                'odoo_ref': product.id
								            })
							 | 
						|
								
							 | 
						|
								    def export_contacts_to_pipedrive(self):
								        """Create a Pipedrive person for every partner without a link.

								        Partners whose id is not referenced by a ``pipedrive.record`` of
								        type ``partner`` are handed to ``create_contact`` one by one.
								        """
								        env = self.env
								        linked_ids = env['pipedrive.record'].sudo().search(
								            [('record_type', '=', 'partner')]).mapped('odoo_ref')
								        pending = env['res.partner'].sudo().search(
								            [('id', 'not in', linked_ids)])
								        for contact in pending:
								            self.create_contact(contact)
							 | 
						|
								
							 | 
						|
								    def create_contact(self, partner):
								        """Create a Pipedrive person for ``partner`` and record the link.

								        Posts the partner's name, plus e-mail and phone when they are set,
								        to ``/v1/persons`` and stores the returned id in a
								        ``pipedrive.record`` of type ``partner``.

								        :param partner: record exposing ``name``, ``email``, ``phone``, ``id``
								        :raises ValidationError: when Pipedrive reports ``success`` as false
								        """
								        data = {'name': partner.name}
								        # Unset Odoo fields read as False; leave them out rather than
								        # sending a JSON ``false`` as e-mail or phone.
								        if partner.email:
								            data['email'] = partner.email
								        if partner.phone:
								            data['phone'] = partner.phone
								        body = requests.post(
								            url='https://api.pipedrive.com/v1/persons',
								            params={
								                'api_token': self.api_key,
								            }, json=data, timeout=10).json()
								        # Every reported failure stops here, with or without an error
								        # text, so the lookup of ``data.id`` below cannot crash on it.
								        if not body.get('success', True):
								            raise ValidationError(
								                body.get('error')
								                or _('Pipedrive could not create the contact'))
								        self.env['pipedrive.record'].sudo().create({
								            'pipedrive_reference': body['data']['id'],
								            'record_type': 'partner',
								            'odoo_ref': partner.id
								        })
							 | 
						|
								
							 | 
						|
								    def create_webhook(self, event_action, url, event_object):
								        """Register a Pipedrive webhook unless an identical one exists.

								        The subscription URL is the ``web.base.url`` system parameter
								        followed by ``url``. Existing webhooks are listed first; a new one
								        is posted only when none matches the action, object and URL.

								        :param event_action: Pipedrive event action, e.g. ``'added'``
								        :param url: controller route appended to the base URL
								        :param event_object: Pipedrive object type, e.g. ``'person'``
								        :raises ValidationError: when Pipedrive refuses the new webhook
								        """
								        # sudo(): system parameters are not readable by every user.
								        subscription_url = self.env['ir.config_parameter'].sudo().get_param(
								            'web.base.url') + url
								        payload = json.dumps({
								            "subscription_url": subscription_url,
								            "event_action": event_action,
								            "event_object": event_object
								        })
								        headers = {
								            'Content-Type': 'application/json',
								            'Accept': 'application/json'
								        }
								        params = {
								            'api_token': self.api_key,
								        }
								        # Same timeout as the other requests so a stalled call cannot
								        # block the worker indefinitely.
								        existing = requests.get("https://api.pipedrive.com/v1/webhooks",
								                                params=params, timeout=10)
								        if existing.status_code == 200:
								            # ``data`` may be null when no webhook is registered.
								            hooks = existing.json().get('data') or []
								            if any(hook.get('event_action') == event_action
								                   and hook.get('event_object') == event_object
								                   and hook.get('subscription_url') == subscription_url
								                   for hook in hooks):
								                return
								        body = requests.post("https://api.pipedrive.com/v1/webhooks",
								                             headers=headers, data=payload,
								                             params=params, timeout=10).json()
								        if not body['success']:
								            raise ValidationError(
								                body.get('error') or body.get('message')
								                or _('Pipedrive could not create the webhook'))
							 | 
						|
								
							 | 
						|
								    def export_leads_to_pipedrive(self):
								        """Push every Odoo lead without a Pipedrive link to ``/v1/leads``.

								        Leads without a partner are skipped. A partner that has no
								        Pipedrive link yet is exported first through ``create_contact``.
								        Raises ``ValidationError`` with Pipedrive's error text when a
								        request is not successful.
								        """
								        links = self.env['pipedrive.record'].sudo()
								        linked_ids = links.search(
								            [('record_type', '=', 'lead')]).mapped('odoo_ref')
								        pending = self.env['crm.lead'].sudo().search(
								            [('id', 'not in', linked_ids)])
								        for lead in pending:
								            customer = lead.partner_id
								            if not customer:
								                continue
								            person_domain = [('record_type', '=', 'partner'),
								                             ('odoo_ref', '=', customer.id)]
								            if not links.search(person_domain):
								                self.create_contact(customer)
								            # Looked up again: the link may have just been created.
								            person_link = links.search(person_domain)
								            body = {
								                'title': lead.name,
								                'person_id': int(person_link.pipedrive_reference),
								                'value': {
								                    'amount': lead.expected_revenue,
								                    'currency': self.env.company.currency_id.name
								                }
								            }
								            reply = requests.post(
								                url='https://api.pipedrive.com/v1/leads',
								                params={'api_token': self.api_key},
								                json=body, timeout=10).json()
								            if not reply['success']:
								                raise ValidationError(
								                    reply['error'] + '. ' + reply['error_info'])
								            links.create({
								                'pipedrive_reference': reply['data']['id'],
								                'record_type': 'lead',
								                'odoo_ref': lead.id
								            })
							 | 
						|
								
							 |