# Source code for transit.writer

## Copyright 2014 Cognitect. All Rights Reserved.
##
## Licensed under the Apache License, Version 2.0 (the "License");
## you may not use this file except in compliance with the License.
## You may obtain a copy of the License at
##
##      http://www.apache.org/licenses/LICENSE-2.0
##
## Unless required by applicable law or agreed to in writing, software
## distributed under the License is distributed on an "AS-IS" BASIS,
## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
## See the License for the specific language governing permissions and
## limitations under the License.

from constants import *
from rolling_cache import RollingCache, is_cacheable
import msgpack
from write_handlers import WriteHandler
from transit_types import TaggedValue
import re

class Writer(object):
    """The top-level object for writing Python objects as Transit data.

    During initialization you must specify the io object used for writing
    and may specify the protocol used for marshalling the data. You may
    optionally pass in an options dictionary that is forwarded to the
    Marshaler. The cache is enabled by default.
    """

    def __init__(self, io, protocol="json", opts=None):
        """Create the marshaler that matches `protocol`.

        io       -- object handed to the marshaler as its output target.
        protocol -- "json" or "json_verbose"; any other value selects the
                    MsgPack marshaler.
        opts     -- options dictionary forwarded to the marshaler. When
                    omitted, {"cache_enabled": True} is used.
        """
        # Build the default per call instead of sharing one dict object
        # between every Writer created without explicit options.
        if opts is None:
            opts = {"cache_enabled": True}

        if protocol == "json":
            self.marshaler = JsonMarshaler(io, opts=opts)
        elif protocol == "json_verbose":
            self.marshaler = VerboseJsonMarshaler(io, opts=opts)
        else:
            self.marshaler = MsgPackMarshaler(io, opts=opts)

    def write(self, obj):
        """Marshal a Python object into Transit data and write it to the
        'io' target given at construction time."""
        self.marshaler.marshal_top(obj)

    def register(self, obj_type, handler_class):
        """Register a custom converter for an object type.

        This allows you to extend Transit to encode new types. You must
        specify the obj type to be encoded, and the handler class that
        should be used by the Marshaler during write-time.
        """
        self.marshaler.register(obj_type, handler_class)
def flatten_map(m):
    """Expand a dictionary's items into a flat list.

    The result alternates key, value, key, value, ... in the order that
    m.items() yields the pairs.
    """
    flat = []
    for pair in m.items():
        flat.extend(pair)
    return flat
def re_fn(pat):
    """Compile `pat` once and return a one-argument matcher function.

    The returned function applies the compiled pattern's match() to its
    argument, so it yields a match object when the pattern matches at the
    start of the value and None otherwise.
    """
    match_at_start = re.compile(pat).match

    def re_inner_fn(value):
        return match_at_start(value)

    return re_inner_fn
# Matcher for strings that begin with one of the marker strings SUB, ESC or
# RES. Every marker is passed through re.escape and the alternation is grouped
# so the "^" anchor covers all three alternatives explicitly, instead of
# relying on match() anchoring for the second and third.
# NOTE(review): assumes SUB, ESC and RES are literal markers, not regex
# fragments -- confirm against the constants module.
is_escapable = re_fn(
    "^(?:" + "|".join(re.escape(marker) for marker in (SUB, ESC, RES)) + ")")
def escape(s):
    """Return `s` prefixed with ESC when is_escapable matches it,
    otherwise return `s` unchanged."""
    return ESC + s if is_escapable(s) else s
class Marshaler(object):
    """The base Marshaler from which all Marshalers inherit.

    The Marshaler specifies how to emit Transit data given encodeable Python
    objects. The end of this process is specialized by other Marshalers to
    convert the final result into an on-the-wire payload (JSON or MsgPack).
    Subclasses supply emit_object, emit_array_start/end, emit_map_start/end
    and flush, which this class calls but does not define.
    """

    def __init__(self, opts=None):
        """Store the options dictionary and set up the write handlers."""
        # A fresh dict per instance: a shared default would be visible to
        # (and mutable through) every marshaler created without options.
        self.opts = {} if opts is None else opts
        self._init_handlers()

    def _init_handlers(self):
        self.handlers = WriteHandler()

    def are_stringable_keys(self, m):
        """Test whether the keys within a map are stringable - a simple map,
        that can be optimized and whose keys can be cached.

        A key counts as stringable when its handler's tag is exactly one
        character long.
        """
        handlers = self.handlers
        return all(len(handlers[key].tag(key)) == 1 for key in m.keys())

    def emit_nil(self, _, as_map_key, cache):
        """Emit nil: an ESC + "_" string as a map key, None otherwise."""
        if as_map_key:
            # The string part must be a real (empty) string: emit_string
            # passes it to escape(), whose regex match rejects None.
            return self.emit_string(ESC, "_", "", True, cache)
        return self.emit_object(None)

    def emit_string(self, prefix, tag, string, as_map_key, cache):
        """Emit prefix + tag + escaped string, after running it through the
        cache's encode()."""
        # TODO: an optimization that emitted cache.value_to_key[encoded] for
        # cacheable values was removed for the time being - it breaks the
        # cache.
        encoded = cache.encode(str(prefix) + tag + escape(string), as_map_key)
        return self.emit_object(encoded, as_map_key)

    def emit_boolean(self, b, as_map_key, cache):
        """Emit a boolean: an ESC + "?" string as a map key, else as-is."""
        if as_map_key:
            return self.emit_string(ESC, "?", b, True, cache)
        return self.emit_object(b)

    def emit_int(self, i, as_map_key, cache):
        """Emit an integer natively when it lies within the configured
        min_int/max_int options, or as an ESC + "i" string when it is a map
        key or out of range."""
        if as_map_key or i > self.opts["max_int"] or i < self.opts["min_int"]:
            return self.emit_string(ESC, "i", str(i), as_map_key, cache)
        return self.emit_object(i, as_map_key)

    def emit_double(self, d, as_map_key, cache):
        """Emit a double: an ESC + "d" string as a map key, else as-is."""
        if as_map_key:
            return self.emit_string(ESC, "d", d, True, cache)
        return self.emit_object(d)

    def emit_array(self, a, _, cache):
        """Emit an array header, every element in order, then the end."""
        self.emit_array_start(len(a))
        # An explicit loop: map() was used here purely for its side effects,
        # which emits nothing at all where map() is lazy.
        for element in a:
            self.marshal(element, False, cache)
        self.emit_array_end()

    def emit_map(self, m, _, cache):
        """Emit a map whose keys are stringable: each key is marshalled as a
        map key, each value as a regular value."""
        self.emit_map_start(len(m))
        for k, v in m.items():
            self.marshal(k, True, cache)
            self.marshal(v, False, cache)
        self.emit_map_end()

    def emit_cmap(self, m, _, cache):
        """Emit a map with compound keys as a single-entry map whose key is
        the "cmap" tag and whose value is the flattened key/value list."""
        self.emit_map_start(1)
        self.emit_string(ESC, "#", "cmap", True, cache)
        self.marshal(flatten_map(m), False, cache)
        self.emit_map_end()

    def emit_tagged(self, tag, rep, _, cache):
        """Emit a tagged value as a two-element array: [ESC + "#" + tag, rep]."""
        self.emit_array_start(2)
        self.emit_object(cache.encode(ESC + "#" + tag, False), False)
        self.marshal(rep, False, cache)
        self.emit_array_end()

    def emit_encoded(self, tag, handler, obj, as_map_key, cache):
        """Emit an object whose tag has no entry in marshal_dispatch.

        Single-character tags are written as tagged strings when the
        representation is a string, or when a string is required (map key)
        or preferred (the "prefer_strings" option); otherwise the value is
        written through emit_tagged.

        Raises AssertionError when a string form is required but the handler
        cannot supply one, or when a multi-character tag is used as a map
        key.
        """
        rep = handler.rep(obj)
        if len(tag) == 1:
            if isinstance(rep, basestring):
                self.emit_string(ESC, tag, rep, as_map_key, cache)
            elif as_map_key or self.opts["prefer_strings"]:
                # Fall back to the handler's string representation.
                rep = handler.string_rep(obj)
                if isinstance(rep, basestring):
                    self.emit_string(ESC, tag, rep, as_map_key, cache)
                else:
                    raise AssertionError("Cannot be encoded as string: " + str({"tag": tag, "rep": rep, "obj": obj}))
            else:
                self.emit_tagged(tag, rep, False, cache)
        elif as_map_key:
            raise AssertionError("Cannot be used as a map key: " + str({"tag": tag, "rep": rep, "obj": obj}))
        else:
            self.emit_tagged(tag, rep, False, cache)

    def marshal(self, obj, as_map_key, cache):
        """Marshal an individual obj, potentially as part of another
        container object (like a list/dictionary/etc).

        Specify if this object is a key to a map/dict, and pass in the
        current cache being used. This method should only be called by a
        top-level marshalling call and should not be considered an
        entry-point for integration.
        """
        handler = self.handlers[obj]
        tag = handler.tag(obj)
        # Map keys use the handler's string representation.
        rep = handler.string_rep(obj) if as_map_key else handler.rep(obj)
        f = marshal_dispatch.get(tag)
        if f:
            f(self, rep, as_map_key, cache)
        else:
            self.emit_encoded(tag, handler, obj, as_map_key, cache)

    def marshal_top(self, obj, cache=None):
        """Given a complete object that needs to be marshaled into Transit
        data, and optionally a cache, dispatch accordingly, and flush the
        data directly into the IO stream.

        Objects whose tag is a single character are wrapped in a QUOTE
        tagged value before being marshalled.

        Raises AssertionError when the handler returns no tag.
        """
        # Only substitute a new cache when none was supplied; a caller's
        # cache object must be used even if it happens to evaluate as false.
        if cache is None:
            cache = RollingCache()

        handler = self.handlers[obj]
        tag = handler.tag(obj)
        if tag:
            if len(tag) == 1:
                self.marshal(TaggedValue(QUOTE, obj), False, cache)
            else:
                self.marshal(obj, False, cache)
            self.flush()
        else:
            raise AssertionError("Handler must provide a non-nil tag: " + str(handler))

    def dispatch_map(self, rep, as_map_key, cache):
        """Used to determine and dispatch the writing of a map - a simple
        map with strings as keys, or a complex map, whose keys are also
        compound types."""
        if self.are_stringable_keys(rep):
            return self.emit_map(rep, as_map_key, cache)
        return self.emit_cmap(rep, as_map_key, cache)

    def register(self, obj_type, handler_class):
        """Register custom converters for object types present in your
        application.

        This allows you to extend Transit to encode new types. You must
        specify the obj type to be encoded, and the handler class that
        should be used by this marshaller.
        """
        self.handlers[obj_type] = handler_class
# Dispatch table used by Marshaler.marshal: maps a handler tag to the function
# that emits a value with that tag. Every entry is called as
# f(marshaler, rep, as_map_key, cache).
#
# The "s" and "'" adapters call the method on the marshaler instance rather
# than on the Marshaler class, so that subclass/mixin overrides of emit_string
# and emit_tagged (such as those in VerboseSettings) are honoured for plain
# strings and quoted values as well. For marshalers that do not override these
# methods the result is the same as calling the Marshaler implementation.
marshal_dispatch = {
    "_": Marshaler.emit_nil,
    "?": Marshaler.emit_boolean,
    "s": lambda self, rep, as_map_key, cache: self.emit_string("", "", rep, as_map_key, cache),
    "i": Marshaler.emit_int,
    "d": Marshaler.emit_double,
    "'": lambda self, rep, _, cache: self.emit_tagged("'", rep, False, cache),
    "array": Marshaler.emit_array,
    "map": Marshaler.dispatch_map,
}
class MsgPackMarshaler(Marshaler):
    """The Marshaler tailored to MsgPack.

    To use this Marshaler, specify the 'msgpack' protocol when creating a
    Writer.
    """
    MSGPACK_MAX_INT = pow(2, 63) - 1
    MSGPACK_MIN_INT = -pow(2, 63)

    default_opts = {"prefer_strings": False,
                    "max_int": MSGPACK_MAX_INT,
                    "min_int": MSGPACK_MIN_INT}

    def __init__(self, io, opts=None):
        """Set up the packer and merge `opts` over default_opts.

        io   -- object providing write() and flush(), used by flush().
        opts -- optional dictionary of overrides for default_opts.
        """
        self.io = io
        # autoreset=False: packed output stays in the packer until flush()
        # reads it with bytes() and then resets it.
        self.packer = msgpack.Packer(autoreset=False)
        nopts = MsgPackMarshaler.default_opts.copy()
        if opts:
            nopts.update(opts)
        Marshaler.__init__(self, nopts)

    def emit_array_start(self, size):
        self.packer.pack_array_header(size)

    def emit_array_end(self):
        # The array header already carries the size; nothing to close.
        pass

    def emit_map_start(self, size):
        self.packer.pack_map_header(size)

    def emit_map_end(self):
        # The map header already carries the size; nothing to close.
        pass

    def emit_object(self, obj, as_map_key=False):
        """Pack a single value; as_map_key is accepted but not used."""
        self.packer.pack(obj)

    def flush(self):
        """Write the packed bytes to io, flush io and reset the packer."""
        self.io.write(self.packer.bytes())
        self.io.flush()
        self.packer.reset()
# Compiled pattern matching a single double-quote character.
# NOTE(review): nothing in the code visible here references this name --
# check for other users before removing it.
REPLACE_RE = re.compile("\"")
class JsonMarshaler(Marshaler):
    """The Marshaler tailored to JSON.

    To use this Marshaler, specify the 'json' protocol when creating a
    Writer.
    """
    # Integers outside the 53-bit range cannot be represented exactly by
    # consumers that read JSON numbers as IEEE doubles, so anything beyond
    # these bounds is written as a tagged string by emit_int instead.
    JSON_MAX_INT = pow(2, 53) - 1
    JSON_MIN_INT = -pow(2, 53) + 1

    default_opts = {"prefer_strings": True,
                    "max_int": JSON_MAX_INT,
                    "min_int": JSON_MIN_INT}

    # Characters that must be escaped inside a JSON string: backslash,
    # double quote and the control characters below U+0020.
    _JSON_UNSAFE = re.compile(r'[\\"\x00-\x1f]')
    _JSON_ESCAPES = {"\\": "\\\\",
                     "\"": "\\\"",
                     "\b": "\\b",
                     "\f": "\\f",
                     "\n": "\\n",
                     "\r": "\\r",
                     "\t": "\\t"}

    ## Yes this is basically a custom JSON encoder,
    ## but I couldn't find an existing solution that worked
    ## well with the lazy writing method we have in this
    ## project.

    def __init__(self, io, opts=None):
        """Set up the separator state and merge `opts` over default_opts.

        io   -- object providing write() and flush().
        opts -- optional dictionary of overrides for default_opts.
        """
        self.io = io
        nopts = JsonMarshaler.default_opts.copy()
        if opts:
            nopts.update(opts)
        # Two parallel stacks, one entry per open nesting level:
        #   started[-1] -- True until the first item of the level is written
        #   is_key[-1]  -- None in an array (and at top level); inside a map,
        #                  True/False tracks whether a key or a value comes
        #                  next.
        self.started = [True]
        self.is_key = [None]
        Marshaler.__init__(self, nopts)
        self.flush = self.io.flush

    def push_level(self):
        """Enter an array level."""
        self.started.append(True)
        self.is_key.append(None)

    def pop_level(self):
        """Leave the current array or map level."""
        self.started.pop()
        self.is_key.pop()

    def push_map(self):
        """Enter a map level; the first item written will be a key."""
        self.started.append(True)
        self.is_key.append(True)

    def write_sep(self):
        """Write the separator, if any, that must precede the next item.

        Nothing is written before the first item of a level. Inside a map a
        ":" follows each key and a "," follows each value; inside an array a
        "," separates items.
        """
        if self.started[-1]:
            self.started[-1] = False
            return

        last = self.is_key[-1]
        if last:
            self.io.write(":")
            self.is_key[-1] = False
        elif last is False:
            self.io.write(",")
            self.is_key[-1] = True
        else:
            # last is None: array level
            self.io.write(",")

    def emit_array_start(self, size):
        self.write_sep()
        self.io.write("[")
        self.push_level()

    def emit_array_end(self):
        self.pop_level()
        self.io.write("]")

    def emit_map(self, m, _, cache):
        """Emits array as per default JSON spec: the MAP_AS_ARR marker
        followed by the flattened keys and values."""
        # NOTE(review): the marker and the keys are marshalled as ordinary
        # array elements, so the marker passes through escape() and keys are
        # not marshalled with as_map_key=True -- confirm this is intended.
        self.emit_array([MAP_AS_ARR] + flatten_map(m), _, cache)

    def emit_map_start(self, size):
        self.write_sep()
        self.io.write("{")
        self.push_map()

    def emit_map_end(self):
        self.pop_level()
        self.io.write("}")

    @staticmethod
    def _escape_json_char(match):
        """Return the JSON escape sequence for one matched unsafe character."""
        ch = match.group(0)
        return JsonMarshaler._JSON_ESCAPES.get(ch) or "\\u%04x" % ord(ch)

    def emit_object(self, obj, as_map_key=False):
        """Write one scalar (string, number, boolean or None) as JSON.

        Raises AssertionError for any other type.
        """
        tp = type(obj)
        self.write_sep()
        if tp is str or tp is unicode:
            self.io.write("\"")
            # Control characters are escaped too; written raw they would make
            # the output invalid JSON.
            self.io.write(JsonMarshaler._JSON_UNSAFE.sub(
                JsonMarshaler._escape_json_char, obj))
            self.io.write("\"")
        elif tp is int or tp is long:
            self.io.write(str(obj))
        elif tp is float:
            # repr() keeps full precision; str() rounds floats to 12
            # significant digits on Python 2.
            self.io.write(repr(obj))
        elif tp is bool:
            self.io.write("true" if obj else "false")
        elif obj is None:
            self.io.write("null")
        else:
            raise AssertionError("Don't know how to encode: " + str(obj))
class VerboseSettings(object):
    """Mixin for JsonMarshaler that adds support for Verbose output/input.

    Verbosity is only suggested for debugging/inspecting purposes. The
    methods here write strings and tags without consulting the cache and
    write maps and tagged values as JSON objects.
    """

    @staticmethod
    def _verbose_handlers(handlers):
        """Replace every handler that offers verbose_handler() with the
        handler it returns; `handlers` is updated in place and returned."""
        # Iterate over a snapshot so entries can be reassigned safely while
        # looping.
        for k, v in list(handlers.items()):
            if hasattr(v, "verbose_handler"):
                handlers[k] = v.verbose_handler()
        return handlers

    def _init_handlers(self):
        self.handlers = self._verbose_handlers(WriteHandler())

    def emit_string(self, prefix, tag, string, as_map_key, cache):
        """Emit prefix + tag + escaped string directly, bypassing the cache."""
        return self.emit_object(str(prefix) + tag + escape(string), as_map_key)

    def emit_map(self, m, _, cache):
        """Emit a map as a map: keys marshalled as map keys, values as
        regular values."""
        self.emit_map_start(len(m))
        for k, v in m.items():
            self.marshal(k, True, cache)
            self.marshal(v, False, cache)
        self.emit_map_end()

    def emit_tagged(self, tag, rep, _, cache):
        """Emit a tagged value as a single-entry map {ESC + "#" + tag: rep}."""
        self.emit_map_start(1)
        # Written without cache.encode, consistent with emit_string above:
        # verbose output should carry the full tag every time.
        self.emit_object(ESC + "#" + tag, True)
        self.marshal(rep, False, cache)
        self.emit_map_end()
class VerboseJsonMarshaler(VerboseSettings, JsonMarshaler):
    """JsonMarshaler combined with the VerboseSettings mixin.

    Defines nothing of its own: all behaviour comes from the two bases, with
    VerboseSettings listed first so its methods take precedence.
    """