monotone

monotone Mtn Source Tree

Root/netsync.cc

1// Copyright (C) 2004 Graydon Hoare <graydon@pobox.com>
2// 2008 Stephen Leake <stephen_leake@stephe-leake.org>
3//
4// This program is made available under the GNU GPL version 2.0 or
5// greater. See the accompanying file COPYING for details.
6//
7// This program is distributed WITHOUT ANY WARRANTY; without even the
8// implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
9// PURPOSE.
10
11#include "base.hh"
12#include "netsync.hh"
13
14#include <queue>
15
16#include "netxx/sockopt.h"
17#include "netxx/stream.h"
18
19#include "app_state.hh"
20#include "database.hh"
21#include "lua.hh"
22#include "network/automate_session.hh"
23#include "network/connection_info.hh"
24#include "network/listener.hh"
25#include "network/netsync_session.hh"
26#include "network/reactor.hh"
27#include "network/session.hh"
28#include "options.hh"
29#include "platform.hh"
30#include "project.hh"
31
32using std::deque;
33using std::string;
34
35using boost::lexical_cast;
36using boost::shared_ptr;
37
38deque<server_initiated_sync_request> server_initiated_sync_requests;
39LUAEXT(server_request_sync, )
40{
41 char const * w = luaL_checkstring(LS, 1);
42 char const * a = luaL_checkstring(LS, 2);
43 char const * i = luaL_checkstring(LS, 3);
44 char const * e = luaL_checkstring(LS, 4);
45
46 server_initiated_sync_request request;
47 request.address = string(a);
48 request.include = string(i);
49 request.exclude = string(e);
50
51 request.role = source_and_sink_role;
52 string what(w);
53 if (what == "sync")
54 request.role = source_and_sink_role;
55 else if (what == "push")
56 request.role = source_role;
57 else if (what == "pull")
58 request.role = sink_role;
59
60 server_initiated_sync_requests.push_back(request);
61 return 0;
62}
63
64
65static shared_ptr<Netxx::StreamBase>
66build_stream_to_server(options & opts, lua_hooks & lua,
67 shared_conn_info const & info,
68 Netxx::Timeout timeout)
69{
70 shared_ptr<Netxx::StreamBase> server;
71
72 if (info->client.get_use_argv())
73 {
74 vector<string> args = info->client.get_argv();
75 I(!args.empty());
76 string cmd = args[0];
77 args.erase(args.begin());
78 return shared_ptr<Netxx::StreamBase>
79 (new Netxx::PipeStream(cmd, args));
80 }
81 else
82 {
83#ifdef USE_IPV6
84 bool use_ipv6=true;
85#else
86 bool use_ipv6=false;
87#endif
88 string host(info->client.get_uri().host);
89 I(!host.empty());
90 Netxx::Address addr(host.c_str(),
91 info->client.get_port(),
92 use_ipv6);
93 return shared_ptr<Netxx::StreamBase>
94 (new Netxx::Stream(addr, timeout));
95 }
96}
97
98static void
99call_server(app_state & app,
100 project_t & project,
101 key_store & keys,
102 protocol_role role,
103 shared_conn_info const & info,
104 shared_conn_counts const & counts)
105{
106 Netxx::PipeCompatibleProbe probe;
107 transaction_guard guard(project.db);
108
109 Netxx::Timeout timeout(static_cast<long>(constants::netsync_timeout_seconds)),
110 instant(0,1);
111
112 P(F("connecting to %s") % info->client.get_uri().resource());
113
114 shared_ptr<Netxx::StreamBase> server
115 = build_stream_to_server(app.opts, app.lua, info, timeout);
116
117 // 'false' here means not to revert changes when the SockOpt
118 // goes out of scope.
119 Netxx::SockOpt socket_options(server->get_socketfd(), false);
120 socket_options.set_non_blocking();
121
122 shared_ptr<session> sess(new session(app, project, keys,
123 client_voice,
124 info->client.get_uri().resource(), server));
125 shared_ptr<wrapped_session> wrapped;
126 switch (info->client.get_connection_type())
127 {
128 case netsync_connection:
129 wrapped.reset(new netsync_session(sess.get(),
130 app.opts, app.lua, project,
131 keys, role,
132 info->client.get_include_pattern(),
133 info->client.get_exclude_pattern(),
134 counts));
135 break;
136 case automate_connection:
137 wrapped.reset(new automate_session(app, sess.get(),
138 &info->client.get_input_stream(),
139 &info->client.get_output_stream()));
140 break;
141 }
142 sess->set_inner(wrapped);
143
144 reactor react;
145 react.add(sess, guard);
146
147 while (true)
148 {
149 react.ready(guard);
150
151 if (react.size() == 0)
152 {
153 // Commit whatever work we managed to accomplish anyways.
154 guard.commit();
155
156 // We failed during processing. This should only happen in
157 // client voice when we have a decode exception, or received an
158 // error from our server (which is translated to a decode
159 // exception). We call these cases E() errors.
160 E(false, origin::network,
161 F("processing failure while talking to peer %s, disconnecting")
162 % sess->get_peer());
163 return;
164 }
165
166 bool io_ok = react.do_io();
167
168 E(io_ok, origin::network,
169 F("timed out waiting for I/O with peer %s, disconnecting")
170 % sess->get_peer());
171
172 if (react.size() == 0)
173 {
174 // Commit whatever work we managed to accomplish anyways.
175 guard.commit();
176
177 // ensure that the tickers have finished and write any last ticks
178 ui.ensure_clean_line();
179
180 // We had an I/O error. We must decide if this represents a
181 // user-reported error or a clean disconnect. See protocol
182 // state diagram in session::process_bye_cmd.
183
184 if (sess->protocol_state == session_base::confirmed_state)
185 {
186 P(F("successful exchange with %s")
187 % sess->get_peer());
188 return;
189 }
190 else if (sess->encountered_error)
191 {
192 P(F("peer %s disconnected after we informed them of error")
193 % sess->get_peer());
194 return;
195 }
196 else
197 E(false, origin::network,
198 (F("I/O failure while talking to peer %s, disconnecting")
199 % sess->get_peer()));
200 }
201 }
202}
203
204static shared_ptr<session>
205session_from_server_sync_item(app_state & app,
206 project_t & project,
207 key_store & keys,
208 server_initiated_sync_request const & request)
209{
210 shared_conn_info info;
211 netsync_connection_info::setup_from_sync_request(app.opts, project.db,
212 app.lua, request,
213 info);
214
215 try
216 {
217 P(F("connecting to %s") % info->client.get_uri().resource());
218 shared_ptr<Netxx::StreamBase> server
219 = build_stream_to_server(app.opts, app.lua, info,
220 Netxx::Timeout(constants::netsync_timeout_seconds));
221
222 // 'false' here means not to revert changes when
223 // the SockOpt goes out of scope.
224 Netxx::SockOpt socket_options(server->get_socketfd(), false);
225 socket_options.set_non_blocking();
226
227 shared_ptr<session>
228 sess(new session(app, project, keys,
229 client_voice,
230 info->client.get_uri().resource(), server));
231 shared_ptr<wrapped_session>
232 wrapped(new netsync_session(sess.get(),
233 app.opts, app.lua, project,
234 keys, request.role,
235 info->client.get_include_pattern(),
236 info->client.get_exclude_pattern(),
237 connection_counts::create(),
238 true));
239 sess->set_inner(wrapped);
240 return sess;
241 }
242 catch (Netxx::NetworkException & e)
243 {
244 P(F("Network error: %s") % e.what());
245 return shared_ptr<session>();
246 }
247}
248
249enum listener_status { listener_listening, listener_not_listening };
250listener_status desired_listener_status;
251LUAEXT(server_set_listening, )
252{
253 if (lua_isboolean(LS, 1))
254 {
255 bool want_listen = lua_toboolean(LS, 1);
256 if (want_listen)
257 desired_listener_status = listener_listening;
258 else
259 desired_listener_status = listener_not_listening;
260 return 0;
261 }
262 else
263 {
264 return luaL_error(LS, "bad argument (not a boolean)");
265 }
266}
267
268static void
269serve_connections(app_state & app,
270 options & opts,
271 lua_hooks & lua,
272 project_t & project,
273 key_store & keys,
274 protocol_role role,
275 std::vector<utf8> const & addresses)
276{
277#ifdef USE_IPV6
278 bool use_ipv6=true;
279#else
280 bool use_ipv6=false;
281#endif
282
283 shared_ptr<transaction_guard> guard(new transaction_guard(project.db));
284
285 reactor react;
286 shared_ptr<listener> listen(new listener(app, project, keys,
287 react, role, addresses,
288 guard, use_ipv6));
289 react.add(listen, *guard);
290 desired_listener_status = listener_listening;
291 listener_status actual_listener_status = listener_listening;
292
293 while (true)
294 {
295 if (!guard)
296 guard = shared_ptr<transaction_guard>
297 (new transaction_guard(project.db));
298 I(guard);
299
300 react.ready(*guard);
301
302 while (!server_initiated_sync_requests.empty())
303 {
304 server_initiated_sync_request request
305 = server_initiated_sync_requests.front();
306 server_initiated_sync_requests.pop_front();
307 shared_ptr<session> sess
308 = session_from_server_sync_item(app, project, keys,
309 request);
310
311 if (sess)
312 {
313 react.add(sess, *guard);
314 L(FL("Opened connection to %s") % sess->get_peer());
315 }
316 }
317
318 if (desired_listener_status != actual_listener_status)
319 {
320 switch (desired_listener_status)
321 {
322 case listener_listening:
323 react.add(listen, *guard);
324 actual_listener_status = listener_listening;
325 break;
326 case listener_not_listening:
327 react.remove(listen);
328 actual_listener_status = listener_not_listening;
329 break;
330 }
331 }
332 if (!react.size())
333 break;
334
335 react.do_io();
336
337 react.prune();
338
339 int num_sessions;
340 if (actual_listener_status == listener_listening)
341 num_sessions = react.size() - 1;
342 else
343 num_sessions = react.size();
344 if (num_sessions == 0)
345 {
346 // Let the guard die completely if everything's gone quiet.
347 guard->commit();
348 guard.reset();
349 }
350 }
351}
352
353static void
354serve_single_connection(project_t & project,
355 shared_ptr<session> sess)
356{
357 sess->begin_service();
358 P(F("beginning service on %s") % sess->get_peer());
359
360 transaction_guard guard(project.db);
361
362 reactor react;
363 react.add(sess, guard);
364
365 while (react.size() > 0)
366 {
367 react.ready(guard);
368 react.do_io();
369 react.prune();
370 }
371 guard.commit();
372}
373
374
375
376
377void
378run_netsync_protocol(app_state & app,
379 options & opts, lua_hooks & lua,
380 project_t & project, key_store & keys,
381 protocol_voice voice,
382 protocol_role role,
383 shared_conn_info & info,
384 shared_conn_counts const & counts)
385{
386 // We do not want to be killed by SIGPIPE from a network disconnect.
387 ignore_sigpipe();
388
389 try
390 {
391 if (voice == server_voice)
392 {
393 if (opts.bind_stdio)
394 {
395 shared_ptr<Netxx::PipeStream> str(new Netxx::PipeStream(0,1));
396
397 shared_ptr<session>
398 sess(new session(app, project, keys,
399 server_voice,
400 "stdio", str));
401 serve_single_connection(project, sess);
402 }
403 else
404 serve_connections(app, opts, lua, project, keys,
405 role, info->server.addrs);
406 }
407 else
408 {
409 I(voice == client_voice);
410 call_server(app, project, keys, role, info, counts);
411 info->client.set_connection_successful();
412 }
413 }
414 catch (Netxx::NetworkException & e)
415 {
416 throw recoverable_failure(origin::network,
417 (F("network error: %s") % e.what()).str());
418 }
419 catch (Netxx::Exception & e)
420 {
421 throw oops((F("network error: %s") % e.what()).str());;
422 }
423}
424
425// Local Variables:
426// mode: C++
427// fill-column: 76
428// c-file-style: "gnu"
429// indent-tabs-mode: nil
430// End:
431// vim: et:sw=2:sts=2:ts=2:cino=>2s,{s,\:s,+s,t0,g0,^-2,e-2,n-2,p2s,(0,=s:

Archive Download this file

Branches

Tags

Quick Links:     www.monotone.ca    -     Downloads    -     Documentation    -     Wiki    -     Code Forge    -     Build Status