mirror of
https://github.com/halfwaystudent/douyin-sparkflow.git
synced 2026-07-03 05:11:29 +08:00
89 lines
2.8 KiB
Python
89 lines
2.8 KiB
Python
import subprocess
|
|
import sys
|
|
import time
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
|
|
|
|
def expand_field(field, minimum, maximum, current):
|
|
values = set()
|
|
for part in str(field).split(","):
|
|
part = part.strip()
|
|
if not part:
|
|
continue
|
|
step = 1
|
|
if "/" in part:
|
|
part, raw_step = part.split("/", 1)
|
|
step = max(1, int(raw_step))
|
|
if part == "*":
|
|
start, end = minimum, maximum
|
|
elif "-" in part:
|
|
raw_start, raw_end = part.split("-", 1)
|
|
start, end = int(raw_start), int(raw_end)
|
|
else:
|
|
start = end = int(part)
|
|
values.update(range(max(minimum, start), min(maximum, end) + 1, step))
|
|
return current in values
|
|
|
|
|
|
def cron_dow(now):
|
|
return (now.weekday() + 1) % 7
|
|
|
|
|
|
def field_matches(field, minimum, maximum, current, *, allow_sunday_alias=False):
|
|
if allow_sunday_alias and current == 0:
|
|
return expand_field(field, minimum, maximum, 0) or expand_field(field, minimum, maximum, 7)
|
|
return expand_field(field, minimum, maximum, current)
|
|
|
|
|
|
def line_should_run(line, now):
|
|
parts = line.split(maxsplit=5)
|
|
if len(parts) < 6:
|
|
return False, ""
|
|
minute, hour, day, month, weekday, command = parts
|
|
try:
|
|
matched = (
|
|
field_matches(minute, 0, 59, now.minute)
|
|
and field_matches(hour, 0, 23, now.hour)
|
|
and field_matches(day, 1, 31, now.day)
|
|
and field_matches(month, 1, 12, now.month)
|
|
and field_matches(weekday, 0, 7, cron_dow(now), allow_sunday_alias=True)
|
|
)
|
|
except ValueError:
|
|
return False, ""
|
|
return matched, command
|
|
|
|
|
|
def read_crontab(path):
|
|
if not path.exists():
|
|
return []
|
|
lines = []
|
|
for raw_line in path.read_text(encoding="utf-8", errors="replace").splitlines():
|
|
line = raw_line.strip()
|
|
if not line or line.startswith("#") or "=" in line.split(maxsplit=1)[0]:
|
|
continue
|
|
lines.append(line)
|
|
return lines
|
|
|
|
|
|
def run_loop(crontab_path):
|
|
crontab_path.parent.mkdir(parents=True, exist_ok=True)
|
|
last_minute_key = None
|
|
print(f"[cron_runner] watching {crontab_path}", flush=True)
|
|
while True:
|
|
now = datetime.now()
|
|
minute_key = now.strftime("%Y-%m-%d %H:%M")
|
|
if minute_key != last_minute_key:
|
|
last_minute_key = minute_key
|
|
for line in read_crontab(crontab_path):
|
|
matched, command = line_should_run(line, now)
|
|
if matched and command:
|
|
print(f"[cron_runner] {now.isoformat(timespec='seconds')} run: {command}", flush=True)
|
|
subprocess.Popen(["/bin/bash", "-lc", command])
|
|
time.sleep(15)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
target = Path(sys.argv[1] if len(sys.argv) > 1 else "/host-spool-cron/root")
|
|
run_loop(target)
|