1
0
mirror of https://gitee.com/sui-feng-cb/AzurLaneAutoScript1 synced 2026-03-09 18:39:04 +08:00
AzurLaneAutoScript/module/device/method/uiautomator_2.py

326 lines
10 KiB
Python
Raw Normal View History

2022-12-04 23:40:32 +08:00
import typing as t
from dataclasses import dataclass
from functools import wraps
from json.decoder import JSONDecodeError
2022-12-04 23:40:32 +08:00
from subprocess import list2cmdline
import uiautomator2 as u2
from adbutils.errors import AdbError
from lxml import etree
from module.base.utils import *
from module.device.connection import Connection
2023-01-18 01:05:54 +08:00
from module.device.method.utils import (RETRY_TRIES, retry_sleep, handle_adb_error,
ImageTruncated, PackageNotInstalled, possible_reasons)
from module.exception import RequestHumanTakeover
from module.logger import logger
def retry(func):
@wraps(func)
def retry_wrapper(self, *args, **kwargs):
"""
Args:
self (Uiautomator2):
"""
init = None
for _ in range(RETRY_TRIES):
try:
if callable(init):
2022-12-29 19:22:55 +08:00
retry_sleep(_)
init()
return func(self, *args, **kwargs)
# Can't handle
except RequestHumanTakeover:
break
# When adb server was killed
except ConnectionResetError as e:
logger.error(e)
def init():
self.adb_reconnect()
# In `device.set_new_command_timeout(604800)`
# json.decoder.JSONDecodeError: Expecting value: line 1 column 2 (char 1)
except JSONDecodeError as e:
logger.error(e)
def init():
self.install_uiautomator2()
# AdbError
except AdbError as e:
if handle_adb_error(e):
def init():
self.adb_reconnect()
else:
break
# RuntimeError: USB device 127.0.0.1:5555 is offline
except RuntimeError as e:
if handle_adb_error(e):
def init():
self.adb_reconnect()
else:
break
# In `assert c.read string(4) == _OKAY`
# ADB on emulator not enabled
except AssertionError as e:
logger.exception(e)
possible_reasons(
'If you are using BlueStacks or LD player or WSA, '
'please enable ADB in the settings of your emulator'
)
break
# Package not installed
except PackageNotInstalled as e:
logger.error(e)
def init():
self.detect_package()
2023-01-18 01:05:54 +08:00
# ImageTruncated
except ImageTruncated as e:
logger.error(e)
def init():
pass
# Unknown
except Exception as e:
logger.exception(e)
def init():
pass
logger.critical(f'Retry {func.__name__}() failed')
raise RequestHumanTakeover
return retry_wrapper
2022-12-04 23:40:32 +08:00
@dataclass
class ProcessInfo:
pid: int
ppid: int
thread_count: int
cmdline: str
name: str
@dataclass
class ShellBackgroundResponse:
success: bool
pid: int
description: str
class Uiautomator2(Connection):
@retry
def screenshot_uiautomator2(self):
image = self.u2.screenshot(format='raw')
image = np.frombuffer(image, np.uint8)
2023-01-18 01:05:54 +08:00
if image is None:
raise ImageTruncated('Empty image after reading from buffer')
image = cv2.imdecode(image, cv2.IMREAD_COLOR)
2023-01-18 01:05:54 +08:00
if image is None:
raise ImageTruncated('Empty image after cv2.imdecode')
cv2.cvtColor(image, cv2.COLOR_BGR2RGB, dst=image)
2023-01-18 01:05:54 +08:00
if image is None:
raise ImageTruncated('Empty image after cv2.cvtColor')
return image
@retry
def click_uiautomator2(self, x, y):
self.u2.click(x, y)
@retry
def long_click_uiautomator2(self, x, y, duration=(1, 1.2)):
self.u2.long_click(x, y, duration=duration)
@retry
def swipe_uiautomator2(self, p1, p2, duration=0.1):
self.u2.swipe(*p1, *p2, duration=duration)
@retry
def _drag_along(self, path):
"""Swipe following path.
Args:
path (list): (x, y, sleep)
Examples:
al.drag_along([
(403, 421, 0.2),
(821, 326, 0.1),
(821, 326-10, 0.1),
(821, 326+10, 0.1),
(821, 326, 0),
])
Equals to:
al.device.touch.down(403, 421)
time.sleep(0.2)
al.device.touch.move(821, 326)
time.sleep(0.1)
al.device.touch.move(821, 326-10)
time.sleep(0.1)
al.device.touch.move(821, 326+10)
time.sleep(0.1)
al.device.touch.up(821, 326)
"""
length = len(path)
for index, data in enumerate(path):
x, y, second = data
if index == 0:
self.u2.touch.down(x, y)
logger.info(point2str(x, y) + ' down')
elif index - length == -1:
self.u2.touch.up(x, y)
logger.info(point2str(x, y) + ' up')
else:
self.u2.touch.move(x, y)
logger.info(point2str(x, y) + ' move')
self.sleep(second)
def drag_uiautomator2(self, p1, p2, segments=1, shake=(0, 15), point_random=(-10, -10, 10, 10),
shake_random=(-5, -5, 5, 5), swipe_duration=0.25, shake_duration=0.1):
"""Drag and shake, like:
/\
+-----------+ + +
\/
A simple swipe or drag don't work well, because it only has two points.
Add some way point to make it more like swipe.
Args:
p1 (tuple): Start point, (x, y).
p2 (tuple): End point, (x, y).
segments (int):
shake (tuple): Shake after arrive end point.
point_random: Add random to start point and end point.
shake_random: Add random to shake array.
swipe_duration: Duration between way points.
shake_duration: Duration between shake points.
"""
p1 = np.array(p1) - random_rectangle_point(point_random)
p2 = np.array(p2) - random_rectangle_point(point_random)
path = [(x, y, swipe_duration) for x, y in random_line_segments(p1, p2, n=segments, random_range=point_random)]
path += [
(*p2 + shake + random_rectangle_point(shake_random), shake_duration),
(*p2 - shake - random_rectangle_point(shake_random), shake_duration),
(*p2, shake_duration)
]
path = [(int(x), int(y), d) for x, y, d in path]
self._drag_along(path)
@retry
def app_current_uiautomator2(self):
"""
Returns:
str: Package name.
"""
result = self.u2.app_current()
return result['package']
@retry
def app_start_uiautomator2(self, package_name=None):
if not package_name:
package_name = self.package
try:
self.u2.app_start(package_name)
except u2.exceptions.BaseError as e:
# BaseError: package "com.bilibili.azurlane" not found
logger.error(e)
raise PackageNotInstalled(package_name)
@retry
def app_stop_uiautomator2(self, package_name=None):
if not package_name:
package_name = self.package
self.u2.app_stop(package_name)
@retry
def dump_hierarchy_uiautomator2(self) -> etree._Element:
content = self.u2.dump_hierarchy(compressed=True)
hierarchy = etree.fromstring(content.encode('utf-8'))
return hierarchy
2022-12-04 23:40:32 +08:00
2023-01-07 02:43:52 +08:00
@retry
def resolution_uiautomator2(self) -> t.Tuple[int, int]:
2023-01-07 02:43:52 +08:00
"""
2023-02-08 01:53:04 +08:00
Faster u2.window_size(), cause that calls `dumpsys display` twice.
2023-01-07 02:43:52 +08:00
Returns:
(width, height)
"""
2023-02-08 01:53:04 +08:00
info = self.u2.http.get('/info').json()
w, h = info['display']['width'], info['display']['height']
rotation = self.get_orientation()
if (w > h) != (rotation % 2 == 1):
w, h = h, w
return w, h
2023-01-07 02:43:52 +08:00
def resolution_check_uiautomator2(self):
"""
Alas does not actively check resolution but the width and height of screenshots.
However, some screenshot methods do not provide device resolution, so check it here.
2023-02-08 01:53:04 +08:00
Returns:
(width, height)
Raises:
RequestHumanTakeover: If resolution is not 1280x720
"""
width, height = self.resolution_uiautomator2()
logger.attr('Screen_size', f'{width}x{height}')
if width == 1280 and height == 720:
2023-02-08 01:53:04 +08:00
return (width, height)
if width == 720 and height == 1280:
2023-02-08 01:53:04 +08:00
return (width, height)
logger.critical(f'Resolution not supported: {width}x{height}')
logger.critical('Please set emulator resolution to 1280x720')
raise RequestHumanTakeover
2022-12-04 23:40:32 +08:00
@retry
def proc_list_uiautomator2(self) -> t.List[ProcessInfo]:
2022-12-04 23:40:32 +08:00
"""
Get info about current processes.
"""
resp = self.u2.http.get("/proc/list", timeout=10)
resp.raise_for_status()
result = [
ProcessInfo(
pid=proc['pid'],
ppid=proc['ppid'],
thread_count=proc['threadCount'],
cmdline=' '.join(proc['cmdline']) if proc['cmdline'] is not None else '',
2022-12-04 23:40:32 +08:00
name=proc['name'],
) for proc in resp.json()
]
return result
@retry
def u2_shell_background(self, cmdline, timeout=10) -> ShellBackgroundResponse:
"""
Run at background.
Note that this function will always return a success response,
as this is a untested and hidden method in ATX.
"""
if isinstance(cmdline, (list, tuple)):
cmdline = list2cmdline(cmdline)
elif isinstance(cmdline, str):
cmdline = cmdline
else:
raise TypeError("cmdargs type invalid", type(cmdline))
data = dict(command=cmdline, timeout=str(timeout))
ret = self.u2.http.post("/shell/background", data=data, timeout=timeout + 10)
ret.raise_for_status()
resp = ret.json()
resp = ShellBackgroundResponse(
success=bool(resp.get('success', False)),
pid=resp.get('pid', 0),
description=resp.get('description', '')
)
return resp