From a484ea0275f4d451d881b4edb1e1e4ae93be42a7 Mon Sep 17 00:00:00 2001 From: Joshua Haberman Date: Thu, 3 Jun 2010 12:07:07 -0700 Subject: WIP: intrusive changes to upb_decoder. --- src/upb.h | 4 +- src/upb_decoder.c | 171 +++++++++++++++++++++++++----------------------------- src/upb_decoder.h | 14 ++--- src/upb_srcsink.h | 109 +++++++++++++++++----------------- src/upb_string.h | 7 ++- 5 files changed, 147 insertions(+), 158 deletions(-) diff --git a/src/upb.h b/src/upb.h index c65a686..978ee5c 100644 --- a/src/upb.h +++ b/src/upb.h @@ -270,8 +270,8 @@ INLINE void upb_value_write(upb_valueptr ptr, upb_value val, enum upb_status_code { UPB_STATUS_OK = 0, - // The input byte stream ended in the middle of a record. - UPB_STATUS_NEED_MORE_DATA = 1, + // A read or write from a streaming src/sink could not be completed right now. + UPB_STATUS_TRYAGAIN = 1, // An unrecoverable error occurred. UPB_STATUS_ERROR = -1, diff --git a/src/upb_decoder.c b/src/upb_decoder.c index 32b8f16..5cb25a1 100644 --- a/src/upb_decoder.c +++ b/src/upb_decoder.c @@ -221,23 +221,6 @@ const uint8_t *upb_get_v_uint64_t_full(const uint8_t *buf, const uint8_t *end, return buf; } -const uint8_t *upb_decode_wire_value(uint8_t *buf, uint8_t *end, - upb_wire_type_t wt, upb_wire_value *wv, - upb_status *status) -{ - switch(wt) { - case UPB_WIRE_TYPE_VARINT: - return upb_get_v_uint64_t(buf, end, &wv->varint, status); - case UPB_WIRE_TYPE_64BIT: - return upb_get_f_uint64_t(buf, end, &wv->_64bit, status); - case UPB_WIRE_TYPE_32BIT: - return upb_get_f_uint32_t(buf, end, &wv->_32bit, status); - default: - status->code = UPB_STATUS_ERROR; // Doesn't handle delimited, groups. - return end; - } -} - // Advances buf past the current wire value (of type wt), saving the result in // outbuf. static const uint8_t *skip_wire_value(const uint8_t *buf, const uint8_t *end, @@ -293,7 +276,7 @@ static const uint8_t *upb_decode_value(const uint8_t *buf, const uint8_t *end, typedef struct { upb_msgdef *msgdef; upb_fielddef *field; - size_t end_offset; // For groups, 0. + int32_t end_offset; // For groups, 0. } upb_decoder_frame; struct upb_decoder { @@ -304,7 +287,15 @@ struct upb_decoder { // State pertaining to a particular decode (resettable). // Stack entries store the offset where the submsg ends (for groups, 0). upb_decoder_frame stack[UPB_MAX_NESTING], *top, *limit; - size_t completed_offset; + + // The current buffer. + upb_string *buf; + + // The overall stream offset of the beginning of this buffer. + uint32_t stream_offset; + + // The current offset in this buffer. + uint32_t buffer_offset; }; upb_decoder *upb_decoder_new(upb_msgdef *msgdef) @@ -331,16 +322,6 @@ void upb_decoder_reset(upb_decoder *d, upb_sink *sink) d->top->end_offset = 0; } -// Parses a tag, places the result in *tag. -upb_key upb_decoder_src_getkey(upb_decoder *d) -{ - upb_key key; - upb_fill_buffer(d); - d-> - const uint8_t *ret = upb_get_v_uint32_t(buf, end, &tag_int, status); - return ret; -} - static const void *get_msgend(upb_decoder *d, const uint8_t *start) @@ -395,9 +376,78 @@ static const void *pop(upb_decoder *d, const uint8_t *start, upb_status *status) return get_msgend(d, start); } +// Parses a tag, places the result in *tag. +upb_fielddef *upb_decoder_src_getdef(upb_decoder *d) +{ + uint32_t key; + upb_fill_buffer(d); + d->buf = upb_get_v_uint32_t(d->buf, d->end, &key, &d->status); + if (!upb_ok(status)) return NULL; + if(upb_wiretype_from_key(key) == UPB_WIRE_TYPE_END_GROUP) { + if(!isgroup(d->submsg_end)) { + upb_seterr(d->status, UPB_STATUS_ERROR, "End group seen but current " + "message is not a group, byte offset: %zd", + d->completed_offset + (completed - start)); + return NULL; + } + submsg_end = pop(d, start, status); + msgdef = d->top->msgdef; + completed = buf; + return NULL; + } + // Look up field by tag number. + return upb_msg_itof(d->top->msgdef, upb_fieldnum_from_key(key)); +} -size_t upb_decoder_decode(upb_decoder *d, upb_strptr str, upb_status *status) +bool upb_decoder_src_getval(upb_src *src, upb_valueptr val) { + if(upb_wiretype_from_key(d->key) == UPB_WIRE_TYPE_DELIMITED) { + int32_t delim_len; + d->buf = upb_get_INT32(d->buf, d->end, &delim_len, &d->status); + CHECK_STATUS(); // Checking decode_tag() and upb_get_INT32(). + int32_t needed = + const uint8_t *delim_end = buf + delim_len; + if(f->type == UPB_TYPE(MESSAGE)) { + submsg_end = push(d, start, delim_end - start, f, status); + msgdef = d->top->msgdef; + } else { + if(f && upb_isstringtype(f->type)) { + int32_t str_start = buf - start; + uint32_t len = str_start + delim_len; + sink_status = upb_sink_onstr(d->sink, f, str, str_start, len, status); + } // else { TODO: packed arrays } + // If field was not found, it is skipped silently. + buf = delim_end; // Could be >end. + } + } else { + if(!upb_check_type(tag.wire_type, f->type)) { + buf = skip_wire_value(buf, end, tag.wire_type, status); + } else if (f->type == UPB_TYPE(GROUP)) { + submsg_end = push(d, start, 0, f, status); + msgdef = d->top->msgdef; + } else { + upb_value val; + buf = upb_decode_value(buf, end, f->type, upb_value_addrof(&val), + status); + CHECK_STATUS(); // Checking upb_decode_value(). + sink_status = upb_sink_onvalue(d->sink, f, val, status); + } + } + CHECK_STATUS(); + + while(buf >= submsg_end) { + if(buf > submsg_end) { + upb_seterr(status, UPB_STATUS_ERROR, "Expected submsg end offset " + "did not lie on a tag/value boundary."); + goto err; + } + submsg_end = pop(d, start, status); + msgdef = d->top->msgdef; + } + // while(buf < d->packed_end) { TODO: packed arrays } + completed = buf; +} + // buf is our current offset, moves from start to end. const uint8_t *buf = (uint8_t*)upb_string_getrobuf(str); const uint8_t *const start = buf; // ptr equivalent of d->completed_offset @@ -421,68 +471,7 @@ size_t upb_decoder_decode(upb_decoder *d, upb_strptr str, upb_status *status) // Parse/handle tag. upb_tag tag; buf = decode_tag(buf, end, &tag, status); - if(tag.wire_type == UPB_WIRE_TYPE_END_GROUP) { - CHECK_STATUS(); - if(!isgroup(submsg_end)) { - upb_seterr(status, UPB_STATUS_ERROR, "End group seen but current " - "message is not a group, byte offset: %zd", - d->completed_offset + (completed - start)); - goto err; - } - submsg_end = pop(d, start, status); - msgdef = d->top->msgdef; - completed = buf; - continue; - } - - // Look up field by tag number. - upb_fielddef *f = upb_msg_itof(msgdef, tag.field_number); - // Parse/handle field. - if(tag.wire_type == UPB_WIRE_TYPE_DELIMITED) { - int32_t delim_len; - buf = upb_get_INT32(buf, end, &delim_len, status); - CHECK_STATUS(); // Checking decode_tag() and upb_get_INT32(). - const uint8_t *delim_end = buf + delim_len; - if(f && f->type == UPB_TYPE(MESSAGE)) { - submsg_end = push(d, start, delim_end - start, f, status); - msgdef = d->top->msgdef; - } else { - if(f && upb_isstringtype(f->type)) { - int32_t str_start = buf - start; - uint32_t len = str_start + delim_len; - sink_status = upb_sink_onstr(d->sink, f, str, str_start, len, status); - } // else { TODO: packed arrays } - // If field was not found, it is skipped silently. - buf = delim_end; // Could be >end. - } - } else { - if(!f || !upb_check_type(tag.wire_type, f->type)) { - buf = skip_wire_value(buf, end, tag.wire_type, status); - } else if (f->type == UPB_TYPE(GROUP)) { - submsg_end = push(d, start, 0, f, status); - msgdef = d->top->msgdef; - } else { - upb_value val; - buf = upb_decode_value(buf, end, f->type, upb_value_addrof(&val), - status); - CHECK_STATUS(); // Checking upb_decode_value(). - sink_status = upb_sink_onvalue(d->sink, f, val, status); - } - } - CHECK_STATUS(); - - while(buf >= submsg_end) { - if(buf > submsg_end) { - upb_seterr(status, UPB_STATUS_ERROR, "Expected submsg end offset " - "did not lie on a tag/value boundary."); - goto err; - } - submsg_end = pop(d, start, status); - msgdef = d->top->msgdef; - } - // while(buf < d->packed_end) { TODO: packed arrays } - completed = buf; } size_t read; diff --git a/src/upb_decoder.h b/src/upb_decoder.h index b84c149..ea20d3d 100644 --- a/src/upb_decoder.h +++ b/src/upb_decoder.h @@ -39,15 +39,11 @@ void upb_decoder_free(upb_decoder *p); // state where it has not seen any data, and expects the next data to be from // the beginning of a new protobuf. Parsers must be reset before they can be // used. A decoder can be reset multiple times. -void upb_decoder_reset(upb_decoder *p, upb_sink *sink); - -// Decodes protobuf data out of str, returning how much data was decoded. The -// next call to upb_decoder_decode should begin with the first byte that was -// not decoded. "status" indicates whether an error occurred. -// -// TODO: provide the following guarantee: -// retval will always be >= len. -size_t upb_decoder_decode(upb_decoder *p, upb_strptr str, upb_status *status); +void upb_decoder_reset(upb_decoder *p, upb_bytesrc *bytesrc); + +// Returns a upb_src pointer by which the decoder can be used. The returned +// upb_src is invalidated by upb_decoder_reset(). +upb_src *upb_decoder_getsrc(upb_decoder *p); #ifdef __cplusplus } /* extern "C" */ diff --git a/src/upb_srcsink.h b/src/upb_srcsink.h index 6dd11d1..97b9885 100644 --- a/src/upb_srcsink.h +++ b/src/upb_srcsink.h @@ -26,6 +26,8 @@ extern "C" { /* upb_src ********************************************************************/ +// TODO: decide how to handle unknown fields. + // Retrieves the fielddef for the next field in the stream. Returns NULL on // error or end-of-stream. upb_fielddef *upb_src_getdef(upb_src *src); @@ -38,7 +40,8 @@ bool upb_src_getval(upb_src *src, upb_valueptr val); // Like upb_src_getval() but skips the value. bool upb_src_skipval(upb_src *src); -// Descends into a submessage. +// Descends into a submessage. May only be called after a def has been +// returned that indicates a submessage. bool upb_src_startmsg(upb_src *src); // Stops reading a submessage. May be called before the stream is EOF, in @@ -88,61 +91,61 @@ int32_t upb_bytesink_put(upb_bytesink *sink, upb_string *str); // Returns the current error status for the stream. upb_status *upb_bytesink_status(upb_bytesink *sink); -/* upb_sink implementation ****************************************************/ - -typedef struct upb_sink_callbacks { - upb_value_cb value_cb; - upb_str_cb str_cb; - upb_start_cb start_cb; - upb_end_cb end_cb; -} upb_sink_callbacks; - -// These macros implement a mini virtual function dispatch for upb_sink instances. -// This allows functions that call upb_sinks to just write: -// -// upb_sink_onvalue(sink, field, val); -// -// The macro will handle the virtual function lookup and dispatch. We could -// potentially define these later to also be capable of calling a C++ virtual -// method instead of doing the virtual dispatch manually. This would make it -// possible to write C++ sinks in a more natural style without loss of -// efficiency. We could have a flag in upb_sink defining whether it is a C -// sink or a C++ one. -#define upb_sink_onvalue(s, f, val, status) s->vtbl->value_cb(s, f, val, status) -#define upb_sink_onstr(s, f, str, start, end, status) s->vtbl->str_cb(s, f, str, start, end, status) -#define upb_sink_onstart(s, f, status) s->vtbl->start_cb(s, f, status) -#define upb_sink_onend(s, f, status) s->vtbl->end_cb(s, f, status) - -// Initializes a plain C visitor with the given vtbl. The sink must have been -// allocated separately. -INLINE void upb_sink_init(upb_sink *s, upb_sink_callbacks *vtbl) { +/* Dynamic Dispatch implementation for src/sink interfaces ********************/ + +// The rest of this file only concerns components that are implementing any of +// the above interfaces. To simple clients the code below should be considered +// private. + +// Typedefs for function pointers to all of the above functions. +typedef upb_fielddef (*upb_src_getdef_fptr)(upb_src *src); +typedef bool (*upb_src_getval_fptr)(upb_src *src, upb_valueptr val); +typedef bool (*upb_src_skipval_fptr)(upb_src *src); +typedef bool (*upb_src_startmsg_fptr)(upb_src *src); +typedef bool (*upb_src_endmsg_fptr)(upb_src *src); +typedef upb_status *(*upb_src_status_fptr)(upb_src *src); + +typedef bool (*upb_sink_putdef_fptr)(upb_sink *sink, upb_fielddef *def); +typedef bool (*upb_sink_putval_fptr)(upb_sink *sink, upb_value val); +typedef bool (*upb_sink_startmsg_fptr)(upb_sink *sink); +typedef bool (*upb_sink_endmsg_fptr)(upb_sink *sink); +typedef upb_status *(*upb_sink_status_fptr)(upb_sink *sink); + +typedef upb_string *(*upb_bytesrc_get_fptr)(upb_bytesrc *src); +typedef bool (*upb_bytesrc_append_fptr)( + upb_bytesrc *src, upb_string *str, upb_strlen_t len); +typedef upb_status *(*upb_bytesrc_status_fptr)(upb_src *src); + +typedef int32_t (*upb_bytesink_put_fptr)(upb_bytesink *sink, upb_string *str); +typedef upb_status *(*upb_bytesink_status_fptr)(upb_bytesink *sink); + +// Vtables for the above interfaces. +typedef struct { + upb_src_getdef_fptr getdef; + upb_src_getval_fptr getval; + upb_src_skipval_fptr skipval; + upb_src_startmsg_fptr startmsg; + upb_src_endmsg_fptr endmsg; + upb_src_status_fptr status; +} upb_src_vtable; + +// "Base Class" definitions; components that implement these interfaces should +// contain one of these structures. + +typedef struct { + upb_src_vtable *vtbl; +#ifndef NDEBUG + int state; // For debug-mode checking of API usage. +#endif +} upb_src; + +INLINE void upb_sink_init(upb_src *s, upb_src_vtable *vtbl) { s->vtbl = vtbl; +#ifndef DEBUG + // TODO: initialize debug-mode checking. +#endif } - -/* upb_bytesink ***************************************************************/ - -// A upb_bytesink is like a upb_sync, but for bytes instead of structured -// protobuf data. Parsers implement upb_bytesink and push to a upb_sink, -// serializers do the opposite (implement upb_sink and push to upb_bytesink). -// -// The two simplest kinds of sinks are "write to string" and "write to FILE*". - -// A forward declaration solely for the benefit of declaring upb_byte_cb below. -// Always prefer upb_bytesink (without the "struct" keyword) instead. -struct _upb_bytesink; - -// The single bytesink callback; it takes the bytes to be written and returns -// how many were successfully written. If the return value is <0, the caller -// should stop processing. -typedef int32_t (*upb_byte_cb)(struct _upb_bytesink *s, upb_strptr str, - uint32_t start, uint32_t end, - upb_status *status); - -typedef struct _upb_bytesink { - upb_byte_cb *cb; -} upb_bytesink; - #ifdef __cplusplus } /* extern "C" */ #endif diff --git a/src/upb_string.h b/src/upb_string.h index 0516377..c0d14d5 100644 --- a/src/upb_string.h +++ b/src/upb_string.h @@ -64,9 +64,10 @@ INLINE upb_strlen_t upb_string_len(upb_string *str) { return str->len; } // Use to read the bytes of the string. The caller *must* call // upb_string_endread() after the data has been read. The window between -// upb_string_getrobuf() and upb_string_endread() should be kept as short -// as possible. No other functions may be called on the string during this -// window except upb_string_len(). +// upb_string_getrobuf() and upb_string_endread() should be kept as short as +// possible, because any pending upb_string_detach() may be blocked until +// upb_string_endread is called(). No other functions may be called on the +// string during this window except upb_string_len(). INLINE const char *upb_string_getrobuf(upb_string *str) { return str->ptr; } INLINE void upb_string_endread(upb_string *str); -- cgit v1.2.3