From b471ca6b81b88dc23aae6a53345d94d9a2714a7c Mon Sep 17 00:00:00 2001 From: Joshua Haberman Date: Mon, 6 Dec 2010 15:52:40 -0800 Subject: The last major revision to the upb_stream protocol. Sources and sinks communicate by means of a upb_handlers object, which encapsulates a set of handler callbacks and will possibly offer richer semantics in the future like giving specific fields different callbacks. The upb_handlers protocol supports delegation, so sets of handlers can be written in reusable ways. For example, if a set of handlers is written to handle a specific .proto type, those handlers can be used whether that type is at the top level or whether it is a sub-message of a higher-level type. Delegation allows the streaming protocol to properly compose. --- core/upb_stream.c | 55 ------------ core/upb_stream.h | 167 +++++++++++++++++++++++------------ core/upb_stream_vtbl.h | 235 +++++++++++++++++++++---------------------------- core/upb_string.c | 9 ++ core/upb_string.h | 7 +- 5 files changed, 224 insertions(+), 249 deletions(-) delete mode 100644 core/upb_stream.c (limited to 'core') diff --git a/core/upb_stream.c b/core/upb_stream.c deleted file mode 100644 index 0d47392..0000000 --- a/core/upb_stream.c +++ /dev/null @@ -1,55 +0,0 @@ -/* - * upb - a minimalist implementation of protocol buffers. - * - * Copyright (c) 2010 Joshua Haberman. See LICENSE for details. - */ - -#include "upb_stream.h" - -#include "upb_def.h" - -#define CHECKSRC(x) if(!x) goto src_err -#define CHECKSINK(x) if(!x) goto sink_err - -void upb_streamdata(upb_src *src, upb_sink *sink, upb_status *status) { - upb_fielddef *f; - upb_string *str = NULL; - int depth = 0; - while(1) { - while((f = upb_src_getdef(src)) != NULL) { - CHECKSINK(upb_sink_putdef(sink, f)); - if(upb_issubmsg(f)) { - upb_src_startmsg(src); - upb_sink_startmsg(sink); - ++depth; - } else if(upb_isstring(f)) { - str = upb_string_tryrecycle(str); - CHECKSRC(upb_src_getstr(src, str)); - CHECKSINK(upb_sink_putstr(sink, str)); - } else { - // Primitive type. - upb_value val; - CHECKSRC(upb_src_getval(src, upb_value_addrof(&val))); - CHECKSINK(upb_sink_putval(sink, val)); - } - } - // If we're not EOF now, the loop terminated due to an error. - CHECKSRC(upb_src_eof(src)); - if (depth == 0) break; - --depth; - upb_src_endmsg(src); - upb_sink_endmsg(sink); - } - upb_string_unref(str); - return; - -src_err: - upb_string_unref(str); - upb_copyerr(status, upb_src_status(src)); - return; - -sink_err: - upb_string_unref(str); - upb_copyerr(status, upb_sink_status(sink)); - return; -} diff --git a/core/upb_stream.h b/core/upb_stream.h index cd00c1e..1eb111e 100644 --- a/core/upb_stream.h +++ b/core/upb_stream.h @@ -19,7 +19,7 @@ #ifndef UPB_SRCSINK_H #define UPB_SRCSINK_H -#include "upb_stream_vtbl.h" +#include "upb.h" #ifdef __cplusplus extern "C" { @@ -28,98 +28,149 @@ extern "C" { // Forward-declare. We can't include upb_def.h; it would be circular. struct _upb_fielddef; -/* upb_sink *******************************************************************/ +/* upb_handlers ***************************************************************/ -// A upb_sink is a component that receives a stream of protobuf data. -// It is an abstract interface that is implemented either by the system or -// by users. -// -// TODO: unknown fields. +// upb_handlers define the interface by which a upb_src passes data to a +// upb_sink. -// Constants that a sink returns to indicate to its caller whether it should +// Constants that a handler returns to indicate to its caller whether it should // continue or not. typedef enum { // Caller should continue sending values to the sink. - UPB_SINK_CONTINUE, + UPB_CONTINUE, - // Return from upb_sink_putdef() to skip the next value (which may be a - // submessage). - UPB_SINK_SKIP, + // Skips to the end of the current submessage (or if we are at the top + // level, skips to the end of the entire message). + UPB_SKIP, // Caller should stop sending values; check sink status for details. // If processing resumes later, it should resume with the next value. - UPB_SINK_STOP, -} upb_sinkret_t; - -// Puts the given fielddef into the stream. -upb_sinkret_t upb_sink_putdef(upb_sink *sink, struct _upb_fielddef *def); - -// Puts the given value into the stream. -upb_sinkret_t upb_sink_putval(upb_sink *sink, upb_value val); -upb_sinkret_t upb_sink_putstr(upb_sink *sink, upb_string *str); - -// Starts/ends a submessage. upb_sink_startmsg may seem redundant, but a -// client could have a submessage already serialized, and therefore put it -// as a string instead of its individual elements. -upb_sinkret_t upb_sink_startmsg(upb_sink *sink); -upb_sinkret_t upb_sink_endmsg(upb_sink *sink); - -// Returns the current error status for the stream. -upb_status *upb_sink_status(upb_sink *sink); - - -/* upb_src ********************************************************************/ - -// A upb_src is a resumable push parser for protobuf data. It works by first -// accepting registration of a upb_sink to which it will push data, then -// in a second phase is parses the actual data. + UPB_STOP, + + // When returned from a startsubmsg handler, indicates that the submessage + // should be handled by a different set of handlers, which have been + // registered on the provided upb_handlers object. May not be returned + // from any other callback. + UPB_DELEGATE, +} upb_flow_t; + +// upb_handlers +struct _upb_handlers; +typedef struct _upb_handlers upb_handlers; + +typedef void (*upb_startmsg_handler_t)(void *closure); +typedef void (*upb_endmsg_handler_t)(void *closure); +typedef upb_flow_t (*upb_value_handler_t)(void *closure, + struct _upb_fielddef *f, + upb_value val); +typedef upb_flow_t (*upb_startsubmsg_handler_t)(void *closure, + struct _upb_fielddef *f, + upb_handlers *delegate_to); +typedef upb_flow_t (*upb_endsubmsg_handler_t)(void *closure); +typedef upb_flow_t (*upb_unknownval_handler_t)(void *closure, + upb_field_number_t fieldnum, + upb_value val); + +// An empty set of handlers, for convenient copy/paste: // - -// Sets the given sink as the target of this src. It will be called when the -// upb_src_parse() is run. -void upb_src_setsink(upb_src *src, upb_sink *sink); - -// Pushes data from this src to the previously registered sink, returning -// true if all data was processed. If false is returned, check -// upb_src_status() for details; if it is a resumable status, upb_src_run -// may be called again to resume processing. -bool upb_src_run(upb_src *src); +// static void startmsg(void *closure) { +// // Called when the top-level message begins. +// } +// +// static void endmsg(void *closure) { +// // Called when the top-level message ends. +// } +// +// static upb_flow_t value(void *closure, upb_fielddef *f, upb_value val) { +// // Called for every value in the stream. +// return UPB_CONTINUE; +// } +// +// static upb_flow_t startsubmsg(void *closure, upb_fielddef *f, +// upb_handlers *delegate_to) { +// // Called when a submessage begins; can delegate by returning UPB_DELEGATE. +// return UPB_CONTINUE; +// } +// +// static upb_flow_t endsubmsg(void *closure) { +// // Called when a submessage ends. +// return UPB_CONTINUE; +// } +// +// static upb_flow_t unknownval(void *closure, upb_field_number_t fieldnum, +// upb_value val) { +// Called with an unknown value is encountered. +// return UPB_CONTINUE; +// } +typedef struct { + upb_startmsg_handler_t startmsg; + upb_endmsg_handler_t endmsg; + upb_value_handler_t value; + upb_startsubmsg_handler_t startsubmsg; + upb_endsubmsg_handler_t endsubmsg; + upb_unknownval_handler_t unknownval; +} upb_handlerset; + +// Functions to register handlers on a upb_handlers object. +INLINE void upb_handlers_init(upb_handlers *h); +INLINE void upb_handlers_uninit(upb_handlers *h); +INLINE void upb_handlers_reset(upb_handlers *h); +INLINE bool upb_handlers_isempty(upb_handlers *h); +INLINE void upb_register_handlerset(upb_handlers *h, upb_handlerset *set); +INLINE void upb_set_handler_closure(upb_handlers *h, void *closure); + +// An object that transparently handles delegation so that the caller needs +// only follow the protocol as if delegation did not exist. +struct _upb_dispatcher; +typedef struct _upb_dispatcher upb_dispatcher; +INLINE void upb_dispatcher_init(upb_dispatcher *d); +INLINE void upb_dispatcher_reset(upb_dispatcher *d, upb_handlers *h); +INLINE void upb_dispatch_startmsg(upb_dispatcher *d); +INLINE void upb_dispatch_endmsg(upb_dispatcher *d); +INLINE upb_flow_t upb_dispatch_startsubmsg(upb_dispatcher *d, struct _upb_fielddef *f); +INLINE upb_flow_t upb_dispatch_endsubmsg(upb_dispatcher *d); +INLINE upb_flow_t upb_dispatch_value(upb_dispatcher *d, struct _upb_fielddef *f, + upb_value val); +INLINE upb_flow_t upb_dispatch_unknownval(upb_dispatcher *d, + upb_field_number_t fieldnum, upb_value val); /* upb_bytesrc ****************************************************************/ +struct _upb_bytesrc; +typedef struct _upb_bytesrc upb_bytesrc; + // Returns the next string in the stream. false is returned on error or eof. // The string must be at least "minlen" bytes long unless the stream is eof. -bool upb_bytesrc_get(upb_bytesrc *src, upb_string *str, upb_strlen_t minlen); +INLINE bool upb_bytesrc_get(upb_bytesrc *src, upb_string *str, upb_strlen_t minlen); // Appends the next "len" bytes in the stream in-place to "str". This should // be used when the caller needs to build a contiguous string of the existing // data in "str" with more data. The call fails if fewer than len bytes are // available in the stream. -bool upb_bytesrc_append(upb_bytesrc *src, upb_string *str, upb_strlen_t len); +INLINE bool upb_bytesrc_append(upb_bytesrc *src, upb_string *str, upb_strlen_t len); // Returns the current error status for the stream. // Note! The "eof" flag works like feof() in C; it cannot report end-of-file // until a read has failed due to eof. It cannot preemptively tell you that // the next call will fail due to eof. Since these are the semantics that C // and UNIX provide, we're stuck with them if we want to support eg. stdio. -INLINE upb_status *upb_bytesrc_status(upb_bytesrc *src) { return &src->status; } -INLINE bool upb_bytesrc_eof(upb_bytesrc *src) { return src->eof; } +INLINE upb_status *upb_bytesrc_status(upb_bytesrc *src); +INLINE bool upb_bytesrc_eof(upb_bytesrc *src); /* upb_bytesink ***************************************************************/ +struct _upb_bytesink; +typedef struct _upb_bytesink upb_bytesink; + // Puts the given string. Returns the number of bytes that were actually, // consumed, which may be fewer than were in the string, or <0 on error. -int32_t upb_bytesink_put(upb_bytesink *sink, upb_string *str); +INLINE int32_t upb_bytesink_put(upb_bytesink *sink, upb_string *str); // Returns the current error status for the stream. -upb_status *upb_bytesink_status(upb_bytesink *sink); - -/* Utility functions **********************************************************/ - -// Streams data from src to sink until EOF or error. -void upb_streamdata(upb_src *src, upb_sink *sink, upb_status *status); +INLINE upb_status *upb_bytesink_status(upb_bytesink *sink); +#include "upb_stream_vtbl.h" #ifdef __cplusplus } /* extern "C" */ diff --git a/core/upb_stream_vtbl.h b/core/upb_stream_vtbl.h index 96f6cfe..91464a7 100644 --- a/core/upb_stream_vtbl.h +++ b/core/upb_stream_vtbl.h @@ -5,59 +5,21 @@ * interfaces. Only components that are implementing these interfaces need * to worry about this file. * - * This is tedious; this is the place in upb where I most wish I had a C++ - * feature. In C++ the compiler would generate this all for me. If there's - * any consolation, it's that I have a bit of flexibility you don't have in - * C++: I could, with preprocessor magic alone "de-virtualize" this interface - * for a particular source file. Say I had a C file that called a upb_src, - * but didn't want to pay the virtual function overhead. I could define: - * - * #define upb_src_getdef(src) upb_decoder_getdef((upb_decoder*)src) - * #define upb_src_stargmsg(src) upb_decoder_startmsg(upb_decoder*)src) - * // etc. - * - * The source file is compatible with the regular upb_src interface, but here - * we bind it to a particular upb_src (upb_decoder), which could lead to - * improved performance at a loss of flexibility for this one upb_src client. - * * Copyright (c) 2010 Joshua Haberman. See LICENSE for details. */ #ifndef UPB_SRCSINK_VTBL_H_ #define UPB_SRCSINK_VTBL_H_ -#include "upb.h" +#include +#include "upb_stream.h" #ifdef __cplusplus extern "C" { #endif -struct upb_src; -typedef struct upb_src upb_src; -struct upb_sink; -typedef struct upb_sink upb_sink; -struct upb_bytesrc; -typedef struct upb_bytesrc upb_bytesrc; -struct upb_bytesink; -typedef struct upb_bytesink upb_bytesink; - // Typedefs for function pointers to all of the virtual functions. -// upb_src. -typedef struct _upb_fielddef *(*upb_src_getdef_fptr)(upb_src *src); -typedef bool (*upb_src_getval_fptr)(upb_src *src, upb_valueptr val); -typedef bool (*upb_src_getstr_fptr)(upb_src *src, upb_string *str); -typedef bool (*upb_src_skipval_fptr)(upb_src *src); -typedef bool (*upb_src_startmsg_fptr)(upb_src *src); -typedef bool (*upb_src_endmsg_fptr)(upb_src *src); - -// upb_sink. -typedef bool (*upb_sink_putdef_fptr)(upb_sink *sink, struct _upb_fielddef *def); -typedef bool (*upb_sink_putval_fptr)(upb_sink *sink, upb_value val); -typedef bool (*upb_sink_putstr_fptr)(upb_sink *sink, upb_string *str); -typedef bool (*upb_sink_startmsg_fptr)(upb_sink *sink); -typedef bool (*upb_sink_endmsg_fptr)(upb_sink *sink); - // upb_bytesrc. typedef bool (*upb_bytesrc_get_fptr)( upb_bytesrc *src, upb_string *str, upb_strlen_t minlen); @@ -68,23 +30,6 @@ typedef bool (*upb_bytesrc_append_fptr)( typedef int32_t (*upb_bytesink_put_fptr)(upb_bytesink *sink, upb_string *str); // Vtables for the above interfaces. -typedef struct { - upb_src_getdef_fptr getdef; - upb_src_getval_fptr getval; - upb_src_getstr_fptr getstr; - upb_src_skipval_fptr skipval; - upb_src_startmsg_fptr startmsg; - upb_src_endmsg_fptr endmsg; -} upb_src_vtable; - -typedef struct { - upb_sink_putdef_fptr putdef; - upb_sink_putval_fptr putval; - upb_sink_putstr_fptr putstr; - upb_sink_startmsg_fptr startmsg; - upb_sink_endmsg_fptr endmsg; -} upb_sink_vtable; - typedef struct { upb_bytesrc_get_fptr get; upb_bytesrc_append_fptr append; @@ -97,42 +42,18 @@ typedef struct { // "Base Class" definitions; components that implement these interfaces should // contain one of these structures. -struct upb_src { - upb_src_vtable *vtbl; - upb_status status; - bool eof; -}; - -struct upb_sink { - upb_sink_vtable *vtbl; - upb_status status; - bool eof; -}; - -struct upb_bytesrc { +struct _upb_bytesrc { upb_bytesrc_vtable *vtbl; upb_status status; bool eof; }; -struct upb_bytesink { +struct _upb_bytesink { upb_bytesink_vtable *vtbl; upb_status status; bool eof; }; -INLINE void upb_src_init(upb_src *s, upb_src_vtable *vtbl) { - s->vtbl = vtbl; - s->eof = false; - upb_status_init(&s->status); -} - -INLINE void upb_sink_init(upb_sink *s, upb_sink_vtable *vtbl) { - s->vtbl = vtbl; - s->eof = false; - upb_status_init(&s->status); -} - INLINE void upb_bytesrc_init(upb_bytesrc *s, upb_bytesrc_vtable *vtbl) { s->vtbl = vtbl; s->eof = false; @@ -146,46 +67,6 @@ INLINE void upb_bytesink_init(upb_bytesink *s, upb_bytesink_vtable *vtbl) { } // Implementation of virtual function dispatch. -INLINE struct _upb_fielddef *upb_src_getdef(upb_src *src) { - return src->vtbl->getdef(src); -} -INLINE bool upb_src_getval(upb_src *src, upb_valueptr val) { - return src->vtbl->getval(src, val); -} -INLINE bool upb_src_getstr(upb_src *src, upb_string *str) { - return src->vtbl->getstr(src, str); -} -INLINE bool upb_src_skipval(upb_src *src) { return src->vtbl->skipval(src); } -INLINE bool upb_src_startmsg(upb_src *src) { return src->vtbl->startmsg(src); } -INLINE bool upb_src_endmsg(upb_src *src) { return src->vtbl->endmsg(src); } - -// Implementation of type-specific upb_src accessors. If we encounter a upb_src -// where these can be implemented directly in a measurably more efficient way, -// we can make these part of the vtable also. -// -// For <64-bit types we have to use a temporary to accommodate baredecoder, -// which does not know the actual width of the type. -INLINE bool upb_src_getbool(upb_src *src, bool *_bool) { - upb_value val; - bool ret = upb_src_getval(src, upb_value_addrof(&val)); - *_bool = val._bool; - return ret; -} - -INLINE bool upb_src_getint32(upb_src *src, int32_t *i32) { - upb_value val; - bool ret = upb_src_getval(src, upb_value_addrof(&val)); - *i32 = val.int32; - return ret; -} - -// TODO. -bool upb_src_getint32(upb_src *src, int32_t *val); -bool upb_src_getint64(upb_src *src, int64_t *val); -bool upb_src_getuint32(upb_src *src, uint32_t *val); -bool upb_src_getuint64(upb_src *src, uint64_t *val); -bool upb_src_getfloat(upb_src *src, float *val); -bool upb_src_getdouble(upb_src *src, double *val); // upb_bytesrc INLINE bool upb_bytesrc_get( @@ -198,24 +79,108 @@ INLINE bool upb_bytesrc_append( return bytesrc->vtbl->append(bytesrc, str, len); } -// upb_sink -INLINE bool upb_sink_putdef(upb_sink *sink, struct _upb_fielddef *def) { - return sink->vtbl->putdef(sink, def); +INLINE upb_status *upb_bytesrc_status(upb_bytesrc *src) { return &src->status; } +INLINE bool upb_bytesrc_eof(upb_bytesrc *src) { return src->eof; } + +// upb_handlers +struct _upb_handlers { + upb_handlerset *set; + void *closure; +}; + +INLINE void upb_handlers_init(upb_handlers *h) { + (void)h; +} +INLINE void upb_handlers_uninit(upb_handlers *h) { + (void)h; +} + +INLINE void upb_handlers_reset(upb_handlers *h) { + h->set = NULL; + h->closure = NULL; +} + +INLINE bool upb_handlers_isempty(upb_handlers *h) { + return !h->set && !h->closure; +} + +INLINE void upb_register_handlerset(upb_handlers *h, upb_handlerset *set) { + h->set = set; +} + +INLINE void upb_set_handler_closure(upb_handlers *h, void *closure) { + h->closure = closure; +} + +// upb_dispatcher +typedef struct { + upb_handlers handlers; + int depth; +} upb_dispatcher_frame; + +struct _upb_dispatcher { + upb_dispatcher_frame stack[UPB_MAX_NESTING], *top, *limit; +}; + +INLINE void upb_dispatcher_init(upb_dispatcher *d) { + d->limit = d->stack + sizeof(d->stack); } -INLINE bool upb_sink_putval(upb_sink *sink, upb_value val) { - return sink->vtbl->putval(sink, val); + +INLINE void upb_dispatcher_reset(upb_dispatcher *d, upb_handlers *h) { + d->top = d->stack; + d->top->depth = 1; // Never want to trigger end-of-delegation. + d->top->handlers = *h; } -INLINE bool upb_sink_putstr(upb_sink *sink, upb_string *str) { - return sink->vtbl->putstr(sink, str); + +INLINE void upb_dispatch_startmsg(upb_dispatcher *d) { + assert(d->stack == d->top); + d->top->handlers.set->startmsg(d->top->handlers.closure); } -INLINE bool upb_sink_startmsg(upb_sink *sink) { - return sink->vtbl->startmsg(sink); + +INLINE void upb_dispatch_endmsg(upb_dispatcher *d) { + assert(d->stack == d->top); + d->top->handlers.set->endmsg(d->top->handlers.closure); } -INLINE bool upb_sink_endmsg(upb_sink *sink) { - return sink->vtbl->endmsg(sink); + +INLINE upb_flow_t upb_dispatch_startsubmsg(upb_dispatcher *d, + struct _upb_fielddef *f) { + upb_handlers handlers; + upb_handlers_init(&handlers); + upb_handlers_reset(&handlers); + upb_flow_t ret = d->top->handlers.set->startsubmsg(d->top->handlers.closure, f, &handlers); + assert((ret == UPB_DELEGATE) == !upb_handlers_isempty(&handlers)); + if (ret == UPB_DELEGATE) { + ++d->top; + d->top->handlers = handlers; + d->top->depth = 0; + d->top->handlers.set->startmsg(d->top->handlers.closure); + ret = UPB_CONTINUE; + } + ++d->top->depth; + upb_handlers_uninit(&handlers); + return ret; +} + +INLINE upb_flow_t upb_dispatch_endsubmsg(upb_dispatcher *d) { + if (--d->top->depth == 0) { + d->top->handlers.set->endmsg(d->top->handlers.closure); + --d->top; + } + return d->top->handlers.set->endsubmsg(d->top->handlers.closure); } -INLINE upb_status *upb_sink_status(upb_sink *sink) { return &sink->status; } +INLINE upb_flow_t upb_dispatch_value(upb_dispatcher *d, + struct _upb_fielddef *f, + upb_value val) { + return d->top->handlers.set->value(d->top->handlers.closure, f, val); +} + +INLINE upb_flow_t upb_dispatch_unknownval(upb_dispatcher *d, + upb_field_number_t fieldnum, + upb_value val) { + return d->top->handlers.set->unknownval(d->top->handlers.closure, + fieldnum, val); +} // upb_bytesink INLINE int32_t upb_bytesink_put(upb_bytesink *sink, upb_string *str) { diff --git a/core/upb_string.c b/core/upb_string.c index 847a3ee..4f5f5c2 100644 --- a/core/upb_string.c +++ b/core/upb_string.c @@ -29,6 +29,7 @@ upb_string *upb_string_new() { upb_string *str = malloc(sizeof(*str)); str->ptr = NULL; str->cached_mem = NULL; + str->len = 0; #ifndef UPB_HAVE_MSIZE str->size = 0; #endif @@ -132,6 +133,14 @@ upb_string *upb_strdup(upb_string *s) { return str; } +void upb_strcat(upb_string *s, upb_string *append) { + uint32_t old_size = upb_string_len(s); + uint32_t append_size = upb_string_len(append); + uint32_t new_size = old_size + append_size; + char *buf = upb_string_getrwbuf(s, new_size); + memcpy(buf + old_size, upb_string_getrobuf(append), append_size); +} + upb_string *upb_strreadfile(const char *filename) { FILE *f = fopen(filename, "rb"); if(!f) return NULL; diff --git a/core/upb_string.h b/core/upb_string.h index bd89f67..ee345e3 100644 --- a/core/upb_string.h +++ b/core/upb_string.h @@ -18,6 +18,11 @@ * string). * - strings are not thread-safe by default, but can be made so by calling a * function. This is not the default because it causes extra CPU overhead. + * + * Reference-counted strings have recently fallen out of favor because of the + * performance impacts of doing thread-safe reference counting with atomic + * operations. We side-step this issue by not performing atomic operations + * unless the string has been marked thread-safe. */ #ifndef UPB_STRING_H @@ -34,7 +39,7 @@ extern "C" { #endif // All members of this struct are private, and may only be read/written through -// the associated functions. Also, strings may *only* be allocated on the heap. +// the associated functions. struct _upb_string { // The pointer to our currently active data. This may be memory we own // or a pointer into memory we don't own. -- cgit v1.2.3