Source code for catcher_modules.service.elastic
from catcher.steps.external_step import ExternalStep
from catcher.steps.step import Step, update_variables
[docs]class Elastic(ExternalStep):
"""
Allows you to get data from `Elasticsearch <https://www.elastic.co/elastic-stack>`_. Useful, when your services push their
logs there and you need to check the logs automatically from the test.
:Input:
:search: search elastic
- url: RFC-1738 compatible (can contain user credentials) server url.
- index: ES index (database).
- query: your query to run.
- <other param>: you can add any param here (see Search with limiting fields for an example)
:refresh: Trigger a refresh for an index.
- url: RFC-1738 compatible (can contain user credentials) server url.
- index: ES index (database).
:Examples:
Search with limiting fields
::
elastic:
search:
url: 'http://127.0.0.1:9200'
index: test
query:
match: {payload : "three"}
_source: ['name']
register: {docs: '{{ OUTPUT }}'}
Connect to multiple ES instances. One simple and one secured
::
elastic:
search:
url:
- 'http://127.0.0.1:9200'
- 'https://{{ user }}:{{ secret }}@{{ host2 }}:443'
index: test
query: {match_all: {}}
Refresh index
::
elastic:
refresh:
url: 'http://127.0.0.1:9092'
index: test
In bool query `must` and `should` are lists
::
elastic:
search:
url: 'http://127.0.0.1:9200'
index: test
query:
bool:
must:
- term: {shape: "round"}
- bool:
should:
- term: {color: "red"}
- term: {color": "blue"}
"""
@update_variables
def action(self, includes: dict, variables: dict) -> any:
from elasticsearch import Elasticsearch
body = self.simple_input(variables)
method = Step.filter_predefined_keys(body) # search/refresh
conf = body[method]
index = conf['index']
url = conf['url']
if isinstance(url, str):
url = [url]
es = Elasticsearch(url)
if method == 'search':
return variables, self._search(es, index, conf)
elif method == 'refresh':
return variables, self._refresh(es, index)
else:
raise AttributeError('unknown method: ' + method)
def _search(self, es, index, conf):
query = dict([(key, value) for key, value in conf.items() if key != 'index' and key != 'url'])
res = es.search(index, query)
return [hit['_source'] for hit in res['hits']['hits']]
def _refresh(self, es, index):
return es.indices.refresh(index)