Commit 789b3c64 authored by Bram Daams's avatar Bram Daams
Browse files

sake_cased_methods and more fixes to please pylint

parent c9154d0e
Pipeline #36628 failed with stage
in 1 minute and 27 seconds
......@@ -12,20 +12,24 @@ $ pip install .
## Test
For now, create a file `test.py`
``` python
from hc import hcCred, Healthchecks
from crontabs import CronTabs
from hc import HealthcheckCredentials, Healthchecks
cred = HealthcheckCredentials(
api_url='https://hc.example.com/api/v1/',
api_key='mysecretapikey'
)
cred = hcCred('https://hc.example.com/api/v1/', 'mysecretapikey')
h = Healthchecks(cred)
h.PrintStatus()
# scan jobs that want to use SCH
jobs = CronTabs().all.find_command('JOB_ID')
for job in jobs:
check = h.FindCheck(job)
check = h.find_check(job)
if check:
h.UpdateCheck(check, job)
h.update_check(check, job)
else:
h.NewCheck(job)
h.new_check(job)
```
And run it within the virtual environment
......
......@@ -2,29 +2,22 @@
module for interfacing a healthchecks.io compatible service
"""
import sys
import click
import arrow
import hashlib
import platform
import re
import collections
import arrow
import click
import requests
import tzlocal
HealthcheckCredentials = collections.namedtuple('HealthcheckCredentials', 'api_url api_key')
class hcCred:
class Healthchecks:
"""
Object to store the healthchecks api url and key
Interfaces with e healthckecks.io compatible API to register
cron jobs found on the system.
"""
def __init__(self, url, api_key):
self.url = url
self.api_key = api_key
def __repr__(self):
return """healthchecks api access credentials
to {url}""".format(url=self.url)
class Healthchecks:
def __init__(self, cred):
self.cred = cred
self.auth_headers = {'X-Api-Key': self.cred.api_key}
......@@ -32,7 +25,7 @@ class Healthchecks:
def get_checks(self):
"""Returns a list of checks from the HC API"""
url = "{}checks/".format(self.cred.url)
url = "{}checks/".format(self.cred.api_url)
try:
response = requests.get(url, headers=self.auth_headers)
......@@ -45,11 +38,11 @@ class Healthchecks:
raise Exception('fetching cron checks failed')
def FindCheck(self, job):
def find_check(self, job):
"""
Find a check in Healthchecks for the host and given job
"""
job_id = self.GetJobId(job)
job_id = self.get_job_id(job)
tag_for_job = 'job_id={job_id}'.format(job_id=job_id)
tag_for_host = 'host={hostname}'.format(hostname=platform.node())
......@@ -69,18 +62,20 @@ class Healthchecks:
return None
def GetJobTags(self, job):
@staticmethod
def get_job_tags(job):
"""
Returns the tags specified in the environment variable
JOB_TAGS in the cron job
"""
regex = r'.*JOB_TAGS=([\w,]*)'
m = re.match(regex, job.command)
if m:
return m.group(1).replace(',', ' ')
match = re.match(regex, job.command)
if match:
return match.group(1).replace(',', ' ')
return ""
def GetJobId(self, job):
@staticmethod
def get_job_id(job):
"""
Returns the value of environment variable JOB_ID if specified
in the cron job
......@@ -92,7 +87,8 @@ class Healthchecks:
return None
def GenerateJobHash(self, job):
@staticmethod
def generate_job_hash(job):
"""Returns the unique hash for given cron job"""
md5 = hashlib.md5()
md5.update(platform.node().encode('utf-8'))
......@@ -100,7 +96,16 @@ class Healthchecks:
md5.update(job.command.encode('utf-8'))
return md5.hexdigest()
def GetCheckHash(self, check):
@staticmethod
def get_check_hash(check):
"""
returns the hash stored in a tag of a healthchecks check
the tags lookes like:
hash=fdec0d88e53cc57ef666c8ec548c88bb
returns None if the tag is not found
"""
regex = r"hash=(\w*)"
hash_search = re.search(regex, check['tags'])
......@@ -109,9 +114,12 @@ class Healthchecks:
return None
def UpdateCheck(self, check, job):
job_hash = self.GenerateJobHash(job)
check_hash = self.GetCheckHash(check)
def update_check(self, check, job):
"""
update check metadata for given cron job
"""
job_hash = self.generate_job_hash(job)
check_hash = self.get_check_hash(check)
if check_hash:
if job_hash == check_hash:
......@@ -132,9 +140,9 @@ class Healthchecks:
'tags': 'sch host={host} job_id={job_id} '
'hash={hash} {tags}'.format(
host=platform.node(),
job_id=self.GetJobId(job),
job_id=self.get_job_id(job),
hash=job_hash,
tags=self.GetJobTags(job)
tags=self.get_job_tags(job)
)
}
......@@ -154,8 +162,11 @@ class Healthchecks:
return True
def NewCheck(self, job):
job_hash = self.GenerateJobHash(job)
def new_check(self, job):
"""
creates a new check for given job
"""
job_hash = self.generate_job_hash(job)
# gather all the jobs' metadata
data = {
......@@ -168,16 +179,16 @@ class Healthchecks:
'tags': 'sch host={host} job_id={job_id} '
'hash={hash} {tags}'.format(
host=platform.node(),
job_id=self.GetJobId(job),
job_id=self.get_job_id(job),
hash=job_hash,
tags=self.GetJobTags(job)
tags=self.get_job_tags(job)
)
}
# post the data
try:
response = requests.post(
url='{}/checks/'.format(self.cred.url),
url='{}/checks/'.format(self.cred.api_url),
headers=self.auth_headers,
json=data
)
......@@ -191,7 +202,7 @@ class Healthchecks:
print('check created')
return True
def PrintStatus(self, status_filter=""):
def print_status(self, status_filter=""):
"""Show status of monitored cron jobs"""
click.secho("{status:<6} {last_ping:<15} {name:<40}".format(
status="Status",
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment