# Copyright (c) 2012-2014 Erik Johansson <erik@ejohansson.se>
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation; either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
# USA
from ctypes import c_bool, c_char_p, c_int, c_ubyte, c_void_p
from ctypes import byref, cast, create_string_buffer, POINTER, sizeof
import platform
import threading
import tellcore.constants as const
if platform.system() == 'Windows':
from ctypes import WINFUNCTYPE as FUNCTYPE, windll as DllLoader
LIBRARY_NAME = 'TelldusCore.dll'
else:
from ctypes import CFUNCTYPE as FUNCTYPE, cdll as DllLoader
if platform.system() == 'Darwin':
from ctypes.util import find_library
LIBRARY_NAME = find_library('TelldusCore') or \
'/Library/Frameworks/TelldusCore.framework/TelldusCore'
else:
LIBRARY_NAME = 'libtelldus-core.so.2'
DEVICE_EVENT_FUNC = FUNCTYPE(
None, c_int, c_int, c_char_p, c_int, c_void_p)
DEVICE_CHANGE_EVENT_FUNC = FUNCTYPE(
None, c_int, c_int, c_int, c_int, c_void_p)
RAW_DEVICE_EVENT_FUNC = FUNCTYPE(
None, c_char_p, c_int, c_int, c_void_p)
SENSOR_EVENT_FUNC = FUNCTYPE(
None, c_char_p, c_char_p, c_int, c_int, c_char_p, c_int, c_int, c_void_p)
CONTROLLER_EVENT_FUNC = FUNCTYPE(
None, c_int, c_int, c_int, c_char_p, c_int, c_void_p)
[docs]class TelldusError(Exception):
"""Error returned from Telldus Core API.
Automatically raised when a function in the C API returns an error.
Attributes:
error: The error code constant (one of TELLSTICK_ERROR_* from
:mod:`tellcore.constants`).
"""
def __init__(self, error, lib=None):
super(TelldusError, self).__init__()
self.error = error
self.lib = lib or Library()
[docs] def __str__(self):
"""Return the human readable error string."""
msg = self.lib.tdGetErrorString(self.error)
return "%s (%d)" % (msg, self.error)
[docs]class BaseCallbackDispatcher(object):
"""Base callback dispatcher class.
Inherit from this class and override the :func:`on_callback` method to
change how callbacks are dispatched.
"""
[docs] def on_callback(self, callback, *args):
"""Called from the callback thread when an event is received.
:param callable callback: The callback function to call.
:param args: The arguments to pass to the callback.
"""
raise NotImplementedError
[docs]class DirectCallbackDispatcher(BaseCallbackDispatcher):
"""Dispatches callbacks directly.
Since the callback is dispatched directly, the callback is called in the
callback thread.
"""
def on_callback(self, callback, *args):
callback(*args)
[docs]class Library(object):
"""Wrapper around the Telldus Core C API.
With the exception of tdInit, tdClose and tdReleaseString, all functions in
the C API (see `Telldus Core documentation
<http://developer.telldus.com/doxygen/group__core.html>`_) can be
called. The parameters are the same as in the C API documentation. The
return value are mostly the same as for the C API, except for functions
with multiple out parameters.
In addition, this class:
* automatically frees memory for strings returned from the C API,
* converts errors returned from functions into
(:class:`TelldusError`) exceptions,
* transparently converts between Python strings and C style strings.
"""
STRING_ENCODING = 'utf-8'
DECODE_STRINGS = True
class c_string_p(c_char_p):
def __init__(self, param):
c_char_p.__init__(self, param.encode(Library.STRING_ENCODING))
@classmethod
def from_param(cls, param):
if type(param) is str:
return cls(param)
try:
if type(param) is unicode:
return cls(param)
except NameError:
pass # The unicode type does not exist in python 3
return c_char_p.from_param(param)
# Must be a separate class (i.e. not part of Library), to avoid circular
# references when saving the wrapper callback function in a class with a
# destructor, as the destructor is not called in that case.
class CallbackWrapper(object):
def __init__(self, dispatcher):
self._callbacks = {}
self._lock = threading.Lock()
self._dispatcher = dispatcher
def get_callback_ids(self):
with self._lock:
return list(self._callbacks.keys())
def register_callback(self, registrator, functype, callback):
wrapper = functype(self._callback)
with self._lock:
cid = registrator(wrapper, None)
self._callbacks[cid] = (wrapper, callback)
return cid
def unregister_callback(self, cid):
with self._lock:
del self._callbacks[cid]
def _callback(self, *in_args):
args = []
# Convert all char* parameters (i.e. bytes) to proper python
# strings
for arg in in_args:
if type(arg) is bytes:
args.append(arg.decode(Library.STRING_ENCODING))
else:
args.append(arg)
# Get the real callback and the dispatcher
with self._lock:
try:
# args[-2] is callback id
(wrapper, callback) = self._callbacks[args[-2]]
except KeyError:
return
dispatcher = self._dispatcher
# Dispatch the callback, dropping the last parameter which is the
# context and always None.
try:
dispatcher.on_callback(callback, *args[:-1])
except:
pass
_lib = None
_refcount = 0
_functions = {
'tdInit': [None, []],
'tdClose': [None, []],
'tdReleaseString': [None, [c_void_p]],
'tdGetErrorString': [c_char_p, [c_int]],
'tdRegisterDeviceEvent':
[c_int, [DEVICE_EVENT_FUNC, c_void_p]],
'tdRegisterDeviceChangeEvent':
[c_int, [DEVICE_CHANGE_EVENT_FUNC, c_void_p]],
'tdRegisterRawDeviceEvent':
[c_int, [RAW_DEVICE_EVENT_FUNC, c_void_p]],
'tdRegisterSensorEvent':
[c_int, [SENSOR_EVENT_FUNC, c_void_p]],
'tdRegisterControllerEvent':
[c_int, [CONTROLLER_EVENT_FUNC, c_void_p]],
'tdUnregisterCallback': [c_int, [c_int]],
'tdTurnOn': [c_int, [c_int]],
'tdTurnOff': [c_int, [c_int]],
'tdBell': [c_int, [c_int]],
'tdDim': [c_int, [c_int, c_ubyte]],
'tdExecute': [c_int, [c_int]],
'tdUp': [c_int, [c_int]],
'tdDown': [c_int, [c_int]],
'tdStop': [c_int, [c_int]],
'tdLearn': [c_int, [c_int]],
'tdMethods': [c_int, [c_int, c_int]],
'tdLastSentCommand': [c_int, [c_int, c_int]],
'tdLastSentValue': [c_char_p, [c_int]],
'tdGetNumberOfDevices': [c_int, []],
'tdGetDeviceId': [c_int, [c_int]],
'tdGetDeviceType': [c_int, [c_int]],
'tdGetName': [c_char_p, [c_int]],
'tdSetName': [c_bool, [c_int, c_string_p]],
'tdGetProtocol': [c_char_p, [c_int]],
'tdSetProtocol': [c_bool, [c_int, c_string_p]],
'tdGetModel': [c_char_p, [c_int]],
'tdSetModel': [c_bool, [c_int, c_string_p]],
'tdGetDeviceParameter': [c_char_p, [c_int, c_string_p, c_string_p]],
'tdSetDeviceParameter': [c_bool, [c_int, c_string_p, c_string_p]],
'tdAddDevice': [c_int, []],
'tdRemoveDevice': [c_bool, [c_int]],
'tdSendRawCommand': [c_int, [c_string_p, c_int]],
'tdConnectTellStickController': [None, [c_int, c_int, c_string_p]],
'tdDisconnectTellStickController': [None, [c_int, c_int, c_string_p]],
'tdSensor': [c_int, [c_char_p, c_int, c_char_p, c_int,
POINTER(c_int), POINTER(c_int)]],
'tdSensorValue': [c_int, [c_string_p, c_string_p, c_int, c_int,
c_char_p, c_int, POINTER(c_int)]],
'tdController': [c_int, [POINTER(c_int), POINTER(c_int),
c_char_p, c_int, POINTER(c_int)]],
'tdControllerValue': [c_int, [c_int, c_string_p, c_char_p, c_int]],
'tdSetControllerValue': [c_int, [c_int, c_string_p, c_string_p]],
'tdRemoveController': [c_int, [c_int]],
}
def _to_str(self, char_p):
return char_p.value.decode(Library.STRING_ENCODING)
def _setup_functions(self, lib):
def check_int_result(result, func, args):
if result < 0:
raise TelldusError(result)
return result
def check_bool_result(result, func, args):
if not result:
raise TelldusError(const.TELLSTICK_ERROR_DEVICE_NOT_FOUND)
return result
def free_string(result, func, args):
string = cast(result, c_char_p).value
if string is not None:
lib.tdReleaseString(result)
if Library.DECODE_STRINGS:
string = string.decode(Library.STRING_ENCODING)
return string
for name, signature in Library._functions.items():
try:
func = getattr(lib, name)
func.restype = signature[0]
func.argtypes = signature[1]
if func.restype == c_int:
func.errcheck = check_int_result
elif func.restype == c_bool:
func.errcheck = check_bool_result
elif func.restype == c_char_p:
func.restype = c_void_p
func.errcheck = free_string
except AttributeError:
# Older version of the lib don't have all the functions
pass
[docs] def __init__(self, name=None, callback_dispatcher=None):
"""Load and initialize the Telldus core library.
The underlaying library is only initialized the first time this object
is created. Subsequent instances uses the same underlaying library
instance.
:param str name: If None than the platform specific name of the
Telldus library is used, but it can be e.g. an absolute path.
:param callback_dispatcher: If callbacks are to be used, this parameter
must refer to an instance of a class inheriting from
:class:`BaseCallbackDispatcher`.
"""
super(Library, self).__init__()
if not Library._lib:
assert Library._refcount == 0
if name is None:
name = LIBRARY_NAME
lib = DllLoader.LoadLibrary(name)
self._setup_functions(lib)
lib.tdInit()
Library._lib = lib
Library._refcount += 1
if callback_dispatcher is not None:
self._callback_wrapper = Library.CallbackWrapper(
callback_dispatcher)
else:
self._callback_wrapper = None
[docs] def __del__(self):
"""Close and unload the Telldus core library.
Any callback set up is removed.
The underlaying library is only closed and unloaded if this is the last
instance sharing the same underlaying library instance.
"""
# Happens if the LoadLibrary call fails
if not Library._lib:
assert Library._refcount == 0
return
assert Library._refcount >= 1
Library._refcount -= 1
if self._callback_wrapper is not None:
for cid in self._callback_wrapper.get_callback_ids():
try:
self.tdUnregisterCallback(cid)
except:
pass
if Library._refcount != 0:
return
# telldus-core before v2.1.2 (where tdController was added) does not
# handle re-initialization after tdClose has been called (see Telldus
# ticket 188).
if hasattr(Library._lib, "tdController"):
Library._lib.tdClose()
Library._lib = None
def __getattr__(self, name):
if name == 'callback_dispatcher':
return self._callback_wrapper._dispatcher
if name in Library._functions:
return getattr(self._lib, name)
raise AttributeError(name)
def tdInit(self):
raise NotImplementedError('should not be called explicitly')
def tdClose(self):
raise NotImplementedError('should not be called explicitly')
def tdReleaseString(self, string):
raise NotImplementedError('should not be called explicitly')
def tdRegisterDeviceEvent(self, callback):
assert(self._callback_wrapper is not None)
return self._callback_wrapper.register_callback(
self._lib.tdRegisterDeviceEvent, DEVICE_EVENT_FUNC, callback)
def tdRegisterDeviceChangeEvent(self, callback):
assert(self._callback_wrapper is not None)
return self._callback_wrapper.register_callback(
self._lib.tdRegisterDeviceChangeEvent, DEVICE_CHANGE_EVENT_FUNC,
callback)
def tdRegisterRawDeviceEvent(self, callback):
assert(self._callback_wrapper is not None)
return self._callback_wrapper.register_callback(
self._lib.tdRegisterRawDeviceEvent, RAW_DEVICE_EVENT_FUNC,
callback)
def tdRegisterSensorEvent(self, callback):
assert(self._callback_wrapper is not None)
return self._callback_wrapper.register_callback(
self._lib.tdRegisterSensorEvent, SENSOR_EVENT_FUNC, callback)
def tdRegisterControllerEvent(self, callback):
assert(self._callback_wrapper is not None)
return self._callback_wrapper.register_callback(
self._lib.tdRegisterControllerEvent, CONTROLLER_EVENT_FUNC,
callback)
def tdUnregisterCallback(self, cid):
assert(self._callback_wrapper is not None)
self._callback_wrapper.unregister_callback(cid)
self._lib.tdUnregisterCallback(cid)
[docs] def tdSensor(self):
"""Get the next sensor while iterating.
:return: a dict with the keys: protocol, model, id, datatypes.
"""
protocol = create_string_buffer(20)
model = create_string_buffer(20)
sid = c_int()
datatypes = c_int()
self._lib.tdSensor(protocol, sizeof(protocol), model, sizeof(model),
byref(sid), byref(datatypes))
return {'protocol': self._to_str(protocol),
'model': self._to_str(model),
'id': sid.value, 'datatypes': datatypes.value}
[docs] def tdSensorValue(self, protocol, model, sid, datatype):
"""Get the sensor value for a given sensor.
:return: a dict with the keys: value, timestamp.
"""
value = create_string_buffer(20)
timestamp = c_int()
self._lib.tdSensorValue(protocol, model, sid, datatype,
value, sizeof(value), byref(timestamp))
return {'value': self._to_str(value), 'timestamp': timestamp.value}
[docs] def tdController(self):
"""Get the next controller while iterating.
:return: a dict with the keys: id, type, name, available.
"""
cid = c_int()
ctype = c_int()
name = create_string_buffer(255)
available = c_int()
self._lib.tdController(byref(cid), byref(ctype), name, sizeof(name),
byref(available))
return {'id': cid.value, 'type': ctype.value,
'name': self._to_str(name), 'available': available.value}
def tdControllerValue(self, cid, name):
value = create_string_buffer(255)
self._lib.tdControllerValue(cid, name, value, sizeof(value))
return self._to_str(value)