monotone

monotone Mtn Source Tree

Root/netsync.cc

1// Copyright (C) 2004 Graydon Hoare <graydon@pobox.com>
2//
3// This program is made available under the GNU GPL version 2.0 or
4// greater. See the accompanying file COPYING for details.
5//
6// This program is distributed WITHOUT ANY WARRANTY; without even the
7// implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
8// PURPOSE.
9
10#include "base.hh"
11#include <map>
12#include <cstdlib>
13#include <memory>
14#include <list>
15#include <deque>
16#include <stack>
17
18#include <time.h>
19
20#include "lexical_cast.hh"
21#include <boost/scoped_ptr.hpp>
22#include <boost/shared_ptr.hpp>
23#include <boost/bind.hpp>
24#include <boost/regex.hpp>
25
26#include "app_state.hh"
27#include "cert.hh"
28#include "constants.hh"
29#include "enumerator.hh"
30#include "keys.hh"
31#include "lua.hh"
32#include "merkle_tree.hh"
33#include "netcmd.hh"
34#include "netio.hh"
35#include "numeric_vocab.hh"
36#include "refiner.hh"
37#include "revision.hh"
38#include "sanity.hh"
39#include "transforms.hh"
40#include "ui.hh"
41#include "xdelta.hh"
42#include "epoch.hh"
43#include "platform.hh"
44#include "hmac.hh"
45#include "globish.hh"
46#include "uri.hh"
47
48#include "botan/botan.h"
49
50#include "netxx/address.h"
51#include "netxx/peer.h"
52#include "netxx/probe.h"
53#include "netxx/socket.h"
54#include "netxx/sockopt.h"
55#include "netxx/stream.h"
56#include "netxx/streamserver.h"
57#include "netxx/timeout.h"
58#include "netxx_pipe.hh"
59// TODO: things to do that will break protocol compatibility
60// -- need some way to upgrade anonymous to keyed pull, without user having
61// to explicitly specify which they want
62// just having a way to respond "access denied, try again" might work
63// but perhaps better to have the anonymous command include a note "I
64// _could_ use key <...> if you prefer", and if that would lead to more
65// access, could reply "I do prefer". (Does this lead to too much
66// information exposure? Allows anonymous people to probe what branches
67// a key has access to.)
68// -- "warning" packet type?
69// -- Richard Levitte wants, when you (e.g.) request '*' but don't have
70// access to all of it, you just get the parts you have access to
71// (maybe with warnings about skipped branches). to do this right,
72// should have a way for the server to send back to the client "right,
73// you're not getting the following branches: ...", so the client will
74// not include them in its merkle trie.
75// -- add some sort of vhost field to the client's first packet, saying who
76// they expect to talk to
77
78//
79// This is the "new" network synchronization (netsync) system in
80// monotone. It is based on synchronizing pairs of merkle trees over an
81// interactive connection.
82//
83// A netsync process between peers treats each peer as either a source, a
84// sink, or both. When a peer is only a source, it will not write any new
85// items to its database. When a peer is only a sink, it will not send any
86// items from its database. When a peer is both a source and sink, it may
87// send and write items freely.
88//
89// The post-state of a netsync is that each sink contains a superset of the
90// items in its corresponding source; when peers are behaving as both
91// source and sink, this means that the post-state of the sync is for the
92// peers to have identical item sets.
93//
94//
95// Data structure
96// --------------
97//
98// Each node in a merkle tree contains a fixed number of slots. This number
99// is derived from a global parameter of the protocol -- the tree fanout --
100// such that the number of slots is 2^fanout. For now we will assume that
101// fanout is 4 thus there are 16 slots in a node, because this makes
102// illustration easier. The other parameter of the protocol is the size of
103// a hash; we use SHA1 so the hash is 20 bytes (160 bits) long.
104//
105// Each slot in a merkle tree node is in one of 3 states:
106//
107// - empty
108// - leaf
109// - subtree
110//
111// In addition, each leaf contains a hash code which identifies an element
112// of the set being synchronized. Each subtree slot contains a hash code of
113// the node immediately beneath it in the merkle tree. Empty slots contain
114// no hash codes.
115//
116// Since empty slots have no hash code, they are represented implicitly by
117// a bitmap at the head of each merkle tree node. As an additional
118// integrity check, each merkle tree node contains a label indicating its
119// prefix in the tree, and a hash of its own contents.
120//
121// In total, then, the byte-level representation of a <160,4> merkle tree
122// node is as follows:
123//
124// 20 bytes - hash of the remaining bytes in the node
125// 1 byte - type of this node (manifest, file, key, mcert, fcert)
126// 1-N bytes - level of this node in the tree (0 == "root", uleb128)
127// 0-20 bytes - the prefix of this node, 4 bits * level,
128// rounded up to a byte
129// 1-N bytes - number of leaves under this node (uleb128)
130// 4 bytes - slot-state bitmap of the node
131// 0-320 bytes - between 0 and 16 live slots in the node
132//
133// So, in the worst case such a node is 367 bytes, with these parameters.
134//
135//
136// Protocol
137// --------
138//
139// The protocol is a binary command-packet system over TCP; each packet
140// consists of a single byte which identifies the protocol version, a byte
141// which identifies the command name inside that version, a size_t sent as
142// a uleb128 indicating the length of the packet, that many bytes of
143// payload, and finally 20 bytes of SHA-1 HMAC calculated over the payload.
144// The key for the SHA-1 HMAC is 20 bytes of 0 during authentication, and a
145// 20-byte random key chosen by the client after authentication (discussed
146// below). Decoding involves simply buffering until a sufficient number of
147// bytes are received, then advancing the buffer pointer. Any time an
148// integrity check (the HMAC) fails, the protocol is assumed to have lost
149// synchronization, and the connection is dropped. The parties are free to
150// drop the TCP stream at any point, if too much data is received or too
151// much idle time passes; no commitments or transactions are made.
152//
153//
154// Authentication and setup
155// ------------------------
156//
157// The exchange begins in a non-authenticated state. The server sends a
158// "hello <id> <nonce>" command, which identifies the server's RSA key and
159// issues a nonce which must be used for a subsequent authentication.
160//
161// The client then responds with either:
162//
163// An "auth (source|sink|both) <include_pattern> <exclude_pattern> <id>
164// <nonce1> <hmac key> <sig>" command, which identifies its RSA key, notes the
165// role it wishes to play in the synchronization, identifies the pattern it
166// wishes to sync with, signs the previous nonce with its own key, and informs
167// the server of the HMAC key it wishes to use for this session (encrypted
168// with the server's public key); or
169//
170// An "anonymous (source|sink|both) <include_pattern> <exclude_pattern>
171// <hmac key>" command, which identifies the role it wishes to play in the
172// synchronization, the pattern it wishes to sync with, and the HMAC key it
173// wishes to use for this session (also encrypted with the server's public
174// key).
175//
176// The server then replies with a "confirm" command, which contains no
177// other data but will only have the correct HMAC integrity code if the
178// server received and properly decrypted the HMAC key offered by the
179// client. This transitions the peers into an authenticated state and
180// begins epoch refinement. If epoch refinement and epoch transmission
181// succeed, the peers switch to data refinement and data transmission.
182//
183//
184// Refinement
185// ----------
186//
187// Refinement is executed by "refiners"; there is a refiner for each
188// set of 'items' being exchanged: epochs, keys, certs, and revisions.
189// When refinement starts, each party knows only their own set of
190// items; when refinement completes, each party has learned of the
191// complete set of items it needs to send, and a count of items it's
192// expecting to receive.
193//
194// For more details on the refinement process, see refiner.cc.
195//
196//
197// Transmission
198// ------------
199//
200// Once the set of items to send has been determined (for keys, certs, and
201// revisions) each peer switches into a transmission mode. This mode
202// involves walking the revision graph in ancestry-order and sending all
203// the items the local peer has which the remote one does not. Since the
204// remote and local peers both know all the items which need to be
205// transferred (they learned during refinement) they know what to wait for
206// and what to send. The mechanisms of the transmission phase (notably,
207// enumerator.cc) simply ensure that things are sent in the proper order,
208// and without over-filling the output buffer too much.
209//
210//
211// Shutdown
212// --------
213//
214// After transmission completes, one special command, "bye", is used to
215// shut down a connection gracefully. The shutdown sequence based on "bye"
216// commands is documented below in session::process_bye_cmd.
217//
218//
219// Note on epochs
220// --------------
221//
222// One refinement and transmission phase precedes all the others: epochs.
223// Epochs are exchanged and compared in order to be sure that further
224// refinement and transmission (on certs and revisions) makes sense; they
225// are a sort of "immune system" to prevent incompatible databases (say
226// between rebuilds due to bugs in monotone) from cross-contaminating. The
227// later refinements are only kicked off *after* all epochs are received
228// and compare correctly.
229//
230//
231// Note on dense coding
232// --------------------
233//
234// This protocol is "raw binary" (non-text) because coding density is
235// actually important here, and each packet consists of very
236// information-dense material that you wouldn't have a hope of typing in,
237// or interpreting manually anyways.
238//
239
240using std::auto_ptr;
241using std::deque;
242using std::make_pair;
243using std::map;
244using std::min;
245using std::pair;
246using std::set;
247using std::string;
248using std::vector;
249
250using boost::shared_ptr;
251using boost::lexical_cast;
252
// One request, made through the server_request_sync Lua extension, for the
// server to start a sync itself.  The four fields hold the extension's four
// string arguments verbatim; nothing in this struct interprets them.
struct server_initiated_sync_request
{
  string what;     // argument 1 of server_request_sync
  string address;  // argument 2
  string include;  // argument 3
  string exclude;  // argument 4
};

// Requests waiting to be acted on; server_request_sync appends to the back.
deque<server_initiated_sync_request> server_initiated_sync_requests;
261LUAEXT(server_request_sync, )
262{
263 char const * w = luaL_checkstring(L, 1);
264 char const * a = luaL_checkstring(L, 2);
265 char const * i = luaL_checkstring(L, 3);
266 char const * e = luaL_checkstring(L, 4);
267 server_initiated_sync_request request;
268 request.what = string(w);
269 request.address = string(a);
270 request.include = string(i);
271 request.exclude = string(e);
272 server_initiated_sync_requests.push_back(request);
273 return 0;
274}
275
276static inline void
277require(bool check, string const & context)
278{
279 if (!check)
280 throw bad_decode(F("check of '%s' failed") % context);
281}
282
// Exception type thrown by session::error; carries only the message text.
struct netsync_error
{
  string msg;

  netsync_error(string const & s)
    : msg(s)
  {}
};
288
289struct
290session:
291 public refiner_callbacks,
292 public enumerator_callbacks
293{
294 protocol_role role;
295 protocol_voice const voice;
296 globish our_include_pattern;
297 globish our_exclude_pattern;
298 globish_matcher our_matcher;
299 app_state & app;
300
301 string peer_id;
302 shared_ptr<Netxx::StreamBase> str;
303
304 string_queue inbuf;
305 // deque of pair<string data, size_t cur_pos>
306 deque< pair<string,size_t> > outbuf;
307 // the total data stored in outbuf - this is
308 // used as a valve to stop too much data
309 // backing up
310 size_t outbuf_size;
311
312 netcmd cmd;
313 bool armed;
314 bool arm();
315
316 id remote_peer_key_hash;
317 rsa_keypair_id remote_peer_key_name;
318 netsync_session_key session_key;
319 chained_hmac read_hmac;
320 chained_hmac write_hmac;
321 bool authenticated;
322
323 time_t last_io_time;
324 auto_ptr<ticker> byte_in_ticker;
325 auto_ptr<ticker> byte_out_ticker;
326 auto_ptr<ticker> cert_in_ticker;
327 auto_ptr<ticker> cert_out_ticker;
328 auto_ptr<ticker> revision_in_ticker;
329 auto_ptr<ticker> revision_out_ticker;
330 size_t bytes_in, bytes_out;
331 size_t certs_in, certs_out;
332 size_t revs_in, revs_out;
333 size_t keys_in, keys_out;
334 // used to identify this session to the netsync hooks.
335 // We can't just use saved_nonce, because that's blank for all
336 // anonymous connections and could lead to confusion.
337 size_t session_id;
338 static size_t session_count;
339
340 vector<revision_id> written_revisions;
341 vector<rsa_keypair_id> written_keys;
342 vector<cert> written_certs;
343
344 id saved_nonce;
345
346 enum
347 {
348 working_state,
349 shutdown_state,
350 confirmed_state
351 }
352 protocol_state;
353
354 bool encountered_error;
355
356 static const int no_error = 200;
357 static const int partial_transfer = 211;
358 static const int no_transfer = 212;
359
360 static const int not_permitted = 412;
361 static const int unknown_key = 422;
362 static const int mixing_versions = 432;
363
364 static const int role_mismatch = 512;
365 static const int bad_command = 521;
366
367 static const int failed_identification = 532;
368 //static const int bad_data = 541;
369
370 int error_code;
371
372 bool set_totals;
373
374 // Interface to refinement.
375 refiner epoch_refiner;
376 refiner key_refiner;
377 refiner cert_refiner;
378 refiner rev_refiner;
379
380 // Interface to ancestry grovelling.
381 revision_enumerator rev_enumerator;
382
383 // Enumerator_callbacks methods.
384 set<file_id> file_items_sent;
385 bool process_this_rev(revision_id const & rev);
386 bool queue_this_cert(hexenc<id> const & c);
387 bool queue_this_file(hexenc<id> const & f);
388 void note_file_data(file_id const & f);
389 void note_file_delta(file_id const & src, file_id const & dst);
390 void note_rev(revision_id const & rev);
391 void note_cert(hexenc<id> const & c);
392
393 session(protocol_role role,
394 protocol_voice voice,
395 globish const & our_include_pattern,
396 globish const & our_exclude_pattern,
397 app_state & app,
398 string const & peer,
399 shared_ptr<Netxx::StreamBase> sock,
400 bool initiated_by_server = false);
401
402 virtual ~session();
403
404 id mk_nonce();
405 void mark_recent_io();
406
407 void set_session_key(string const & key);
408 void set_session_key(rsa_oaep_sha_data const & key_encrypted);
409
410 void setup_client_tickers();
411 bool done_all_refinements();
412 bool queued_all_items();
413 bool received_all_items();
414 bool finished_working();
415 void maybe_step();
416 void maybe_say_goodbye(transaction_guard & guard);
417
418 void note_item_arrived(netcmd_item_type ty, id const & i);
419 void maybe_note_epochs_finished();
420 void note_item_sent(netcmd_item_type ty, id const & i);
421
422 Netxx::Probe::ready_type which_events() const;
423 bool read_some();
424 bool write_some();
425
426 void error(int errcode, string const & errmsg);
427
428 void write_netcmd_and_try_flush(netcmd const & cmd);
429
430 // Outgoing queue-writers.
431 void queue_bye_cmd(u8 phase);
432 void queue_error_cmd(string const & errmsg);
433 void queue_done_cmd(netcmd_item_type type, size_t n_items);
434 void queue_hello_cmd(rsa_keypair_id const & key_name,
435 base64<rsa_pub_key> const & pub_encoded,
436 id const & nonce);
437 void queue_anonymous_cmd(protocol_role role,
438 globish const & include_pattern,
439 globish const & exclude_pattern,
440 id const & nonce2,
441 base64<rsa_pub_key> server_key_encoded);
442 void queue_auth_cmd(protocol_role role,
443 globish const & include_pattern,
444 globish const & exclude_pattern,
445 id const & client,
446 id const & nonce1,
447 id const & nonce2,
448 string const & signature,
449 base64<rsa_pub_key> server_key_encoded);
450 void queue_confirm_cmd();
451 void queue_refine_cmd(refinement_type ty, merkle_node const & node);
452 void queue_data_cmd(netcmd_item_type type,
453 id const & item,
454 string const & dat);
455 void queue_delta_cmd(netcmd_item_type type,
456 id const & base,
457 id const & ident,
458 delta const & del);
459
460 // Incoming dispatch-called methods.
461 bool process_error_cmd(string const & errmsg);
462 bool process_hello_cmd(rsa_keypair_id const & server_keyname,
463 rsa_pub_key const & server_key,
464 id const & nonce);
465 bool process_bye_cmd(u8 phase, transaction_guard & guard);
466 bool process_anonymous_cmd(protocol_role role,
467 globish const & their_include_pattern,
468 globish const & their_exclude_pattern);
469 bool process_auth_cmd(protocol_role role,
470 globish const & their_include_pattern,
471 globish const & their_exclude_pattern,
472 id const & client,
473 id const & nonce1,
474 string const & signature);
475 bool process_refine_cmd(refinement_type ty, merkle_node const & node);
476 bool process_done_cmd(netcmd_item_type type, size_t n_items);
477 bool process_data_cmd(netcmd_item_type type,
478 id const & item,
479 string const & dat);
480 bool process_delta_cmd(netcmd_item_type type,
481 id const & base,
482 id const & ident,
483 delta const & del);
484 bool process_usher_cmd(utf8 const & msg);
485
486 // The incoming dispatcher.
487 bool dispatch_payload(netcmd const & cmd,
488 transaction_guard & guard);
489
490 // Various helpers.
491 void assume_corresponding_role(protocol_role their_role);
492 void respond_to_confirm_cmd();
493 bool data_exists(netcmd_item_type type,
494 id const & item);
495 void load_data(netcmd_item_type type,
496 id const & item,
497 string & out);
498
499 void rebuild_merkle_trees(app_state & app,
500 set<branch_name> const & branches);
501
502 void send_all_data(netcmd_item_type ty, set<id> const & items);
503 void begin_service();
504 bool process(transaction_guard & guard);
505
506 bool initiated_by_server;
507};
508size_t session::session_count = 0;
509
510session::session(protocol_role role,
511 protocol_voice voice,
512 globish const & our_include_pattern,
513 globish const & our_exclude_pattern,
514 app_state & app,
515 string const & peer,
516 shared_ptr<Netxx::StreamBase> sock,
517 bool initiated_by_server) :
518 role(role),
519 voice(voice),
520 our_include_pattern(our_include_pattern),
521 our_exclude_pattern(our_exclude_pattern),
522 our_matcher(our_include_pattern, our_exclude_pattern),
523 app(app),
524 peer_id(peer),
525 str(sock),
526 inbuf(),
527 outbuf_size(0),
528 armed(false),
529 remote_peer_key_hash(""),
530 remote_peer_key_name(""),
531 session_key(constants::netsync_key_initializer),
532 read_hmac(constants::netsync_key_initializer, app.opts.use_transport_auth),
533 write_hmac(constants::netsync_key_initializer, app.opts.use_transport_auth),
534 authenticated(false),
535 last_io_time(::time(NULL)),
536 byte_in_ticker(NULL),
537 byte_out_ticker(NULL),
538 cert_in_ticker(NULL),
539 cert_out_ticker(NULL),
540 revision_in_ticker(NULL),
541 revision_out_ticker(NULL),
542 bytes_in(0), bytes_out(0),
543 certs_in(0), certs_out(0),
544 revs_in(0), revs_out(0),
545 keys_in(0), keys_out(0),
546 session_id(++session_count),
547 saved_nonce(""),
548 protocol_state(working_state),
549 encountered_error(false),
550 error_code(no_transfer),
551 set_totals(false),
552 epoch_refiner(epoch_item, voice, *this),
553 key_refiner(key_item, voice, *this),
554 cert_refiner(cert_item, voice, *this),
555 rev_refiner(revision_item, voice, *this),
556 rev_enumerator(*this, app),
557 initiated_by_server(initiated_by_server)
558{}
559
560session::~session()
561{
562 if (protocol_state == confirmed_state)
563 error_code = no_error;
564 else if (error_code == no_transfer &&
565 (revs_in || revs_out ||
566 certs_in || certs_out ||
567 keys_in || keys_out))
568 error_code = partial_transfer;
569
570 vector<cert> unattached_certs;
571 map<revision_id, vector<cert> > revcerts;
572 for (vector<revision_id>::iterator i = written_revisions.begin();
573 i != written_revisions.end(); ++i)
574 revcerts.insert(make_pair(*i, vector<cert>()));
575 for (vector<cert>::iterator i = written_certs.begin();
576 i != written_certs.end(); ++i)
577 {
578 map<revision_id, vector<cert> >::iterator j;
579 j = revcerts.find(revision_id(i->ident));
580 if (j == revcerts.end())
581 unattached_certs.push_back(*i);
582 else
583 j->second.push_back(*i);
584 }
585
586 // if (role == sink_role || role == source_and_sink_role)
587 if (!written_keys.empty()
588 || !written_revisions.empty()
589 || !written_certs.empty())
590 {
591
592 //Keys
593 for (vector<rsa_keypair_id>::iterator i = written_keys.begin();
594 i != written_keys.end(); ++i)
595 {
596 app.lua.hook_note_netsync_pubkey_received(*i, session_id);
597 }
598
599 //Revisions
600 for (vector<revision_id>::iterator i = written_revisions.begin();
601 i != written_revisions.end(); ++i)
602 {
603 vector<cert> & ctmp(revcerts[*i]);
604 set<pair<rsa_keypair_id, pair<cert_name, cert_value> > > certs;
605 for (vector<cert>::const_iterator j = ctmp.begin();
606 j != ctmp.end(); ++j)
607 {
608 cert_value vtmp;
609 decode_base64(j->value, vtmp);
610 certs.insert(make_pair(j->key, make_pair(j->name, vtmp)));
611 }
612 revision_data rdat;
613 app.db.get_revision(*i, rdat);
614 app.lua.hook_note_netsync_revision_received(*i, rdat, certs,
615 session_id);
616 }
617
618 //Certs (not attached to a new revision)
619 for (vector<cert>::iterator i = unattached_certs.begin();
620 i != unattached_certs.end(); ++i)
621 {
622 cert_value tmp;
623 decode_base64(i->value, tmp);
624 app.lua.hook_note_netsync_cert_received(revision_id(i->ident), i->key,
625 i->name, tmp, session_id);
626 }
627 }
628 app.lua.hook_note_netsync_end(session_id, error_code,
629 bytes_in, bytes_out,
630 certs_in, certs_out,
631 revs_in, revs_out,
632 keys_in, keys_out);
633}
634
635bool
636session::process_this_rev(revision_id const & rev)
637{
638 id item;
639 decode_hexenc(rev.inner(), item);
640 return (rev_refiner.items_to_send.find(item)
641 != rev_refiner.items_to_send.end());
642}
643
644bool
645session::queue_this_cert(hexenc<id> const & c)
646{
647 id item;
648 decode_hexenc(c, item);
649 return (cert_refiner.items_to_send.find(item)
650 != cert_refiner.items_to_send.end());
651}
652
653bool
654session::queue_this_file(hexenc<id> const & f)
655{
656 return file_items_sent.find(file_id(f)) == file_items_sent.end();
657}
658
659void
660session::note_file_data(file_id const & f)
661{
662 if (role == sink_role)
663 return;
664 file_data fd;
665 id item;
666 decode_hexenc(f.inner(), item);
667 app.db.get_file_version(f, fd);
668 queue_data_cmd(file_item, item, fd.inner()());
669 file_items_sent.insert(f);
670}
671
672void
673session::note_file_delta(file_id const & src, file_id const & dst)
674{
675 if (role == sink_role)
676 return;
677 file_delta fdel;
678 id fid1, fid2;
679 decode_hexenc(src.inner(), fid1);
680 decode_hexenc(dst.inner(), fid2);
681 app.db.get_arbitrary_file_delta(src, dst, fdel);
682 queue_delta_cmd(file_item, fid1, fid2, fdel.inner());
683 file_items_sent.insert(dst);
684}
685
686void
687session::note_rev(revision_id const & rev)
688{
689 if (role == sink_role)
690 return;
691 revision_t rs;
692 id item;
693 decode_hexenc(rev.inner(), item);
694 app.db.get_revision(rev, rs);
695 data tmp;
696 write_revision(rs, tmp);
697 queue_data_cmd(revision_item, item, tmp());
698}
699
700void
701session::note_cert(hexenc<id> const & c)
702{
703 if (role == sink_role)
704 return;
705 id item;
706 decode_hexenc(c, item);
707 revision<cert> cert;
708 string str;
709 app.db.get_revision_cert(c, cert);
710 write_cert(cert.inner(), str);
711 queue_data_cmd(cert_item, item, str);
712}
713
714
715id
716session::mk_nonce()
717{
718 I(this->saved_nonce().size() == 0);
719 char buf[constants::merkle_hash_length_in_bytes];
720 Botan::Global_RNG::randomize(reinterpret_cast<Botan::byte *>(buf),
721 constants::merkle_hash_length_in_bytes);
722 this->saved_nonce = id(string(buf, buf + constants::merkle_hash_length_in_bytes));
723 I(this->saved_nonce().size() == constants::merkle_hash_length_in_bytes);
724 return this->saved_nonce;
725}
726
727void
728session::mark_recent_io()
729{
730 last_io_time = ::time(NULL);
731}
732
733void
734session::set_session_key(string const & key)
735{
736 session_key = netsync_session_key(key);
737 read_hmac.set_key(session_key);
738 write_hmac.set_key(session_key);
739}
740
741void
742session::set_session_key(rsa_oaep_sha_data const & hmac_key_encrypted)
743{
744 if (app.opts.use_transport_auth)
745 {
746 keypair our_kp;
747 load_key_pair(app, app.opts.signing_key, our_kp);
748 string hmac_key;
749 decrypt_rsa(app.lua, app.opts.signing_key, our_kp.priv,
750 hmac_key_encrypted, hmac_key);
751 set_session_key(hmac_key);
752 }
753}
754
755void
756session::setup_client_tickers()
757{
758 // xgettext: please use short message and try to avoid multibytes chars
759 byte_in_ticker.reset(new ticker(N_("bytes in"), ">", 1024, true));
760 // xgettext: please use short message and try to avoid multibytes chars
761 byte_out_ticker.reset(new ticker(N_("bytes out"), "<", 1024, true));
762 if (role == sink_role)
763 {
764 // xgettext: please use short message and try to avoid multibytes chars
765 cert_in_ticker.reset(new ticker(N_("certs in"), "c", 3));
766 // xgettext: please use short message and try to avoid multibytes chars
767 revision_in_ticker.reset(new ticker(N_("revs in"), "r", 1));
768 }
769 else if (role == source_role)
770 {
771 // xgettext: please use short message and try to avoid multibytes chars
772 cert_out_ticker.reset(new ticker(N_("certs out"), "C", 3));
773 // xgettext: please use short message and try to avoid multibytes chars
774 revision_out_ticker.reset(new ticker(N_("revs out"), "R", 1));
775 }
776 else
777 {
778 I(role == source_and_sink_role);
779 // xgettext: please use short message and try to avoid multibytes chars
780 revision_in_ticker.reset(new ticker(N_("revs in"), "r", 1));
781 // xgettext: please use short message and try to avoid multibytes chars
782 revision_out_ticker.reset(new ticker(N_("revs out"), "R", 1));
783 }
784}
785
786bool
787session::done_all_refinements()
788{
789 bool all = rev_refiner.done
790 && cert_refiner.done
791 && key_refiner.done
792 && epoch_refiner.done;
793
794 if (all && !set_totals)
795 {
796 if (cert_out_ticker.get())
797 cert_out_ticker->set_total(cert_refiner.items_to_send.size());
798
799 if (revision_out_ticker.get())
800 revision_out_ticker->set_total(rev_refiner.items_to_send.size());
801
802 if (cert_in_ticker.get())
803 cert_in_ticker->set_total(cert_refiner.items_to_receive);
804
805 if (revision_in_ticker.get())
806 revision_in_ticker->set_total(rev_refiner.items_to_receive);
807
808 set_totals = true;
809 }
810 return all;
811}
812
813
814
815bool
816session::received_all_items()
817{
818 if (role == source_role)
819 return true;
820 bool all = rev_refiner.items_to_receive == 0
821 && cert_refiner.items_to_receive == 0
822 && key_refiner.items_to_receive == 0
823 && epoch_refiner.items_to_receive == 0;
824 return all;
825}
826
827bool
828session::finished_working()
829{
830 bool all = done_all_refinements()
831 && received_all_items()
832 && queued_all_items()
833 && rev_enumerator.done();
834 return all;
835}
836
837bool
838session::queued_all_items()
839{
840 if (role == sink_role)
841 return true;
842 bool all = rev_refiner.items_to_send.empty()
843 && cert_refiner.items_to_send.empty()
844 && key_refiner.items_to_send.empty()
845 && epoch_refiner.items_to_send.empty();
846 return all;
847}
848
849
850void
851session::maybe_note_epochs_finished()
852{
853 // Maybe there are outstanding epoch requests.
854 // These only matter if we're in sink or source-and-sink mode.
855 if (!(epoch_refiner.items_to_receive == 0) && !(role == source_role))
856 return;
857
858 // And maybe we haven't even finished the refinement.
859 if (!epoch_refiner.done)
860 return;
861
862 // If we ran into an error -- say a mismatched epoch -- don't do any
863 // further refinements.
864 if (encountered_error)
865 return;
866
867 // But otherwise, we're ready to go. Start the next
868 // set of refinements.
869 if (voice == client_voice)
870 {
871 L(FL("epoch refinement finished; beginning other refinements"));
872 key_refiner.begin_refinement();
873 cert_refiner.begin_refinement();
874 rev_refiner.begin_refinement();
875 }
876 else
877 L(FL("epoch refinement finished"));
878}
879
880static void
881decrement_if_nonzero(netcmd_item_type ty,
882 size_t & n)
883{
884 if (n == 0)
885 {
886 string typestr;
887 netcmd_item_type_to_string(ty, typestr);
888 E(false, F("underflow on count of %s items to receive") % typestr);
889 }
890 --n;
891 if (n == 0)
892 {
893 string typestr;
894 netcmd_item_type_to_string(ty, typestr);
895 L(FL("count of %s items to receive has reached zero") % typestr);
896 }
897}
898
899void
900session::note_item_arrived(netcmd_item_type ty, id const & ident)
901{
902 switch (ty)
903 {
904 case cert_item:
905 decrement_if_nonzero(ty, cert_refiner.items_to_receive);
906 if (cert_in_ticker.get() != NULL)
907 ++(*cert_in_ticker);
908 ++certs_in;
909 break;
910 case revision_item:
911 decrement_if_nonzero(ty, rev_refiner.items_to_receive);
912 if (revision_in_ticker.get() != NULL)
913 ++(*revision_in_ticker);
914 ++revs_in;
915 break;
916 case key_item:
917 decrement_if_nonzero(ty, key_refiner.items_to_receive);
918 ++keys_in;
919 break;
920 case epoch_item:
921 decrement_if_nonzero(ty, epoch_refiner.items_to_receive);
922 break;
923 default:
924 // No ticker for other things.
925 break;
926 }
927}
928
929
930
931void
932session::note_item_sent(netcmd_item_type ty, id const & ident)
933{
934 switch (ty)
935 {
936 case cert_item:
937 cert_refiner.items_to_send.erase(ident);
938 if (cert_out_ticker.get() != NULL)
939 ++(*cert_out_ticker);
940 ++certs_out;
941 break;
942 case revision_item:
943 rev_refiner.items_to_send.erase(ident);
944 if (revision_out_ticker.get() != NULL)
945 ++(*revision_out_ticker);
946 ++revs_out;
947 break;
948 case key_item:
949 key_refiner.items_to_send.erase(ident);
950 ++keys_out;
951 break;
952 case epoch_item:
953 epoch_refiner.items_to_send.erase(ident);
954 break;
955 default:
956 // No ticker for other things.
957 break;
958 }
959}
960
961void
962session::write_netcmd_and_try_flush(netcmd const & cmd)
963{
964 if (!encountered_error)
965 {
966 string buf;
967 cmd.write(buf, write_hmac);
968 outbuf.push_back(make_pair(buf, 0));
969 outbuf_size += buf.size();
970 }
971 else
972 L(FL("dropping outgoing netcmd (because we're in error unwind mode)"));
973 // FIXME: this helps keep the protocol pipeline full but it seems to
974 // interfere with initial and final sequences. careful with it.
975 // write_some();
976 // read_some();
977}
978
979// This method triggers a special "error unwind" mode to netsync. In this
980// mode, all received data is ignored, and no new data is queued. We simply
981// stay connected long enough for the current write buffer to be flushed, to
982// ensure that our peer receives the error message.
983// Affects read_some, write_some, and process .
984void
985session::error(int errcode, string const & errmsg)
986{
987 error_code = errcode;
988 throw netsync_error(errmsg);
989}
990
991Netxx::Probe::ready_type
992session::which_events() const
993{
994 // Only ask to read if we're not armed.
995 if (outbuf.empty())
996 {
997 if (inbuf.size() < constants::netcmd_maxsz && !armed)
998 return Netxx::Probe::ready_read | Netxx::Probe::ready_oobd;
999 else
1000 return Netxx::Probe::ready_oobd;
1001 }
1002 else
1003 {
1004 if (inbuf.size() < constants::netcmd_maxsz && !armed)
1005 return Netxx::Probe::ready_write | Netxx::Probe::ready_read | Netxx::Probe::ready_oobd;
1006 else
1007 return Netxx::Probe::ready_write | Netxx::Probe::ready_oobd;
1008 }
1009}
1010
1011bool
1012session::read_some()
1013{
1014 I(inbuf.size() < constants::netcmd_maxsz);
1015 char tmp[constants::bufsz];
1016 Netxx::signed_size_type count = str->read(tmp, sizeof(tmp));
1017 if (count > 0)
1018 {
1019 L(FL("read %d bytes from fd %d (peer %s)") % count % str->get_socketfd() % peer_id);
1020 if (encountered_error)
1021 {
1022 L(FL("in error unwind mode, so throwing them into the bit bucket"));
1023 return true;
1024 }
1025 inbuf.append(tmp,count);
1026 mark_recent_io();
1027 if (byte_in_ticker.get() != NULL)
1028 (*byte_in_ticker) += count;
1029 bytes_in += count;
1030 return true;
1031 }
1032 else
1033 return false;
1034}
1035
1036bool
1037session::write_some()
1038{
1039 I(!outbuf.empty());
1040 size_t writelen = outbuf.front().first.size() - outbuf.front().second;
1041 Netxx::signed_size_type count = str->write(outbuf.front().first.data() + outbuf.front().second,
1042 min(writelen,
1043 constants::bufsz));
1044 if (count > 0)
1045 {
1046 if ((size_t)count == writelen)
1047 {
1048 outbuf_size -= outbuf.front().first.size();
1049 outbuf.pop_front();
1050 }
1051 else
1052 {
1053 outbuf.front().second += count;
1054 }
1055 L(FL("wrote %d bytes to fd %d (peer %s)")
1056 % count % str->get_socketfd() % peer_id);
1057 mark_recent_io();
1058 if (byte_out_ticker.get() != NULL)
1059 (*byte_out_ticker) += count;
1060 bytes_out += count;
1061 if (encountered_error && outbuf.empty())
1062 {
1063 // we've flushed our error message, so it's time to get out.
1064 L(FL("finished flushing output queue in error unwind mode, disconnecting"));
1065 return false;
1066 }
1067 return true;
1068 }
1069 else
1070 return false;
1071}
1072
1073// senders
1074
1075void
1076session::queue_error_cmd(string const & errmsg)
1077{
1078 L(FL("queueing 'error' command"));
1079 netcmd cmd;
1080 cmd.write_error_cmd(errmsg);
1081 write_netcmd_and_try_flush(cmd);
1082}
1083
1084void
1085session::queue_bye_cmd(u8 phase)
1086{
1087 L(FL("queueing 'bye' command, phase %d")
1088 % static_cast<size_t>(phase));
1089 netcmd cmd;
1090 cmd.write_bye_cmd(phase);
1091 write_netcmd_and_try_flush(cmd);
1092}
1093
1094void
1095session::queue_done_cmd(netcmd_item_type type,
1096 size_t n_items)
1097{
1098 string typestr;
1099 netcmd_item_type_to_string(type, typestr);
1100 L(FL("queueing 'done' command for %s (%d items)")
1101 % typestr % n_items);
1102 netcmd cmd;
1103 cmd.write_done_cmd(type, n_items);
1104 write_netcmd_and_try_flush(cmd);
1105}
1106
1107void
1108session::queue_hello_cmd(rsa_keypair_id const & key_name,
1109 base64<rsa_pub_key> const & pub_encoded,
1110 id const & nonce)
1111{
1112 rsa_pub_key pub;
1113 if (app.opts.use_transport_auth)
1114 decode_base64(pub_encoded, pub);
1115 cmd.write_hello_cmd(key_name, pub, nonce);
1116 write_netcmd_and_try_flush(cmd);
1117}
1118
1119void
1120session::queue_anonymous_cmd(protocol_role role,
1121 globish const & include_pattern,
1122 globish const & exclude_pattern,
1123 id const & nonce2,
1124 base64<rsa_pub_key> server_key_encoded)
1125{
1126 netcmd cmd;
1127 rsa_oaep_sha_data hmac_key_encrypted;
1128 if (app.opts.use_transport_auth)
1129 encrypt_rsa(app.lua, remote_peer_key_name, server_key_encoded,
1130 nonce2(), hmac_key_encrypted);
1131 cmd.write_anonymous_cmd(role, include_pattern, exclude_pattern,
1132 hmac_key_encrypted);
1133 write_netcmd_and_try_flush(cmd);
1134 set_session_key(nonce2());
1135}
1136
1137void
1138session::queue_auth_cmd(protocol_role role,
1139 globish const & include_pattern,
1140 globish const & exclude_pattern,
1141 id const & client,
1142 id const & nonce1,
1143 id const & nonce2,
1144 string const & signature,
1145 base64<rsa_pub_key> server_key_encoded)
1146{
1147 netcmd cmd;
1148 rsa_oaep_sha_data hmac_key_encrypted;
1149 I(app.opts.use_transport_auth);
1150 encrypt_rsa(app.lua, remote_peer_key_name, server_key_encoded,
1151 nonce2(), hmac_key_encrypted);
1152 cmd.write_auth_cmd(role, include_pattern, exclude_pattern, client,
1153 nonce1, hmac_key_encrypted, signature);
1154 write_netcmd_and_try_flush(cmd);
1155 set_session_key(nonce2());
1156}
1157
1158void
1159session::queue_confirm_cmd()
1160{
1161 netcmd cmd;
1162 cmd.write_confirm_cmd();
1163 write_netcmd_and_try_flush(cmd);
1164}
1165
1166void
1167session::queue_refine_cmd(refinement_type ty, merkle_node const & node)
1168{
1169 string typestr;
1170 hexenc<prefix> hpref;
1171 node.get_hex_prefix(hpref);
1172 netcmd_item_type_to_string(node.type, typestr);
1173 L(FL("queueing refinement %s of %s node '%s', level %d")
1174 % (ty == refinement_query ? "query" : "response")
1175 % typestr % hpref % static_cast<int>(node.level));
1176 netcmd cmd;
1177 cmd.write_refine_cmd(ty, node);
1178 write_netcmd_and_try_flush(cmd);
1179}
1180
1181void
1182session::queue_data_cmd(netcmd_item_type type,
1183 id const & item,
1184 string const & dat)
1185{
1186 string typestr;
1187 netcmd_item_type_to_string(type, typestr);
1188 hexenc<id> hid;
1189 encode_hexenc(item, hid);
1190
1191 if (role == sink_role)
1192 {
1193 L(FL("not queueing %s data for '%s' as we are in pure sink role")
1194 % typestr % hid);
1195 return;
1196 }
1197
1198 L(FL("queueing %d bytes of data for %s item '%s'")
1199 % dat.size() % typestr % hid);
1200
1201 netcmd cmd;
1202 // TODO: This pair of functions will make two copies of a large
1203 // file, the first in cmd.write_data_cmd, and the second in
1204 // write_netcmd_and_try_flush when the data is copied from the
1205 // cmd.payload variable to the string buffer for output. This
1206 // double copy should be collapsed out, it may be better to use
1207 // a string_queue for output as well as input, as that will reduce
1208 // the amount of mallocs that happen when the string queue is large
1209 // enough to just store the data.
1210 cmd.write_data_cmd(type, item, dat);
1211 write_netcmd_and_try_flush(cmd);
1212 note_item_sent(type, item);
1213}
1214
1215void
1216session::queue_delta_cmd(netcmd_item_type type,
1217 id const & base,
1218 id const & ident,
1219 delta const & del)
1220{
1221 I(type == file_item);
1222 string typestr;
1223 netcmd_item_type_to_string(type, typestr);
1224 hexenc<id> base_hid;
1225 encode_hexenc(base, base_hid);
1226 hexenc<id> ident_hid;
1227 encode_hexenc(ident, ident_hid);
1228
1229 if (role == sink_role)
1230 {
1231 L(FL("not queueing %s delta '%s' -> '%s' as we are in pure sink role")
1232 % typestr % base_hid % ident_hid);
1233 return;
1234 }
1235
1236 L(FL("queueing %s delta '%s' -> '%s'")
1237 % typestr % base_hid % ident_hid);
1238 netcmd cmd;
1239 cmd.write_delta_cmd(type, base, ident, del);
1240 write_netcmd_and_try_flush(cmd);
1241 note_item_sent(type, ident);
1242}
1243
1244
1245// processors
1246
1247bool
1248session::process_error_cmd(string const & errmsg)
1249{
1250 // "xxx string" with xxx being digits means there's an error code
1251 if (errmsg.size() > 4 && errmsg.substr(3,1) == " ")
1252 {
1253 try
1254 {
1255 int err = boost::lexical_cast<int>(errmsg.substr(0,3));
1256 if (err >= 100)
1257 {
1258 error_code = err;
1259 throw bad_decode(F("received network error: %s")
1260 % errmsg.substr(4));
1261 }
1262 }
1263 catch (boost::bad_lexical_cast)
1264 { // ok, so it wasn't a number
1265 }
1266 }
1267 throw bad_decode(F("received network error: %s") % errmsg);
1268}
1269
1270static const var_domain known_servers_domain = var_domain("known-servers");
1271
1272bool
1273session::process_hello_cmd(rsa_keypair_id const & their_keyname,
1274 rsa_pub_key const & their_key,
1275 id const & nonce)
1276{
1277 I(this->remote_peer_key_hash().size() == 0);
1278 I(this->saved_nonce().size() == 0);
1279
1280 base64<rsa_pub_key> their_key_encoded;
1281
1282 if (app.opts.use_transport_auth)
1283 {
1284 hexenc<id> their_key_hash;
1285 encode_base64(their_key, their_key_encoded);
1286 key_hash_code(their_keyname, their_key_encoded, their_key_hash);
1287 L(FL("server key has name %s, hash %s") % their_keyname % their_key_hash);
1288 var_key their_key_key(known_servers_domain, var_name(peer_id));
1289 if (app.db.var_exists(their_key_key))
1290 {
1291 var_value expected_key_hash;
1292 app.db.get_var(their_key_key, expected_key_hash);
1293 if (expected_key_hash() != their_key_hash())
1294 {
1295 P(F("@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@\n"
1296 "@ WARNING: SERVER IDENTIFICATION HAS CHANGED @\n"
1297 "@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@\n"
1298 "IT IS POSSIBLE THAT SOMEONE IS DOING SOMETHING NASTY\n"
1299 "it is also possible that the server key has just been changed\n"
1300 "remote host sent key %s\n"
1301 "I expected %s\n"
1302 "'%s unset %s %s' overrides this check")
1303 % their_key_hash % expected_key_hash
1304 % ui.prog_name % their_key_key.first % their_key_key.second);
1305 E(false, F("server key changed"));
1306 }
1307 }
1308 else
1309 {
1310 P(F("first time connecting to server %s\n"
1311 "I'll assume it's really them, but you might want to double-check\n"
1312 "their key's fingerprint: %s") % peer_id % their_key_hash);
1313 app.db.set_var(their_key_key, var_value(their_key_hash()));
1314 }
1315 if (app.db.put_key(their_keyname, their_key_encoded))
1316 W(F("saving public key for %s to database") % their_keyname);
1317
1318 {
1319 hexenc<id> hnonce;
1320 encode_hexenc(nonce, hnonce);
1321 L(FL("received 'hello' netcmd from server '%s' with nonce '%s'")
1322 % their_key_hash % hnonce);
1323 }
1324
1325 I(app.db.public_key_exists(their_key_hash));
1326
1327 // save their identity
1328 id their_key_hash_decoded;
1329 decode_hexenc(their_key_hash, their_key_hash_decoded);
1330 this->remote_peer_key_hash = their_key_hash_decoded;
1331 }
1332
1333 // clients always include in the synchronization set, every branch that the
1334 // user requested
1335 set<branch_name> all_branches, ok_branches;
1336 app.get_project().get_branch_list(all_branches);
1337 for (set<branch_name>::const_iterator i = all_branches.begin();
1338 i != all_branches.end(); i++)
1339 {
1340 if (our_matcher((*i)()))
1341 ok_branches.insert(*i);
1342 }
1343 rebuild_merkle_trees(app, ok_branches);
1344
1345 if (!initiated_by_server)
1346 setup_client_tickers();
1347
1348 if (app.opts.use_transport_auth &&
1349 app.opts.signing_key() != "")
1350 {
1351 // get our key pair
1352 keypair our_kp;
1353 load_key_pair(app, app.opts.signing_key, our_kp);
1354
1355 // get the hash identifier for our pubkey
1356 hexenc<id> our_key_hash;
1357 id our_key_hash_raw;
1358 key_hash_code(app.opts.signing_key, our_kp.pub, our_key_hash);
1359 decode_hexenc(our_key_hash, our_key_hash_raw);
1360
1361 // make a signature
1362 base64<rsa_sha1_signature> sig;
1363 rsa_sha1_signature sig_raw;
1364 make_signature(app, app.opts.signing_key, our_kp.priv, nonce(), sig);
1365 decode_base64(sig, sig_raw);
1366
1367 // make a new nonce of our own and send off the 'auth'
1368 queue_auth_cmd(this->role, our_include_pattern, our_exclude_pattern,
1369 our_key_hash_raw, nonce, mk_nonce(), sig_raw(),
1370 their_key_encoded);
1371 }
1372 else
1373 {
1374 queue_anonymous_cmd(this->role, our_include_pattern,
1375 our_exclude_pattern, mk_nonce(), their_key_encoded);
1376 }
1377
1378 app.lua.hook_note_netsync_start(session_id, "client", this->role,
1379 peer_id, their_keyname,
1380 our_include_pattern, our_exclude_pattern);
1381
1382 return true;
1383}
1384
1385bool
1386session::process_anonymous_cmd(protocol_role their_role,
1387 globish const & their_include_pattern,
1388 globish const & their_exclude_pattern)
1389{
1390 // Internally netsync thinks in terms of sources and sinks. Users like
1391 // thinking of repositories as "readonly", "readwrite", or "writeonly".
1392 //
1393 // We therefore use the read/write terminology when dealing with the UI:
1394 // if the user asks to run a "read only" service, this means they are
1395 // willing to be a source but not a sink.
1396 //
1397 // nb: The "role" here is the role the *client* wants to play
1398 // so we need to check that the opposite role is allowed for us,
1399 // in our this->role field.
1400 //
1401
1402 app.lua.hook_note_netsync_start(session_id, "server", their_role,
1403 peer_id, rsa_keypair_id(),
1404 their_include_pattern, their_exclude_pattern);
1405
1406 // Client must be a sink and server must be a source (anonymous
1407 // read-only), unless transport auth is disabled.
1408 //
1409 // If running in no-transport-auth mode, we operate anonymously and
1410 // permit adoption of any role.
1411
1412 if (app.opts.use_transport_auth)
1413 {
1414 if (their_role != sink_role)
1415 {
1416 this->saved_nonce = id("");
1417 error(not_permitted,
1418 F("rejected attempt at anonymous connection for write").str());
1419 }
1420
1421 if (this->role == sink_role)
1422 {
1423 this->saved_nonce = id("");
1424 error(role_mismatch,
1425 F("rejected attempt at anonymous connection while running as sink").str());
1426 }
1427 }
1428
1429 set<branch_name> all_branches, ok_branches;
1430 app.get_project().get_branch_list(all_branches);
1431 globish_matcher their_matcher(their_include_pattern, their_exclude_pattern);
1432 for (set<branch_name>::const_iterator i = all_branches.begin();
1433 i != all_branches.end(); i++)
1434 {
1435 if (their_matcher((*i)()))
1436 if (app.opts.use_transport_auth &&
1437 !app.lua.hook_get_netsync_read_permitted((*i)()))
1438 {
1439 error(not_permitted,
1440 (F("anonymous access to branch '%s' denied by server") % *i).str());
1441 }
1442 else
1443 ok_branches.insert(*i);
1444 }
1445
1446 if (app.opts.use_transport_auth)
1447 {
1448 P(F("allowed anonymous read permission for '%s' excluding '%s'")
1449 % their_include_pattern % their_exclude_pattern);
1450 this->role = source_role;
1451 }
1452 else
1453 {
1454 P(F("allowed anonymous read/write permission for '%s' excluding '%s'")
1455 % their_include_pattern % their_exclude_pattern);
1456 assume_corresponding_role(their_role);
1457 }
1458
1459 rebuild_merkle_trees(app, ok_branches);
1460
1461 this->remote_peer_key_name = rsa_keypair_id("");
1462 this->authenticated = true;
1463 return true;
1464}
1465
1466void
1467session::assume_corresponding_role(protocol_role their_role)
1468{
1469 // Assume the (possibly degraded) opposite role.
1470 switch (their_role)
1471 {
1472 case source_role:
1473 I(this->role != source_role);
1474 this->role = sink_role;
1475 break;
1476
1477 case source_and_sink_role:
1478 I(this->role == source_and_sink_role);
1479 break;
1480
1481 case sink_role:
1482 I(this->role != sink_role);
1483 this->role = source_role;
1484 break;
1485 }
1486}
1487
1488bool
1489session::process_auth_cmd(protocol_role their_role,
1490 globish const & their_include_pattern,
1491 globish const & their_exclude_pattern,
1492 id const & client,
1493 id const & nonce1,
1494 string const & signature)
1495{
1496 I(this->remote_peer_key_hash().size() == 0);
1497 I(this->saved_nonce().size() == constants::merkle_hash_length_in_bytes);
1498
1499 hexenc<id> their_key_hash;
1500 encode_hexenc(client, their_key_hash);
1501
1502 globish_matcher their_matcher(their_include_pattern, their_exclude_pattern);
1503
1504 if (!app.db.public_key_exists(their_key_hash))
1505 {
1506 // If it's not in the db, it still could be in the keystore if we
1507 // have the private key that goes with it.
1508 if (!app.keys.try_ensure_in_db(their_key_hash))
1509 {
1510 this->saved_nonce = id("");
1511
1512 app.lua.hook_note_netsync_start(session_id, "server", their_role,
1513 peer_id, rsa_keypair_id("-unknown-"),
1514 their_include_pattern,
1515 their_exclude_pattern);
1516 error(unknown_key,
1517 (F("remote public key hash '%s' is unknown") % their_key_hash).str());
1518 }
1519 }
1520
1521 // Get their public key.
1522 rsa_keypair_id their_id;
1523 base64<rsa_pub_key> their_key;
1524 app.db.get_pubkey(their_key_hash, their_id, their_key);
1525
1526 app.lua.hook_note_netsync_start(session_id, "server", their_role,
1527 peer_id, their_id,
1528 their_include_pattern, their_exclude_pattern);
1529
1530 // Check that they replied with the nonce we asked for.
1531 if (!(nonce1 == this->saved_nonce))
1532 {
1533 this->saved_nonce = id("");
1534 error(failed_identification,
1535 F("detected replay attack in auth netcmd").str());
1536 }
1537
1538 // Internally netsync thinks in terms of sources and sinks. users like
1539 // thinking of repositories as "readonly", "readwrite", or "writeonly".
1540 //
1541 // We therefore use the read/write terminology when dealing with the UI:
1542 // if the user asks to run a "read only" service, this means they are
1543 // willing to be a source but not a sink.
1544 //
1545 // nb: The "their_role" here is the role the *client* wants to play
1546 // so we need to check that the opposite role is allowed for us,
1547 // in our this->role field.
1548
1549 // Client as sink, server as source (reading).
1550
1551 if (their_role == sink_role || their_role == source_and_sink_role)
1552 {
1553 if (this->role != source_role && this->role != source_and_sink_role)
1554 {
1555 this->saved_nonce = id("");
1556 error(not_permitted,
1557 (F("denied '%s' read permission for '%s' excluding '%s' while running as pure sink")
1558 % their_id % their_include_pattern % their_exclude_pattern).str());
1559 }
1560 }
1561
1562 set<branch_name> all_branches, ok_branches;
1563 app.get_project().get_branch_list(all_branches);
1564 for (set<branch_name>::const_iterator i = all_branches.begin();
1565 i != all_branches.end(); i++)
1566 {
1567 if (their_matcher((*i)()))
1568 {
1569 if (!app.lua.hook_get_netsync_read_permitted((*i)(), their_id))
1570 {
1571 error(not_permitted,
1572 (F("denied '%s' read permission for '%s' excluding '%s' because of branch '%s'")
1573 % their_id % their_include_pattern % their_exclude_pattern % *i).str());
1574 }
1575 else
1576 ok_branches.insert(*i);
1577 }
1578 }
1579
1580 // If we're source_and_sink_role, continue even with no branches readable
1581 // eg. serve --db=empty.db
1582 P(F("allowed '%s' read permission for '%s' excluding '%s'")
1583 % their_id % their_include_pattern % their_exclude_pattern);
1584
1585 // Client as source, server as sink (writing).
1586
1587 if (their_role == source_role || their_role == source_and_sink_role)
1588 {
1589 if (this->role != sink_role && this->role != source_and_sink_role)
1590 {
1591 this->saved_nonce = id("");
1592 error(not_permitted,
1593 (F("denied '%s' write permission for '%s' excluding '%s' while running as pure source")
1594 % their_id % their_include_pattern % their_exclude_pattern).str());
1595 }
1596
1597 if (!app.lua.hook_get_netsync_write_permitted(their_id))
1598 {
1599 this->saved_nonce = id("");
1600 error(not_permitted,
1601 (F("denied '%s' write permission for '%s' excluding '%s'")
1602 % their_id % their_include_pattern % their_exclude_pattern).str());
1603 }
1604
1605 P(F("allowed '%s' write permission for '%s' excluding '%s'")
1606 % their_id % their_include_pattern % their_exclude_pattern);
1607 }
1608
1609 rebuild_merkle_trees(app, ok_branches);
1610
1611 // Save their identity.
1612 this->remote_peer_key_hash = client;
1613
1614 // Check the signature.
1615 base64<rsa_sha1_signature> sig;
1616 encode_base64(rsa_sha1_signature(signature), sig);
1617 if (check_signature(app, their_id, their_key, nonce1(), sig))
1618 {
1619 // Get our private key and sign back.
1620 L(FL("client signature OK, accepting authentication"));
1621 this->authenticated = true;
1622 this->remote_peer_key_name = their_id;
1623
1624 assume_corresponding_role(their_role);
1625 return true;
1626 }
1627 else
1628 {
1629 error(failed_identification, (F("bad client signature")).str());
1630 }
1631 return false;
1632}
1633
1634bool
1635session::process_refine_cmd(refinement_type ty, merkle_node const & node)
1636{
1637 string typestr;
1638 netcmd_item_type_to_string(node.type, typestr);
1639 L(FL("processing refine cmd for %s node at level %d")
1640 % typestr % node.level);
1641
1642 switch (node.type)
1643 {
1644 case file_item:
1645 W(F("Unexpected 'refine' command on non-refined item type"));
1646 break;
1647
1648 case key_item:
1649 key_refiner.process_refinement_command(ty, node);
1650 break;
1651
1652 case revision_item:
1653 rev_refiner.process_refinement_command(ty, node);
1654 break;
1655
1656 case cert_item:
1657 cert_refiner.process_refinement_command(ty, node);
1658 break;
1659
1660 case epoch_item:
1661 epoch_refiner.process_refinement_command(ty, node);
1662 break;
1663 }
1664 return true;
1665}
1666
1667bool
1668session::process_bye_cmd(u8 phase,
1669 transaction_guard & guard)
1670{
1671
1672// Ideal shutdown
1673// ~~~~~~~~~~~~~~~
1674//
1675// I/O events state transitions
1676// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ ~~~~~~~~~~~~~~~~~~~
1677// client: C_WORKING
1678// server: S_WORKING
1679// 0. [refinement, data, deltas, etc.]
1680// client: C_SHUTDOWN
1681// (client checkpoints here)
1682// 1. client -> "bye 0"
1683// 2. "bye 0" -> server
1684// server: S_SHUTDOWN
1685// (server checkpoints here)
1686// 3. "bye 1" <- server
1687// 4. client <- "bye 1"
1688// client: C_CONFIRMED
1689// 5. client -> "bye 2"
1690// 6. "bye 2" -> server
1691// server: S_CONFIRMED
1692// 7. [server drops connection]
1693//
1694//
1695// Affects of I/O errors or disconnections
1696// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
1697// C_WORKING: report error and fault
1698// S_WORKING: report error and recover
1699// C_SHUTDOWN: report error and fault
1700// S_SHUTDOWN: report success and recover
1701// (and warn that client might falsely see error)
1702// C_CONFIRMED: report success
1703// S_CONFIRMED: report success
1704
1705 switch (phase)
1706 {
1707 case 0:
1708 if (voice == server_voice &&
1709 protocol_state == working_state)
1710 {
1711 protocol_state = shutdown_state;
1712 guard.do_checkpoint();
1713 queue_bye_cmd(1);
1714 }
1715 else
1716 error(bad_command, "unexpected bye phase 0 received");
1717 break;
1718
1719 case 1:
1720 if (voice == client_voice &&
1721 protocol_state == shutdown_state)
1722 {
1723 protocol_state = confirmed_state;
1724 queue_bye_cmd(2);
1725 }
1726 else
1727 error(bad_command, "unexpected bye phase 1 received");
1728 break;
1729
1730 case 2:
1731 if (voice == server_voice &&
1732 protocol_state == shutdown_state)
1733 {
1734 protocol_state = confirmed_state;
1735 return false;
1736 }
1737 else
1738 error(bad_command, "unexpected bye phase 2 received");
1739 break;
1740
1741 default:
1742 error(bad_command, (F("unknown bye phase %d received") % phase).str());
1743 }
1744
1745 return true;
1746}
1747
1748bool
1749session::process_done_cmd(netcmd_item_type type, size_t n_items)
1750{
1751 string typestr;
1752 netcmd_item_type_to_string(type, typestr);
1753 L(FL("received 'done' command for %s (%s items)") % typestr % n_items);
1754 switch (type)
1755 {
1756 case file_item:
1757 W(F("Unexpected 'done' command on non-refined item type"));
1758 break;
1759
1760 case key_item:
1761 key_refiner.process_done_command(n_items);
1762 if (key_refiner.done && role != sink_role)
1763 send_all_data(key_item, key_refiner.items_to_send);
1764 break;
1765
1766 case revision_item:
1767 rev_refiner.process_done_command(n_items);
1768 break;
1769
1770 case cert_item:
1771 cert_refiner.process_done_command(n_items);
1772 break;
1773
1774 case epoch_item:
1775 epoch_refiner.process_done_command(n_items);
1776 if (epoch_refiner.done)
1777 {
1778 send_all_data(epoch_item, epoch_refiner.items_to_send);
1779 maybe_note_epochs_finished();
1780 }
1781 break;
1782 }
1783 return true;
1784}
1785
1786void
1787session::respond_to_confirm_cmd()
1788{
1789 epoch_refiner.begin_refinement();
1790}
1791
1792bool
1793session::data_exists(netcmd_item_type type,
1794 id const & item)
1795{
1796 hexenc<id> hitem;
1797 encode_hexenc(item, hitem);
1798 switch (type)
1799 {
1800 case key_item:
1801 return key_refiner.local_item_exists(item)
1802 || app.db.public_key_exists(hitem);
1803 case file_item:
1804 return app.db.file_version_exists(file_id(hitem));
1805 case revision_item:
1806 return rev_refiner.local_item_exists(item)
1807 || app.db.revision_exists(revision_id(hitem));
1808 case cert_item:
1809 return cert_refiner.local_item_exists(item)
1810 || app.db.revision_cert_exists(hitem);
1811 case epoch_item:
1812 return epoch_refiner.local_item_exists(item)
1813 || app.db.epoch_exists(epoch_id(hitem));
1814 }
1815 return false;
1816}
1817
1818void
1819session::load_data(netcmd_item_type type,
1820 id const & item,
1821 string & out)
1822{
1823 string typestr;
1824 netcmd_item_type_to_string(type, typestr);
1825 hexenc<id> hitem;
1826 encode_hexenc(item, hitem);
1827
1828 if (!data_exists(type, item))
1829 throw bad_decode(F("%s with hash '%s' does not exist in our database")
1830 % typestr % hitem);
1831
1832 switch (type)
1833 {
1834 case epoch_item:
1835 {
1836 branch_name branch;
1837 epoch_data epoch;
1838 app.db.get_epoch(epoch_id(hitem), branch, epoch);
1839 write_epoch(branch, epoch, out);
1840 }
1841 break;
1842 case key_item:
1843 {
1844 rsa_keypair_id keyid;
1845 base64<rsa_pub_key> pub_encoded;
1846 app.db.get_pubkey(hitem, keyid, pub_encoded);
1847 L(FL("public key '%s' is also called '%s'") % hitem % keyid);
1848 write_pubkey(keyid, pub_encoded, out);
1849 }
1850 break;
1851
1852 case revision_item:
1853 {
1854 revision_data mdat;
1855 data dat;
1856 app.db.get_revision(revision_id(hitem), mdat);
1857 out = mdat.inner()();
1858 }
1859 break;
1860
1861 case file_item:
1862 {
1863 file_data fdat;
1864 data dat;
1865 app.db.get_file_version(file_id(hitem), fdat);
1866 out = fdat.inner()();
1867 }
1868 break;
1869
1870 case cert_item:
1871 {
1872 revision<cert> c;
1873 app.db.get_revision_cert(hitem, c);
1874 string tmp;
1875 write_cert(c.inner(), out);
1876 }
1877 break;
1878 }
1879}
1880
1881bool
1882session::process_data_cmd(netcmd_item_type type,
1883 id const & item,
1884 string const & dat)
1885{
1886 hexenc<id> hitem;
1887 encode_hexenc(item, hitem);
1888
1889 string typestr;
1890 netcmd_item_type_to_string(type, typestr);
1891
1892 note_item_arrived(type, item);
1893 if (data_exists(type, item))
1894 {
1895 L(FL("%s '%s' already exists in our database") % typestr % hitem);
1896 if (type == epoch_item)
1897 maybe_note_epochs_finished();
1898 return true;
1899 }
1900
1901 switch (type)
1902 {
1903 case epoch_item:
1904 {
1905 branch_name branch;
1906 epoch_data epoch;
1907 read_epoch(dat, branch, epoch);
1908 L(FL("received epoch %s for branch %s") % epoch % branch);
1909 map<branch_name, epoch_data> epochs;
1910 app.db.get_epochs(epochs);
1911 map<branch_name, epoch_data>::const_iterator i;
1912 i = epochs.find(branch);
1913 if (i == epochs.end())
1914 {
1915 L(FL("branch %s has no epoch; setting epoch to %s") % branch % epoch);
1916 app.db.set_epoch(branch, epoch);
1917 }
1918 else
1919 {
1920 L(FL("branch %s already has an epoch; checking") % branch);
1921 // If we get here, then we know that the epoch must be
1922 // different, because if it were the same then the
1923 // if (epoch_exists()) branch up above would have been taken.
1924 // if somehow this is wrong, then we have broken epoch
1925 // hashing or something, which is very dangerous, so play it
1926 // safe...
1927 I(!(i->second == epoch));
1928
1929 // It is safe to call 'error' here, because if we get here,
1930 // then the current netcmd packet cannot possibly have
1931 // written anything to the database.
1932 error(mixing_versions,
1933 (F("Mismatched epoch on branch %s."
1934 " Server has '%s', client has '%s'.")
1935 % branch
1936 % (voice == server_voice ? i->second : epoch)
1937 % (voice == server_voice ? epoch : i->second)).str());
1938 }
1939 }
1940 maybe_note_epochs_finished();
1941 break;
1942
1943 case key_item:
1944 {
1945 rsa_keypair_id keyid;
1946 base64<rsa_pub_key> pub;
1947 read_pubkey(dat, keyid, pub);
1948 hexenc<id> tmp;
1949 key_hash_code(keyid, pub, tmp);
1950 if (! (tmp == hitem))
1951 throw bad_decode(F("hash check failed for public key '%s' (%s);"
1952 " wanted '%s' got '%s'")
1953 % hitem % keyid % hitem % tmp);
1954 if (app.db.put_key(keyid, pub))
1955 written_keys.push_back(keyid);
1956 }
1957 break;
1958
1959 case cert_item:
1960 {
1961 cert c;
1962 read_cert(dat, c);
1963 hexenc<id> tmp;
1964 cert_hash_code(c, tmp);
1965 if (! (tmp == hitem))
1966 throw bad_decode(F("hash check failed for revision cert '%s'") % hitem);
1967 if (app.db.put_revision_cert(revision<cert>(c)))
1968 written_certs.push_back(c);
1969 }
1970 break;
1971
1972 case revision_item:
1973 {
1974 L(FL("received revision '%s'") % hitem);
1975 if (app.db.put_revision(revision_id(hitem), revision_data(dat)))
1976 written_revisions.push_back(revision_id(hitem));
1977 }
1978 break;
1979
1980 case file_item:
1981 {
1982 L(FL("received file '%s'") % hitem);
1983 app.db.put_file(file_id(hitem), file_data(dat));
1984 }
1985 break;
1986 }
1987 return true;
1988}
1989
1990bool
1991session::process_delta_cmd(netcmd_item_type type,
1992 id const & base,
1993 id const & ident,
1994 delta const & del)
1995{
1996 string typestr;
1997 netcmd_item_type_to_string(type, typestr);
1998 hexenc<id> hbase, hident;
1999 encode_hexenc(base, hbase);
2000 encode_hexenc(ident, hident);
2001
2002 pair<id,id> id_pair = make_pair(base, ident);
2003
2004 note_item_arrived(type, ident);
2005
2006 switch (type)
2007 {
2008 case file_item:
2009 {
2010 file_id src_file(hbase), dst_file(hident);
2011 app.db.put_file_version(src_file, dst_file, file_delta(del));
2012 }
2013 break;
2014
2015 default:
2016 L(FL("ignoring delta received for item type %s") % typestr);
2017 break;
2018 }
2019 return true;
2020}
2021
2022bool
2023session::process_usher_cmd(utf8 const & msg)
2024{
2025 if (msg().size())
2026 {
2027 if (msg()[0] == '!')
2028 P(F("Received warning from usher: %s") % msg().substr(1));
2029 else
2030 L(FL("Received greeting from usher: %s") % msg().substr(1));
2031 }
2032 netcmd cmdout;
2033 cmdout.write_usher_reply_cmd(utf8(peer_id), our_include_pattern);
2034 write_netcmd_and_try_flush(cmdout);
2035 L(FL("Sent reply."));
2036 return true;
2037}
2038
2039
2040void
2041session::send_all_data(netcmd_item_type ty, set<id> const & items)
2042{
2043 string typestr;
2044 netcmd_item_type_to_string(ty, typestr);
2045
2046 // Use temporary; passed arg will be invalidated during iteration.
2047 set<id> tmp = items;
2048
2049 for (set<id>::const_iterator i = tmp.begin();
2050 i != tmp.end(); ++i)
2051 {
2052 hexenc<id> hitem;
2053 encode_hexenc(*i, hitem);
2054
2055 if (data_exists(ty, *i))
2056 {
2057 string out;
2058 load_data(ty, *i, out);
2059 queue_data_cmd(ty, *i, out);
2060 }
2061 }
2062}
2063
2064bool
2065session::dispatch_payload(netcmd const & cmd,
2066 transaction_guard & guard)
2067{
2068
2069 switch (cmd.get_cmd_code())
2070 {
2071
2072 case error_cmd:
2073 {
2074 string errmsg;
2075 cmd.read_error_cmd(errmsg);
2076 return process_error_cmd(errmsg);
2077 }
2078 break;
2079
2080 case hello_cmd:
2081 require(! authenticated, "hello netcmd received when not authenticated");
2082 require(voice == client_voice, "hello netcmd received in client voice");
2083 {
2084 rsa_keypair_id server_keyname;
2085 rsa_pub_key server_key;
2086 id nonce;
2087 cmd.read_hello_cmd(server_keyname, server_key, nonce);
2088 return process_hello_cmd(server_keyname, server_key, nonce);
2089 }
2090 break;
2091
2092 case bye_cmd:
2093 require(authenticated, "bye netcmd received when not authenticated");
2094 {
2095 u8 phase;
2096 cmd.read_bye_cmd(phase);
2097 return process_bye_cmd(phase, guard);
2098 }
2099 break;
2100
2101 case anonymous_cmd:
2102 require(! authenticated, "anonymous netcmd received when not authenticated");
2103 require(voice == server_voice, "anonymous netcmd received in server voice");
2104 require(role == source_role ||
2105 role == source_and_sink_role,
2106 "anonymous netcmd received in source or source/sink role");
2107 {
2108 protocol_role role;
2109 globish their_include_pattern, their_exclude_pattern;
2110 rsa_oaep_sha_data hmac_key_encrypted;
2111 cmd.read_anonymous_cmd(role, their_include_pattern, their_exclude_pattern, hmac_key_encrypted);
2112 L(FL("received 'anonymous' netcmd from client for pattern '%s' excluding '%s' "
2113 "in %s mode\n")
2114 % their_include_pattern % their_exclude_pattern
2115 % (role == source_and_sink_role ? _("source and sink") :
2116 (role == source_role ? _("source") : _("sink"))));
2117
2118 set_session_key(hmac_key_encrypted);
2119 if (!process_anonymous_cmd(role, their_include_pattern, their_exclude_pattern))
2120 return false;
2121 queue_confirm_cmd();
2122 return true;
2123 }
2124 break;
2125
2126 case auth_cmd:
2127 require(! authenticated, "auth netcmd received when not authenticated");
2128 require(voice == server_voice, "auth netcmd received in server voice");
2129 {
2130 protocol_role role;
2131 string signature;
2132 globish their_include_pattern, their_exclude_pattern;
2133 id client, nonce1, nonce2;
2134 rsa_oaep_sha_data hmac_key_encrypted;
2135 cmd.read_auth_cmd(role, their_include_pattern, their_exclude_pattern,
2136 client, nonce1, hmac_key_encrypted, signature);
2137
2138 hexenc<id> their_key_hash;
2139 encode_hexenc(client, their_key_hash);
2140 hexenc<id> hnonce1;
2141 encode_hexenc(nonce1, hnonce1);
2142
2143 L(FL("received 'auth(hmac)' netcmd from client '%s' for pattern '%s' "
2144 "exclude '%s' in %s mode with nonce1 '%s'\n")
2145 % their_key_hash % their_include_pattern % their_exclude_pattern
2146 % (role == source_and_sink_role ? _("source and sink") :
2147 (role == source_role ? _("source") : _("sink")))
2148 % hnonce1);
2149
2150 set_session_key(hmac_key_encrypted);
2151
2152 if (!process_auth_cmd(role, their_include_pattern, their_exclude_pattern,
2153 client, nonce1, signature))
2154 return false;
2155 queue_confirm_cmd();
2156 return true;
2157 }
2158 break;
2159
2160 case confirm_cmd:
2161 require(! authenticated, "confirm netcmd received when not authenticated");
2162 require(voice == client_voice, "confirm netcmd received in client voice");
2163 {
2164 string signature;
2165 cmd.read_confirm_cmd();
2166 this->authenticated = true;
2167 respond_to_confirm_cmd();
2168 return true;
2169 }
2170 break;
2171
2172 case refine_cmd:
2173 require(authenticated, "refine netcmd received when authenticated");
2174 {
2175 merkle_node node;
2176 refinement_type ty;
2177 cmd.read_refine_cmd(ty, node);
2178 return process_refine_cmd(ty, node);
2179 }
2180 break;
2181
2182 case done_cmd:
2183 require(authenticated, "done netcmd received when not authenticated");
2184 {
2185 size_t n_items;
2186 netcmd_item_type type;
2187 cmd.read_done_cmd(type, n_items);
2188 return process_done_cmd(type, n_items);
2189 }
2190 break;
2191
2192 case data_cmd:
2193 require(authenticated, "data netcmd received when not authenticated");
2194 require(role == sink_role ||
2195 role == source_and_sink_role,
2196 "data netcmd received in source or source/sink role");
2197 {
2198 netcmd_item_type type;
2199 id item;
2200 string dat;
2201 cmd.read_data_cmd(type, item, dat);
2202 return process_data_cmd(type, item, dat);
2203 }
2204 break;
2205
2206 case delta_cmd:
2207 require(authenticated, "delta netcmd received when not authenticated");
2208 require(role == sink_role ||
2209 role == source_and_sink_role,
2210 "delta netcmd received in source or source/sink role");
2211 {
2212 netcmd_item_type type;
2213 id base, ident;
2214 delta del;
2215 cmd.read_delta_cmd(type, base, ident, del);
2216 return process_delta_cmd(type, base, ident, del);
2217 }
2218 break;
2219
2220 case usher_cmd:
2221 {
2222 utf8 greeting;
2223 cmd.read_usher_cmd(greeting);
2224 return process_usher_cmd(greeting);
2225 }
2226 break;
2227
2228 case usher_reply_cmd:
2229 return false; // Should not happen.
2230 break;
2231 }
2232 return false;
2233}
2234
2235// This kicks off the whole cascade starting from "hello".
2236void
2237session::begin_service()
2238{
2239 keypair kp;
2240 if (app.opts.use_transport_auth)
2241 app.keys.get_key_pair(app.opts.signing_key, kp);
2242 queue_hello_cmd(app.opts.signing_key, kp.pub, mk_nonce());
2243}
2244
2245void
2246session::maybe_step()
2247{
2248 while (done_all_refinements()
2249 && !rev_enumerator.done()
2250 && outbuf_size < constants::bufsz * 10)
2251 {
2252 rev_enumerator.step();
2253 }
2254}
2255
2256void
2257session::maybe_say_goodbye(transaction_guard & guard)
2258{
2259 if (voice == client_voice
2260 && protocol_state == working_state
2261 && finished_working())
2262 {
2263 protocol_state = shutdown_state;
2264 guard.do_checkpoint();
2265 queue_bye_cmd(0);
2266 }
2267}
2268
2269bool
2270session::arm()
2271{
2272 if (!armed)
2273 {
2274 // Don't pack the buffer unnecessarily.
2275 if (outbuf_size > constants::bufsz * 10)
2276 return false;
2277
2278 if (cmd.read(inbuf, read_hmac))
2279 {
2280 armed = true;
2281 }
2282 }
2283 return armed;
2284}
2285
2286bool session::process(transaction_guard & guard)
2287{
2288 if (encountered_error)
2289 return true;
2290 try
2291 {
2292 if (!arm())
2293 return true;
2294
2295 armed = false;
2296 L(FL("processing %d byte input buffer from peer %s")
2297 % inbuf.size() % peer_id);
2298
2299 size_t sz = cmd.encoded_size();
2300 bool ret = dispatch_payload(cmd, guard);
2301
2302 if (inbuf.size() >= constants::netcmd_maxsz)
2303 W(F("input buffer for peer %s is overfull "
2304 "after netcmd dispatch") % peer_id);
2305
2306 guard.maybe_checkpoint(sz);
2307
2308 if (!ret)
2309 L(FL("finishing processing with '%d' packet")
2310 % cmd.get_cmd_code());
2311 return ret;
2312 }
2313 catch (bad_decode & bd)
2314 {
2315 W(F("protocol error while processing peer %s: '%s'")
2316 % peer_id % bd.what);
2317 return false;
2318 }
2319 catch (netsync_error & err)
2320 {
2321 W(F("error: %s") % err.msg);
2322 queue_error_cmd(boost::lexical_cast<string>(error_code) + " " + err.msg);
2323 encountered_error = true;
2324 return true; // Don't terminate until we've send the error_cmd.
2325 }
2326}
2327
2328
2329static shared_ptr<Netxx::StreamBase>
2330build_stream_to_server(app_state & app,
2331 globish const & include_pattern,
2332 globish const & exclude_pattern,
2333 utf8 const & address,
2334 Netxx::port_type default_port,
2335 Netxx::Timeout timeout)
2336{
2337 shared_ptr<Netxx::StreamBase> server;
2338 uri u;
2339 vector<string> argv;
2340 if (parse_uri(address(), u)
2341 && app.lua.hook_get_netsync_connect_command(u,
2342 include_pattern,
2343 exclude_pattern,
2344 global_sanity.debug,
2345 argv))
2346 {
2347 I(argv.size() > 0);
2348 string cmd = argv[0];
2349 argv.erase(argv.begin());
2350 app.opts.use_transport_auth = app.lua.hook_use_transport_auth(u);
2351 return shared_ptr<Netxx::StreamBase>
2352 (new Netxx::PipeStream(cmd, argv));
2353
2354 }
2355 else
2356 {
2357#ifdef USE_IPV6
2358 bool use_ipv6=true;
2359#else
2360 bool use_ipv6=false;
2361#endif
2362 Netxx::Address addr(address().c_str(),
2363 default_port, use_ipv6);
2364 return shared_ptr<Netxx::StreamBase>
2365 (new Netxx::Stream(addr, timeout));
2366 }
2367}
2368
// Client side of netsync: connect to 'address', run a single session in
// client voice, and return once the exchange has ended cleanly.
//
// role               - protocol role this client plays in the session.
// include_pattern /
// exclude_pattern    - branch patterns handed to the stream builder and
//                      to the session.
// app                - application state (database, options, hooks).
// address            - server address; also used as the session's peer id.
// default_port       - port passed on to build_stream_to_server.
// timeout_seconds    - how long to wait for I/O before giving up.
//
// Failures (protocol errors, timeouts, I/O or processing failures) are
// reported through E(false, ...).
static void
call_server(protocol_role role,
            globish const & include_pattern,
            globish const & exclude_pattern,
            app_state & app,
            utf8 const & address,
            Netxx::port_type default_port,
            unsigned long timeout_seconds)
{
  Netxx::PipeCompatibleProbe probe;
  transaction_guard guard(app.db);

  // 'instant' is used to poll without really waiting when a command is
  // already armed and ready to be processed.
  Netxx::Timeout timeout(static_cast<long>(timeout_seconds)), instant(0,1);

  // FIXME: split into labels and convert to ace here.

  P(F("connecting to %s") % address());

  shared_ptr<Netxx::StreamBase> server
    = build_stream_to_server(app,
                             include_pattern,
                             exclude_pattern,
                             address, default_port,
                             timeout);


  // 'false' here means not to revert changes when the SockOpt
  // goes out of scope.
  Netxx::SockOpt socket_options(server->get_socketfd(), false);
  socket_options.set_non_blocking();

  session sess(role, client_voice,
               include_pattern,
               exclude_pattern,
               app, address(), server);

  // Event loop: arm, let the session make progress, wait for I/O, perform
  // the I/O, then process the armed command (if any).
  while (true)
    {
      bool armed = false;
      try
        {
          armed = sess.arm();
        }
      catch (bad_decode & bd)
        {
          E(false, F("protocol error while processing peer %s: '%s'")
            % sess.peer_id % bd.what);
        }

      sess.maybe_step();
      sess.maybe_say_goodbye(guard);

      probe.clear();
      probe.add(*(sess.str), sess.which_events());
      // With a command already armed we only poll; otherwise we wait for
      // up to the full timeout.
      Netxx::Probe::result_type res = probe.ready(armed ? instant : timeout);
      Netxx::Probe::ready_type event = res.second;
      Netxx::socket_type fd = res.first;

      // No fd became ready and nothing is armed: the peer went silent.
      if (fd == -1 && !armed)
        {
          E(false, (F("timed out waiting for I/O with "
                      "peer %s, disconnecting")
                    % sess.peer_id));
        }

      // NOTE(review): this compares for equality, whereas the server-side
      // loops in this file test 'event & Netxx::Probe::ready_oobd'; an
      // event combining out-of-band data with read/write readiness is not
      // treated as unclean here -- confirm whether that is intended.
      bool all_io_clean = (event != Netxx::Probe::ready_oobd);

      if (event & Netxx::Probe::ready_read)
        all_io_clean = all_io_clean && sess.read_some();

      if (event & Netxx::Probe::ready_write)
        all_io_clean = all_io_clean && sess.write_some();

      if (armed)
        if (!sess.process(guard))
          {
            // Commit whatever work we managed to accomplish anyways.
            guard.commit();

            // We failed during processing. This should only happen in
            // client voice when we have a decode exception, or received an
            // error from our server (which is translated to a decode
            // exception). We call these cases E() errors.
            E(false, F("processing failure while talking to "
                       "peer %s, disconnecting")
              % sess.peer_id);
            // NOTE(review): presumably unreachable if E(false, ...) never
            // returns -- confirm against E's definition.
            return;
          }

      if (!all_io_clean)
        {
          // Commit whatever work we managed to accomplish anyways.
          guard.commit();

          // We had an I/O error. We must decide if this represents a
          // user-reported error or a clean disconnect. See protocol
          // state diagram in session::process_bye_cmd.

          if (sess.protocol_state == session::confirmed_state)
            {
              P(F("successful exchange with %s")
                % sess.peer_id);
              return;
            }
          else if (sess.encountered_error)
            {
              P(F("peer %s disconnected after we informed them of error")
                % sess.peer_id);
              return;
            }
          else
            E(false, (F("I/O failure while talking to "
                        "peer %s, disconnecting")
                      % sess.peer_id));
        }
    }
}
2486
2487static void
2488drop_session_associated_with_fd(map<Netxx::socket_type, shared_ptr<session> > & sessions,
2489 Netxx::socket_type fd)
2490{
2491 // This is a bit of a hack. Initially all "file descriptors" in
2492 // netsync were full duplex, so we could get away with indexing
2493 // sessions by their file descriptor.
2494 //
2495 // When using pipes in unix, it's no longer true: a session gets
2496 // entered in the session map under its read pipe fd *and* its write
2497 // pipe fd. When we're in such a situation the socket fd is "-1" and
2498 // we downcast to a PipeStream and use its read+write fds.
2499 //
2500 // When using pipes in windows, we use a full duplex pipe (named
2501 // pipe) so the socket-like abstraction holds.
2502
2503 I(fd != -1);
2504 map<Netxx::socket_type, shared_ptr<session> >::const_iterator i = sessions.find(fd);
2505 I(i != sessions.end());
2506 shared_ptr<session> sess = i->second;
2507 fd = sess->str->get_socketfd();
2508 if (fd != -1)
2509 {
2510 sessions.erase(fd);
2511 }
2512 else
2513 {
2514 shared_ptr<Netxx::PipeStream> pipe =
2515 boost::dynamic_pointer_cast<Netxx::PipeStream, Netxx::StreamBase>(sess->str);
2516 I(static_cast<bool>(pipe));
2517 I(pipe->get_writefd() != -1);
2518 I(pipe->get_readfd() != -1);
2519 sessions.erase(pipe->get_readfd());
2520 sessions.erase(pipe->get_writefd());
2521 }
2522}
2523
2524static void
2525arm_sessions_and_calculate_probe(Netxx::PipeCompatibleProbe & probe,
2526 map<Netxx::socket_type, shared_ptr<session> > & sessions,
2527 set<Netxx::socket_type> & armed_sessions,
2528 transaction_guard & guard)
2529{
2530 set<Netxx::socket_type> arm_failed;
2531 for (map<Netxx::socket_type,
2532 shared_ptr<session> >::const_iterator i = sessions.begin();
2533 i != sessions.end(); ++i)
2534 {
2535 i->second->maybe_step();
2536 i->second->maybe_say_goodbye(guard);
2537 try
2538 {
2539 if (i->second->arm())
2540 {
2541 L(FL("fd %d is armed") % i->first);
2542 armed_sessions.insert(i->first);
2543 }
2544 probe.add(*i->second->str, i->second->which_events());
2545 }
2546 catch (bad_decode & bd)
2547 {
2548 W(F("protocol error while processing peer %s: '%s', marking as bad")
2549 % i->second->peer_id % bd.what);
2550 arm_failed.insert(i->first);
2551 }
2552 }
2553 for (set<Netxx::socket_type>::const_iterator i = arm_failed.begin();
2554 i != arm_failed.end(); ++i)
2555 {
2556 drop_session_associated_with_fd(sessions, *i);
2557 }
2558}
2559
2560static void
2561handle_new_connection(Netxx::Address & addr,
2562 Netxx::StreamServer & server,
2563 Netxx::Timeout & timeout,
2564 protocol_role role,
2565 globish const & include_pattern,
2566 globish const & exclude_pattern,
2567 map<Netxx::socket_type, shared_ptr<session> > & sessions,
2568 app_state & app)
2569{
2570 L(FL("accepting new connection on %s : %s")
2571 % (addr.get_name()?addr.get_name():"") % lexical_cast<string>(addr.get_port()));
2572 Netxx::Peer client = server.accept_connection();
2573
2574 if (!client)
2575 {
2576 L(FL("accept() returned a dead client"));
2577 }
2578 else
2579 {
2580 P(F("accepted new client connection from %s : %s")
2581 % client.get_address() % lexical_cast<string>(client.get_port()));
2582
2583 // 'false' here means not to revert changes when the SockOpt
2584 // goes out of scope.
2585 Netxx::SockOpt socket_options(client.get_socketfd(), false);
2586 socket_options.set_non_blocking();
2587
2588 shared_ptr<Netxx::Stream> str =
2589 shared_ptr<Netxx::Stream>
2590 (new Netxx::Stream(client.get_socketfd(), timeout));
2591
2592 shared_ptr<session> sess(new session(role, server_voice,
2593 include_pattern, exclude_pattern,
2594 app,
2595 lexical_cast<string>(client), str));
2596 sess->begin_service();
2597 sessions.insert(make_pair(client.get_socketfd(), sess));
2598 }
2599}
2600
2601static void
2602handle_read_available(Netxx::socket_type fd,
2603 shared_ptr<session> sess,
2604 map<Netxx::socket_type, shared_ptr<session> > & sessions,
2605 set<Netxx::socket_type> & armed_sessions,
2606 bool & live_p)
2607{
2608 if (sess->read_some())
2609 {
2610 try
2611 {
2612 if (sess->arm())
2613 armed_sessions.insert(fd);
2614 }
2615 catch (bad_decode & bd)
2616 {
2617 W(F("protocol error while processing peer %s: '%s', disconnecting")
2618 % sess->peer_id % bd.what);
2619 drop_session_associated_with_fd(sessions, fd);
2620 live_p = false;
2621 }
2622 }
2623 else
2624 {
2625 switch (sess->protocol_state)
2626 {
2627 case session::working_state:
2628 P(F("peer %s read failed in working state (error)")
2629 % sess->peer_id);
2630 break;
2631
2632 case session::shutdown_state:
2633 P(F("peer %s read failed in shutdown state "
2634 "(possibly client misreported error)")
2635 % sess->peer_id);
2636 break;
2637
2638 case session::confirmed_state:
2639 P(F("peer %s read failed in confirmed state (success)")
2640 % sess->peer_id);
2641 break;
2642 }
2643 drop_session_associated_with_fd(sessions, fd);
2644 live_p = false;
2645 }
2646}
2647
2648
2649static void
2650handle_write_available(Netxx::socket_type fd,
2651 shared_ptr<session> sess,
2652 map<Netxx::socket_type, shared_ptr<session> > & sessions,
2653 bool & live_p)
2654{
2655 if (!sess->write_some())
2656 {
2657 switch (sess->protocol_state)
2658 {
2659 case session::working_state:
2660 P(F("peer %s write failed in working state (error)")
2661 % sess->peer_id);
2662 break;
2663
2664 case session::shutdown_state:
2665 P(F("peer %s write failed in shutdown state "
2666 "(possibly client misreported error)")
2667 % sess->peer_id);
2668 break;
2669
2670 case session::confirmed_state:
2671 P(F("peer %s write failed in confirmed state (success)")
2672 % sess->peer_id);
2673 break;
2674 }
2675
2676 drop_session_associated_with_fd(sessions, fd);
2677 live_p = false;
2678 }
2679}
2680
2681static void
2682process_armed_sessions(map<Netxx::socket_type, shared_ptr<session> > & sessions,
2683 set<Netxx::socket_type> & armed_sessions,
2684 transaction_guard & guard)
2685{
2686 for (set<Netxx::socket_type>::const_iterator i = armed_sessions.begin();
2687 i != armed_sessions.end(); ++i)
2688 {
2689 map<Netxx::socket_type, shared_ptr<session> >::iterator j;
2690 j = sessions.find(*i);
2691 if (j == sessions.end())
2692 continue;
2693 else
2694 {
2695 shared_ptr<session> sess = j->second;
2696 if (!sess->process(guard))
2697 {
2698 P(F("peer %s processing finished, disconnecting")
2699 % sess->peer_id);
2700 drop_session_associated_with_fd(sessions, *i);
2701 }
2702 }
2703 }
2704}
2705
2706static void
2707reap_dead_sessions(map<Netxx::socket_type, shared_ptr<session> > & sessions,
2708 unsigned long timeout_seconds)
2709{
2710 // Kill any clients which haven't done any i/o inside the timeout period
2711 // or who have exchanged all items and flushed their output buffers.
2712 set<Netxx::socket_type> dead_clients;
2713 time_t now = ::time(NULL);
2714 for (map<Netxx::socket_type, shared_ptr<session> >::const_iterator
2715 i = sessions.begin(); i != sessions.end(); ++i)
2716 {
2717 if (static_cast<unsigned long>(i->second->last_io_time + timeout_seconds)
2718 < static_cast<unsigned long>(now))
2719 {
2720 P(F("fd %d (peer %s) has been idle too long, disconnecting")
2721 % i->first % i->second->peer_id);
2722 dead_clients.insert(i->first);
2723 }
2724 }
2725 for (set<Netxx::socket_type>::const_iterator i = dead_clients.begin();
2726 i != dead_clients.end(); ++i)
2727 {
2728 drop_session_associated_with_fd(sessions, *i);
2729 }
2730}
2731
2732static void
2733serve_connections(protocol_role role,
2734 globish const & include_pattern,
2735 globish const & exclude_pattern,
2736 app_state & app,
2737 utf8 const & address,
2738 Netxx::port_type default_port,
2739 unsigned long timeout_seconds,
2740 unsigned long session_limit)
2741{
2742 Netxx::PipeCompatibleProbe probe;
2743
2744 Netxx::Timeout
2745 forever,
2746 timeout(static_cast<long>(timeout_seconds)),
2747 instant(0,1);
2748
2749 if (!app.opts.bind_port().empty())
2750 default_port = std::atoi(app.opts.bind_port().c_str());
2751#ifdef USE_IPV6
2752 bool use_ipv6=true;
2753#else
2754 bool use_ipv6=false;
2755#endif
2756 // This will be true when we try to bind while using IPv6. See comments
2757 // further down.
2758 bool try_again=false;
2759
2760 do
2761 {
2762 try
2763 {
2764 try_again = false;
2765
2766 Netxx::Address addr(use_ipv6);
2767
2768 if (!app.opts.bind_address().empty())
2769 addr.add_address(app.opts.bind_address().c_str(), default_port);
2770 else
2771 addr.add_all_addresses (default_port);
2772
2773 // If se use IPv6 and the initialisation of server fails, we want
2774 // to try again with IPv4. The reason is that someone may have
2775 // downloaded a IPv6-enabled monotone on a system that doesn't
2776 // have IPv6, and which might fail therefore.
2777 // On failure, Netxx::NetworkException is thrown, and we catch
2778 // it further down.
2779 try_again=use_ipv6;
2780
2781 Netxx::StreamServer server(addr, timeout);
2782
2783 // If we came this far, whatever we used (IPv6 or IPv4) was
2784 // accepted, so we don't need to try again any more.
2785 try_again=false;
2786
2787 const char *name = addr.get_name();
2788 P(F("beginning service on %s : %s")
2789 % (name != NULL ? name : _("<all interfaces>"))
2790 % lexical_cast<string>(addr.get_port()));
2791
2792 map<Netxx::socket_type, shared_ptr<session> > sessions;
2793 set<Netxx::socket_type> armed_sessions;
2794
2795 shared_ptr<transaction_guard> guard;
2796
2797 while (true)
2798 {
2799 probe.clear();
2800 armed_sessions.clear();
2801
2802 if (sessions.size() >= session_limit)
2803 W(F("session limit %d reached, some connections "
2804 "will be refused") % session_limit);
2805 else
2806 probe.add(server);
2807
2808 if (!guard)
2809 guard = shared_ptr<transaction_guard>(new transaction_guard(app.db));
2810
2811 I(guard);
2812
2813 while (!server_initiated_sync_requests.empty())
2814 {
2815 server_initiated_sync_request request
2816 = server_initiated_sync_requests.front();
2817 server_initiated_sync_requests.pop_front();
2818
2819 utf8 addr(request.address);
2820 globish inc(request.include);
2821 globish exc(request.exclude);
2822
2823 try
2824 {
2825 P(F("connecting to %s") % addr());
2826 shared_ptr<Netxx::StreamBase> server
2827 = build_stream_to_server(app, inc, exc,
2828 addr, default_port,
2829 timeout);
2830
2831 // 'false' here means not to revert changes when
2832 // the SockOpt goes out of scope.
2833 Netxx::SockOpt socket_options(server->get_socketfd(), false);
2834 socket_options.set_non_blocking();
2835
2836 protocol_role role = source_and_sink_role;
2837 if (request.what == "sync")
2838 role = source_and_sink_role;
2839 else if (request.what == "push")
2840 role = source_role;
2841 else if (request.what == "pull")
2842 role = sink_role;
2843
2844 shared_ptr<session> sess(new session(role, client_voice,
2845 inc, exc,
2846 app, addr(), server, true));
2847
2848 sessions.insert(make_pair(server->get_socketfd(), sess));
2849 }
2850 catch (Netxx::NetworkException & e)
2851 {
2852 P(F("Network error: %s") % e.what());
2853 }
2854 }
2855
2856 arm_sessions_and_calculate_probe(probe, sessions, armed_sessions, *guard);
2857
2858 L(FL("i/o probe with %d armed") % armed_sessions.size());
2859 Netxx::socket_type fd;
2860 Netxx::Timeout how_long;
2861 if (sessions.empty())
2862 how_long = forever;
2863 else if (armed_sessions.empty())
2864 how_long = timeout;
2865 else
2866 how_long = instant;
2867 do
2868 {
2869 Netxx::Probe::result_type res = probe.ready(how_long);
2870 how_long = instant;
2871 Netxx::Probe::ready_type event = res.second;
2872 fd = res.first;
2873
2874 if (fd == -1)
2875 {
2876 if (armed_sessions.empty())
2877 L(FL("timed out waiting for I/O (listening on %s : %s)")
2878 % addr.get_name() % lexical_cast<string>(addr.get_port()));
2879 }
2880
2881 // we either got a new connection
2882 else if (fd == server)
2883 handle_new_connection(addr, server, timeout, role,
2884 include_pattern, exclude_pattern,
2885 sessions, app);
2886
2887 // or an existing session woke up
2888 else
2889 {
2890 map<Netxx::socket_type, shared_ptr<session> >::iterator i;
2891 i = sessions.find(fd);
2892 if (i == sessions.end())
2893 {
2894 L(FL("got woken up for action on unknown fd %d") % fd);
2895 }
2896 else
2897 {
2898 probe.remove(*(i->second->str));
2899 shared_ptr<session> sess = i->second;
2900 bool live_p = true;
2901
2902 try
2903 {
2904 if (event & Netxx::Probe::ready_read)
2905 handle_read_available(fd, sess, sessions,
2906 armed_sessions, live_p);
2907
2908 if (live_p && (event & Netxx::Probe::ready_write))
2909 handle_write_available(fd, sess, sessions, live_p);
2910 }
2911 catch (Netxx::Exception &)
2912 {
2913 P(F("Network error on peer %s, disconnecting")
2914 % sess->peer_id);
2915 drop_session_associated_with_fd(sessions, fd);
2916 }
2917 if (live_p && (event & Netxx::Probe::ready_oobd))
2918 {
2919 P(F("got OOB from peer %s, disconnecting")
2920 % sess->peer_id);
2921 drop_session_associated_with_fd(sessions, fd);
2922 }
2923 }
2924 }
2925 }
2926 while (fd != -1);
2927 process_armed_sessions(sessions, armed_sessions, *guard);
2928 reap_dead_sessions(sessions, timeout_seconds);
2929
2930 if (sessions.empty())
2931 {
2932 // Let the guard die completely if everything's gone quiet.
2933 guard->commit();
2934 guard.reset();
2935 }
2936 }
2937 }
2938 // This exception is thrown when bind() fails somewhere in Netxx.
2939 catch (Netxx::NetworkException &)
2940 {
2941 // If we tried with IPv6 and failed, we want to try again using IPv4.
2942 if (try_again)
2943 {
2944 use_ipv6 = false;
2945 }
2946 // In all other cases, just rethrow the exception.
2947 else
2948 throw;
2949 }
2950 // This exception is thrown when there is no support for the type of
2951 // connection we want to do in the kernel, for example when a socket()
2952 // call fails somewhere in Netxx.
2953 catch (Netxx::Exception &)
2954 {
2955 // If we tried with IPv6 and failed, we want to try again using IPv4.
2956 if (try_again)
2957 {
2958 use_ipv6 = false;
2959 }
2960 // In all other cases, just rethrow the exception.
2961 else
2962 throw;
2963 }
2964 }
2965 while(try_again);
2966 }
2967
2968static void
2969serve_single_connection(shared_ptr<session> sess,
2970 unsigned long timeout_seconds)
2971{
2972 Netxx::PipeCompatibleProbe probe;
2973
2974 Netxx::Timeout
2975 forever,
2976 timeout(static_cast<long>(timeout_seconds)),
2977 instant(0,1);
2978
2979 P(F("beginning service on %s") % sess->peer_id);
2980
2981 sess->begin_service();
2982
2983 transaction_guard guard(sess->app.db);
2984
2985 map<Netxx::socket_type, shared_ptr<session> > sessions;
2986 set<Netxx::socket_type> armed_sessions;
2987
2988 if (sess->str->get_socketfd() == -1)
2989 {
2990 // Unix pipes are non-duplex, have two filedescriptors
2991 shared_ptr<Netxx::PipeStream> pipe =
2992 boost::dynamic_pointer_cast<Netxx::PipeStream, Netxx::StreamBase>(sess->str);
2993 I(pipe);
2994 sessions[pipe->get_writefd()]=sess;
2995 sessions[pipe->get_readfd()]=sess;
2996 }
2997 else
2998 sessions[sess->str->get_socketfd()]=sess;
2999
3000 while (!sessions.empty())
3001 {
3002 probe.clear();
3003 armed_sessions.clear();
3004
3005 arm_sessions_and_calculate_probe(probe, sessions, armed_sessions, guard);
3006
3007 L(FL("i/o probe with %d armed") % armed_sessions.size());
3008 Netxx::Probe::result_type res = probe.ready((armed_sessions.empty() ? timeout
3009 : instant));
3010 Netxx::Probe::ready_type event = res.second;
3011 Netxx::socket_type fd = res.first;
3012
3013 if (fd == -1)
3014 {
3015 if (armed_sessions.empty())
3016 L(FL("timed out waiting for I/O (listening on %s)")
3017 % sess->peer_id);
3018 }
3019
3020 // an existing session woke up
3021 else
3022 {
3023 map<Netxx::socket_type, shared_ptr<session> >::iterator i;
3024 i = sessions.find(fd);
3025 if (i == sessions.end())
3026 {
3027 L(FL("got woken up for action on unknown fd %d") % fd);
3028 }
3029 else
3030 {
3031 shared_ptr<session> sess = i->second;
3032 bool live_p = true;
3033
3034 if (event & Netxx::Probe::ready_read)
3035 handle_read_available(fd, sess, sessions, armed_sessions, live_p);
3036
3037 if (live_p && (event & Netxx::Probe::ready_write))
3038 handle_write_available(fd, sess, sessions, live_p);
3039
3040 if (live_p && (event & Netxx::Probe::ready_oobd))
3041 {
3042 P(F("got some OOB data on fd %d (peer %s), disconnecting")
3043 % fd % sess->peer_id);
3044 drop_session_associated_with_fd(sessions, fd);
3045 }
3046 }
3047 }
3048 process_armed_sessions(sessions, armed_sessions, guard);
3049 reap_dead_sessions(sessions, timeout_seconds);
3050 }
3051}
3052
3053
3054void
3055insert_with_parents(revision_id rev,
3056 refiner & ref,
3057 revision_enumerator & rev_enumerator,
3058 set<revision_id> & revs,
3059 app_state & app,
3060 ticker & revisions_ticker)
3061{
3062 deque<revision_id> work;
3063 work.push_back(rev);
3064 while (!work.empty())
3065 {
3066 revision_id rid = work.front();
3067 work.pop_front();
3068
3069 if (!null_id(rid) && revs.find(rid) == revs.end())
3070 {
3071 revs.insert(rid);
3072 ++revisions_ticker;
3073 id rev_item;
3074 decode_hexenc(rid.inner(), rev_item);
3075 ref.note_local_item(rev_item);
3076 vector<revision_id> parents;
3077 rev_enumerator.get_revision_parents(rid, parents);
3078 for (vector<revision_id>::const_iterator i = parents.begin();
3079 i != parents.end(); ++i)
3080 {
3081 work.push_back(*i);
3082 }
3083 }
3084 }
3085}
3086
3087void
3088session::rebuild_merkle_trees(app_state & app,
3089 set<branch_name> const & branchnames)
3090{
3091 P(F("finding items to synchronize:"));
3092 for (set<branch_name>::const_iterator i = branchnames.begin();
3093 i != branchnames.end(); ++i)
3094 L(FL("including branch %s") % *i);
3095
3096 // xgettext: please use short message and try to avoid multibytes chars
3097 ticker revisions_ticker(N_("revisions"), "r", 64);
3098 // xgettext: please use short message and try to avoid multibytes chars
3099 ticker certs_ticker(N_("certificates"), "c", 256);
3100 // xgettext: please use short message and try to avoid multibytes chars
3101 ticker keys_ticker(N_("keys"), "k", 1);
3102
3103 set<revision_id> revision_ids;
3104 set<rsa_keypair_id> inserted_keys;
3105
3106 {
3107 for (set<branch_name>::const_iterator i = branchnames.begin();
3108 i != branchnames.end(); ++i)
3109 {
3110 // Get branch certs.
3111 vector< revision<cert> > certs;
3112 // FIXME_PROJECTS: probably something like
3113 // app.get_project(i->project).get_branch_certs(i->branch)
3114 // or so.
3115 app.get_project().get_branch_certs(*i, certs);
3116 for (vector< revision<cert> >::const_iterator j = certs.begin();
3117 j != certs.end(); j++)
3118 {
3119 revision_id rid(j->inner().ident);
3120 insert_with_parents(rid, rev_refiner, rev_enumerator,
3121 revision_ids, app, revisions_ticker);
3122 // Branch certs go in here, others later on.
3123 hexenc<id> tmp;
3124 id item;
3125 cert_hash_code(j->inner(), tmp);
3126 decode_hexenc(tmp, item);
3127 cert_refiner.note_local_item(item);
3128 rev_enumerator.note_cert(rid, tmp);
3129 if (inserted_keys.find(j->inner().key) == inserted_keys.end())
3130 inserted_keys.insert(j->inner().key);
3131 }
3132 }
3133 }
3134
3135 {
3136 map<branch_name, epoch_data> epochs;
3137 app.db.get_epochs(epochs);
3138
3139 epoch_data epoch_zero(string(constants::epochlen, '0'));
3140 for (set<branch_name>::const_iterator i = branchnames.begin();
3141 i != branchnames.end(); ++i)
3142 {
3143 branch_name const & branch(*i);
3144 map<branch_name, epoch_data>::const_iterator j;
3145 j = epochs.find(branch);
3146
3147 // Set to zero any epoch which is not yet set.
3148 if (j == epochs.end())
3149 {
3150 L(FL("setting epoch on %s to zero") % branch);
3151 epochs.insert(make_pair(branch, epoch_zero));
3152 app.db.set_epoch(branch, epoch_zero);
3153 }
3154
3155 // Then insert all epochs into merkle tree.
3156 j = epochs.find(branch);
3157 I(j != epochs.end());
3158 epoch_id eid;
3159 id epoch_item;
3160 epoch_hash_code(j->first, j->second, eid);
3161 decode_hexenc(eid.inner(), epoch_item);
3162 epoch_refiner.note_local_item(epoch_item);
3163 }
3164 }
3165
3166 {
3167 typedef vector< pair<hexenc<id>,
3168 pair<revision_id, rsa_keypair_id> > > cert_idx;
3169
3170 cert_idx idx;
3171 app.db.get_revision_cert_nobranch_index(idx);
3172
3173 // Insert all non-branch certs reachable via these revisions
3174 // (branch certs were inserted earlier).
3175
3176 for (cert_idx::const_iterator i = idx.begin(); i != idx.end(); ++i)
3177 {
3178 hexenc<id> const & hash = i->first;
3179 revision_id const & ident = i->second.first;
3180 rsa_keypair_id const & key = i->second.second;
3181
3182 rev_enumerator.note_cert(ident, hash);
3183
3184 if (revision_ids.find(ident) == revision_ids.end())
3185 continue;
3186
3187 id item;
3188 decode_hexenc(hash, item);
3189 cert_refiner.note_local_item(item);
3190 ++certs_ticker;
3191 if (inserted_keys.find(key) == inserted_keys.end())
3192 inserted_keys.insert(key);
3193 }
3194 }
3195
3196 // Add any keys specified on the command line.
3197 for (vector<rsa_keypair_id>::const_iterator key
3198 = app.opts.keys_to_push.begin();
3199 key != app.opts.keys_to_push.end(); ++key)
3200 {
3201 if (inserted_keys.find(*key) == inserted_keys.end())
3202 {
3203 if (!app.db.public_key_exists(*key))
3204 {
3205 if (app.keys.key_pair_exists(*key))
3206 app.keys.ensure_in_database(*key);
3207 else
3208 W(F("Cannot find key '%s'") % *key);
3209 }
3210 inserted_keys.insert(*key);
3211 }
3212 }
3213
3214 // Insert all the keys.
3215 for (set<rsa_keypair_id>::const_iterator key = inserted_keys.begin();
3216 key != inserted_keys.end(); key++)
3217 {
3218 if (app.db.public_key_exists(*key))
3219 {
3220 base64<rsa_pub_key> pub_encoded;
3221 app.db.get_key(*key, pub_encoded);
3222 hexenc<id> keyhash;
3223 key_hash_code(*key, pub_encoded, keyhash);
3224 L(FL("noting key '%s' = '%s' to send") % *key % keyhash);
3225 id key_item;
3226 decode_hexenc(keyhash, key_item);
3227 key_refiner.note_local_item(key_item);
3228 ++keys_ticker;
3229 }
3230 }
3231
3232 rev_refiner.reindex_local_items();
3233 cert_refiner.reindex_local_items();
3234 key_refiner.reindex_local_items();
3235 epoch_refiner.reindex_local_items();
3236}
3237
3238void
3239run_netsync_protocol(protocol_voice voice,
3240 protocol_role role,
3241 utf8 const & addr,
3242 globish const & include_pattern,
3243 globish const & exclude_pattern,
3244 app_state & app)
3245{
3246 if (include_pattern().find_first_of("'\"") != string::npos)
3247 {
3248 W(F("include branch pattern contains a quote character:\n"
3249 "%s") % include_pattern());
3250 }
3251
3252 if (exclude_pattern().find_first_of("'\"") != string::npos)
3253 {
3254 W(F("exclude branch pattern contains a quote character:\n"
3255 "%s") % exclude_pattern());
3256 }
3257
3258 // We do not want to be killed by SIGPIPE from a network disconnect.
3259 ignore_sigpipe();
3260
3261 try
3262 {
3263 if (voice == server_voice)
3264 {
3265 if (app.opts.bind_stdio)
3266 {
3267 shared_ptr<Netxx::PipeStream> str(new Netxx::PipeStream(0,1));
3268 shared_ptr<session> sess(new session(role, server_voice,
3269 include_pattern, exclude_pattern,
3270 app, "stdio", str));
3271 serve_single_connection(sess,constants::netsync_timeout_seconds);
3272 }
3273 else
3274 serve_connections(role, include_pattern, exclude_pattern, app,
3275 addr, static_cast<Netxx::port_type>(constants::netsync_default_port),
3276 static_cast<unsigned long>(constants::netsync_timeout_seconds),
3277 static_cast<unsigned long>(constants::netsync_connection_limit));
3278 }
3279 else
3280 {
3281 I(voice == client_voice);
3282 call_server(role, include_pattern, exclude_pattern, app,
3283 addr, static_cast<Netxx::port_type>(constants::netsync_default_port),
3284 static_cast<unsigned long>(constants::netsync_timeout_seconds));
3285 }
3286 }
3287 catch (Netxx::NetworkException & e)
3288 {
3289 throw informative_failure((F("network error: %s") % e.what()).str());
3290 }
3291 catch (Netxx::Exception & e)
3292 {
3293 throw oops((F("network error: %s") % e.what()).str());;
3294 }
3295}
3296
3297// Local Variables:
3298// mode: C++
3299// fill-column: 76
3300// c-file-style: "gnu"
3301// indent-tabs-mode: nil
3302// End:
3303// vim: et:sw=2:sts=2:ts=2:cino=>2s,{s,\:s,+s,t0,g0,^-2,e-2,n-2,p2s,(0,=s:

Archive Download this file

Branches

Tags

Quick Links:     www.monotone.ca    -     Downloads    -     Documentation    -     Wiki    -     Code Forge    -     Build Status