Vega Strike Python Modules doc  0.5.1
Documentation of the "Modules" folder of Vega Strike
 All Data Structures Namespaces Files Functions Variables
xdrlib.py
Go to the documentation of this file.
1 """Implements (a subset of) Sun XDR -- eXternal Data Representation.
2 
3 See: RFC 1014
4 
5 """
6 
7 import struct
8 try:
9  from cStringIO import StringIO as _StringIO
10 except ImportError:
11  from StringIO import StringIO as _StringIO
12 
13 __all__ = ["Error", "Packer", "Unpacker", "ConversionError"]
14 
15 # exceptions
class Error(Exception):
    """Exception class for this module. Use:

    except xdrlib.Error as var:
        # var has the Error instance for the exception

    Public ivars:
        msg -- contains the message

    """
    def __init__(self, msg):
        # Initialise the base class too so that ``args`` carries the message.
        Exception.__init__(self, msg)
        self.msg = msg

    def __repr__(self):
        return repr(self.msg)

    def __str__(self):
        return str(self.msg)


class ConversionError(Error):
    """Raised when a value cannot be packed or an unpacked marker is invalid.

    Packer.pack_float/pack_double raise it when ``struct`` rejects the value;
    Unpacker.unpack_list raises it when a list marker is neither 0 nor 1.
    """
36 
37 
38 
class Packer:
    """Pack various data representations into a buffer.

    Each ``pack_*`` method appends the big-endian encoding of its argument
    to an internal buffer; ``get_buffer()`` returns everything packed since
    the last ``reset()``.
    """

    def __init__(self):
        self.reset()

    def reset(self):
        """Discard everything packed so far and start with an empty buffer."""
        self.__buf = _StringIO()

    def get_buffer(self):
        """Return the data packed so far."""
        return self.__buf.getvalue()
    # backwards compatibility
    get_buf = get_buffer

    def pack_uint(self, x):
        """Append x as a 4-byte big-endian unsigned integer."""
        self.__buf.write(struct.pack('>L', x))

    def pack_int(self, x):
        """Append x as a 4-byte big-endian integer.

        Negative values are packed as signed ('>l').  Non-negative values
        keep going through pack_uint, because pack_int used to be a plain
        alias of it and callers may pass values up to 2**32 - 1.
        """
        if x < 0:
            self.__buf.write(struct.pack('>l', x))
        else:
            self.pack_uint(x)

    pack_enum = pack_int

    def pack_bool(self, x):
        """Append 1 if x is true, 0 otherwise, as a 4-byte integer."""
        if x:
            self.__buf.write(b'\0\0\0\1')
        else:
            self.__buf.write(b'\0\0\0\0')

    def pack_uhyper(self, x):
        """Append x as an 8-byte big-endian integer (high word first)."""
        # Masking each half also makes this work for negative x, which is
        # why pack_hyper can share the implementation.
        self.pack_uint((x >> 32) & 0xffffffff)
        self.pack_uint(x & 0xffffffff)

    pack_hyper = pack_uhyper

    def pack_float(self, x):
        """Append x as a 4-byte big-endian float.

        Raises ConversionError if struct cannot encode x.
        """
        try:
            self.__buf.write(struct.pack('>f', x))
        except struct.error as msg:
            raise ConversionError(msg)

    def pack_double(self, x):
        """Append x as an 8-byte big-endian double.

        Raises ConversionError if struct cannot encode x.
        """
        try:
            self.__buf.write(struct.pack('>d', x))
        except struct.error as msg:
            raise ConversionError(msg)

    def pack_fstring(self, n, s):
        """Append the first n items of s, zero-padded to a multiple of 4.

        Raises ValueError if n is negative.
        """
        if n < 0:
            raise ValueError('fstring size must be nonnegative')
        # Slice with the requested length *before* rounding up; otherwise a
        # longer s would leak extra characters into the padding area.
        data = s[:n]
        padded = ((n + 3) // 4) * 4
        data = data + (padded - len(data)) * b'\0'
        self.__buf.write(data)

    pack_fopaque = pack_fstring

    def pack_string(self, s):
        """Append len(s) as an unsigned int followed by s, zero-padded."""
        n = len(s)
        self.pack_uint(n)
        self.pack_fstring(n, s)

    pack_opaque = pack_string
    pack_bytes = pack_string

    def pack_list(self, list, pack_item):
        """Append each item preceded by a 1 marker, then a closing 0 marker.

        pack_item is called with each item and is expected to pack it.
        """
        for item in list:
            self.pack_uint(1)
            pack_item(item)
        self.pack_uint(0)

    def pack_farray(self, n, list, pack_item):
        """Pack exactly n items with pack_item, without a length prefix.

        Raises ValueError if len(list) != n.
        """
        if len(list) != n:
            raise ValueError('wrong array size')
        for item in list:
            pack_item(item)

    def pack_array(self, list, pack_item):
        """Append len(list) as an unsigned int, then every item."""
        n = len(list)
        self.pack_uint(n)
        self.pack_farray(n, list, pack_item)
113 
114 
115 
class Unpacker:
    """Unpacks various data representations from the given buffer.

    The unpacker keeps a read position into the buffer; every successful
    ``unpack_*`` call advances it past the data it consumed.
    """

    def __init__(self, data):
        self.reset(data)

    def reset(self, data):
        """Replace the buffer with data and rewind to position 0."""
        self.__buf = data
        self.__pos = 0

    def get_position(self):
        """Return the current read position."""
        return self.__pos

    def set_position(self, position):
        """Set the read position (no validation is performed)."""
        self.__pos = position

    def get_buffer(self):
        """Return the buffer being unpacked."""
        return self.__buf

    def done(self):
        """Raise Error if the read position is before the end of the buffer."""
        if self.__pos < len(self.__buf):
            raise Error('unextracted data remains')

    def __take(self, size):
        # Return the next `size` items of the buffer and advance past them.
        # The position only moves after the length check succeeds, so a
        # failed read leaves the unpacker where it was (as unpack_fstring
        # does) and done() still reports the leftover data.
        start = self.__pos
        end = start + size
        chunk = self.__buf[start:end]
        if len(chunk) < size:
            raise EOFError
        self.__pos = end
        return chunk

    def unpack_uint(self):
        """Read a 4-byte big-endian unsigned integer.

        Raises EOFError if fewer than 4 bytes remain.
        """
        x = struct.unpack('>L', self.__take(4))[0]
        try:
            return int(x)
        except OverflowError:
            return x

    def unpack_int(self):
        """Read a 4-byte big-endian signed integer.

        Raises EOFError if fewer than 4 bytes remain.
        """
        return struct.unpack('>l', self.__take(4))[0]

    unpack_enum = unpack_int
    unpack_bool = unpack_int

    def unpack_uhyper(self):
        """Read an 8-byte big-endian unsigned integer (two unsigned words)."""
        hi = self.unpack_uint()
        lo = self.unpack_uint()
        return (hi << 32) | lo

    def unpack_hyper(self):
        """Read an 8-byte big-endian signed integer."""
        x = self.unpack_uhyper()
        # Values with the top bit set represent negatives (two's complement).
        if x >= 0x8000000000000000:
            x = x - 0x10000000000000000
        return x

    def unpack_float(self):
        """Read a 4-byte big-endian float.

        Raises EOFError if fewer than 4 bytes remain.
        """
        return struct.unpack('>f', self.__take(4))[0]

    def unpack_double(self):
        """Read an 8-byte big-endian double.

        Raises EOFError if fewer than 8 bytes remain.
        """
        return struct.unpack('>d', self.__take(8))[0]

    def unpack_fstring(self, n):
        """Read n items and skip the padding up to a multiple of 4.

        Raises ValueError if n is negative and EOFError if the padded
        length runs past the end of the buffer.
        """
        if n < 0:
            raise ValueError('fstring size must be nonnegative')
        i = self.__pos
        j = i + (n + 3) // 4 * 4
        if j > len(self.__buf):
            raise EOFError
        self.__pos = j
        return self.__buf[i:i + n]

    unpack_fopaque = unpack_fstring

    def unpack_string(self):
        """Read an unsigned length, then that many items plus padding."""
        n = self.unpack_uint()
        return self.unpack_fstring(n)

    unpack_opaque = unpack_string
    unpack_bytes = unpack_string

    def unpack_list(self, unpack_item):
        """Read marker-prefixed items until a 0 marker; return them as a list.

        unpack_item is called with no arguments after each 1 marker.
        Raises ConversionError if a marker is neither 0 nor 1.
        """
        items = []
        while 1:
            x = self.unpack_uint()
            if x == 0:
                break
            if x != 1:
                raise ConversionError('0 or 1 expected, got ' + repr(x))
            items.append(unpack_item())
        return items

    def unpack_farray(self, n, unpack_item):
        """Call unpack_item() n times and return the results as a list."""
        return [unpack_item() for _ in range(n)]

    def unpack_array(self, unpack_item):
        """Read an unsigned count, then that many items via unpack_item()."""
        n = self.unpack_uint()
        return self.unpack_farray(n, unpack_item)
228 
229 
230 # test suite
def _test():
    """Round-trip a fixed set of values through Packer and Unpacker.

    Prints one line per pack step and one line per unpack step.  An unpack
    step is skipped when the matching pack step raised ConversionError.
    """
    p = Packer()
    packtest = [
        (p.pack_uint, (9,)),
        (p.pack_bool, (None,)),
        (p.pack_bool, ('hello',)),
        (p.pack_uhyper, (45,)),
        (p.pack_float, (1.9,)),
        (p.pack_double, (1.9,)),
        (p.pack_string, (b'hello world',)),
        (p.pack_list, (list(range(5)), p.pack_uint)),
        (p.pack_array, ([b'what', b'is', b'hapnin', b'doctor'], p.pack_string)),
    ]
    succeedlist = [1] * len(packtest)
    for count, (method, args) in enumerate(packtest):
        try:
            method(*args)
        except ConversionError as var:
            outcome = 'ConversionError: %s' % (var.msg,)
            succeedlist[count] = 0
        else:
            outcome = 'succeeded'
        print('pack test %d %s' % (count, outcome))
    data = p.get_buffer()
    # now verify
    up = Unpacker(data)
    unpacktest = [
        (up.unpack_uint, (), lambda x: x == 9),
        (up.unpack_bool, (), lambda x: not x),
        (up.unpack_bool, (), lambda x: x),
        (up.unpack_uhyper, (), lambda x: x == 45),
        (up.unpack_float, (), lambda x: 1.89 < x < 1.91),
        (up.unpack_double, (), lambda x: 1.89 < x < 1.91),
        (up.unpack_string, (), lambda x: x == b'hello world'),
        (up.unpack_list, (up.unpack_uint,), lambda x: x == list(range(5))),
        (up.unpack_array, (up.unpack_string,),
         lambda x: x == [b'what', b'is', b'hapnin', b'doctor']),
    ]
    for count, (method, args, pred) in enumerate(unpacktest):
        try:
            if succeedlist[count]:
                x = method(*args)
                if pred(x):
                    verdict = 'succeeded'
                else:
                    verdict = 'failed'
                outcome = '%s : %s' % (verdict, x)
            else:
                outcome = 'skipping'
        except ConversionError as var:
            outcome = 'ConversionError: %s' % (var.msg,)
        print('unpack test %d %s' % (count, outcome))


# Run the self-test when the module is executed as a script.
if __name__ == '__main__':
    _test()