Commit 2fd77b7b authored by Bram Daams's avatar Bram Daams
Browse files

test cronjob; example registry file; find_by_hash method

parent e6e5701f
Pipeline #34841 passed with stage
in 1 minute and 22 seconds
......@@ -20,6 +20,15 @@ registry = hcRegistry(cred, '/var/lib/hcregistry.json')
h = hc(cred)
h.print_status()
# scan jobs that want to use SCH
jobs = CronTabs().all.find_command('JOB_ID')
for job in jobs:
print("job:")
print(job)
hc_id = registry.get_id(job)
print(hc_id)
print("--------------")
```
And run it within the virtual environment
......
[
{
"JOB_ID": "false1",
"HC_ID": "c6078292-6ae8-43b5-a23e-9068e2f400ad",
"hash": "7fd04f078e7ec8c20d0fa5c0a7d7e7cd"
},
{
"JOB_ID": "false2",
"HC_ID": "c6078292-6ae8-43b5-a23e-9068e2f400ad",
"hash": "6f99db84b1563cbc9002ef7dbc9b48da"
}
]
# Sample cron file that does almost nothing
# some comment
1 0 * * * root JOB_ID=false1 /bin/false
2 0 * * * root JOB_ID=false2 /bin/false
......@@ -3,6 +3,8 @@ import requests
import click
import arrow
import json
import hashlib
import platform
class hcCred:
......@@ -29,6 +31,28 @@ class hcRegistry:
except Exception:
print("could not load registry")
def get_hash(self, job):
md5 = hashlib.md5()
md5.update(platform.node().encode('utf-8'))
md5.update(str(job.slices).encode('utf-8'))
md5.update(job.command.encode('utf-8'))
return md5.hexdigest()
def find_by_hash(self, job):
h = self.get_hash(job)
return next((elem for elem in self.data if elem['hash'] == h), False)
def find_by_job_id(self, job):
pass
def get_id(self, job):
r = self.find_by_hash(job)
if r:
return r['HC_ID']
# r = self.find_by_job_id(job)
return False
def register(self, id):
pass
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment