monotone

monotone Mtn Source Tree

Root/netxx_pipe.cc

1// -*- mode: C++; c-file-style: "gnu"; indent-tabs-mode: nil -*-
2// copyright (C) 2005 Christof Petig <christof@petig-baender.de>
3// all rights reserved.
4// licensed to the public under the terms of the GNU GPL (>= 2)
5// see the file COPYING for details
6
7#include <netxx_pipe.hh>
8#include "sanity.hh"
9#include "platform.hh"
10#include <netxx/streamserver.h>
11
12Netxx::PipeStream::PipeStream(int _readfd, int _writefd)
13 : readfd(_readfd), writefd(_writefd), child()
14{}
15
16#ifndef __WIN32__
17#include <unistd.h>
18#include <fcntl.h>
19#include <sys/wait.h>
20#include <errno.h>
21
// Create two pipes for stdio and fork a subprocess connected to them.
//
// fd1 carries data from the child to the parent, fd2 carries data from the
// parent to the child.  In the child fd2[0] is installed as stdin (0) and
// fd1[1] as stdout (1).  In the parent fd1[0] stays open for reading and
// fd2[1] for writing; the other two pipe ends are closed.
//
// Returns -1 on error (every descriptor created here has been closed again
// and errno still describes the failing pipe()/fork() call), 0 in the child
// and the child's PID in the parent.
static pid_t
pipe_and_fork(int *fd1, int *fd2)
{
  fd1[0] = fd1[1] = -1;
  fd2[0] = fd2[1] = -1;

  if (pipe(fd1))
    return -1;

  if (pipe(fd2))
    {
      // keep the errno of the failing pipe() for the caller's message
      const int saved_errno = errno;
      close(fd1[0]);
      close(fd1[1]);
      errno = saved_errno;
      return -1;
    }

  const pid_t result = fork();
  if (result < 0)
    {
      // keep the errno of the failing fork() for the caller's message
      const int saved_errno = errno;
      close(fd1[0]);
      close(fd1[1]);
      close(fd2[0]);
      close(fd2[1]);
      errno = saved_errno;
      return -1;
    }

  if (result == 0)
    { // child: fd1[1] for writing, fd2[0] for reading
      close(fd1[0]);
      close(fd2[1]);
      if (dup2(fd2[0], 0) != 0 || dup2(fd1[1], 1) != 1)
        {
          perror("dup2");
          // _exit instead of exit: do not run the parent's atexit handlers
          // or flush its stdio buffers a second time from the forked copy
          _exit(-1); // kill the useless child
        }
      // If the parent had stdin/stdout closed, a pipe end may already be
      // descriptor 0 or 1; closing it would destroy the stream just set up.
      if (fd1[1] != 1)
        close(fd1[1]);
      if (fd2[0] != 0)
        close(fd2[0]);
    }
  else
    { // parent: fd1[0] for reading, fd2[1] for writing
      close(fd1[1]);
      close(fd2[0]);
    }
  return result;
}
68#endif
69
70#ifdef WIN32
71#include <windows.h>
72#include <io.h>
73#include <fcntl.h>
74// FAIL_IF: call FUN with ARGS and report an error when "(result) CHECK" is
75// true, i.e. CHECK spells out the failure case (e.g. ==0).
// E reports when its first argument is false, hence the negation below.
// The message contains the stringified FUN and the value of GetLastError().
76#define FAIL_IF(FUN,ARGS,CHECK) \
77 E(!((FUN ARGS) CHECK), F(#FUN " failed %d\n") % GetLastError())
78// FAIL_IF4: like FAIL_IF, but a result matching CHECK is only reported as an
// error when GetLastError() differs from OK (the one tolerated error code).
79#define FAIL_IF4(FUN,ARGS,CHECK,OK) \
80 E(!((FUN ARGS) CHECK && GetLastError()!=OK), F(#FUN " failed %d\n") % GetLastError())
81#endif
82
83Netxx::PipeStream::PipeStream (const std::string &cmd, const std::vector<std::string> &args)
84 : readfd(), writefd(), child()
85{
86 // unfortunately neither munge_argv_into_cmdline nor execvp
87 // do take a vector<string> as argument
88 const unsigned newsize=64;
89 const char *newargv[newsize];
90 I(args.size()<(sizeof(newargv)/sizeof(newargv[0])));
91 unsigned newargc=0;
92 newargv[newargc++]=cmd.c_str();
93 for (std::vector<std::string>::const_iterator i=args.begin();i!=args.end();++i)
94 newargv[newargc++]=i->c_str();
95 newargv[newargc]=0;
96#ifdef WIN32
97
98 // mark some files as inheritable by the child
99 SECURITY_ATTRIBUTES inherit;
100 memset(&inherit,0,sizeof inherit);
101 inherit.nLength=sizeof inherit;
102 inherit.bInheritHandle = TRUE;
103
104 HANDLE hStdinR=0, hStdinW=0;
105 FAIL_IF(CreatePipe,(&hStdinR,&hStdinW,&inherit,sizeof readbuf),==0);
106 char pipename[256];
107 static int serial;
108 // yes, since we have to use named pipes because of nonblocking read
109 // (so called overlapped I/O) we could as well use one bidirectional
110 // pipe. I prefer two pipes to resemble the unix case.
111 snprintf(pipename,sizeof pipename,"\\\\.\\pipe\\netxx_pipe_%ld_%d",
112GetCurrentProcessId(),++serial);
113 HANDLE hStdoutR=0,hStdoutW=0;
114 FAIL_IF(hStdoutR=CreateNamedPipe,(pipename,
115 PIPE_ACCESS_INBOUND|FILE_FLAG_OVERLAPPED,
116 PIPE_TYPE_BYTE | PIPE_WAIT,
117 1,sizeof readbuf,sizeof readbuf,1000,0),
118 ==INVALID_HANDLE_VALUE);
119 FAIL_IF(hStdoutW=CreateFile,(pipename,GENERIC_WRITE,0,&inherit,OPEN_EXISTING,
120FILE_ATTRIBUTE_NORMAL,0),==INVALID_HANDLE_VALUE);
121
122 // mark this file handle as not inheritable
123 SetHandleInformation( hStdinW, HANDLE_FLAG_INHERIT, 0);
124 // set up the child with the pipes as stdin/stdout and inheriting stderr
125 PROCESS_INFORMATION piProcInfo;
126 STARTUPINFO siStartInfo;
127 memset(&piProcInfo,0,sizeof piProcInfo);
128 memset(&siStartInfo,0,sizeof siStartInfo);
129 siStartInfo.cb = sizeof siStartInfo;
130 siStartInfo.hStdError = (HANDLE)_get_osfhandle(2);
131 siStartInfo.hStdOutput = hStdoutW;
132 siStartInfo.hStdInput = hStdinR;
133 siStartInfo.dwFlags |= STARTF_USESTDHANDLES;
134 std::string cmdline=munge_argv_into_cmdline(newargv);
135 L(F("cmdline '%s'\n") % cmdline);
136 FAIL_IF(CreateProcess,(0,const_cast<CHAR*>(cmdline.c_str()),
137 0,0,TRUE,0,0,0,&siStartInfo,&piProcInfo),==0);
138 child=long(piProcInfo.hProcess);
139 // create normal file descriptors for these handles
140 // MinGW defines this function to take a long, not a HANDLE :-(
141 FAIL_IF(readfd=_open_osfhandle,(long(hStdoutR),O_BINARY|O_RDONLY),==-1);
142 FAIL_IF(writefd=_open_osfhandle,(long(hStdinW),O_BINARY|O_WRONLY),==-1);
143
144 // create infrastructure for overlapping I/O
145 memset(&overlap,0,sizeof overlap);
146 overlap.hEvent=CreateEvent(0,TRUE,TRUE,0);
147 bytes_available=0;
148 I(overlap.hEvent!=0);
149#else
150
151 int fd1[2],fd2[2];
152 child=pipe_and_fork(fd1,fd2);
153 E(child>=0, F("pipe/fork failed %s") % strerror(errno));
154 if (!child)
155 {
156 execvp(newargv[0],const_cast<char*const*>(newargv));
157 perror(newargv[0]);
158 exit(errno);
159 }
160 readfd=fd1[0];
161 writefd=fd2[1];
162 fcntl(readfd,F_SETFL,fcntl(readfd,F_GETFL)|O_NONBLOCK);
163#endif
164}
165
166// non blocking read
167Netxx::signed_size_type
168Netxx::PipeStream::read (void *buffer, size_type length)
169{
170#ifdef WIN32
171 if (length>bytes_available)
172 length=bytes_available;
173 if (length)
174 {
175 memcpy(buffer,readbuf,length);
176 if (length<bytes_available)
177 memmove(readbuf,readbuf+length,bytes_available-length);
178 bytes_available-=length;
179 }
180 return length;
181#else
182
183 return ::read(readfd,buffer,length);
184#endif
185}
186
187Netxx::signed_size_type
188Netxx::PipeStream::write(const void *buffer, size_type length)
189{
190 return ::write(writefd,buffer,length);
191}
192
193void
194Netxx::PipeStream::close (void)
195{
196 ::close(readfd);
197 ::close(writefd);
198 // wait for Process to end
199#ifdef WIN32
200
201 WaitForSingleObject((HANDLE)child, INFINITE);
202#else
203
204 if (child)
205 waitpid(child,0,0);
206#endif
207}
208
209Netxx::socket_type
210Netxx::PipeStream::get_socketfd (void) const
211{
212 return Netxx::socket_type(-1);
213}
214
215const Netxx::ProbeInfo*
216Netxx::PipeStream::get_probe_info (void) const
217{
218 return 0;
219}
220
221#ifdef WIN32
222
223// to emulate the semantics of the select call we wait up to timeout for the
224// first byte and ask for more bytes with no timeout
225// perhaps there is a more efficient/less complicated way (tell me if you know)
//
// Returns a pair of descriptor and ready state: the pipe's write or read
// descriptor together with ready_write/ready_read, or socket_type(-1) with
// ready_none when nothing became readable.  If no pipe was added the call
// is forwarded to Probe::ready.  Data that was read while probing is kept
// in pipe->readbuf and counted in pipe->bytes_available.
226Netxx::Probe::result_type
227Netxx::PipeCompatibleProbe::ready(const Timeout &timeout, ready_type rt)
228{
 // no pipe registered: behave like the plain Probe
229 if (!is_pipe)
230 return Probe::ready(timeout,rt);
231 if (rt==ready_none)
232 rt=ready_t; // remembered from add
 // a write request is answered as ready at once, the pipe is not examined
233 if (rt&ready_write)
234 return std::make_pair(pipe->get_writefd(),ready_write);
235 if (rt&ready_read)
236 {
 // bytes buffered earlier have not been consumed yet
237 if (pipe->bytes_available)
238 return std::make_pair(pipe->get_readfd(),ready_read);
239
240 HANDLE h_read=(HANDLE)_get_osfhandle(pipe->get_readfd());
241 DWORD bytes_read=0;
242 // ask for the first byte
 // (a zero result with ERROR_IO_PENDING is tolerated by FAIL_IF4)
243 FAIL_IF4( ReadFile,(h_read,pipe->readbuf,1,&bytes_read,&pipe->overlap),==0,ERROR_IO_PENDING);
244 if (!bytes_read)
245 {
246 // wait with timeout for the first byte
247 int milliseconds=timeout.get_sec()*1000+timeout.get_usec()/1000;
248 L(F("WaitForSingleObject(,%d)\n") % milliseconds);
249 FAIL_IF( WaitForSingleObject,(pipe->overlap.hEvent,milliseconds),==WAIT_FAILED);
 // a zero result with ERROR_IO_INCOMPLETE is tolerated here
250 FAIL_IF4( GetOverlappedResult,(h_read,&pipe->overlap,&bytes_read,FALSE),==0,ERROR_IO_INCOMPLETE);
251 L(F("GetOverlappedResult(,,%d,)\n") % bytes_read);
252 if (!bytes_read)
253 {
 // still nothing: cancel the outstanding read and report "not ready"
254 FAIL_IF( CancelIo,(h_read),==0);
255 return std::make_pair(socket_type(-1),ready_none);
256 }
257 }
 // exactly the one requested byte is now at readbuf[0]
258 I(bytes_read==1);
259 pipe->bytes_available=bytes_read;
260 // ask for more bytes but do _not_ wait
 // (they are placed behind the first byte, up to the size of readbuf)
261 FAIL_IF4( ReadFile,(h_read,pipe->readbuf+1,sizeof pipe->readbuf-1,&bytes_read,&pipe->overlap),==0,ERROR_IO_PENDING);
 // the second read is cancelled immediately after being issued
262 FAIL_IF( CancelIo,(h_read),==0);
263 if (!bytes_read)
264 {
265 FAIL_IF( GetOverlappedResult,(h_read,&pipe->overlap,&bytes_read,FALSE),==0);
266 I(!bytes_read);
267 }
268 else
269 {
270 // do we need to call and add GetOverlappedResult here?
271 pipe->bytes_available+=bytes_read;
272 }
273 L(F("%d bytes available\n") % pipe->bytes_available);
274 return std::make_pair(pipe->get_readfd(),ready_read);
275 }
 // neither reading nor writing was requested
276 return std::make_pair(socket_type(-1),ready_none);
277}
278
279void
280Netxx::PipeCompatibleProbe::add(PipeStream &ps, ready_type rt)
281 {
282 assert(!is_pipe);
283 assert(!pipe);
284 is_pipe=true;
285 pipe=&ps;
286 ready_t=rt;
287 }
288
289void
290Netxx::PipeCompatibleProbe::add(const StreamBase &sb, ready_type rt)
291 {
292 try
293 {
294 add(const_cast<PipeStream&>(dynamic_cast<const PipeStream&>(sb)),rt);
295 }
296 catch (...)
297 {
298 assert(!is_pipe);
299 Probe::add(sb,rt);
300 }
301 }
302
303void
304Netxx::PipeCompatibleProbe::add(const StreamServer &ss, ready_type rt)
305 {
306 assert(!ip_pipe);
307 Probe::add(ss,rt);
308 }
309#else // unix
310void
311Netxx::PipeCompatibleProbe::add(PipeStream &ps, ready_type rt)
312 {
313 if (rt==ready_none || rt&ready_read)
314 add_socket(ps.get_readfd(),ready_read);
315 if (rt==ready_none || rt&ready_write)
316 add_socket(ps.get_writefd(),ready_write);
317 }
318
319void
320Netxx::PipeCompatibleProbe::add(const StreamBase &sb, ready_type rt)
321 {
322 try
323 {
324 add(const_cast<PipeStream&>(dynamic_cast<const PipeStream&>(sb)),rt);
325 }
326 catch (...)
327 {
328 Probe::add(sb,rt);
329 }
330 }
331
332void
333Netxx::PipeCompatibleProbe::add(const StreamServer &ss, ready_type rt)
334 {
335 Probe::add(ss,rt);
336 }
337#endif
338
339#ifdef BUILD_UNIT_TESTS
340#include "unit_tests.hh"
341
342static void
343simple_pipe_test()
344{ try
345 {
346 Netxx::PipeStream pipe("cat",std::vector<std::string>());
347
348 std::string result;
349 Netxx::PipeCompatibleProbe probe;
350 Netxx::Timeout timeout(2L), short_time(0,1000);
351
352 // time out because no data is available
353 probe.clear();
354 probe.add(pipe, Netxx::Probe::ready_read);
355 Netxx::Probe::result_type res = probe.ready(short_time);
356 I(res.second==Netxx::Probe::ready_none);
357
358 // write should be possible
359 probe.clear();
360 probe.add(pipe, Netxx::Probe::ready_write);
361 res = probe.ready(short_time);
362 I(res.second&Netxx::Probe::ready_write);
363 I(res.first==pipe.get_writefd());
364
365 // try binary transparency
366 for (int c=0; c<256; ++c)
367 {
368 char buf[1024];
369 buf[0]=c;
370 buf[1]=255-c;
371 pipe.write(buf,2);
372
373 std::string result;
374 while (result.size()<2)
375 { // wait for data to arrive
376 probe.clear();
377 probe.add(pipe, Netxx::Probe::ready_read);
378 res = probe.ready(timeout);
379 E(res.second&Netxx::Probe::ready_read, F("timeout reading data %d") % c);
380 I(res.first==pipe.get_readfd());
381 int bytes=pipe.read(buf,sizeof buf);
382 result+=std::string(buf,bytes);
383 }
384 I(result.size()==2);
385 I((unsigned char)(result[0])==c);
386 I((unsigned char)(result[1])==255-c);
387 }
388 pipe.close();
389 } catch (informative_failure &e) // for some reason boost does not provide
390// enough information
391 { W(F("Failure %s\n") % e.what);
392 throw;
393 }
394}
395
396void
397add_pipe_tests(test_suite * suite)
398{
399 I(suite);
400 suite->add(BOOST_TEST_CASE(&simple_pipe_test));
401}
402#endif

Archive Download this file

Branches

Tags

Quick Links:     www.monotone.ca    -     Downloads    -     Documentation    -     Wiki    -     Code Forge    -     Build Status