Attachment 'Casing.py'

Download

   1 #	Programmer:	limodou
   2 #	E-mail:		limodou@gmail.com
   3 #
   4 #	Copyleft 2005 limodou
   5 #
   6 #	Distributed under the terms of the GPL (GNU Public License)
   7 #
   8 #   Casing is free software; you can redistribute it and/or modify
   9 #   it under the terms of the GNU General Public License as published by
  10 #   the Free Software Foundation; either version 2 of the License, or
  11 #   (at your option) any later version.
  12 #
  13 #   This program is distributed in the hope that it will be useful,
  14 #   but WITHOUT ANY WARRANTY; without even the implied warranty of
  15 #   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  16 #   GNU General Public License for more details.
  17 #
  18 #   You should have received a copy of the GNU General Public License
  19 #   along with this program; if not, write to the Free Software
  20 #   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
  21 #
  22 #	$Id$
  23 #   Version = 0.1
  24 #   Update:
  25 #     
  26 
  27 import threading
  28 import time
  29 import Queue
  30 
  31 class DUMP_CLASS:pass
  32 class AbortException(Exception):pass
  33 
  34 class SyncVar(object):
  35     def __init__(self):
  36         self.flag = False
  37         self.lock = threading.Lock()
  38         
  39     def set(self, flag=True):
  40         self.lock.acquire()
  41         self.flag = flag
  42         self.lock.release()
  43         
  44     def isset(self):
  45         return self.flag
  46     
  47     def get(self):
  48         return self.flag
  49     
  50     def clear(self):
  51         self.lock.acquire()
  52         self.flag = False
  53         self.lock.release()
  54         
  55     def __ne__(self, other):
  56         return self.flag != other
  57     
  58     def __eq__(self, other):
  59         return self.flag == other
  60 
  61     def __nonzero__(self):
  62         return bool(self.flag)
  63 
  64 class FuncThread(threading.Thread):
  65     def __init__(self, casing, syncvar, sync=False, kws={}):
  66         threading.Thread.__init__(self)
  67         self.casing = casing
  68         self.syncvar = syncvar
  69         self.sync = sync
  70         self.kws = kws
  71         
  72     def run(self):
  73         if self.sync:
  74             self.casing.sync_start(self.syncvar, kws=self.kws)
  75         else:
  76             self.casing.start(**self.kws)
  77         self.syncvar.clear()
  78     
  79 class ProcessThread(threading.Thread):
  80     def __init__(self, casing, syncvar, sync=False):
  81         threading.Thread.__init__(self)
  82         self.casing = casing
  83         self.syncvar = syncvar
  84         self.sync = sync
  85         
  86     def run(self):
  87         func, args, kwargs = self.casing.on_process
  88         if kwargs.has_key('timestep'):
  89             timestep = kwargs['timestep']
  90             del kwargs['timestep']
  91         else:
  92             timestep = 0.5
  93         while 1:
  94             if self.syncvar:
  95                 if self.sync:
  96                     kwargs['syncvar'] = self.syncvar
  97                 func(*args, **kwargs)
  98                 time.sleep(timestep)
  99             else:
 100                 break
 101 
 102 class MultiCasing(object):
 103     def __init__(self, size=10, need_manual_stop=False, timestep=1):
 104         self.on_finish = None
 105         self.on_process = None
 106         self.on_abort = None
 107         self.size = size
 108         self.need_manual_stop = need_manual_stop
 109         self.queue = Queue.Queue()
 110         self.active = []
 111         self.event = threading.Event()
 112         self.event.set()
 113         self._exit_flag = False
 114         self.thread_d = None
 115         self.timestep = timestep
 116         
 117     def start_thread(self):
 118         self.thread_d = d = Casing(self._start)
 119         if self.on_process:
 120             d.onprocess(self.on_process[0], *self.on_process[1], **self.on_process[2])
 121         d.start_thread()
 122     
 123     def start_sync_thread(self):
 124         self.thread_d = d = Casing(self._start, sync=True)
 125         if self.on_process:
 126             d.onprocess(self.on_process[0], *self.on_process[1], **self.on_process[2])
 127         d.start_sync_thread()
 128     
 129     def append(self, casing_obj):
 130         self.queue.put(casing_obj, block=True, timeout=1)
 131         
 132     def stop_thread(self):
 133         for obj in self.active:
 134             obj.stop_thread()
 135         self._exit_flag = True
 136         
 137     def _start(self, syncvar=None, sync=False):
 138         self._exit_flag = False
 139         self.running = True
 140         while not self._exit_flag:
 141             self.event.wait(self.timestep)
 142             if not self.queue.empty() and len(self.active) < self.size:
 143                 casing = self.queue.get()
 144                 self.active.append(casing)
 145                 casing.onsync(self._on_sync, casing)
 146                 if not sync:
 147                     casing.start_thread()
 148                 else:
 149                     casing.start_sync_thread()
 150             elif self.queue.empty() and not self.active: #not more thread obj need to run
 151                 if not self.need_manual_stop:
 152                     break
 153             elif len(self.active) == self.size:
 154                 self.event.clear()
 155         self.running = False
 156         if not self.active and self.queue.empty() and self.on_finish:
 157             self._run(self.on_finish)
 158         elif self.on_abort:
 159             self._run(self.on_abort)
 160                 
 161     def _on_sync(self, obj):
 162         self.active.remove(obj)
 163         self.event.set()
 164   
 165     def onfinish(self, func, *args, **kwargs):
 166         self.on_finish = func, args, kwargs
 167         
 168     def onprocess(self, func, *args, **kwargs):
 169         self.on_process = func, args, kwargs
 170 
 171     def onabort(self, func, *args, **kwargs):
 172         self.on_abort = func, args, kwargs
 173 
 174     def _run(self, func):
 175         f, args, kwargs = func
 176         return f(*args, **kwargs)
 177     
 178     def isactive(self):
 179         return self.thread_d.isactive()
 180 
 181     def running_count(self):
 182         return len(self.active)
 183 
 184     def remaining_count(self):
 185         return self.queue.qsize()
 186         
 187 class Casing(object):
 188     def __init__(self, func=None, *args, **kwargs):
 189         self.funcs = []
 190         if func:
 191             self.funcs.append((func, args, kwargs))
 192         self.on_success = None
 193         self.on_exception = None
 194         self.on_abort = None
 195         self.on_process = None
 196         self.on_sync = None     #used internal
 197         
 198         self.syncvar = None
 199         self.t_func = None
 200         self.p_func = None
 201         
 202     def __add__(self, obj):
 203         assert isinstance(obj, Casing)
 204         self.funcs.extend(obj.funcs)
 205         return self
 206         
 207     def __radd__(self, obj):
 208         assert isinstance(obj, Casing)
 209         self.funcs.extend(obj.funcs)
 210         return self
 211     
 212     def copy(self):
 213         obj = Casing()
 214         for name, value in vars(self).items():
 215             setattr(obj, name, self._deepcopy(value))
 216         return obj
 217     
 218     def _deepcopy(self, obj):
 219         if isinstance(obj, tuple):
 220             s = []
 221             for i in range(len(obj)):
 222                 s.append(self._deepcopy(obj[i]))
 223             return tuple(s)
 224         elif isinstance(obj, list):
 225             s = []
 226             for i in range(len(obj)):
 227                 s.append(self._deepcopy(obj[i]))
 228             return s
 229         elif isinstance(obj, dict):
 230             s = {}
 231             for k, v in obj.items():
 232                 s[k] = self._deepcopy(v)
 233             return s
 234         else:
 235             return obj
 236         
 237     def _update(self, src, kdict):
 238         for k, v in src.items():
 239             if kdict.has_key(k):
 240                 src[k] = kdict[k]
 241 
 242     def append(self, func, *args, **kwargs):
 243         self.funcs.append((func, args, kwargs))
 244         
 245     def onsuccess(self, func, *args, **kwargs):
 246         self.on_success = func, args, kwargs
 247         
 248     def onexception(self, func, *args, **kwargs):
 249         self.on_exception = func, args, kwargs
 250         
 251     def onabort(self, func, *args, **kwargs):
 252         self.on_abort = func, args, kwargs
 253 
 254     def onprocess(self, func, *args, **kwargs):
 255         self.on_process = func, args, kwargs
 256         
 257     def onsync(self, func, *args, **kwargs):
 258         self.on_sync = func, args, kwargs
 259         
 260     def start(self, **kws):
 261         try:
 262             for func, args, kwargs in self.funcs:
 263                 self._update(kwargs, kws)
 264                 ret = self._run((func, args, kwargs))
 265             if self.on_success:
 266                 self._run(self.on_success)
 267             if self.on_sync:
 268                 self._run(self.on_sync)
 269         except AbortException:
 270             if self.on_abort:
 271                 self._run(self.on_abort)
 272             else:
 273                 print 'Abort'
 274             return
 275         except:
 276             if self.on_exception:
 277                 self._run(self.on_exception)
 278             else:
 279                 import traceback
 280                 traceback.print_exc()
 281             
 282     def start_thread(self, **kws):
 283         self.syncvar = syncvar = SyncVar()
 284         self.syncvar.set()
 285         self.t_func = t = FuncThread(self, syncvar, kws=kws)
 286         self.p_func = None
 287         t.setDaemon(True)
 288         t.start()
 289         if self.on_process:
 290             self.p_func = t1 = ProcessThread(self, syncvar)
 291             t1.setDaemon(True)
 292             t1.start()
 293             
 294     def sync_start(self, syncvar, kws={}):
 295         try:
 296             for func, args, kwargs in self.funcs:
 297                 self._update(kwargs, kws)
 298                 kwargs['syncvar'] = syncvar
 299                 if not syncvar:
 300                     return
 301                 self._run((func, args, kwargs))
 302             syncvar.clear()
 303             if self.on_success:
 304                 self._run(self.on_success)
 305             if self.on_sync:
 306                 self._run(self.on_sync)
 307         except AbortException:
 308             syncvar.clear()
 309             if self.on_abort:
 310                 self._run(self.on_abort)
 311             else:
 312                 print 'Abort'
 313             return
 314         except:
 315             syncvar.clear()
 316             if self.on_exception:
 317                 self._run(self.on_exception)
 318             else:
 319                 import traceback
 320                 traceback.print_exc()
 321 
 322     def start_sync_thread(self, **kws):
 323         self.syncvar = syncvar = SyncVar()
 324         self.syncvar.set()
 325         self.t_func = t = FuncThread(self, syncvar, sync=True, kws=kws)
 326         self.p_func = None
 327         t.setDaemon(True)
 328         t.start()
 329         if self.on_process:
 330             self.p_func = t1 = ProcessThread(self, syncvar, sync=True)
 331             t1.setDaemon(True)
 332             t1.start()
 333 
 334     def stop_thread(self):
 335         if self.syncvar:
 336             self.syncvar.clear()
 337                         
 338     def _run(self, func):
 339         f, args, kwargs = func
 340         return f(*args, **kwargs)
 341     
 342     def isactive(self):
 343         return self.t_func.isAlive()
 344     
 345 def new_obj():
 346     return DUMP_CLASS()
 347         
 348 if __name__ == '__main__':
 349     def test(n, syncvar):
 350         for i in range(1, n): 
 351             if syncvar:
 352                 syncvar.set(i)
 353                 print "=",i
 354                 time.sleep(1)
 355             else:
 356                 break
 357             
 358     def process(syncvar):
 359         print 'process...', syncvar.get()
 360 
 361     d = Casing(test, 10) + Casing(test, 20)
 362     d.onprocess(process, timestep=2)
 363     d.start_sync_thread()
 364     time.sleep(10)
 365     print 'stop'
 366     d.stop_thread()
 367     

Attached Files

To refer to attachments on a page, use attachment:filename, as shown below in the list of files. Do NOT use the URL of the [get] link, since this is subject to change and can break easily.
  • [get | view] (2021-05-11 08:51:51, 10.7 KB) [[attachment:Casing.py]]
 All files | Selected Files: delete move to page copy to page

You are not allowed to attach a file to this page.