monotone

monotone Mtn Source Tree

Root/netsync.cc

1// Copyright (C) 2004 Graydon Hoare <graydon@pobox.com>
2//
3// This program is made available under the GNU GPL version 2.0 or
4// greater. See the accompanying file COPYING for details.
5//
6// This program is distributed WITHOUT ANY WARRANTY; without even the
7// implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
8// PURPOSE.
9
10#include "base.hh"
11#include <map>
12#include <cstdlib>
13#include <memory>
14#include <list>
15#include <deque>
16#include <stack>
17
18#include <time.h>
19
20#include "lexical_cast.hh"
21#include <boost/scoped_ptr.hpp>
22#include <boost/shared_ptr.hpp>
23#include <boost/bind.hpp>
24
25#include "lua_hooks.hh"
26#include "key_store.hh"
27#include "project.hh"
28#include "database.hh"
29#include "cert.hh"
30#include "constants.hh"
31#include "enumerator.hh"
32#include "keys.hh"
33#include "lua.hh"
34#include "merkle_tree.hh"
35#include "netcmd.hh"
36#include "netio.hh"
37#include "numeric_vocab.hh"
38#include "refiner.hh"
39#include "revision.hh"
40#include "sanity.hh"
41#include "transforms.hh"
42#include "ui.hh"
43#include "xdelta.hh"
44#include "epoch.hh"
45#include "platform.hh"
46#include "hmac.hh"
47#include "globish.hh"
48#include "uri.hh"
49#include "options.hh"
50
51#include "botan/botan.h"
52
53#include "netxx/address.h"
54#include "netxx/peer.h"
55#include "netxx/probe.h"
56#include "netxx/socket.h"
57#include "netxx/sockopt.h"
58#include "netxx/stream.h"
59#include "netxx/streamserver.h"
60#include "netxx/timeout.h"
61#include "netxx_pipe.hh"
62// TODO: things to do that will break protocol compatibility
63// -- need some way to upgrade anonymous to keyed pull, without user having
64// to explicitly specify which they want
65// just having a way to respond "access denied, try again" might work
66// but perhaps better to have the anonymous command include a note "I
67// _could_ use key <...> if you prefer", and if that would lead to more
68// access, could reply "I do prefer". (Does this lead to too much
69// information exposure? Allows anonymous people to probe what branches
70// a key has access to.)
71// -- "warning" packet type?
72// -- Richard Levitte wants, when you (e.g.) request '*' but don't have
73// access to all of it, you just get the parts you have access to
// (maybe with warnings about skipped branches). To do this right,
75// should have a way for the server to send back to the client "right,
76// you're not getting the following branches: ...", so the client will
77// not include them in its merkle trie.
78// -- add some sort of vhost field to the client's first packet, saying who
79// they expect to talk to
80
81//
82// This is the "new" network synchronization (netsync) system in
83// monotone. It is based on synchronizing pairs of merkle trees over an
84// interactive connection.
85//
86// A netsync process between peers treats each peer as either a source, a
87// sink, or both. When a peer is only a source, it will not write any new
// items to its database. When a peer is only a sink, it will not send any
89// items from its database. When a peer is both a source and sink, it may
90// send and write items freely.
91//
92// The post-state of a netsync is that each sink contains a superset of the
93// items in its corresponding source; when peers are behaving as both
94// source and sink, this means that the post-state of the sync is for the
95// peers to have identical item sets.
96//
97//
98// Data structure
99// --------------
100//
// Each node in a merkle tree contains a fixed number of slots. This number
102// is derived from a global parameter of the protocol -- the tree fanout --
103// such that the number of slots is 2^fanout. For now we will assume that
// fanout is 4, so there are 16 slots in a node, because this makes
105// illustration easier. The other parameter of the protocol is the size of
106// a hash; we use SHA1 so the hash is 20 bytes (160 bits) long.
107//
108// Each slot in a merkle tree node is in one of 3 states:
109//
110// - empty
111// - leaf
112// - subtree
113//
114// In addition, each leaf contains a hash code which identifies an element
115// of the set being synchronized. Each subtree slot contains a hash code of
116// the node immediately beneath it in the merkle tree. Empty slots contain
117// no hash codes.
118//
119// Since empty slots have no hash code, they are represented implicitly by
120// a bitmap at the head of each merkle tree node. As an additional
121// integrity check, each merkle tree node contains a label indicating its
122// prefix in the tree, and a hash of its own contents.
123//
124// In total, then, the byte-level representation of a <160,4> merkle tree
125// node is as follows:
126//
127// 20 bytes - hash of the remaining bytes in the node
128// 1 byte - type of this node (manifest, file, key, mcert, fcert)
129// 1-N bytes - level of this node in the tree (0 == "root", uleb128)
130// 0-20 bytes - the prefix of this node, 4 bits * level,
131// rounded up to a byte
132// 1-N bytes - number of leaves under this node (uleb128)
133// 4 bytes - slot-state bitmap of the node
134// 0-320 bytes - between 0 and 16 live slots in the node
135//
136// So, in the worst case such a node is 367 bytes, with these parameters.
137//
138//
139// Protocol
140// --------
141//
142// The protocol is a binary command-packet system over TCP; each packet
143// consists of a single byte which identifies the protocol version, a byte
144// which identifies the command name inside that version, a size_t sent as
145// a uleb128 indicating the length of the packet, that many bytes of
146// payload, and finally 20 bytes of SHA-1 HMAC calculated over the payload.
147// The key for the SHA-1 HMAC is 20 bytes of 0 during authentication, and a
148// 20-byte random key chosen by the client after authentication (discussed
149// below). Decoding involves simply buffering until a sufficient number of
150// bytes are received, then advancing the buffer pointer. Any time an
151// integrity check (the HMAC) fails, the protocol is assumed to have lost
152// synchronization, and the connection is dropped. The parties are free to
153// drop the TCP stream at any point, if too much data is received or too
154// much idle time passes; no commitments or transactions are made.
155//
156//
157// Authentication and setup
158// ------------------------
159//
160// The exchange begins in a non-authenticated state. The server sends a
161// "hello <id> <nonce>" command, which identifies the server's RSA key and
162// issues a nonce which must be used for a subsequent authentication.
163//
164// The client then responds with either:
165//
166// An "auth (source|sink|both) <include_pattern> <exclude_pattern> <id>
167// <nonce1> <hmac key> <sig>" command, which identifies its RSA key, notes the
168// role it wishes to play in the synchronization, identifies the pattern it
169// wishes to sync with, signs the previous nonce with its own key, and informs
170// the server of the HMAC key it wishes to use for this session (encrypted
171// with the server's public key); or
172//
173// An "anonymous (source|sink|both) <include_pattern> <exclude_pattern>
174// <hmac key>" command, which identifies the role it wishes to play in the
175// synchronization, the pattern it wishes to sync with, and the HMAC key it
176// wishes to use for this session (also encrypted with the server's public
177// key).
178//
179// The server then replies with a "confirm" command, which contains no
180// other data but will only have the correct HMAC integrity code if the
181// server received and properly decrypted the HMAC key offered by the
182// client. This transitions the peers into an authenticated state and
183// begins epoch refinement. If epoch refinement and epoch transmission
184// succeed, the peers switch to data refinement and data transmission.
185//
186//
187// Refinement
188// ----------
189//
190// Refinement is executed by "refiners"; there is a refiner for each
191// set of 'items' being exchanged: epochs, keys, certs, and revisions.
192// When refinement starts, each party knows only their own set of
193// items; when refinement completes, each party has learned of the
194// complete set of items it needs to send, and a count of items it's
195// expecting to receive.
196//
197// For more details on the refinement process, see refiner.cc.
198//
199//
200// Transmission
201// ------------
202//
203// Once the set of items to send has been determined (for keys, certs, and
204// revisions) each peer switches into a transmission mode. This mode
205// involves walking the revision graph in ancestry-order and sending all
206// the items the local peer has which the remote one does not. Since the
207// remote and local peers both know all the items which need to be
208// transferred (they learned during refinement) they know what to wait for
209// and what to send. The mechanisms of the transmission phase (notably,
210// enumerator.cc) simply ensure that things are sent in the proper order,
211// and without over-filling the output buffer too much.
212//
213//
214// Shutdown
215// --------
216//
217// After transmission completes, one special command, "bye", is used to
218// shut down a connection gracefully. The shutdown sequence based on "bye"
219// commands is documented below in session::process_bye_cmd.
220//
221//
222// Note on epochs
223// --------------
224//
// One refinement and transmission phase precedes all the others: epochs.
226// Epochs are exchanged and compared in order to be sure that further
227// refinement and transmission (on certs and revisions) makes sense; they
228// are a sort of "immune system" to prevent incompatible databases (say
229// between rebuilds due to bugs in monotone) from cross-contaminating. The
230// later refinements are only kicked off *after* all epochs are received
231// and compare correctly.
232//
233//
234// Note on dense coding
235// --------------------
236//
237// This protocol is "raw binary" (non-text) because coding density is
238// actually important here, and each packet consists of very
239// information-dense material that you wouldn't have a hope of typing in,
// or interpreting manually anyway.
241//
242
243using std::auto_ptr;
244using std::deque;
245using std::make_pair;
246using std::map;
247using std::min;
248using std::pair;
249using std::set;
250using std::string;
251using std::vector;
252
253using boost::shared_ptr;
254using boost::lexical_cast;
255
// A sync request queued from Lua by the server_request_sync extension.
// Each field holds one of that extension's four string arguments, in
// argument order.
struct server_initiated_sync_request
{
  std::string what;     // Lua argument 1
  std::string address;  // Lua argument 2
  std::string include;  // Lua argument 3
  std::string exclude;  // Lua argument 4
};
263deque<server_initiated_sync_request> server_initiated_sync_requests;
264LUAEXT(server_request_sync, )
265{
266 char const * w = luaL_checkstring(L, 1);
267 char const * a = luaL_checkstring(L, 2);
268 char const * i = luaL_checkstring(L, 3);
269 char const * e = luaL_checkstring(L, 4);
270 server_initiated_sync_request request;
271 request.what = string(w);
272 request.address = string(a);
273 request.include = string(i);
274 request.exclude = string(e);
275 server_initiated_sync_requests.push_back(request);
276 return 0;
277}
278
279static inline void
280require(bool check, string const & context)
281{
282 if (!check)
283 throw bad_decode(F("check of '%s' failed") % context);
284}
285
286static void
287read_pubkey(string const & in,
288 rsa_keypair_id & id,
289 rsa_pub_key & pub)
290{
291 string tmp_id, tmp_key;
292 size_t pos = 0;
293 extract_variable_length_string(in, tmp_id, pos, "pubkey id");
294 extract_variable_length_string(in, tmp_key, pos, "pubkey value");
295 id = rsa_keypair_id(tmp_id);
296 pub = rsa_pub_key(tmp_key);
297}
298
299static void
300write_pubkey(rsa_keypair_id const & id,
301 rsa_pub_key const & pub,
302 string & out)
303{
304 insert_variable_length_string(id(), out);
305 insert_variable_length_string(pub(), out);
306}
307
// Exception thrown by session::error to abort netsync processing.
struct netsync_error
{
  // Human-readable description of the failure.
  string msg;

  netsync_error(string const & message)
    : msg(message)
  {}
};
313
314struct
315session:
316 public refiner_callbacks,
317 public enumerator_callbacks
318{
319 protocol_role role;
320 protocol_voice const voice;
321 globish our_include_pattern;
322 globish our_exclude_pattern;
323 globish_matcher our_matcher;
324
325 project_t & project;
326 key_store & keys;
327 lua_hooks & lua;
328 bool use_transport_auth;
329 rsa_keypair_id const & signing_key;
330 vector<rsa_keypair_id> const & keys_to_push;
331
332 string peer_id;
333 shared_ptr<Netxx::StreamBase> str;
334
335 string_queue inbuf;
336 // deque of pair<string data, size_t cur_pos>
337 deque< pair<string,size_t> > outbuf;
338 // the total data stored in outbuf - this is
339 // used as a valve to stop too much data
340 // backing up
341 size_t outbuf_size;
342
343 netcmd cmd;
344 bool armed;
345 bool arm();
346
347 bool received_remote_key;
348 rsa_keypair_id remote_peer_key_name;
349 netsync_session_key session_key;
350 chained_hmac read_hmac;
351 chained_hmac write_hmac;
352 bool authenticated;
353
354 time_t last_io_time;
355 auto_ptr<ticker> byte_in_ticker;
356 auto_ptr<ticker> byte_out_ticker;
357 auto_ptr<ticker> cert_in_ticker;
358 auto_ptr<ticker> cert_out_ticker;
359 auto_ptr<ticker> revision_in_ticker;
360 auto_ptr<ticker> revision_out_ticker;
361 size_t bytes_in, bytes_out;
362 size_t certs_in, certs_out;
363 size_t revs_in, revs_out;
364 size_t keys_in, keys_out;
365 // used to identify this session to the netsync hooks.
366 // We can't just use saved_nonce, because that's blank for all
367 // anonymous connections and could lead to confusion.
368 size_t session_id;
369 static size_t session_count;
370
371 vector<revision_id> written_revisions;
372 vector<rsa_keypair_id> written_keys;
373 vector<cert> written_certs;
374
375 id saved_nonce;
376
377 enum
378 {
379 working_state,
380 shutdown_state,
381 confirmed_state
382 }
383 protocol_state;
384
385 bool encountered_error;
386
387 static const int no_error = 200;
388 static const int partial_transfer = 211;
389 static const int no_transfer = 212;
390
391 static const int not_permitted = 412;
392 static const int unknown_key = 422;
393 static const int mixing_versions = 432;
394
395 static const int role_mismatch = 512;
396 static const int bad_command = 521;
397
398 static const int failed_identification = 532;
399 //static const int bad_data = 541;
400
401 int error_code;
402
403 bool set_totals;
404
405 // Interface to refinement.
406 refiner epoch_refiner;
407 refiner key_refiner;
408 refiner cert_refiner;
409 refiner rev_refiner;
410
411 // Interface to ancestry grovelling.
412 revision_enumerator rev_enumerator;
413
414 // Enumerator_callbacks methods.
415 set<file_id> file_items_sent;
416 bool process_this_rev(revision_id const & rev);
417 bool queue_this_cert(id const & c);
418 bool queue_this_file(id const & f);
419 void note_file_data(file_id const & f);
420 void note_file_delta(file_id const & src, file_id const & dst);
421 void note_rev(revision_id const & rev);
422 void note_cert(id const & c);
423
424 session(options & opts,
425 lua_hooks & lua,
426 project_t & project,
427 key_store & keys,
428 protocol_role role,
429 protocol_voice voice,
430 globish const & our_include_pattern,
431 globish const & our_exclude_pattern,
432 string const & peer,
433 shared_ptr<Netxx::StreamBase> sock,
434 bool initiated_by_server = false);
435
436 virtual ~session();
437
438 id mk_nonce();
439 void mark_recent_io();
440
441 void set_session_key(string const & key);
442 void set_session_key(rsa_oaep_sha_data const & key_encrypted);
443
444 void setup_client_tickers();
445 bool done_all_refinements();
446 bool queued_all_items();
447 bool received_all_items();
448 bool finished_working();
449 void maybe_step();
450 void maybe_say_goodbye(transaction_guard & guard);
451
452 void note_item_arrived(netcmd_item_type ty, id const & i);
453 void maybe_note_epochs_finished();
454 void note_item_sent(netcmd_item_type ty, id const & i);
455
456 Netxx::Probe::ready_type which_events() const;
457 bool read_some();
458 bool write_some();
459
460 void error(int errcode, string const & errmsg);
461
462 void write_netcmd_and_try_flush(netcmd const & cmd);
463
464 // Outgoing queue-writers.
465 void queue_bye_cmd(u8 phase);
466 void queue_error_cmd(string const & errmsg);
467 void queue_done_cmd(netcmd_item_type type, size_t n_items);
468 void queue_hello_cmd(rsa_keypair_id const & key_name,
469 rsa_pub_key const & pub_encoded,
470 id const & nonce);
471 void queue_anonymous_cmd(protocol_role role,
472 globish const & include_pattern,
473 globish const & exclude_pattern,
474 id const & nonce2);
475 void queue_auth_cmd(protocol_role role,
476 globish const & include_pattern,
477 globish const & exclude_pattern,
478 id const & client,
479 id const & nonce1,
480 id const & nonce2,
481 rsa_sha1_signature const & signature);
482 void queue_confirm_cmd();
483 void queue_refine_cmd(refinement_type ty, merkle_node const & node);
484 void queue_data_cmd(netcmd_item_type type,
485 id const & item,
486 string const & dat);
487 void queue_delta_cmd(netcmd_item_type type,
488 id const & base,
489 id const & ident,
490 delta const & del);
491
492 // Incoming dispatch-called methods.
493 bool process_error_cmd(string const & errmsg);
494 bool process_hello_cmd(rsa_keypair_id const & server_keyname,
495 rsa_pub_key const & server_key,
496 id const & nonce);
497 bool process_bye_cmd(u8 phase, transaction_guard & guard);
498 bool process_anonymous_cmd(protocol_role role,
499 globish const & their_include_pattern,
500 globish const & their_exclude_pattern);
501 bool process_auth_cmd(protocol_role role,
502 globish const & their_include_pattern,
503 globish const & their_exclude_pattern,
504 id const & client,
505 id const & nonce1,
506 rsa_sha1_signature const & signature);
507 bool process_refine_cmd(refinement_type ty, merkle_node const & node);
508 bool process_done_cmd(netcmd_item_type type, size_t n_items);
509 bool process_data_cmd(netcmd_item_type type,
510 id const & item,
511 string const & dat);
512 bool process_delta_cmd(netcmd_item_type type,
513 id const & base,
514 id const & ident,
515 delta const & del);
516 bool process_usher_cmd(utf8 const & msg);
517
518 // The incoming dispatcher.
519 bool dispatch_payload(netcmd const & cmd,
520 transaction_guard & guard);
521
522 // Various helpers.
523 void assume_corresponding_role(protocol_role their_role);
524 void respond_to_confirm_cmd();
525 bool data_exists(netcmd_item_type type,
526 id const & item);
527 void load_data(netcmd_item_type type,
528 id const & item,
529 string & out);
530
531 void rebuild_merkle_trees(set<branch_name> const & branches);
532
533 void send_all_data(netcmd_item_type ty, set<id> const & items);
534 void begin_service();
535 bool process(transaction_guard & guard);
536
537 bool initiated_by_server;
538};
539size_t session::session_count = 0;
540
541session::session(options & opts,
542 lua_hooks & lua,
543 project_t & project,
544 key_store & keys,
545 protocol_role role,
546 protocol_voice voice,
547 globish const & our_include_pattern,
548 globish const & our_exclude_pattern,
549 string const & peer,
550 shared_ptr<Netxx::StreamBase> sock,
551 bool initiated_by_server) :
552 role(role),
553 voice(voice),
554 our_include_pattern(our_include_pattern),
555 our_exclude_pattern(our_exclude_pattern),
556 our_matcher(our_include_pattern, our_exclude_pattern),
557 project(project),
558 keys(keys),
559 lua(lua),
560 use_transport_auth(opts.use_transport_auth),
561 signing_key(opts.signing_key),
562 keys_to_push(opts.keys_to_push),
563 peer_id(peer),
564 str(sock),
565 inbuf(),
566 outbuf_size(0),
567 armed(false),
568 received_remote_key(false),
569 remote_peer_key_name(""),
570 session_key(constants::netsync_key_initializer),
571 read_hmac(netsync_session_key(constants::netsync_key_initializer),
572 use_transport_auth),
573 write_hmac(netsync_session_key(constants::netsync_key_initializer),
574 use_transport_auth),
575 authenticated(false),
576 last_io_time(::time(NULL)),
577 byte_in_ticker(NULL),
578 byte_out_ticker(NULL),
579 cert_in_ticker(NULL),
580 cert_out_ticker(NULL),
581 revision_in_ticker(NULL),
582 revision_out_ticker(NULL),
583 bytes_in(0), bytes_out(0),
584 certs_in(0), certs_out(0),
585 revs_in(0), revs_out(0),
586 keys_in(0), keys_out(0),
587 session_id(++session_count),
588 saved_nonce(""),
589 protocol_state(working_state),
590 encountered_error(false),
591 error_code(no_transfer),
592 set_totals(false),
593 epoch_refiner(epoch_item, voice, *this),
594 key_refiner(key_item, voice, *this),
595 cert_refiner(cert_item, voice, *this),
596 rev_refiner(revision_item, voice, *this),
597 rev_enumerator(project, *this),
598 initiated_by_server(initiated_by_server)
599{}
600
601session::~session()
602{
603 if (protocol_state == confirmed_state)
604 error_code = no_error;
605 else if (error_code == no_transfer &&
606 (revs_in || revs_out ||
607 certs_in || certs_out ||
608 keys_in || keys_out))
609 error_code = partial_transfer;
610
611 vector<cert> unattached_certs;
612 map<revision_id, vector<cert> > revcerts;
613 for (vector<revision_id>::iterator i = written_revisions.begin();
614 i != written_revisions.end(); ++i)
615 revcerts.insert(make_pair(*i, vector<cert>()));
616 for (vector<cert>::iterator i = written_certs.begin();
617 i != written_certs.end(); ++i)
618 {
619 map<revision_id, vector<cert> >::iterator j;
620 j = revcerts.find(revision_id(i->ident));
621 if (j == revcerts.end())
622 unattached_certs.push_back(*i);
623 else
624 j->second.push_back(*i);
625 }
626
627 // if (role == sink_role || role == source_and_sink_role)
628 if (!written_keys.empty()
629 || !written_revisions.empty()
630 || !written_certs.empty())
631 {
632
633 //Keys
634 for (vector<rsa_keypair_id>::iterator i = written_keys.begin();
635 i != written_keys.end(); ++i)
636 {
637 lua.hook_note_netsync_pubkey_received(*i, session_id);
638 }
639
640 //Revisions
641 for (vector<revision_id>::iterator i = written_revisions.begin();
642 i != written_revisions.end(); ++i)
643 {
644 vector<cert> & ctmp(revcerts[*i]);
645 set<pair<rsa_keypair_id, pair<cert_name, cert_value> > > certs;
646 for (vector<cert>::const_iterator j = ctmp.begin();
647 j != ctmp.end(); ++j)
648 certs.insert(make_pair(j->key, make_pair(j->name, j->value)));
649
650 revision_data rdat;
651 project.db.get_revision(*i, rdat);
652 lua.hook_note_netsync_revision_received(*i, rdat, certs,
653 session_id);
654 }
655
656 //Certs (not attached to a new revision)
657 for (vector<cert>::iterator i = unattached_certs.begin();
658 i != unattached_certs.end(); ++i)
659 lua.hook_note_netsync_cert_received(revision_id(i->ident), i->key,
660 i->name, i->value, session_id);
661 }
662 lua.hook_note_netsync_end(session_id, error_code,
663 bytes_in, bytes_out,
664 certs_in, certs_out,
665 revs_in, revs_out,
666 keys_in, keys_out);
667}
668
669bool
670session::process_this_rev(revision_id const & rev)
671{
672 return (rev_refiner.items_to_send.find(rev.inner())
673 != rev_refiner.items_to_send.end());
674}
675
676bool
677session::queue_this_cert(id const & c)
678{
679 return (cert_refiner.items_to_send.find(c)
680 != cert_refiner.items_to_send.end());
681}
682
683bool
684session::queue_this_file(id const & f)
685{
686 return file_items_sent.find(file_id(f)) == file_items_sent.end();
687}
688
689void
690session::note_file_data(file_id const & f)
691{
692 if (role == sink_role)
693 return;
694 file_data fd;
695 project.db.get_file_version(f, fd);
696 queue_data_cmd(file_item, f.inner(), fd.inner()());
697 file_items_sent.insert(f);
698}
699
700void
701session::note_file_delta(file_id const & src, file_id const & dst)
702{
703 if (role == sink_role)
704 return;
705 file_delta fdel;
706 project.db.get_arbitrary_file_delta(src, dst, fdel);
707 queue_delta_cmd(file_item, src.inner(), dst.inner(), fdel.inner());
708 file_items_sent.insert(dst);
709}
710
711void
712session::note_rev(revision_id const & rev)
713{
714 if (role == sink_role)
715 return;
716 revision_t rs;
717 project.db.get_revision(rev, rs);
718 data tmp;
719 write_revision(rs, tmp);
720 queue_data_cmd(revision_item, rev.inner(), tmp());
721}
722
723void
724session::note_cert(id const & c)
725{
726 if (role == sink_role)
727 return;
728 revision<cert> cert;
729 string str;
730 project.db.get_revision_cert(c, cert);
731 write_cert(cert.inner(), str);
732 queue_data_cmd(cert_item, c, str);
733}
734
735
736id
737session::mk_nonce()
738{
739 I(this->saved_nonce().size() == 0);
740 char buf[constants::merkle_hash_length_in_bytes];
741 Botan::Global_RNG::randomize(reinterpret_cast<Botan::byte *>(buf),
742 constants::merkle_hash_length_in_bytes);
743 this->saved_nonce = id(string(buf, buf + constants::merkle_hash_length_in_bytes));
744 I(this->saved_nonce().size() == constants::merkle_hash_length_in_bytes);
745 return this->saved_nonce;
746}
747
748void
749session::mark_recent_io()
750{
751 last_io_time = ::time(NULL);
752}
753
754void
755session::set_session_key(string const & key)
756{
757 session_key = netsync_session_key(key);
758 read_hmac.set_key(session_key);
759 write_hmac.set_key(session_key);
760}
761
762void
763session::set_session_key(rsa_oaep_sha_data const & hmac_key_encrypted)
764{
765 if (use_transport_auth)
766 {
767 string hmac_key;
768 keys.decrypt_rsa(signing_key, hmac_key_encrypted, hmac_key);
769 set_session_key(hmac_key);
770 }
771}
772
773void
774session::setup_client_tickers()
775{
776 // xgettext: please use short message and try to avoid multibytes chars
777 byte_in_ticker.reset(new ticker(N_("bytes in"), ">", 1024, true));
778 // xgettext: please use short message and try to avoid multibytes chars
779 byte_out_ticker.reset(new ticker(N_("bytes out"), "<", 1024, true));
780 if (role == sink_role)
781 {
782 // xgettext: please use short message and try to avoid multibytes chars
783 cert_in_ticker.reset(new ticker(N_("certs in"), "c", 3));
784 // xgettext: please use short message and try to avoid multibytes chars
785 revision_in_ticker.reset(new ticker(N_("revs in"), "r", 1));
786 }
787 else if (role == source_role)
788 {
789 // xgettext: please use short message and try to avoid multibytes chars
790 cert_out_ticker.reset(new ticker(N_("certs out"), "C", 3));
791 // xgettext: please use short message and try to avoid multibytes chars
792 revision_out_ticker.reset(new ticker(N_("revs out"), "R", 1));
793 }
794 else
795 {
796 I(role == source_and_sink_role);
797 // xgettext: please use short message and try to avoid multibytes chars
798 revision_in_ticker.reset(new ticker(N_("revs in"), "r", 1));
799 // xgettext: please use short message and try to avoid multibytes chars
800 revision_out_ticker.reset(new ticker(N_("revs out"), "R", 1));
801 }
802}
803
804bool
805session::done_all_refinements()
806{
807 bool all = rev_refiner.done
808 && cert_refiner.done
809 && key_refiner.done
810 && epoch_refiner.done;
811
812 if (all && !set_totals)
813 {
814 if (cert_out_ticker.get())
815 cert_out_ticker->set_total(cert_refiner.items_to_send.size());
816
817 if (revision_out_ticker.get())
818 revision_out_ticker->set_total(rev_refiner.items_to_send.size());
819
820 if (cert_in_ticker.get())
821 cert_in_ticker->set_total(cert_refiner.items_to_receive);
822
823 if (revision_in_ticker.get())
824 revision_in_ticker->set_total(rev_refiner.items_to_receive);
825
826 set_totals = true;
827 }
828 return all;
829}
830
831
832
833bool
834session::received_all_items()
835{
836 if (role == source_role)
837 return true;
838 bool all = rev_refiner.items_to_receive == 0
839 && cert_refiner.items_to_receive == 0
840 && key_refiner.items_to_receive == 0
841 && epoch_refiner.items_to_receive == 0;
842 return all;
843}
844
845bool
846session::finished_working()
847{
848 bool all = done_all_refinements()
849 && received_all_items()
850 && queued_all_items()
851 && rev_enumerator.done();
852 return all;
853}
854
855bool
856session::queued_all_items()
857{
858 if (role == sink_role)
859 return true;
860 bool all = rev_refiner.items_to_send.empty()
861 && cert_refiner.items_to_send.empty()
862 && key_refiner.items_to_send.empty()
863 && epoch_refiner.items_to_send.empty();
864 return all;
865}
866
867
868void
869session::maybe_note_epochs_finished()
870{
871 // Maybe there are outstanding epoch requests.
872 // These only matter if we're in sink or source-and-sink mode.
873 if (!(epoch_refiner.items_to_receive == 0) && !(role == source_role))
874 return;
875
876 // And maybe we haven't even finished the refinement.
877 if (!epoch_refiner.done)
878 return;
879
880 // If we ran into an error -- say a mismatched epoch -- don't do any
881 // further refinements.
882 if (encountered_error)
883 return;
884
885 // But otherwise, we're ready to go. Start the next
886 // set of refinements.
887 if (voice == client_voice)
888 {
889 L(FL("epoch refinement finished; beginning other refinements"));
890 key_refiner.begin_refinement();
891 cert_refiner.begin_refinement();
892 rev_refiner.begin_refinement();
893 }
894 else
895 L(FL("epoch refinement finished"));
896}
897
898static void
899decrement_if_nonzero(netcmd_item_type ty,
900 size_t & n)
901{
902 if (n == 0)
903 {
904 string typestr;
905 netcmd_item_type_to_string(ty, typestr);
906 E(false, F("underflow on count of %s items to receive") % typestr);
907 }
908 --n;
909 if (n == 0)
910 {
911 string typestr;
912 netcmd_item_type_to_string(ty, typestr);
913 L(FL("count of %s items to receive has reached zero") % typestr);
914 }
915}
916
917void
918session::note_item_arrived(netcmd_item_type ty, id const & ident)
919{
920 switch (ty)
921 {
922 case cert_item:
923 decrement_if_nonzero(ty, cert_refiner.items_to_receive);
924 if (cert_in_ticker.get() != NULL)
925 ++(*cert_in_ticker);
926 ++certs_in;
927 break;
928 case revision_item:
929 decrement_if_nonzero(ty, rev_refiner.items_to_receive);
930 if (revision_in_ticker.get() != NULL)
931 ++(*revision_in_ticker);
932 ++revs_in;
933 break;
934 case key_item:
935 decrement_if_nonzero(ty, key_refiner.items_to_receive);
936 ++keys_in;
937 break;
938 case epoch_item:
939 decrement_if_nonzero(ty, epoch_refiner.items_to_receive);
940 break;
941 default:
942 // No ticker for other things.
943 break;
944 }
945}
946
947
948
949void
950session::note_item_sent(netcmd_item_type ty, id const & ident)
951{
952 switch (ty)
953 {
954 case cert_item:
955 cert_refiner.items_to_send.erase(ident);
956 if (cert_out_ticker.get() != NULL)
957 ++(*cert_out_ticker);
958 ++certs_out;
959 break;
960 case revision_item:
961 rev_refiner.items_to_send.erase(ident);
962 if (revision_out_ticker.get() != NULL)
963 ++(*revision_out_ticker);
964 ++revs_out;
965 break;
966 case key_item:
967 key_refiner.items_to_send.erase(ident);
968 ++keys_out;
969 break;
970 case epoch_item:
971 epoch_refiner.items_to_send.erase(ident);
972 break;
973 default:
974 // No ticker for other things.
975 break;
976 }
977}
978
979void
980session::write_netcmd_and_try_flush(netcmd const & cmd)
981{
982 if (!encountered_error)
983 {
984 string buf;
985 cmd.write(buf, write_hmac);
986 outbuf.push_back(make_pair(buf, 0));
987 outbuf_size += buf.size();
988 }
989 else
990 L(FL("dropping outgoing netcmd (because we're in error unwind mode)"));
991 // FIXME: this helps keep the protocol pipeline full but it seems to
992 // interfere with initial and final sequences. careful with it.
993 // write_some();
994 // read_some();
995}
996
997// This method triggers a special "error unwind" mode to netsync. In this
998// mode, all received data is ignored, and no new data is queued. We simply
999// stay connected long enough for the current write buffer to be flushed, to
1000// ensure that our peer receives the error message.
1001// Affects read_some, write_some, and process .
1002void
1003session::error(int errcode, string const & errmsg)
1004{
1005 error_code = errcode;
1006 throw netsync_error(errmsg);
1007}
1008
1009Netxx::Probe::ready_type
1010session::which_events() const
1011{
1012 // Only ask to read if we're not armed.
1013 if (outbuf.empty())
1014 {
1015 if (inbuf.size() < constants::netcmd_maxsz && !armed)
1016 return Netxx::Probe::ready_read | Netxx::Probe::ready_oobd;
1017 else
1018 return Netxx::Probe::ready_oobd;
1019 }
1020 else
1021 {
1022 if (inbuf.size() < constants::netcmd_maxsz && !armed)
1023 return Netxx::Probe::ready_write | Netxx::Probe::ready_read | Netxx::Probe::ready_oobd;
1024 else
1025 return Netxx::Probe::ready_write | Netxx::Probe::ready_oobd;
1026 }
1027}
1028
1029bool
1030session::read_some()
1031{
1032 I(inbuf.size() < constants::netcmd_maxsz);
1033 char tmp[constants::bufsz];
1034 Netxx::signed_size_type count = str->read(tmp, sizeof(tmp));
1035 if (count > 0)
1036 {
1037 L(FL("read %d bytes from fd %d (peer %s)")
1038 % count % str->get_socketfd() % peer_id);
1039 if (encountered_error)
1040 {
1041 L(FL("in error unwind mode, so throwing them into the bit bucket"));
1042 return true;
1043 }
1044 inbuf.append(tmp,count);
1045 mark_recent_io();
1046 if (byte_in_ticker.get() != NULL)
1047 (*byte_in_ticker) += count;
1048 bytes_in += count;
1049 return true;
1050 }
1051 else
1052 return false;
1053}
1054
1055bool
1056session::write_some()
1057{
1058 I(!outbuf.empty());
1059 size_t writelen = outbuf.front().first.size() - outbuf.front().second;
1060 Netxx::signed_size_type count = str->write(outbuf.front().first.data() + outbuf.front().second,
1061 min(writelen,
1062 constants::bufsz));
1063 if (count > 0)
1064 {
1065 if ((size_t)count == writelen)
1066 {
1067 outbuf_size -= outbuf.front().first.size();
1068 outbuf.pop_front();
1069 }
1070 else
1071 {
1072 outbuf.front().second += count;
1073 }
1074 L(FL("wrote %d bytes to fd %d (peer %s)")
1075 % count % str->get_socketfd() % peer_id);
1076 mark_recent_io();
1077 if (byte_out_ticker.get() != NULL)
1078 (*byte_out_ticker) += count;
1079 bytes_out += count;
1080 if (encountered_error && outbuf.empty())
1081 {
1082 // we've flushed our error message, so it's time to get out.
1083 L(FL("finished flushing output queue in error unwind mode, disconnecting"));
1084 return false;
1085 }
1086 return true;
1087 }
1088 else
1089 return false;
1090}
1091
1092// senders
1093
1094void
1095session::queue_error_cmd(string const & errmsg)
1096{
1097 L(FL("queueing 'error' command"));
1098 netcmd cmd;
1099 cmd.write_error_cmd(errmsg);
1100 write_netcmd_and_try_flush(cmd);
1101}
1102
1103void
1104session::queue_bye_cmd(u8 phase)
1105{
1106 L(FL("queueing 'bye' command, phase %d")
1107 % static_cast<size_t>(phase));
1108 netcmd cmd;
1109 cmd.write_bye_cmd(phase);
1110 write_netcmd_and_try_flush(cmd);
1111}
1112
1113void
1114session::queue_done_cmd(netcmd_item_type type,
1115 size_t n_items)
1116{
1117 string typestr;
1118 netcmd_item_type_to_string(type, typestr);
1119 L(FL("queueing 'done' command for %s (%d items)")
1120 % typestr % n_items);
1121 netcmd cmd;
1122 cmd.write_done_cmd(type, n_items);
1123 write_netcmd_and_try_flush(cmd);
1124}
1125
1126void
1127session::queue_hello_cmd(rsa_keypair_id const & key_name,
1128 rsa_pub_key const & pub,
1129 id const & nonce)
1130{
1131 if (use_transport_auth)
1132 cmd.write_hello_cmd(key_name, pub, nonce);
1133 else
1134 cmd.write_hello_cmd(key_name, rsa_pub_key(), nonce);
1135 write_netcmd_and_try_flush(cmd);
1136}
1137
1138void
1139session::queue_anonymous_cmd(protocol_role role,
1140 globish const & include_pattern,
1141 globish const & exclude_pattern,
1142 id const & nonce2)
1143{
1144 netcmd cmd;
1145 rsa_oaep_sha_data hmac_key_encrypted;
1146 if (use_transport_auth)
1147 project.db.encrypt_rsa(remote_peer_key_name, nonce2(), hmac_key_encrypted);
1148 cmd.write_anonymous_cmd(role, include_pattern, exclude_pattern,
1149 hmac_key_encrypted);
1150 write_netcmd_and_try_flush(cmd);
1151 set_session_key(nonce2());
1152}
1153
1154void
1155session::queue_auth_cmd(protocol_role role,
1156 globish const & include_pattern,
1157 globish const & exclude_pattern,
1158 id const & client,
1159 id const & nonce1,
1160 id const & nonce2,
1161 rsa_sha1_signature const & signature)
1162{
1163 netcmd cmd;
1164 rsa_oaep_sha_data hmac_key_encrypted;
1165 I(use_transport_auth);
1166 project.db.encrypt_rsa(remote_peer_key_name, nonce2(), hmac_key_encrypted);
1167 cmd.write_auth_cmd(role, include_pattern, exclude_pattern, client,
1168 nonce1, hmac_key_encrypted, signature);
1169 write_netcmd_and_try_flush(cmd);
1170 set_session_key(nonce2());
1171}
1172
1173void
1174session::queue_confirm_cmd()
1175{
1176 netcmd cmd;
1177 cmd.write_confirm_cmd();
1178 write_netcmd_and_try_flush(cmd);
1179}
1180
1181void
1182session::queue_refine_cmd(refinement_type ty, merkle_node const & node)
1183{
1184 string typestr;
1185 hexenc<prefix> hpref;
1186 node.get_hex_prefix(hpref);
1187 netcmd_item_type_to_string(node.type, typestr);
1188 L(FL("queueing refinement %s of %s node '%s', level %d")
1189 % (ty == refinement_query ? "query" : "response")
1190 % typestr % hpref() % static_cast<int>(node.level));
1191 netcmd cmd;
1192 cmd.write_refine_cmd(ty, node);
1193 write_netcmd_and_try_flush(cmd);
1194}
1195
1196void
1197session::queue_data_cmd(netcmd_item_type type,
1198 id const & item,
1199 string const & dat)
1200{
1201 string typestr;
1202 netcmd_item_type_to_string(type, typestr);
1203 hexenc<id> hid;
1204
1205 if (global_sanity.debug_p())
1206 encode_hexenc(item, hid);
1207
1208 if (role == sink_role)
1209 {
1210 L(FL("not queueing %s data for '%s' as we are in pure sink role")
1211 % typestr % hid());
1212 return;
1213 }
1214
1215 L(FL("queueing %d bytes of data for %s item '%s'")
1216 % dat.size() % typestr % hid());
1217
1218 netcmd cmd;
1219 // TODO: This pair of functions will make two copies of a large
1220 // file, the first in cmd.write_data_cmd, and the second in
1221 // write_netcmd_and_try_flush when the data is copied from the
1222 // cmd.payload variable to the string buffer for output. This
1223 // double copy should be collapsed out, it may be better to use
1224 // a string_queue for output as well as input, as that will reduce
1225 // the amount of mallocs that happen when the string queue is large
1226 // enough to just store the data.
1227 cmd.write_data_cmd(type, item, dat);
1228 write_netcmd_and_try_flush(cmd);
1229 note_item_sent(type, item);
1230}
1231
1232void
1233session::queue_delta_cmd(netcmd_item_type type,
1234 id const & base,
1235 id const & ident,
1236 delta const & del)
1237{
1238 I(type == file_item);
1239 string typestr;
1240 netcmd_item_type_to_string(type, typestr);
1241 hexenc<id> base_hid,
1242 ident_hid;
1243
1244 if (global_sanity.debug_p())
1245 {
1246 encode_hexenc(base, base_hid);
1247 encode_hexenc(ident, ident_hid);
1248 }
1249
1250 if (role == sink_role)
1251 {
1252 L(FL("not queueing %s delta '%s' -> '%s' as we are in pure sink role")
1253 % typestr % base_hid() % ident_hid());
1254 return;
1255 }
1256
1257 L(FL("queueing %s delta '%s' -> '%s'")
1258 % typestr % base_hid() % ident_hid());
1259 netcmd cmd;
1260 cmd.write_delta_cmd(type, base, ident, del);
1261 write_netcmd_and_try_flush(cmd);
1262 note_item_sent(type, ident);
1263}
1264
1265
1266// processors
1267
1268bool
1269session::process_error_cmd(string const & errmsg)
1270{
1271 // "xxx string" with xxx being digits means there's an error code
1272 if (errmsg.size() > 4 && errmsg.substr(3,1) == " ")
1273 {
1274 try
1275 {
1276 int err = boost::lexical_cast<int>(errmsg.substr(0,3));
1277 if (err >= 100)
1278 {
1279 error_code = err;
1280 throw bad_decode(F("received network error: %s")
1281 % errmsg.substr(4));
1282 }
1283 }
1284 catch (boost::bad_lexical_cast)
1285 { // ok, so it wasn't a number
1286 }
1287 }
1288 throw bad_decode(F("received network error: %s") % errmsg);
1289}
1290
1291static const var_domain known_servers_domain = var_domain("known-servers");
1292
// Client side: handle the server's 'hello' netcmd.  It carries the server's
// key name THEIR_KEYNAME, its public key THEIR_KEY, and a NONCE for us to
// sign.
//
// With transport auth enabled:
//  - the server's key hash is compared against the hash remembered for
//    peer_id in the known-servers database variable; a mismatch is fatal;
//  - on first contact the hash is recorded instead;
//  - the server's public key is stored and its name is remembered in
//    remote_peer_key_name.
//
// We then rebuild merkle trees for every local branch accepted by
// our_matcher.  The reply is an 'auth' netcmd when transport auth is on and
// we have a signing key, and an 'anonymous' netcmd otherwise.  Always
// returns true; failures are reported by throwing.
bool
session::process_hello_cmd(rsa_keypair_id const & their_keyname,
                           rsa_pub_key const & their_key,
                           id const & nonce)
{
  // No remote key may have been received and no nonce saved yet.
  I(!this->received_remote_key);
  I(this->saved_nonce().size() == 0);

  if (use_transport_auth)
    {
      id their_key_hash;
      key_hash_code(their_keyname, their_key, their_key_hash);
      var_value printable_key_hash(encode_hexenc(their_key_hash()));
      L(FL("server key has name %s, hash %s")
        % their_keyname % printable_key_hash);
      // The key hash previously seen for this peer (if any) lives here.
      var_key their_key_key(known_servers_domain, var_name(peer_id));
      if (project.db.var_exists(their_key_key))
        {
          var_value expected_key_hash;
          project.db.get_var(their_key_key, expected_key_hash);
          if (expected_key_hash != printable_key_hash)
            {
              P(F("@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@\n"
                  "@ WARNING: SERVER IDENTIFICATION HAS CHANGED @\n"
                  "@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@\n"
                  "IT IS POSSIBLE THAT SOMEONE IS DOING SOMETHING NASTY\n"
                  "it is also possible that the server key has just been changed\n"
                  "remote host sent key %s\n"
                  "I expected %s\n"
                  "'%s unset %s %s' overrides this check")
                % printable_key_hash
                % expected_key_hash
                % ui.prog_name % their_key_key.first % their_key_key.second);
              E(false, F("server key changed"));
            }
        }
      else
        {
          // First contact: trust on first use and remember the key hash.
          P(F("first time connecting to server %s\n"
              "I'll assume it's really them, but you might want to double-check\n"
              "their key's fingerprint: %s")
            % peer_id
            % printable_key_hash);
          project.db.set_var(their_key_key, printable_key_hash);
        }
      // NOTE(review): put_key presumably returns true only when the key was
      // newly added - confirm against database::put_key.
      if (project.db.put_key(their_keyname, their_key))
        W(F("saving public key for %s to database") % their_keyname);

      {
        hexenc<id> hnonce;
        encode_hexenc(nonce, hnonce);
        L(FL("received 'hello' netcmd from server '%s' with nonce '%s'")
          % printable_key_hash % hnonce);
      }

      I(project.db.public_key_exists(their_key_hash));

      // save their identity
      this->received_remote_key = true;
      this->remote_peer_key_name = their_keyname;
    }

  // clients always include in the synchronization set, every branch that the
  // user requested
  set<branch_name> all_branches, ok_branches;
  project.get_branch_list(all_branches);
  for (set<branch_name>::const_iterator i = all_branches.begin();
       i != all_branches.end(); i++)
    {
      if (our_matcher((*i)()))
        ok_branches.insert(*i);
    }
  rebuild_merkle_trees(ok_branches);

  if (!initiated_by_server)
    setup_client_tickers();

  // Authenticate with our key if we have one; otherwise fall back to an
  // anonymous request.
  if (use_transport_auth && signing_key() != "")
    {
      // get our key pair
      load_key_pair(keys, signing_key);

      // make a signature with it;
      // this also ensures our public key is in the database
      rsa_sha1_signature sig;
      keys.make_signature(project.db, signing_key, nonce(), sig);

      // get the hash identifier for our pubkey
      rsa_pub_key our_pub;
      project.db.get_key(signing_key, our_pub);
      id our_key_hash_raw;
      key_hash_code(signing_key, our_pub, our_key_hash_raw);

      // make a new nonce of our own and send off the 'auth'
      queue_auth_cmd(this->role, our_include_pattern, our_exclude_pattern,
                     our_key_hash_raw, nonce, mk_nonce(), sig);
    }
  else
    {
      queue_anonymous_cmd(this->role, our_include_pattern,
                          our_exclude_pattern, mk_nonce());
    }

  lua.hook_note_netsync_start(session_id, "client", this->role,
                              peer_id, their_keyname,
                              our_include_pattern, our_exclude_pattern);
  return true;
}
1401
// Server side: handle a client's 'anonymous' netcmd.  THEIR_ROLE is the role
// the client wants to play; THEIR_INCLUDE_PATTERN and THEIR_EXCLUDE_PATTERN
// select the requested branches.
//
// With transport auth enabled, only anonymous reads are allowed: the client
// must be a pure sink, we must not be a pure sink, and every requested
// branch must pass the read-permission hook.  A single denied branch
// rejects the whole request.  With transport auth disabled, any role is
// accepted and we assume the complementary one.
//
// On success the merkle trees are rebuilt for the accepted branches, the
// session is marked authenticated with an empty remote key name, and true
// is returned.  Rejections go through error(), which throws.
bool
session::process_anonymous_cmd(protocol_role their_role,
                               globish const & their_include_pattern,
                               globish const & their_exclude_pattern)
{
  // Internally netsync thinks in terms of sources and sinks. Users like
  // thinking of repositories as "readonly", "readwrite", or "writeonly".
  //
  // We therefore use the read/write terminology when dealing with the UI:
  // if the user asks to run a "read only" service, this means they are
  // willing to be a source but not a sink.
  //
  // nb: The "role" here is the role the *client* wants to play
  // so we need to check that the opposite role is allowed for us,
  // in our this->role field.
  //

  lua.hook_note_netsync_start(session_id, "server", their_role,
                              peer_id, rsa_keypair_id(),
                              their_include_pattern, their_exclude_pattern);

  // Client must be a sink and server must be a source (anonymous
  // read-only), unless transport auth is disabled.
  //
  // If running in no-transport-auth mode, we operate anonymously and
  // permit adoption of any role.

  if (use_transport_auth)
    {
      if (their_role != sink_role)
        {
          // Forget the outstanding nonce before rejecting the client.
          this->saved_nonce = id("");
          error(not_permitted,
                F("rejected attempt at anonymous connection for write").str());
        }

      if (this->role == sink_role)
        {
          this->saved_nonce = id("");
          error(role_mismatch,
                F("rejected attempt at anonymous connection while running as sink").str());
        }
    }

  // Collect the requested branches.  Under transport auth each one must be
  // readable anonymously; error() throws on the first denial.
  set<branch_name> all_branches, ok_branches;
  project.get_branch_list(all_branches);
  globish_matcher their_matcher(their_include_pattern, their_exclude_pattern);
  for (set<branch_name>::const_iterator i = all_branches.begin();
       i != all_branches.end(); i++)
    {
      if (their_matcher((*i)()))
        {
          if (use_transport_auth &&
              !lua.hook_get_netsync_read_permitted((*i)()))
            {
              error(not_permitted,
                    (F("anonymous access to branch '%s' denied by server")
                     % *i).str());
            }
          else
            ok_branches.insert(*i);
        }
    }

  if (use_transport_auth)
    {
      P(F("allowed anonymous read permission for '%s' excluding '%s'")
        % their_include_pattern % their_exclude_pattern);
      this->role = source_role;
    }
  else
    {
      P(F("allowed anonymous read/write permission for '%s' excluding '%s'")
        % their_include_pattern % their_exclude_pattern);
      assume_corresponding_role(their_role);
    }

  rebuild_merkle_trees(ok_branches);

  // Anonymous peers have no key name.
  this->remote_peer_key_name = rsa_keypair_id("");
  this->authenticated = true;
  return true;
}
1485
1486void
1487session::assume_corresponding_role(protocol_role their_role)
1488{
1489 // Assume the (possibly degraded) opposite role.
1490 switch (their_role)
1491 {
1492 case source_role:
1493 I(this->role != source_role);
1494 this->role = sink_role;
1495 break;
1496
1497 case source_and_sink_role:
1498 I(this->role == source_and_sink_role);
1499 break;
1500
1501 case sink_role:
1502 I(this->role != sink_role);
1503 this->role = source_role;
1504 break;
1505 }
1506}
1507
// Server side: handle a client's 'auth' netcmd.
//
// Parameters:
//  - THEIR_ROLE: the role the client wants to play.
//  - THEIR_INCLUDE_PATTERN / THEIR_EXCLUDE_PATTERN: the requested branches.
//  - CLIENT: the hash of the client's public key.
//  - NONCE1: must equal the nonce saved in this->saved_nonce.
//  - SIGNATURE: checked against NONCE1 with the client's key.
//
// The client's key is looked up in the database, or imported from our key
// store if we hold its key pair.  Then the nonce, the role pairing,
// per-branch read permission and (for writers) write permission are
// checked, and finally the signature.  Every failure goes through error(),
// which throws.
//
// On success the session is marked authenticated, the client's key name is
// remembered, we adopt the role complementary to theirs, and true is
// returned.
bool
session::process_auth_cmd(protocol_role their_role,
                          globish const & their_include_pattern,
                          globish const & their_exclude_pattern,
                          id const & client,
                          id const & nonce1,
                          rsa_sha1_signature const & signature)
{
  // No remote key may have been received yet, and a nonce of full
  // merkle-hash length must have been issued for the client to answer.
  I(!this->received_remote_key);
  I(this->saved_nonce().size() == constants::merkle_hash_length_in_bytes);

  globish_matcher their_matcher(their_include_pattern, their_exclude_pattern);

  if (!project.db.public_key_exists(client))
    {
      // If it's not in the db, it still could be in the keystore if we
      // have the private key that goes with it.
      rsa_keypair_id their_key_id;
      keypair their_keypair;
      if (keys.maybe_get_key_pair(client, their_key_id, their_keypair))
        project.db.put_key(their_key_id, their_keypair.pub);
      else
        {
          this->saved_nonce = id("");

          // Still notify the hook, with a placeholder key name, before
          // rejecting the unknown key.
          lua.hook_note_netsync_start(session_id, "server", their_role,
                                      peer_id, rsa_keypair_id("-unknown-"),
                                      their_include_pattern,
                                      their_exclude_pattern);
          error(unknown_key,
                (F("remote public key hash '%s' is unknown")
                 % encode_hexenc(client())).str());
        }
    }

  // Get their public key.
  rsa_keypair_id their_id;
  rsa_pub_key their_key;
  project.db.get_pubkey(client, their_id, their_key);

  lua.hook_note_netsync_start(session_id, "server", their_role,
                              peer_id, their_id,
                              their_include_pattern, their_exclude_pattern);

  // Check that they replied with the nonce we asked for.
  if (!(nonce1 == this->saved_nonce))
    {
      this->saved_nonce = id("");
      error(failed_identification,
            F("detected replay attack in auth netcmd").str());
    }

  // Internally netsync thinks in terms of sources and sinks. users like
  // thinking of repositories as "readonly", "readwrite", or "writeonly".
  //
  // We therefore use the read/write terminology when dealing with the UI:
  // if the user asks to run a "read only" service, this means they are
  // willing to be a source but not a sink.
  //
  // nb: The "their_role" here is the role the *client* wants to play
  // so we need to check that the opposite role is allowed for us,
  // in our this->role field.

  // Client as sink, server as source (reading).

  if (their_role == sink_role || their_role == source_and_sink_role)
    {
      if (this->role != source_role && this->role != source_and_sink_role)
        {
          this->saved_nonce = id("");
          error(not_permitted,
                (F("denied '%s' read permission for '%s' excluding '%s' while running as pure sink")
                 % their_id % their_include_pattern % their_exclude_pattern).str());
        }
    }

  // Every requested branch must be readable by this key; a single denied
  // branch rejects the whole request.  Note that this runs for every
  // their_role, including a pure source.
  set<branch_name> all_branches, ok_branches;
  project.get_branch_list(all_branches);
  for (set<branch_name>::const_iterator i = all_branches.begin();
       i != all_branches.end(); i++)
    {
      if (their_matcher((*i)()))
        {
          if (!lua.hook_get_netsync_read_permitted((*i)(), their_id))
            {
              error(not_permitted,
                    (F("denied '%s' read permission for '%s' excluding '%s' because of branch '%s'")
                     % their_id % their_include_pattern % their_exclude_pattern % *i).str());
            }
          else
            ok_branches.insert(*i);
        }
    }

  // If we're source_and_sink_role, continue even with no branches readable
  // eg. serve --db=empty.db
  P(F("allowed '%s' read permission for '%s' excluding '%s'")
    % their_id % their_include_pattern % their_exclude_pattern);

  // Client as source, server as sink (writing).

  if (their_role == source_role || their_role == source_and_sink_role)
    {
      if (this->role != sink_role && this->role != source_and_sink_role)
        {
          this->saved_nonce = id("");
          error(not_permitted,
                (F("denied '%s' write permission for '%s' excluding '%s' while running as pure source")
                 % their_id % their_include_pattern % their_exclude_pattern).str());
        }

      if (!lua.hook_get_netsync_write_permitted(their_id))
        {
          this->saved_nonce = id("");
          error(not_permitted,
                (F("denied '%s' write permission for '%s' excluding '%s'")
                 % their_id % their_include_pattern % their_exclude_pattern).str());
        }

      P(F("allowed '%s' write permission for '%s' excluding '%s'")
        % their_id % their_include_pattern % their_exclude_pattern);
    }

  // NOTE(review): the merkle trees are rebuilt and received_remote_key is set
  // before the signature below has been verified.
  rebuild_merkle_trees(ok_branches);

  this->received_remote_key = true;

  // Check the signature.
  if (project.db.check_signature(their_id, nonce1(), signature) == cert_ok)
    {
      // Signature verified: accept the client's identity and take the role
      // complementary to theirs.
      L(FL("client signature OK, accepting authentication"));
      this->authenticated = true;
      this->remote_peer_key_name = their_id;

      assume_corresponding_role(their_role);
      return true;
    }
  else
    {
      error(failed_identification, (F("bad client signature")).str());
    }
  return false;
}
1652
1653bool
1654session::process_refine_cmd(refinement_type ty, merkle_node const & node)
1655{
1656 string typestr;
1657 netcmd_item_type_to_string(node.type, typestr);
1658 L(FL("processing refine cmd for %s node at level %d")
1659 % typestr % node.level);
1660
1661 switch (node.type)
1662 {
1663 case file_item:
1664 W(F("Unexpected 'refine' command on non-refined item type"));
1665 break;
1666
1667 case key_item:
1668 key_refiner.process_refinement_command(ty, node);
1669 break;
1670
1671 case revision_item:
1672 rev_refiner.process_refinement_command(ty, node);
1673 break;
1674
1675 case cert_item:
1676 cert_refiner.process_refinement_command(ty, node);
1677 break;
1678
1679 case epoch_item:
1680 epoch_refiner.process_refinement_command(ty, node);
1681 break;
1682 }
1683 return true;
1684}
1685
1686bool
1687session::process_bye_cmd(u8 phase,
1688 transaction_guard & guard)
1689{
1690
1691// Ideal shutdown
1692// ~~~~~~~~~~~~~~~
1693//
1694// I/O events state transitions
1695// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ ~~~~~~~~~~~~~~~~~~~
1696// client: C_WORKING
1697// server: S_WORKING
1698// 0. [refinement, data, deltas, etc.]
1699// client: C_SHUTDOWN
1700// (client checkpoints here)
1701// 1. client -> "bye 0"
1702// 2. "bye 0" -> server
1703// server: S_SHUTDOWN
1704// (server checkpoints here)
1705// 3. "bye 1" <- server
1706// 4. client <- "bye 1"
1707// client: C_CONFIRMED
1708// 5. client -> "bye 2"
1709// 6. "bye 2" -> server
1710// server: S_CONFIRMED
1711// 7. [server drops connection]
1712//
1713//
1714// Affects of I/O errors or disconnections
1715// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
1716// C_WORKING: report error and fault
1717// S_WORKING: report error and recover
1718// C_SHUTDOWN: report error and fault
1719// S_SHUTDOWN: report success and recover
1720// (and warn that client might falsely see error)
1721// C_CONFIRMED: report success
1722// S_CONFIRMED: report success
1723
1724 switch (phase)
1725 {
1726 case 0:
1727 if (voice == server_voice &&
1728 protocol_state == working_state)
1729 {
1730 protocol_state = shutdown_state;
1731 guard.do_checkpoint();
1732 queue_bye_cmd(1);
1733 }
1734 else
1735 error(bad_command, "unexpected bye phase 0 received");
1736 break;
1737
1738 case 1:
1739 if (voice == client_voice &&
1740 protocol_state == shutdown_state)
1741 {
1742 protocol_state = confirmed_state;
1743 queue_bye_cmd(2);
1744 }
1745 else
1746 error(bad_command, "unexpected bye phase 1 received");
1747 break;
1748
1749 case 2:
1750 if (voice == server_voice &&
1751 protocol_state == shutdown_state)
1752 {
1753 protocol_state = confirmed_state;
1754 return false;
1755 }
1756 else
1757 error(bad_command, "unexpected bye phase 2 received");
1758 break;
1759
1760 default:
1761 error(bad_command, (F("unknown bye phase %d received") % phase).str());
1762 }
1763
1764 return true;
1765}
1766
1767bool
1768session::process_done_cmd(netcmd_item_type type, size_t n_items)
1769{
1770 string typestr;
1771 netcmd_item_type_to_string(type, typestr);
1772 L(FL("received 'done' command for %s (%s items)") % typestr % n_items);
1773 switch (type)
1774 {
1775 case file_item:
1776 W(F("Unexpected 'done' command on non-refined item type"));
1777 break;
1778
1779 case key_item:
1780 key_refiner.process_done_command(n_items);
1781 if (key_refiner.done && role != sink_role)
1782 send_all_data(key_item, key_refiner.items_to_send);
1783 break;
1784
1785 case revision_item:
1786 rev_refiner.process_done_command(n_items);
1787 break;
1788
1789 case cert_item:
1790 cert_refiner.process_done_command(n_items);
1791 break;
1792
1793 case epoch_item:
1794 epoch_refiner.process_done_command(n_items);
1795 if (epoch_refiner.done)
1796 {
1797 send_all_data(epoch_item, epoch_refiner.items_to_send);
1798 maybe_note_epochs_finished();
1799 }
1800 break;
1801 }
1802 return true;
1803}
1804
1805void
1806session::respond_to_confirm_cmd()
1807{
1808 epoch_refiner.begin_refinement();
1809}
1810
1811bool
1812session::data_exists(netcmd_item_type type,
1813 id const & item)
1814{
1815 switch (type)
1816 {
1817 case key_item:
1818 return key_refiner.local_item_exists(item)
1819 || project.db.public_key_exists(item);
1820 case file_item:
1821 return project.db.file_version_exists(file_id(item));
1822 case revision_item:
1823 return rev_refiner.local_item_exists(item)
1824 || project.db.revision_exists(revision_id(item));
1825 case cert_item:
1826 return cert_refiner.local_item_exists(item)
1827 || project.db.revision_cert_exists(revision_id(item));
1828 case epoch_item:
1829 return epoch_refiner.local_item_exists(item)
1830 || project.db.epoch_exists(epoch_id(item));
1831 }
1832 return false;
1833}
1834
1835void
1836session::load_data(netcmd_item_type type,
1837 id const & item,
1838 string & out)
1839{
1840 string typestr;
1841 netcmd_item_type_to_string(type, typestr);
1842 hexenc<id> hitem;
1843 encode_hexenc(item, hitem);
1844
1845 if (!data_exists(type, item))
1846 throw bad_decode(F("%s with hash '%s' does not exist in our database")
1847 % typestr % hitem());
1848
1849 switch (type)
1850 {
1851 case epoch_item:
1852 {
1853 branch_name branch;
1854 epoch_data epoch;
1855 project.db.get_epoch(epoch_id(item), branch, epoch);
1856 write_epoch(branch, epoch, out);
1857 }
1858 break;
1859 case key_item:
1860 {
1861 rsa_keypair_id keyid;
1862 rsa_pub_key pub;
1863 project.db.get_pubkey(item, keyid, pub);
1864 L(FL("public key '%s' is also called '%s'") % hitem() % keyid);
1865 write_pubkey(keyid, pub, out);
1866 }
1867 break;
1868
1869 case revision_item:
1870 {
1871 revision_data mdat;
1872 data dat;
1873 project.db.get_revision(revision_id(item), mdat);
1874 out = mdat.inner()();
1875 }
1876 break;
1877
1878 case file_item:
1879 {
1880 file_data fdat;
1881 data dat;
1882 project.db.get_file_version(file_id(item), fdat);
1883 out = fdat.inner()();
1884 }
1885 break;
1886
1887 case cert_item:
1888 {
1889 revision<cert> c;
1890 project.db.get_revision_cert(item, c);
1891 string tmp;
1892 write_cert(c.inner(), out);
1893 }
1894 break;
1895 }
1896}
1897
// Handle a 'data' netcmd: the peer sent the serialized payload DAT for ITEM
// of item type TYPE.
//
// The arrival is always recorded first (note_item_arrived), and items we
// already have are skipped.  Otherwise the payload is decoded and stored
// according to its type:
//  - epoch: set if the branch has none; a different existing epoch is a
//    fatal mixing_versions error;
//  - key, cert: re-hashed and required to match ITEM (bad_decode
//    otherwise); put_key returning false is reported as a duplicate key;
//  - revision, file: passed to put_revision / put_file, with no hash check
//    in this function.
// Keys, certs and revisions whose put_* call returns true are appended to
// written_keys, written_certs and written_revisions.  Always returns true;
// failures throw.
bool
session::process_data_cmd(netcmd_item_type type,
                          id const & item,
                          string const & dat)
{
  hexenc<id> hitem;
  encode_hexenc(item, hitem);

  string typestr;
  netcmd_item_type_to_string(type, typestr);

  note_item_arrived(type, item);
  if (data_exists(type, item))
    {
      L(FL("%s '%s' already exists in our database") % typestr % hitem());
      // Even a duplicate epoch may complete the epoch exchange.
      if (type == epoch_item)
        maybe_note_epochs_finished();
      return true;
    }

  switch (type)
    {
    case epoch_item:
      {
        branch_name branch;
        epoch_data epoch;
        read_epoch(dat, branch, epoch);
        L(FL("received epoch %s for branch %s")
          % encode_hexenc(epoch.inner()()) % branch);
        map<branch_name, epoch_data> epochs;
        project.db.get_epochs(epochs);
        map<branch_name, epoch_data>::const_iterator i;
        i = epochs.find(branch);
        if (i == epochs.end())
          {
            L(FL("branch %s has no epoch; setting epoch to %s")
              % branch % encode_hexenc(epoch.inner()()));
            project.db.set_epoch(branch, epoch);
          }
        else
          {
            L(FL("branch %s already has an epoch; checking") % branch);
            // If we get here, then we know that the epoch must be
            // different, because if it were the same then the
            // if (epoch_exists()) branch up above would have been taken.
            // if somehow this is wrong, then we have broken epoch
            // hashing or something, which is very dangerous, so play it
            // safe...
            I(!(i->second == epoch));

            // It is safe to call 'error' here, because if we get here,
            // then the current netcmd packet cannot possibly have
            // written anything to the database.
            // The ternaries order the two epochs so the message always
            // names the server's epoch first, whichever side we are.
            error(mixing_versions,
                  (F("Mismatched epoch on branch %s."
                     " Server has '%s', client has '%s'.")
                   % branch
                   % encode_hexenc((voice == server_voice
                                    ? i->second: epoch).inner()())
                   % encode_hexenc((voice == server_voice
                                    ? epoch : i->second).inner()())).str());
          }
      }
      maybe_note_epochs_finished();
      break;

    case key_item:
      {
        rsa_keypair_id keyid;
        rsa_pub_key pub;
        read_pubkey(dat, keyid, pub);
        // Verify the payload really is the key the peer claimed to send.
        id tmp;
        key_hash_code(keyid, pub, tmp);
        if (! (tmp == item))
          {
            throw bad_decode(F("hash check failed for public key '%s' (%s);"
                               " wanted '%s' got '%s'")
                               % hitem() % keyid % hitem()
                               % encode_hexenc(tmp()));
          }
        if (project.db.put_key(keyid, pub))
          written_keys.push_back(keyid);
        else
          error(partial_transfer,
                (F("Received duplicate key %s") % keyid).str());
      }
      break;

    case cert_item:
      {
        cert c;
        read_cert(dat, c);
        // Verify the payload really is the cert the peer claimed to send.
        id tmp;
        cert_hash_code(c, tmp);
        if (! (tmp == item))
          throw bad_decode(F("hash check failed for revision cert '%s'") % hitem());
        if (project.db.put_revision_cert(revision<cert>(c)))
          written_certs.push_back(c);
      }
      break;

    case revision_item:
      {
        L(FL("received revision '%s'") % hitem());
        if (project.db.put_revision(revision_id(item), revision_data(dat)))
          written_revisions.push_back(revision_id(item));
      }
      break;

    case file_item:
      {
        L(FL("received file '%s'") % hitem());
        project.db.put_file(file_id(item), file_data(dat));
      }
      break;
    }
  return true;
}
2016
2017bool
2018session::process_delta_cmd(netcmd_item_type type,
2019 id const & base,
2020 id const & ident,
2021 delta const & del)
2022{
2023 string typestr;
2024 netcmd_item_type_to_string(type, typestr);
2025
2026 pair<id,id> id_pair = make_pair(base, ident);
2027
2028 note_item_arrived(type, ident);
2029
2030 switch (type)
2031 {
2032 case file_item:
2033 {
2034 file_id src_file(base), dst_file(ident);
2035 project.db.put_file_version(src_file, dst_file, file_delta(del));
2036 }
2037 break;
2038
2039 default:
2040 L(FL("ignoring delta received for item type %s") % typestr);
2041 break;
2042 }
2043 return true;
2044}
2045
2046bool
2047session::process_usher_cmd(utf8 const & msg)
2048{
2049 if (msg().size())
2050 {
2051 if (msg()[0] == '!')
2052 P(F("Received warning from usher: %s") % msg().substr(1));
2053 else
2054 L(FL("Received greeting from usher: %s") % msg().substr(1));
2055 }
2056 netcmd cmdout;
2057 cmdout.write_usher_reply_cmd(utf8(peer_id), our_include_pattern);
2058 write_netcmd_and_try_flush(cmdout);
2059 L(FL("Sent reply."));
2060 return true;
2061}
2062
2063
2064void
2065session::send_all_data(netcmd_item_type ty, set<id> const & items)
2066{
2067 string typestr;
2068 netcmd_item_type_to_string(ty, typestr);
2069
2070 // Use temporary; passed arg will be invalidated during iteration.
2071 set<id> tmp = items;
2072
2073 for (set<id>::const_iterator i = tmp.begin();
2074 i != tmp.end(); ++i)
2075 {
2076 if (data_exists(ty, *i))
2077 {
2078 string out;
2079 load_data(ty, *i, out);
2080 queue_data_cmd(ty, *i, out);
2081 }
2082 }
2083}
2084
2085bool
2086session::dispatch_payload(netcmd const & cmd,
2087 transaction_guard & guard)
2088{
2089
2090 switch (cmd.get_cmd_code())
2091 {
2092
2093 case error_cmd:
2094 {
2095 string errmsg;
2096 cmd.read_error_cmd(errmsg);
2097 return process_error_cmd(errmsg);
2098 }
2099 break;
2100
2101 case hello_cmd:
2102 require(! authenticated, "hello netcmd received when not authenticated");
2103 require(voice == client_voice, "hello netcmd received in client voice");
2104 {
2105 rsa_keypair_id server_keyname;
2106 rsa_pub_key server_key;
2107 id nonce;
2108 cmd.read_hello_cmd(server_keyname, server_key, nonce);
2109 return process_hello_cmd(server_keyname, server_key, nonce);
2110 }
2111 break;
2112
2113 case bye_cmd:
2114 require(authenticated, "bye netcmd received when not authenticated");
2115 {
2116 u8 phase;
2117 cmd.read_bye_cmd(phase);
2118 return process_bye_cmd(phase, guard);
2119 }
2120 break;
2121
2122 case anonymous_cmd:
2123 require(! authenticated, "anonymous netcmd received when not authenticated");
2124 require(voice == server_voice, "anonymous netcmd received in server voice");
2125 require(role == source_role ||
2126 role == source_and_sink_role,
2127 "anonymous netcmd received in source or source/sink role");
2128 {
2129 protocol_role role;
2130 globish their_include_pattern, their_exclude_pattern;
2131 rsa_oaep_sha_data hmac_key_encrypted;
2132 cmd.read_anonymous_cmd(role, their_include_pattern, their_exclude_pattern, hmac_key_encrypted);
2133 L(FL("received 'anonymous' netcmd from client for pattern '%s' excluding '%s' "
2134 "in %s mode\n")
2135 % their_include_pattern % their_exclude_pattern
2136 % (role == source_and_sink_role ? _("source and sink") :
2137 (role == source_role ? _("source") : _("sink"))));
2138
2139 set_session_key(hmac_key_encrypted);
2140 if (!process_anonymous_cmd(role, their_include_pattern, their_exclude_pattern))
2141 return false;
2142 queue_confirm_cmd();
2143 return true;
2144 }
2145 break;
2146
2147 case auth_cmd:
2148 require(! authenticated, "auth netcmd received when not authenticated");
2149 require(voice == server_voice, "auth netcmd received in server voice");
2150 {
2151 protocol_role role;
2152 rsa_sha1_signature signature;
2153 globish their_include_pattern, their_exclude_pattern;
2154 id client, nonce1, nonce2;
2155 rsa_oaep_sha_data hmac_key_encrypted;
2156 cmd.read_auth_cmd(role, their_include_pattern, their_exclude_pattern,
2157 client, nonce1, hmac_key_encrypted, signature);
2158
2159 hexenc<id> their_key_hash;
2160 encode_hexenc(client, their_key_hash);
2161 hexenc<id> hnonce1;
2162 encode_hexenc(nonce1, hnonce1);
2163
2164 L(FL("received 'auth(hmac)' netcmd from client '%s' for pattern '%s' "
2165 "exclude '%s' in %s mode with nonce1 '%s'\n")
2166 % their_key_hash % their_include_pattern % their_exclude_pattern
2167 % (role == source_and_sink_role ? _("source and sink") :
2168 (role == source_role ? _("source") : _("sink")))
2169 % hnonce1);
2170
2171 set_session_key(hmac_key_encrypted);
2172
2173 if (!process_auth_cmd(role, their_include_pattern, their_exclude_pattern,
2174 client, nonce1, signature))
2175 return false;
2176 queue_confirm_cmd();
2177 return true;
2178 }
2179 break;
2180
2181 case confirm_cmd:
2182 require(! authenticated, "confirm netcmd received when not authenticated");
2183 require(voice == client_voice, "confirm netcmd received in client voice");
2184 {
2185 string signature;
2186 cmd.read_confirm_cmd();
2187 this->authenticated = true;
2188 respond_to_confirm_cmd();
2189 return true;
2190 }
2191 break;
2192
2193 case refine_cmd:
2194 require(authenticated, "refine netcmd received when authenticated");
2195 {
2196 merkle_node node;
2197 refinement_type ty;
2198 cmd.read_refine_cmd(ty, node);
2199 return process_refine_cmd(ty, node);
2200 }
2201 break;
2202
2203 case done_cmd:
2204 require(authenticated, "done netcmd received when not authenticated");
2205 {
2206 size_t n_items;
2207 netcmd_item_type type;
2208 cmd.read_done_cmd(type, n_items);
2209 return process_done_cmd(type, n_items);
2210 }
2211 break;
2212
2213 case data_cmd:
2214 require(authenticated, "data netcmd received when not authenticated");
2215 require(role == sink_role ||
2216 role == source_and_sink_role,
2217 "data netcmd received in source or source/sink role");
2218 {
2219 netcmd_item_type type;
2220 id item;
2221 string dat;
2222 cmd.read_data_cmd(type, item, dat);
2223 return process_data_cmd(type, item, dat);
2224 }
2225 break;
2226
2227 case delta_cmd:
2228 require(authenticated, "delta netcmd received when not authenticated");
2229 require(role == sink_role ||
2230 role == source_and_sink_role,
2231 "delta netcmd received in source or source/sink role");
2232 {
2233 netcmd_item_type type;
2234 id base, ident;
2235 delta del;
2236 cmd.read_delta_cmd(type, base, ident, del);
2237 return process_delta_cmd(type, base, ident, del);
2238 }
2239 break;
2240
2241 case usher_cmd:
2242 {
2243 utf8 greeting;
2244 cmd.read_usher_cmd(greeting);
2245 return process_usher_cmd(greeting);
2246 }
2247 break;
2248
2249 case usher_reply_cmd:
2250 return false; // Should not happen.
2251 break;
2252 }
2253 return false;
2254}
2255
2256// This kicks off the whole cascade starting from "hello".
2257void
2258session::begin_service()
2259{
2260 keypair kp;
2261 if (use_transport_auth)
2262 keys.get_key_pair(signing_key, kp);
2263 queue_hello_cmd(signing_key, kp.pub, mk_nonce());
2264}
2265
2266void
2267session::maybe_step()
2268{
2269 while (done_all_refinements()
2270 && !rev_enumerator.done()
2271 && outbuf_size < constants::bufsz * 10)
2272 {
2273 rev_enumerator.step();
2274 }
2275}
2276
2277void
2278session::maybe_say_goodbye(transaction_guard & guard)
2279{
2280 if (voice == client_voice
2281 && protocol_state == working_state
2282 && finished_working())
2283 {
2284 protocol_state = shutdown_state;
2285 guard.do_checkpoint();
2286 queue_bye_cmd(0);
2287 }
2288}
2289
2290bool
2291session::arm()
2292{
2293 if (!armed)
2294 {
2295 // Don't pack the buffer unnecessarily.
2296 if (outbuf_size > constants::bufsz * 10)
2297 return false;
2298
2299 if (cmd.read(inbuf, read_hmac))
2300 {
2301 armed = true;
2302 }
2303 }
2304 return armed;
2305}
2306
2307bool session::process(transaction_guard & guard)
2308{
2309 if (encountered_error)
2310 return true;
2311 try
2312 {
2313 if (!arm())
2314 return true;
2315
2316 armed = false;
2317 L(FL("processing %d byte input buffer from peer %s")
2318 % inbuf.size() % peer_id);
2319
2320 size_t sz = cmd.encoded_size();
2321 bool ret = dispatch_payload(cmd, guard);
2322
2323 if (inbuf.size() >= constants::netcmd_maxsz)
2324 W(F("input buffer for peer %s is overfull "
2325 "after netcmd dispatch") % peer_id);
2326
2327 guard.maybe_checkpoint(sz);
2328
2329 if (!ret)
2330 L(FL("finishing processing with '%d' packet")
2331 % cmd.get_cmd_code());
2332 return ret;
2333 }
2334 catch (bad_decode & bd)
2335 {
2336 W(F("protocol error while processing peer %s: '%s'")
2337 % peer_id % bd.what);
2338 return false;
2339 }
2340 catch (netsync_error & err)
2341 {
2342 W(F("error: %s") % err.msg);
2343 queue_error_cmd(boost::lexical_cast<string>(error_code) + " " + err.msg);
2344 encountered_error = true;
2345 return true; // Don't terminate until we've send the error_cmd.
2346 }
2347}
2348
2349
2350static shared_ptr<Netxx::StreamBase>
2351build_stream_to_server(options & opts, lua_hooks & lua,
2352 netsync_connection_info info,
2353 Netxx::port_type default_port,
2354 Netxx::Timeout timeout)
2355{
2356 shared_ptr<Netxx::StreamBase> server;
2357
2358 if (info.client.use_argv)
2359 {
2360 I(info.client.argv.size() > 0);
2361 string cmd = info.client.argv[0];
2362 info.client.argv.erase(info.client.argv.begin());
2363 return shared_ptr<Netxx::StreamBase>
2364 (new Netxx::PipeStream(cmd, info.client.argv));
2365 }
2366 else
2367 {
2368#ifdef USE_IPV6
2369 bool use_ipv6=true;
2370#else
2371 bool use_ipv6=false;
2372#endif
2373 string host(info.client.u.host);
2374 if (host.empty())
2375 host = info.client.unparsed();
2376 if (!info.client.u.port.empty())
2377 default_port = lexical_cast<Netxx::port_type>(info.client.u.port);
2378 Netxx::Address addr(info.client.unparsed().c_str(),
2379 default_port, use_ipv6);
2380 return shared_ptr<Netxx::StreamBase>
2381 (new Netxx::Stream(addr, timeout));
2382 }
2383}
2384
2385static void
2386call_server(options & opts,
2387 lua_hooks & lua,
2388 project_t & project,
2389 key_store & keys,
2390 protocol_role role,
2391 netsync_connection_info const & info,
2392 Netxx::port_type default_port,
2393 unsigned long timeout_seconds)
2394{
2395 Netxx::PipeCompatibleProbe probe;
2396 transaction_guard guard(project.db);
2397
2398 Netxx::Timeout timeout(static_cast<long>(timeout_seconds)), instant(0,1);
2399
2400 P(F("connecting to %s") % info.client.unparsed);
2401
2402 shared_ptr<Netxx::StreamBase> server
2403 = build_stream_to_server(opts, lua,
2404 info, default_port,
2405 timeout);
2406
2407
2408 // 'false' here means not to revert changes when the SockOpt
2409 // goes out of scope.
2410 Netxx::SockOpt socket_options(server->get_socketfd(), false);
2411 socket_options.set_non_blocking();
2412
2413 session sess(opts, lua, project, keys,
2414 role, client_voice,
2415 info.client.include_pattern,
2416 info.client.exclude_pattern,
2417 info.client.unparsed(), server);
2418
2419 while (true)
2420 {
2421 bool armed = false;
2422 try
2423 {
2424 armed = sess.arm();
2425 }
2426 catch (bad_decode & bd)
2427 {
2428 E(false, F("protocol error while processing peer %s: '%s'")
2429 % sess.peer_id % bd.what);
2430 }
2431
2432 sess.maybe_step();
2433 sess.maybe_say_goodbye(guard);
2434
2435 probe.clear();
2436 probe.add(*(sess.str), sess.which_events());
2437 Netxx::Probe::result_type res = probe.ready(armed ? instant : timeout);
2438 Netxx::Probe::ready_type event = res.second;
2439 Netxx::socket_type fd = res.first;
2440
2441 if (fd == -1 && !armed)
2442 {
2443 E(false, (F("timed out waiting for I/O with "
2444 "peer %s, disconnecting")
2445 % sess.peer_id));
2446 }
2447
2448 bool all_io_clean = (event != Netxx::Probe::ready_oobd);
2449
2450 if (event & Netxx::Probe::ready_read)
2451 all_io_clean = all_io_clean && sess.read_some();
2452
2453 if (event & Netxx::Probe::ready_write)
2454 all_io_clean = all_io_clean && sess.write_some();
2455
2456 if (armed)
2457 if (!sess.process(guard))
2458 {
2459 // Commit whatever work we managed to accomplish anyways.
2460 guard.commit();
2461
2462 // We failed during processing. This should only happen in
2463 // client voice when we have a decode exception, or received an
2464 // error from our server (which is translated to a decode
2465 // exception). We call these cases E() errors.
2466 E(false, F("processing failure while talking to "
2467 "peer %s, disconnecting")
2468 % sess.peer_id);
2469 return;
2470 }
2471
2472 if (!all_io_clean)
2473 {
2474 // Commit whatever work we managed to accomplish anyways.
2475 guard.commit();
2476
2477 // We had an I/O error. We must decide if this represents a
2478 // user-reported error or a clean disconnect. See protocol
2479 // state diagram in session::process_bye_cmd.
2480
2481 if (sess.protocol_state == session::confirmed_state)
2482 {
2483 P(F("successful exchange with %s")
2484 % sess.peer_id);
2485 return;
2486 }
2487 else if (sess.encountered_error)
2488 {
2489 P(F("peer %s disconnected after we informed them of error")
2490 % sess.peer_id);
2491 return;
2492 }
2493 else
2494 E(false, (F("I/O failure while talking to "
2495 "peer %s, disconnecting")
2496 % sess.peer_id));
2497 }
2498 }
2499}
2500
2501static void
2502drop_session_associated_with_fd(map<Netxx::socket_type, shared_ptr<session> > & sessions,
2503 Netxx::socket_type fd)
2504{
2505 // This is a bit of a hack. Initially all "file descriptors" in
2506 // netsync were full duplex, so we could get away with indexing
2507 // sessions by their file descriptor.
2508 //
2509 // When using pipes in unix, it's no longer true: a session gets
2510 // entered in the session map under its read pipe fd *and* its write
2511 // pipe fd. When we're in such a situation the socket fd is "-1" and
2512 // we downcast to a PipeStream and use its read+write fds.
2513 //
2514 // When using pipes in windows, we use a full duplex pipe (named
2515 // pipe) so the socket-like abstraction holds.
2516
2517 I(fd != -1);
2518 map<Netxx::socket_type, shared_ptr<session> >::const_iterator i = sessions.find(fd);
2519 I(i != sessions.end());
2520 shared_ptr<session> sess = i->second;
2521 fd = sess->str->get_socketfd();
2522 if (fd != -1)
2523 {
2524 sessions.erase(fd);
2525 }
2526 else
2527 {
2528 shared_ptr<Netxx::PipeStream> pipe =
2529 boost::dynamic_pointer_cast<Netxx::PipeStream, Netxx::StreamBase>(sess->str);
2530 I(static_cast<bool>(pipe));
2531 I(pipe->get_writefd() != -1);
2532 I(pipe->get_readfd() != -1);
2533 sessions.erase(pipe->get_readfd());
2534 sessions.erase(pipe->get_writefd());
2535 }
2536}
2537
2538static void
2539arm_sessions_and_calculate_probe(Netxx::PipeCompatibleProbe & probe,
2540 map<Netxx::socket_type, shared_ptr<session> > & sessions,
2541 set<Netxx::socket_type> & armed_sessions,
2542 transaction_guard & guard)
2543{
2544 set<Netxx::socket_type> arm_failed;
2545 for (map<Netxx::socket_type,
2546 shared_ptr<session> >::const_iterator i = sessions.begin();
2547 i != sessions.end(); ++i)
2548 {
2549 i->second->maybe_step();
2550 i->second->maybe_say_goodbye(guard);
2551 try
2552 {
2553 if (i->second->arm())
2554 {
2555 L(FL("fd %d is armed") % i->first);
2556 armed_sessions.insert(i->first);
2557 }
2558 probe.add(*i->second->str, i->second->which_events());
2559 }
2560 catch (bad_decode & bd)
2561 {
2562 W(F("protocol error while processing peer %s: '%s', marking as bad")
2563 % i->second->peer_id % bd.what);
2564 arm_failed.insert(i->first);
2565 }
2566 }
2567 for (set<Netxx::socket_type>::const_iterator i = arm_failed.begin();
2568 i != arm_failed.end(); ++i)
2569 {
2570 drop_session_associated_with_fd(sessions, *i);
2571 }
2572}
2573
2574static void
2575handle_new_connection(options & opts,
2576 lua_hooks & lua,
2577 project_t & project,
2578 key_store & keys,
2579 Netxx::Address & addr,
2580 Netxx::StreamServer & server,
2581 Netxx::Timeout & timeout,
2582 protocol_role role,
2583 map<Netxx::socket_type, shared_ptr<session> > & sessions)
2584{
2585 L(FL("accepting new connection on %s : %s")
2586 % (addr.get_name()?addr.get_name():"") % lexical_cast<string>(addr.get_port()));
2587 Netxx::Peer client = server.accept_connection();
2588
2589 if (!client)
2590 {
2591 L(FL("accept() returned a dead client"));
2592 }
2593 else
2594 {
2595 P(F("accepted new client connection from %s : %s")
2596 % client.get_address() % lexical_cast<string>(client.get_port()));
2597
2598 // 'false' here means not to revert changes when the SockOpt
2599 // goes out of scope.
2600 Netxx::SockOpt socket_options(client.get_socketfd(), false);
2601 socket_options.set_non_blocking();
2602
2603 shared_ptr<Netxx::Stream> str =
2604 shared_ptr<Netxx::Stream>
2605 (new Netxx::Stream(client.get_socketfd(), timeout));
2606
2607 shared_ptr<session> sess(new session(opts, lua, project, keys,
2608 role, server_voice,
2609 globish("*"), globish(""),
2610 lexical_cast<string>(client), str));
2611 sess->begin_service();
2612 sessions.insert(make_pair(client.get_socketfd(), sess));
2613 }
2614}
2615
2616static void
2617handle_read_available(Netxx::socket_type fd,
2618 shared_ptr<session> sess,
2619 map<Netxx::socket_type, shared_ptr<session> > & sessions,
2620 set<Netxx::socket_type> & armed_sessions,
2621 bool & live_p)
2622{
2623 if (sess->read_some())
2624 {
2625 try
2626 {
2627 if (sess->arm())
2628 armed_sessions.insert(fd);
2629 }
2630 catch (bad_decode & bd)
2631 {
2632 W(F("protocol error while processing peer %s: '%s', disconnecting")
2633 % sess->peer_id % bd.what);
2634 drop_session_associated_with_fd(sessions, fd);
2635 live_p = false;
2636 }
2637 }
2638 else
2639 {
2640 switch (sess->protocol_state)
2641 {
2642 case session::working_state:
2643 P(F("peer %s read failed in working state (error)")
2644 % sess->peer_id);
2645 break;
2646
2647 case session::shutdown_state:
2648 P(F("peer %s read failed in shutdown state "
2649 "(possibly client misreported error)")
2650 % sess->peer_id);
2651 break;
2652
2653 case session::confirmed_state:
2654 P(F("peer %s read failed in confirmed state (success)")
2655 % sess->peer_id);
2656 break;
2657 }
2658 drop_session_associated_with_fd(sessions, fd);
2659 live_p = false;
2660 }
2661}
2662
2663
2664static void
2665handle_write_available(Netxx::socket_type fd,
2666 shared_ptr<session> sess,
2667 map<Netxx::socket_type, shared_ptr<session> > & sessions,
2668 bool & live_p)
2669{
2670 if (!sess->write_some())
2671 {
2672 switch (sess->protocol_state)
2673 {
2674 case session::working_state:
2675 P(F("peer %s write failed in working state (error)")
2676 % sess->peer_id);
2677 break;
2678
2679 case session::shutdown_state:
2680 P(F("peer %s write failed in shutdown state "
2681 "(possibly client misreported error)")
2682 % sess->peer_id);
2683 break;
2684
2685 case session::confirmed_state:
2686 P(F("peer %s write failed in confirmed state (success)")
2687 % sess->peer_id);
2688 break;
2689 }
2690
2691 drop_session_associated_with_fd(sessions, fd);
2692 live_p = false;
2693 }
2694}
2695
2696static void
2697process_armed_sessions(map<Netxx::socket_type, shared_ptr<session> > & sessions,
2698 set<Netxx::socket_type> & armed_sessions,
2699 transaction_guard & guard)
2700{
2701 for (set<Netxx::socket_type>::const_iterator i = armed_sessions.begin();
2702 i != armed_sessions.end(); ++i)
2703 {
2704 map<Netxx::socket_type, shared_ptr<session> >::iterator j;
2705 j = sessions.find(*i);
2706 if (j == sessions.end())
2707 continue;
2708 else
2709 {
2710 shared_ptr<session> sess = j->second;
2711 if (!sess->process(guard))
2712 {
2713 P(F("peer %s processing finished, disconnecting")
2714 % sess->peer_id);
2715 drop_session_associated_with_fd(sessions, *i);
2716 }
2717 }
2718 }
2719}
2720
2721static void
2722reap_dead_sessions(map<Netxx::socket_type, shared_ptr<session> > & sessions,
2723 unsigned long timeout_seconds)
2724{
2725 // Kill any clients which haven't done any i/o inside the timeout period
2726 // or who have exchanged all items and flushed their output buffers.
2727 set<Netxx::socket_type> dead_clients;
2728 time_t now = ::time(NULL);
2729 for (map<Netxx::socket_type, shared_ptr<session> >::const_iterator
2730 i = sessions.begin(); i != sessions.end(); ++i)
2731 {
2732 if (static_cast<unsigned long>(i->second->last_io_time + timeout_seconds)
2733 < static_cast<unsigned long>(now))
2734 {
2735 P(F("fd %d (peer %s) has been idle too long, disconnecting")
2736 % i->first % i->second->peer_id);
2737 dead_clients.insert(i->first);
2738 }
2739 }
2740 for (set<Netxx::socket_type>::const_iterator i = dead_clients.begin();
2741 i != dead_clients.end(); ++i)
2742 {
2743 drop_session_associated_with_fd(sessions, *i);
2744 }
2745}
2746
2747static void
2748serve_connections(options & opts,
2749 lua_hooks & lua,
2750 project_t & project,
2751 key_store & keys,
2752 protocol_role role,
2753 std::list<utf8> const & addresses,
2754 Netxx::port_type default_port,
2755 unsigned long timeout_seconds,
2756 unsigned long session_limit)
2757{
2758 Netxx::PipeCompatibleProbe probe;
2759
2760 Netxx::Timeout
2761 forever,
2762 timeout(static_cast<long>(timeout_seconds)),
2763 instant(0,1);
2764
2765#ifdef USE_IPV6
2766 bool use_ipv6=true;
2767#else
2768 bool use_ipv6=false;
2769#endif
2770 // This will be true when we try to bind while using IPv6. See comments
2771 // further down.
2772 bool try_again=false;
2773
2774 do
2775 {
2776 try
2777 {
2778 try_again = false;
2779
2780 Netxx::Address addr(use_ipv6);
2781
2782 if (addresses.empty())
2783 addr.add_all_addresses(default_port);
2784 else
2785 {
2786 for (std::list<utf8>::const_iterator it = addresses.begin(); it != addresses.end(); ++it)
2787 {
2788 const utf8 & address = *it;
2789 if (!address().empty())
2790 {
2791 size_t l_colon = address().find(':');
2792 size_t r_colon = address().rfind(':');
2793
2794 if (l_colon == r_colon && l_colon == 0)
2795 {
2796 // can't be an IPv6 address as there is only one colon
2797 // must be a : followed by a port
2798 string port_str = address().substr(1);
2799 addr.add_all_addresses(std::atoi(port_str.c_str()));
2800 }
2801 else
2802 addr.add_address(address().c_str(), default_port);
2803 }
2804 }
2805 }
2806
2807 // If se use IPv6 and the initialisation of server fails, we want
2808 // to try again with IPv4. The reason is that someone may have
2809 // downloaded a IPv6-enabled monotone on a system that doesn't
2810 // have IPv6, and which might fail therefore.
2811 // On failure, Netxx::NetworkException is thrown, and we catch
2812 // it further down.
2813 try_again=use_ipv6;
2814
2815 Netxx::StreamServer server(addr, timeout);
2816
2817 // If we came this far, whatever we used (IPv6 or IPv4) was
2818 // accepted, so we don't need to try again any more.
2819 try_again=false;
2820
2821 const char *name = addr.get_name();
2822 P(F("beginning service on %s : %s")
2823 % (name != NULL ? name : _("<all interfaces>"))
2824 % lexical_cast<string>(addr.get_port()));
2825
2826 map<Netxx::socket_type, shared_ptr<session> > sessions;
2827 set<Netxx::socket_type> armed_sessions;
2828
2829 shared_ptr<transaction_guard> guard;
2830
2831 while (true)
2832 {
2833 probe.clear();
2834 armed_sessions.clear();
2835
2836 if (sessions.size() >= session_limit)
2837 W(F("session limit %d reached, some connections "
2838 "will be refused") % session_limit);
2839 else
2840 probe.add(server);
2841
2842 if (!guard)
2843 guard = shared_ptr<transaction_guard>
2844 (new transaction_guard(project.db));
2845
2846 I(guard);
2847
2848 while (!server_initiated_sync_requests.empty())
2849 {
2850 server_initiated_sync_request request
2851 = server_initiated_sync_requests.front();
2852 server_initiated_sync_requests.pop_front();
2853
2854 netsync_connection_info info;
2855 info.client.unparsed = utf8(request.address);
2856 info.client.include_pattern = globish(request.include);
2857 info.client.exclude_pattern = globish(request.exclude);
2858 info.client.use_argv = false;
2859 parse_uri(info.client.unparsed(), info.client.u);
2860
2861 try
2862 {
2863 P(F("connecting to %s") % info.client.unparsed);
2864 shared_ptr<Netxx::StreamBase> server
2865 = build_stream_to_server(opts, lua,
2866 info, default_port,
2867 timeout);
2868
2869 // 'false' here means not to revert changes when
2870 // the SockOpt goes out of scope.
2871 Netxx::SockOpt socket_options(server->get_socketfd(), false);
2872 socket_options.set_non_blocking();
2873
2874 protocol_role role = source_and_sink_role;
2875 if (request.what == "sync")
2876 role = source_and_sink_role;
2877 else if (request.what == "push")
2878 role = source_role;
2879 else if (request.what == "pull")
2880 role = sink_role;
2881
2882 shared_ptr<session> sess(new session(opts, lua,
2883 project, keys,
2884 role, client_voice,
2885 info.client.include_pattern,
2886 info.client.exclude_pattern,
2887 info.client.unparsed(),
2888 server, true));
2889
2890 sessions.insert(make_pair(server->get_socketfd(), sess));
2891 }
2892 catch (Netxx::NetworkException & e)
2893 {
2894 P(F("Network error: %s") % e.what());
2895 }
2896 }
2897
2898 arm_sessions_and_calculate_probe(probe, sessions, armed_sessions, *guard);
2899
2900 L(FL("i/o probe with %d armed") % armed_sessions.size());
2901 Netxx::socket_type fd;
2902 Netxx::Timeout how_long;
2903 if (sessions.empty())
2904 how_long = forever;
2905 else if (armed_sessions.empty())
2906 how_long = timeout;
2907 else
2908 how_long = instant;
2909 do
2910 {
2911 Netxx::Probe::result_type res = probe.ready(how_long);
2912 how_long = instant;
2913 Netxx::Probe::ready_type event = res.second;
2914 fd = res.first;
2915
2916 if (fd == -1)
2917 {
2918 if (armed_sessions.empty())
2919 L(FL("timed out waiting for I/O (listening on %s : %s)")
2920 % addr.get_name() % lexical_cast<string>(addr.get_port()));
2921 }
2922
2923 // we either got a new connection
2924 else if (fd == server)
2925 handle_new_connection(opts, lua, project, keys,
2926 addr, server, timeout, role,
2927 sessions);
2928
2929 // or an existing session woke up
2930 else
2931 {
2932 map<Netxx::socket_type, shared_ptr<session> >::iterator i;
2933 i = sessions.find(fd);
2934 if (i == sessions.end())
2935 {
2936 L(FL("got woken up for action on unknown fd %d") % fd);
2937 }
2938 else
2939 {
2940 probe.remove(*(i->second->str));
2941 shared_ptr<session> sess = i->second;
2942 bool live_p = true;
2943
2944 try
2945 {
2946 if (event & Netxx::Probe::ready_read)
2947 handle_read_available(fd, sess, sessions,
2948 armed_sessions, live_p);
2949
2950 if (live_p && (event & Netxx::Probe::ready_write))
2951 handle_write_available(fd, sess, sessions, live_p);
2952 }
2953 catch (Netxx::Exception &)
2954 {
2955 P(F("Network error on peer %s, disconnecting")
2956 % sess->peer_id);
2957 drop_session_associated_with_fd(sessions, fd);
2958 }
2959 if (live_p && (event & Netxx::Probe::ready_oobd))
2960 {
2961 P(F("got OOB from peer %s, disconnecting")
2962 % sess->peer_id);
2963 drop_session_associated_with_fd(sessions, fd);
2964 }
2965 }
2966 }
2967 }
2968 while (fd != -1);
2969 process_armed_sessions(sessions, armed_sessions, *guard);
2970 reap_dead_sessions(sessions, timeout_seconds);
2971
2972 if (sessions.empty())
2973 {
2974 // Let the guard die completely if everything's gone quiet.
2975 guard->commit();
2976 guard.reset();
2977 }
2978 }
2979 }
2980 // This exception is thrown when bind() fails somewhere in Netxx.
2981 catch (Netxx::NetworkException &)
2982 {
2983 // If we tried with IPv6 and failed, we want to try again using IPv4.
2984 if (try_again)
2985 {
2986 use_ipv6 = false;
2987 }
2988 // In all other cases, just rethrow the exception.
2989 else
2990 throw;
2991 }
2992 // This exception is thrown when there is no support for the type of
2993 // connection we want to do in the kernel, for example when a socket()
2994 // call fails somewhere in Netxx.
2995 catch (Netxx::Exception &)
2996 {
2997 // If we tried with IPv6 and failed, we want to try again using IPv4.
2998 if (try_again)
2999 {
3000 use_ipv6 = false;
3001 }
3002 // In all other cases, just rethrow the exception.
3003 else
3004 throw;
3005 }
3006 }
3007 while(try_again);
3008 }
3009
3010static void
3011serve_single_connection(shared_ptr<session> sess,
3012 unsigned long timeout_seconds)
3013{
3014 Netxx::PipeCompatibleProbe probe;
3015
3016 Netxx::Timeout
3017 forever,
3018 timeout(static_cast<long>(timeout_seconds)),
3019 instant(0,1);
3020
3021 P(F("beginning service on %s") % sess->peer_id);
3022
3023 sess->begin_service();
3024
3025 transaction_guard guard(sess->project.db);
3026
3027 map<Netxx::socket_type, shared_ptr<session> > sessions;
3028 set<Netxx::socket_type> armed_sessions;
3029
3030 if (sess->str->get_socketfd() == -1)
3031 {
3032 // Unix pipes are non-duplex, have two filedescriptors
3033 shared_ptr<Netxx::PipeStream> pipe =
3034 boost::dynamic_pointer_cast<Netxx::PipeStream, Netxx::StreamBase>(sess->str);
3035 I(pipe);
3036 sessions[pipe->get_writefd()]=sess;
3037 sessions[pipe->get_readfd()]=sess;
3038 }
3039 else
3040 sessions[sess->str->get_socketfd()]=sess;
3041
3042 while (!sessions.empty())
3043 {
3044 probe.clear();
3045 armed_sessions.clear();
3046
3047 arm_sessions_and_calculate_probe(probe, sessions, armed_sessions, guard);
3048
3049 L(FL("i/o probe with %d armed") % armed_sessions.size());
3050 Netxx::Probe::result_type res = probe.ready((armed_sessions.empty() ? timeout
3051 : instant));
3052 Netxx::Probe::ready_type event = res.second;
3053 Netxx::socket_type fd = res.first;
3054
3055 if (fd == -1)
3056 {
3057 if (armed_sessions.empty())
3058 L(FL("timed out waiting for I/O (listening on %s)")
3059 % sess->peer_id);
3060 }
3061
3062 // an existing session woke up
3063 else
3064 {
3065 map<Netxx::socket_type, shared_ptr<session> >::iterator i;
3066 i = sessions.find(fd);
3067 if (i == sessions.end())
3068 {
3069 L(FL("got woken up for action on unknown fd %d") % fd);
3070 }
3071 else
3072 {
3073 shared_ptr<session> sess = i->second;
3074 bool live_p = true;
3075
3076 if (event & Netxx::Probe::ready_read)
3077 handle_read_available(fd, sess, sessions, armed_sessions, live_p);
3078
3079 if (live_p && (event & Netxx::Probe::ready_write))
3080 handle_write_available(fd, sess, sessions, live_p);
3081
3082 if (live_p && (event & Netxx::Probe::ready_oobd))
3083 {
3084 P(F("got some OOB data on fd %d (peer %s), disconnecting")
3085 % fd % sess->peer_id);
3086 drop_session_associated_with_fd(sessions, fd);
3087 }
3088 }
3089 }
3090 process_armed_sessions(sessions, armed_sessions, guard);
3091 reap_dead_sessions(sessions, timeout_seconds);
3092 }
3093}
3094
3095
3096void
3097insert_with_parents(revision_id rev,
3098 refiner & ref,
3099 revision_enumerator & rev_enumerator,
3100 set<revision_id> & revs,
3101 ticker & revisions_ticker)
3102{
3103 deque<revision_id> work;
3104 work.push_back(rev);
3105 while (!work.empty())
3106 {
3107 revision_id rid = work.front();
3108 work.pop_front();
3109
3110 if (!null_id(rid) && revs.find(rid) == revs.end())
3111 {
3112 revs.insert(rid);
3113 ++revisions_ticker;
3114 ref.note_local_item(rid.inner());
3115 vector<revision_id> parents;
3116 rev_enumerator.get_revision_parents(rid, parents);
3117 for (vector<revision_id>::const_iterator i = parents.begin();
3118 i != parents.end(); ++i)
3119 {
3120 work.push_back(*i);
3121 }
3122 }
3123 }
3124}
3125
3126void
3127session::rebuild_merkle_trees(set<branch_name> const & branchnames)
3128{
3129 P(F("finding items to synchronize:"));
3130 for (set<branch_name>::const_iterator i = branchnames.begin();
3131 i != branchnames.end(); ++i)
3132 L(FL("including branch %s") % *i);
3133
3134 // xgettext: please use short message and try to avoid multibytes chars
3135 ticker revisions_ticker(N_("revisions"), "r", 64);
3136 // xgettext: please use short message and try to avoid multibytes chars
3137 ticker certs_ticker(N_("certificates"), "c", 256);
3138 // xgettext: please use short message and try to avoid multibytes chars
3139 ticker keys_ticker(N_("keys"), "k", 1);
3140
3141 set<revision_id> revision_ids;
3142 set<rsa_keypair_id> inserted_keys;
3143
3144 {
3145 for (set<branch_name>::const_iterator i = branchnames.begin();
3146 i != branchnames.end(); ++i)
3147 {
3148 // Get branch certs.
3149 vector< revision<cert> > certs;
3150 project.get_branch_certs(*i, certs);
3151 for (vector< revision<cert> >::const_iterator j = certs.begin();
3152 j != certs.end(); j++)
3153 {
3154 revision_id rid(j->inner().ident);
3155 insert_with_parents(rid, rev_refiner, rev_enumerator,
3156 revision_ids, revisions_ticker);
3157 // Branch certs go in here, others later on.
3158 id item;
3159 cert_hash_code(j->inner(), item);
3160 cert_refiner.note_local_item(item);
3161 rev_enumerator.note_cert(rid, item);
3162 if (inserted_keys.find(j->inner().key) == inserted_keys.end())
3163 inserted_keys.insert(j->inner().key);
3164 }
3165 }
3166 }
3167
3168 {
3169 map<branch_name, epoch_data> epochs;
3170 project.db.get_epochs(epochs);
3171
3172 epoch_data epoch_zero(string(constants::epochlen_bytes, '\x00'));
3173 for (set<branch_name>::const_iterator i = branchnames.begin();
3174 i != branchnames.end(); ++i)
3175 {
3176 branch_name const & branch(*i);
3177 map<branch_name, epoch_data>::const_iterator j;
3178 j = epochs.find(branch);
3179
3180 // Set to zero any epoch which is not yet set.
3181 if (j == epochs.end())
3182 {
3183 L(FL("setting epoch on %s to zero") % branch);
3184 epochs.insert(make_pair(branch, epoch_zero));
3185 project.db.set_epoch(branch, epoch_zero);
3186 }
3187
3188 // Then insert all epochs into merkle tree.
3189 j = epochs.find(branch);
3190 I(j != epochs.end());
3191 epoch_id eid;
3192 epoch_hash_code(j->first, j->second, eid);
3193 epoch_refiner.note_local_item(eid.inner());
3194 }
3195 }
3196
3197 {
3198 typedef vector< pair<revision_id,
3199 pair<revision_id, rsa_keypair_id> > > cert_idx;
3200
3201 cert_idx idx;
3202 project.db.get_revision_cert_nobranch_index(idx);
3203
3204 // Insert all non-branch certs reachable via these revisions
3205 // (branch certs were inserted earlier).
3206
3207 for (cert_idx::const_iterator i = idx.begin(); i != idx.end(); ++i)
3208 {
3209 revision_id const & hash = i->first;
3210 revision_id const & ident = i->second.first;
3211 rsa_keypair_id const & key = i->second.second;
3212
3213 rev_enumerator.note_cert(ident, hash.inner());
3214
3215 if (revision_ids.find(ident) == revision_ids.end())
3216 continue;
3217
3218 cert_refiner.note_local_item(hash.inner());
3219 ++certs_ticker;
3220 if (inserted_keys.find(key) == inserted_keys.end())
3221 inserted_keys.insert(key);
3222 }
3223 }
3224
3225 // Add any keys specified on the command line.
3226 for (vector<rsa_keypair_id>::const_iterator key
3227 = keys_to_push.begin();
3228 key != keys_to_push.end(); ++key)
3229 {
3230 if (inserted_keys.find(*key) == inserted_keys.end())
3231 {
3232 if (!project.db.public_key_exists(*key))
3233 {
3234 keypair kp;
3235 if (keys.maybe_get_key_pair(*key, kp))
3236 project.db.put_key(*key, kp.pub);
3237 else
3238 W(F("Cannot find key '%s'") % *key);
3239 }
3240 inserted_keys.insert(*key);
3241 }
3242 }
3243
3244 // Insert all the keys.
3245 for (set<rsa_keypair_id>::const_iterator key = inserted_keys.begin();
3246 key != inserted_keys.end(); key++)
3247 {
3248 if (project.db.public_key_exists(*key))
3249 {
3250 rsa_pub_key pub;
3251 project.db.get_key(*key, pub);
3252 id keyhash;
3253 key_hash_code(*key, pub, keyhash);
3254
3255 if (global_sanity.debug_p())
3256 L(FL("noting key '%s' = '%s' to send")
3257 % *key
3258 % encode_hexenc(keyhash()));
3259
3260 key_refiner.note_local_item(keyhash);
3261 ++keys_ticker;
3262 }
3263 }
3264
3265 rev_refiner.reindex_local_items();
3266 cert_refiner.reindex_local_items();
3267 key_refiner.reindex_local_items();
3268 epoch_refiner.reindex_local_items();
3269}
3270
3271void
3272run_netsync_protocol(options & opts, lua_hooks & lua,
3273 project_t & project, key_store & keys,
3274 protocol_voice voice,
3275 protocol_role role,
3276 netsync_connection_info const & info)
3277{
3278 if (info.client.include_pattern().find_first_of("'\"") != string::npos)
3279 {
3280 W(F("include branch pattern contains a quote character:\n"
3281 "%s") % info.client.include_pattern());
3282 }
3283
3284 if (info.client.exclude_pattern().find_first_of("'\"") != string::npos)
3285 {
3286 W(F("exclude branch pattern contains a quote character:\n"
3287 "%s") % info.client.exclude_pattern());
3288 }
3289
3290 // We do not want to be killed by SIGPIPE from a network disconnect.
3291 ignore_sigpipe();
3292
3293 try
3294 {
3295 if (voice == server_voice)
3296 {
3297 if (opts.bind_stdio)
3298 {
3299 shared_ptr<Netxx::PipeStream> str(new Netxx::PipeStream(0,1));
3300 shared_ptr<session> sess(new session(opts, lua, project, keys,
3301 role, server_voice,
3302 globish("*"), globish(""),
3303 "stdio", str));
3304 serve_single_connection(sess,constants::netsync_timeout_seconds);
3305 }
3306 else
3307 serve_connections(opts, lua, project, keys,
3308 role,
3309 info.server.addrs,
3310 static_cast<Netxx::port_type>(constants::netsync_default_port),
3311 static_cast<unsigned long>(constants::netsync_timeout_seconds),
3312 static_cast<unsigned long>(constants::netsync_connection_limit));
3313 }
3314 else
3315 {
3316 I(voice == client_voice);
3317 call_server(opts, lua, project, keys,
3318 role, info,
3319 static_cast<Netxx::port_type>(constants::netsync_default_port),
3320 static_cast<unsigned long>(constants::netsync_timeout_seconds));
3321 }
3322 }
3323 catch (Netxx::NetworkException & e)
3324 {
3325 throw informative_failure((F("network error: %s") % e.what()).str());
3326 }
3327 catch (Netxx::Exception & e)
3328 {
3329 throw oops((F("network error: %s") % e.what()).str());;
3330 }
3331}
3332
3333// Local Variables:
3334// mode: C++
3335// fill-column: 76
3336// c-file-style: "gnu"
3337// indent-tabs-mode: nil
3338// End:
3339// vim: et:sw=2:sts=2:ts=2:cino=>2s,{s,\:s,+s,t0,g0,^-2,e-2,n-2,p2s,(0,=s:

Archive Download this file

Branches

Tags

Quick Links:     www.monotone.ca    -     Downloads    -     Documentation    -     Wiki    -     Code Forge    -     Build Status