Source code for catcher_modules.database.couchbase

from catcher.steps.external_step import ExternalStep
from catcher.steps.step import update_variables


[docs]class Couchbase(ExternalStep): """ Allows you to perform put/get/delete/query operations in `Couchbase <https://www.couchbase.com/>`_ :Input: :conf: couchbase configuration. Is an object. **Required**. - bucket: bucket to work with - user: database user (optional) - host: database host (optional) - password: user's password :put: put value in the database by the key. :get: get object by key. :delete: delete object by key. :query: query to run. :Examples: Put value by key :: couchbase: request: conf: bucket: test host: localhost put: key: my_key value: {foo: bar, baz: [1,2,3,4]} Get value by key :: couchbase: request: conf: bucket: test user: test password: test host: localhost get: key: my_key Delete value by key :: couchbase: request: conf: bucket: test user: test password: test host: localhost delete: key: my_key Query by foo :: couchbase: request: conf: bucket: test user: test password: test host: localhost query: "select `baz` from test where `foo` = 'bar'" """ @update_variables def action(self, includes: dict, variables: dict) -> any: from couchbase.cluster import Cluster, PasswordAuthenticator body = self.simple_input(variables) in_data = body['request'] conf = in_data['conf'] cluster = Cluster('couchbase://' + conf['host']) if 'user' in conf and 'password' in conf: cluster.authenticate(PasswordAuthenticator(conf['user'], conf['password'])) bucket = cluster.open_bucket(conf['bucket']) if 'put' in in_data: put = in_data['put'] result = bucket.upsert(put['key'], put['value']) if result.success: return variables, {} else: raise RuntimeError(result.errstr) if 'get' in in_data: get = in_data['get'] result = bucket.get(get['key'], quiet=True) if result is None: raise RuntimeError('no data found for key ' + get['key']) return variables, result.value if 'delete' in in_data: delete = in_data['delete'] bucket.remove(delete['key'], quiet=True) return variables, {} if 'query' in in_data: query = in_data['query'] res = [row for row in bucket.n1ql_query(query)] if len(res) == 1: return variables, res[0] return variables, res raise RuntimeError('nothing to do: ' + str(in_data))