monotone

monotone Mtn Source Tree

Root/src/netsync.cc

1// Copyright (C) 2004 Graydon Hoare <graydon@pobox.com>
2// 2008 Stephen Leake <stephen_leake@stephe-leake.org>
3//
4// This program is made available under the GNU GPL version 2.0 or
5// greater. See the accompanying file COPYING for details.
6//
7// This program is distributed WITHOUT ANY WARRANTY; without even the
8// implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
9// PURPOSE.
10
11#include "base.hh"
12#include "netsync.hh"
13
14#include <queue>
15
16#include "netxx/sockopt.h"
17#include "netxx/stream.h"
18
19#include "app_state.hh"
20#include "database.hh"
21#include "lua.hh"
22#include "network/automate_session.hh"
23#include "network/connection_info.hh"
24#include "network/listener.hh"
25#include "network/netsync_session.hh"
26#include "network/reactor.hh"
27#include "network/session.hh"
28#include "options.hh"
29#include "platform.hh"
30#include "project.hh"
31
32using std::deque;
33using std::string;
34
35using boost::lexical_cast;
36using boost::shared_ptr;
37
38deque<server_initiated_sync_request> server_initiated_sync_requests;
39LUAEXT(server_request_sync, )
40{
41 char const * w = luaL_checkstring(LS, 1);
42 char const * a = luaL_checkstring(LS, 2);
43 char const * i = luaL_checkstring(LS, 3);
44 char const * e = luaL_checkstring(LS, 4);
45
46 server_initiated_sync_request request;
47 request.address = string(a);
48 request.include = string(i);
49 request.exclude = string(e);
50
51 request.role = source_and_sink_role;
52 string what(w);
53 if (what == "sync")
54 request.role = source_and_sink_role;
55 else if (what == "push")
56 request.role = source_role;
57 else if (what == "pull")
58 request.role = sink_role;
59
60 server_initiated_sync_requests.push_back(request);
61 return 0;
62}
63
64
65static shared_ptr<Netxx::StreamBase>
66build_stream_to_server(options & opts, lua_hooks & lua,
67 shared_conn_info const & info,
68 Netxx::Timeout timeout)
69{
70 shared_ptr<Netxx::StreamBase> server;
71
72 if (info->client.get_use_argv())
73 {
74 vector<string> args = info->client.get_argv();
75 I(!args.empty());
76 string cmd = args[0];
77 args.erase(args.begin());
78 return shared_ptr<Netxx::StreamBase>
79 (new Netxx::PipeStream(cmd, args));
80 }
81 else
82 {
83#ifdef USE_IPV6
84 bool use_ipv6=true;
85#else
86 bool use_ipv6=false;
87#endif
88 string host(info->client.get_uri().host);
89 I(!host.empty());
90 Netxx::Address addr(host.c_str(),
91 info->client.get_port(),
92 use_ipv6);
93 return shared_ptr<Netxx::StreamBase>
94 (new Netxx::Stream(addr, timeout));
95 }
96}
97
98static void
99call_server(app_state & app,
100 project_t & project,
101 key_store & keys,
102 protocol_role role,
103 shared_conn_info const & info,
104 shared_conn_counts const & counts)
105{
106 Netxx::PipeCompatibleProbe probe;
107 transaction_guard guard(project.db);
108
109 Netxx::Timeout timeout(static_cast<long>(constants::netsync_timeout_seconds)),
110 instant(0,1);
111
112 P(F("connecting to '%s'") % info->client.get_uri().resource());
113 P(F(" include pattern '%s'") % info->client.get_include_pattern());
114 P(F(" exclude pattern '%s'") % info->client.get_exclude_pattern());
115
116 shared_ptr<Netxx::StreamBase> server
117 = build_stream_to_server(app.opts, app.lua, info, timeout);
118
119 // 'false' here means not to revert changes when the SockOpt
120 // goes out of scope.
121 Netxx::SockOpt socket_options(server->get_socketfd(), false);
122 socket_options.set_non_blocking();
123
124 shared_ptr<session> sess(new session(app, project, keys,
125 client_voice,
126 info->client.get_uri().resource(), server));
127 shared_ptr<wrapped_session> wrapped;
128 switch (info->client.get_connection_type())
129 {
130 case netsync_connection:
131 wrapped.reset(new netsync_session(sess.get(),
132 app.opts, app.lua, project,
133 keys, role,
134 info->client.get_include_pattern(),
135 info->client.get_exclude_pattern(),
136 counts));
137 break;
138 case automate_connection:
139 wrapped.reset(new automate_session(app, sess.get(),
140 &info->client.get_input_stream(),
141 &info->client.get_output_stream()));
142 break;
143 }
144 sess->set_inner(wrapped);
145
146 reactor react;
147 react.add(sess, guard);
148
149 while (true)
150 {
151 react.ready(guard);
152
153 if (react.size() == 0)
154 {
155 // Commit whatever work we managed to accomplish anyways.
156 guard.commit();
157
158 // We failed during processing. This should only happen in
159 // client voice when we have a decode exception, or received an
160 // error from our server (which is translated to a decode
161 // exception). We call these cases E() errors.
162 E(false, origin::network,
163 F("processing failure while talking to peer '%s', disconnecting")
164 % sess->get_peer());
165 return;
166 }
167
168 bool io_ok = react.do_io();
169
170 E(io_ok, origin::network,
171 F("timed out waiting for I/O with peer '%s', disconnecting")
172 % sess->get_peer());
173
174 if (react.size() == 0)
175 {
176 // Commit whatever work we managed to accomplish anyways.
177 guard.commit();
178
179 // ensure that the tickers have finished and write any last ticks
180 ui.ensure_clean_line();
181
182 // We had an I/O error. We must decide if this represents a
183 // user-reported error or a clean disconnect. See protocol
184 // state diagram in session::process_bye_cmd.
185
186 if (sess->protocol_state == session_base::confirmed_state)
187 {
188 P(F("successful exchange with '%s'")
189 % sess->get_peer());
190 return;
191 }
192 else if (sess->encountered_error)
193 {
194 P(F("peer '%s' disconnected after we informed them of error")
195 % sess->get_peer());
196 return;
197 }
198 else
199 E(false, origin::network,
200 (F("I/O failure while talking to peer '%s', disconnecting")
201 % sess->get_peer()));
202 }
203 }
204}
205
206static shared_ptr<session>
207session_from_server_sync_item(app_state & app,
208 project_t & project,
209 key_store & keys,
210 server_initiated_sync_request const & request)
211{
212 shared_conn_info info;
213 netsync_connection_info::setup_from_sync_request(app.opts, project.db,
214 app.lua, request,
215 info);
216
217 try
218 {
219 P(F("connecting to '%s'") % info->client.get_uri().resource());
220 shared_ptr<Netxx::StreamBase> server
221 = build_stream_to_server(app.opts, app.lua, info,
222 Netxx::Timeout(constants::netsync_timeout_seconds));
223
224 // 'false' here means not to revert changes when
225 // the SockOpt goes out of scope.
226 Netxx::SockOpt socket_options(server->get_socketfd(), false);
227 socket_options.set_non_blocking();
228
229 shared_ptr<session>
230 sess(new session(app, project, keys,
231 client_voice,
232 info->client.get_uri().resource(), server));
233 shared_ptr<wrapped_session>
234 wrapped(new netsync_session(sess.get(),
235 app.opts, app.lua, project,
236 keys, request.role,
237 info->client.get_include_pattern(),
238 info->client.get_exclude_pattern(),
239 connection_counts::create(),
240 true));
241 sess->set_inner(wrapped);
242 return sess;
243 }
244 catch (Netxx::NetworkException & e)
245 {
246 P(F("network error: %s") % e.what());
247 return shared_ptr<session>();
248 }
249}
250
251enum listener_status { listener_listening, listener_not_listening };
252listener_status desired_listener_status;
253LUAEXT(server_set_listening, )
254{
255 if (lua_isboolean(LS, 1))
256 {
257 bool want_listen = lua_toboolean(LS, 1);
258 if (want_listen)
259 desired_listener_status = listener_listening;
260 else
261 desired_listener_status = listener_not_listening;
262 return 0;
263 }
264 else
265 {
266 return luaL_error(LS, "bad argument (not a boolean)");
267 }
268}
269
270static void
271serve_connections(app_state & app,
272 options & opts,
273 lua_hooks & lua,
274 project_t & project,
275 key_store & keys,
276 protocol_role role,
277 std::vector<utf8> const & addresses)
278{
279#ifdef USE_IPV6
280 bool use_ipv6=true;
281#else
282 bool use_ipv6=false;
283#endif
284
285 shared_ptr<transaction_guard> guard(new transaction_guard(project.db));
286
287 reactor react;
288 shared_ptr<listener> listen(new listener(app, project, keys,
289 react, role, addresses,
290 guard, use_ipv6));
291 react.add(listen, *guard);
292 desired_listener_status = listener_listening;
293 listener_status actual_listener_status = listener_listening;
294
295 while (true)
296 {
297 if (!guard)
298 guard = shared_ptr<transaction_guard>
299 (new transaction_guard(project.db));
300 I(guard);
301
302 react.ready(*guard);
303
304 while (!server_initiated_sync_requests.empty())
305 {
306 server_initiated_sync_request request
307 = server_initiated_sync_requests.front();
308 server_initiated_sync_requests.pop_front();
309 shared_ptr<session> sess
310 = session_from_server_sync_item(app, project, keys,
311 request);
312
313 if (sess)
314 {
315 react.add(sess, *guard);
316 L(FL("Opened connection to %s") % sess->get_peer());
317 }
318 }
319
320 if (desired_listener_status != actual_listener_status)
321 {
322 switch (desired_listener_status)
323 {
324 case listener_listening:
325 react.add(listen, *guard);
326 actual_listener_status = listener_listening;
327 break;
328 case listener_not_listening:
329 react.remove(listen);
330 actual_listener_status = listener_not_listening;
331 break;
332 }
333 }
334 if (!react.size())
335 break;
336
337 react.do_io();
338
339 react.prune();
340
341 int num_sessions;
342 if (actual_listener_status == listener_listening)
343 num_sessions = react.size() - 1;
344 else
345 num_sessions = react.size();
346 if (num_sessions == 0)
347 {
348 // Let the guard die completely if everything's gone quiet.
349 guard->commit();
350 guard.reset();
351 }
352 }
353}
354
355static void
356serve_single_connection(project_t & project,
357 shared_ptr<session> sess)
358{
359 sess->begin_service();
360 P(F("beginning service on '%s'") % sess->get_peer());
361
362 transaction_guard guard(project.db);
363
364 reactor react;
365 react.add(sess, guard);
366
367 while (react.size() > 0)
368 {
369 react.ready(guard);
370 react.do_io();
371 react.prune();
372 }
373 guard.commit();
374}
375
376
377
378
379void
380run_netsync_protocol(app_state & app,
381 options & opts, lua_hooks & lua,
382 project_t & project, key_store & keys,
383 protocol_voice voice,
384 protocol_role role,
385 shared_conn_info & info,
386 shared_conn_counts const & counts)
387{
388 // We do not want to be killed by SIGPIPE from a network disconnect.
389 ignore_sigpipe();
390
391 try
392 {
393 if (voice == server_voice)
394 {
395 if (opts.bind_stdio)
396 {
397 shared_ptr<Netxx::PipeStream> str(new Netxx::PipeStream(0,1));
398
399 shared_ptr<session>
400 sess(new session(app, project, keys,
401 server_voice,
402 "stdio", str));
403 serve_single_connection(project, sess);
404 }
405 else
406 serve_connections(app, opts, lua, project, keys,
407 role, info->server.addrs);
408 }
409 else
410 {
411 I(voice == client_voice);
412 call_server(app, project, keys, role, info, counts);
413 info->client.set_connection_successful();
414 }
415 }
416 catch (Netxx::NetworkException & e)
417 {
418 throw recoverable_failure(origin::network,
419 (F("network error: %s") % e.what()).str());
420 }
421 catch (Netxx::Exception & e)
422 {
423 throw oops((F("network error: %s") % e.what()).str());;
424 }
425}
426
427// Local Variables:
428// mode: C++
429// fill-column: 76
430// c-file-style: "gnu"
431// indent-tabs-mode: nil
432// End:
433// vim: et:sw=2:sts=2:ts=2:cino=>2s,{s,\:s,+s,t0,g0,^-2,e-2,n-2,p2s,(0,=s:

Archive Download this file

Branches

Tags

Quick Links:     www.monotone.ca    -     Downloads    -     Documentation    -     Wiki    -     Code Forge    -     Build Status