You cannot select more than 25 topics.
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							406 lines
						
					
					
						
							18 KiB
						
					
					
				
			
		
		
		
			
			
			
		
		
	
	
							406 lines
						
					
					
						
							18 KiB
						
					
					
				| # -*- coding: utf-8 -*- | |
| ################################################################################ | |
| # | |
| #    Cybrosys Technologies Pvt. Ltd. | |
| # | |
| #    Copyright (C) 2024-TODAY Cybrosys Technologies(<https://www.cybrosys.com>). | |
| #    Author: Subina P (odoo@cybrosys.com) | |
| # | |
| #    You can modify it under the terms of the GNU AFFERO | |
| #    GENERAL PUBLIC LICENSE (AGPL v3), Version 3. | |
| # | |
| #    This program is distributed in the hope that it will be useful, | |
| #    but WITHOUT ANY WARRANTY; without even the implied warranty of | |
| #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the | |
| #    GNU AFFERO GENERAL PUBLIC LICENSE (AGPL v3) for more details. | |
| # | |
| #    You should have received a copy of the GNU AFFERO GENERAL PUBLIC LICENSE | |
| #    (AGPL v3) along with this program. | |
| #    If not, see <http://www.gnu.org/licenses/>. | |
| # | |
| ################################################################################ | |
| import json | |
| import requests | |
| from odoo import fields, models, _ | |
| from odoo.exceptions import ValidationError | |
| 
 | |
| 
 | |
class ResCompany(models.Model):
    """Extend ``res.company`` with the Pipedrive token and sync flags."""
    _inherit = 'res.company'

    # Token sent as ``api_token`` on every Pipedrive request made below.
    api_key = fields.Char(
        string='Token',
        help="It is used to connect with Pipedrive",
    )
    # Read-only flags; each one is set by the matching ``action_sync_*``
    # method once its sync has run.
    product_synced = fields.Boolean(
        string='Product Synced',
        help='True if the products are synced between Odoo and Pipedrive',
        readonly=True,
    )
    contact_synced = fields.Boolean(
        string='Contact Synced',
        help='True if the contacts are synced between Odoo and Pipedrive',
        readonly=True,
    )
    lead_synced = fields.Boolean(
        string='Lead Synced',
        help='True if the leads are synced between Odoo and Pipedrive',
        readonly=True,
    )
| 
 | |
|     def calculate_total_tax_percentage(self, product): | |
|         """Method for calculating total tax""" | |
|         total_percentage_tax = 0.0 | |
|         # Percentage Taxes | |
|         for tax in product.taxes_id.filtered( | |
|                 lambda t: t.amount_type == 'percent'): | |
|             total_percentage_tax += tax.amount | |
|         # Group Taxes | |
|         for tax in product.taxes_id.filtered( | |
|                 lambda t: t.amount_type == 'group'): | |
|             for child_tax in tax.children_tax_ids.filtered( | |
|                     lambda t: t.amount_type == 'percent'): | |
|                 total_percentage_tax += child_tax.amount | |
|         # Fixed Taxes | |
|         for tax in product.taxes_id.filtered( | |
|                 lambda t: t.amount_type == 'fixed'): | |
|             total_percentage_tax += (tax.amount / product.list_price) * 100 | |
|         # Division Taxes | |
|         for tax in product.taxes_id.filtered( | |
|                 lambda t: t.amount_type == 'division'): | |
|             total_percentage_tax += (product.list_price / tax.factor) * 100 | |
|         return total_percentage_tax | |
| 
 | |
|     def action_sync_products(self): | |
|         if not self.api_key: | |
|             raise ValidationError(_('Please Enter an API Key')) | |
|         self.get_products() | |
|         self.export_products_to_pipedrive() | |
|         self.create_webhook( | |
|             'deleted', | |
|             '/delete_pipedrive_product', | |
|             'product') | |
|         self.create_webhook( | |
|             "updated", | |
|             '/update_pipedrive_product', | |
|             'product') | |
|         self.create_webhook( | |
|             "added", | |
|             '/add_pipedrive_product', | |
|             'product') | |
|         self.product_synced = True | |
| 
 | |
|     def action_sync_contacts(self): | |
|         if not self.api_key: | |
|             raise ValidationError(_('Please Enter an API Key')) | |
|         self.get_contacts() | |
|         self.export_contacts_to_pipedrive() | |
|         self.create_webhook( | |
|             'updated', '/update_pipedrive_contact', | |
|             'person') | |
|         self.create_webhook( | |
|             'deleted', '/delete_pipedrive_contact', | |
|             'person') | |
|         self.create_webhook( | |
|             'added', '/add_pipedrive_contact', | |
|             'person') | |
|         self.contact_synced = True | |
| 
 | |
|     def action_sync_leads(self): | |
|         if not self.api_key: | |
|             raise ValidationError(_('Please Enter an API Key')) | |
|         self.get_leads() | |
|         self.export_leads_to_pipedrive() | |
|         self.lead_synced = True | |
| 
 | |
|     def get_products(self): | |
|         """Receives Products from Pipedrive""" | |
|         response = requests.get(url='https://api.pipedrive.com/v1/products', | |
|                                 params={ | |
|                                     'api_token': self.api_key, | |
|                                 }, timeout=10) | |
|         if not response.json()['success']: | |
|             raise ValidationError( | |
|                 response.json()['error'] + '. ' + response.json()[ | |
|                     'error_info']) | |
|         if response.json()['data']: | |
|             self.create_product_category() | |
|             for data in response.json()['data']: | |
|                 if not self.env['pipedrive.record'].sudo().search( | |
|                         [('pipedrive_reference', '=', str(data['id'])), ('record_type', '=', 'product')]): | |
|                     uom_id = 1 | |
|                     if data['unit']: | |
|                         for rec in self.env['uom.uom'].sudo().search([]).mapped( | |
|                                 'name'): | |
|                             if rec.lower() == data['unit'].lower(): | |
|                                 uom_id = self.env['uom.uom'].sudo().search( | |
|                                     [('name', '=', rec)]).id | |
|                     if data['prices'][0]['price']: | |
|                         currency = self.env['res.currency'].sudo().search( | |
|                             [('name', '=', data['prices'][0]['currency']), | |
|                              ('active', 'in', [True, False])]) | |
|                         if not currency.active: | |
|                             currency.active = True | |
|                     product = self.env['product.template'].sudo().create({ | |
|                         'name': data['name'], | |
|                         'description': data['description'], | |
|                         'uom_id': uom_id, | |
|                         'uom_po_id': uom_id, | |
|                         'list_price': data['prices'][0]['price'], | |
|                         'standard_price': data['prices'][0]['cost'], | |
|                         'taxes_id': False, | |
|                         'categ_id': self.env['pipedrive.record'].sudo().search( | |
|                             [( | |
|                                 'pipedrive_reference', '=', | |
|                                 data['category'])])[0].odoo_ref | |
|                         if self.env['pipedrive.record'].sudo().search( | |
|                             [( | |
|                                 'pipedrive_reference', '=', | |
|                                 data['category'])]) else 1 | |
|                     }) | |
|                     if product: | |
|                         self.env['pipedrive.record'].sudo().create({ | |
|                             'pipedrive_reference': data['id'], | |
|                             'record_type': 'product', | |
|                             'odoo_ref': product.id | |
|                         }) | |
|                         product.taxes_id.unlink() | |
|                         if data['tax'] != 0: | |
|                             tax = self.env['account.tax'].sudo().search( | |
|                                 [('amount_type', '=', 'percent'), | |
|                                  ('type_tax_use', '=', 'sale'), ('amount', | |
|                                                                  '=', | |
|                                                                  data['tax'])]) | |
|                             if not tax: | |
|                                 tax = self.env['account.tax'].sudo().create({ | |
|                                     'name': 'Tax ' + str(data['tax']) + '%', | |
|                                     'amount_type': 'percent', | |
|                                     'type_tax_use': 'sale', | |
|                                     'amount': data['tax'] | |
|                                 }) | |
|                             product.sudo().write({ | |
|                                 "taxes_id": [(4, tax.id)] | |
|                             }) | |
| 
 | |
|     def create_product_category(self): | |
|         """Returns product category from category_id""" | |
|         response = requests.get( | |
|             url='https://api.pipedrive.com/v1/productFields', | |
|             params={ | |
|                 'api_token': self.api_key, | |
|             }, timeout=10) | |
|         if not response.json()['success']: | |
|             raise ValidationError( | |
|                 response.json()['error'] + '. ' + response.json()[ | |
|                     'error_info']) | |
|         for rec in response.json()['data']: | |
|             if rec['key'] == 'category': | |
|                 for item in rec['options']: | |
|                     category = self.env['product.category'].sudo().search( | |
|                         [('name', '=', item['label'])]) | |
|                     if not category: | |
|                         category = self.env['product.category'].sudo().create( | |
|                             { | |
|                                 'name': item['label'] | |
|                             } | |
|                         ) | |
|                         self.env['pipedrive.record'].sudo().create({ | |
|                             'pipedrive_reference': item['id'], | |
|                             'record_type': 'categ', | |
|                             'odoo_ref': category[0].id | |
|                         }) | |
|                     elif not self.env['pipedrive.record'].sudo().search( | |
|                             [('record_type', '=', 'categ'), ('odoo_ref', '=', category[0].id)]): | |
|                         self.env['pipedrive.record'].sudo().create({ | |
|                             'pipedrive_reference': item['id'], | |
|                             'record_type': 'categ', | |
|                             'odoo_ref': category[0].id | |
|                         }) | |
| 
 | |
|     def get_contacts(self): | |
|         """Receives contacts from Pipedrive""" | |
|         response = requests.get(url='https://api.pipedrive.com/v1/persons', | |
|                                 params={ | |
|                                     'api_token': self.api_key, | |
|                                 }, timeout=10) | |
|         if not response.json()['success']: | |
|             raise ValidationError( | |
|                 response.json()['error'] + '. ' + response.json()[ | |
|                     'error_info']) | |
|         if response.json()['data']: | |
|             for data in response.json()['data']: | |
|                 if not self.env['pipedrive.record'].sudo().search( | |
|                         [('pipedrive_reference', '=', str(data['id'])), ('record_type', '=', 'partner')]): | |
|                     partner = self.env['res.partner'].sudo().create({ | |
|                         'name': data['name'], | |
|                         'phone': data['phone'][0]['value'], | |
|                         'email': data['email'][0]['value'] | |
|                     }) | |
|                     if partner: | |
|                         self.env['pipedrive.record'].sudo().create({ | |
|                             'pipedrive_reference': data['id'], | |
|                             'record_type': 'partner', | |
|                             'odoo_ref': partner.id | |
|                         }) | |
| 
 | |
|     def get_leads(self): | |
|         """Receives leads from Pipedrive""" | |
|         response = requests.get(url='https://api.pipedrive.com/v1/leads', | |
|                                 params={ | |
|                                     'api_token': self.api_key, | |
|                                 }, timeout=10) | |
|         if not response.json()['success']: | |
|             raise ValidationError( | |
|                 response.json()['error'] + '. ' + response.json()[ | |
|                     'error_info']) | |
|         if response.json()['data']: | |
|             for data in response.json()['data']: | |
|                 expected_revenue = 0 | |
|                 if data['value']: | |
|                     currency = self.env['res.currency'].sudo().search( | |
|                         [('name', '=', data['value']['currency']), | |
|                          ('active', 'in', [True, False])]) | |
|                     if not currency.active: | |
|                         currency.active = True | |
|                     expected_revenue = currency.compute( | |
|                         data['value']['amount'], self.env.company.currency_id) | |
|                 if not self.env['pipedrive.record'].sudo().search( | |
|                         [('pipedrive_reference', '=', str(data['id'])), ('record_type', '=', 'lead')]): | |
|                     lead = self.env['crm.lead'].sudo().create({ | |
|                         'name': data['title'], | |
|                         'type': 'opportunity', | |
|                         'expected_revenue': expected_revenue | |
|                     }) | |
|                     self.env['pipedrive.record'].sudo().create({ | |
|                         'pipedrive_reference': data['id'], | |
|                         'record_type': 'lead', | |
|                         'odoo_ref': lead.id | |
|                     }) | |
| 
 | |
|     def export_products_to_pipedrive(self): | |
|         """Export Products from Odoo to Pipedrive""" | |
|         pipedrive_products = self.env['pipedrive.record'].sudo().search( | |
|             [('record_type', '=', 'product')]).mapped('odoo_ref') | |
|         for product in self.env['product.template'].sudo().search( | |
|                 [('id', 'not in', pipedrive_products)]): | |
|             data = { | |
|                 'name': product.name, | |
|                 'unit': product.uom_id.name, | |
|                 'tax': self.calculate_total_tax_percentage(product), | |
|                 'prices': [{ | |
|                     'price': product.list_price, | |
|                     'currency': self.env.company.currency_id.name | |
|                 }] | |
|             } | |
|             response = requests.post( | |
|                 url='https://api.pipedrive.com/v1/products', | |
|                 params={ | |
|                     'api_token': self.api_key, | |
|                 }, json=data, timeout=10) | |
|             if not response.json()['success']: | |
|                 raise ValidationError( | |
|                     response.json()['error'] + '. ' + response.json()[ | |
|                         'error_info']) | |
|             self.env['pipedrive.record'].sudo().create({ | |
|                 'pipedrive_reference': response.json()['data']['id'], | |
|                 'record_type': 'product', | |
|                 'odoo_ref': product.id | |
|             }) | |
| 
 | |
|     def export_contacts_to_pipedrive(self): | |
|         """Export Contacts from Odoo to Pipedrive""" | |
|         pipedrive_partner = self.env['pipedrive.record'].sudo().search( | |
|             [('record_type', '=', 'partner')]).mapped('odoo_ref') | |
|         for partner in self.env['res.partner'].sudo().search( | |
|                 [('id', 'not in', pipedrive_partner)]): | |
|             self.create_contact(partner) | |
| 
 | |
|     def create_contact(self, partner): | |
|         """Create Persons in Pipedrive""" | |
|         data = { | |
|             'name': partner.name, | |
|             'email': partner.email, | |
|             'phone': partner.phone | |
|         } | |
|         response = requests.post( | |
|             url='https://api.pipedrive.com/v1/persons', | |
|             params={ | |
|                 'api_token': self.api_key, | |
|             }, json=data, timeout=10) | |
|         if 'success' in response.json( | |
|         ).keys() and not response.json()['success'] and 'error' in \ | |
|                 response.json( | |
|                 ).keys(): | |
|             raise ValidationError( | |
|                 response.json()['error']) | |
|         self.env['pipedrive.record'].sudo().create({ | |
|             'pipedrive_reference': response.json()['data']['id'], | |
|             'record_type': 'partner', | |
|             'odoo_ref': partner.id | |
|         }) | |
| 
 | |
|     def create_webhook(self, event_action, url, event_object): | |
|         """Method for creating contact webhook in Pipedrive""" | |
|         payload = json.dumps({ | |
|             "subscription_url": self.env['ir.config_parameter'].get_param( | |
|                 'web.base.url') + url, | |
|             "event_action": event_action, | |
|             "event_object": event_object | |
|         }) | |
|         headers = { | |
|             'Content-Type': 'application/json', | |
|             'Accept': 'application/json' | |
|         } | |
|         params = { | |
|             'api_token': self.api_key, | |
|         } | |
|         existing_webhooks_response = requests.get("https://api.pipedrive.com/v1/webhooks", params=params) | |
|         webhook_found = False | |
|         if existing_webhooks_response.status_code == 200: | |
|             existing_webhooks = existing_webhooks_response.json().get('data', []) | |
|             for webhook in existing_webhooks: | |
|                 if (webhook.get('event_action') == event_action and | |
|                         webhook.get('event_object') == event_object and | |
|                         webhook.get( | |
|                             'subscription_url') == self.env['ir.config_parameter'].get_param( | |
|                             'web.base.url') + url): | |
|                     webhook_found = True | |
|         if not webhook_found: | |
|             response = requests.request("POST", | |
|                                         "https://api.pipedrive.com/v1/" | |
|                                         "webhooks", | |
|                                         headers=headers, data=payload, | |
|                                         params=params, | |
|                                         timeout=10) | |
|             if not response.json()['success']: | |
|                 if 'error' in response.json().keys(): | |
|                     raise ValidationError( | |
|                         response.json()['error']) | |
|                 if 'message' in response.json().keys(): | |
|                     raise ValidationError( | |
|                         response.json()['message']) | |
| 
 | |
|     def export_leads_to_pipedrive(self): | |
|         """Export Leads from Odoo to Pipedrive""" | |
|         pipedrive_lead = self.env['pipedrive.record'].sudo().search( | |
|             [('record_type', '=', 'lead')]).mapped('odoo_ref') | |
|         for lead in self.env['crm.lead'].sudo().search( | |
|                 [('id', 'not in', pipedrive_lead)]): | |
|             if lead.partner_id: | |
|                 if not self.env['pipedrive.record'].sudo().search( | |
|                         [('record_type', '=', 'partner'), ('odoo_ref', '=', lead.partner_id.id)]): | |
|                     self.create_contact(lead.partner_id) | |
|                 data = { | |
|                     'title': lead.name, | |
|                     'person_id': int(self.env['pipedrive.record'].sudo().search( | |
|                         [('record_type', '=', 'partner'), ('odoo_ref', '=', lead.partner_id.id)]).pipedrive_reference), | |
|                     'value': { | |
|                         'amount': lead.expected_revenue, | |
|                         'currency': self.env.company.currency_id.name | |
|                     } | |
|                 } | |
|                 response = requests.post( | |
|                     url='https://api.pipedrive.com/v1/leads', | |
|                     params={ | |
|                         'api_token': self.api_key, | |
|                     }, json=data, timeout=10) | |
|                 if not response.json()['success']: | |
|                     raise ValidationError( | |
|                         response.json()['error'] + '. ' + response.json()[ | |
|                             'error_info']) | |
|                 self.env['pipedrive.record'].sudo().create({ | |
|                     'pipedrive_reference': response.json()['data']['id'], | |
|                     'record_type': 'lead', | |
|                     'odoo_ref': lead.id | |
|                 })
 | |
| 
 |