1
0
mirror of https://gitee.com/sui-feng-cb/AzurLaneAutoScript1 synced 2026-03-09 18:39:04 +08:00
AzurLaneAutoScript/module/webui/utils.py

415 lines
12 KiB
Python
Raw Normal View History

import ctypes
2022-01-10 00:42:40 +08:00
import datetime
import operator
import re
import threading
import time
from typing import Callable, Generator, List
from module.logger import logger
2021-10-14 13:31:54 +08:00
from pywebio.input import PASSWORD, input
from pywebio.output import toast
from pywebio.session import eval_js, register_thread, run_js
# Regular expression for a date-time such as ``2021-10-03 21:40:05``.
# The ``-``, whitespace and ``:`` separators are each optional, and the year
# may be written with 2 or 4 digits.
# Capture groups, in order: year, month, day, hour, minute, second.
RE_DATETIME = (
    r'(\d{2}|\d{4})(?:\-)?([0]{1}\d{1}|[1]{1}[0-2]{1})(?:\-)?'
    r'([0-2]{1}\d{1}|[3]{1}[0-1]{1})(?:\s)?([0-1]{1}\d{1}|[2]'
    r'{1}[0-3]{1})(?::)?([0-5]{1}\d{1})(?::)?([0-5]{1}\d{1})'
)
class QueueHandler:
    """Minimal file-like writer that forwards every written string to a queue."""

    def __init__(self, q) -> None:
        # Any object exposing ``put()`` works; only ``put`` is ever called.
        self.queue = q

    def write(self, s: str):
        """
        Push `s` onto the queue with its first 11 characters removed.

        At most one trailing newline is dropped before slicing, and exactly
        one newline is appended to what is queued.
        """
        body = s[:-1] if s.endswith('\n') else s
        # reduce log length by cutting off the date.
        self.queue.put(f'{body[11:]}\n')
2021-10-03 21:40:05 +08:00
class Thread(threading.Thread):
    """
    A `threading.Thread` that can be asked to stop from another thread.

    Stopping is done by scheduling a `SystemExit` inside the target thread
    through the CPython C API. The exception is only raised once the target
    thread executes Python bytecode again, so a thread blocked inside a C call
    does not stop until that call returns.
    """
    # https://www.geeksforgeeks.org/python-different-ways-to-kill-a-thread/

    def __init__(self, group=None, target=None, name=None, args=(), kwargs=None, *, daemon=None):
        super().__init__(group=group, target=target, name=name,
                         args=args, kwargs=kwargs, daemon=daemon)

    def _get_id(self):
        """
        Return the interpreter-level identifier of this thread.

        Returns:
            int or None: `None` when the thread has not been started yet.
        """
        if hasattr(self, '_thread_id'):
            return self._thread_id
        # `ident` is the same key `threading` uses for its table of active
        # threads, so there is no need to scan that private table.
        return self.ident

    def stop(self):
        """
        Request termination of this thread by raising `SystemExit` inside it.

        Does nothing when the thread was never started or has already
        finished; a finished thread's identifier may have been reused by an
        unrelated thread, which must not be hit by mistake.
        """
        thread_id = self._get_id()
        if thread_id is None or not self.is_alive():
            return

        # The C function takes an `unsigned long`. Without an explicit ctypes
        # type a Python int is passed as a C `int`, which can truncate thread
        # identifiers on 64-bit platforms.
        tid = ctypes.c_ulong(thread_id)
        res = ctypes.pythonapi.PyThreadState_SetAsyncExc(
            tid, ctypes.py_object(SystemExit))
        if res > 1:
            # More than one thread state was modified: revert the request by
            # passing NULL (`None`) as the exception.
            ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, None)
            print('Exception raise failure')
2021-10-03 21:40:05 +08:00
class Task:
    """
    A generator bundled with its run interval and the time it is next due.

    The generator is advanced to its first `yield` on construction, so it can
    immediately accept values through `send()`.
    """

    def __init__(self, g: Generator, delay: float, next_run: float = None) -> None:
        """
        Args:
            g: Generator to drive. It is primed here with `g.send(None)`.
            delay: Value stored as `self.delay`.
            next_run: Timestamp stored as `self.next_run`; any falsy value
                (including the default `None`) is replaced by `time.time()`.
        """
        self.g = g
        g.send(None)
        self.delay = delay
        self.next_run = next_run or time.time()

    def __next__(self) -> None:
        """Advance the wrapped generator and return what it yields."""
        return next(self.g)

    def send(self, obj) -> None:
        """Send `obj` into the wrapped generator and return what it yields."""
        return self.g.send(obj)

    def __str__(self) -> str:
        generator_name = self.g.__name__
        return f'<{generator_name} (delay={self.delay})>'

    # The same text is used for both `str()` and `repr()`.
    __repr__ = __str__
class TaskHandler:
    """
    Runs a list of `Task` objects one at a time on a single background thread.

    The task with the smallest `next_run` is executed once that time has
    passed. After each run its `next_run` is pushed back by its `delay`, and
    every task still in the list is pushed back by the time the run took.
    """

    def __init__(self, use_pywebio=False) -> None:
        """
        Args:
            use_pywebio: When true, `start()` passes the worker thread to
                `register_thread()` before starting it.
        """
        # List of background running task
        self.tasks: List[Task] = []
        # List of tasks to be removed by `remove_pending_task()`
        self.pending_remove_tasks: List[Task] = []
        # Running task
        self._task = None
        # Task running thread
        self._thread: Thread = None
        self._lock = threading.Lock()
        # register pywebio thread
        self.use_pywebio = use_pywebio

    def add(self, func, delay: float, pending_delete: bool = False) -> None:
        """
        Add a task running background.
        Another way of `self.add_task()`.

        Args:
            func: Callable or Generator. A callable is wrapped with
                `get_generator()`; a generator is used as is.
            delay: Passed to `Task` as its delay.
            pending_delete: Forwarded to `add_task()`.

        Raises:
            TypeError: If `func` is neither a callable nor a generator.
        """
        if isinstance(func, Callable):
            g = get_generator(func)
        elif isinstance(func, Generator):
            g = func
        else:
            # Previously this fell through and failed on an unbound local.
            raise TypeError(
                f"func must be a Callable or Generator, got {type(func).__name__}")
        self.add_task(Task(g, delay), pending_delete=pending_delete)

    def add_task(self, task: Task, pending_delete: bool = False) -> None:
        """
        Add a task running background.

        Args:
            task: Task to append. A task already in the list is ignored with
                a warning.
            pending_delete: If True, the task is also queued in
                `self.pending_remove_tasks`, so the next call to
                `remove_pending_task()` removes it.
        """
        if task in self.tasks:
            logger.warning(f"Task {task} already in tasks list.")
            return
        logger.info(f"Add task {task}")
        with self._lock:
            self.tasks.append(task)
        if pending_delete:
            self.pending_remove_tasks.append(task)

    def _remove_task(self, task: Task) -> None:
        """
        Remove `task` from `self.tasks`, logging a warning if it is absent.

        Does not take `self._lock`; the callers in this class hold it.
        """
        if task in self.tasks:
            self.tasks.remove(task)
            logger.info(f"Task {task} removed.")
        else:
            logger.warning(
                f"Failed to remove task {task}. Current tasks list: {self.tasks}")

    def remove_task(self, task: Task, nowait: bool = False) -> None:
        """
        Remove a task in `self.tasks`.

        Args:
            task:
            nowait: if True, remove it right now,
                otherwise remove when call `self.remove_pending_task`
        """
        if nowait:
            with self._lock:
                self._remove_task(task)
        else:
            self.pending_remove_tasks.append(task)

    def remove_pending_task(self) -> None:
        """
        Remove all pending remove tasks.
        """
        with self._lock:
            for task in self.pending_remove_tasks:
                self._remove_task(task)
            self.pending_remove_tasks = []

    def remove_current_task(self) -> None:
        """Immediately remove the task that is currently being run."""
        self.remove_task(self._task, nowait=True)

    def loop(self) -> None:
        """
        Start task loop.
        You **should** run this function in an individual thread.

        Never returns. Sleeps 0.5s while there are no tasks and 0.05s while
        the earliest task is not due yet.
        """
        while True:
            # Decide whether there is a task and pick it under the same lock.
            # Checking `self.tasks` outside the lock let another thread empty
            # the list before `self.tasks[0]` was read.
            with self._lock:
                if self.tasks:
                    self.tasks.sort(key=operator.attrgetter('next_run'))
                    task = self.tasks[0]
                else:
                    task = None

            if task is None:
                time.sleep(0.5)
                continue
            if not task.next_run < time.time():
                time.sleep(0.05)
                continue

            start_time = time.time()
            try:
                self._task = task
                # The handler itself is sent into the generator.
                task.send(self)
            except Exception as e:
                # Includes StopIteration from an exhausted generator:
                # a failing task is logged and dropped.
                logger.exception(e)
                self.remove_task(task, nowait=True)
            finally:
                self._task = None
            end_time = time.time()

            task.next_run += task.delay
            elapsed = end_time - start_time
            with self._lock:
                # Shift every remaining task by the time this run took.
                for queued in self.tasks:
                    queued.next_run += elapsed

    def start(self) -> None:
        """
        Start task handler.

        Does nothing but log a warning when the worker thread is already
        alive.
        """
        logger.info("Start task handler")
        if self._thread is not None and self._thread.is_alive():
            logger.warning("Task handler already running!")
            return
        self._thread = Thread(target=self.loop)
        if self.use_pywebio:
            register_thread(self._thread)
        self._thread.start()

    def stop(self) -> None:
        """
        Remove pending tasks and stop the worker thread if it is running.

        Safe to call before `start()`.
        """
        self.remove_pending_task()
        # `_thread` is None until `start()` has been called.
        if self._thread is not None and self._thread.is_alive():
            self._thread.stop()
        logger.info("Finish task handler")
class Switch:
    """Dispatches handlers according to a state obtained from `get_state`."""

    def __init__(self, status, get_state, name=None):
        """
        Args:
            status:
                (dict): Maps each state to what should run for it. A value
                    may be a callable, a dict, or a list mixing both.
                    {
                        0: {
                            'func': (Callable)
                        },
                        1: {
                            'func'
                            'args': (Optional, tuple)
                            'kwargs': (Optional, dict)
                        },
                        2: [
                            func1,
                            {
                                'func': func2
                                'args': args2
                            }
                        ]
                        -1: []
                    }
                (Callable): Receives the current state as its only argument.
                    lambda state: do_update(state=state)
            get_state:
                (Callable):
                    return current state
                (Generator):
                    yield current state, do nothing when state not in status
            name: Used by `g()` to name the generator it returns.
        """
        self._lock = threading.Lock()
        self.name = name
        self.status = status
        self.get_state = get_state
        if isinstance(get_state, Generator):
            self._generator = get_state
        elif isinstance(get_state, Callable):
            self._generator = self._get_state()

    @staticmethod
    def get_state():
        # Class-level placeholder; `__init__` sets an instance attribute of
        # the same name.
        pass

    def _get_state(self):
        """
        Predefined generator when `get_state` is a callable.
        Customize it if you have multiple criteria on state.

        Yields the first state read, then for each later read yields the new
        state if it differs from the last yielded one, otherwise -1.
        """
        previous = self.get_state()
        yield previous
        while True:
            current = self.get_state()
            if previous != current:
                previous = current
                yield previous
            else:
                yield -1

    def switch(self):
        """
        Fetch the next state and run whatever `status` defines for it.

        Only the generator step is done under `self._lock`.
        """
        with self._lock:
            state = next(self._generator)

        if callable(self.status):
            self.status(state)
            return
        if state not in self.status:
            return

        handlers = self.status[state]
        # A single handler (callable or dict) is treated as a one-item list.
        if isinstance(handlers, (dict, Callable)):
            handlers = [handlers]
        for entry in handlers:
            spec = {'func': entry} if isinstance(entry, Callable) else entry
            target = spec['func']
            positional = spec.get('args', tuple())
            keyword = spec.get('kwargs', dict())
            target(*positional, **keyword)

    def g(self) -> Generator:
        """
        Wrap `self.switch` with `get_generator()` and give the result a name
        built from `self.name`, or from `get_state.__name__` when no name is
        set.
        """
        g = get_generator(self.switch)
        label = self.name if self.name else self.get_state.__name__
        g.__name__ = f'Switch_{label}_refresh'
        return g
def get_generator(func: Callable):
    """
    Turn `func` into an endless, already created generator.

    The first step of the generator yields nothing (`None`); this is the
    priming step. Every following step calls `func()` with no arguments and
    yields its result. Values sent into the generator are ignored.

    Returns:
        Generator: Its `__name__` is copied from `func.__name__`.
    """
    def _repeat_call():
        # Priming step, consumed before the first real call.
        yield
        while True:
            yield func()

    generator = _repeat_call()
    generator.__name__ = func.__name__
    return generator
def filepath_css(filename):
    """Return the relative path of the stylesheet called `filename` (no extension)."""
    return './assets/gui/css/{}.css'.format(filename)
2021-10-03 21:40:05 +08:00
def filepath_icon(filename):
    """Return the relative path of the SVG icon called `filename` (no extension)."""
    return './assets/gui/icon/{}.svg'.format(filename)
2021-10-03 21:40:05 +08:00
def add_css(filepath):
    """
    Inject the stylesheet stored at `filepath` into the page's `<head>`.

    The file is read as UTF-8 and its newlines are removed. The CSS text is
    handed to the browser as a JavaScript variable rather than spliced into
    the script source, so quotes or backslashes inside the stylesheet cannot
    break the generated script.
    """
    with open(filepath, "r", encoding="utf-8") as f:
        css = f.read().replace('\n', '')
    run_js("""$('head').append('<style>' + css + '</style>')""", css=css)
2021-10-03 21:40:05 +08:00
def _read(path):
    """
    Return the whole text content of the file at `path`.

    The file is decoded as UTF-8 explicitly so the result does not depend on
    the locale's default encoding.
    """
    with open(path, 'r', encoding='utf-8') as f:
        return f.read()
2021-10-03 21:40:05 +08:00
class Icon:
    """
    Holds the file content of each GUI icon as a class attribute.

    Every attribute is filled when this class body is executed, by reading
    the file whose path `filepath_icon()` returns for the given name.
    """
    ALAS = _read(filepath_icon('alas'))
    SETTING = _read(filepath_icon('setting'))
    RUN = _read(filepath_icon('run'))
    DEVELOP = _read(filepath_icon('develop'))
    ADD = _read(filepath_icon('add'))
2021-10-03 21:40:05 +08:00
def parse_pin_value(val):
    """
    Convert a raw pin widget value into a plain Python value.

    input, textarea return str
    select return its option (str or int)
    checkbox return [] or [True] (define in put_checkbox_)

    Conversion rules:
        - list: `False` when empty, `True` otherwise.
        - int / float: returned unchanged.
        - anything else: parsed as a number when possible. Whole numbers
          become `int`, other numbers become `float`. Values that cannot be
          parsed (including `None`) are returned unchanged.
    """
    if isinstance(val, list):
        return len(val) > 0
    if isinstance(val, (int, float)):
        return val

    try:
        v = float(val)
    except (ValueError, TypeError):
        # TypeError covers non-string values such as None.
        return val

    if not v.is_integer():
        return v
    try:
        # Parse integer text directly: going through float would silently
        # lose precision for integers beyond 2**53.
        return int(val)
    except (ValueError, TypeError):
        # Text such as "3.0" or "1e3" is a whole number but not int syntax.
        return int(v)
2021-10-09 17:20:59 +08:00
2021-10-09 17:20:59 +08:00
def login(password):
    """
    Check the visitor against `password`.

    The value stored under the `password` key in the browser's localStorage
    is compared first. If it does not match, the visitor is prompted once; a
    correct answer is saved to localStorage, a wrong one shows an error
    toast.

    Returns:
        bool: True if either the stored or the typed value equals `password`.
    """
    if get_localstorage('password') == password:
        return True

    pwd = input(label='Please login below.',
                type=PASSWORD, placeholder='PASSWORD')
    if pwd != password:
        toast('Wrong password!', color='error')
        return False

    # NOTE(review): the password is kept in localStorage as entered.
    set_localstorage('password', pwd)
    return True
def get_window_visibility_state():
    """
    Evaluate `document.visibilityState` in the browser.

    Returns:
        bool: False when the reported state is "hidden", True for anything
            else.
    """
    state = eval_js("document.visibilityState")
    if state == "hidden":
        return False
    return True
2021-11-07 16:01:48 +08:00
# https://pywebio.readthedocs.io/zh_CN/latest/cookbook.html#cookie-and-localstorage-manipulation
def set_localstorage(key, value):
    """Store `value` under `key` in the browser's localStorage via `run_js`."""
    script = "localStorage.setItem(key, value)"
    return run_js(script, key=key, value=value)
def get_localstorage(key):
    """Return what `localStorage.getItem(key)` evaluates to in the browser."""
    script = "localStorage.getItem(key)"
    return eval_js(script, key=key)
def re_fullmatch(pattern, string):
    """
    `re.fullmatch` with support for a named pattern.

    Args:
        pattern: A regular expression, or the literal 'datetime', which
            stands for `RE_DATETIME`.
        string: Text that must match the pattern entirely.

    Returns:
        The match object, or None when `string` does not match.
    """
    regex = RE_DATETIME if pattern == 'datetime' else pattern
    return re.fullmatch(pattern=regex, string=string)
2021-11-07 16:01:48 +08:00
2022-01-10 00:42:40 +08:00
def get_next_time(t: datetime.time):
    """
    Return how many whole seconds remain until the clock next reads `t`.

    Both times are compared by hour, minute and second only. The result lies
    in 0..86399; a time already passed today refers to tomorrow.
    """
    now = datetime.datetime.today().time()
    target_seconds = t.hour * 3600 + t.minute * 60 + t.second
    current_seconds = now.hour * 3600 + now.minute * 60 + now.second
    # Wrap a negative difference around one day (86400 seconds).
    return (target_seconds - current_seconds) % 86400
if __name__ == '__main__':
    # Manual demo: run two counters on a TaskHandler, drop one after five
    # seconds, and stop the handler five seconds after that.
    def gen(step):
        total = 0
        while True:
            total += step
            print(total)
            yield total

    handler = TaskHandler()
    handler.start()

    # Creating a Task primes its generator, so each counter prints once here.
    every_second = Task(gen(1), delay=1)
    every_three_seconds = Task(gen(-2), delay=3)
    for demo_task in (every_second, every_three_seconds):
        handler.add_task(demo_task)

    time.sleep(5)
    handler.remove_task(every_three_seconds, nowait=True)
    time.sleep(5)
    handler.stop()