Source code for Aeros.threading

import threading
import ctypes

[docs]class AdvancedThread(threading.Thread): """ This thread class extends the pure-python "Thread" class by a stop() function to terminate the thread's run() method. """ def __init__(self, *args, **kwargs): threading.Thread.__init__(self, *args, **kwargs) def __get_id(self): """ Get's the threads PID. """ if hasattr(self, '_thread_id'): return self._thread_id for id, thread in threading._active.items(): if thread is self: return id
[docs] def stop(self): """ Sends a kill signal to the given thread """ thread_id = self.__get_id() res = ctypes.pythonapi.PyThreadState_SetAsyncExc(thread_id, ctypes.py_object(SystemExit)) if res > 1: ctypes.pythonapi.PyThreadState_SetAsyncExc(thread_id, 0) print('Exception raise failure')