Package mosp :: Module controller
[hide private]
[frames] | no frames]

Source Code for Module mosp.controller

  1  """External-controllable simulation and a controller to couple multiple MoSP simulations.""" 
  2   
  3  import socket 
  4  import select 
  5  import sys 
  6  import time 
  7  import json 
  8   
  9  from SimPy import SimulationRT 
 10  from SimPy.SimulationRT import hold, passivate 
 11   
 12  sys.path.extend(['.', '..','../..']) 
 13  from core import Simulation 
 14   
 15  __author__ = "B. Henne, P. Tute" 
 16  __contact__ = "henne@dcsec.uni-hannover.de" 
 17  __copyright__ = "(c) 2010-2012, DCSec, Leibniz Universitaet Hannover, Germany" 
 18  __license__ = "GPLv3" 
 19   
HANDLERS = {} #: Maps a signal to the function registered for it by the handle() decorator
QUIT = '\x00' #: Signal for simulations to end
GET_MODE = '\x01' #: Signal to initialize getting data from simulations
PUT_MODE = '\x02' #: Signal to initialize putting data to simulations
STEP = '\x03' #: Signal to let simulations do a number of steps
IDENT = '\x04' #: Signal to let simulations identify themselves
REGISTER = '\x05' #: Signal to register simulations at another simulation
STEP_DONE_GOT_DATA = '\xF9' #: Signal used by simulation to signal all demanded steps are done and data should be sent
STEP_DONE = '\xFA' #: Signal used by simulation to signal all demanded steps are done
SIM_ENDED = '\xFB' #: Signal used by simulations to signal that they have ended (prematurely)
ACK = '\xFC' #: ACK for messages
FIELD_SEP = '\xFD' #: Separates fields within one message
MSG_SEP = '\xFE' #: Separates messages within one message-blob
MSG_END = '\xFF' #: Signals end of message-blob

SIM_TIMEOUT = 120 #: Timeout, after which simulations are considered dead when waiting for STEP_DONE
TICK_DELAY = 0.01 #: Delay (passed to time.sleep()) between each tick. Mainly used to avoid too fast sending of messages when using a monitor.
37 38 39 -class Controller(object):
40 """Controller to connect and coordinate simulations."""
41 - def __init__(self, sims):
42 """Initialize connections and identifiy simulations. 43 44 @param sims: a list of all simulations that should be connected, represented by lists containing type, port.""" 45 # final structure of simulation-lists: 46 # 0 1 2 3 4 5 6 47 # name, type, port, socket, msg_from, msg_to, node_id 48 self.sims = [] 49 self.get_from = set() 50 for sim_port in sims: 51 # extend the list of each simulation to hold all necessary values 52 new_sim = [''] 53 new_sim.append('') 54 new_sim.append(sim_port) 55 new_sim.extend([None, '', '', 0]) 56 self.sims.append(new_sim) 57 self.need_registration = [] 58 self.register_to = [] 59 for sim in self.sims: 60 self.connect_to(sim) 61 self.byName = self.sims_byName() 62 self.bySocket = self.sims_bySocket() 63 # build registration message with all siafu sims 64 register_msg = '' 65 for sim in self.need_registration: 66 if register_msg: 67 # not the first register-message, seperate with MSG_SEP 68 register_msg += MSG_SEP 69 else: 70 # no register-messages so far, send REGISTER first 71 register_msg += REGISTER 72 register_msg += str(sim[6]) + FIELD_SEP + sim[0] 73 if register_msg: 74 register_msg += MSG_END 75 # send registration message to all mosp-sims 76 for sim in self.register_to: 77 sim[3].sendall(register_msg) 78 for sim in self.sims: 79 if __debug__: 80 print sim 81 # set socket to non-blocking AFTER connection establishment to avoid complications and complexity 82 sim[3].setblocking(0)
83
84 - def connect_to(self, sim):
85 """Connect to sim and exchange simulation information.""" 86 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 87 try: 88 s.connect(('localhost', sim[2])) 89 except socket.error as (errno, message): 90 if errno == 115: 91 # operation in progress...not needed for nonblocking sockets 92 pass 93 else: 94 raise socket.error, (errno, message) 95 sim[3] = s 96 # identify simulation 97 s.send(IDENT) 98 # receive type 99 sim[1] = '' 100 symbol = s.recv(1) 101 while symbol != FIELD_SEP: 102 sim[1] += symbol 103 symbol = s.recv(1) 104 if sim[1] == 'siafu': 105 self.need_registration.append(sim) 106 elif sim[1] == 'mosp': 107 self.register_to.append(sim) 108 #receive name 109 sim[0] = '' 110 symbol = s.recv(1) 111 while symbol != FIELD_SEP: 112 sim[0] += symbol 113 symbol = s.recv(1) 114 # receive node_id 115 node_id = '' 116 symbol = s.recv(1) 117 while symbol != MSG_END: 118 node_id += symbol 119 symbol = s.recv(1) 120 sim[6] = int(node_id) 121 if __debug__: 122 print 'talked to', repr(sim[0]), 'of type', repr(sim[1]), 'at port', sim[2], 'node_id is', sim[6]
123
124 - def sims_byName(self):
125 """Get simulations by name.""" 126 byName = {} 127 for sim in self.sims: 128 byName[sim[0]] = sim 129 return byName
130
131 - def sims_bySocket(self):
132 """Get simulations by socket.""" 133 bySocket = {} 134 for sim in self.sims: 135 bySocket[sim[3]] = sim 136 return bySocket
137
138 - def send(self, socket, msg):
139 """Send a message to a socket.""" 140 totalsent = 0 141 while totalsent < len(msg): 142 sent = socket.send(msg[totalsent:]) 143 if sent == 0: 144 raise RuntimeError("socket connection broken") 145 totalsent = totalsent + sent
146
147 - def receive(self, sock):
148 """Receive the next message(-blob) from a socket.""" 149 msg = '' 150 r = [] 151 while 42: 152 try: 153 chunk = sock.recv(1) 154 except socket.error as (errno, message): 155 if errno == 11: 156 # resource not ready to read yet...wait for it 157 continue 158 else: 159 raise socket.error, (errno, message) 160 if chunk == '': 161 raise RuntimeError("socket connection broken") 162 if chunk == MSG_END: 163 return msg 164 else: 165 msg = msg + chunk
166
167 - def cut_msgs(self, msgsblob):
168 """Cut a message blob into seperate messages and messages into fields. 169 170 @todo: make this more flexible, accept more message-types. 171 172 """ 173 msgs = [] 174 for m in msgsblob.split(MSG_SEP): 175 fields = m.split(FIELD_SEP) 176 if __debug__: 177 print "f", fields, len(fields), m 178 if len(fields) == 3: 179 msgs.append(fields) 180 else: 181 sys.stderr.write('discarded: %s\n' % m) 182 return msgs
183
184 - def dispatch(self, msgs, sender):
185 """Sort messages to the correct server. 186 187 @param sender: sender of the messages, to return messages with invalid receiver to. 188 189 """ 190 byName = self.byName 191 for m in msgs: 192 try: 193 sim = byName[m[0]] 194 except KeyError: 195 # return message to sender! 196 if __debug__: 197 print 'unknown sim!', m[0], ' message returned!' 198 sim = sender 199 print 'SIM\t', sim 200 if sim[5]: 201 # add message seperator since there are other messages already 202 sim[5] += MSG_SEP 203 sim[5] += m[1] + FIELD_SEP + m[2]
204
205 - def put_to_sims(self, byName, bySocket):
206 """Put all collected data to simulations. 207 208 This sends all messages, that were received since the last call to the receiver. 209 @param byName: all simulations that should be considered as receivers. 210 @type byName: dict with the names of simulations as keys and the according lists as items. 211 @param bySocket: all simulations that should be considered as receivers. 212 @type bySocket: dict with the sockets of simulations as keys and the according lists as items. 213 214 """ 215 need_put = [sim[3] for sim in self.sims if sim[5]] 216 if __debug__: 217 print 'PUT TO', need_put 218 while need_put: 219 ready_to_read, ready_to_write, in_error = select.select([], need_put, [], 0) 220 # write to all sockets from need_put that are ready 221 for sock in ready_to_write: 222 sim = bySocket[sock] 223 self.send(sock, PUT_MODE) 224 self.send(sock, sim[5]) 225 self.send(sock, MSG_END) 226 # wait for ack 227 r = [] 228 while sock not in r: 229 r, w, e = select.select([sock], [], [], 0.1) 230 ack = sock.recv(1) 231 if ack == ACK: 232 if __debug__: 233 print 'got ack for put' 234 else: 235 print 'no ack for put, got', repr(ack) 236 need_put.remove(sock) 237 # clear outgoing messages 238 for sim in self.sims: 239 sim[5] = ''
240
241 - def do_steps(self, byName, bySocket, steps=1):
242 """Tell all simulations to do a number of steps. 243 244 @param byName: all simulations that should be considered as receivers. 245 @type byName: dict with the names of simulations as keys and the according lists as items. 246 @param bySocket: all simulations that should be considered as receivers. 247 @type bySocket: dict with the sockets of simulations as keys and the according lists as items. 248 @param steps: the number of steps that all simulations should do (default = 1). 249 250 """ 251 if __debug__: 252 print 'sending STEP' 253 need_step = bySocket.keys() 254 while need_step: 255 ready_to_read, ready_to_write, in_error = select.select([], need_step, [], 0) 256 for sock in ready_to_write: 257 self.send(sock, STEP) 258 self.send(sock, str(steps)) 259 self.send(sock, MSG_END) 260 r = [] 261 while sock not in r: 262 r, w, e = select.select([sock], [], [], 0.1) 263 ack = sock.recv(1) 264 if __debug__: 265 if ack == ACK: 266 print 'got ack for step' 267 else: 268 print 'no ack for step, got', repr(ack) 269 need_step.remove(sock) 270 # wait for all steps to finish 271 wait_for_step = bySocket.keys() 272 while wait_for_step: 273 ready_to_read, ready_to_write, in_error = select.select(wait_for_step, [], [], 0) 274 for sock in ready_to_read: 275 sim = bySocket[sock] 276 result = sock.recv(1) 277 if result == SIM_ENDED: 278 print 'sim ended', sim[0] 279 elif result == STEP_DONE: 280 if __debug__: 281 print 'step done', sim[0] 282 elif result == STEP_DONE_GOT_DATA: 283 self.get_from.add(sock) 284 if __debug__: 285 print 'step done, need get', sim[0] 286 else: 287 print 'wanted {!r}, {!r} or {!r}, got {!r}'.format(STEP_DONE, STEP_DONE_GOT_DATA, SIM_ENDED, result) 288 wait_for_step.remove(sock)
289
290 - def get_from_sims(self, byName, bySocket):
291 """Get the next messages from all simulations that have data ready. 292 293 @param byName: all simulations that should be considered as receivers. 294 @type byName: dict with the names of simulations as keys and the according lists as items. 295 @param bySocket: all simulations that should be considered as receivers. 296 @type bySocket: dict with the sockets of simulations as keys and the according lists as items. 297 298 """ 299 if __debug__: 300 print 'getting data from', self.get_from 301 get_from = self.get_from 302 while get_from: 303 ready_to_read, ready_to_write, in_error = select.select(get_from, [], [], 0) 304 for sock in ready_to_read: 305 sim = bySocket[sock] 306 sim[4] = self.receive(sock) 307 blob = self.cut_msgs(sim[4]) 308 self.dispatch(blob, sim) 309 if __debug__: 310 print "got", repr(sim[4]), "from", sim[0] 311 sock.send(ACK) 312 get_from.remove(sock) 313 self.get_from.clear()
314
315 - def run(self, until):
316 """Run the simulations for a given number of ticks. 317 318 @param until: number of ticks to run all simulations for.""" 319 t = 0 320 byName = self.byName 321 bySocket = self.bySocket 322 while t < until: 323 time.sleep(TICK_DELAY) 324 # PUT 325 self.put_to_sims(byName, bySocket) 326 # STEP 327 self.do_steps(byName, bySocket) 328 # now get data from all simulations that announced STEP_DONE_GOT_DATA 329 self.get_from_sims(byName, bySocket) 330 if __debug__: 331 print 'all steps done' 332 t += 1
333
334 - def shutdown(self):
335 """End all simulations.""" 336 if __debug__: 337 print 'ending sims' 338 for sim in self.sims: 339 self.send(sim[3], QUIT)
340
class Ticker(SimulationRT.Process):
    """Process that keeps the simulation from doing more than one tick at a time.

    No own constructor is defined; the one of SimulationRT.Process is used.

    """

    def go(self):
        """Process execution method: hold for one unit of simulation time, over and over."""
        tick_length = 1
        while True:
            yield hold, self, tick_length
350
def handle(code):
    """Create a decorator that registers a function as handler for a signal.

    @param code: the signal under which the decorated function is stored in HANDLERS
    @return: a decorator that stores the function and returns it unchanged

    """
    def register_handler(userfunc):
        HANDLERS[code] = userfunc
        return userfunc

    return register_handler
358 -class SimulationControlled(Simulation):
359 """The MoSP Simulation, extended for use with an external controller. 360 361 @author: P. Tute 362 @author: B. Henne""" 363
364 - def __init__(self, geo, name, host, port, start_timestamp=None, rel_speed=None, seed=1, allow_dup=False):
365 """Initialize the MOSP Simulation. 366 367 @param geo: geo model for simulation, a mosp.geo.osm.OSMModel extending the mops.collide.World 368 @param name: name to identify this simulation by 369 @param host: host to bind connection to 370 @param port: port to bind connection to 371 @param start_timestamp: time.time timestamp when simulation starts - used to calc DateTime of simlation out of simulation ticks. 372 @param rel_speed: (SimPy) ratio simulation time over wallclock time; example: rel_speed=200 executes 200 units of simulation time in about one second 373 @param seed: seed for simulation random generator 374 @param allow_dup: allow duplicates? only one or multiple Simulations can be startet at once 375 376 """ 377 Simulation.__init__(self, geo, start_timestamp, rel_speed, seed, allow_dup) 378 self.name = name 379 self.net_host = host 380 self.net_port = port 381 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 382 s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) # avoid complication after crashes, allow reuse of port. 383 s.bind((self.net_host, self.net_port)) 384 print 'Waiting for external controller to connect...' 385 s.listen(1) 386 self.net_conn, self.net_addr = s.accept() 387 print 'Connected by', self.net_addr 388 389 # alarm for person.pause_movement() 390 self.activate(self.person_alarm_clock, self.person_alarm_clock.serve(), 0) 391 # ticker is needed to stop sim from doing more than one step at a time 392 self.ticker = Ticker(name='ticker', sim=self) 393 self.activate(self.ticker, self.ticker.go(), 0) 394 395 self.do_step = 0 396 self.data_to_send = [] 397 398 self.registered_nodes = {} 399 400 # based on code from SimPy.SimulationRT.simulate 401 self.rtstart = self.wallclock() 402 self.rtset(self.rel_speed)
403 404
405 - def send_string(self, string):
406 """Send a string to the controller.""" 407 conn = self.net_conn 408 l = len(string) 409 totalsent = 0 410 while totalsent < l: 411 sent = conn.send(string[totalsent:]) 412 if sent == 0: 413 raise RuntimeError("socket connection broken") 414 totalsent = totalsent + sent
415 416 @handle(QUIT)
417 - def shutdownServer(self):
418 """Stops the simulation and closes connection to controller. 419 420 This handles the QUIT-signal from the controller. 421 422 """ 423 if __debug__: 424 print 'shutdown' 425 self._stop = True 426 self.net_conn.close()
427 428 @handle(GET_MODE)
429 - def get_data_from_sim(self):
430 """Send all available data to the controller. 431 432 The GET_MODE-signal is handled by this method. 433 Calling this when the controller does not expect data will cause it to crash. Use wisely. 434 435 """ 436 conn = self.net_conn 437 if __debug__: 438 print 'get' 439 for msg in self.data_to_send[:]: 440 to = msg[0] 441 agent_type = msg[1] 442 content = msg[2] 443 if __debug__: 444 print '\tto', to 445 self.send_string(to) 446 conn.send(FIELD_SEP) 447 if __debug__: 448 print '\tagent type', agent_type 449 self.send_string(agent_type) 450 conn.send(FIELD_SEP) 451 if __debug__: 452 print '\tcontent', content 453 self.send_string(content) 454 self.data_to_send.remove(msg) 455 if self.data_to_send: 456 conn.send(MSG_SEP) 457 conn.send(MSG_END) 458 ack = '' 459 while ack == '': 460 ack = conn.recv(1) 461 if ack == ACK: 462 if __debug__: 463 print '\tgot ack' 464 else: 465 print '\tno ack! got', repr(ack) 466 self.data_to_send = []
467 468 @handle(PUT_MODE)
469 - def put_data_to_sim(self):
470 """Receive data from the controller. 471 472 This handles the PUT_MODE-signal and should only be called when receiving PUT_MODE. You will most likely never do this by hand! 473 474 """ 475 if __debug__: 476 print 'put' 477 conn = self.net_conn 478 payload = '' 479 while 42: 480 if __debug__: 481 print '\t', 482 data = conn.recv(1) 483 if data == MSG_END: 484 if __debug__: 485 print 'msg end' 486 # msg end, process last message and end reception 487 self.process_data(payload) 488 break 489 elif data == MSG_SEP: 490 if __debug__: 491 print 'msg sep' 492 # msg seperator, process last received message 493 self.process_data(payload) 494 payload = '' 495 else: 496 if __debug__: 497 print 'payload', repr(data) 498 payload += data 499 conn.send(ACK)
500 501 @handle(STEP)
502 - def enable_step(self):
503 """Receives the number of steps to and sets self.do_step accordingly. 504 505 This handles the STEP-signal from the controller. 506 507 """ 508 if __debug__: 509 print 'step' 510 conn = self.net_conn 511 nr_of_steps = '' 512 symbol = conn.recv(1) 513 while symbol != MSG_END: 514 if __debug__: 515 print 'got', repr(symbol) 516 nr_of_steps += symbol 517 symbol = conn.recv(1) 518 self.do_step = int(nr_of_steps) 519 if __debug__: 520 print 'doing', nr_of_steps, 'steps'
521 522 @handle(IDENT)
523 - def identify(self):
524 """Send type, name and associated osm-node-id to controller. 525 526 This handles the IDENT-signal from the controller. 527 528 """ 529 if __debug__: 530 print 'ident' 531 print '\t', self.name 532 self.net_conn.sendall('mosp') 533 self.net_conn.send(FIELD_SEP) 534 self.net_conn.sendall(self.name) 535 self.net_conn.send(FIELD_SEP) 536 # XXX no real associated osm-node-id, since it is not needed now...update if this changes 537 self.net_conn.send('0') 538 self.net_conn.send(MSG_END)
539 540 @handle(REGISTER)
541 - def register(self):
542 """Receive description data of other simulations and make them adressable. 543 544 This handles the REGISTER-signal from the controller. 545 546 """ 547 if __debug__: 548 print 'register' 549 conn = self.net_conn 550 msg = '' 551 symbol = conn.recv(1) 552 while symbol != MSG_END: 553 msg += symbol 554 symbol = conn.recv(1) 555 for m in msg.split(MSG_SEP): 556 m_cut = m.split(FIELD_SEP) 557 try: 558 node_id = int(m_cut[0]) 559 except ValueError: 560 print 'Invalid node ID when registering!', m_cut[0] 561 continue 562 # register node with node_id and simulation name m_cut[1] 563 self.registered_nodes[node_id] = m_cut[1] 564 if __debug__: 565 print 'REGISTERED', node_id, m_cut[1]
566
567 - def process_data(self, message):
568 """Extract usable data from received message. 569 570 The expected message format is typeFIELD_SEPcontents. 571 If type is 'P', contents should be a json string representing a person to re-add. 572 If type is 'L', contents should be a string containing a log message. 573 This mehtod can be extended to support more message types, if necessary. 574 575 @param message: message containing a json-string 576 577 """ 578 if __debug__: 579 print 'processing', message 580 message_split = message.split(FIELD_SEP) 581 if message_split[0] == 'P': 582 # received a person, readd it 583 person_dict = json.loads(message_split[1]) 584 self.readd_person(person_dict['p_id'], person_dict) 585 elif message_split[0] == 'L': 586 # received a log-message 587 logging.info(message_split[1])
588
589 - def run(self, until, real_time, monitor=True):
590 """Run Simulation after setup in external-controlled modus. 591 592 @param until: simulation runs until this tick 593 @param real_time: run in real-time? or as fast as possible 594 @param monitor: start defined monitors? 595 596 """ 597 if monitor: 598 if len(self.monitors) == 0: 599 raise monitors.NoSimulationMonitorDefinedException('at mosp.Simulation.run()') 600 for mon in self.monitors: 601 mon.init() 602 603 # alarm for person.pause_movement() 604 self.activate(self.person_alarm_clock, self.person_alarm_clock.serve(), 0) 605 606 # based on code from SimPy.SimulationRT.simulate 607 self.rtstart = self.wallclock() 608 self.rtset(self.rel_speed) 609 610 last_event_time = 0 611 while self._timestamps and not self._stop: 612 next_event_time = self.peek() 613 614 if last_event_time != next_event_time: 615 while not self.do_step: 616 flag = self.net_conn.recv(1) 617 if flag == '': 618 continue # nothing to do 619 elif flag == QUIT: 620 HANDLERS[QUIT](self) 621 break # do not read anymore 622 try: 623 HANDLERS[flag](self) 624 except KeyError: 625 print '\n\n',repr(flag) 626 raise KeyError 627 if self._stop: 628 break 629 self.net_conn.send(ACK) # acknowledge start of step 630 if __debug__: 631 print '\tdoing it' 632 pass # replaces next logging statement 633 #logging.debug('Simulation.run.next_event_time = %s' % next_event_time) 634 last_event_time = next_event_time 635 if next_event_time > until: 636 break 637 638 # network communication harms real time simulation, it's only a delay now 639 if real_time: 640 delay = ( 641 next_event_time / self.rel_speed - 642 (self.wallclock() - self.rtstart) 643 ) 644 if delay > 0: time.sleep(delay) 645 646 # do communication stuff 647 while self.messages and self.messages[0].time < next_event_time: 648 # execute messages 649 heappop(self.messages)() # execute __call__() of popped object 650 current_step = self._t 651 while self._t == current_step: 652 self.step() 653 if self.do_step: 654 # only do this when a step was supposed 
to be done (not when QUIT was received etc.) 655 if self.data_to_send: 656 self.net_conn.send(STEP_DONE_GOT_DATA) 657 self.get_data_from_sim() 658 else: 659 self.net_conn.send(STEP_DONE) 660 self.do_step -= 1 661 if __debug__: 662 print '\tnow at:', self.now() 663 664 # There are still events in the timestamps list and the simulation 665 # has not been manually stopped. This means we have reached the stop 666 # time. 667 for m in self.monitors: 668 m.end() 669 if not self._stop: 670 # already received QUIT...no need to inform controller or shut down again 671 self.net_conn.send(SIM_ENDED) 672 self.shutdownServer() 673 if not self._stop and self._timestamps: 674 self._t = until 675 return 'SimPy: Normal exit' 676 else: 677 return 'SimPy: No activities scheduled'
678
679 - def send_person(self, person, node_id):
680 """Prepare a person to be send to another simulation. 681 682 Necessary data is extracted from the person and prepared for transfer using json. 683 This does NOT remove the person from the simulation. That must be done elsewhere. 684 @param person: The person to be send, 685 @param node_id: ID of the node that is used as exit. 686 687 """ 688 try: 689 osm_node_id = self.geo.map_nodeid_osmnodeid[node_id] 690 except KeyError: 691 print 'Invalid node_id for send_person!' 692 return 693 #to_sim = self.registered_nodes[int(osm_node_id)] 694 to_sim = 'asdf' 695 props = person.get_properties() 696 message_type = 'P' 697 props_json = json.dumps(props) 698 self.data_to_send.append((to_sim, message_type, props_json))
if __name__ == '__main__':
    # ports of the simulations to couple; add further ports (e.g. 4444) to the list
    sims = [50001,
            ]
    disp = Controller(sims)
    try:
        disp.run(5000)
    finally:
        # tell the simulations to end even if the run was aborted,
        # otherwise they keep waiting for the controller
        disp.shutdown()