Skip to content
This repository has been archived by the owner on Oct 18, 2021. It is now read-only.

Commit

Permalink
update chrono and bson dependencies (#222)
Browse files Browse the repository at this point in the history
  • Loading branch information
saghm authored Jun 23, 2017
1 parent 58d28bb commit f513ef3
Show file tree
Hide file tree
Showing 3 changed files with 111 additions and 63 deletions.
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@ version = "0.3.0"

[dependencies]
bitflags = "0.9.1"
bson = "0.8.0"
bson = "0.9.0"
bufstream = "0.1.3"
byteorder = "1.0.0"
chrono = "0.3.1"
chrono = "0.4.0"
data-encoding = "1.2.0"
rand = "0.3.15"
rust-crypto = "0.2.36"
Expand Down
93 changes: 63 additions & 30 deletions src/gridfs/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
use bson::{self, Bson, oid};
use bson::spec::BinarySubtype;

use chrono::{DateTime, UTC};
use chrono::{DateTime, Utc};
use crypto::digest::Digest;
use crypto::md5::Md5;

Expand Down Expand Up @@ -80,7 +80,7 @@ pub struct GfsFile {
// The filename of the document.
pub name: Option<String>,
// The date the document was first stored in GridFS.
pub upload_date: Option<DateTime<UTC>>,
pub upload_date: Option<DateTime<Utc>>,
// The content type of the file.
pub content_type: Option<String>,
// Any additional metadata provided by the user.
Expand Down Expand Up @@ -195,7 +195,7 @@ impl File {
if self.mode == Mode::Write {
if try!(self.err_description()).is_none() {
if self.doc.upload_date.is_none() {
self.doc.upload_date = Some(UTC::now());
self.doc.upload_date = Some(Utc::now());
}
self.doc.md5 = self.wsum.result_str();
try!(self.gfs.files.insert_one(self.doc.to_bson(), None));
Expand All @@ -205,11 +205,15 @@ impl File {

let mut opts = IndexOptions::new();
opts.unique = Some(true);
try!(self.gfs.chunks.create_index(doc!{ "files_id" => 1, "n" => 1}, Some(opts)));
try!(self.gfs.chunks.create_index(
doc!{ "files_id" => 1, "n" => 1},
Some(opts),
));
} else {
try!(self.gfs
.chunks
.delete_many(doc!{ "files_id" => (self.doc.id.clone()) }, None));
try!(self.gfs.chunks.delete_many(
doc!{ "files_id" => (self.doc.id.clone()) },
None,
));
}
}

Expand Down Expand Up @@ -240,7 +244,8 @@ impl File {
let mut vec_buf = Vec::with_capacity(buf.len());
vec_buf.extend(buf.iter().cloned());

let document = doc! {
let document =
doc! {
"_id" => (try!(oid::ObjectId::new())),
"files_id" => (self.doc.id.clone()),
"n" => n,
Expand Down Expand Up @@ -322,8 +327,10 @@ impl File {
}
};

let result = arc_gfs.chunks
.find_one(Some(doc!{"files_id" => (id), "n" => (next_chunk_num)}), None);
let result = arc_gfs.chunks.find_one(
Some(doc!{"files_id" => (id), "n" => (next_chunk_num)}),
None,
);

match result {
Ok(Some(doc)) => {
Expand All @@ -333,8 +340,10 @@ impl File {
cache.err = None;
}
_ => {
cache.err = Some(OperationError(String::from("Chunk contained \
no data.")))
cache.err = Some(OperationError(String::from(
"Chunk contained \
no data.",
)))
}
}
}
Expand All @@ -361,7 +370,10 @@ impl io::Write for File {

let description = try!(self.err_description());
if description.is_some() {
return Err(io::Error::new(io::ErrorKind::Other, OperationError(description.unwrap())));
return Err(io::Error::new(
io::ErrorKind::Other,
OperationError(description.unwrap()),
));
}

let mut data = buf;
Expand Down Expand Up @@ -393,16 +405,19 @@ impl io::Write for File {

// If over a megabyte is being written at once, wait for the load to reduce.
while self.doc.chunk_size * self.wpending.load(Ordering::SeqCst) as i32 >=
MEGABYTE as i32 {
MEGABYTE as i32
{
guard = match self.condvar.wait(guard) {
Ok(guard) => guard,
Err(_) => return Err(io::Error::new(io::ErrorKind::Other, PoisonLockError)),
};

let description = try!(self.err_description());
if description.is_some() {
return Err(io::Error::new(io::ErrorKind::Other,
OperationError(description.unwrap())));
return Err(io::Error::new(
io::ErrorKind::Other,
OperationError(description.unwrap()),
));
}
}

Expand All @@ -423,16 +438,19 @@ impl io::Write for File {

// Pending megabyte
while self.doc.chunk_size * self.wpending.load(Ordering::SeqCst) as i32 >=
MEGABYTE as i32 {
MEGABYTE as i32
{
guard = match self.condvar.wait(guard) {
Ok(guard) => guard,
Err(_) => return Err(io::Error::new(io::ErrorKind::Other, PoisonLockError)),
};

let description = try!(self.err_description());
if description.is_some() {
return Err(io::Error::new(io::ErrorKind::Other,
OperationError(description.unwrap())));
return Err(io::Error::new(
io::ErrorKind::Other,
OperationError(description.unwrap()),
));
}
}

Expand Down Expand Up @@ -461,7 +479,8 @@ impl io::Write for File {

// Pending megabyte
while self.doc.chunk_size * self.wpending.load(Ordering::SeqCst) as i32 >=
MEGABYTE as i32 {
MEGABYTE as i32
{
guard = match self.condvar.wait(guard) {
Ok(guard) => guard,
Err(_) => return Err(io::Error::new(io::ErrorKind::Other, PoisonLockError)),
Expand All @@ -486,7 +505,10 @@ impl io::Write for File {

let description = try!(self.err_description());
if description.is_some() {
return Err(io::Error::new(io::ErrorKind::Other, OperationError(description.unwrap())));
return Err(io::Error::new(
io::ErrorKind::Other,
OperationError(description.unwrap()),
));
}

Ok(())
Expand Down Expand Up @@ -613,7 +635,8 @@ impl GfsFile {

/// Converts a GfsFile into a bson document.
pub fn to_bson(&self) -> bson::Document {
let mut doc = doc! {
let mut doc =
doc! {
"_id" => (self.id.clone()),
"chunkSize" => (self.chunk_size),
"length" => (self.len),
Expand All @@ -622,19 +645,27 @@ impl GfsFile {
};

if self.name.is_some() {
doc.insert("filename",
Bson::String(self.name.as_ref().unwrap().to_owned()));
doc.insert(
"filename",
Bson::String(self.name.as_ref().unwrap().to_owned()),
);
}

if self.content_type.is_some() {
doc.insert("contentType",
Bson::String(self.content_type.as_ref().unwrap().to_owned()));
doc.insert(
"contentType",
Bson::String(self.content_type.as_ref().unwrap().to_owned()),
);
}

if self.metadata.is_some() {
doc.insert("metadata",
Bson::Binary(BinarySubtype::Generic,
self.metadata.as_ref().unwrap().clone()));
doc.insert(
"metadata",
Bson::Binary(
BinarySubtype::Generic,
self.metadata.as_ref().unwrap().clone(),
),
);
}

doc
Expand All @@ -647,7 +678,9 @@ impl CachedChunk {
CachedChunk {
n: n,
data: Vec::new(),
err: Some(Error::DefaultError(String::from("Chunk has not yet been initialized"))),
err: Some(Error::DefaultError(
String::from("Chunk has not yet been initialized"),
)),
}
}
}
77 changes: 46 additions & 31 deletions src/topology/monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use {Client, Result};
use Error::{self, ArgumentError, OperationError};

use bson::{self, Bson, oid};
use chrono::{DateTime, UTC};
use chrono::{DateTime, Utc};

use coll::options::FindOptions;
use command_type::CommandType;
Expand Down Expand Up @@ -33,7 +33,7 @@ pub struct IsMasterResult {
pub is_master: bool,
pub max_bson_object_size: i64,
pub max_message_size_bytes: i64,
pub local_time: Option<DateTime<UTC>>,
pub local_time: Option<DateTime<Utc>>,
pub min_wire_version: i64,
pub max_wire_version: i64,

Expand Down Expand Up @@ -220,13 +220,14 @@ impl IsMasterResult {

impl Monitor {
/// Returns a new monitor connected to the server.
pub fn new(client: Client,
host: Host,
pool: Arc<ConnectionPool>,
top_description: Arc<RwLock<TopologyDescription>>,
server_description: Arc<RwLock<ServerDescription>>,
connector: StreamConnector)
-> Monitor {
pub fn new(
client: Client,
host: Host,
pool: Arc<ConnectionPool>,
top_description: Arc<RwLock<TopologyDescription>>,
server_description: Arc<RwLock<ServerDescription>>,
connector: StreamConnector,
) -> Monitor {
Monitor {
client: client,
host: host.clone(),
Expand Down Expand Up @@ -263,15 +264,17 @@ impl Monitor {

let time_start = time::get_time();

let cursor = try!(Cursor::query_with_stream(stream,
self.client.clone(),
String::from("local.$cmd"),
flags,
filter.clone(),
options,
CommandType::IsMaster,
false,
None));
let cursor = try!(Cursor::query_with_stream(
stream,
self.client.clone(),
String::from("local.$cmd"),
flags,
filter.clone(),
options,
CommandType::IsMaster,
false,
None,
));

let time_end = time::get_time();

Expand All @@ -291,18 +294,21 @@ impl Monitor {

// Updates the server description associated with this monitor using an isMaster server
// response.
fn update_server_description(&self,
doc: bson::Document,
round_trip_time: i64)
-> Result<ServerDescription> {
fn update_server_description(
&self,
doc: bson::Document,
round_trip_time: i64,
) -> Result<ServerDescription> {

let ismaster_result = IsMasterResult::new(doc);
let mut server_description = self.server_description.write().unwrap();
match ismaster_result {
Ok(ismaster) => server_description.update(ismaster, round_trip_time),
Err(err) => {
server_description.set_err(err);
return Err(OperationError(String::from("Failed to parse ismaster result.")));
return Err(OperationError(
String::from("Failed to parse ismaster result."),
));
}
}

Expand All @@ -312,10 +318,12 @@ impl Monitor {
// Updates the topology description associated with this monitor using a new server description.
fn update_top_description(&self, description: ServerDescription) {
let mut top_description = self.top_description.write().unwrap();
top_description.update(self.host.clone(),
description,
self.client.clone(),
self.top_description.clone());
top_description.update(
self.host.clone(),
description,
self.client.clone(),
self.top_description.clone(),
);
}

// Updates server and topology descriptions using a successful isMaster cursor result.
Expand All @@ -330,7 +338,9 @@ impl Monitor {
self.set_err(err);
}
None => {
self.set_err(OperationError(String::from("ismaster returned no response.")));
self.set_err(OperationError(
String::from("ismaster returned no response."),
));
}
}
}
Expand Down Expand Up @@ -379,12 +389,17 @@ impl Monitor {
self.execute_update();

if let Ok(description) = self.top_description.read() {
self.heartbeat_frequency_ms.store(description.heartbeat_frequency_ms as usize,
Ordering::SeqCst);
self.heartbeat_frequency_ms.store(
description.heartbeat_frequency_ms as usize,
Ordering::SeqCst,
);
}

let frequency = self.heartbeat_frequency_ms.load(Ordering::SeqCst) as u64;
guard = self.condvar.wait_timeout(guard, Duration::from_millis(frequency)).unwrap().0;
guard = self.condvar
.wait_timeout(guard, Duration::from_millis(frequency))
.unwrap()
.0;
}
}
}

0 comments on commit f513ef3

Please sign in to comment.