1
0
mirror of https://gitee.com/sui-feng-cb/AzurLaneAutoScript1 synced 2026-03-09 18:39:04 +08:00
AzurLaneAutoScript/module/device/method/scrcpy/scrcpy.py

160 lines
5.3 KiB
Python
Raw Normal View History

import time
from functools import wraps
import numpy as np
from adbutils.errors import AdbError
import module.device.method.scrcpy.const as const
from module.base.utils import random_rectangle_point
from module.device.method.minitouch import insert_swipe
from module.device.method.scrcpy.core import ScrcpyCore, ScrcpyError
2023-01-07 02:43:52 +08:00
from module.device.method.uiautomator_2 import Uiautomator2
2022-12-29 19:22:55 +08:00
from module.device.method.utils import RETRY_TRIES, retry_sleep, handle_adb_error
from module.exception import RequestHumanTakeover
from module.logger import logger
def retry(func):
    """
    Decorator that retries a Scrcpy device method when a recoverable error occurs.

    The wrapped method is attempted up to RETRY_TRIES times. After a failed
    attempt, the next attempt first calls retry_sleep() with the attempt index
    and then runs the recovery step selected by the error that was caught:

    - ConnectionResetError / ConnectionAbortedError: self.adb_reconnect()
    - ScrcpyError: self.scrcpy_init()
    - AdbError for which handle_adb_error() returns a truthy value:
      self.adb_reconnect()
    - any other Exception: no recovery step, the call is simply retried

    RequestHumanTakeover, and an AdbError for which handle_adb_error() returns
    a falsy value, stop the retry loop immediately.

    Args:
        func (callable): Method whose first positional argument is `self`.

    Returns:
        callable: Wrapper that accepts the same arguments as `func`.

    Raises:
        RequestHumanTakeover: If retrying was aborted or every attempt failed.
    """
    @wraps(func)
    def retry_wrapper(self, *args, **kwargs):
        """
        Args:
            self (Scrcpy):
        """
        # Recovery step to run before the next attempt; None until a failure happened
        recover = None
        for attempt in range(RETRY_TRIES):
            try:
                # Back-off and recovery run inside the try block on purpose,
                # errors raised while recovering are handled like any other failure
                if callable(recover):
                    retry_sleep(attempt)
                    recover()
                return func(self, *args, **kwargs)
            # Can't handle
            except RequestHumanTakeover:
                break
            # ConnectionResetError: when adb server was killed
            # ConnectionAbortedError: emulator closed
            except (ConnectionResetError, ConnectionAbortedError) as e:
                logger.error(e)

                def recover():
                    self.adb_reconnect()
            # ScrcpyError
            except ScrcpyError as e:
                logger.error(e)

                def recover():
                    self.scrcpy_init()
            # AdbError
            except AdbError as e:
                if handle_adb_error(e):
                    def recover():
                        self.adb_reconnect()
                else:
                    break
            # Unknown, probably a truncated image
            except Exception as e:
                logger.exception(e)

                # Nothing to re-initialise, but a callable is still set
                # so that the next attempt goes through retry_sleep()
                def recover():
                    pass

        logger.critical(f'Retry {func.__name__}() failed')
        raise RequestHumanTakeover

    return retry_wrapper
2023-01-07 02:43:52 +08:00
class Scrcpy(ScrcpyCore, Uiautomator2):
    """
    Screenshot and touch control methods that go through scrcpy.

    Every public method makes sure scrcpy is running via scrcpy_ensure_running()
    and then works while holding `_scrcpy_control_socket_lock`.
    """

    def _scrcpy_resolution_check(self):
        """
        Verify the screen size while scrcpy is not alive.

        Does nothing when `self._scrcpy_alive` is truthy. Otherwise the size
        reported by window_size_uiautomator2() is logged and validated.

        Raises:
            RequestHumanTakeover: If the size is neither 1280x720 nor 720x1280.
        """
        if self._scrcpy_alive:
            return

        with self._scrcpy_control_socket_lock:
            width, height = self.window_size_uiautomator2()
            logger.attr('Screen_size', f'{width}x{height}')
            # Both orientations of 1280x720 are accepted
            if (width == 1280 and height == 720) or (width == 720 and height == 1280):
                return

            logger.critical(f'Resolution not supported: {width}x{height}')
            logger.critical('Please set emulator resolution to 1280x720')
            raise RequestHumanTakeover

    @retry
    def screenshot_scrcpy(self):
        """
        Wait for a frame newer than the moment of the call and return a copy of it.

        Returns:
            Copy of `self._scrcpy_last_frame`, made with its copy() method.

        Raises:
            ScrcpyError: If the stream loop thread is missing or no longer alive.
                When raised inside the @retry wrapper it triggers scrcpy_init().
        """
        self._scrcpy_resolution_check()
        self.scrcpy_ensure_running()

        with self._scrcpy_control_socket_lock:
            # Wait new frame
            now = time.time()
            while True:
                time.sleep(0.001)
                stream_thread = self._scrcpy_stream_loop_thread
                if stream_thread is None or not stream_thread.is_alive():
                    raise ScrcpyError('_scrcpy_stream_loop_thread died')
                # Only a frame timestamped after the request counts as new
                if self._scrcpy_last_frame_time > now:
                    return self._scrcpy_last_frame.copy()

    @retry
    def click_scrcpy(self, x, y):
        """
        Tap a point: touch down immediately followed by touch up.

        Args:
            x: X coordinate passed to the scrcpy control touch().
            y: Y coordinate passed to the scrcpy control touch().
        """
        self.scrcpy_ensure_running()

        with self._scrcpy_control_socket_lock:
            self._scrcpy_control.touch(x, y, const.ACTION_DOWN)
            self._scrcpy_control.touch(x, y, const.ACTION_UP)
            self.sleep(0.05)

    @retry
    def long_click_scrcpy(self, x, y, duration=1.0):
        """
        Press a point, hold it, then release.

        Args:
            x: X coordinate passed to the scrcpy control touch().
            y: Y coordinate passed to the scrcpy control touch().
            duration: Value passed to self.sleep() between touch down and touch up.
        """
        self.scrcpy_ensure_running()

        with self._scrcpy_control_socket_lock:
            self._scrcpy_control.touch(x, y, const.ACTION_DOWN)
            self.sleep(duration)
            self._scrcpy_control.touch(x, y, const.ACTION_UP)
            self.sleep(0.05)

    @retry
    def swipe_scrcpy(self, p1, p2):
        """
        Swipe from p1 to p2 along the points generated by insert_swipe().

        Args:
            p1: Start point, unpacked as positional arguments of touch().
            p2: End point, unpacked as positional arguments of touch().
        """
        self.scrcpy_ensure_running()

        with self._scrcpy_control_socket_lock:
            # Unlike minitouch, scrcpy swipes need to be continuous,
            # so the path is made 5 times smoother
            points = insert_swipe(p0=p1, p3=p2, speed=4, min_distance=2)

            self._scrcpy_control.touch(*p1, const.ACTION_DOWN)

            # First and last generated points are skipped,
            # p1 and p2 themselves are sent instead
            for point in points[1:-1]:
                self._scrcpy_control.touch(*point, const.ACTION_MOVE)
                self.sleep(0.002)

            self._scrcpy_control.touch(*p2, const.ACTION_MOVE)
            self._scrcpy_control.touch(*p2, const.ACTION_UP)
            self.sleep(0.05)

    @retry
    def drag_scrcpy(self, p1, p2, point_random=(-10, -10, 10, 10)):
        """
        Drag from p1 to p2, holding at the end point before releasing.

        Args:
            p1: Start point, converted with np.array().
            p2: End point, converted with np.array().
            point_random: Argument of random_rectangle_point(); the returned value
                is subtracted from each end point.
                NOTE(review): presumably an (x1, y1, x2, y2) offset area - confirm
                against random_rectangle_point().
        """
        self.scrcpy_ensure_running()

        with self._scrcpy_control_socket_lock:
            p1 = np.array(p1) - random_rectangle_point(point_random)
            p2 = np.array(p2) - random_rectangle_point(point_random)
            points = insert_swipe(p0=p1, p3=p2, speed=4, min_distance=2)

            self._scrcpy_control.touch(*p1, const.ACTION_DOWN)

            for point in points[1:-1]:
                self._scrcpy_control.touch(*point, const.ACTION_MOVE)
                self.sleep(0.002)

            # Hold 280ms: 140 move events at the end point, 0.002 apart
            hold_steps = int(0.14 // 0.002) * 2
            for _ in range(hold_steps):
                self._scrcpy_control.touch(*p2, const.ACTION_MOVE)
                self.sleep(0.002)

            self._scrcpy_control.touch(*p2, const.ACTION_UP)
            self.sleep(0.05)