Source code for catcher_modules.database.mongo

from typing import Any, Union

from catcher.steps.external_step import ExternalStep
from catcher.steps.step import update_variables


class Mongo(ExternalStep):
    """
    Allows you to interact with `MongoDB <https://www.mongodb.com/>`_ NoSQL database.

    :Input:

    :conf: mongodb configuration. Can be a single line, object or object with url as a parameter
           (for Airflow connection)
           `string <https://docs.mongodb.com/manual/reference/connection-string/>`_ url or kv object. **Required**.

    - database: name of the database to connect to
    - username: database user. Must be RFC 2396 encoded when in URI.
    - host: database host
    - password: user's password. Must be RFC 2396 encoded when in URI.
    - port: database port
    - authSource: The database to authenticate on. Default is database.

    See `pymongo <http://api.mongodb.com/python/current/api/pymongo/mongo_client.html>`_ for more options.

    When the configuration is a url (either a plain string or an object with the ``url`` key) the step
    always works with the ``test`` database. With a kv object the ``database`` key is used and falls
    back to ``test`` when it is missing.

    :collection: collection to use. **Required**
    :command: String. Use this if you have to run command without any parameters.
              Where command's value is your command to run, like `command: find_one`. `Optional`
    :<command>: Object. Use this when you have command with parameters.
                Where <command> key is your command name and it's value is parameter object (list or dict).
                `Optional` Either <command> or command should exist.
    :next: Run other operation just after your operation. Can be string like `next: count` or
           object with params `next: {'sort': 'author'}`. You can chain multiple next (see example). `Optional`
    :list_params: Pass command params as different arguments. Useful when pymongo command takes several
                  arguments (both `*args` and `**kwargs`). `*args` will be set in case of params in list
                  while `**kwargs` will be sent in case of dict. See examples for more info.

    :Examples:

    Find one document. Use **command** key when no params.
    ::

        mongo:
            request:
                conf:
                    database: test
                    username: test
                    password: test
                    host: localhost
                    port: 27017
                collection: 'your_collection'
                command: 'find_one'
            register: {document: '{{ OUTPUT }}'}

    Use object configuration with extra fields (for Airflow connection). This step will ignore everything
    except url.

    Inventory.yaml
    ::

        mongo_conf:
            url: 'mongodb://username:password@host'
            type: 'mongo'
            extra: '{"key":"value"}'

    mongo step itself
    ::

        mongo:
            request:
                conf: '{{ mongo_conf }}'
                collection: 'your_collection'
                command: 'find_one'

    See more info about connections population in Catcher-Airflow
    `docs <https://catcher-modules.readthedocs.io/en/latest/source/airflow.html>`_
    Alternatively you can use ``conf: '{{ mongo_conf.url }}'``.

    Insert into test, using string configuration
    ::

        mongo:
            request:
                conf: 'mongodb://username:password@host'
                collection: 'your_collection'
                insert_one:
                    'author': 'Mike'
                    'text': 'My first blog post!'
                    'tags': ['mongodb', 'python', 'pymongo']
                    'date': '{{ NOW_DT }}'

    Find specific document
    ::

        mongo:
            request:
                conf:
                    database: test
                    username: test
                    password: test
                    host: localhost
                    port: 27017
                collection: 'your_collection'
                find_one: {'author': 'Mike'}
            register: {document: '{{ OUTPUT }}'}

    To find multiple documents just use **find** instead of **find_one**.

    Bulk insert
    ::

        mongo:
            request:
                conf: '{{ mongo_conf }}'
                collection: 'your_collection'
                insert_many:
                    - {'foo': 'baz'}
                    - {'foo': 'bar'}

    Chaining operations: db.collection.find().sort().count()
    ::

        mongo:
            request:
                conf:
                    database: test
                    username: test
                    password: test
                    host: localhost
                    port: 27017
                collection: 'your_collection'
                find: {'author': 'Mike'}
                next:
                    sort: 'author'
                    next: 'count'
            register: {document: '{{ OUTPUT }}'}

    Will run every next operation on previous one. You can chain more than one operation.

    Run operation with list parameters (`**kwargs`). Is useful when calling commands with additional arguments.
    ::

        mongo:
            request:
                conf:
                    database: test
                    username: test
                    password: test
                    host: localhost
                    port: 27017
                collection: 'your_collection'
                find:
                    filter: {'author': 'Mike'}
                    projection: {'_id': False}
                list_params: true  # pass list arguments as separate params
            register: {document: '{{ OUTPUT }}'}

    Run operation with list parameters (`*args`). Run map-reduce.
    ::

        mongo:
            request:
                conf:
                    database: test
                    username: test
                    password: test
                    host: localhost
                    port: 27017
                collection: 'your_collection'
                map_reduce:
                    - 'function () { this.tags.forEach(function(z) { emit(z, 1); }); }'
                    - 'function (key, values) { var total = 0; for (var i = 0; i < values.length; i++) { total += values[i]; } return total; }'
                    - 'myresults'
                list_params: true  # pass list arguments as separate params
            register: {document: '{{ OUTPUT }}'}

    """

    @update_variables
    def action(self, includes: dict, variables: dict) -> Any:
        """
        Connect to MongoDB, run the requested command (with its ``next`` chain) and close the client.

        :param includes: not used by this step.
        :param variables: variables used to build the step input via ``simple_input``.
        :return: tuple of ``variables`` and the command result.
        """
        # Imported inside the method, so pymongo is only loaded when the step actually runs.
        from pymongo import MongoClient

        body = self.simple_input(variables)
        in_data = body['request']
        conf = in_data['conf']
        collection = in_data['collection']
        if isinstance(conf, str):  # url
            client = MongoClient(conf)
            # NOTE: url configurations always use the 'test' database.
            database = client.get_database('test')
        elif 'url' in conf:  # object with url (Airflow connection): everything except url is ignored
            client = MongoClient(conf['url'])
            database = client.get_database('test')
        else:
            # Work on a filtered copy so the caller's configuration is not modified;
            # 'type' and 'extra' are Airflow specific fields MongoClient must not receive.
            options = {key: value for key, value in conf.items() if key not in ('type', 'extra')}
            db_name = options.pop('database', 'test')
            client = MongoClient(**options)
            database = client.get_database(db_name)
        try:
            command = Action(in_data)
            result = command(database[collection])
            return variables, result
        finally:
            # Close the client whether the command succeeded or raised.
            client.close()
class Action:
    """
    One command to call on a collection (or on a previous result), optionally followed by a chain
    of further commands given under the ``next`` key.

    Attributes set by ``__init__`` (some only conditionally, ``__call__`` checks them with ``hasattr``):

    - ``action``: name of the method to call.
    - ``params``: parameters of the method. Only set for the ``<command>: params`` form.
    - ``next``: the following :class:`Action`. Only set when the ``next`` key is present.
    - ``list_params``: whether list/dict params are unpacked into separate arguments.
      Only set when the action was built from a dict.
    """

    # request keys which configure the step and therefore can't be a command name
    _SERVICE_KEYS = frozenset(('conf', 'next', 'collection', 'list_params'))

    def __init__(self, in_data: Union[dict, str]) -> None:
        """
        :param in_data: either the command name (like ``next: count``) or a dict holding the command
                        under the ``command`` key (no params) or under its own name (with params).
        :raises ValueError: if a dict without ``command`` doesn't contain exactly one command key.
        """
        super().__init__()
        if isinstance(in_data, str):  # next: count
            self.action = in_data
            return
        if 'command' in in_data:  # action with no params
            self.action = in_data['command']
        else:  # action with params
            candidates = [key for key in in_data if key not in self._SERVICE_KEYS]
            if len(candidates) != 1:
                raise ValueError('Expected exactly one command but got: {}'.format(candidates))
            self.action = candidates[0]
            self.params = in_data[self.action]
        if 'next' in in_data:
            self.next = Action(in_data['next'])
        self.list_params = in_data.get('list_params', False)

    def __call__(self, collection):
        """
        Call the configured method on ``collection`` and pass the result through the ``next`` chain.

        :param collection: object to call the method on: the collection for the first action,
                           the previous action's result for chained ones.
        :return: the chain's result if ``next`` is set. Otherwise a list of documents without ``_id``
                 for a cursor, the document without ``_id`` for a dict, the matching counter/id for
                 results exposing ``modified_count``, ``deleted_count``, ``inserted_id`` or
                 ``inserted_ids`` (checked in this order), or the raw result.
        """
        from pymongo.cursor import Cursor

        method = getattr(collection, self.action)
        if not hasattr(self, 'params'):
            res = method()
        elif self.list_params and isinstance(self.params, list):
            res = method(*self.params)
        elif self.list_params and isinstance(self.params, dict):
            res = method(**self.params)
        else:
            res = method(self.params)

        if hasattr(self, 'next'):
            return self.next(res)
        if isinstance(res, Cursor):
            return drop_ids(list(res))
        if isinstance(res, dict):
            # Explicit type check: a string result containing '_id' has no usable pop('_id').
            res.pop('_id', None)
            return res
        for attribute in ('modified_count', 'deleted_count', 'inserted_id', 'inserted_ids'):
            if hasattr(res, attribute):
                return getattr(res, attribute)
        return res


def drop_ids(results):
    """
    Remove the ``_id`` field from every document in place.

    Documents without ``_id`` (for example when it was excluded by a projection) are left untouched.

    :param results: list of documents (dicts).
    :return: the same list.
    """
    for res in results:
        res.pop('_id', None)
    return results