Vega strike Python Modules doc  0.5.1
Documentation of the " Modules " folder of Vega strike
 All Data Structures Namespaces Files Functions Variables
CGIHTTPServer.py
Go to the documentation of this file.
1 """CGI-savvy HTTP Server.
2 
3 This module builds on SimpleHTTPServer by implementing GET and POST
4 requests to cgi-bin scripts.
5 
6 If the os.fork() function is not present (e.g. on Windows),
7 os.popen2() is used as a fallback, with slightly altered semantics; if
8 that function is not present either (e.g. on Macintosh), only Python
9 scripts are supported, and they are executed by the current process.
10 
11 In all cases, the implementation is intentionally naive -- all
12 requests are executed sychronously.
13 
14 SECURITY WARNING: DON'T USE THIS CODE UNLESS YOU ARE INSIDE A FIREWALL
15 -- it may execute arbitrary Python code or external programs.
16 
17 """
18 
19 
20 __version__ = "0.4"
21 
22 __all__ = ["CGIHTTPRequestHandler"]
23 
24 import os
25 import sys
26 import urllib
27 import BaseHTTPServer
28 import SimpleHTTPServer
29 
30 
32 
33  """Complete HTTP server with GET, HEAD and POST commands.
34 
35  GET and HEAD also support running CGI scripts.
36 
37  The POST command is *only* implemented for CGI scripts.
38 
39  """
40 
41  # Determine platform specifics
42  have_fork = hasattr(os, 'fork')
43  have_popen2 = hasattr(os, 'popen2')
44  have_popen3 = hasattr(os, 'popen3')
45 
46  # Make rfile unbuffered -- we need to read one line and then pass
47  # the rest to a subprocess, so we can't use buffered input.
48  rbufsize = 0
49 
50  def do_POST(self):
51  """Serve a POST request.
52 
53  This is only implemented for CGI scripts.
54 
55  """
56 
57  if self.is_cgi():
58  self.run_cgi()
59  else:
60  self.send_error(501, "Can only POST to CGI scripts")
61 
62  def send_head(self):
63  """Version of send_head that support CGI scripts"""
64  if self.is_cgi():
65  return self.run_cgi()
66  else:
68 
69  def is_cgi(self):
70  """Test whether self.path corresponds to a CGI script.
71 
72  Return a tuple (dir, rest) if self.path requires running a
73  CGI script, None if not. Note that rest begins with a
74  slash if it is not empty.
75 
76  The default implementation tests whether the path
77  begins with one of the strings in the list
78  self.cgi_directories (and the next character is a '/'
79  or the end of the string).
80 
81  """
82 
83  path = self.path
84 
85  for x in self.cgi_directories:
86  i = len(x)
87  if path[:i] == x and (not path[i:] or path[i] == '/'):
88  self.cgi_info = path[:i], path[i+1:]
89  return 1
90  return 0
91 
92  cgi_directories = ['/cgi-bin', '/htbin']
93 
94  def is_executable(self, path):
95  """Test whether argument path is an executable file."""
96  return executable(path)
97 
98  def is_python(self, path):
99  """Test whether argument path is a Python script."""
100  head, tail = os.path.splitext(path)
101  return tail.lower() in (".py", ".pyw")
102 
103  def run_cgi(self):
104  """Execute a CGI script."""
105  dir, rest = self.cgi_info
106  i = rest.rfind('?')
107  if i >= 0:
108  rest, query = rest[:i], rest[i+1:]
109  else:
110  query = ''
111  i = rest.find('/')
112  if i >= 0:
113  script, rest = rest[:i], rest[i:]
114  else:
115  script, rest = rest, ''
116  scriptname = dir + '/' + script
117  scriptfile = self.translate_path(scriptname)
118  if not os.path.exists(scriptfile):
119  self.send_error(404, "No such CGI script (%s)" % `scriptname`)
120  return
121  if not os.path.isfile(scriptfile):
122  self.send_error(403, "CGI script is not a plain file (%s)" %
123  `scriptname`)
124  return
125  ispy = self.is_python(scriptname)
126  if not ispy:
127  if not (self.have_fork or self.have_popen2 or self.have_popen3):
128  self.send_error(403, "CGI script is not a Python script (%s)" %
129  `scriptname`)
130  return
131  if not self.is_executable(scriptfile):
132  self.send_error(403, "CGI script is not executable (%s)" %
133  `scriptname`)
134  return
135 
136  # Reference: http://hoohoo.ncsa.uiuc.edu/cgi/env.html
137  # XXX Much of the following could be prepared ahead of time!
138  env = {}
139  env['SERVER_SOFTWARE'] = self.version_string()
140  env['SERVER_NAME'] = self.server.server_name
141  env['GATEWAY_INTERFACE'] = 'CGI/1.1'
142  env['SERVER_PROTOCOL'] = self.protocol_version
143  env['SERVER_PORT'] = str(self.server.server_port)
144  env['REQUEST_METHOD'] = self.command
145  uqrest = urllib.unquote(rest)
146  env['PATH_INFO'] = uqrest
147  env['PATH_TRANSLATED'] = self.translate_path(uqrest)
148  env['SCRIPT_NAME'] = scriptname
149  if query:
150  env['QUERY_STRING'] = query
151  host = self.address_string()
152  if host != self.client_address[0]:
153  env['REMOTE_HOST'] = host
154  env['REMOTE_ADDR'] = self.client_address[0]
155  # XXX AUTH_TYPE
156  # XXX REMOTE_USER
157  # XXX REMOTE_IDENT
158  if self.headers.typeheader is None:
159  env['CONTENT_TYPE'] = self.headers.type
160  else:
161  env['CONTENT_TYPE'] = self.headers.typeheader
162  length = self.headers.getheader('content-length')
163  if length:
164  env['CONTENT_LENGTH'] = length
165  accept = []
166  for line in self.headers.getallmatchingheaders('accept'):
167  if line[:1] in "\t\n\r ":
168  accept.append(line.strip())
169  else:
170  accept = accept + line[7:].split(',')
171  env['HTTP_ACCEPT'] = ','.join(accept)
172  ua = self.headers.getheader('user-agent')
173  if ua:
174  env['HTTP_USER_AGENT'] = ua
175  co = filter(None, self.headers.getheaders('cookie'))
176  if co:
177  env['HTTP_COOKIE'] = ', '.join(co)
178  # XXX Other HTTP_* headers
179  if not self.have_fork:
180  # Since we're setting the env in the parent, provide empty
181  # values to override previously set values
182  for k in ('QUERY_STRING', 'REMOTE_HOST', 'CONTENT_LENGTH',
183  'HTTP_USER_AGENT', 'HTTP_COOKIE'):
184  env.setdefault(k, "")
185 
186  self.send_response(200, "Script output follows")
187 
188  decoded_query = query.replace('+', ' ')
189 
190  if self.have_fork:
191  # Unix -- fork as we should
192  args = [script]
193  if '=' not in decoded_query:
194  args.append(decoded_query)
195  nobody = nobody_uid()
196  self.rfile.flush() # Always flush before forking
197  self.wfile.flush() # Always flush before forking
198  pid = os.fork()
199  if pid != 0:
200  # Parent
201  pid, sts = os.waitpid(pid, 0)
202  if sts:
203  self.log_error("CGI script exit status %#x", sts)
204  return
205  # Child
206  try:
207  try:
208  os.setuid(nobody)
209  except os.error:
210  pass
211  os.dup2(self.rfile.fileno(), 0)
212  os.dup2(self.wfile.fileno(), 1)
213  os.execve(scriptfile, args, env)
214  except:
215  self.server.handle_error(self.request, self.client_address)
216  os._exit(127)
217 
218  elif self.have_popen2 or self.have_popen3:
219  # Windows -- use popen2 or popen3 to create a subprocess
220  import shutil
221  if self.have_popen3:
222  popenx = os.popen3
223  else:
224  popenx = os.popen2
225  os.environ.update(env)
226  cmdline = scriptfile
227  if self.is_python(scriptfile):
228  interp = sys.executable
229  if interp.lower().endswith("w.exe"):
230  # On Windows, use python.exe, not pythonw.exe
231  interp = interp[:-5] + interp[-4:]
232  cmdline = "%s -u %s" % (interp, cmdline)
233  if '=' not in query and '"' not in query:
234  cmdline = '%s "%s"' % (cmdline, query)
235  self.log_message("command: %s", cmdline)
236  try:
237  nbytes = int(length)
238  except:
239  nbytes = 0
240  files = popenx(cmdline, 'b')
241  fi = files[0]
242  fo = files[1]
243  if self.have_popen3:
244  fe = files[2]
245  if self.command.lower() == "post" and nbytes > 0:
246  data = self.rfile.read(nbytes)
247  fi.write(data)
248  fi.close()
249  shutil.copyfileobj(fo, self.wfile)
250  if self.have_popen3:
251  errors = fe.read()
252  fe.close()
253  if errors:
254  self.log_error('%s', errors)
255  sts = fo.close()
256  if sts:
257  self.log_error("CGI script exit status %#x", sts)
258  else:
259  self.log_message("CGI script exited OK")
260 
261  else:
262  # Other O.S. -- execute script in this process
263  os.environ.update(env)
264  save_argv = sys.argv
265  save_stdin = sys.stdin
266  save_stdout = sys.stdout
267  save_stderr = sys.stderr
268  try:
269  try:
270  sys.argv = [scriptfile]
271  if '=' not in decoded_query:
272  sys.argv.append(decoded_query)
273  sys.stdout = self.wfile
274  sys.stdin = self.rfile
275  execfile(scriptfile, {"__name__": "__main__"})
276  finally:
277  sys.argv = save_argv
278  sys.stdin = save_stdin
279  sys.stdout = save_stdout
280  sys.stderr = save_stderr
281  except SystemExit, sts:
282  self.log_error("CGI script exit status %s", str(sts))
283  else:
284  self.log_message("CGI script exited OK")
285 
286 
287 nobody = None
288 
290  """Internal routine to get nobody's uid"""
291  global nobody
292  if nobody:
293  return nobody
294  try:
295  import pwd
296  except ImportError:
297  return -1
298  try:
299  nobody = pwd.getpwnam('nobody')[2]
300  except KeyError:
301  nobody = 1 + max(map(lambda x: x[2], pwd.getpwall()))
302  return nobody
303 
304 
305 def executable(path):
306  """Test for executable file."""
307  try:
308  st = os.stat(path)
309  except os.error:
310  return 0
311  return st[0] & 0111 != 0
312 
313 
314 def test(HandlerClass = CGIHTTPRequestHandler,
316  SimpleHTTPServer.test(HandlerClass, ServerClass)
317 
318 
319 if __name__ == '__main__':
320  test()