From edd1f5a61f3323992072fb71b6088f7ec485ff7d Mon Sep 17 00:00:00 2001 From: Joshua Haberman Date: Fri, 11 Jun 2010 12:12:39 -0700 Subject: Work on decoder buffering. --- src/upb_decoder.c | 148 +++++++++++++++++++++++++++++------------------------- src/upb_srcsink.h | 5 ++ 2 files changed, 85 insertions(+), 68 deletions(-) diff --git a/src/upb_decoder.c b/src/upb_decoder.c index 3a1c8f9..601c4c8 100644 --- a/src/upb_decoder.c +++ b/src/upb_decoder.c @@ -42,19 +42,21 @@ struct upb_decoder { // We keep a stack of messages we have recursed into. upb_decoder_frame stack[UPB_MAX_NESTING], *top, *limit; - // The buffers of input data. See buffering code below for details. + // The buffer of input data. NULL is equivalent to the empty string. upb_string *buf; - upb_string *nextbuf; - uint8_t tmpbuf[UPB_MAX_ENCODED_SIZE]; // Used to bridge buf and nextbuf. + + // Holds residual bytes when fewer than UPB_MAX_ENCODED_SIZE bytes remain. + uint8_t tmpbuf[UPB_MAX_ENCODED_SIZE]; // The number of bytes we have yet to consume from "buf". This can be // negative if we have skipped more bytes than are in the buffer, or if we // have started to consume bytes from "nextbuf". int32_t buf_bytesleft; + int32_t buf_offset; // The overall stream offset of the end of "buf". If "buf" is NULL, it is as // if "buf" was the empty string. - uint32_t buf_endoffset; + uint32_t buf_stream_offset; // Fielddef for the key we just read. upb_fielddef *field; @@ -76,29 +78,39 @@ struct upb_decoder { static upb_strlen_t upb_decoder_offset(upb_decoder *d) { - return d->buf_endoffset - d->buf_bytesleft; + return d->buf_stream_offset - d->buf_offset; } -// Discards the current buffer if we are done with it, make the next buffer -// current if there is one. -static void upb_decoder_advancebuf(upb_decoder *d) +static bool upb_decoder_nextbuf(upb_decoder *d) { - if(d->buf_bytesleft <= 0) { - if(d->buf) upb_bytesrc_recycle(d->bytesrc, d->buf); - d->buf = d->nextbuf; - d->nextbuf = NULL; - if(d->buf) d->buf_bytesleft += upb_string_len(d->buf); + assert(d->buf_bytesleft < UPB_MAX_ENCODED_SIZE); + + // Copy residual bytes to temporary buffer. + if(d->buf_bytesleft > 0) { + memcpy(d->tmpbuf, upb_string_getrobuf(d->buf) + d->buf_offset, + d->buf_bytesleft); } -} -static bool upb_decoder_pullnextbuf(upb_decoder *d) -{ - if(!d->nextbuf && !upb_bytesrc_eof(d->bytesrc)) { - d->nextbuf = upb_bytesrc_get(d->bytesrc, UPB_MAX_ENCODED_SIZE); - if(!d->nextbuf && !upb_bytesrc_eof(d->bytesrc)) { - // There was an error in the byte stream, halt the decoder. + // Recycle old buffer, pull new one. + if(d->buf) { + upb_bytesrc_recycle(d->bytesrc, d->buf); + d->buf_offset -= upb_string_len(d->buf); + d->buf_stream_offset += upb_string_len(d->buf); + } + d->buf = upb_bytesrc_get(d->bytesrc, UPB_MAX_ENCODED_SIZE); + + // Handle cases arising from error or EOF. + if(d->buf) { + d->buf_bytesleft += upb_string_len(d->buf); + } else { + if(!upb_bytesrc_eof(d->bytesrc)) { + // Error from bytesrc. upb_copyerr(&d->src.status, upb_bytesrc_status(d->bytesrc)); return false; + } else if(d->buf_bytesleft == 0) { + // EOF from bytesrc and we don't have any residual bytes left. + d->src.eof = true; + return false; } } return true; @@ -106,11 +118,7 @@ static bool upb_decoder_pullnextbuf(upb_decoder *d) static bool upb_decoder_skipbytes(upb_decoder *d, int32_t bytes) { - d->buf_bytesleft -= bytes; - while(d->buf_bytesleft <= 0 && !upb_bytesrc_eof(d->bytesrc)) { - if(!upb_decoder_pullnextbuf(d)) return false; - upb_decoder_advancebuf(d); - } + // TODO. return true; } @@ -124,22 +132,28 @@ static upb_strlen_t upb_decoder_append(uint8_t *buf, upb_string *frombuf, static const uint8_t *upb_decoder_getbuf_full(upb_decoder *d, uint32_t *bytes) { - if(d->buf_bytesleft < UPB_MAX_ENCODED_SIZE) { - upb_decoder_pullnextbuf(d); - upb_decoder_advancebuf(d); - } - if(d->buf_bytesleft >= UPB_MAX_ENCODED_SIZE) { + if(d->buf_bytesleft < UPB_MAX_ENCODED_SIZE) + if(!upb_decoder_nextbuf(d)) return NULL; + + assert(d->buf_bytesleft >= UPB_MAX_ENCODED_SIZE); + + if(d->buf_offset >= 0) { + // Common case: the main buffer contains at least UPB_MAX_ENCODED_SIZE + // contiguous bytes, so we can read directly out of it. *bytes = d->buf_bytesleft; - return (uint8_t*)upb_string_getrobuf(d->buf) + upb_string_len(d->buf) - - d->buf_bytesleft; + return (uint8_t*)upb_string_getrobuf(d->buf) + d->buf_offset; } else { - upb_strlen_t len = 0; - if(d->buf) - len += upb_decoder_append(d->tmpbuf, d->buf, len, UPB_MAX_ENCODED_SIZE); - if(d->nextbuf) - len += upb_decoder_append(d->tmpbuf, d->nextbuf, len, UPB_MAX_ENCODED_SIZE); - *bytes = len; - memset(d->tmpbuf + len, 0x80, UPB_MAX_ENCODED_SIZE - len); + upb_strlen_t residual_bytes = -d->buf_offset; + if(d->buf) { + memcpy(d->tmpbuf + residual_bytes, upb_string_getrobuf(d->buf), + UPB_MAX_ENCODED_SIZE - residual_bytes); + *bytes = 10; + } else { + // All we have are residual bytes; pad them with 0x80. + memset(d->tmpbuf + residual_bytes, 0x80, + UPB_MAX_ENCODED_SIZE - residual_bytes); + *bytes = residual_bytes; + } return d->tmpbuf; } } @@ -154,12 +168,11 @@ static const uint8_t *upb_decoder_getbuf_full(upb_decoder *d, uint32_t *bytes) // indicate how many bytes were consumed. static const uint8_t *upb_decoder_getbuf(upb_decoder *d, uint32_t *bytes) { - if(d->buf_bytesleft >= UPB_MAX_ENCODED_SIZE) { - // The common case; only when we get to the last ten bytes of the buffer - // do we have to do tricky things. + if(d->buf_bytesleft >= UPB_MAX_ENCODED_SIZE && d->buf_offset >= 0) { + // Common case: the main buffer contains at least UPB_MAX_ENCODED_SIZE + // contiguous bytes, so we can read directly out of it. *bytes = d->buf_bytesleft; - return (uint8_t*)upb_string_getrobuf(d->buf) + upb_string_len(d->buf) - - d->buf_bytesleft; + return (uint8_t*)upb_string_getrobuf(d->buf) + d->buf_offset; } else { return upb_decoder_getbuf_full(d, bytes); } @@ -168,9 +181,13 @@ static const uint8_t *upb_decoder_getbuf(upb_decoder *d, uint32_t *bytes) static bool upb_decoder_consume(upb_decoder *d, uint32_t bytes) { assert(bytes <= UPB_MAX_ENCODED_SIZE); - //if() + d->buf_offset += bytes; d->buf_bytesleft -= bytes; - //if(d->buf_bytesleft > upb_string_length()) + if(d->buf_offset < 0) { + // We still have residual bytes we have not consumed. + memmove(d->tmpbuf, d->tmpbuf + bytes, -d->buf_offset); + } + return true; } @@ -208,8 +225,7 @@ INLINE bool upb_decoder_readv64(upb_decoder *d, uint32_t *low, uint32_t *high) return false; done: - upb_decoder_consume(d, buf - start); - return true; + return upb_decoder_consume(d, buf - start); } // Gets a varint -- called when we only need 32 bits of it. Note that a 32-bit @@ -291,7 +307,6 @@ static uint8_t upb_decoder_skipv64(upb_decoder *d) /* upb_src implementation for upb_decoder. ************************************/ -bool upb_decoder_get_v_uint32(upb_decoder *d, uint32_t *key); bool upb_decoder_skipval(upb_decoder *d); upb_fielddef *upb_decoder_getdef(upb_decoder *d) @@ -365,20 +380,20 @@ bool upb_decoder_getval(upb_decoder *d, upb_valueptr val) d->buf_bytesleft -= total_len; *val.str = d->str; } else { - // The string spans buffers, so we must copy from the current buffer, - // the next buffer (if we have one), and finally from the bytesrc. - uint8_t *str = (uint8_t*)upb_string_getrwbuf(d->str, total_len); - upb_strlen_t len = 0; - len += upb_decoder_append(str, d->buf, len, total_len); - upb_decoder_advancebuf(d); - if(d->buf) len += upb_decoder_append(str, d->buf, len, total_len); - upb_string_getrwbuf(d->str, len); // Cheap resize. - if(len < total_len) { - if(!upb_bytesrc_append(d->bytesrc, d->str, total_len - len)) { - upb_copyerr(&d->src.status, upb_bytesrc_status(d->bytesrc)); - return false; - } - } + //// The string spans buffers, so we must copy from the current buffer, + //// the next buffer (if we have one), and finally from the bytesrc. + //uint8_t *str = (uint8_t*)upb_string_getrwbuf(d->str, total_len); + //upb_strlen_t len = 0; + //len += upb_decoder_append(str, d->buf, len, total_len); + //upb_decoder_advancebuf(d); + //if(d->buf) len += upb_decoder_append(str, d->buf, len, total_len); + //upb_string_getrwbuf(d->str, len); // Cheap resize. + //if(len < total_len) { + // if(!upb_bytesrc_append(d->bytesrc, d->str, total_len - len)) { + // upb_copyerr(&d->src.status, upb_bytesrc_status(d->bytesrc)); + // return false; + // } + //} } d->field = NULL; } else { @@ -521,7 +536,6 @@ upb_decoder *upb_decoder_new(upb_msgdef *msgdef) d->toplevel_msgdef = msgdef; d->limit = &d->stack[UPB_MAX_NESTING]; d->buf = NULL; - d->nextbuf = NULL; d->str = upb_string_new(); upb_src_init(&d->src, &upb_decoder_src_vtbl); return d; @@ -535,7 +549,6 @@ void upb_decoder_free(upb_decoder *d) void upb_decoder_reset(upb_decoder *d, upb_bytesrc *bytesrc) { if(d->buf) upb_bytesrc_recycle(d->bytesrc, d->buf); - if(d->nextbuf) upb_bytesrc_recycle(d->bytesrc, d->nextbuf); d->top = d->stack; d->top->msgdef = d->toplevel_msgdef; // The top-level message is not delimited (we can keep receiving data for it @@ -544,8 +557,7 @@ void upb_decoder_reset(upb_decoder *d, upb_bytesrc *bytesrc) d->top->end_offset = UINT32_MAX - 1; d->bytesrc = bytesrc; d->buf = NULL; - d->nextbuf = NULL; d->buf_bytesleft = 0; - d->buf_endoffset = 0; + d->buf_stream_offset = 0; + d->buf_offset = 0; } - diff --git a/src/upb_srcsink.h b/src/upb_srcsink.h index 8e5a09d..199149d 100644 --- a/src/upb_srcsink.h +++ b/src/upb_srcsink.h @@ -26,6 +26,11 @@ extern "C" { #endif +// Note! The "eof" flags work like feof() in C; they cannot report end-of-file +// until a read has failed due to eof. They cannot preemptively tell you that +// the next call will fail due to eof. Since these are the semantics that C +// and UNIX provide, we're stuck with them if we want to support eg. stdio. + /* upb_src ********************************************************************/ // TODO: decide how to handle unknown fields. -- cgit v1.2.3