C++ Rest SDK
The C++ REST SDK is a Microsoft project for cloud-based client-server communication in native code using a modern asynchronous C++ API design. This project aims to help C++ developers connect to and interact with services.
containerstream.h
1 /***
2 * ==++==
3 *
4 * Copyright (c) Microsoft Corporation. All rights reserved.
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *
16 * ==--==
17 * =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
18 *
19 * This file defines a basic STL-container-based stream buffer. Reading from the buffer will not remove any data
20 * from it and seeking is thus supported.
21 *
22 * For the latest on this and related APIs, please see http://casablanca.codeplex.com.
23 *
24 * =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
25 ****/
26 #pragma once
27 
28 #include <vector>
29 #include <queue>
30 #include <algorithm>
31 #include <iterator>
32 
33 #include "pplx/pplxtasks.h"
34 #include "cpprest/astreambuf.h"
35 #include "cpprest/streams.h"
36 
37 namespace Concurrency { namespace streams {
38 
39  // Forward declarations
40 
41  template<typename _CollectionType> class container_buffer;
42 
43  namespace details {
44 
52  template<typename _CollectionType>
53  class basic_container_buffer : public streams::details::streambuf_state_manager<typename _CollectionType::value_type>
54  {
55  public:
56  typedef typename _CollectionType::value_type _CharType;
57  typedef typename basic_streambuf<_CharType>::traits traits;
58  typedef typename basic_streambuf<_CharType>::int_type int_type;
59  typedef typename basic_streambuf<_CharType>::pos_type pos_type;
60  typedef typename basic_streambuf<_CharType>::off_type off_type;
61 
/// Returns the underlying data container.
/// The result is a mutable reference to the collection member that backs this
/// buffer, so callers observe (and can modify) the stored data directly.
65  _CollectionType& collection()
66  {
67  return m_data;
68  }
69 
74  {
75  // Invoke the synchronous versions since we need to
76  // purge the request queue before deleting the buffer
77  this->_close_read();
78  this->_close_write();
79  }
80 
81 
82  protected:
/// Reports whether this buffer supports seeking: it does whenever is_open() is true.
86  virtual bool can_seek() const { return this->is_open(); }
87 
/// Reports whether size() is supported: it is whenever is_open() is true.
91  virtual bool has_size() const { return this->is_open(); }
92 
/// Gets the size of the stream.
/// Returns the current element count of the backing collection, converted to
/// utility::size64_t.
97  virtual utility::size64_t size() const
98  {
99  return utility::size64_t(m_data.size());
100  }
101 
/// Gets the stream buffer size, if one has been set.
/// This implementation ignores the direction argument and always returns 0.
107  virtual size_t buffer_size(std::ios_base::openmode = std::ios_base::in) const
108  {
109  return 0;
110  }
111 
/// Sets the stream buffer implementation to buffer or not buffer.
/// This implementation is a no-op: both arguments are ignored.
118  virtual void set_buffer_size(size_t , std::ios_base::openmode = std::ios_base::in)
119  {
120  return;
121  }
122 
/// Returns the number of characters that are immediately available to be read:
/// the distance from the current position to the end of the backing collection.
128  virtual size_t in_avail() const
129  {
130  // See the comment in seek around the restriction that we do not allow read head to
131  // seek beyond the current write_end.
132  _ASSERTE(m_current_position <= m_data.size());
133 
     // The subtraction is done on SafeInt operands rather than raw size_t
     // (presumably so an underflow is trapped instead of wrapping; confirm against SafeInt).
134  msl::safeint3::SafeInt<size_t> readhead(m_current_position);
135  msl::safeint3::SafeInt<size_t> writeend(m_data.size());
136  return (size_t)(writeend - readhead);
137  }
138 
/// Synchronization hook. This implementation performs no work and returns a
/// task built from the result `true`.
139  virtual pplx::task<bool> _sync()
140  {
141  return pplx::task_from_result(true);
142  }
143 
/// Writes a single character at the current position.
/// The write goes through write(); the returned task holds `ch` converted to
/// int_type when exactly one element was written, and traits::eof() otherwise.
144  virtual pplx::task<int_type> _putc(_CharType ch)
145  {
146  int_type retVal = (this->write(&ch, 1) == 1) ? static_cast<int_type>(ch) : traits::eof();
147  return pplx::task_from_result<int_type>(retVal);
148  }
149 
/// Writes `count` characters starting at `ptr`.
/// The work is done by write(); the returned task holds the number of
/// characters that write() reports as written.
150  virtual pplx::task<size_t> _putn(const _CharType *ptr, size_t count)
151  {
152  return pplx::task_from_result<size_t>(this->write(ptr, count));
153  }
154 
/// Allocates a contiguous memory block and returns it.
/// Returns nullptr when the buffer cannot be written. Otherwise the container is
/// grown (if needed) so that `count` elements exist from the current position, and
/// a pointer to the element at the current position is returned. The position is
/// not advanced here; _commit() does that.
/// NOTE(review): with count == 0 and the position equal to m_data.size(), the
/// indexing below refers to the element one past the last; confirm callers
/// never request a zero-length block in that state.
160  _CharType* _alloc(size_t count)
161  {
162  if (!this->can_write()) return nullptr;
163 
164  // Allocate space
165  resize_for_write(m_current_position+count);
166 
167  // Let the caller copy the data
168  return (_CharType*)&m_data[m_current_position];
169  }
170 
/// Submits a block already allocated by the stream buffer.
/// Advances the current position by `actual` elements.
175  void _commit(size_t actual )
176  {
177  // Update the write position and satisfy any pending reads
178  update_current_position(m_current_position+actual);
179  }
180 
/// Gets a pointer to the next already allocated contiguous block of data.
/// On return `ptr` points at the element at the current position and `count` is
/// the number of available elements, or `ptr` is nullptr and `count` is 0 when
/// nothing is available.
/// Returns false only when the buffer cannot be read; otherwise true, including
/// the end-of-data case (null pointer, zero count).
194  virtual bool acquire(_Out_ _CharType*& ptr, _Out_ size_t& count)
195  {
196  ptr = nullptr;
197  count = 0;
198 
199  if (!this->can_read()) return false;
200 
201  count = in_avail();
202 
203  if (count > 0)
204  {
205  ptr = (_CharType*)&m_data[m_current_position];
206  return true;
207  }
208  else
209  {
210  // Can only be open for read OR write, not both. If there is no data then
211  // we have reached the end of the stream so indicate such with true.
212  return true;
213  }
214  }
215 
/// Releases a block of data acquired using the acquire method.
/// When `ptr` is non-null the current position is advanced by `count`;
/// a null `ptr` leaves the position unchanged.
222  virtual void release(_Out_writes_opt_ (count) _CharType *ptr, _In_ size_t count)
223  {
224  if (ptr != nullptr)
225  update_current_position(m_current_position + count);
226  }
227 
/// Reads up to `count` characters into `ptr`, advancing the position.
/// The returned task holds the number of characters read() copied.
228  virtual pplx::task<size_t> _getn(_Out_writes_ (count) _CharType *ptr, _In_ size_t count)
229  {
230  return pplx::task_from_result(this->read(ptr, count));
231  }
232 
/// Variant of _getn that returns the count directly instead of a task: reads up
/// to `count` characters into `ptr`, advances the position, and returns the
/// number of characters copied.
233  size_t _sgetn(_Out_writes_ (count) _CharType *ptr, _In_ size_t count)
234  {
235  return this->read(ptr, count);
236  }
237 
/// Copies up to `count` characters into `ptr` without advancing the position
/// (read() is called with advance == false). Returns the number copied.
238  virtual size_t _scopy(_Out_writes_ (count) _CharType *ptr, _In_ size_t count)
239  {
240  return this->read(ptr, count, false);
241  }
242 
/// Reads one character and advances the position.
/// The returned task holds the character, or traits::eof() if none was read.
243  virtual pplx::task<int_type> _bumpc()
244  {
245  return pplx::task_from_result(this->read_byte(true));
246  }
247 
/// Variant of _bumpc that returns the value directly instead of a task: reads one
/// character, advances the position, and returns the character or traits::eof().
248  virtual int_type _sbumpc()
249  {
250  return this->read_byte(true);
251  }
252 
/// Reads one character without advancing the position.
/// The returned task holds the character, or traits::eof() if none was read.
253  virtual pplx::task<int_type> _getc()
254  {
255  return pplx::task_from_result(this->read_byte(false));
256  }
257 
/// Variant of _getc that returns the value directly instead of a task: returns
/// the character at the current position, or traits::eof(), without advancing.
258  int_type _sgetc()
259  {
260  return this->read_byte(false);
261  }
262 
/// Advances the position by one character and then peeks at the following one.
/// The returned task holds the peeked character, or traits::eof() if none was read.
263  virtual pplx::task<int_type> _nextc()
264  {
     // First read consumes (and discards) the current character...
265  this->read_byte(true);
     // ...second read looks at the new current character without consuming it.
266  return pplx::task_from_result(this->read_byte(false));
267  }
268 
/// Moves the read position back by one character and returns the character there.
/// If the backward seek fails (seekoff yields eof), the returned task holds
/// traits::eof(); otherwise the result of getc() at the new position is returned.
269  virtual pplx::task<int_type> _ungetc()
270  {
271  auto pos = seekoff(-1, std::ios_base::cur, std::ios_base::in);
272  if ( pos == (pos_type)traits::eof())
273  return pplx::task_from_result(traits::eof());
274  return this->getc();
275  }
276 
/// Gets the current read or write position in the stream.
/// `mode` selects the direction (in and/or out) being queried.
/// Returns traits::eof() converted to pos_type when a requested direction is not
/// currently usable; otherwise the single shared position.
284  virtual pos_type getpos(std::ios_base::openmode mode) const
285  {
286  if ( ((mode & std::ios_base::in) && !this->can_read()) ||
287  ((mode & std::ios_base::out) && !this->can_write()))
288  return static_cast<pos_type>(traits::eof());
289 
290  return static_cast<pos_type>(m_current_position);
291  }
292 
/// Seeks to the given position.
/// `position` is an absolute target and `mode` selects the head (in or out) to move.
/// For a readable buffer the target must lie within [0, size]; for a writable
/// buffer the container is grown as needed so that the target is valid.
/// Returns the new position, or traits::eof() converted to pos_type when the
/// seek is rejected (negative target, read target past the end, or the
/// requested direction not usable).
300  virtual pos_type seekpos(pos_type position, std::ios_base::openmode mode)
301  {
302  pos_type beg(0);
303 
304  // In order to support relative seeking from the end position we need to fix an end position.
305  // Technically, there is no end for the stream buffer as new writes would just expand the buffer.
306  // For now, we assume that the current write_end is the end of the buffer. We use this artificial
307  // end to restrict the read head from seeking beyond what is available.
308 
309  pos_type end(m_data.size());
310 
311  if (position >= beg)
312  {
313  auto pos = static_cast<size_t>(position);
314 
315  // Read head
316  if ((mode & std::ios_base::in) && this->can_read())
317  {
318  if (position <= end)
319  {
320  // We do not allow reads to seek beyond the end or before the start position.
321  update_current_position(pos);
322  return static_cast<pos_type>(m_current_position);
323  }
324  }
325 
326  // Write head
327  if ((mode & std::ios_base::out) && this->can_write())
328  {
329  // Allocate space
330  resize_for_write(pos);
331 
332  // Nothing to really copy
333 
334  // Update write head and satisfy read requests if any
335  update_current_position(pos);
336 
337  return static_cast<pos_type>(m_current_position);
338  }
339  }
340 
341  return static_cast<pos_type>(traits::eof());
342  }
343 
/// Seeks to a position given by a relative offset.
/// `offset` is applied to the origin chosen by `way` (beg = 0, cur = current
/// position, end = current container size), and the resulting absolute position
/// is handed to seekpos() together with `mode`.
/// Returns whatever seekpos() returns, or traits::eof() converted to pos_type
/// for an unrecognized `way`.
353  virtual pos_type seekoff(off_type offset, std::ios_base::seekdir way, std::ios_base::openmode mode)
354  {
355  pos_type beg = 0;
356  pos_type cur = static_cast<pos_type>(m_current_position);
357  pos_type end = static_cast<pos_type>(m_data.size());
358 
359  switch ( way )
360  {
361  case std::ios_base::beg:
362  return seekpos(beg + offset, mode);
363 
364  case std::ios_base::cur:
365  return seekpos(cur + offset, mode);
366 
367  case std::ios_base::end:
368  return seekpos(end + offset, mode);
369 
370  default:
371  return static_cast<pos_type>(traits::eof());
372  }
373  }
374 
375  private:
376  template<typename _CollectionType1> friend class streams::container_buffer;
377 
/// Constructs a buffer over a default-constructed collection with the position at 0.
/// `mode` is forwarded to the base class and then checked by validate_mode(),
/// which throws std::invalid_argument if both in and out are requested.
381  basic_container_buffer(std::ios_base::openmode mode)
382  : streambuf_state_manager<typename _CollectionType::value_type>(mode),
383  m_current_position(0)
384  {
385  validate_mode(mode);
386  }
387 
/// Constructs a buffer that takes over the contents of `data` (moved into m_data).
/// The starting position is 0 when `mode` includes in, and the end of the data
/// otherwise. validate_mode() throws std::invalid_argument if both in and out
/// are requested.
391  basic_container_buffer(_CollectionType data, std::ios_base::openmode mode)
392  : streambuf_state_manager<typename _CollectionType::value_type>(mode),
393  m_data(std::move(data)),
     // m_data is declared before m_current_position, so it is already
     // initialized when its size is queried here.
394  m_current_position((mode & std::ios_base::in) ? 0 : m_data.size())
395  {
396  validate_mode(mode);
397  }
398 
/// Rejects open modes this buffer does not support.
/// Throws std::invalid_argument when `mode` has both in and out set; any other
/// mode is accepted.
399  static void validate_mode(std::ios_base::openmode mode)
400  {
401  // Disallow simultaneous use of the stream buffer for writing and reading.
402  if ((mode & std::ios_base::in) && (mode & std::ios_base::out))
403  throw std::invalid_argument("this combination of modes on container stream not supported");
404  }
405 
/// Determines whether a read request can be satisfied at least partially.
/// The requested size is ignored: the answer is true whenever at least one
/// element is available from the current position.
409  bool can_satisfy(size_t)
410  {
411  // We can always satisfy a read, at least partially, unless the
412  // read position is at the very end of the buffer.
413  return (in_avail() > 0);
414  }
415 
/// Reads a single element through read().
/// `advance` controls whether the position moves past the element.
/// Returns the element converted to int_type, or traits::eof() when read()
/// did not deliver exactly one element.
420  int_type read_byte(bool advance = true)
421  {
     // Left uninitialized on purpose: it is only consumed when read() reports
     // that one element was copied into it.
422  _CharType value;
423  auto read_size = this->read(&value, 1, advance);
424  return read_size == 1 ? static_cast<int_type>(value) : traits::eof();
425  }
426 
/// Copies up to `count` elements, starting at the current position, into `ptr`.
/// `ptr` must have room for `count` elements. When `advance` is true the
/// position is moved past the copied elements; when false the position is left
/// untouched (peek).
/// Returns the number of elements copied: min(count, in_avail()), or 0 when
/// nothing is available.
432  size_t read(_Out_writes_ (count) _CharType *ptr, _In_ size_t count, bool advance = true)
433  {
434  if (!can_satisfy(count))
435  return 0;
436 
437  msl::safeint3::SafeInt<size_t> request_size(count);
438  msl::safeint3::SafeInt<size_t> read_size = request_size.Min(in_avail());
439 
440  size_t newPos = m_current_position + read_size;
441 
442  auto readBegin = begin(m_data) + m_current_position;
443  auto readEnd = begin(m_data) + newPos;
444 
     // stdext::checked_array_iterator is an extension of the Microsoft standard
     // library, so select it by compiler (_MSC_VER) rather than by target OS:
     // _WIN32 is also defined by non-MSVC Windows toolchains that do not ship it.
445 #ifdef _MSC_VER
446  // Avoid warning C4996: Use checked iterators under SECURE_SCL
447  std::copy(readBegin, readEnd, stdext::checked_array_iterator<_CharType *>(ptr, count));
448 #else
449  std::copy(readBegin, readEnd, ptr);
450 #endif // _MSC_VER
451 
452  if (advance)
453  {
454  update_current_position(newPos);
455  }
456 
457  return (size_t) read_size;
458  }
459 
/// Copies `count` elements from `ptr` into the container at the current
/// position, growing the container when the write extends past its end, and
/// then advances the position past the written elements.
/// Returns `count` on success, or 0 when the buffer cannot be written or
/// `count` is 0.
463  size_t write(const _CharType *ptr, size_t count)
464  {
465  if (!this->can_write() || (count == 0)) return 0;
466 
467  auto newSize = m_current_position + count;
468 
469  // Allocate space
470  resize_for_write(newSize);
471 
472  // Copy the data
473  std::copy(ptr, ptr + count, begin(m_data) + m_current_position);
474 
475  // Update write head and satisfy pending reads if any
476  update_current_position(newSize);
477 
478  return count;
479  }
480 
/// Ensures the container holds at least `newPos` elements.
/// The container is only ever grown here; a `newPos` at or below the current
/// size leaves it unchanged.
484  void resize_for_write(size_t newPos)
485  {
486  // Resize the container if required
487  if (newPos > m_data.size())
488  {
489  m_data.resize(newPos);
490  }
491  }
492 
/// Sets the shared read/write position to `newPos`.
/// The assertion documents the invariant that the position never lies beyond
/// the end of the container; it is checked after the assignment.
496  void update_current_position(size_t newPos)
497  {
498  // The new write head
499  m_current_position = newPos;
500  _ASSERTE(m_current_position <= m_data.size());
501  }
502 
503  // The actual data store
504  _CollectionType m_data;
505 
506  // Read/write head
507  size_t m_current_position;
508  };
509 
510  } // namespace details
511 
/// Reference-counted stream buffer handle over an STL-style collection.
/// Every constructor creates a details::basic_container_buffer for
/// `_CollectionType` and hands it to the streambuf base wrapped in a
/// std::shared_ptr.
522  template<typename _CollectionType>
523  class container_buffer : public streambuf<typename _CollectionType::value_type>
524  {
525  public:
     // Element type of the collection, used as the stream's character type.
526  typedef typename _CollectionType::value_type char_type;
527 
     /// Creates a container_buffer given a collection.
     /// `data` is taken by value and moved into the underlying buffer; `mode`
     /// defaults to input.
     // The implementation's constructors are private and befriend this class,
     // which is presumably why `new` is used here instead of std::make_shared.
533  container_buffer(_CollectionType data, std::ios_base::openmode mode = std::ios_base::in)
534  : streambuf<typename _CollectionType::value_type>(
535  std::shared_ptr<details::basic_container_buffer<_CollectionType>>(new streams::details::basic_container_buffer<_CollectionType>(std::move(data), mode)))
536  {
537  }
538 
     /// Creates a container_buffer starting from an empty collection.
     /// `mode` defaults to output.
543  container_buffer(std::ios_base::openmode mode = std::ios_base::out)
544  : streambuf<typename _CollectionType::value_type>(
545  std::shared_ptr<details::basic_container_buffer<_CollectionType>>(new details::basic_container_buffer<_CollectionType>(mode)))
546  {
547  }
548 
     /// Returns the collection held by the underlying buffer.
     /// Although this member is const, the returned reference is mutable.
549  _CollectionType& collection() const
550  {
     // The base pointer is downcast to the concrete implementation type that
     // both constructors above install.
551  auto listBuf = static_cast<details::basic_container_buffer<_CollectionType> *>(this->get_base().get());
552  return listBuf->collection();
553  }
554  };
555 
562  template<typename _CollectionType>
564  {
565  public:
566 
567  typedef typename _CollectionType::value_type char_type;
569 
/// Creates an input stream given an STL container.
/// `data` is taken by value and moved into a buffer opened for input, which
/// the returned stream reads from.
/// (buffer_type is declared outside the visible listing; presumably it is the
/// container_buffer for `_CollectionType` - confirm.)
575  static concurrency::streams::basic_istream<char_type> open_istream(_CollectionType data)
576  {
577  return concurrency::streams::basic_istream<char_type>(buffer_type(std::move(data), std::ios_base::in));
578  }
579 
/// Creates an output stream using an STL container as the storage.
/// The stream is backed by a new buffer opened for output.
/// (buffer_type is declared outside the visible listing; presumably it is the
/// container_buffer for `_CollectionType` - confirm.)
584  static concurrency::streams::basic_ostream<char_type> open_ostream()
585  {
586  return concurrency::streams::basic_ostream<char_type>(buffer_type(std::ios_base::out));
587  }
588  };
589 
596 
599 
604  {
605  public:
606 
/// Creates a single byte character input stream given an STL container.
/// `data` is taken by value and moved into a container_buffer opened for
/// input, which the returned stream reads from.
613  template<typename _CollectionType>
614  static concurrency::streams::istream open_istream(_CollectionType data)
615  {
616  return concurrency::streams::istream(streams::container_buffer<_CollectionType>(std::move(data), std::ios_base::in));
617  }
618 
/// Creates a single byte character output stream using an STL container as storage.
/// The buffer is default-constructed, which for container_buffer means an
/// empty collection opened for output. `_CollectionType` cannot be deduced and
/// must be given explicitly by the caller.
624  template<typename _CollectionType>
625  static concurrency::streams::ostream open_ostream()
626  {
627  return concurrency::streams::ostream(streams::container_buffer<_CollectionType>());
628  }
629 };
630 
631 
632 }} // namespaces
container_buffer(std::ios_base::openmode mode=std::ios_base::out)
Creates a container_buffer starting from an empty collection.
Definition: containerstream.h:543
virtual pplx::task< int_type > getc()
Reads a single character from the stream without advancing the read position.
Definition: astreambuf.h:526
virtual bool is_open() const
Checks if the stream buffer is open.
Definition: astreambuf.h:390
Reference-counted stream buffer.
Definition: astreambuf.h:804
static concurrency::streams::istream open_istream(_CollectionType data)
Creates a single byte character input stream given an STL container.
Definition: containerstream.h:614
static concurrency::streams::basic_ostream< char_type > open_ostream()
Creates an output stream using an STL container as the storage.
Definition: containerstream.h:584
virtual utility::size64_t size() const
Gets the size of the stream, if known. Calls to has_size will determine whether the result of size ca...
Definition: containerstream.h:97
virtual size_t buffer_size(std::ios_base::openmode=std::ios_base::in) const
Get the stream buffer size, if one has been set.
Definition: containerstream.h:107
A static class to allow users to create input and output streams based off STL collections. The sole purpose of this class is to spare users from having to know anything about stream buffers.
Definition: containerstream.h:563
virtual bool can_read() const
can_read is used to determine whether a stream buffer will support read operations (get)...
Definition: astreambuf.h:373
virtual void set_buffer_size(size_t, std::ios_base::openmode=std::ios_base::in)
Sets the stream buffer implementation to buffer or not buffer.
Definition: containerstream.h:118
virtual pplx::task< void > _close_read()
The real read head close operation, implementation should override it if there is any resource to be ...
Definition: astreambuf.h:711
The basic_container_buffer class serves as a memory-based stream buffer that supports writing or readi...
Definition: containerstream.h:41
virtual pos_type seekpos(pos_type position, std::ios_base::openmode mode)
Seeks to the given position.
Definition: containerstream.h:300
virtual bool acquire(_Out_ _CharType *&ptr, _Out_ size_t &count)
Gets a pointer to the next already allocated contiguous block of data.
Definition: containerstream.h:194
virtual pplx::task< void > _close_write()
The real write head close operation, implementation should override it if there is any resource to be...
Definition: astreambuf.h:720
static concurrency::streams::ostream open_ostream()
Creates a single byte character output stream using an STL container as storage.
Definition: containerstream.h:625
virtual bool can_write() const
can_write is used to determine whether a stream buffer will support write operations (put)...
Definition: astreambuf.h:381
_CharType * _alloc(size_t count)
Allocates a contiguous memory block and returns it.
Definition: containerstream.h:160
virtual pos_type seekoff(off_type offset, std::ios_base::seekdir way, std::ios_base::openmode mode)
Seeks to a position given by a relative offset.
Definition: containerstream.h:353
The Parallel Patterns Library (PPL) task class. A task object represents work that can be executed as...
Definition: pplxtasks.h:176
virtual size_t in_avail() const
For any input stream, in_avail returns the number of characters that are immediately available to be ...
Definition: containerstream.h:128
virtual pos_type getpos(std::ios_base::openmode mode) const
Gets the current read or write position in the stream.
Definition: containerstream.h:284
The bytestream is a static class that allows an input stream to be constructed from any STL container...
Definition: containerstream.h:603
container_buffer(_CollectionType data, std::ios_base::openmode mode=std::ios_base::in)
Creates a container_buffer given a collection, copying its data into the buffer.
Definition: containerstream.h:533
Definition: astreambuf.h:37
_CollectionType & collection()
Returns the underlying data container
Definition: containerstream.h:65
virtual bool can_seek() const
can_seek is used to determine whether a stream buffer supports seeking.
Definition: containerstream.h:86
virtual void release(_Out_writes_opt_(count) _CharType *ptr, _In_ size_t count)
Releases a block of data acquired using the ::acquire method. This frees the stream buffer to de-allocate...
Definition: containerstream.h:222
void _commit(size_t actual)
Submits a block already allocated by the stream buffer.
Definition: containerstream.h:175
static concurrency::streams::basic_istream< char_type > open_istream(_CollectionType data)
Creates an input stream given an STL container.
Definition: containerstream.h:575
container_stream< std::basic_string< char > > stringstream
The stringstream allows an input stream to be constructed from std::string or std::wstring. For output...
Definition: containerstream.h:594
The basic_container_buffer class serves as a memory-based stream buffer that supports writing or readi...
Definition: containerstream.h:53
virtual bool has_size() const
has_size is used to determine whether a stream buffer supports size().
Definition: containerstream.h:91
virtual ~basic_container_buffer()
Destructor
Definition: containerstream.h:73