测试
A Python implementation equivalent to the following curl commands:
'''
# Create a search job by POSTing the query (curl -d) to the search/jobs endpoint;
# -k skips TLS certificate verification and stderr is discarded.
# The sed pipeline drops lines from the response and extracts the job id,
# matched as "<digits>.<digits>" between '>' and '<'.
sid=`curl -u admin:changeme -k https://localhost:8089/services/search/jobs -d search="search source=\"http:hec_test\" refresh | head 21" 2>/dev/null | sed "1,2d" | sed "2d" | sed "s/.*>\([0-9]*\.[0-9]*\)<.*/\1/"`
# Show the extracted job id.
echo $sid
# Fetch the job's status document as JSON.
curl -u admin:changeme -k https://localhost:8089/services/search/jobs/$sid?output_mode=json
# Fetch the job's results as JSON (--get moves the -d data into the query string)
# and write them to out.json.
curl -u admin:changeme -k https://localhost:8089/services/search/jobs/$sid/results/ --get -d output_mode=json 2>/dev/null >out.json
'''
基于Python3的封装
# coding=utf-8
import json
import time
import traceback
import urllib
import urllib.parse
from xml.dom import minidom

import httplib2
class SplunkQuery(object):
    """Small client that runs searches through the Splunk REST API.

    The workflow mirrors the usual curl sequence: log in to obtain a session
    key, create a search job, poll the job until it is done, then download
    the results as JSON.

    TLS certificate validation is disabled on every request (the equivalent
    of ``curl -k``).
    """

    def __init__(self):
        """Store the connection settings and log in immediately."""
        self.baseurl = 'https://IP:8089'
        self.userName = 'xxx'
        self.password = 'xxx'
        self.sessionKey = self.get_key()

    def _request(self, path, method, params=None, authenticated=True):
        """Send one HTTP request and return the raw response body.

        Args:
            path: URL path appended to ``self.baseurl``.
            method: HTTP verb, e.g. ``'GET'`` or ``'POST'``.
            params: Optional dict of parameters. They are sent form-encoded
                in the body for POST and in the query string otherwise.
            authenticated: When true, send the session key in the
                ``Authorization`` header.

        Returns:
            The response body as returned by httplib2 (bytes).
        """
        headers = {}
        if authenticated:
            headers['Authorization'] = 'Splunk %s' % self.sessionKey

        url = self.baseurl + path
        encoded = urllib.parse.urlencode(params or {})
        body = None
        if method == 'POST':
            # Declare the body format explicitly, as curl -d does.
            headers['Content-Type'] = 'application/x-www-form-urlencoded'
            body = encoded
        elif encoded:
            # Parameters of body-less verbs belong in the query string.
            url += ('&' if '?' in url else '?') + encoded

        http = httplib2.Http(disable_ssl_certificate_validation=True)
        return http.request(url, method, headers=headers, body=body)[1]

    def get_key(self):
        """Log in and return the session key.

        This is best-effort: on any failure the traceback is printed and an
        empty string is returned, so constructing the client never raises.
        """
        session_key = ""
        try:
            server_content = self._request(
                '/services/auth/login',
                'POST',
                {'username': self.userName, 'password': self.password},
                authenticated=False,
            )
            session_key = minidom.parseString(server_content) \
                .getElementsByTagName('sessionKey')[0].childNodes[0].nodeValue
        except Exception:
            # Keep the best-effort contract but surface the cause; a bare
            # ``except`` would also swallow KeyboardInterrupt/SystemExit.
            traceback.print_exc()
        return session_key

    @staticmethod
    def _normalize_query(search_query):
        """Return the query with a leading ``search`` command when needed.

        Queries that already start with ``search`` or with a pipe (a
        generating command such as ``| tstats``) are returned unchanged.
        """
        if search_query.lstrip().startswith(('search', '|')):
            return search_query
        return 'search ' + search_query

    def submit_job(self, search_query, earliest_time=None, latest_time=None):
        """Create a search job and return its search id (sid).

        Args:
            search_query: The search string; ``search`` is prepended when
                the query does not already start with a command.
            earliest_time: Optional lower time bound, sent only when truthy.
            latest_time: Optional upper time bound, sent only when truthy.

        Returns:
            The sid extracted from the XML response.
        """
        data = {'search': self._normalize_query(search_query)}
        if earliest_time:
            data['earliest_time'] = earliest_time
        if latest_time:
            data['latest_time'] = latest_time

        sid_body = self._request('/services/search/jobs', 'POST', data)
        sid = minidom.parseString(sid_body) \
            .getElementsByTagName("sid")[0].childNodes[0].nodeValue
        print("sid:" + sid)
        return sid

    def request_results(self, sid, poll_interval=0.1):
        """Wait for a search job to finish and return its results.

        Args:
            sid: Search id returned by ``submit_job``.
            poll_interval: Seconds to sleep between status checks.

        Returns:
            The ``results`` list from the job's JSON results document.

        Raises:
            RuntimeError: If the job reports ``isFailed`` before it is done.
        """
        start = time.time()
        job_path = '/services/search/jobs/' + sid

        while True:
            # Reading job status is a GET; POST is the update verb.
            status = self._request(job_path, 'GET', {'output_mode': 'json'})
            data = json.loads(status)
            content = data["entry"][0]["content"]
            if content["isDone"]:
                break
            if content.get("isFailed"):
                # Without this check a failed job would be polled forever.
                raise RuntimeError("Splunk search job %s failed" % sid)
            time.sleep(poll_interval)

        request_time = time.time() - start
        print("result event count:", content["eventCount"], "request time:", request_time)

        # count=0 asks for all rows; output_mode goes in the query string,
        # not in a GET body.
        result_response = self._request(
            job_path + '/results', 'GET', {'count': 0, 'output_mode': 'json'})
        results = json.loads(result_response)["results"]
        print(len(results))
        end = time.time()
        print("result count:", len(results), "result request time:", end - start)
        # NOTE: the job is not deleted on the server after the results are read.
        return results

    def run(self, searchQuery, earliest_time=None, latest_time=None):
        """Submit a search, wait for it to finish and return its results.

        Args:
            searchQuery: The search string.
            earliest_time: Optional lower time bound for the search.
            latest_time: Optional upper time bound for the search.

        Returns:
            The list of result rows from ``request_results``.
        """
        start = time.time()
        sid = self.submit_job(
            searchQuery, earliest_time=earliest_time, latest_time=latest_time)
        result = self.request_results(sid)
        end = time.time()
        print("search time:", end - start)
        return result
调用
if __name__ == "__main__":
    # Demo: run the same search twice, the second time with a lower time
    # bound. The guard keeps these network calls from running on import.
    print(">>>>>>>>>>>>>>>>SplunkQuery>>>>>>>>>>>>>>>>>>>>>")
    Q = SplunkQuery()

    result = Q.run(searchQuery='''index=xx sourcetype=xx''')
    # Only index into the results when the row exists; a search may return
    # fewer rows than the demo expects.
    if result:
        print(result[0])

    result = Q.run(searchQuery='''index=xx sourcetype=xx''', earliest_time="2020-06-19T12:00:00")
    if len(result) > 5:
        print(result[5])