Source code for urlfetch

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
urlfetch
~~~~~~~~~~

An easy to use HTTP client based on httplib.

:copyright: (c) 2011-2014 by Yue Du.
:license: BSD 2-clause License, see LICENSE for more details.
"""

__version__ = '1.0.1'
__author__ = 'Yue Du <ifduyue@gmail.com>'
__url__ = 'https://github.com/ifduyue/urlfetch'
__license__ = 'BSD 2-Clause License'

import os, sys, base64, codecs, uuid, stat, time, socket
from functools import partial
from io import BytesIO
try:
    import simplejson as json
except ImportError:
    import json

py3k = sys.version_info >= (3, 0)

if py3k:
    from http.client import HTTPConnection, HTTPSConnection
    from urllib.parse import urlencode
    import urllib.parse as urlparse
    import http.cookies as Cookie
    basestring = (str, bytes)
    unicode = str
    b = lambda s: s.encode('latin-1')
    u = lambda s: s
else:
    from httplib import HTTPConnection, HTTPSConnection
    from urllib import urlencode
    import urlparse
    import Cookie
    basestring = basestring
    unicode = unicode
    b = lambda s: s
    u = lambda s: unicode(s, 'unicode_escape')


__all__ = ('request', 'fetch', 'Session',
           'get', 'head', 'put', 'post', 'delete',
           'options', 'trace', 'patch',
           'UrlfetchException', 'ContentLimitExceeded', 'URLError',
           'ContentDecodingError', 'TooManyRedirects', 'Timeout')


class UrlfetchException(IOError):
    """Base exception. All exceptions and errors defined in this module
    subclass from it."""


class ContentLimitExceeded(UrlfetchException):
    """Content length is beyond the limit."""


class URLError(UrlfetchException, ValueError):
    """Error parsing or handling the URL."""


class ContentDecodingError(UrlfetchException):
    """Failed to decode the content."""


class TooManyRedirects(UrlfetchException):
    """Too many redirects."""


class Timeout(UrlfetchException):
    """Request timed out."""

class cached_property(object):
    """Cached property.

    A property that is only computed once per instance and then replaces
    itself with an ordinary attribute. Deleting the attribute resets the
    property.
    """

    def __init__(self, fget=None, fset=None, fdel=None, doc=None):
        self.__get = fget
        self.__set = fset
        self.__del = fdel
        self.__doc__ = doc or fget.__doc__
        self.__name__ = fget.__name__
        self.__module__ = fget.__module__

    def __get__(self, instance, owner):
        if instance is None:
            # attribute is accessed through the owner class
            return self
        try:
            return instance.__dict__[self.__name__]
        except KeyError:
            value = instance.__dict__[self.__name__] = self.__get(instance)
            return value

    def __set__(self, instance, value):
        if instance is None:
            return self
        if self.__set is not None:
            value = self.__set(instance, value)
        instance.__dict__[self.__name__] = value

    def __delete__(self, instance):
        if instance is None:
            return self
        try:
            value = instance.__dict__.pop(self.__name__)
        except KeyError:
            pass
        else:
            if self.__del is not None:
                self.__del(instance, value)

    def setter(self, fset):
        return self.__class__(self.__get, fset, self.__del)

    def deleter(self, fdel):
        return self.__class__(self.__get, self.__set, fdel)
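
# A minimal sketch of how cached_property behaves (editor's example, not
# part of the original module). The class and attribute names here are
# hypothetical:
#
#     >>> class Page(object):
#     ...     @cached_property
#     ...     def body(self):
#     ...         print('computing...')
#     ...         return 'content'
#     >>> p = Page()
#     >>> p.body           # first access computes and caches the value
#     computing...
#     'content'
#     >>> p.body           # later accesses hit the cached attribute
#     'content'
#     >>> del p.body       # deleting resets the property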

##############################################################################
# Core Methods and Classes ###################################################
##############################################################################


class Response(object):
    """A Response object.

    >>> import urlfetch
    >>> response = urlfetch.get("http://docs.python.org/")
    >>> response.total_time
    0.033042049407959
    >>> response.status, response.reason, response.version
    (200, 'OK', 10)
    >>> type(response.body), len(response.body)
    (<type 'str'>, 8719)
    >>> type(response.text), len(response.text)
    (<type 'unicode'>, 8719)
    >>> response.getheader('server')
    'Apache/2.2.16 (Debian)'
    >>> response.getheaders()
    [
        ('content-length', '8719'),
        ('x-cache', 'MISS from localhost'),
        ('accept-ranges', 'bytes'),
        ('vary', 'Accept-Encoding'),
        ('server', 'Apache/2.2.16 (Debian)'),
        ('last-modified', 'Tue, 26 Jun 2012 19:23:18 GMT'),
        ('connection', 'close'),
        ('etag', '"13cc5e4-220f-4c36507ded580"'),
        ('date', 'Wed, 27 Jun 2012 06:50:30 GMT'),
        ('content-type', 'text/html'),
        ('x-cache-lookup', 'MISS from localhost:8080')
    ]
    >>> response.headers
    {
        'content-length': '8719',
        'x-cache': 'MISS from localhost',
        'accept-ranges': 'bytes',
        'vary': 'Accept-Encoding',
        'server': 'Apache/2.2.16 (Debian)',
        'last-modified': 'Tue, 26 Jun 2012 19:23:18 GMT',
        'connection': 'close',
        'etag': '"13cc5e4-220f-4c36507ded580"',
        'date': 'Wed, 27 Jun 2012 06:50:30 GMT',
        'content-type': 'text/html',
        'x-cache-lookup': 'MISS from localhost:8080'
    }

    :raises: :class:`ContentLimitExceeded`
    """

    def __init__(self, r, **kwargs):
        for k in kwargs:
            setattr(self, k, kwargs[k])
        self._r = r  # httplib.HTTPResponse
        self.msg = r.msg

        #: Status code returned by server.
        self.status = r.status
        # compatible with requests
        #: An alias of :attr:`status`.
        self.status_code = r.status
        #: Reason phrase returned by server.
        self.reason = r.reason
        #: HTTP protocol version used by server.
        #: 10 for HTTP/1.0, 11 for HTTP/1.1.
        self.version = r.version
        #: Total time of the request, in seconds.
        self.total_time = kwargs.pop('total_time', None)

        self.getheader = r.getheader
        self.getheaders = r.getheaders
        self._content_encoding = self.getheader('content-encoding', None)
        self._decoder = None

        try:
            self.length_limit = int(kwargs.get('length_limit'))
        except (TypeError, ValueError):
            self.length_limit = None

        # if the reported content length exceeds length_limit, skip the
        # content and bail out early
        content_length = int(self.getheader('Content-Length', 0))
        if self.length_limit and content_length > self.length_limit:
            self.close()
            raise ContentLimitExceeded("Content length is more than %d "
                                       "bytes" % self.length_limit)

    def read(self, chunk_size=8192):
        """Read content (for streaming and large files).

        :arg int chunk_size: size of chunk, default is 8192.
        """
        return self._r.read(chunk_size)

    def __iter__(self):
        return self

    def __next__(self):
        chunk = self.read()
        if not chunk:
            if self._decoder:
                chunk = self._decoder.flush()
                self._decoder = None
                return chunk
            raise StopIteration
        else:
            ce = self._content_encoding
            if ce in ('gzip', 'deflate'):
                import zlib
                if not self._decoder:
                    if ce == 'gzip':
                        # 16 + MAX_WBITS tells zlib to expect a gzip
                        # header and trailer
                        self._decoder = zlib.decompressobj(16 + zlib.MAX_WBITS)
                    else:
                        self._decoder = zlib.decompressobj()
                try:
                    return self._decoder.decompress(chunk)
                except zlib.error:
                    # some servers send raw deflate streams without the
                    # zlib header, retry with a raw inflate
                    self._decoder = zlib.decompressobj(-zlib.MAX_WBITS)
                    try:
                        return self._decoder.decompress(chunk)
                    except (IOError, zlib.error) as e:
                        raise ContentDecodingError(e)
            if ce:
                raise ContentDecodingError('Unknown encoding: %s' % ce)
            return chunk

    # Python 2 iterator protocol
    next = __next__

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()
        return False

    @classmethod
    def from_httplib(cls, connection, **kwargs):
        """Make a :class:`~urlfetch.Response` object from an httplib
        response object."""
        return cls(connection, **kwargs)

    @cached_property
    def body(self):
        """Response body.

        :raises: :class:`ContentLimitExceeded`, :class:`ContentDecodingError`
        """
        content = b("")
        for chunk in self:
            content += chunk
            if self.length_limit and len(content) > self.length_limit:
                raise ContentLimitExceeded("Content length is more than %d "
                                           "bytes" % self.length_limit)
        return content

    # compatible with requests
    #: An alias of :attr:`body`.
    @property
    def content(self):
        return self.body

    @cached_property
    def text(self):
        """Response body in unicode."""
        return mb_code(self.content)

    @cached_property
    def json(self):
        """Load response body as json.

        :raises: :class:`ContentDecodingError`
        """
        try:
            return json.loads(self.text)
        except Exception as e:
            raise ContentDecodingError(e)

    @cached_property
    def headers(self):
        """Response headers.

        Response headers is a dict with all keys in lower case.

        >>> import urlfetch
        >>> response = urlfetch.get("http://docs.python.org/")
        >>> response.headers
        {
            'content-length': '8719',
            'x-cache': 'MISS from localhost',
            'accept-ranges': 'bytes',
            'vary': 'Accept-Encoding',
            'server': 'Apache/2.2.16 (Debian)',
            'last-modified': 'Tue, 26 Jun 2012 19:23:18 GMT',
            'connection': 'close',
            'etag': '"13cc5e4-220f-4c36507ded580"',
            'date': 'Wed, 27 Jun 2012 06:50:30 GMT',
            'content-type': 'text/html',
            'x-cache-lookup': 'MISS from localhost:8080'
        }
        """
        if py3k:
            return dict((k.lower(), v) for k, v in self.getheaders())
        else:
            return dict(self.getheaders())

    @cached_property
    def cookies(self):
        """Cookies in dict."""
        c = Cookie.SimpleCookie(self.getheader('set-cookie'))
        sc = [(i.key, i.value) for i in c.values()]
        return dict(sc)

    @cached_property
    def cookiestring(self):
        """Cookie string."""
        cookies = self.cookies
        return '; '.join(['%s=%s' % (k, v) for k, v in cookies.items()])

    def close(self):
        """Close the connection."""
        self._r.close()

    def __del__(self):
        self.close()
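
# Example (editor's sketch): streaming a large response body in chunks by
# iterating the Response object, instead of touching response.body, which
# would buffer everything in memory. The URL and file name are
# hypothetical.
#
#     >>> import urlfetch
#     >>> with urlfetch.get('http://example.com/big.iso') as response:
#     ...     with open('big.iso', 'wb') as f:
#     ...         for chunk in response:       # gzip/deflate decoded
#     ...             f.write(chunk)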

class Session(object):
    """A session object.

    A :class:`urlfetch.Session` object can hold common headers and
    cookies. Every request issued by a :class:`urlfetch.Session` object
    will carry these headers and cookies.

    :class:`urlfetch.Session` plays a role in handling cookies, just like
    a cookiejar.

    :arg dict headers: Init headers.
    :arg dict cookies: Init cookies.
    :arg tuple auth: (username, password) for basic authentication.
    """

    def __init__(self, headers={}, cookies={}, auth=None):
        """Init a :class:`~urlfetch.Session` object."""
        #: headers
        self.headers = headers.copy()
        #: cookies
        self.cookies = cookies.copy()

        if auth and isinstance(auth, (list, tuple)):
            auth = '%s:%s' % tuple(auth)
            auth = base64.b64encode(auth.encode('utf-8'))
            self.headers['Authorization'] = 'Basic ' + auth.decode('utf-8')

    def putheader(self, header, value):
        """Add a header to default headers."""
        self.headers[header] = value

    def popheader(self, header):
        """Remove a header from default headers."""
        return self.headers.pop(header)

    def putcookie(self, key, value=""):
        """Add a cookie to default cookies."""
        self.cookies[key] = value

    def popcookie(self, key):
        """Remove a cookie from default cookies."""
        return self.cookies.pop(key)

    @property
    def cookiestring(self):
        """Cookie string.

        It's assignable, and assigning to it will change
        :attr:`~.Session.cookies` correspondingly.

        >>> s = Session()
        >>> s.cookiestring = 'foo=bar; 1=2'
        >>> s.cookies
        {'1': '2', 'foo': 'bar'}
        """
        return '; '.join(['%s=%s' % (k, v) for k, v in self.cookies.items()])

    @cookiestring.setter
    def cookiestring(self, value):
        """Cookie string setter."""
        c = Cookie.SimpleCookie(value)
        sc = [(i.key, i.value) for i in c.values()]
        self.cookies = dict(sc)

    def snapshot(self):
        """Return a snapshot of this session: copies of its current
        headers and cookies."""
        session = {
            'headers': self.headers.copy(),
            'cookies': self.cookies.copy(),
        }
        return session

    def request(self, *args, **kwargs):
        """Issue a request."""
        headers = self.headers.copy()
        if self.cookiestring:
            headers['Cookie'] = self.cookiestring
        headers.update(kwargs.get('headers', {}))
        kwargs['headers'] = headers

        r = request(*args, **kwargs)
        self.cookies.update(r.cookies)
        return r

    def fetch(self, *args, **kwargs):
        """Fetch a URL."""
        data = kwargs.get('data', None)
        files = kwargs.get('files', {})
        if data and isinstance(data, (basestring, dict)) or files:
            return self.post(*args, **kwargs)
        return self.get(*args, **kwargs)

    def get(self, *args, **kwargs):
        """Issue a get request."""
        kwargs['method'] = 'GET'
        return self.request(*args, **kwargs)

    def post(self, *args, **kwargs):
        """Issue a post request."""
        kwargs['method'] = 'POST'
        return self.request(*args, **kwargs)

    def put(self, *args, **kwargs):
        """Issue a put request."""
        kwargs['method'] = 'PUT'
        return self.request(*args, **kwargs)

    def delete(self, *args, **kwargs):
        """Issue a delete request."""
        kwargs['method'] = 'DELETE'
        return self.request(*args, **kwargs)

    def head(self, *args, **kwargs):
        """Issue a head request."""
        kwargs['method'] = 'HEAD'
        return self.request(*args, **kwargs)

    def options(self, *args, **kwargs):
        """Issue an options request."""
        kwargs['method'] = 'OPTIONS'
        return self.request(*args, **kwargs)

    def trace(self, *args, **kwargs):
        """Issue a trace request."""
        kwargs['method'] = 'TRACE'
        return self.request(*args, **kwargs)

    def patch(self, *args, **kwargs):
        """Issue a patch request."""
        kwargs['method'] = 'PATCH'
        return self.request(*args, **kwargs)
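
# Example (editor's sketch): a Session carries its headers and cookies
# across requests and updates its cookie jar from every response. The URL,
# header and form values are hypothetical.
#
#     >>> s = Session(headers={'User-Agent': 'mybot/0.1'})
#     >>> r = s.post('http://example.com/login',
#     ...            data={'user': 'me', 'password': 'secret'})
#     >>> # cookies set by the server are now in s.cookies and will be
#     >>> # sent along with every subsequent request:
#     >>> r = s.get('http://example.com/private')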

def fetch(*args, **kwargs):
    """Fetch a URL.

    :func:`~urlfetch.fetch` is a wrapper of :func:`~urlfetch.request`.
    It calls :func:`~urlfetch.get` by default. If one of the parameters
    ``data`` or ``files`` is supplied, :func:`~urlfetch.post` is called.
    """
    data = kwargs.get('data', None)
    files = kwargs.get('files', {})
    if data and isinstance(data, (basestring, dict)) or files:
        return post(*args, **kwargs)
    return get(*args, **kwargs)
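
# Example (editor's sketch): fetch() picks the HTTP method from its
# arguments; with data or files it POSTs, otherwise it GETs. The URL is
# hypothetical.
#
#     >>> r = fetch('http://example.com/')                    # GET
#     >>> r = fetch('http://example.com/', data={'q': '1'})   # POST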
[docs]def request(url, method="GET", params=None, data=None, headers={}, timeout=None, files={}, randua=False, auth=None, length_limit=None, proxies=None, trust_env=True, max_redirects=0, **kwargs): """request an URL :arg string url: URL to be fetched. :arg string method: (optional) HTTP method, one of ``GET``, ``DELETE``, ``HEAD``, ``OPTIONS``, ``PUT``, ``POST``, ``TRACE``, ``PATCH``. ``GET`` is the default. :arg dict/string params: (optional) Dict or string to attach to url as querystring. :arg dict headers: (optional) HTTP request headers. :arg float timeout: (optional) Timeout in seconds :arg files: (optional) Files to be sended :arg randua: (optional) If ``True`` or ``path string``, use a random user-agent in headers, instead of ``'urlfetch/' + __version__`` :arg tuple auth: (optional) (username, password) for basic authentication :arg int length_limit: (optional) If ``None``, no limits on content length, if the limit reached raised exception 'Content length is more than ...' :arg dict proxies: (optional) HTTP proxy, like {'http': '127.0.0.1:8888', 'https': '127.0.0.1:563'} :arg bool trust_env: (optional) If ``True``, urlfetch will get infomations from env, such as HTTP_PROXY, HTTPS_PROXY :arg int max_redirects: (integer, optional) Max redirects allowed within a request. Default is 0, which means redirects are not allowed. :returns: A :class:`~urlfetch.Response` object :raises: :class:`URLError`, :class:`UrlfetchException`, :class:`TooManyRedirects`, """ def make_connection(conn_type, host, port, timeout): """Return HTTP or HTTPS connection.""" if conn_type == 'http': conn = HTTPConnection(host, port, timeout=timeout) elif conn_type == 'https': conn = HTTPSConnection(host, port, timeout=timeout) else: raise URLError('Unknown Connection Type: %s' % conn_type) return conn via_proxy = False method = method.upper() if method not in ALLOWED_METHODS: raise UrlfetchException("Method should be one of " + ", ".join(ALLOWED_METHODS)) if params: if isinstance(params, dict): url = url_concat(url, params) elif isinstance(params, basestring): if url[-1] not in ('?', '&'): url += '&' if ('?' in url) else '?' 
url += params parsed_url = parse_url(url) reqheaders = { 'Accept': '*/*', 'Accept-Encoding': 'gzip, deflate, compress, identity, *', 'User-Agent': random_useragent(randua), 'Host': parsed_url['http_host'] } # Proxy support scheme = parsed_url['scheme'] if proxies is None and trust_env: proxies = PROXIES proxy = proxies.get(scheme) if proxy and parsed_url['host'] not in PROXY_IGNORE_HOSTS: via_proxy = True if '://' not in proxy: proxy = '%s://%s' % (scheme, proxy) parsed_proxy = parse_url(proxy) # Proxy-Authorization if parsed_proxy['username'] and parsed_proxy['password']: proxyauth = '%s:%s' % (parsed_proxy['username'], parsed_proxy['password']) proxyauth = base64.b64encode(proxyauth.encode('utf-8')) reqheaders['Proxy-Authorization'] = 'Basic ' + \ proxyauth.decode('utf-8') conn = make_connection(scheme, parsed_proxy['host'], parsed_proxy['port'], timeout) else: conn = make_connection(scheme, parsed_url['host'], parsed_url['port'], timeout) if not auth and parsed_url['username'] and parsed_url['password']: auth = (parsed_url['username'], parsed_url['password']) if auth: if isinstance(auth, (list, tuple)): auth = '%s:%s' % tuple(auth) auth = base64.b64encode(auth.encode('utf-8')) reqheaders['Authorization'] = 'Basic ' + auth.decode('utf-8') if files: content_type, data = encode_multipart(data, files) reqheaders['Content-Type'] = content_type elif isinstance(data, dict): data = urlencode(data, 1) if isinstance(data, basestring) and not files: # httplib will set 'Content-Length', also you can set it by yourself reqheaders["Content-Type"] = "application/x-www-form-urlencoded" # what if the method is GET, HEAD or DELETE # just do not make so much decisions for users reqheaders.update(headers) start_time = time.time() try: request_url = url if via_proxy else parsed_url['uri'] conn.request(method, request_url, data, reqheaders) resp = conn.getresponse() except socket.timeout as e: raise Timeout(e) except Exception as e: raise UrlfetchException(e) end_time = time.time() total_time = end_time - start_time history = [] response = Response.from_httplib(resp, reqheaders=reqheaders, length_limit=length_limit, history=history[:], url=url, total_time=total_time, start_time=start_time) while (response.status in (301, 302, 303, 307) and 'location' in response.headers and max_redirects): response.body, response.close(), history.append(response) if len(history) > max_redirects: raise TooManyRedirects('max_redirects exceeded') method = method if response.status == 307 else 'GET' location = response.headers['location'] if location[:2] == '//': url = parsed_url['scheme'] + ':' + location else: url = urlparse.urljoin(url, location) parsed_url = parse_url(url) reqheaders['Host'] = parsed_url['http_host'] reqheaders['Referer'] = response.url # Proxy scheme = parsed_url['scheme'] proxy = proxies.get(scheme) if proxy and parsed_url['host'] not in PROXY_IGNORE_HOSTS: via_proxy = True if '://' not in proxy: proxy = '%s://%s' % (parsed_url['scheme'], proxy) parsed_proxy = parse_url(proxy) # Proxy-Authorization if parsed_proxy['username'] and parsed_proxy['password']: proxyauth = '%s:%s' % (parsed_proxy['username'], parsed_proxy['username']) proxyauth = base64.b64encode(proxyauth.encode('utf-8')) reqheaders['Proxy-Authorization'] = 'Basic ' + \ proxyauth.decode('utf-8') conn = make_connection(scheme, parsed_proxy['host'], parsed_proxy['port'], timeout) else: via_proxy = False reqheaders.pop('Proxy-Authorization', None) conn = make_connection(scheme, parsed_url['host'], parsed_url['port'], timeout) try: request_url = 
url if via_proxy else parsed_url['uri'] conn.request(method, request_url, data, reqheaders) resp = conn.getresponse() except socket.timeout as e: raise Timeout(e) except Exception as e: raise UrlfetchException(e) response = Response.from_httplib(resp, reqheaders=reqheaders, length_limit=length_limit, history=history[:], url=url, total_time=total_time, start_time=start_time) return response ############################################################################## # Shortcuts and Helpers ####################################################### ##############################################################################

def _partial_method(method):
    func = partial(request, method=method)
    func.__doc__ = 'Issue a %s request' % method.lower()
    func.__name__ = method.lower()
    func.__module__ = request.__module__
    return func

get = _partial_method("GET")
post = _partial_method("POST")
put = _partial_method("PUT")
delete = _partial_method("DELETE")
head = _partial_method("HEAD")
options = _partial_method("OPTIONS")
trace = _partial_method("TRACE")
patch = _partial_method("PATCH")
del _partial_method


class ObjectDict(dict):
    """Makes a dictionary behave like an object."""

    def __getattr__(self, name):
        try:
            return self[name]
        except KeyError:
            raise AttributeError(name)

    def __setattr__(self, name, value):
        self[name] = value
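
# Example (editor's sketch): the generated shortcuts are request() with
# the method pre-filled; all request() keyword arguments still apply.
# URL and file name are hypothetical.
#
#     >>> r = get('http://example.com/', max_redirects=3,
#     ...         length_limit=1 << 20)
#     >>> r = post('http://example.com/upload',
#     ...          files={'field': open('somefile.txt', 'rb')})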

def parse_url(url):
    """Return a dictionary of parsed url.

    Including scheme, netloc, path, params, query, fragment, uri,
    username, password, host, port and http_host.
    """
    try:
        url = unicode(url)
    except UnicodeDecodeError:
        pass

    if py3k:
        make_utf8 = lambda x: x
    else:
        make_utf8 = lambda x: isinstance(x, unicode) and x.encode('utf-8') or x

    if '://' in url:
        scheme, url = url.split('://', 1)
    else:
        scheme = 'http'
    # keep the real scheme aside; prepend 'http://' so urlsplit can parse
    url = 'http://' + url
    parsed = urlparse.urlsplit(url)

    r = ObjectDict()
    r['scheme'] = make_utf8(scheme)
    r['netloc'] = make_utf8(parsed.netloc)
    r['path'] = make_utf8(parsed.path)
    r['query'] = make_utf8(parsed.query)
    r['fragment'] = make_utf8(parsed.fragment)
    r['uri'] = make_utf8(parsed.path)
    if parsed.query:
        r['uri'] += '?' + make_utf8(parsed.query)
    r['username'] = make_utf8(parsed.username)
    r['password'] = make_utf8(parsed.password)
    host = make_utf8(parsed.hostname.encode('idna').decode('utf-8'))
    r['host'] = r['hostname'] = host
    try:
        r['port'] = parsed.port
    except ValueError:
        r['port'] = None
    if r['port']:
        r['http_host'] = '%s:%d' % (r['host'], r['port'])
    else:
        r['http_host'] = r['host']
    return r
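
# Example (editor's sketch) of the dictionary parse_url() returns; the
# URL is hypothetical:
#
#     >>> p = parse_url('https://user:pass@example.com:8443/path?q=1')
#     >>> p['scheme'], p['host'], p['port']
#     ('https', 'example.com', 8443)
#     >>> p['uri'], p['http_host']
#     ('/path?q=1', 'example.com:8443')
#     >>> p['username'], p['password']
#     ('user', 'pass')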

def get_proxies_from_environ():
    """Get proxies from os.environ."""
    proxies = {}
    http_proxy = os.getenv('http_proxy') or os.getenv('HTTP_PROXY')
    https_proxy = os.getenv('https_proxy') or os.getenv('HTTPS_PROXY')
    if http_proxy:
        proxies['http'] = http_proxy
    if https_proxy:
        proxies['https'] = https_proxy
    return proxies

def mb_code(s, coding=None, errors='replace'):
    """Encoding/decoding helper.

    Tries a list of common encodings until one decodes successfully.
    """
    if isinstance(s, unicode):
        return s if coding is None else s.encode(coding, errors=errors)
    for c in ('utf-8', 'gb2312', 'gbk', 'gb18030', 'big5'):
        try:
            s = s.decode(c)
            return s if coding is None else s.encode(coding, errors=errors)
        except UnicodeError:
            pass
    return unicode(s, errors=errors)
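
# Example (editor's sketch; Python 2 reprs, matching the doctests above):
#
#     >>> mb_code(b'\xe4\xbd\xa0\xe5\xa5\xbd')    # utf-8 bytes in
#     u'\u4f60\u597d'
#     >>> mb_code(u'\u4f60\u597d', 'utf-8')       # unicode out as utf-8
#     '\xe4\xbd\xa0\xe5\xa5\xbd'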

def random_useragent(filename=None):
    """Returns a User-Agent string, randomly chosen from a file.

    :arg string filename: (Optional) Path to the file from which a random
        useragent is picked. By default it's ``None``, and a file shipped
        with this module will be used.
    :returns: A User-Agent string.
    """
    import random
    if isinstance(filename, basestring):
        filenames = [filename]
    else:
        filenames = []
        if filename:
            filenames.extend([
                os.path.join(os.path.abspath(os.path.dirname(__file__)),
                             'urlfetch.useragents.list'),
                os.path.join(sys.prefix, 'share', 'urlfetch',
                             'urlfetch.useragents.list'),
            ])

    # pick the first candidate that is a readable regular file
    for filename in filenames:
        try:
            st = os.stat(filename)
            if stat.S_ISREG(st.st_mode) and os.access(filename, os.R_OK):
                break
        except OSError:
            pass
    else:
        return 'urlfetch/%s' % __version__

    with open(filename, 'rb') as f:
        filesize = st.st_size
        pos = 0
        r = random.Random()
        # try getting a valid line for no more than 3 times
        for i in range(3):
            pos += r.randint(0, filesize)
            pos %= filesize
            f.seek(pos)
            # in case we are in the middle of a line
            f.readline()
            line = f.readline()
            if not line:
                if f.tell() == filesize:
                    # end of file, wrap around
                    f.seek(0)
                    line = f.readline()
            line = line.strip()
            if line and line[:1] != b('#'):
                return line
    return 'urlfetch/%s' % __version__
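
# Example (editor's sketch): with no readable useragent file the function
# falls back to the default UA string; request(randua=True) uses the
# lists shipped with the module, if present. The URL is hypothetical.
#
#     >>> random_useragent()
#     'urlfetch/1.0.1'
#     >>> r = request('http://example.com/', randua=True)  # random UA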

def url_concat(url, args, keep_existing=True):
    """Concatenate url and argument dictionary.

    >>> url_concat("http://example.com/foo?a=b", dict(c="d"))
    'http://example.com/foo?a=b&c=d'

    :arg string url: URL being concatenated to.
    :arg dict args: Args being concatenated.
    :arg bool keep_existing: (Optional) Whether to keep the args which are
        already in the url. Default is ``True``.
    """
    if not args:
        return url

    if keep_existing:
        if url[-1] not in ('?', '&'):
            url += '&' if ('?' in url) else '?'
        return url + urlencode(args, 1)
    else:
        url, sep, query = url.partition('?')
        query = urlparse.parse_qs(query, True)
        query.update(args)
        return url + '?' + urlencode(query, 1)
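
# Example (editor's sketch): with keep_existing=False, args replace any
# duplicate keys already present in the query string:
#
#     >>> url_concat('http://example.com/foo?a=b', {'a': 'c'},
#     ...            keep_existing=False)
#     'http://example.com/foo?a=c'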

def choose_boundary():
    """Generate a multipart boundary.

    :returns: A boundary string.
    """
    global BOUNDARY_PREFIX
    if BOUNDARY_PREFIX is None:
        BOUNDARY_PREFIX = "urlfetch"
        try:
            uid = repr(os.getuid())
            BOUNDARY_PREFIX += "." + uid
        except AttributeError:
            pass
        try:
            pid = repr(os.getpid())
            BOUNDARY_PREFIX += "." + pid
        except AttributeError:
            pass
    return "%s.%s" % (BOUNDARY_PREFIX, uuid.uuid4().hex)

def encode_multipart(data, files):
    """Encode multipart.

    :arg dict data: Data to be encoded
    :arg dict files: Files to be encoded
    :returns: Encoded binary string
    :raises: :class:`UrlfetchException`
    """
    body = BytesIO()
    boundary = choose_boundary()
    part_boundary = b('--%s\r\n' % boundary)

    if isinstance(data, dict):
        for name, values in data.items():
            if not isinstance(values, (list, tuple, set)):
                # behave like urllib.urlencode(dict, 1)
                values = (values, )
            for value in values:
                body.write(part_boundary)
                writer(body).write('Content-Disposition: form-data; '
                                   'name="%s"\r\n' % name)
                body.write(b'Content-Type: text/plain\r\n\r\n')
                if isinstance(value, int):
                    value = str(value)
                if py3k and isinstance(value, str):
                    writer(body).write(value)
                else:
                    body.write(value)
                body.write(b'\r\n')

    for fieldname, f in files.items():
        if isinstance(f, tuple):
            filename, f = f
        elif hasattr(f, 'name'):
            filename = os.path.basename(f.name)
        else:
            filename = None
            raise UrlfetchException("file must have a filename")

        if hasattr(f, 'read'):
            value = f.read()
        elif isinstance(f, basestring):
            value = f
        else:
            value = str(f)

        body.write(part_boundary)
        if filename:
            writer(body).write('Content-Disposition: form-data; name="%s"; '
                               'filename="%s"\r\n' % (fieldname, filename))
            body.write(b'Content-Type: application/octet-stream\r\n\r\n')
        else:
            writer(body).write('Content-Disposition: form-data; name="%s"'
                               '\r\n' % fieldname)
            body.write(b'Content-Type: text/plain\r\n\r\n')

        if py3k and isinstance(value, str):
            writer(body).write(value)
        else:
            body.write(value)
        body.write(b'\r\n')

    body.write(b('--' + boundary + '--\r\n'))

    content_type = 'multipart/form-data; boundary=%s' % boundary
    return content_type, body.getvalue()
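
# Example (editor's sketch): building a multipart body by hand. The field
# and file names are hypothetical; request() does this automatically when
# it is given a files argument.
#
#     >>> content_type, body = encode_multipart(
#     ...     {'title': 'hello'}, {'upload': ('a.txt', b'data')})
#     >>> content_type
#     'multipart/form-data; boundary=urlfetch...'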
ALLOWED_METHODS = ("GET", "DELETE", "HEAD", "OPTIONS", "PUT", "POST", "TRACE", "PATCH") PROXY_IGNORE_HOSTS = ('127.0.0.1', 'localhost') PROXIES = get_proxies_from_environ() writer = codecs.lookup('utf-8')[3] BOUNDARY_PREFIX = None