7 Public functions: Internaldate2tuple
21 import binascii, re, socket, time, random, sys
23 __all__ = [
"IMAP4",
"Internaldate2tuple",
24 "Int2AP",
"ParseFlags",
"Time2Internaldate"]
31 AllowedVersions = (
'IMAP4REV1',
'IMAP4')
37 'APPEND': (
'AUTH',
'SELECTED'),
38 'AUTHENTICATE': (
'NONAUTH',),
39 'CAPABILITY': (
'NONAUTH',
'AUTH',
'SELECTED',
'LOGOUT'),
40 'CHECK': (
'SELECTED',),
41 'CLOSE': (
'SELECTED',),
42 'COPY': (
'SELECTED',),
43 'CREATE': (
'AUTH',
'SELECTED'),
44 'DELETE': (
'AUTH',
'SELECTED'),
45 'EXAMINE': (
'AUTH',
'SELECTED'),
46 'EXPUNGE': (
'SELECTED',),
47 'FETCH': (
'SELECTED',),
48 'GETACL': (
'AUTH',
'SELECTED'),
49 'LIST': (
'AUTH',
'SELECTED'),
50 'LOGIN': (
'NONAUTH',),
51 'LOGOUT': (
'NONAUTH',
'AUTH',
'SELECTED',
'LOGOUT'),
52 'LSUB': (
'AUTH',
'SELECTED'),
53 'NAMESPACE': (
'AUTH',
'SELECTED'),
54 'NOOP': (
'NONAUTH',
'AUTH',
'SELECTED',
'LOGOUT'),
55 'PARTIAL': (
'SELECTED',),
56 'RENAME': (
'AUTH',
'SELECTED'),
57 'SEARCH': (
'SELECTED',),
58 'SELECT': (
'AUTH',
'SELECTED'),
59 'SETACL': (
'AUTH',
'SELECTED'),
60 'SORT': (
'SELECTED',),
61 'STATUS': (
'AUTH',
'SELECTED'),
62 'STORE': (
'SELECTED',),
63 'SUBSCRIBE': (
'AUTH',
'SELECTED'),
65 'UNSUBSCRIBE': (
'AUTH',
'SELECTED'),
70 Continuation = re.compile(
r'\+( (?P<data>.*))?')
71 Flags = re.compile(
r'.*FLAGS \((?P<flags>[^\)]*)\)')
72 InternalDate = re.compile(
r'.*INTERNALDATE "'
73 r'(?P<day>[ 123][0-9])-(?P<mon>[A-Z][a-z][a-z])-(?P<year>[0-9][0-9][0-9][0-9])'
74 r' (?P<hour>[0-9][0-9]):(?P<min>[0-9][0-9]):(?P<sec>[0-9][0-9])'
75 r' (?P<zonen>[-+])(?P<zoneh>[0-9][0-9])(?P<zonem>[0-9][0-9])'
77 Literal = re.compile(
r'.*{(?P<size>\d+)}$')
78 Response_code = re.compile(
r'\[(?P<type>[A-Z-]+)( (?P<data>[^\]]*))?\]')
79 Untagged_response = re.compile(
r'\* (?P<type>[A-Z-]+)( (?P<data>.*))?')
80 Untagged_status = re.compile(
r'\* (?P<data>\d+) (?P<type>[A-Z-]+)( (?P<data2>.*))?')
86 """IMAP4 client class.
88 Instantiate with: IMAP4([host[, port]])
90 host - host's name (default: localhost);
91 port - port number (default: standard IMAP4 port).
93 All IMAP4rev1 commands are supported by methods of the same
96 All arguments to commands are converted to strings, except for
97 AUTHENTICATE, and the last argument to APPEND which is passed as
98 an IMAP4 literal. If necessary (the string contains any
99 non-printing characters or white-space and isn't enclosed with
100 either parentheses or double quotes) each string is quoted.
101 However, the 'password' argument to the LOGIN command is always
102 quoted. If you want to avoid having an argument string quoted
103 (eg: the 'flags' argument to STORE) then enclose the string in
104 parentheses (eg: "(\Deleted)").
106 Each command returns a tuple: (type, [data, ...]) where 'type'
107 is usually 'OK' or 'NO', and 'data' is either the text from the
108 tagged response, or untagged results from command.
110 Errors raise the exception class <instance>.error("<reason>").
111 IMAP4 server errors raise <instance>.abort("<reason>"),
112 which is a sub-class of 'error'. Mailbox status changes
113 from READ-WRITE to READ-ONLY raise the exception class
114 <instance>.readonly("<reason>"), which is a sub-class of 'abort'.
116 "error" exceptions imply a program error.
117 "abort" exceptions imply the connection should be reset, and
118 the command re-tried.
119 "readonly" exceptions imply the command should be re-tried.
121 Note: to use this module, you must read the RFCs pertaining
122 to the IMAP4 protocol, as the semantics of the arguments to
123 each IMAP4 command are left to the invoker, not to mention
127 class error(Exception):
pass
128 class abort(
error):
pass
129 class readonly(abort):
pass
131 mustquote = re.compile(
r"[^\w!#$%&'*+,.:;<=>?^`|~-]")
133 def __init__(self, host = '', port = IMAP4_PORT):
137 self.state =
'LOGOUT'
139 self.tagged_commands = {}
140 self.untagged_responses = {}
141 self.continuation_response =
''
142 self.is_readonly =
None
147 self.open(host, port)
153 self.tagre = re.compile(
r'(?P<tag>'
155 +
r'\d+) (?P<type>[A-Z]+) (?P<data>.*)')
162 _mesg(
'imaplib version %s' % __version__)
163 _mesg(
'new IMAP4 connection, tag=%s' % self.tagpre)
165 self.welcome = self._get_response()
166 if self.untagged_responses.has_key(
'PREAUTH'):
168 elif self.untagged_responses.has_key(
'OK'):
169 self.state =
'NONAUTH'
171 raise self.error(self.welcome)
174 self._simple_command(cap)
175 if not self.untagged_responses.has_key(cap):
176 raise self.error(
'no CAPABILITY response from server')
177 self.capabilities =
tuple(self.untagged_responses[cap][-1].
upper().
split())
181 _mesg(
'CAPABILITIES: %s' % `self.capabilities`)
183 for version
in AllowedVersions:
184 if not version
in self.capabilities:
186 self.PROTOCOL_VERSION = version
189 raise self.error(
'server not IMAP4 compliant')
194 if Commands.has_key(attr):
195 return getattr(self, attr.lower())
196 raise AttributeError(
"Unknown IMAP4 command: '%s'" % attr)
203 def open(self, host, port):
204 """Setup connection to remote server on "host:port".
205 This connection will be used by the routines:
206 read, readline, send, shutdown.
208 self.sock =
socket.socket(socket.AF_INET, socket.SOCK_STREAM)
209 self.sock.connect((self.host, self.port))
210 self.file = self.sock.makefile(
'rb')
213 def read(self, size):
214 """Read 'size' bytes from remote."""
215 return self.file.read(size)
219 """Read line from remote."""
220 return self.file.readline()
223 def send(self, data):
224 """Send data to remote."""
225 self.sock.sendall(data)
228 """Close I/O established in "open"."""
234 """Return socket instance used to connect to IMAP4 server.
236 socket = <instance>.socket()
246 """Return most recent 'RECENT' responses if any exist,
247 else prompt server for an update using the 'NOOP' command.
249 (typ, [data]) = <instance>.recent()
251 'data' is None if no new messages,
252 else list of RECENT responses, most recent last.
255 typ, dat = self._untagged_response(
'OK', [
None], name)
258 typ, dat = self.noop()
259 return self._untagged_response(typ, dat, name)
262 def response(self, code):
263 """Return data for response 'code' if received, or None.
265 Old value for response 'code' is cleared.
267 (code, [data]) = <instance>.response(code)
269 return self._untagged_response(code, [
None], code.upper())
276 def append(self, mailbox, flags, date_time, message):
277 """Append message to named mailbox.
279 (typ, [data]) = <instance>.append(mailbox, flags, date_time, message)
281 All args except `message' can be None.
287 if (flags[0],flags[-1]) != (
'(',
')'):
288 flags =
'(%s)' % flags
295 self.literal = message
296 return self._simple_command(name, mailbox, flags, date_time)
299 def authenticate(self, mechanism, authobject):
300 """Authenticate command - requires response processing.
302 'mechanism' specifies which authentication mechanism is to
303 be used - it must appear in <instance>.capabilities in the
304 form AUTH=<mechanism>.
306 'authobject' must be a callable object:
308 data = authobject(response)
310 It will be called to process server continuation responses.
311 It should return data that will be encoded and sent to server.
312 It should return None if the client abort response '*' should
315 mech = mechanism.upper()
316 cap =
'AUTH=%s' % mech
317 if not cap
in self.capabilities:
318 raise self.error(
"Server doesn't allow %s authentication." % mech)
319 self.literal = _Authenticator(authobject).process
320 typ, dat = self._simple_command(
'AUTHENTICATE', mech)
322 raise self.error(dat[-1])
328 """Checkpoint mailbox on server.
330 (typ, [data]) = <instance>.check()
332 return self._simple_command(
'CHECK')
336 """Close currently selected mailbox.
338 Deleted messages are removed from writable mailbox.
339 This is the recommended command before 'LOGOUT'.
341 (typ, [data]) = <instance>.close()
344 typ, dat = self._simple_command(
'CLOSE')
350 def copy(self, message_set, new_mailbox):
351 """Copy 'message_set' messages onto end of 'new_mailbox'.
353 (typ, [data]) = <instance>.copy(message_set, new_mailbox)
355 return self._simple_command(
'COPY', message_set, new_mailbox)
358 def create(self, mailbox):
359 """Create new mailbox.
361 (typ, [data]) = <instance>.create(mailbox)
363 return self._simple_command(
'CREATE', mailbox)
366 def delete(self, mailbox):
367 """Delete old mailbox.
369 (typ, [data]) = <instance>.delete(mailbox)
371 return self._simple_command(
'DELETE', mailbox)
375 """Permanently remove deleted items from selected mailbox.
377 Generates 'EXPUNGE' response for each deleted message.
379 (typ, [data]) = <instance>.expunge()
381 'data' is list of 'EXPUNGE'd message numbers in order received.
384 typ, dat = self._simple_command(name)
385 return self._untagged_response(typ, dat, name)
388 def fetch(self, message_set, message_parts):
389 """Fetch (parts of) messages.
391 (typ, [data, ...]) = <instance>.fetch(message_set, message_parts)
393 'message_parts' should be a string of selected parts
394 enclosed in parentheses, eg: "(UID BODY[TEXT])".
396 'data' are tuples of message part envelope and data.
399 typ, dat = self._simple_command(name, message_set, message_parts)
400 return self._untagged_response(typ, dat, name)
403 def getacl(self, mailbox):
404 """Get the ACLs for a mailbox.
406 (typ, [data]) = <instance>.getacl(mailbox)
408 typ, dat = self._simple_command(
'GETACL', mailbox)
409 return self._untagged_response(typ, dat,
'ACL')
412 def list(self, directory='""', pattern='*'):
413 """List mailbox names in directory matching pattern.
415 (typ, [data]) = <instance>.list(directory='""', pattern='*')
417 'data' is list of LIST responses.
420 typ, dat = self._simple_command(name, directory, pattern)
421 return self._untagged_response(typ, dat, name)
424 def login(self, user, password):
425 """Identify client using plaintext password.
427 (typ, [data]) = <instance>.login(user, password)
429 NB: 'password' will be quoted.
433 typ, dat = self._simple_command(
'LOGIN', user, self._quote(password))
435 raise self.error(dat[-1])
441 """Shutdown connection to server.
443 (typ, [data]) = <instance>.logout()
445 Returns server 'BYE' response.
447 self.state =
'LOGOUT'
448 try: typ, dat = self._simple_command(
'LOGOUT')
449 except: typ, dat =
'NO', [
'%s: %s' % sys.exc_info()[:2]]
451 if self.untagged_responses.has_key(
'BYE'):
452 return 'BYE', self.untagged_responses[
'BYE']
456 def lsub(self, directory='""', pattern='*'):
457 """List 'subscribed' mailbox names in directory matching pattern.
459 (typ, [data, ...]) = <instance>.lsub(directory='""', pattern='*')
461 'data' are tuples of message part envelope and data.
464 typ, dat = self._simple_command(name, directory, pattern)
465 return self._untagged_response(typ, dat, name)
469 """ Returns IMAP namespaces ala rfc2342
471 (typ, [data, ...]) = <instance>.namespace()
474 typ, dat = self._simple_command(name)
475 return self._untagged_response(typ, dat, name)
479 """Send NOOP command.
481 (typ, data) = <instance>.noop()
485 _dump_ur(self.untagged_responses)
486 return self._simple_command(
'NOOP')
489 def partial(self, message_num, message_part, start, length):
490 """Fetch truncated part of a message.
492 (typ, [data, ...]) = <instance>.partial(message_num, message_part, start, length)
494 'data' is tuple of message part envelope and data.
497 typ, dat = self._simple_command(name, message_num, message_part, start, length)
498 return self._untagged_response(typ, dat,
'FETCH')
501 def rename(self, oldmailbox, newmailbox):
502 """Rename old mailbox name to new.
504 (typ, data) = <instance>.rename(oldmailbox, newmailbox)
506 return self._simple_command(
'RENAME', oldmailbox, newmailbox)
509 def search(self, charset, *criteria):
510 """Search mailbox for matching messages.
512 (typ, [data]) = <instance>.search(charset, criterium, ...)
514 'data' is space separated list of matching message numbers.
518 typ, dat = apply(self._simple_command, (name,
'CHARSET', charset) + criteria)
520 typ, dat = apply(self._simple_command, (name,) + criteria)
521 return self._untagged_response(typ, dat, name)
524 def select(self, mailbox='INBOX', readonly=None):
527 Flush all untagged responses.
529 (typ, [data]) = <instance>.select(mailbox='INBOX', readonly=None)
531 'data' is count of messages in mailbox ('EXISTS' response).
534 self.untagged_responses = {}
535 self.is_readonly = readonly
540 typ, dat = self._simple_command(name, mailbox)
544 self.state =
'SELECTED'
545 if self.untagged_responses.has_key(
'READ-ONLY') \
549 _dump_ur(self.untagged_responses)
550 raise self.readonly(
'%s is not writable' % mailbox)
551 return typ, self.untagged_responses.get(
'EXISTS', [
None])
554 def setacl(self, mailbox, who, what):
555 """Set a mailbox acl.
557 (typ, [data]) = <instance>.create(mailbox, who, what)
559 return self._simple_command(
'SETACL', mailbox, who, what)
562 def sort(self, sort_criteria, charset, *search_criteria):
563 """IMAP4rev1 extension SORT command.
565 (typ, [data]) = <instance>.sort(sort_criteria, charset, search_criteria, ...)
570 if (sort_criteria[0],sort_criteria[-1]) != (
'(',
')'):
571 sort_criteria =
'(%s)' % sort_criteria
572 typ, dat = apply(self._simple_command, (name, sort_criteria, charset) + search_criteria)
573 return self._untagged_response(typ, dat, name)
576 def status(self, mailbox, names):
577 """Request named status conditions for mailbox.
579 (typ, [data]) = <instance>.status(mailbox, names)
584 typ, dat = self._simple_command(name, mailbox, names)
585 return self._untagged_response(typ, dat, name)
588 def store(self, message_set, command, flags):
589 """Alters flag dispositions for messages in mailbox.
591 (typ, [data]) = <instance>.store(message_set, command, flags)
593 if (flags[0],flags[-1]) != (
'(',
')'):
594 flags =
'(%s)' % flags
595 typ, dat = self._simple_command(
'STORE', message_set, command, flags)
596 return self._untagged_response(typ, dat,
'FETCH')
599 def subscribe(self, mailbox):
600 """Subscribe to new mailbox.
602 (typ, [data]) = <instance>.subscribe(mailbox)
604 return self._simple_command(
'SUBSCRIBE', mailbox)
607 def uid(self, command, *args):
608 """Execute "command arg ..." with messages identified by UID,
609 rather than message number.
611 (typ, [data]) = <instance>.uid(command, arg1, arg2, ...)
613 Returns response appropriate to 'command'.
615 command = command.upper()
616 if not Commands.has_key(command):
617 raise self.error(
"Unknown IMAP4 UID command: %s" % command)
618 if self.state
not in Commands[command]:
619 raise self.error(
'command %s illegal in state %s'
620 % (command, self.state))
622 typ, dat = apply(self._simple_command, (name, command) + args)
623 if command
in (
'SEARCH',
'SORT'):
627 return self._untagged_response(typ, dat, name)
630 def unsubscribe(self, mailbox):
631 """Unsubscribe from old mailbox.
633 (typ, [data]) = <instance>.unsubscribe(mailbox)
635 return self._simple_command(
'UNSUBSCRIBE', mailbox)
638 def xatom(self, name, *args):
639 """Allow simple extension commands
640 notified by server in CAPABILITY response.
642 Assumes command is legal in current state.
644 (typ, [data]) = <instance>.xatom(name, arg, ...)
646 Returns response appropriate to extension command `name'.
651 if not Commands.has_key(name):
652 Commands[name] = (self.state,)
653 return apply(self._simple_command, (name,) + args)
660 def _append_untagged(self, typ, dat):
662 if dat
is None: dat =
''
663 ur = self.untagged_responses
666 _mesg(
'untagged_responses[%s] %s += ["%s"]' %
667 (typ, len(ur.get(typ,
'')), dat))
674 def _check_bye(self):
675 bye = self.untagged_responses.get(
'BYE')
677 raise self.abort(bye[-1])
680 def _command(self, name, *args):
682 if self.state
not in Commands[name]:
685 'command %s illegal in state %s' % (name, self.state))
687 for typ
in (
'OK',
'NO',
'BAD'):
688 if self.untagged_responses.has_key(typ):
689 del self.untagged_responses[typ]
691 if self.untagged_responses.has_key(
'READ-ONLY') \
692 and not self.is_readonly:
693 raise self.readonly(
'mailbox status changed to READ-ONLY')
695 tag = self._new_tag()
696 data =
'%s %s' % (tag, name)
698 if arg
is None:
continue
699 data =
'%s %s' % (data, self._checkquote(arg))
701 literal = self.literal
702 if literal
is not None:
704 if type(literal)
is type(self._command):
708 data =
'%s {%s}' % (data, len(literal))
717 self.send(
'%s%s' % (data, CRLF))
718 except (socket.error, OSError), val:
719 raise self.abort(
'socket error: %s' % val)
727 while self._get_response():
728 if self.tagged_commands[tag]:
734 literal = literator(self.continuation_response)
738 _mesg(
'write literal size %s' % len(literal))
743 except (socket.error, OSError), val:
744 raise self.abort(
'socket error: %s' % val)
752 def _command_complete(self, name, tag):
755 typ, data = self._get_tagged_response(tag)
756 except self.abort, val:
757 raise self.abort(
'command: %s => %s' % (name, val))
758 except self.error, val:
759 raise self.error(
'command: %s => %s' % (name, val))
762 raise self.error(
'%s command error: %s %s' % (name, typ, data))
766 def _get_response(self):
773 resp = self._get_line()
777 if self._match(self.tagre, resp):
778 tag = self.mo.group(
'tag')
779 if not self.tagged_commands.has_key(tag):
780 raise self.abort(
'unexpected tagged response: %s' % resp)
782 typ = self.mo.group(
'type')
783 dat = self.mo.group(
'data')
784 self.tagged_commands[tag] = (typ, [dat])
790 if not self._match(Untagged_response, resp):
791 if self._match(Untagged_status, resp):
792 dat2 = self.mo.group(
'data2')
797 if self._match(Continuation, resp):
798 self.continuation_response = self.mo.group(
'data')
801 raise self.abort(
"unexpected response: '%s'" % resp)
803 typ = self.mo.group(
'type')
804 dat = self.mo.group(
'data')
805 if dat
is None: dat =
''
806 if dat2: dat = dat +
' ' + dat2
810 while self._match(Literal, dat):
814 size = int(self.mo.group(
'size'))
817 _mesg(
'read literal size %s' % size)
818 data = self.read(size)
822 self._append_untagged(typ, (dat, data))
826 dat = self._get_line()
828 self._append_untagged(typ, dat)
832 if typ
in (
'OK',
'NO',
'BAD')
and self._match(Response_code, dat):
833 self._append_untagged(self.mo.group(
'type'), self.mo.group(
'data'))
836 if self.debug >= 1
and typ
in (
'NO',
'BAD',
'BYE'):
837 _mesg(
'%s response: %s' % (typ, dat))
842 def _get_tagged_response(self, tag):
845 result = self.tagged_commands[tag]
846 if result
is not None:
847 del self.tagged_commands[tag]
857 except self.abort, val:
866 line = self.readline()
868 raise self.abort(
'socket error: EOF')
881 def _match(self, cre, s):
886 self.mo = cre.match(s)
888 if self.mo
is not None and self.debug >= 5:
889 _mesg(
"\tmatched r'%s' => %s" % (cre.pattern, `self.mo.groups()`))
890 return self.mo
is not None
895 tag =
'%s%s' % (self.tagpre, self.tagnum)
896 self.tagnum = self.tagnum + 1
897 self.tagged_commands[tag] =
None
901 def _checkquote(self, arg):
906 if type(arg)
is not type(
''):
908 if (arg[0],arg[-1])
in ((
'(',
')'),(
'"',
'"')):
910 if self.mustquote.search(arg)
is None:
912 return self._quote(arg)
915 def _quote(self, arg):
917 arg = arg.replace(
'\\',
'\\\\')
918 arg = arg.replace(
'"',
'\\"')
923 def _simple_command(self, name, *args):
925 return self._command_complete(name, apply(self._command, (name,) + args))
928 def _untagged_response(self, typ, dat, name):
932 if not self.untagged_responses.has_key(name):
934 data = self.untagged_responses[name]
937 _mesg(
'untagged_responses[%s] => %s' % (name, data))
938 del self.untagged_responses[name]
943 class _Authenticator:
945 """Private class to provide en/decoding
946 for base64-based authentication conversation.
952 def process(self, data):
953 ret = self.mech(self.decode(data))
956 return self.encode(ret)
975 e = binascii.b2a_base64(t)
983 return binascii.a2b_base64(inp)
987 Mon2num = {
'Jan': 1,
'Feb': 2,
'Mar': 3,
'Apr': 4,
'May': 5,
'Jun': 6,
988 'Jul': 7,
'Aug': 8,
'Sep': 9,
'Oct': 10,
'Nov': 11,
'Dec': 12}
991 """Convert IMAP4 INTERNALDATE to UT.
993 Returns Python time module tuple.
996 mo = InternalDate.match(resp)
1000 mon = Mon2num[mo.group(
'mon')]
1001 zonen = mo.group(
'zonen')
1003 day = int(mo.group(
'day'))
1004 year = int(mo.group(
'year'))
1005 hour = int(mo.group(
'hour'))
1006 min = int(mo.group(
'min'))
1007 sec = int(mo.group(
'sec'))
1008 zoneh = int(mo.group(
'zoneh'))
1009 zonem = int(mo.group(
'zonem'))
1013 zone = (zoneh*60 + zonem)*60
1017 tt = (year, mon, day, hour, min, sec, -1, -1, -1)
1019 utc = time.mktime(tt)
1024 lt = time.localtime(utc)
1025 if time.daylight
and lt[-1]:
1026 zone = zone + time.altzone
1028 zone = zone + time.timezone
1030 return time.localtime(utc - zone)
1036 """Convert integer to A-P string representation."""
1038 val =
''; AP =
'ABCDEFGHIJKLMNOP'
1041 num, mod = divmod(num, 16)
1049 """Convert IMAP4 flags response to python tuple."""
1051 mo = Flags.match(resp)
1060 """Convert 'date_time' to IMAP4 INTERNALDATE representation.
1062 Return string in form: '"DD-Mmm-YYYY HH:MM:SS +HHMM"'
1065 if isinstance(date_time, (int, float)):
1066 tt = time.localtime(date_time)
1067 elif isinstance(date_time, (tuple, time.struct_time)):
1069 elif isinstance(date_time, str):
1072 raise ValueError(
"date_time not of a known type")
1074 dt = time.strftime(
"%d-%b-%Y %H:%M:%S", tt)
1077 if time.daylight
and tt[-1]:
1078 zone = -time.altzone
1080 zone = -time.timezone
1081 return '"' + dt +
" %+03d%02d" % divmod(zone/60, 60) +
'"'
1087 def _mesg(s, secs=None):
1090 tm = time.strftime(
'%M:%S', time.localtime(secs))
1091 sys.stderr.write(
' %s.%02d %s\n' % (tm, (secs*100)%100, s))
1099 l = map(
lambda x:
'%s: "%s"' % (x[0], x[1][0]
and '" "'.
join(x[1])
or ''), l)
1100 _mesg(
'untagged responses dump:%s%s' % (t, t.join(l)))
1107 if len(_cmd_log) == _cmd_log_len:
1109 _cmd_log.append((time.time(), line))
1112 _mesg(
'last %d IMAP4 interactions:' % len(_cmd_log))
1113 for secs,line
in _cmd_log:
1118 if __name__ ==
'__main__':
1120 import getopt, getpass
1124 except getopt.error, val:
1127 for opt,val
in optlist:
1131 if not args: args = (
'',)
1138 test_mesg =
'From: %(user)s@localhost%(lf)sSubject: IMAP4 test%(lf)s%(lf)sdata...%(lf)s' % {
'user':USER,
'lf':CRLF}
1140 (
'login', (USER, PASSWD)),
1141 (
'create', (
'/tmp/xxx 1',)),
1142 (
'rename', (
'/tmp/xxx 1',
'/tmp/yyy')),
1143 (
'CREATE', (
'/tmp/yyz 2',)),
1144 (
'append', (
'/tmp/yyz 2',
None,
None, test_mesg)),
1145 (
'list', (
'/tmp',
'yy*')),
1146 (
'select', (
'/tmp/yyz 2',)),
1147 (
'search', (
None,
'SUBJECT',
'test')),
1148 (
'partial', (
'1',
'RFC822', 1, 1024)),
1149 (
'store', (
'1',
'FLAGS',
'(\Deleted)')),
1158 (
'response',(
'UIDVALIDITY',)),
1159 (
'uid', (
'SEARCH',
'ALL')),
1160 (
'response', (
'EXISTS',)),
1161 (
'append', (
None,
None,
None, test_mesg)),
1167 _mesg(
'%s %s' % (cmd, args))
1168 typ, dat = apply(getattr(M, cmd), args)
1169 _mesg(
'%s => %s %s' % (cmd, typ, dat))
1174 _mesg(
'PROTOCOL_VERSION = %s' % M.PROTOCOL_VERSION)
1175 _mesg(
'CAPABILITIES = %s' % `M.capabilities`)
1177 for cmd,args
in test_seq1:
1180 for ml
in run(
'list', (
'/tmp/',
'yy%')):
1181 mo = re.match(
r'.*"([^"]+)"$', ml)
1182 if mo: path = mo.group(1)
1183 else: path = ml.split()[-1]
1184 run(
'delete', (path,))
1186 for cmd,args
in test_seq2:
1187 dat =
run(cmd, args)
1189 if (cmd,args) != (
'uid', (
'SEARCH',
'ALL')):
1192 uid = dat[-1].
split()
1193 if not uid:
continue
1194 run(
'uid', (
'FETCH',
'%s' % uid[-1],
1195 '(FLAGS INTERNALDATE RFC822.SIZE RFC822.HEADER RFC822.TEXT)'))
1197 print '\nAll tests OK.'
1200 print '\nTests failed.'
1204 If you would like to see debugging output,