27 #ifndef _CASA_FILE_STREAMS_H
28 #define _CASA_FILE_STREAMS_H
30 #include "cpprest/details/fileio.h"
31 #include "cpprest/astreambuf.h"
32 #include "cpprest/streams.h"
36 #ifndef _LWRCASE_CNCRRNCY
37 #define _LWRCASE_CNCRRNCY
58 m_lastOperation = pplx::task_from_result();
63 template <
typename Func>
64 auto enqueue_operation(Func &&op) -> decltype(op())
75 res = m_lastOperation.then([=] {
79 m_lastOperation = res.then([&] (decltype(op())) {
89 m_lastOperation.
wait();
98 template<
typename _CharType>
102 typedef typename basic_streambuf<_CharType>::traits traits;
103 typedef typename basic_streambuf<_CharType>::int_type int_type;
104 typedef typename basic_streambuf<_CharType>::pos_type pos_type;
105 typedef typename basic_streambuf<_CharType>::off_type off_type;
132 virtual utility::size64_t
size()
const
136 return _get_size(m_info,
sizeof(_CharType));
145 virtual size_t buffer_size(std::ios_base::openmode direction = std::ios_base::in)
const
147 if ( direction == std::ios_base::in )
148 return m_info->m_buffer_size;
162 if ( direction == std::ios_base::out )
return;
164 m_info->m_buffer_size =
size;
166 if ( size == 0 && m_info->m_buffer !=
nullptr )
168 delete m_info->m_buffer;
169 m_info->m_buffer =
nullptr;
182 return _in_avail_unprot();
185 size_t _in_avail_unprot()
const
187 if ( !this->
is_open() )
return 0;
189 if ( m_info->m_buffer ==
nullptr || m_info->m_buffill == 0 )
return 0;
190 if ( m_info->m_bufoff > m_info->m_rdpos || (m_info->m_bufoff+m_info->m_buffill) < m_info->m_rdpos )
return 0;
196 return buffill - bufpos;
199 _file_info * _close_stream()
202 auto fileInfo = m_info;
210 auto callback =
new _filestream_callback_close(result_tce);
212 if ( !_close_fsb_nolock(&fileInfo, callback) )
215 return pplx::task_from_result();
217 return pplx::create_task(result_tce);
221 void _invoke_parent_close_read()
228 return m_readOps.enqueue_operation([
this]
230 _invoke_parent_close_read();
234 return pplx::task_from_result();
240 auto fileInfo = _close_stream();
242 return _close_file(fileInfo);
253 return flush_internal();
272 auto fileInfo = this->_close_stream();
274 return this->_close_file(fileInfo);
287 auto callback =
new _filestream_callback_write<size_t>(m_info, result_tce);
290 std::shared_ptr<_CharType> sharedCh;
293 sharedCh = std::make_shared<_CharType>(ch);
294 }
catch (
const std::bad_alloc &)
300 size_t written = _putn_fsb(m_info, callback, sharedCh.get(), 1,
sizeof(_CharType));
301 if (written ==
sizeof(_CharType))
304 return pplx::task_from_result<int_type>(ch);
307 return pplx::create_task(result_tce).then([sharedCh](
size_t)
309 return static_cast<int_type
>(*sharedCh);
344 virtual bool acquire(_Out_ _CharType*& ptr, _Out_
size_t& count)
357 virtual void release(_Out_writes_ (count) _CharType *, _In_
size_t count)
371 auto callback =
new _filestream_callback_write<size_t>(m_info, result_tce);
373 size_t written = _putn_fsb(m_info, callback, ptr, count,
sizeof(_CharType));
375 if ( written != 0 && written != -1 )
378 written = written/
sizeof(_CharType);
379 return pplx::task_from_result<size_t>(written);
381 return pplx::create_task(result_tce);
389 auto sharedData = std::make_shared<std::vector<_CharType>>(ptr, ptr + count);
390 return _putn(ptr, count).then([sharedData](
size_t size)
397 return _putn(ptr, count);
408 if ( _in_avail_unprot() > 0 )
414 if ( _in_avail_unprot() > 0 )
416 auto bufoff = m_info->m_rdpos - m_info->m_bufoff;
417 _CharType ch = m_info->m_buffer[bufoff*
sizeof(_CharType)];
418 m_info->m_rdpos += 1;
419 return pplx::task_from_result<int_type>(ch);
424 auto callback =
new _filestream_callback_bumpc(m_info, result_tce);
426 size_t ch = _getn_fsb(m_info, callback, &callback->m_ch, 1,
sizeof(_CharType));
428 if ( ch ==
sizeof(_CharType) )
431 m_info->m_rdpos += 1;
432 _CharType ch1 = (_CharType)callback->m_ch;
434 return pplx::task_from_result<int_type>(ch1);
436 return pplx::create_task(result_tce);
448 if ( m_info->m_atend )
return traits::eof();
450 if ( _in_avail_unprot() == 0 )
return traits::requires_async();
454 if ( _in_avail_unprot() == 0 )
return traits::requires_async();
456 auto bufoff = m_info->m_rdpos - m_info->m_bufoff;
457 _CharType ch = m_info->m_buffer[bufoff*
sizeof(_CharType)];
458 m_info->m_rdpos += 1;
464 if ( _in_avail_unprot() > 0 )
470 if ( _in_avail_unprot() > 0 )
472 auto bufoff = m_info->m_rdpos - m_info->m_bufoff;
473 _CharType ch = m_info->m_buffer[bufoff*
sizeof(_CharType)];
474 return pplx::task_from_result<int_type>(ch);
479 auto callback =
new _filestream_callback_getc(m_info, result_tce);
481 size_t ch = _getn_fsb(m_info, callback, &callback->m_ch, 1,
sizeof(_CharType));
483 if ( ch ==
sizeof(_CharType) )
486 _CharType ch1 = (_CharType)callback->m_ch;
488 return pplx::task_from_result<int_type>(ch1);
490 return pplx::create_task(result_tce);
513 if ( m_info->m_atend )
return traits::eof();
515 if ( _in_avail_unprot() == 0 )
return traits::requires_async();
519 if ( _in_avail_unprot() == 0 )
return traits::requires_async();
521 auto bufoff = m_info->m_rdpos - m_info->m_bufoff;
522 _CharType ch = m_info->m_buffer[bufoff*
sizeof(_CharType)];
533 _seekrdpos_fsb(m_info, m_info->m_rdpos+1,
sizeof(_CharType));
534 if ( m_info->m_atend )
536 return this->_getcImpl();
547 if ( m_info->m_rdpos == 0 )
549 _seekrdpos_fsb(m_info, m_info->m_rdpos-1,
sizeof(_CharType));
550 return this->_getcImpl();
563 if ( m_info->m_atend || count == 0 )
564 return pplx::task_from_result<size_t>(0);
566 if ( _in_avail_unprot() >= count )
572 if ( _in_avail_unprot() >= count )
574 auto bufoff = m_info->m_rdpos - m_info->m_bufoff;
575 std::memcpy((
void *)ptr, this->m_info->m_buffer+bufoff*
sizeof(_CharType), count*
sizeof(_CharType));
577 m_info->m_rdpos += count;
578 return pplx::task_from_result<size_t>(count);
583 auto callback =
new _filestream_callback_read(m_info, result_tce);
585 size_t read = _getn_fsb(m_info, callback, ptr, count,
sizeof(_CharType));
587 if ( read != 0 && read != -1)
591 m_info->m_rdpos += read/
sizeof(_CharType);
592 return pplx::task_from_result<size_t>(read/
sizeof(_CharType));
594 return pplx::create_task(result_tce);
605 size_t _sgetn(_Out_writes_ (count) _CharType *ptr, _In_
size_t count)
608 if ( m_info->m_atend )
return 0;
610 if ( count == 0 ||
in_avail() == 0 )
return 0;
614 size_t available = _in_avail_unprot();
615 size_t copy = (count < available) ? count : available;
617 auto bufoff = m_info->m_rdpos - m_info->m_bufoff;
618 std::memcpy((
void *)ptr, this->m_info->m_buffer+bufoff*
sizeof(_CharType), copy*
sizeof(_CharType));
620 m_info->m_rdpos += copy;
621 m_info->m_atend = (copy < count);
632 virtual size_t _scopy(_CharType *,
size_t)
644 virtual pos_type
getpos(std::ios_base::openmode mode)
const
657 virtual pos_type
seekpos(pos_type pos, std::ios_base::openmode mode)
659 if ( mode == std::ios_base::in )
662 return (pos_type)_seekrdpos_fsb(m_info,
size_t(pos),
sizeof(_CharType));
664 else if ( (m_info->m_mode & std::ios::ios_base::app) == 0 )
666 return (pos_type)_seekwrpos_fsb(m_info,
size_t(pos),
sizeof(_CharType));
680 virtual pos_type
seekoff(off_type offset, std::ios_base::seekdir way, std::ios_base::openmode mode)
682 if ( mode == std::ios_base::in )
687 case std::ios_base::beg:
688 return (pos_type)_seekrdpos_fsb(m_info,
size_t(offset),
sizeof(_CharType));
689 case std::ios_base::cur:
690 return (pos_type)_seekrdpos_fsb(m_info,
size_t(m_info->m_rdpos+offset),
sizeof(_CharType));
691 case std::ios_base::end:
692 return (pos_type)_seekrdtoend_fsb(m_info, int64_t(offset),
sizeof(_CharType));
698 else if ( (m_info->m_mode & std::ios::ios_base::app) == 0 )
702 case std::ios_base::beg:
703 return (pos_type)_seekwrpos_fsb(m_info,
size_t(offset),
sizeof(_CharType));
704 case std::ios_base::cur:
705 return (pos_type)_seekwrpos_fsb(m_info,
size_t(m_info->m_wrpos+offset),
sizeof(_CharType));
706 case std::ios_base::end:
707 return (pos_type)_seekwrpos_fsb(m_info,
size_t(-1),
sizeof(_CharType));
713 return (pos_type)traits::eof();
721 return flush_internal().then([](){
return true;});
725 template<
typename _CharType1>
friend class ::concurrency::streams::file_buffer;
730 auto callback = utility::details::make_unique<_filestream_callback_write_b>(m_info, result_tce);
732 if ( !_sync_fsb(m_info, callback.get()) )
734 return pplx::task_from_exception<void>(std::runtime_error(
"failure to flush stream"));
737 return pplx::create_task(result_tce);
740 basic_file_buffer(_In_ _file_info *info) : streambuf_state_manager<_CharType>(info->m_mode), m_info(info) { }
742 #if !defined(__cplusplus_winrt)
744 const utility::string_t &_Filename,
745 std::ios_base::openmode _Mode = std::ios_base::out,
747 int _Prot = (
int)std::ios_base::_Openprot
754 auto callback =
new _filestream_callback_open(result_tce);
755 _open_fsb_str(callback, _Filename.c_str(), _Mode, _Prot);
756 return pplx::create_task(result_tce);
761 ::Windows::Storage::StorageFile^ file,
762 std::ios_base::openmode _Mode = std::ios_base::out)
765 auto callback =
new _filestream_callback_open(result_tce);
766 _open_fsb_stf_str(callback, file, _Mode, 0);
767 return pplx::create_task(result_tce);
771 class _filestream_callback_open :
public details::_filestream_callback
776 virtual void on_opened(_In_ _file_info *info)
778 m_op.set(std::shared_ptr<basic_file_buffer<_CharType>>(
new basic_file_buffer<_CharType>(info)));
782 virtual void on_error(
const std::exception_ptr & e)
784 m_op.set_exception(e);
792 class _filestream_callback_close :
public details::_filestream_callback
797 virtual void on_closed()
803 virtual void on_error(
const std::exception_ptr & e)
805 m_op.set_exception(e);
813 template<
typename ResultType>
814 class _filestream_callback_write :
public details::_filestream_callback
819 virtual void on_completed(
size_t result)
821 m_op.set((ResultType)result /
sizeof(_CharType));
825 virtual void on_error(
const std::exception_ptr & e)
827 m_op.set_exception(e);
836 class _filestream_callback_write_b :
public details::_filestream_callback
841 virtual void on_completed(
size_t)
847 virtual void on_error(
const std::exception_ptr & e)
849 m_op.set_exception(e);
859 class _filestream_callback_read :
public details::_filestream_callback
864 virtual void on_completed(
size_t result)
866 result = result /
sizeof(_CharType);
867 m_info->m_rdpos += result;
872 virtual void on_error(
const std::exception_ptr & e)
874 m_op.set_exception(e);
883 class _filestream_callback_bumpc :
public details::_filestream_callback
888 virtual void on_completed(
size_t result)
890 if ( result ==
sizeof(_CharType) )
892 m_info->m_rdpos += 1;
897 m_op.set(traits::eof());
902 virtual void on_error(
const std::exception_ptr & e)
904 m_op.set_exception(e);
915 class _filestream_callback_getc :
public details::_filestream_callback
920 virtual void on_completed(
size_t result)
922 if ( result ==
sizeof(_CharType) )
928 m_op.set(traits::eof());
935 virtual void on_error(
const std::exception_ptr & e)
937 m_op.set_exception(e);
947 async_operation_queue m_readOps;
958 template<
typename _CharType>
962 #if !defined(__cplusplus_winrt)
971 const utility::string_t &file_name,
972 std::ios_base::openmode mode = std::ios_base::out,
974 int prot = _SH_DENYRD
980 auto bfb = details::basic_file_buffer<_CharType>::open(file_name, mode, prot);
981 return bfb.then([](
pplx::task<std::shared_ptr<details::basic_streambuf<_CharType>>> op) -> streambuf<_CharType>
983 return streambuf<_CharType>(op.get());
996 ::Windows::Storage::StorageFile^ file,
997 std::ios_base::openmode mode = std::ios_base::out)
999 auto bfb = details::basic_file_buffer<_CharType>::open(file, mode);
1000 return bfb.then([](
pplx::task<std::shared_ptr<details::basic_streambuf<_CharType>>> op) -> streambuf<_CharType>
1002 return streambuf<_CharType>(op.get());
1015 template<
typename _CharType>
1020 #if !defined(__cplusplus_winrt)
1030 const utility::string_t &file_name,
1031 std::ios_base::openmode mode = std::ios_base::in,
1033 int prot = (
int) std::ios_base::_Openprot
1039 mode |= std::ios_base::in;
1057 const utility::string_t &file_name,
1058 std::ios_base::openmode mode = std::ios_base::out,
1060 int prot = (
int) std::ios_base::_Openprot
1066 mode |= std::ios_base::out;
1082 ::Windows::Storage::StorageFile^ file,
1083 std::ios_base::openmode mode = std::ios_base::in)
1085 mode |= std::ios_base::in;
1102 ::Windows::Storage::StorageFile^ file,
1103 std::ios_base::openmode mode = std::ios_base::out)
1105 mode |= std::ios_base::out;
1107 .then([](streams::streambuf<_CharType> buf) -> basic_ostream<_CharType>
1109 return basic_ostream<_CharType>(buf);
1115 typedef file_stream<uint8_t> fstream;
A generic RAII wrapper for locks that implements the critical_section interface cpprest_synchronizati...
Definition: pplxlinux.h:264
int_type _sgetc()
Reads a single byte from the stream without advancing the read position.
Definition: filestream.h:510
The task_completion_event class allows you to delay the execution of a task until a condition is sati...
Definition: pplxtasks.h:2934
virtual void set_buffer_size(size_t size, std::ios_base::openmode direction=std::ios_base::in)
Sets the stream buffer implementation to buffer or not buffer.
Definition: filestream.h:160
virtual utility::size64_t size() const
Gets the size of the stream, if known. Calls to has_size will determine whether the result of size ca...
Definition: filestream.h:132
virtual bool is_open() const
Checks if the stream buffer is open.
Definition: astreambuf.h:390
Reference-counted stream buffer.
Definition: astreambuf.h:804
virtual bool can_seek() const
can_seek is used to determine whether a stream buffer supports seeking.
Definition: filestream.h:125
virtual size_t buffer_size(std::ios_base::openmode direction=std::ios_base::in) const
Gets the stream buffer size, if one has been set.
Definition: filestream.h:145
Base interface for all asynchronous output streams.
Definition: astreambuf.h:792
virtual pplx::task< bool > _sync()
For output streams, flush any internally buffered data to the underlying medium.
Definition: filestream.h:719
virtual pplx::task< int_type > _nextc()
Advances the read position, then returns the next character without advancing again.
Definition: filestream.h:530
virtual pplx::task< size_t > _putn(const _CharType *ptr, size_t count)
Writes a number of characters to the stream.
Definition: filestream.h:368
size_t _sgetn(_Out_writes_(count) _CharType *ptr, _In_ size_t count)
Reads up to a given number of characters from the stream.
Definition: filestream.h:605
virtual pplx::task< int_type > _bumpc()
Reads a single byte from the stream and advances the read position.
Definition: filestream.h:405
virtual bool can_read() const
can_read is used to determine whether a stream buffer will support read operations (get)...
Definition: astreambuf.h:373
pplx::task< int_type > _getc()
Reads a single byte from the stream without advancing the read position.
Definition: filestream.h:498
void _commit(size_t)
Submits a block already allocated by the stream buffer.
Definition: filestream.h:327
virtual pplx::task< int_type > _ungetc()
Retreats the read position, then returns the current character without advancing.
Definition: filestream.h:544
virtual pplx::task< int_type > _putc(_CharType ch)
Writes a single byte to an output stream.
Definition: filestream.h:284
virtual pplx::task< void > _close_read()
The real read head close operation, implementation should override it if there is any resource to be ...
Definition: astreambuf.h:711
File stream class containing factory functions for file streams.
Definition: filestream.h:1016
virtual bool acquire(_Out_ _CharType *&ptr, _Out_ size_t &count)
Gets a pointer to the next already allocated contiguous block of data.
Definition: filestream.h:344
pplx::task< void > _close_read()
The real read head close operation, implementation should override it if there is any resource to be ...
Definition: filestream.h:226
static pplx::task< streambuf< _CharType > > open(const utility::string_t &file_name, std::ios_base::openmode mode=std::ios_base::out, int prot=0)
Open a new stream buffer representing the given file.
Definition: filestream.h:970
Private stream buffer implementation for file streams. The class itself should not be used in applica...
Definition: filestream.h:99
virtual pplx::task< size_t > _getn(_Out_writes_(count) _CharType *ptr, _In_ size_t count)
Reads up to a given number of characters from the stream.
Definition: filestream.h:560
virtual pplx::task< void > _close_write()
The real write head close operation, implementation should override it if there is any resource to be...
Definition: astreambuf.h:720
virtual size_t in_avail() const
For any input stream, in_avail returns the number of characters that are immediately available to be ...
Definition: filestream.h:178
virtual pos_type seekpos(pos_type pos, std::ios_base::openmode mode)
Seeks to the given position.
Definition: filestream.h:657
virtual bool can_write() const
can_write is used to determine whether a stream buffer will support write operations (put)...
Definition: astreambuf.h:381
virtual void release(_Out_writes_(count) _CharType *, _In_ size_t count)
Releases a block of data acquired using ::acquire method. This frees the stream buffer to de-allocate...
Definition: filestream.h:357
virtual bool has_size() const
has_size is used to determine whether a stream buffer supports size().
Definition: filestream.h:130
Definition: astreambuf.h:362
The Parallel Patterns Library (PPL) task class. A task object represents work that can be executed as...
Definition: pplxtasks.h:176
task_status wait() const
Waits for this task to reach a terminal state. It is possible for wait to execute the task inline...
Definition: pplxtasks.h:4426
Definition: filestream.h:52
virtual pos_type seekoff(off_type offset, std::ios_base::seekdir way, std::ios_base::openmode mode)
Seeks to a position given by a relative offset.
Definition: filestream.h:680
Base interface for all asynchronous input streams.
Definition: astreambuf.h:791
The Parallel Patterns Library (PPL) task class. A task object represents work that can be executed as...
Definition: pplxtasks.h:4173
virtual pos_type getpos(std::ios_base::openmode mode) const
Gets the current read or write position in the stream.
Definition: filestream.h:644
static pplx::task< streams::basic_ostream< _CharType > > open_ostream(const utility::string_t &file_name, std::ios_base::openmode mode=std::ios_base::out, int prot=0)
Open a new output stream representing the given file. If the file does not exist, it will be created un...
Definition: filestream.h:1056
virtual size_t _scopy(_CharType *, size_t)
Copies up to a given number of characters from the stream.
Definition: filestream.h:632
Definition: astreambuf.h:37
static pplx::task< streams::basic_istream< _CharType > > open_istream(const utility::string_t &file_name, std::ios_base::openmode mode=std::ios_base::in, int prot=0)
Open a new input stream representing the given file. The file should already exist on disk...
Definition: filestream.h:1029
pplx::task< void > _close_write()
The real write head close operation, implementation should override it if there is any resource to be...
Definition: filestream.h:247
Stream buffer for file streams.
Definition: filestream.h:48
virtual int_type _sbumpc()
Reads a single byte from the stream and advances the read position.
Definition: filestream.h:445
_CharType * _alloc(size_t)
Allocates a contiguous memory block and returns it.
Definition: filestream.h:318
Extending the standard char_traits type with one that adds values and types that are unique to "C++ R...
Definition: astreambuf.h:50
bool is_done() const
Determines if the task is completed.
Definition: pplxtasks.h:4454