1 """External-controllable simulation and a controller to couple multiple MoSP simulations."""
2
import errno
import json
import logging
import select
import socket
import sys
import time
from heapq import heappop

from SimPy import SimulationRT
from SimPy.SimulationRT import hold, passivate

sys.path.extend(['.', '..','../..'])
from core import Simulation
14
15 __author__ = "B. Henne, P. Tute"
16 __contact__ = "henne@dcsec.uni-hannover.de"
17 __copyright__ = "(c) 2010-2012, DCSec, Leibniz Universitaet Hannover, Germany"
18 __license__ = "GPLv3"
19
# Wire protocol shared by the controller and the controlled simulations.
# Every control symbol is a single byte.

# Maps a command byte to the function that serves it; filled by the
# ``handle`` decorator and looked up in the simulation's run loop.
HANDLERS = {}

# Commands. QUIT, PUT_MODE, STEP, IDENT and REGISTER are sent by the
# controller; GET_MODE only has a handler registered on the simulation side.
QUIT = '\x00'
GET_MODE = '\x01'
PUT_MODE = '\x02'
STEP = '\x03'
IDENT = '\x04'
REGISTER = '\x05'

# Replies sent by a simulation after a step, and the acknowledgement byte
# that both sides use.
STEP_DONE_GOT_DATA = '\xF9'
STEP_DONE = '\xFA'
SIM_ENDED = '\xFB'
ACK = '\xFC'

# Framing: fields inside a message, messages inside a blob, end of a blob.
FIELD_SEP = '\xFD'
MSG_SEP = '\xFE'
MSG_END = '\xFF'

# NOTE(review): SIM_TIMEOUT is not referenced anywhere in this file.
SIM_TIMEOUT = 120
# Seconds the controller sleeps before every tick in Controller.run().
TICK_DELAY = 0.01
class Controller(object):
    """Controller to connect and coordinate simulations.

    Every simulation is tracked as a mutable list with the layout
    ``[name, type, port, socket, last received blob, outgoing buffer, node id]``
    (indices 0 to 6).

    NOTE(review): the ``class`` line and most ``def`` lines of this class were
    missing from the reviewed copy of the file. The class name comes from the
    ``Controller(sims)`` call in the script section; method names and parameters
    come from the calls and bodies visible in the file. The base class could not
    be seen - confirm ``object`` against the repository.
    """

    # Upper bound (seconds) for a single select() call while waiting for
    # sockets; a zero timeout inside a loop would spin at full CPU load.
    _POLL_TIMEOUT = 0.1

    def __init__(self, sims):
        """Initialize connections and identify simulations.

        @param sims: a list of the TCP ports (on localhost) of all simulations that should be connected."""
        self.sims = [['', '', sim_port, None, '', '', 0] for sim_port in sims]
        self.get_from = set()
        self.need_registration = []
        self.register_to = []
        for sim in self.sims:
            self.connect_to(sim)
        self.byName = self.sims_byName()
        self.bySocket = self.sims_bySocket()

        # Announce every 'siafu' simulation (node id + name) to all 'mosp'
        # simulations in one REGISTER message.
        if self.need_registration:
            entries = [str(sim[6]) + FIELD_SEP + sim[0]
                       for sim in self.need_registration]
            register_msg = REGISTER + MSG_SEP.join(entries) + MSG_END
            for sim in self.register_to:
                sim[3].sendall(register_msg)

        for sim in self.sims:
            if __debug__:
                print('%s' % (sim,))
            # from here on all sockets are multiplexed with select()
            sim[3].setblocking(0)

    @staticmethod
    def _read_until(sock, terminator):
        """Read single bytes from a blocking socket up to a terminator.

        @param sock: socket to read from
        @param terminator: byte that ends the field; it is consumed but not returned
        @return: everything received before the terminator
        @raise RuntimeError: if the peer closes the connection first"""
        received = []
        while True:
            symbol = sock.recv(1)
            if not symbol:
                # recv() yields an empty string only when the peer closed
                raise RuntimeError("socket connection broken")
            if symbol == terminator:
                return ''.join(received)
            received.append(symbol)

    @staticmethod
    def _recv_when_readable(sock):
        """Wait until the socket is readable, then return one received byte."""
        readable = []
        while sock not in readable:
            readable = select.select([sock], [], [], 0.1)[0]
        return sock.recv(1)

    def connect_to(self, sim):
        """Connect to sim and exchange simulation information.

        Fills in the socket (index 3), type (1), name (0) and node id (6) of
        the record and sorts it into need_registration or register_to.

        @param sim: simulation record; only the port (index 2) has to be set
        @raise RuntimeError: if the simulation closes the connection while identifying"""
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            s.connect(('localhost', sim[2]))
        except socket.error as err:
            # "operation in progress" is not a failure; everything else is
            if err.errno != errno.EINPROGRESS:
                s.close()
                raise
        sim[3] = s

        s.send(IDENT)
        # the answer is: type FIELD_SEP name FIELD_SEP node_id MSG_END
        sim[1] = self._read_until(s, FIELD_SEP)
        if sim[1] == 'siafu':
            self.need_registration.append(sim)
        elif sim[1] == 'mosp':
            self.register_to.append(sim)
        sim[0] = self._read_until(s, FIELD_SEP)
        sim[6] = int(self._read_until(s, MSG_END))
        if __debug__:
            print('talked to %r of type %r at port %s node_id is %s'
                  % (sim[0], sim[1], sim[2], sim[6]))

    def sims_byName(self):
        """Get simulations by name."""
        return {sim[0]: sim for sim in self.sims}

    def sims_bySocket(self):
        """Get simulations by socket."""
        return {sim[3]: sim for sim in self.sims}

    def send(self, socket, msg):
        """Send a message to a socket.

        @param socket: socket to write to (the name shadows the module inside this method only)
        @param msg: complete string to transmit
        @raise RuntimeError: if the socket accepts no more data"""
        totalsent = 0
        while totalsent < len(msg):
            sent = socket.send(msg[totalsent:])
            if sent == 0:
                raise RuntimeError("socket connection broken")
            totalsent += sent

    def receive(self, sock):
        """Receive the next message(-blob) from a socket.

        @param sock: non-blocking socket to read from
        @return: everything received before the next MSG_END
        @raise RuntimeError: if the peer closes the connection"""
        parts = []
        while True:
            try:
                chunk = sock.recv(1)
            except socket.error as err:
                if err.errno in (errno.EAGAIN, errno.EWOULDBLOCK):
                    # nothing to read yet: sleep in select() instead of spinning
                    select.select([sock], [], [], self._POLL_TIMEOUT)
                    continue
                raise
            if not chunk:
                raise RuntimeError("socket connection broken")
            if chunk == MSG_END:
                return ''.join(parts)
            parts.append(chunk)

    def cut_msgs(self, msgsblob):
        """Cut a message blob into separate messages and messages into fields.

        Messages that do not consist of exactly three fields are reported on
        stderr and dropped.

        @todo: make this more flexible, accept more message-types.
        @param msgsblob: messages joined by MSG_SEP, fields joined by FIELD_SEP
        @return: list of three-element field lists"""
        msgs = []
        for m in msgsblob.split(MSG_SEP):
            fields = m.split(FIELD_SEP)
            if __debug__:
                print('f %s %s %s' % (fields, len(fields), m))
            if len(fields) == 3:
                msgs.append(fields)
            else:
                sys.stderr.write('discarded: %s\n' % m)
        return msgs

    def dispatch(self, msgs, sender):
        """Sort messages to the correct server.

        The second and third field of every message are appended to the
        outgoing buffer (index 5) of the simulation named in the first field.

        @param msgs: list of [receiver name, type, content] messages
        @param sender: sender of the messages, to return messages with invalid receiver to."""
        byName = self.byName
        for m in msgs:
            try:
                sim = byName[m[0]]
            except KeyError:
                # unknown receiver: hand the message back to its sender
                if __debug__:
                    print('unknown sim! %s  message returned!' % m[0])
                sim = sender
            if __debug__:
                print('SIM\t%s' % (sim,))
            if sim[5]:
                sim[5] += MSG_SEP
            sim[5] += m[1] + FIELD_SEP + m[2]

    def put_to_sims(self, byName, bySocket):
        """Put all collected data to simulations.

        This sends all messages, that were received since the last call to the receiver.
        @param byName: all simulations that should be considered as receivers.
        @type byName: dict with the names of simulations as keys and the according lists as items.
        @param bySocket: all simulations that should be considered as receivers.
        @type bySocket: dict with the sockets of simulations as keys and the according lists as items."""
        need_put = [sim[3] for sim in self.sims if sim[5]]
        if __debug__:
            print('PUT TO %s' % (need_put,))
        while need_put:
            writable = select.select([], need_put, [], self._POLL_TIMEOUT)[1]
            for sock in writable:
                sim = bySocket[sock]
                self.send(sock, PUT_MODE + sim[5] + MSG_END)
                ack = self._recv_when_readable(sock)
                if ack == ACK:
                    if __debug__:
                        print('got ack for put')
                else:
                    print('no ack for put, got %s' % repr(ack))
                need_put.remove(sock)

        # everything was delivered, start with empty buffers again
        for sim in self.sims:
            sim[5] = ''

    def do_steps(self, byName, bySocket, steps=1):
        """Tell all simulations to do a number of steps.

        @param byName: all simulations that should be considered as receivers.
        @type byName: dict with the names of simulations as keys and the according lists as items.
        @param bySocket: all simulations that should be considered as receivers.
        @type bySocket: dict with the sockets of simulations as keys and the according lists as items.
        @param steps: the number of steps that all simulations should do (default = 1)."""
        if __debug__:
            print('sending STEP')
        step_msg = STEP + str(steps) + MSG_END
        # copies, because finished sockets are removed while waiting
        need_step = list(bySocket)
        while need_step:
            writable = select.select([], need_step, [], self._POLL_TIMEOUT)[1]
            for sock in writable:
                self.send(sock, step_msg)
                ack = self._recv_when_readable(sock)
                if __debug__:
                    if ack == ACK:
                        print('got ack for step')
                    else:
                        print('no ack for step, got %s' % repr(ack))
                need_step.remove(sock)

        wait_for_step = list(bySocket)
        while wait_for_step:
            readable = select.select(wait_for_step, [], [], self._POLL_TIMEOUT)[0]
            for sock in readable:
                sim = bySocket[sock]
                result = sock.recv(1)
                if result == SIM_ENDED:
                    print('sim ended %s' % sim[0])
                elif result == STEP_DONE:
                    if __debug__:
                        print('step done %s' % sim[0])
                elif result == STEP_DONE_GOT_DATA:
                    # remember the socket, its data is fetched by get_from_sims()
                    self.get_from.add(sock)
                    if __debug__:
                        print('step done, need get %s' % sim[0])
                else:
                    print('wanted {!r}, {!r} or {!r}, got {!r}'.format(STEP_DONE, STEP_DONE_GOT_DATA, SIM_ENDED, result))
                wait_for_step.remove(sock)

    def get_from_sims(self, byName, bySocket):
        """Get the next messages from all simulations that have data ready.

        @param byName: all simulations that should be considered as receivers.
        @type byName: dict with the names of simulations as keys and the according lists as items.
        @param bySocket: all simulations that should be considered as receivers.
        @type bySocket: dict with the sockets of simulations as keys and the according lists as items."""
        if __debug__:
            print('getting data from %s' % (self.get_from,))
        pending = self.get_from
        while pending:
            readable = select.select(list(pending), [], [], self._POLL_TIMEOUT)[0]
            for sock in readable:
                sim = bySocket[sock]
                sim[4] = self.receive(sock)
                self.dispatch(self.cut_msgs(sim[4]), sim)
                if __debug__:
                    print('got %r from %s' % (sim[4], sim[0]))
                sock.send(ACK)
                pending.remove(sock)
        self.get_from.clear()

    def run(self, until):
        """Run the simulations for a given number of ticks.

        Every tick delivers the buffered messages, lets all simulations do
        one step and then collects the data they produced.

        @param until: number of ticks to run all simulations for."""
        t = 0
        byName = self.byName
        bySocket = self.bySocket
        while t < until:
            time.sleep(TICK_DELAY)
            self.put_to_sims(byName, bySocket)
            self.do_steps(byName, bySocket)
            self.get_from_sims(byName, bySocket)
            if __debug__:
                print('all steps done')
            t += 1

    def shutdown(self):
        """End all simulations and close the connections to them."""
        if __debug__:
            print('ending sims')
        for sim in self.sims:
            self.send(sim[3], QUIT)
            # the simulation closes its end after QUIT; release ours as well
            sim[3].close()
340
341
class Ticker(SimulationRT.Process):
    """Ticker class to stop the simulation from doing more than one tick at a time.

    The process schedules an event for every single tick, so the simulation
    always has an event one time unit ahead.
    """

    # NOTE(review): the ``def`` line of this method was missing from the
    # reviewed copy; the name is taken from the ``self.ticker.go()`` call
    # made when the ticker is activated.
    def go(self):
        """Process execution method: hold for one time unit, forever."""
        while True:
            yield hold, self, 1
350
def handle(code):
    """Create a decorator that registers a function as handler of a command.

    The decorated function is stored in HANDLERS under the given code and is
    returned unchanged, so it stays usable under its own name.

    @param code: command byte the decorated function should serve
    @return: the registering decorator
    """
    def register(userfunc):
        HANDLERS[code] = userfunc
        return userfunc
    return register
357
# NOTE(review): the ``class`` line and most ``def`` lines of this class were
# missing from the reviewed copy of the file. The class name below is a
# reconstruction - confirm it against the repository. The base class comes
# from the ``Simulation.__init__`` call. The names send_string, shutdownServer,
# get_data_from_sim, process_data and send_person come from calls or messages
# visible in the file; the names of the PUT_MODE, STEP, IDENT and REGISTER
# handlers are placeholders (those methods are only reached through HANDLERS
# in this file).
class SimulationControlled(Simulation):
    """The MoSP Simulation, extended for use with an external controller.

    @author: P. Tute
    @author: B. Henne"""

    def __init__(self, geo, name, host, port, start_timestamp=None, rel_speed=None, seed=1, allow_dup=False):
        """Initialize the MOSP Simulation.

        Blocks until an external controller has connected.

        @param geo: geo model for simulation, a mosp.geo.osm.OSMModel extending the mops.collide.World
        @param name: name to identify this simulation by
        @param host: host to bind connection to
        @param port: port to bind connection to
        @param start_timestamp: time.time timestamp when simulation starts - used to calc DateTime of simlation out of simulation ticks.
        @param rel_speed: (SimPy) ratio simulation time over wallclock time; example: rel_speed=200 executes 200 units of simulation time in about one second
        @param seed: seed for simulation random generator
        @param allow_dup: allow duplicates? only one or multiple Simulations can be startet at once

        """
        Simulation.__init__(self, geo, start_timestamp, rel_speed, seed, allow_dup)
        self.name = name
        self.net_host = host
        self.net_port = port
        listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            listener.bind((self.net_host, self.net_port))
            print('Waiting for external controller to connect...')
            listener.listen(1)
            self.net_conn, self.net_addr = listener.accept()
        finally:
            # exactly one controller is served, the listening socket is not
            # needed once it is connected (or connecting failed)
            listener.close()
        print('Connected by %s' % (self.net_addr,))

        # NOTE(review): run() activates person_alarm_clock and resets the
        # real-time clock again - confirm that both places are intended.
        self.activate(self.person_alarm_clock, self.person_alarm_clock.serve(), 0)

        self.ticker = Ticker(name='ticker', sim=self)
        self.activate(self.ticker, self.ticker.go(), 0)

        self.do_step = 0            # number of steps the controller asked for
        self.data_to_send = []      # (receiver, type, content) tuples for the controller
        self.registered_nodes = {}  # node id -> name of the simulation registered for it

        self.rtstart = self.wallclock()
        self.rtset(self.rel_speed)

    def _recv_byte(self):
        """Receive one byte from the controller.

        @raise RuntimeError: if the controller closed the connection"""
        data = self.net_conn.recv(1)
        if not data:
            # without this check a closed connection makes callers loop forever
            raise RuntimeError("socket connection broken")
        return data

    def _recv_until_end(self):
        """Return everything the controller sends up to (excluding) the next MSG_END."""
        parts = []
        symbol = self._recv_byte()
        while symbol != MSG_END:
            parts.append(symbol)
            symbol = self._recv_byte()
        return ''.join(parts)

    def send_string(self, string):
        """Send a string to the controller.

        @param string: complete string to transmit
        @raise RuntimeError: if the socket accepts no more data"""
        conn = self.net_conn
        length = len(string)
        totalsent = 0
        while totalsent < length:
            sent = conn.send(string[totalsent:])
            if sent == 0:
                raise RuntimeError("socket connection broken")
            totalsent += sent

    @handle(QUIT)
    def shutdownServer(self):
        """Stops the simulation and closes connection to controller.

        This handles the QUIT-signal from the controller.

        """
        if __debug__:
            print('shutdown')
        self._stop = True
        self.net_conn.close()

    @handle(GET_MODE)
    def get_data_from_sim(self):
        """Send all available data to the controller.

        The GET_MODE-signal is handled by this method.
        Calling this when the controller does not expect data will cause it to crash. Use wisely.

        @raise RuntimeError: if the controller closes the connection instead of acknowledging
        """
        if __debug__:
            print('get')
        encoded = []
        for msg in self.data_to_send:
            to, agent_type, content = msg[0], msg[1], msg[2]
            if __debug__:
                print('\tto %s' % (to,))
                print('\tagent type %s' % (agent_type,))
                print('\tcontent %s' % (content,))
            encoded.append(to + FIELD_SEP + agent_type + FIELD_SEP + content)
        # one blob: messages separated by MSG_SEP, terminated by MSG_END
        self.send_string(MSG_SEP.join(encoded) + MSG_END)
        ack = self._recv_byte()
        if ack == ACK:
            if __debug__:
                print('\tgot ack')
        else:
            print('\tno ack! got %s' % repr(ack))
        self.data_to_send = []

    @handle(PUT_MODE)
    def put_data_to_sim(self):
        """Receive data from the controller.

        This handles the PUT_MODE-signal and should only be called when receiving PUT_MODE. You will most likely never do this by hand!

        @raise RuntimeError: if the controller closes the connection before MSG_END
        """
        if __debug__:
            print('put')
        blob = self._recv_until_end()
        if __debug__:
            print('\tpayload %s' % repr(blob))
        # every part between two MSG_SEP is one message
        for payload in blob.split(MSG_SEP):
            self.process_data(payload)
        self.net_conn.send(ACK)

    @handle(STEP)
    def receive_steps(self):
        """Receives the number of steps to do and sets self.do_step accordingly.

        This handles the STEP-signal from the controller.

        @raise RuntimeError: if the controller closes the connection before MSG_END
        """
        if __debug__:
            print('step')
        nr_of_steps = self._recv_until_end()
        self.do_step = int(nr_of_steps)
        if __debug__:
            print('doing %s steps' % nr_of_steps)

    @handle(IDENT)
    def send_ident(self):
        """Send type, name and associated osm-node-id to controller.

        This handles the IDENT-signal from the controller.

        """
        if __debug__:
            print('ident')
            print('\t%s' % (self.name,))
        # a MoSP simulation always reports '0' as its node id
        self.net_conn.sendall('mosp' + FIELD_SEP + self.name + FIELD_SEP + '0' + MSG_END)

    @handle(REGISTER)
    def register_sims(self):
        """Receive description data of other simulations and make them adressable.

        This handles the REGISTER-signal from the controller. Entries without
        a name or with a node id that is no integer are reported and skipped.

        @raise RuntimeError: if the controller closes the connection before MSG_END
        """
        if __debug__:
            print('register')
        msg = self._recv_until_end()
        for m in msg.split(MSG_SEP):
            m_cut = m.split(FIELD_SEP)
            try:
                node_id = int(m_cut[0])
            except ValueError:
                print('Invalid node ID when registering! %s' % m_cut[0])
                continue
            if len(m_cut) < 2:
                # node id without a simulation name cannot be addressed
                print('Invalid node ID when registering! %s' % m_cut[0])
                continue
            self.registered_nodes[node_id] = m_cut[1]
            if __debug__:
                print('REGISTERED %s %s' % (node_id, m_cut[1]))

    def process_data(self, message):
        """Extract usable data from received message.

        The expected message format is typeFIELD_SEPcontents.
        If type is 'P', contents should be a json string representing a person to re-add.
        If type is 'L', contents should be a string containing a log message.
        Messages of other types or without contents are ignored.
        This method can be extended to support more message types, if necessary.

        @param message: message containing a json-string

        """
        if __debug__:
            print('processing %s' % (message,))
        message_split = message.split(FIELD_SEP)
        if len(message_split) < 2:
            return
        if message_split[0] == 'P':
            person_dict = json.loads(message_split[1])
            self.readd_person(person_dict['p_id'], person_dict)
        elif message_split[0] == 'L':
            logging.info(message_split[1])

    def _wait_for_step_command(self):
        """Serve controller commands until steps are requested or the simulation is stopped."""
        while not self.do_step:
            flag = self.net_conn.recv(1)
            if not flag:
                # the controller is gone; stop like after QUIT instead of
                # polling the closed connection forever
                print('connection to controller lost, stopping')
                self.shutdownServer()
                return
            if flag == QUIT:
                HANDLERS[QUIT](self)
                return
            handler = HANDLERS.get(flag)
            if handler is None:
                print('\n\n%s' % repr(flag))
                raise KeyError(flag)
            handler(self)

    def run(self, until, real_time, monitor=True):
        """Run Simulation after setup in external-controlled modus.

        Before the first event of every new point in time the simulation
        serves controller commands until it is told to step; after stepping
        it reports STEP_DONE or STEP_DONE_GOT_DATA (followed by the data).

        NOTE(review): the nesting of this loop was reconstructed from the
        statement order of a copy without indentation - compare with the
        repository version before merging.

        @param until: simulation runs until this tick
        @param real_time: run in real-time? or as fast as possible
        @param monitor: start defined monitors?
        @return: 'SimPy: Normal exit' if events were left when the run ended without being stopped, 'SimPy: No activities scheduled' otherwise

        """
        if monitor:
            if len(self.monitors) == 0:
                # NOTE(review): the name ``monitors`` is not imported in this
                # file, so this line raises NameError - confirm the module path.
                raise monitors.NoSimulationMonitorDefinedException('at mosp.Simulation.run()')
            for mon in self.monitors:
                mon.init()

        self.activate(self.person_alarm_clock, self.person_alarm_clock.serve(), 0)

        self.rtstart = self.wallclock()
        self.rtset(self.rel_speed)

        last_event_time = 0
        while self._timestamps and not self._stop:
            next_event_time = self.peek()

            if last_event_time != next_event_time:
                # a new point in time: wait for the controller's permission
                self._wait_for_step_command()
                if self._stop:
                    break
                self.net_conn.send(ACK)
                if __debug__:
                    print('\tdoing it')
                last_event_time = next_event_time

            if next_event_time > until:
                break

            if real_time:
                delay = (
                    next_event_time / self.rel_speed -
                    (self.wallclock() - self.rtstart)
                )
                if delay > 0:
                    time.sleep(delay)

            # execute the callbacks of all messages that are due
            while self.messages and self.messages[0].time < next_event_time:
                heappop(self.messages)()

            # process events until the simulation time advances
            current_step = self._t
            while self._t == current_step:
                self.step()

            if self.do_step:
                if self.data_to_send:
                    self.net_conn.send(STEP_DONE_GOT_DATA)
                    self.get_data_from_sim()
                else:
                    self.net_conn.send(STEP_DONE)
                self.do_step -= 1
                if __debug__:
                    print('\tnow at: %s' % (self.now(),))

        for m in self.monitors:
            m.end()
        # decide before shutting down: shutdownServer() sets self._stop
        normal_exit = not self._stop and bool(self._timestamps)
        if not self._stop:
            self.net_conn.send(SIM_ENDED)
            self.shutdownServer()
        if normal_exit:
            self._t = until
            return 'SimPy: Normal exit'
        return 'SimPy: No activities scheduled'

    def send_person(self, person, node_id):
        """Prepare a person to be send to another simulation.

        Necessary data is extracted from the person and prepared for transfer using json.
        This does NOT remove the person from the simulation. That must be done elsewhere.
        @param person: The person to be send,
        @param node_id: ID of the node that is used as exit.

        """
        try:
            osm_node_id = self.geo.map_nodeid_osmnodeid[node_id]
        except KeyError:
            print('Invalid node_id for send_person!')
            return

        # NOTE(review): the receiver used to be hard-coded to 'asdf' although
        # the exit node was looked up; this assumes registered_nodes is keyed
        # by OSM node id - confirm. The old value stays as fallback.
        to_sim = self.registered_nodes.get(osm_node_id, 'asdf')
        props_json = json.dumps(person.get_properties())
        self.data_to_send.append((to_sim, 'P', props_json))
699
700
if __name__ == '__main__':
    # ports (on localhost) of the simulations that should be coupled
    sims = [50001,
            ]
    disp = Controller(sims)
    try:
        disp.run(5000)
    finally:
        # always tell the simulations to quit, even if run() fails or is
        # interrupted; otherwise they keep waiting for the next command
        disp.shutdown()
708