C++ Rest SDK
The C++ REST SDK is a Microsoft project for cloud-based client-server communication in native code using a modern asynchronous C++ API design. This project aims to help C++ developers connect to and interact with services.
producerconsumerstream.h
1 /***
2 * ==++==
3 *
4 * Copyright (c) Microsoft Corporation. All rights reserved.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *
16 * ==--==
17 * =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
18 *
19 * This file defines a basic memory-based stream buffer, which allows consumer / producer pairs to communicate
20 * data via a buffer.
21 *
22 * For the latest on this and related APIs, please see http://casablanca.codeplex.com.
23 *
24 * =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
25 ****/
26 #pragma once
27 
28 #ifndef _CASA_PRODUCER_CONSUMER_STREAMS_H
29 #define _CASA_PRODUCER_CONSUMER_STREAMS_H
30 
31 #include <vector>
32 #include <queue>
33 #include <algorithm>
34 #include <iterator>
35 
36 #include "pplx/pplxtasks.h"
37 #include "cpprest/astreambuf.h"
38 
39 namespace Concurrency { namespace streams {
40 
41  namespace details {
42 
// The basic_producer_consumer_buffer class serves as a memory-based stream
// buffer that supports both writing and reading sequences of _CharType.
// The base class is the one named in the constructor's initializer list.
47  template<typename _CharType>
48  class basic_producer_consumer_buffer : public streambuf_state_manager<_CharType>
49  {
50  public:
// Character traits plus the integer, position and offset types taken from
// basic_streambuf for the same character type.
51  typedef typename ::concurrency::streams::char_traits<_CharType> traits;
52  typedef typename basic_streambuf<_CharType>::int_type int_type;
53  typedef typename basic_streambuf<_CharType>::pos_type pos_type;
54  typedef typename basic_streambuf<_CharType>::off_type off_type;
55 
// Constructor. Opens the buffer for both reading and writing.
// alloc_size: default block size; write() allocates a block of
// max(alloc_size, count) characters whenever it needs a new one.
59  basic_producer_consumer_buffer(size_t alloc_size)
60  : streambuf_state_manager<_CharType>(std::ios_base::out | std::ios_base::in),
// m_mode is the first declared data member, so it is initialized first here;
// it was previously left with an indeterminate value.
61  m_mode(std::ios_base::out | std::ios_base::in), m_alloc_size(alloc_size),
62  m_allocBlock(nullptr),
63  m_total(0), m_total_read(0), m_total_written(0),
64  m_synced(0)
65  {
66  }
67 
// Destructor. Closes the read and write heads and drops every buffered block.
71  virtual ~basic_producer_consumer_buffer()
72  {
73  // Note: there is no need to call 'wait()' on the result of close(),
74  // since we happen to know that close() will return without actually
75  // doing anything asynchronously. Should the implementation of _close_write()
76  // change in that regard, this logic may also have to change.
77  this->_close_read();
78  this->_close_write();
79 
// Closing the write head flushes all pending read requests, so none may remain.
80  _ASSERTE(m_requests.empty());
81  m_blocks.clear();
82  }
83 
// can_seek is used to determine whether a stream buffer supports seeking.
// This buffer never does, so the result is always false.
87  virtual bool can_seek() const { return false; }
88 
// has_size is used to determine whether a stream buffer supports size().
// This buffer does not, so the result is always false.
92  virtual bool has_size() const { return false; }
93 
// Gets the stream buffer size, if one has been set.
// The direction argument is ignored and the result is always 0.
99  virtual size_t buffer_size(std::ios_base::openmode = std::ios_base::in) const
100  {
101  return 0;
102  }
103 
// Sets the stream buffer implementation to buffer or not buffer.
// This override ignores both arguments and does nothing.
110  virtual void set_buffer_size(size_t , std::ios_base::openmode = std::ios_base::in)
111  {
112  return;
113  }
114 
// Returns the number of characters currently held in the buffer: m_total,
// which grows on every write/commit and shrinks on every consuming read.
120  virtual size_t in_avail() const { return m_total; }
121 
122 
// Gets the current read or write position in the stream.
// mode: exactly std::ios_base::in selects the read position (total characters
// read so far); exactly std::ios_base::out selects the write position (total
// characters written so far).
// Returns traits::eof() converted to pos_type when a requested direction is
// closed, or when mode is any other flag combination.
130  virtual pos_type getpos(std::ios_base::openmode mode) const
131  {
// Reject the query if a requested direction can no longer be used.
132  if ( ((mode & std::ios_base::in) && !this->can_read()) ||
133  ((mode & std::ios_base::out) && !this->can_write()))
134  return static_cast<pos_type>(traits::eof());
135 
136  if (mode == std::ios_base::in)
137  return (pos_type)m_total_read;
138  else if (mode == std::ios_base::out)
139  return (pos_type)m_total_written;
140  else
141  return (pos_type)traits::eof();
142  }
143 
144  // Seeking is not supported
// seekpos (absolute position) and seekoff (relative offset) both ignore their
// arguments and return traits::eof() converted to pos_type.
145  virtual pos_type seekpos(pos_type, std::ios_base::openmode) { return (pos_type)traits::eof(); }
146  virtual pos_type seekoff(off_type , std::ios_base::seekdir , std::ios_base::openmode ) { return (pos_type)traits::eof(); }
147 
// Allocates a contiguous memory block and returns it.
// count: capacity, in characters, of the new block.
// Returns a pointer to the start of the block's writable storage, or nullptr
// when the write head is closed. The block is parked in m_allocBlock until
// _commit() appends it to the block list.
153  virtual _CharType* _alloc(size_t count)
154  {
155  if (!this->can_write())
156  {
157  return nullptr;
158  }
159 
160  // We always allocate a new block even if the count could be satisfied by
161  // the current write block. While this does lead to wasted space it allows for
162  // easier book keeping
163 
// Only one allocation may be outstanding at a time (checked in debug builds).
164  _ASSERTE(!m_allocBlock);
165  m_allocBlock = std::make_shared<_block>(count);
166  return m_allocBlock->wbegin();
167  }
168 
// Submits a block already allocated by the stream buffer.
// count: number of characters the caller wrote into the allocated block.
173  virtual void _commit(size_t count)
174  {
// m_allocBlock, m_blocks and the totals are shared with the reader side,
// so they are only touched while holding m_lock.
175  pplx::extensibility::scoped_critical_section_t l(m_lock);
176 
177  // The count does not reflect the actual size of the block.
178  // Since we do not allow any more writes to this block it would suffice.
179  // If we ever change the algorithm to reuse blocks then this needs to be revisited.
180 
181  _ASSERTE((bool)m_allocBlock);
182  m_allocBlock->update_write_head(count);
183  m_blocks.push_back(m_allocBlock);
184  m_allocBlock = nullptr;
185 
// Account for the new data and complete any reads that can now be satisfied.
186  update_write_head(count);
187  }
188 
// Gets a pointer to the next already allocated contiguous block of data.
// ptr:   receives the read position of the front block, or nullptr.
// count: receives the number of unread characters in that block, or 0.
// Returns false if the read head is closed, or if no data is buffered but the
// writer may still produce more. Returns true with ptr == nullptr at end of
// stream, and true with a non-null ptr when data is available.
202  virtual bool acquire(_Out_ _CharType*& ptr, _Out_ size_t& count)
203  {
204  count = 0;
205  ptr = nullptr;
206 
207  if (!this->can_read()) return false;
208 
// m_blocks is shared with the writer; inspect it only under m_lock.
209  pplx::extensibility::scoped_critical_section_t l(m_lock);
210 
211  if (m_blocks.empty())
212  {
213  // If the write head has been closed then we have reached the end of the
214  // stream (return true), otherwise more data could be written later (return false).
215  return !this->can_write();
216  }
217  else
218  {
219  auto block = m_blocks.front();
220 
221  count = block->rd_chars_left();
222  ptr = block->rbegin();
223 
224  _ASSERTE(ptr != nullptr);
225  return true;
226  }
227  }
228 
// Releases a block of data acquired using the acquire method.
// ptr:   pointer previously returned by acquire; nullptr makes this a no-op.
// count: number of characters the caller consumed from that block.
235  virtual void release(_Out_writes_opt_ (count) _CharType *ptr, _In_ size_t count)
236  {
237  if (ptr == nullptr) return;
238 
// The block list and read counters are shared with the writer; guard them.
239  pplx::extensibility::scoped_critical_section_t l(m_lock);
240  auto block = m_blocks.front();
241 
242  _ASSERTE(block->rd_chars_left() >= count);
243  block->m_read += count;
244 
// Adjust the totals and discard the front block once it is fully consumed.
245  update_read_head(count);
246  }
247 
248  protected:
249 
// Flushes the buffer towards the reader. Every character currently buffered
// is marked as synced, which lets can_satisfy() succeed even for reads larger
// than the data available; pending read requests are then completed.
// Returns an already-completed task holding true.
250  virtual pplx::task<bool> _sync()
251  {
// m_synced and the request queue are shared state; hold m_lock while updating.
252  pplx::extensibility::scoped_critical_section_t l(m_lock);
253 
254  m_synced = in_avail();
255 
256  fulfill_outstanding();
257 
258  return pplx::task_from_result(true);
259  }
260 
// Writes a single character through write().
// Returns an already-completed task holding the character as int_type when
// exactly one character was written, otherwise traits::eof().
261  virtual pplx::task<int_type> _putc(_CharType ch)
262  {
263  return pplx::task_from_result((this->write(&ch, 1) == 1) ? static_cast<int_type>(ch) : traits::eof());
264  }
265 
// Writes count characters starting at ptr through write().
// Returns an already-completed task holding the count reported by write().
266  virtual pplx::task<size_t> _putn(const _CharType *ptr, size_t count)
267  {
268  return pplx::task_from_result<size_t>(this->write(ptr, count));
269  }
270 
271 
// Asynchronously reads up to count characters into ptr.
// The read is queued as a request. It runs at once if it can be satisfied,
// and otherwise when the producer writes, syncs or closes.
// Returns a task that completes with the number of characters read.
272  virtual pplx::task<size_t> _getn(_Out_writes_ (count) _CharType *ptr, _In_ size_t count)
273  {
// Completion event captured by the request below; it carries the result
// into the returned task.
274  pplx::task_completion_event<size_t> tce;
275  enqueue_request(_request(count, [this, ptr, count, tce]()
276  {
277  // VS 2010 resolves read to a global function. Explicit
278  // invocation through the "this" pointer fixes the issue.
279  tce.set(this->read(ptr, count));
280  }));
281  return pplx::create_task(tce);
282  }
283 
// Synchronous counterpart of _getn: reads up to count characters into ptr
// if the request can be satisfied immediately.
// Returns the number of characters read, or traits::requires_async()
// converted to size_t when the caller must use the asynchronous path.
284  virtual size_t _sgetn(_Out_writes_ (count) _CharType *ptr, _In_ size_t count)
285  {
// can_satisfy() and read() both touch state shared with the writer.
286  pplx::extensibility::scoped_critical_section_t l(m_lock);
287  return can_satisfy(count) ? this->read(ptr, count) : (size_t)traits::requires_async();
288  }
289 
// Copies up to count characters into ptr without consuming them
// (read() is called with advance == false).
// Returns the number of characters copied, or traits::requires_async()
// converted to size_t when the request cannot be satisfied immediately.
290  virtual size_t _scopy(_Out_writes_ (count) _CharType *ptr, _In_ size_t count)
291  {
// can_satisfy() and read() both touch state shared with the writer.
292  pplx::extensibility::scoped_critical_section_t l(m_lock);
293  return can_satisfy(count) ? this->read(ptr, count, false) : (size_t)traits::requires_async();
294  }
295 
// Asynchronously reads one character and advances the read position.
// Returns a task that completes with the character, or with traits::eof()
// when nothing could be read.
296  virtual pplx::task<int_type> _bumpc()
297  {
// Completion event captured by the queued request; it feeds the returned task.
298  pplx::task_completion_event<int_type> tce;
299  enqueue_request(_request(1, [this, tce]()
300  {
301  tce.set(this->read_byte(true));
302  }));
303  return pplx::create_task(tce);
304  }
305 
// Synchronous counterpart of _bumpc: reads one character and advances.
// Returns the character, traits::eof() when nothing could be read, or
// traits::requires_async() when the read cannot be satisfied immediately.
306  virtual int_type _sbumpc()
307  {
// can_satisfy() and read_byte() both touch state shared with the writer.
308  pplx::extensibility::scoped_critical_section_t l(m_lock);
309  return can_satisfy(1) ? this->read_byte(true) : traits::requires_async();
310  }
311 
// Asynchronously peeks at the next character without advancing the read
// position. Returns a task that completes with the character, or with
// traits::eof() when nothing could be read.
312  virtual pplx::task<int_type> _getc()
313  {
// Completion event captured by the queued request; it feeds the returned task.
314  pplx::task_completion_event<int_type> tce;
315  enqueue_request(_request(1, [this, tce]()
316  {
317  tce.set(this->read_byte(false));
318  }));
319  return pplx::create_task(tce);
320  }
321 
// Synchronous counterpart of _getc: peeks at the next character.
// Returns the character, traits::eof() when nothing could be read, or
// traits::requires_async() when the peek cannot be satisfied immediately.
// Spelled 'virtual' like the sibling _s* functions.
322  virtual int_type _sgetc()
323  {
// can_satisfy() and read_byte() both touch state shared with the writer.
324  pplx::extensibility::scoped_critical_section_t l(m_lock);
325  return can_satisfy(1) ? this->read_byte(false) : traits::requires_async();
326  }
327 
// Asynchronously consumes one character and then peeks at the one after it.
// Returns a task that completes with the peeked character, or with
// traits::eof() when that second read yields nothing.
// NOTE(review): the request is sized for 1 character although the callback
// performs two reads - confirm that this is intended.
328  virtual pplx::task<int_type> _nextc()
329  {
// Completion event captured by the queued request; it feeds the returned task.
330  pplx::task_completion_event<int_type> tce;
331  enqueue_request(_request(1, [this, tce]()
332  {
333  this->read_byte(true);
334  tce.set(this->read_byte(false));
335  }));
336  return pplx::create_task(tce);
337  }
338 
// Putting a character back is not implemented: the result is always an
// already-completed task holding traits::eof().
339  virtual pplx::task<int_type> _ungetc()
340  {
341  return pplx::task_from_result<int_type>(traits::eof());
342  }
343 
344  private:
345 
// Closes the write head and flushes every pending read request.
// Returns an already-completed task; nothing here runs asynchronously.
349  pplx::task<void> _close_write()
350  {
351  // First indicate that there could be no more writes.
352  // Fulfill outstanding relies on that to flush all the
353  // read requests.
354  this->m_stream_can_write = false;
355 
356  {
// Hold m_lock only for the flush; the scope ends before the result is built.
357  pplx::extensibility::scoped_critical_section_t l(this->m_lock);
358 
359  // This runs on the thread that called close.
360  this->fulfill_outstanding();
361  }
362 
363  return pplx::task_from_result();
364  }
365 
// Records that count characters were added to the buffer: bumps both the
// available total and the lifetime written total, then completes any queued
// read requests that can now be satisfied.
370  void update_write_head(size_t count)
371  {
372  m_total += count;
373  m_total_written += count;
374  fulfill_outstanding();
375  }
376 
// Copies count characters from ptr into the buffer.
// Returns 0 when the write head is closed or count is 0. Returns count
// without storing anything when the read head is closed. Otherwise returns
// the number of characters stored.
380  size_t write(const _CharType *ptr, size_t count)
381  {
382  if (!this->can_write() || (count == 0)) return 0;
383 
384  // If no one is going to read, why bother?
385  // Just pretend to be writing!
386  if (!this->can_read()) return count;
387 
// The block list and totals are shared with the reader; guard them.
388  pplx::extensibility::scoped_critical_section_t l(m_lock);
389 
390  // Allocate a new block if necessary
391  if ( m_blocks.empty() || m_blocks.back()->wr_chars_left() < count )
392  {
// Blocks are at least m_alloc_size, and large enough to hold this whole write.
393  msl::safeint3::SafeInt<size_t> alloc = m_alloc_size.Max(count);
394  m_blocks.push_back(std::make_shared<_block>(alloc));
395  }
396 
397  // The block at the back is always the write head
398  auto last = m_blocks.back();
399  auto countWritten = last->write(ptr, count);
400  _ASSERTE(countWritten == count);
401 
402  update_write_head(countWritten);
403  return countWritten;
404  }
405 
// Completes queued read requests in FIFO order for as long as the request at
// the front can be satisfied. Stops at the first one that cannot, so later
// requests never overtake earlier ones.
410  void fulfill_outstanding()
411  {
412  while ( !m_requests.empty() )
413  {
// Works on a copy of the front request; the queue entry is removed only
// after the request has run.
414  auto req = m_requests.front();
415 
416  // If we cannot satisfy the request then we need
417  // to wait for the producer to write data
418  if (!can_satisfy(req.size())) return;
419 
420  // We have enough data to satisfy this request
421  req.complete();
422 
423  // Remove it from the request queue
424  m_requests.pop();
425  }
426  }
427 
// A single fixed-size chunk of buffered data with independent read and
// write offsets. The block owns its storage: it is allocated with new[] in
// the constructor and freed with delete[] in the destructor. Copying is
// disabled by declaring the copy operations private.
431  class _block
432  {
433  public:
// Creates an empty block with room for 'size' characters.
434  _block(size_t size)
435  : m_read(0), m_pos(0), m_size(size), m_data(new _CharType[size])
436  {
437  }
438 
439  ~_block()
440  {
441  delete [] m_data;
442  }
443 
444  // Read head
445  size_t m_read;
446 
447  // Write head
448  size_t m_pos;
449 
450  // Allocation size (of m_data)
451  size_t m_size;
452 
453  // The data store
454  _CharType * m_data;
455 
456  // Pointer to the read head
457  _CharType * rbegin()
458  {
459  return m_data + m_read;
460  }
461 
462  // Pointer to the write head
463  _CharType * wbegin()
464  {
465  return m_data + m_pos;
466  }
467 
468  // Read up to count characters from the block
// dest:    destination with room for count characters.
// advance: when false the read head is left untouched (peek).
// Returns the number of characters copied: min(unread characters, count).
469  size_t read(_Out_writes_ (count) _CharType * dest, _In_ size_t count, bool advance = true)
470  {
471  msl::safeint3::SafeInt<size_t> avail(rd_chars_left());
472  auto countRead = static_cast<size_t>(avail.Min(count));
473 
474  _CharType * beg = rbegin();
475  _CharType * end = rbegin() + countRead;
476 
477 #ifdef _WIN32
478  // Avoid warning C4996: Use checked iterators under SECURE_SCL
479  std::copy(beg, end, stdext::checked_array_iterator<_CharType *>(dest, count));
480 #else
481  std::copy(beg, end, dest);
482 #endif // _WIN32
483 
484  if (advance)
485  {
486  m_read += countRead;
487  }
488 
489  return countRead;
490  }
491 
492  // Write count characters into the block
// Copies at most the remaining capacity and advances the write head.
// Returns the number of characters actually copied.
493  size_t write(const _CharType * src, size_t count)
494  {
495  msl::safeint3::SafeInt<size_t> avail(wr_chars_left());
496  auto countWritten = static_cast<size_t>(avail.Min(count));
497 
498  const _CharType * srcEnd = src + countWritten;
499 
500 #ifdef _WIN32
501  // Avoid warning C4996: Use checked iterators under SECURE_SCL
502  std::copy(src, srcEnd, stdext::checked_array_iterator<_CharType *>(wbegin(), static_cast<size_t>(avail)));
503 #else
504  std::copy(src, srcEnd, wbegin());
505 #endif // _WIN32
506 
507  update_write_head(countWritten);
508  return countWritten;
509  }
510 
// Advances the write head by count characters; no bounds check is done here.
511  void update_write_head(size_t count)
512  {
513  m_pos += count;
514  }
515 
// Characters written but not yet read, and free capacity left for writing.
516  size_t rd_chars_left() const { return m_pos-m_read; }
517  size_t wr_chars_left() const { return m_size-m_pos; }
518 
519  private:
520 
521  // Copy is not supported
522  _block(const _block&);
523  _block& operator=(const _block&);
524  };
525 
// A deferred read request: the number of characters it needs, paired with
// the callback that performs the read once the request can be satisfied.
529  class _request
530  {
531  public:
532 
533  typedef std::function<void()> func_type;
// count: characters needed before the request can be satisfied.
// func:  callback to run at that point; a copy of it is stored.
534  _request(size_t count, const func_type& func)
535  : m_func(func), m_count(count)
536  {
537  }
538 
// Runs the stored callback.
539  void complete()
540  {
541  m_func();
542  }
543 
// Number of characters this request asked for.
544  size_t size() const
545  {
546  return m_count;
547  }
548 
549  private:
550 
551  func_type m_func;
552  size_t m_count;
553  };
554 
// Runs a read request immediately when it can be satisfied, and otherwise
// queues it to be completed by a later write, sync or close.
555  void enqueue_request(_request req)
556  {
// The satisfiability check and the queue update must happen together under
// m_lock. Otherwise a concurrent write could slip in between them and leave
// a satisfiable request stranded in the queue.
557  pplx::extensibility::scoped_critical_section_t l(m_lock);
558 
559  if (can_satisfy(req.size()))
560  {
561  // We can immediately fulfill the request.
562  req.complete();
563  }
564  else
565  {
566  // We must wait for data to arrive.
567  m_requests.push(req);
568  }
569  }
570 
// Decides whether a read of count characters may proceed right now.
// True when any of these hold: synced characters are still waiting to be
// consumed, at least count characters are buffered, or the write head is
// closed so no more data will ever arrive.
574  bool can_satisfy(size_t count)
575  {
576  return (m_synced > 0) || (this->in_avail() >= count) || !this->can_write();
577  }
578 
// Reads a single character through read().
// advance: when false the character is only peeked at, not consumed.
// Returns the character as int_type, or traits::eof() if none was read.
584  int_type read_byte(bool advance = true)
585  {
586  _CharType value;
587  auto read_size = this->read(&value, 1, advance);
588  return read_size == 1 ? static_cast<int_type>(value) : traits::eof();
589  }
590 
// Copies up to count characters into ptr, walking the block list from the
// front and taking what each block has until count is reached or the blocks
// run out.
// advance: when true the data is consumed and the read head is updated;
//          when false every block is left untouched (peek).
// Returns the number of characters copied, which may be less than count.
// Callers must ensure can_satisfy(count) beforehand (asserted in debug builds).
597  size_t read(_Out_writes_ (count) _CharType *ptr, _In_ size_t count, bool advance = true)
598  {
599  _ASSERTE(can_satisfy(count));
600 
// Running total of characters copied so far.
601  size_t read = 0;
602 
603  for (auto iter = begin(m_blocks); iter != std::end(m_blocks); ++iter)
604  {
605  auto block = *iter;
606  auto read_from_block = block->read(ptr + read, count - read, advance);
607 
608  read += read_from_block;
609 
610  _ASSERTE(count >= read);
611  if (read == count) break;
612  }
613 
// Only a consuming read changes the totals and releases drained blocks.
614  if (advance)
615  {
616  update_read_head(read);
617  }
618 
619  return read;
620  }
621 
// Records that count characters were consumed: lowers the available total,
// raises the lifetime read total, reduces the synced count (never below 0),
// and drops fully drained blocks from the front of the block list.
626  void update_read_head(size_t count)
627  {
628  m_total -= count;
629  m_total_read += count;
630 
631  if ( m_synced > 0 )
632  m_synced = (m_synced > count) ? (m_synced-count) : 0;
633 
634  // The block at the front is always the read head.
635  // Purge empty blocks so that the block at the front reflects the read head
636  while (!m_blocks.empty())
637  {
638  // If front block is not empty - we are done
639  if (m_blocks.front()->rd_chars_left() > 0) break;
640 
641  // The block has no more data to be read. Release the block
642  m_blocks.pop_front();
643  }
644  }
645 
646  // The in/out mode for the buffer
// NOTE(review): not read by any member function shown in this class.
647  std::ios_base::openmode m_mode;
648 
649  // Default block size
650  msl::safeint3::SafeInt<size_t> m_alloc_size;
651 
652  // Block used for alloc/commit
653  std::shared_ptr<_block> m_allocBlock;
654 
655  // Total available data
656  size_t m_total;
657 
// Lifetime counts of characters consumed and produced; getpos() reports them
// as the read and write positions.
658  size_t m_total_read;
659  size_t m_total_written;
660 
661  // Keeps track of the number of chars that have been flushed but still
662  // remain to be consumed by a read operation.
663  size_t m_synced;
664 
665  // The producer-consumer buffer is intended to be used concurrently by a reader
666  // and a writer, who are not coordinating their accesses to the buffer (coordination
667  // being what the buffer is for in the first place). Thus, we have to protect
668  // some of the internal data elements against concurrent accesses
669  // and the possibility of inconsistent states. A simple non-recursive lock
670  // should be sufficient for those purposes.
671  pplx::extensibility::critical_section_t m_lock;
672 
673  // Memory blocks
// The front block is the read head and the back block is the write head.
674  std::deque<std::shared_ptr<_block>> m_blocks;
675 
676  // Queue of requests
// Read requests that could not be satisfied yet, kept in arrival order.
677  std::queue<_request> m_requests;
678  };
679 
680  } // namespace details
681 
// The producer_consumer_buffer class serves as a memory-based stream buffer
// that supports both writing and reading. It is a streambuf<_CharType>
// wrapping a shared details::basic_producer_consumer_buffer<_CharType>.
691  template<typename _CharType>
692  class producer_consumer_buffer : public streambuf<_CharType>
693  {
694  public:
695  typedef _CharType char_type;
696 
// Create a producer_consumer_buffer.
// alloc_size: forwarded to the underlying buffer's constructor as its
// allocation block size; defaults to 512.
701  producer_consumer_buffer(size_t alloc_size = 512)
702  : streambuf<_CharType>(std::make_shared<details::basic_producer_consumer_buffer<_CharType>>(alloc_size))
703  {
704  }
705  };
706 
707 }} // namespaces
708 
709 #endif
The basic_producer_consumer_buffer class serves as a memory-based stream buffer that supports both wri...
Definition: producerconsumerstream.h:48
virtual void release(_Out_writes_opt_(count) _CharType *ptr, _In_ size_t count)
Releases a block of data acquired using the ::acquire method. This frees the stream buffer to de-allocate...
Definition: producerconsumerstream.h:235
A generic RAII wrapper for locks that implements the critical_section interface cpprest_synchronizati...
Definition: pplxlinux.h:264
virtual bool can_seek() const
can_seek is used to determine whether a stream buffer supports seeking.
Definition: producerconsumerstream.h:87
virtual size_t in_avail() const
For any input stream, in_avail returns the number of characters that are immediately available to be ...
Definition: producerconsumerstream.h:120
virtual pos_type seekoff(off_type, std::ios_base::seekdir, std::ios_base::openmode)
Seeks to a position given by a relative offset.
Definition: producerconsumerstream.h:146
The producer_consumer_buffer class serves as a memory-based stream buffer that supports both writing a...
Definition: producerconsumerstream.h:692
virtual ~basic_producer_consumer_buffer()
Destructor
Definition: producerconsumerstream.h:71
virtual _CharType * _alloc(size_t count)
Allocates a contiguous memory block and returns it.
Definition: producerconsumerstream.h:153
Reference-counted stream buffer.
Definition: astreambuf.h:804
virtual utility::size64_t size() const
Gets the size of the stream, if known. Calls to has_size will determine whether the result of size ca...
Definition: astreambuf.h:676
virtual bool can_read() const
can_read is used to determine whether a stream buffer will support read operations (get)...
Definition: astreambuf.h:373
virtual pplx::task< void > _close_read()
The real read head close operation, implementation should override it if there is any resource to be ...
Definition: astreambuf.h:711
virtual pos_type seekpos(pos_type, std::ios_base::openmode)
Seeks to the given position.
Definition: producerconsumerstream.h:145
virtual bool acquire(_Out_ _CharType *&ptr, _Out_ size_t &count)
Gets a pointer to the next already allocated contiguous block of data.
Definition: producerconsumerstream.h:202
virtual bool has_size() const
has_size is used to determine whether a stream buffer supports size().
Definition: producerconsumerstream.h:92
virtual bool can_write() const
can_write is used to determine whether a stream buffer will support write operations (put)...
Definition: astreambuf.h:381
The Parallel Patterns Library (PPL) task class. A task object represents work that can be executed as...
Definition: pplxtasks.h:176
bool set(_ResultType _Result) const
Sets the task completion event.
Definition: pplxtasks.h:2702
The Parallel Patterns Library (PPL) task class. A task object represents work that can be executed as...
Definition: pplxtasks.h:4173
virtual void set_buffer_size(size_t, std::ios_base::openmode=std::ios_base::in)
Sets the stream buffer implementation to buffer or not buffer.
Definition: producerconsumerstream.h:110
virtual size_t buffer_size(std::ios_base::openmode=std::ios_base::in) const
Get the stream buffer size, if one has been set.
Definition: producerconsumerstream.h:99
_CharType * alloc(size_t count)
Allocates a contiguous memory block and returns it.
Definition: astreambuf.h:646
Definition: astreambuf.h:37
producer_consumer_buffer(size_t alloc_size=512)
Create a producer_consumer_buffer.
Definition: producerconsumerstream.h:701
virtual pos_type getpos(std::ios_base::openmode mode) const
Gets the current read or write position in the stream.
Definition: producerconsumerstream.h:130
basic_producer_consumer_buffer(size_t alloc_size)
Constructor
Definition: producerconsumerstream.h:59
virtual void _commit(size_t count)
Submits a block already allocated by the stream buffer.
Definition: producerconsumerstream.h:173