19 #include <system_error>
22 #include <sys/types.h>
23 #include <sys/socket.h>
38 bufferReceiveAllowed =
false;
46 void ConnectionMock::release()
48 for(
auto i : receivedFds)
53 bufferReceiveAllowed =
false;
64 bufferReceiveAllowed =
false;
69 if ((!state) && receivedFds.size())
71 throw std::runtime_error(
"More buffer were received");
73 bufferReceiveAllowed = state;
76 ssize_t ConnectionMock::read(
void *
buffer,
size_t len)
85 char control[CMSG_SPACE(256 *
sizeof(
int))];
96 msgh.msg_control = control_un.control;
97 msgh.msg_controllen =
sizeof(control_un.control);
101 recvflag = MSG_CMSG_CLOEXEC;
106 auto size = recvmsg(fds[0], &msgh, recvflag);
112 for (
struct cmsghdr * cmsg = CMSG_FIRSTHDR(&msgh); cmsg != NULL; cmsg = CMSG_NXTHDR(&msgh, cmsg))
114 if (cmsg->cmsg_level == SOL_SOCKET && cmsg->cmsg_type == SCM_RIGHTS)
117 while(cmsg->cmsg_len >= CMSG_LEN((fdCount + 1) *
sizeof(
int)))
121 int * fds = (
int*)CMSG_DATA(cmsg);
122 for(
int i = 0; i < fdCount; ++i)
124 if (!bufferReceiveAllowed)
126 pendingData = std::string((
char*)
buffer, size);
127 throw std::runtime_error(
"Received unexpected buffer");
130 fcntl(fds[i], F_SETFD, FD_CLOEXEC);
132 receivedFds.push_back(fds[i]);
141 if (receivedFds.empty())
143 throw std::runtime_error(
"Buffer not received");
145 int fd = receivedFds.front();
146 receivedFds.pop_front();
152 ssize_t l = str.size();
160 ssize_t rd = read(in, left);
163 throw std::runtime_error(
"Input closed while expecting " + str);
168 throw std::system_error(e, std::generic_category(),
"Read failed while expecting " + str);
173 if (strncmp(str.c_str(), buff, l))
175 throw std::runtime_error(
"Received unexpected content while expecting " + str +
": " + std::string(buff, l));
179 char ConnectionMock::readChar(
const std::string &expected)
182 ssize_t rd = read(buff, 1);
185 throw std::runtime_error(
"Input closed while expecting " + expected);
190 throw std::system_error(e, std::generic_category(),
"Read failed while expecting " + expected);
200 auto lambda = [&pos, &str]()->
char
205 while((
unsigned)pos < str.length() && str[pos] ==
'\n') pos++;
206 if ((
unsigned)pos != str.length())
208 throw std::runtime_error(
"Expected string contains garbage: " + str);
214 std::string ConnectionMock::receiveMore()
217 auto currentFlag = fcntl(fds[0], F_GETFL, 0);
218 if (! (currentFlag & O_NONBLOCK))
220 fcntl(fds[0], F_SETFL, currentFlag | O_NONBLOCK);
224 auto r = read(
buffer, 256);
225 if (! (currentFlag & O_NONBLOCK))
227 auto errno_cpy =
errno;
228 fcntl(fds[0], F_SETFL, currentFlag);
234 return pendingData + std::string(
buffer, r);
236 perror(
"receiveMore");
245 std::string received;
246 auto readchar = [
this, expected, &received]()->
char
248 char c = readChar(expected);
255 if (fragment != expectedCanonical)
257 fprintf(stderr,
"canonicalized as %s\n", fragment.c_str());
258 throw std::runtime_error(
"xml fragment does not match");
261 catch(std::runtime_error &e)
263 received += receiveMore();
264 throw std::runtime_error(std::string(e.what()) +
"\nexpected: " + expected +
"\nReceived: " + received);
270 ssize_t l = str.size();
271 ssize_t wr = write(fds[1], str.c_str(), l);
275 throw std::system_error(e, std::generic_category(),
"Write failed while sending " + str);
279 throw std::runtime_error(
"Input closed while sending " + str);
285 if (fds[0] != fds[1]) {
286 if (rd && fds[0] != -1) {
287 if (::
shutdown(fds[0], SHUT_RD) == -1) {
288 perror(
"shutdown read fd");
291 if (wr && fds[1] != -1) {
292 if (::
shutdown(fds[1], SHUT_WR) == -1) {
293 perror(
"shutdown write fd");
297 if (!(rd || wr))
return;
299 int how = (rd && wr) ? SHUT_RDWR : rd ? SHUT_RD : SHUT_WR;
300 if (::
shutdown(fds[0], how) == -1) {
302 perror(
"shutdown rdwr");
304 perror(
"shutdown rd");
306 perror(
"shutdown wr");
315 while(buffers[fdCount]) fdCount++;
325 throw std::runtime_error(
"Can't attach buffer to empty message");
331 struct cmsghdr * cmsgh;
334 cmsghdrlength = CMSG_SPACE((fdCount *
sizeof(
int)));
335 cmsgh = (
struct cmsghdr*)malloc(cmsghdrlength);
336 memset(cmsgh, 0, cmsghdrlength);
339 cmsgh->cmsg_len = CMSG_LEN(
sizeof(
int));
340 cmsgh->cmsg_level = SOL_SOCKET;
341 cmsgh->cmsg_type = SCM_RIGHTS;
342 msgh.msg_control = cmsgh;
343 msgh.msg_controllen = cmsghdrlength;
344 for(
int i = 0; i < fdCount; ++i)
347 ((
int *) CMSG_DATA(CMSG_FIRSTHDR(&msgh)))[i] =
fd;
350 iov[0].iov_base = (
void*)str.data();
351 iov[0].iov_len = str.size();
354 msgh.msg_name = NULL;
355 msgh.msg_namelen = 0;
360 int ret = sendmsg(fds[1], &msgh, 0);
364 throw std::system_error(e, std::generic_category(),
"Write with buffer failed for " + str);
367 if ((
unsigned)ret < str.size())
369 throw std::runtime_error(
"Input closed while buffer sending " + str);
377 bufferPtrs[1] =
nullptr;
378 send(str, bufferPtrs);
std::string parseXmlFragmentFromString(const std::string &str)
std::string parseXmlFragment(std::function< char()> readchar)
void allowBufferReceive(bool state)
void expectBuffer(SharedBuffer &fd)
void shutdown(bool rd, bool wr)
void expect(const std::string &content)
void send(const std::string &content)
void setFds(int rd, int wr)
void expectXml(const std::string &xml)
std::vector< uint8_t > buffer