28 #ifndef _CASA_PRODUCER_CONSUMER_STREAMS_H
29 #define _CASA_PRODUCER_CONSUMER_STREAMS_H
36 #include "pplx/pplxtasks.h"
37 #include "cpprest/astreambuf.h"
47 template<
typename _CharType>
51 typedef typename ::concurrency::streams::char_traits<_CharType> traits;
52 typedef typename basic_streambuf<_CharType>::int_type int_type;
53 typedef typename basic_streambuf<_CharType>::pos_type pos_type;
54 typedef typename basic_streambuf<_CharType>::off_type off_type;
61 m_alloc_size(alloc_size),
62 m_allocBlock(nullptr),
63 m_total(0), m_total_read(0), m_total_written(0),
80 _ASSERTE(m_requests.empty());
// can_seek is used to determine whether a stream buffer supports seeking.
// This buffer unconditionally reports false.
87 virtual bool can_seek()
const {
return false; }
// has_size is used to determine whether a stream buffer supports size().
// This buffer unconditionally reports false.
92 virtual bool has_size()
const {
return false; }
99 virtual size_t buffer_size(std::ios_base::openmode = std::ios_base::in)
const
// For an input stream, in_avail reports the number of characters that are
// immediately available. Here that is the current value of the m_total counter.
120 virtual size_t in_avail()
const {
return m_total; }
130 virtual pos_type
getpos(std::ios_base::openmode mode)
const
132 if ( ((mode & std::ios_base::in) && !this->
can_read()) ||
133 ((mode & std::ios_base::out) && !this->
can_write()))
134 return static_cast<pos_type
>(traits::eof());
136 if (mode == std::ios_base::in)
137 return (pos_type)m_total_read;
138 else if (mode == std::ios_base::out)
139 return (pos_type)m_total_written;
141 return (pos_type)traits::eof();
// Seeks to the given position.
// Both arguments are ignored: the function always returns traits::eof()
// converted to pos_type, so no seek is ever performed.
145 virtual pos_type
seekpos(pos_type, std::ios_base::openmode) {
return (pos_type)traits::eof(); }
// Seeks to a position given by a relative offset.
// All three arguments are ignored: the function always returns traits::eof()
// converted to pos_type, so no seek is ever performed.
146 virtual pos_type
seekoff(off_type , std::ios_base::seekdir , std::ios_base::openmode ) {
return (pos_type)traits::eof(); }
164 _ASSERTE(!m_allocBlock);
165 m_allocBlock = std::make_shared<_block>(count);
166 return m_allocBlock->wbegin();
181 _ASSERTE((
bool)m_allocBlock);
182 m_allocBlock->update_write_head(count);
183 m_blocks.push_back(m_allocBlock);
184 m_allocBlock =
nullptr;
186 update_write_head(count);
202 virtual bool acquire(_Out_ _CharType*& ptr, _Out_
size_t& count)
207 if (!this->
can_read())
return false;
211 if (m_blocks.empty())
219 auto block = m_blocks.front();
221 count = block->rd_chars_left();
222 ptr = block->rbegin();
224 _ASSERTE(ptr !=
nullptr);
235 virtual void release(_Out_writes_opt_ (count) _CharType *ptr, _In_
size_t count)
237 if (ptr ==
nullptr)
return;
240 auto block = m_blocks.front();
242 _ASSERTE(block->rd_chars_left() >= count);
243 block->m_read += count;
245 update_read_head(count);
256 fulfill_outstanding();
258 return pplx::task_from_result(
true);
263 return pplx::task_from_result((this->write(&ch, 1) == 1) ? static_cast<int_type>(ch) : traits::eof());
268 return pplx::task_from_result<size_t>(this->write(ptr, count));
272 virtual pplx::task<size_t> _getn(_Out_writes_ (count) _CharType *ptr, _In_
size_t count)
275 enqueue_request(_request(count, [
this, ptr, count, tce]()
279 tce.
set(this->read(ptr, count));
281 return pplx::create_task(tce);
284 virtual size_t _sgetn(_Out_writes_ (count) _CharType *ptr, _In_
size_t count)
287 return can_satisfy(count) ? this->read(ptr, count) : (size_t)traits::requires_async();
290 virtual size_t _scopy(_Out_writes_ (count) _CharType *ptr, _In_
size_t count)
293 return can_satisfy(count) ? this->read(ptr, count,
false) : (size_t)traits::requires_async();
299 enqueue_request(_request(1, [
this, tce]()
301 tce.
set(this->read_byte(
true));
303 return pplx::create_task(tce);
306 virtual int_type _sbumpc()
309 return can_satisfy(1) ? this->read_byte(
true) : traits::requires_async();
315 enqueue_request(_request(1, [
this, tce]()
317 tce.
set(this->read_byte(
false));
319 return pplx::create_task(tce);
325 return can_satisfy(1) ? this->read_byte(
false) : traits::requires_async();
331 enqueue_request(_request(1, [
this, tce]()
333 this->read_byte(
true);
334 tce.
set(this->read_byte(
false));
336 return pplx::create_task(tce);
341 return pplx::task_from_result<int_type>(traits::eof());
354 this->m_stream_can_write =
false;
360 this->fulfill_outstanding();
363 return pplx::task_from_result();
370 void update_write_head(
size_t count)
373 m_total_written += count;
374 fulfill_outstanding();
380 size_t write(
const _CharType *ptr,
size_t count)
382 if (!this->
can_write() || (count == 0))
return 0;
386 if (!this->
can_read())
return count;
391 if ( m_blocks.empty() || m_blocks.back()->wr_chars_left() < count )
394 m_blocks.push_back(std::make_shared<_block>(alloc));
398 auto last = m_blocks.back();
399 auto countWritten = last->write(ptr, count);
400 _ASSERTE(countWritten == count);
402 update_write_head(countWritten);
410 void fulfill_outstanding()
412 while ( !m_requests.empty() )
414 auto req = m_requests.front();
418 if (!can_satisfy(req.size()))
return;
435 : m_read(0), m_pos(0), m_size(size), m_data(new _CharType[size])
459 return m_data + m_read;
465 return m_data + m_pos;
469 size_t read(_Out_writes_ (count) _CharType * dest, _In_
size_t count,
bool advance =
true)
472 auto countRead =
static_cast<size_t>(avail.Min(count));
474 _CharType * beg = rbegin();
475 _CharType * end = rbegin() + countRead;
479 std::copy(beg, end, stdext::checked_array_iterator<_CharType *>(dest, count));
481 std::copy(beg, end, dest);
493 size_t write(
const _CharType * src,
size_t count)
496 auto countWritten =
static_cast<size_t>(avail.Min(count));
498 const _CharType * srcEnd = src + countWritten;
502 std::copy(src, srcEnd, stdext::checked_array_iterator<_CharType *>(wbegin(), static_cast<size_t>(avail)));
504 std::copy(src, srcEnd, wbegin());
507 update_write_head(countWritten);
511 void update_write_head(
size_t count)
// Returns m_pos - m_read for this block.
// NOTE(review): presumably the count of characters written to the block that
// have not been read yet (m_read is the read offset, m_pos the write offset) -
// confirm against the full _block definition.
516 size_t rd_chars_left()
const {
return m_pos-m_read; }
// Returns m_size - m_pos for this block, where m_size is the length of the
// array allocated for m_data.
// NOTE(review): presumably the remaining writable capacity of the block -
// confirm against the full _block definition.
517 size_t wr_chars_left()
const {
return m_size-m_pos; }
522 _block(
const _block&);
523 _block& operator=(
const _block&);
533 typedef std::function<void()> func_type;
534 _request(
size_t count,
const func_type& func)
535 : m_func(func), m_count(count)
555 void enqueue_request(_request req)
559 if (can_satisfy(req.size()))
567 m_requests.push(req);
574 bool can_satisfy(
size_t count)
584 int_type read_byte(
bool advance =
true)
587 auto read_size = this->read(&value, 1, advance);
588 return read_size == 1 ?
static_cast<int_type
>(value) : traits::eof();
597 size_t read(_Out_writes_ (count) _CharType *ptr, _In_
size_t count,
bool advance =
true)
599 _ASSERTE(can_satisfy(count));
603 for (
auto iter = begin(m_blocks); iter != std::end(m_blocks); ++iter)
606 auto read_from_block = block->read(ptr + read, count - read, advance);
608 read += read_from_block;
610 _ASSERTE(count >= read);
611 if (read == count)
break;
616 update_read_head(read);
626 void update_read_head(
size_t count)
629 m_total_read += count;
632 m_synced = (m_synced > count) ? (m_synced-count) : 0;
636 while (!m_blocks.empty())
639 if (m_blocks.front()->rd_chars_left() > 0)
break;
642 m_blocks.pop_front();
647 std::ios_base::openmode m_mode;
653 std::shared_ptr<_block> m_allocBlock;
659 size_t m_total_written;
671 pplx::extensibility::critical_section_t m_lock;
674 std::deque<std::shared_ptr<_block>> m_blocks;
677 std::queue<_request> m_requests;
691 template<
typename _CharType>
695 typedef _CharType char_type;
702 :
streambuf<_CharType>(std::make_shared<details::basic_producer_consumer_buffer<_CharType>>(alloc_size))
The basic_producer_consumer_buffer class serves as a memory-based stream buffer that supports both wri...
Definition: producerconsumerstream.h:48
virtual void release(_Out_writes_opt_(count) _CharType *ptr, _In_ size_t count)
Releases a block of data acquired using ::acquire method. This frees the stream buffer to de-allocate...
Definition: producerconsumerstream.h:235
A generic RAII wrapper for locks that implements the critical_section interface cpprest_synchronizati...
Definition: pplxlinux.h:264
virtual bool can_seek() const
can_seek is used to determine whether a stream buffer supports seeking.
Definition: producerconsumerstream.h:87
virtual size_t in_avail() const
For any input stream, in_avail returns the number of characters that are immediately available to be ...
Definition: producerconsumerstream.h:120
virtual pos_type seekoff(off_type, std::ios_base::seekdir, std::ios_base::openmode)
Seeks to a position given by a relative offset.
Definition: producerconsumerstream.h:146
The producer_consumer_buffer class serves as a memory-based stream buffer that supports both writing a...
Definition: producerconsumerstream.h:692
virtual ~basic_producer_consumer_buffer()
Destructor
Definition: producerconsumerstream.h:71
virtual _CharType * _alloc(size_t count)
Allocates a contiguous memory block and returns it.
Definition: producerconsumerstream.h:153
Reference-counted stream buffer.
Definition: astreambuf.h:804
virtual utility::size64_t size() const
Gets the size of the stream, if known. Calls to has_size will determine whether the result of size ca...
Definition: astreambuf.h:676
virtual bool can_read() const
can_read is used to determine whether a stream buffer will support read operations (get)...
Definition: astreambuf.h:373
virtual pplx::task< void > _close_read()
The real read head close operation, implementation should override it if there is any resource to be ...
Definition: astreambuf.h:711
virtual pos_type seekpos(pos_type, std::ios_base::openmode)
Seeks to the given position.
Definition: producerconsumerstream.h:145
virtual bool acquire(_Out_ _CharType *&ptr, _Out_ size_t &count)
Gets a pointer to the next already allocated contiguous block of data.
Definition: producerconsumerstream.h:202
virtual bool has_size() const
has_size is used to determine whether a stream buffer supports size().
Definition: producerconsumerstream.h:92
virtual bool can_write() const
can_write is used to determine whether a stream buffer will support write operations (put)...
Definition: astreambuf.h:381
Definition: astreambuf.h:362
The Parallel Patterns Library (PPL) task class. A task object represents work that can be executed as...
Definition: pplxtasks.h:176
bool set(_ResultType _Result) const
Sets the task completion event.
Definition: pplxtasks.h:2702
The Parallel Patterns Library (PPL) task class. A task object represents work that can be executed as...
Definition: pplxtasks.h:4173
virtual void set_buffer_size(size_t, std::ios_base::openmode=std::ios_base::in)
Sets the stream buffer implementation to buffer or not buffer.
Definition: producerconsumerstream.h:110
virtual size_t buffer_size(std::ios_base::openmode=std::ios_base::in) const
Get the stream buffer size, if one has been set.
Definition: producerconsumerstream.h:99
_CharType * alloc(size_t count)
Allocates a contiguous memory block and returns it.
Definition: astreambuf.h:646
Definition: astreambuf.h:37
producer_consumer_buffer(size_t alloc_size=512)
Create a producer_consumer_buffer.
Definition: producerconsumerstream.h:701
virtual pos_type getpos(std::ios_base::openmode mode) const
Gets the current read or write position in the stream.
Definition: producerconsumerstream.h:130
basic_producer_consumer_buffer(size_t alloc_size)
Constructor
Definition: producerconsumerstream.h:59
virtual void _commit(size_t count)
Submits a block already allocated by the stream buffer.
Definition: producerconsumerstream.h:173