Wire Sysio Wire Sysion 1.0.0
Loading...
Searching...
No Matches
parsebyparts.cpp
Go to the documentation of this file.
1// Example of parsing JSON to document by parts.
2
3// Using C++11 threads
4// Temporarily disable for clang (older version) due to incompatibility with libstdc++
5#if (__cplusplus >= 201103L || (defined(_MSC_VER) && _MSC_VER >= 1700)) && !defined(__clang__)
6
#include "rapidjson/document.h"
#include "rapidjson/error/en.h"
#include "rapidjson/writer.h"
#include "rapidjson/ostreamwrapper.h"
#include <condition_variable>
#include <cstdlib>
#include <iostream>
#include <mutex>
#include <thread>
15
16using namespace rapidjson;
17
18template<unsigned parseFlags = kParseDefaultFlags>
19class AsyncDocumentParser {
20public:
21 AsyncDocumentParser(Document& d)
22 : stream_(*this)
23 , d_(d)
24 , parseThread_()
25 , mutex_()
26 , notEmpty_()
27 , finish_()
28 , completed_()
29 {
30 // Create and execute thread after all member variables are initialized.
31 parseThread_ = std::thread(&AsyncDocumentParser::Parse, this);
32 }
33
34 ~AsyncDocumentParser() {
35 if (!parseThread_.joinable())
36 return;
37
38 {
39 std::unique_lock<std::mutex> lock(mutex_);
40
41 // Wait until the buffer is read up (or parsing is completed)
42 while (!stream_.Empty() && !completed_)
43 finish_.wait(lock);
44
45 // Automatically append '\0' as the terminator in the stream.
46 static const char terminator[] = "";
47 stream_.src_ = terminator;
48 stream_.end_ = terminator + 1;
49 notEmpty_.notify_one(); // unblock the AsyncStringStream
50 }
51
52 parseThread_.join();
53 }
54
55 void ParsePart(const char* buffer, size_t length) {
56 std::unique_lock<std::mutex> lock(mutex_);
57
58 // Wait until the buffer is read up (or parsing is completed)
59 while (!stream_.Empty() && !completed_)
60 finish_.wait(lock);
61
62 // Stop further parsing if the parsing process is completed.
63 if (completed_)
64 return;
65
66 // Set the buffer to stream and unblock the AsyncStringStream
67 stream_.src_ = buffer;
68 stream_.end_ = buffer + length;
69 notEmpty_.notify_one();
70 }
71
72private:
73 void Parse() {
74 d_.ParseStream<parseFlags>(stream_);
75
76 // The stream may not be fully read, notify finish anyway to unblock ParsePart()
77 std::unique_lock<std::mutex> lock(mutex_);
78 completed_ = true; // Parsing process is completed
79 finish_.notify_one(); // Unblock ParsePart() or destructor if they are waiting.
80 }
81
82 struct AsyncStringStream {
83 typedef char Ch;
84
85 AsyncStringStream(AsyncDocumentParser& parser) : parser_(parser), src_(), end_(), count_() {}
86
87 char Peek() const {
88 std::unique_lock<std::mutex> lock(parser_.mutex_);
89
90 // If nothing in stream, block to wait.
91 while (Empty())
92 parser_.notEmpty_.wait(lock);
93
94 return *src_;
95 }
96
97 char Take() {
98 std::unique_lock<std::mutex> lock(parser_.mutex_);
99
100 // If nothing in stream, block to wait.
101 while (Empty())
102 parser_.notEmpty_.wait(lock);
103
104 count_++;
105 char c = *src_++;
106
107 // If all stream is read up, notify that the stream is finish.
108 if (Empty())
109 parser_.finish_.notify_one();
110
111 return c;
112 }
113
114 size_t Tell() const { return count_; }
115
116 // Not implemented
117 char* PutBegin() { return 0; }
118 void Put(char) {}
119 void Flush() {}
120 size_t PutEnd(char*) { return 0; }
121
122 bool Empty() const { return src_ == end_; }
123
124 AsyncDocumentParser& parser_;
125 const char* src_;
126 const char* end_;
127 size_t count_;
128 };
129
130 AsyncStringStream stream_;
131 Document& d_;
132 std::thread parseThread_;
133 std::mutex mutex_;
134 std::condition_variable notEmpty_;
135 std::condition_variable finish_;
136 bool completed_;
137};
138
139int main() {
140 Document d;
141
142 {
143 AsyncDocumentParser<> parser(d);
144
145 const char json1[] = " { \"hello\" : \"world\", \"t\" : tr";
146 //const char json1[] = " { \"hello\" : \"world\", \"t\" : trX"; // For test parsing error
147 const char json2[] = "ue, \"f\" : false, \"n\": null, \"i\":123, \"pi\": 3.14";
148 const char json3[] = "16, \"a\":[1, 2, 3, 4] } ";
149
150 parser.ParsePart(json1, sizeof(json1) - 1);
151 parser.ParsePart(json2, sizeof(json2) - 1);
152 parser.ParsePart(json3, sizeof(json3) - 1);
153 }
154
155 if (d.HasParseError()) {
156 std::cout << "Error at offset " << d.GetErrorOffset() << ": " << GetParseError_En(d.GetParseError()) << std::endl;
157 return EXIT_FAILURE;
158 }
159
160 // Stringify the JSON to cout
161 OStreamWrapper os(std::cout);
163 d.Accept(writer);
164 std::cout << std::endl;
165
166 return EXIT_SUCCESS;
167}
168
169#else // Not supporting C++11
170
171#include <iostream>
// Fallback entry point for the branch taken when the C++11 check fails:
// prints a notice and exits normally.
int main() {
    const char* const notice = "This example requires C++11 compiler";
    std::cout << notice << std::endl;
    return 0;
}
175
176#endif
mie::Vuint Put(const uint64_t *x, size_t n)
Definition bench.cpp:96
JSON writer.
Definition writer.h:89
os_t os
RAPIDJSON_NAMESPACE_BEGIN const RAPIDJSON_ERROR_CHARTYPE * GetParseError_En(ParseErrorCode parseErrorCode)
Maps error code of parsing into error message.
Definition en.h:36
#define Ch(x, y, z)
Definition hash_impl.h:17
main RapidJSON namespace
int main()
void lock()
CK_ULONG d