Source code for catcher_modules.service.salesforce

import collections

import requests
from catcher.steps.external_step import ExternalStep
from catcher.steps.step import Step, update_variables


class Salesforce(ExternalStep):
    """
    Allows you to work with `Salesforce <https://www.salesforce.com/>`_.

    :Config:
    - instance: your Salesforce instance. *Optional*. Either instance or instance_url must exist.
    - instance_url: full URL of your instance. *Optional*. Either instance or instance_url must exist.
    - username: your username *Optional*.
    - password: your password *Optional*.
    - security_token: token is usually provided when you change your password *Optional*.
    - organizationId: whitelisted Organization ID. *Optional*.
    - consumer_key: consumer key from your app for JWT auth. *Optional*.
    - privatekey_file: path to the private key file (with resources as root) for the JWT auth. *Optional*
    - client_id: used for requests tracking. *Optional*.
    - domain: domain to be used. *Optional*.
    - session: key of the ``requests.Session`` to create or reuse for this login. *Optional*.

    :Input:

    :query: run SOQL query

    - soql: query to run
    - config: Config object

    :<action>: record's possible action: create/update/upsert/get/get_by_custom_id/delete/deleted/updated

    - <record>: <data> where **<record>** is SF record name. and **<data>** is an action param.
                In case of multiple params use list (see examples)

    :Examples:

    Run SOQL query
    ::

        salesforce:
            query:
                config:
                    password='password'
                    username='myemail@example.com'
                    organizationId='OrgId'
                soql: "SELECT Id, Email FROM Contact WHERE LastName = 'Jones'"
            register: {contacts: '{{ OUTPUT }}'}

    Create new record
    ::

        salesforce:
            create:
                config:
                    password='password'
                    username='myemail@example.com'
                    organizationId='OrgId'
                Contact:
                    LastName: Smith
                    Email: example@example.com

    Upsert a record. First param is the upsert id, second is the record
    ::

        salesforce:
            upsert:
                config:
                    password='password'
                    username='myemail@example.com'
                    organizationId='OrgId'
                Contact:
                    - customExtIdField__c/11999
                    - LastName: Smith
                      Email: example@example.com

    """

    # Config keys that are forwarded to the simple_salesforce client; anything else is dropped.
    valid_conf = ['instance', 'instance_url', 'username', 'password', 'security_token', 'organizationId',
                  'consumer_key', 'privatekey_file', 'client_id', 'domain', 'session']
    # Class-level cache of requests.Session objects, keyed by the ``session`` config value.
    sessions = {}

    @update_variables
    def action(self, includes: dict, variables: dict) -> any:
        """
        Log in to Salesforce and run either a SOQL query or a record action.

        :param includes: not used by this step.
        :param variables: variables passed to ``simple_input`` to build the step body.
        :return: tuple of ``(variables, result)`` where result is the query output
                 or the converted output of the record action.
        """
        body = self.simple_input(variables)
        method = Step.filter_predefined_keys(body)  # query/<record_based>
        salesforce = self._do_login(body[method]['config'])
        if method == 'query':
            return variables, self._query(salesforce, body['query'])
        # record action
        return variables, self.deep_convert_dict(self._record(salesforce, method, body[method]))

    def _do_login(self, config):
        """
        Build a ``simple_salesforce.Salesforce`` client from the step config.

        Only keys listed in ``valid_conf`` are passed to the client. ``config`` must be
        a mapping, because it is read through ``.items()``.

        :param config: step configuration mapping.
        :return: the ``simple_salesforce.Salesforce`` client.
        :raises Exception: if both ``instance`` and ``instance_url`` are given.
        """
        conf = {key: value for (key, value) in config.items() if key in self.valid_conf}
        if 'instance' in conf and 'instance_url' in conf:
            raise Exception('instance and instance_url are mutually exclusive')
        if 'session' in conf:
            session_key = conf['session']
            # Store the session on first use so later steps with the same key reuse it.
            # The previous ``sessions.get(key, requests.Session())`` never populated the cache.
            if session_key not in self.sessions:
                self.sessions[session_key] = requests.Session()
            conf['session'] = self.sessions[session_key]
        # Imported locally: the client class has the same name as this step class.
        from simple_salesforce import Salesforce
        return Salesforce(**conf)

    def _query(self, sf, query):
        """
        Run the SOQL query from ``query['soql']`` via ``sf.query_all``.

        :param sf: Salesforce client returned by ``_do_login``.
        :param query: mapping holding the ``soql`` string.
        :return: the query result round-tripped through JSON.
        """
        import json
        return json.loads(json.dumps(sf.query_all(query['soql'])))  # json to avoid OrderedDict

    def _record(self, sf, action, params: dict):
        """
        Call ``action`` on the record type named by the first non-config key of ``params``.

        :param sf: Salesforce client returned by ``_do_login``.
        :param action: name of the record method to call (create, update, upsert, ...).
        :param params: step body for the action; the ``config`` entry is ignored.
        :return: whatever the record method returns.
        :raises AttributeError: if the client has no attribute named like the record.
        """
        # Work on a filtered copy so the caller's step body is not mutated.
        records = {key: value for key, value in params.items() if key != 'config'}
        record_name = list(records)[0]
        if not hasattr(sf, record_name):
            raise AttributeError("Salesforce object doesn't have record " + record_name)
        record = getattr(sf, record_name)
        handler = getattr(record, action)
        data = records[record_name]
        if isinstance(data, list):
            # A list holds multiple positional params, e.g. upsert id followed by the record.
            return handler(*data)
        return handler(data)

    @staticmethod
    def deep_convert_dict(layer):
        """
        Recursively convert dict subclasses (e.g. ``OrderedDict``) into plain dicts.

        Lists are traversed as well, so mappings nested inside lists are converted too.
        The input is not modified; dicts and lists are rebuilt as new objects, while
        any other value is returned unchanged.

        :param layer: arbitrary value, possibly containing nested mappings and lists.
        :return: equivalent structure built from plain ``dict`` and ``list`` objects.
        """
        if isinstance(layer, dict):
            return {key: Salesforce.deep_convert_dict(value) for key, value in layer.items()}
        if isinstance(layer, list):
            return [Salesforce.deep_convert_dict(item) for item in layer]
        return layer