# SPDX-FileCopyrightText: 2022-present Artur Drogunow <artur.drogunow@zf.com>
#
# SPDX-License-Identifier: MIT
import copy
import ctypes
import fnmatch
import os.path
from typing import Dict, List, NamedTuple, Optional
from .calibration_object import CalibrationObject, get_calibration_object
from .cnp_api.cnp_class import (
DBFileInfo,
MeasurementListEntries,
TAsap3Hdl,
TModulHdl,
TScriptHdl,
TTaskInfo2,
enum_type,
)
from .cnp_api.cnp_constants import DBFileType, DriverType, TAsap3DBOType, TAsap3ECUState
from .cnp_api.cnp_prototype import CANapeDll
from .config import RC
from .ecu_task import EcuTask
from .script import Script
class MeasurementListEntry(NamedTuple):
    """One entry of the CANape measurement list.

    Instances are created by
    :meth:`Module.get_measurement_list_entries` from the fields of the
    corresponding C structure.
    """

    # Copied from the ``taskId`` field of the C entry.
    task_id: int
    # Copied from the ``rate`` field of the C entry.
    rate: int
    # Copied from the ``SaveFlag`` field of the C entry.
    save_flag: bool
    # Copied from the ``Disabled`` field of the C entry.
    disabled: bool
    # Decoded ``ObjectName`` field of the C entry.
    object_name: str
class DatabaseInfo(NamedTuple):
    """Information about the database file of a module.

    Instances are created by :meth:`Module.get_database_info`.
    """

    # Decoded ``asap2Fname`` field of the C structure.
    file_name: str
    # Decoded ``asap2Path`` field of the C structure.
    file_path: str
    # ``type`` field of the C structure converted to :class:`DBFileType`.
    file_type: DBFileType
class Module:
    """Wrapper around a CANape module handle.

    Every method forwards to an ``Asap3*`` function of the CANape DLL and
    passes the stored ASAP3 handle and module handle along.
    """

    def __init__(
        self,
        dll: CANapeDll,
        asap3_handle: TAsap3Hdl,  # type: ignore[valid-type]
        module_handle: TModulHdl,
    ) -> None:
        """The :class:`~pycanape.module.Module` class is not meant to be instantiated
        by the user. Instead, :class:`~pycanape.module.Module` instances are returned by
        :meth:`~pycanape.canape.CANape.create_module`, :meth:`~pycanape.canape.CANape.get_module_by_index`
        and :meth:`~pycanape.canape.CANape.get_module_by_name`.

        :param dll:
            DLL wrapper whose ``Asap3*`` functions are called by the methods
            of this class
        :param asap3_handle:
            ASAP3 handle that is passed to every API call
        :param module_handle:
            handle of the module that is passed to every API call
        """
        self._dll = dll
        self.asap3_handle = asap3_handle
        self.module_handle = module_handle
        # Filled lazily by get_database_objects(); None means "not queried yet".
        self._objects_cache: Optional[List[str]] = None

    def get_database_info(self) -> DatabaseInfo:
        """Get Info concerning the database file.

        :return:
            file name, file path and file type of the database
        """
        cnp_info = DBFileInfo()
        self._dll.Asap3GetDatabaseInfo(
            self.asap3_handle,
            self.module_handle,
            ctypes.byref(cnp_info),
        )
        return DatabaseInfo(
            file_name=cnp_info.asap2Fname.decode(RC["ENCODING"]),
            file_path=cnp_info.asap2Path.decode(RC["ENCODING"]),
            file_type=DBFileType(cnp_info.type),
        )

    def get_database_path(self) -> str:
        """Get path to database file.

        :return:
            database file path joined with the database file name
        """
        db_info = self.get_database_info()
        return os.path.join(db_info.file_path, db_info.file_name)

    def is_module_active(self) -> bool:
        """Return the activation state of the module.

        :return:
            activation state
        """
        active = ctypes.c_bool()
        self._dll.Asap3IsModuleActive(
            self.asap3_handle,
            self.module_handle,
            ctypes.byref(active),
        )
        return active.value

    def module_activation(self, activate: bool = True) -> None:
        """Switches the module activation state.

        :param activate:
            True -> activate Module
            False -> deactivate Module
        """
        self._dll.Asap3ModuleActivation(
            self.asap3_handle,
            self.module_handle,
            activate,
        )

    def is_ecu_online(self) -> bool:
        """Asks CANape whether an ECU is online or offline.

        :return:
            True if the reported state equals
            ``TAsap3ECUState.TYPE_SWITCH_ONLINE``, False otherwise
        """
        ecu_state = enum_type()
        self._dll.Asap3IsECUOnline(
            self.asap3_handle,
            self.module_handle,
            ctypes.byref(ecu_state),
        )
        return ecu_state.value == TAsap3ECUState.TYPE_SWITCH_ONLINE

    def switch_ecu_on_offline(self, online: bool, download: bool = True) -> None:
        """Switches an ECU from online to offline and vice versa.

        :param online:
            Switch ECU online if True, switch ECU offline if False
        :param download:
            if this parameter is set to true CANape will
            execute a download in case of online = True
        """
        ecu_state = (
            TAsap3ECUState.TYPE_SWITCH_ONLINE
            if online
            else TAsap3ECUState.TYPE_SWITCH_OFFLINE
        )
        self._dll.Asap3ECUOnOffline(
            self.asap3_handle,
            self.module_handle,
            ecu_state,
            download,
        )

    def get_module_name(self) -> str:
        """Get name of module."""
        buffer = ctypes.c_char_p()
        self._dll.Asap3GetModuleName(
            self.asap3_handle,
            self.module_handle,
            ctypes.pointer(buffer),
        )
        return buffer.value.decode(RC["ENCODING"])  # type: ignore[union-attr]

    def get_communication_type(self) -> str:
        """Get current communication type (e.g. "CAN")."""
        buffer = ctypes.c_char_p()
        self._dll.Asap3GetCommunicationType(
            self.asap3_handle,
            self.module_handle,
            ctypes.pointer(buffer),
        )
        return buffer.value.decode(RC["ENCODING"])  # type: ignore[union-attr]

    def get_database_objects(self) -> List[str]:
        """Get a list of all object names in database.

        The DLL is only queried on the first call; the result is cached and
        later calls return a copy of the cached list.

        :return:
            List of object names (empty if the database reports no objects)
        """
        if self._objects_cache is None:
            # First call with a NULL buffer only retrieves the required size.
            length = ctypes.c_ulong(0)
            self._dll.Asap3GetDatabaseObjects(
                self.asap3_handle,
                self.module_handle,
                None,
                ctypes.byref(length),
                TAsap3DBOType.DBTYPE_ALL,
            )
            # Second call fills a buffer of that size with the names,
            # which are separated by ";".
            buffer = ctypes.create_string_buffer(length.value)
            self._dll.Asap3GetDatabaseObjects(
                self.asap3_handle,
                self.module_handle,
                buffer,
                ctypes.byref(length),
                TAsap3DBOType.DBTYPE_ALL,
            )
            decoded = buffer.value.strip(b";").decode(RC["ENCODING"])
            # "".split(";") would yield [""], i.e. one bogus empty name,
            # so an empty result is mapped to an empty list explicitly.
            self._objects_cache = decoded.split(";") if decoded else []
        # Return a copy so callers cannot modify the cache.
        return copy.copy(self._objects_cache)

    def get_ecu_tasks(self) -> Dict[str, EcuTask]:
        """Get available data acquisition tasks.

        :return:
            A dictionary with the ecu task description as keys
            and the `EcuTask` instances as values
        """
        # Capacity of the task info array handed to the DLL.
        max_task_count = 32
        task_info_array = (TTaskInfo2 * max_task_count)()
        task_count = ctypes.c_ushort()
        self._dll.Asap3GetEcuTasks2(
            self.asap3_handle,
            self.module_handle,
            task_info_array,
            ctypes.pointer(task_count),
            max_task_count,
        )
        # Only the first `task_count` array elements are used.
        ecu_task_list = [
            EcuTask(
                dll=self._dll,
                asap3_handle=self.asap3_handle,
                module_handle=self.module_handle,
                task_info=task_info,
            )
            for task_info in task_info_array[: task_count.value]
        ]
        return {ecu_task.description: ecu_task for ecu_task in ecu_task_list}

    def get_network_name(self) -> str:
        """Receives the name of the used network."""
        # First call with a NULL buffer only retrieves the required size.
        size = ctypes.c_uint()
        self._dll.Asap3GetNetworkName(
            self.asap3_handle,
            self.module_handle,
            None,
            ctypes.byref(size),
        )
        buffer = ctypes.create_string_buffer(size.value)
        self._dll.Asap3GetNetworkName(
            self.asap3_handle,
            self.module_handle,
            buffer,
            ctypes.byref(size),
        )
        return buffer.value.decode(RC["ENCODING"])

    def get_ecu_driver_type(self) -> DriverType:
        """Retrieves the drivertype of an ECU.

        :return:
            DriverType enum
        """
        c_driver_type = enum_type()
        self._dll.Asap3GetEcuDriverType(
            self.asap3_handle,
            self.module_handle,
            ctypes.byref(c_driver_type),
        )
        return DriverType(c_driver_type.value)

    def get_calibration_object(self, name: str) -> CalibrationObject:
        """Get calibration object by name or wildcard pattern (e.g. '\\*InitReset').

        :param name:
            object name, or a pattern containing ``*`` that is matched
            against the names returned by :meth:`get_database_objects`
        :return:
            the calibration object created for the resolved name
        """
        if "*" in name:
            matches = fnmatch.filter(names=self.get_database_objects(), pat=name)
            # The pattern is only replaced when it is unambiguous; with zero
            # or several matches `name` is passed on unchanged.
            if len(matches) == 1:
                name = matches[0]
        return get_calibration_object(
            dll=self._dll,
            asap3_handle=self.asap3_handle,
            module_handle=self.module_handle,
            name=name,
        )

    def has_resume_mode(self) -> bool:
        """Information about the Resume mode.

        :return:
            Function returns True if the device supports resume mode.
        """
        bln = ctypes.c_bool()
        self._dll.Asap3HasResumeMode(
            self.asap3_handle,
            self.module_handle,
            ctypes.byref(bln),
        )
        return bln.value

    def get_measurement_list_entries(self) -> Dict[str, MeasurementListEntry]:
        """Retrieve the entries from the CANape Measurement list.

        :return:
            A `dict` of Measurement list entries that uses the object name of
            the entries as key
        """
        # The DLL receives a pointer to a pointer to the entries structure.
        ptr = ctypes.pointer(ctypes.pointer(MeasurementListEntries()))
        self._dll.Asap3GetMeasurementListEntries(
            self.asap3_handle,
            self.module_handle,
            ptr,
        )
        entries = ptr.contents[0]
        res: Dict[str, MeasurementListEntry] = {}
        for idx in range(entries.ItemCount):
            c_entry = entries.Entries[idx][0]
            mle = MeasurementListEntry(
                task_id=c_entry.taskId,
                rate=c_entry.rate,
                save_flag=c_entry.SaveFlag,
                disabled=c_entry.Disabled,
                object_name=c_entry.ObjectName.decode(RC["ENCODING"]),
            )
            res[mle.object_name] = mle
        return res

    def reset_data_acquisition_channels_by_module(self) -> None:
        """Clears the data acquisition channel list of a specific module.

        .. note::
            this function only clears these measurement objects from the
            API-Measurement-List which are defined by API
        """
        self._dll.Asap3ResetDataAcquisitionChnlsByModule(
            self.asap3_handle,
            self.module_handle,
        )

    def release_module(self) -> None:
        """Call ``Asap3ReleaseModule`` for this module's handle."""
        self._dll.Asap3ReleaseModule(
            self.asap3_handle,
            self.module_handle,
        )

    def execute_script_ex(self, script_file: bool, script: str) -> Script:
        """Execute a script file or a single script command.

        :param script_file:
            Declares interpretation of parameter script.
            If 'script_file' is true, parameter 'script' is
            interpreted as the file name of the script file
            to be executed. If 'script_file' is false,
            'script' is interpreted as a single script command
        :param script:
            A script filename or a single script command
        :return:
            The instance of the Script class
        """
        script_handle = TScriptHdl()
        self._dll.Asap3ExecuteScriptEx(
            self.asap3_handle,
            self.module_handle,
            script_file,
            script.encode(RC["ENCODING"]),
            ctypes.byref(script_handle),
        )
        return Script(
            dll=self._dll,
            asap3_handle=self.asap3_handle,
            script_handle=script_handle,
        )