-
Notifications
You must be signed in to change notification settings - Fork 237
/
multi.rs
1331 lines (1224 loc) · 47.8 KB
/
multi.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
//! Multi - initiating multiple requests simultaneously
use std::fmt;
use std::marker;
use std::ptr;
use std::sync::Arc;
use std::time::Duration;
use libc::{c_char, c_int, c_long, c_short, c_void};
#[cfg(unix)]
use libc::{pollfd, POLLIN, POLLOUT, POLLPRI};
use crate::easy::{Easy, Easy2, List};
use crate::panic;
use crate::{Error, MultiError};
/// A multi handle for initiating multiple connections simultaneously.
///
/// This structure corresponds to `CURLM` in libcurl and provides the ability to
/// have multiple transfers in flight simultaneously. This handle is then used
/// to manage each transfer. The main purpose of a `CURLM` is for the
/// *application* to drive the I/O rather than libcurl itself doing all the
/// blocking. Methods like `action` allow the application to inform libcurl of
/// when events have happened.
///
/// Lots more documentation can be found on the libcurl [multi tutorial] where
/// the APIs correspond pretty closely with this crate.
///
/// [multi tutorial]: https://curl.haxx.se/libcurl/c/libcurl-multi.html
pub struct Multi {
raw: Arc<RawMulti>,
data: Box<MultiData>,
}
#[derive(Debug)]
struct RawMulti {
handle: *mut curl_sys::CURLM,
}
struct MultiData {
socket: Box<dyn FnMut(Socket, SocketEvents, usize) + Send>,
timer: Box<dyn FnMut(Option<Duration>) -> bool + Send>,
}
/// Message from the `messages` function of a multi handle.
///
/// Currently only indicates whether a transfer is done.
pub struct Message<'multi> {
ptr: *mut curl_sys::CURLMsg,
_multi: &'multi Multi,
}
/// Wrapper around an easy handle while it's owned by a multi handle.
///
/// Once an easy handle has been added to a multi handle then it can no longer
/// be used via `perform`. This handle is also used to remove the easy handle
/// from the multi handle when desired.
pub struct EasyHandle {
// Safety: This *must* be before `easy` as it must be dropped first.
guard: DetachGuard,
easy: Easy,
// This is now effectively bound to a `Multi`, so it is no longer sendable.
_marker: marker::PhantomData<&'static Multi>,
}
/// Wrapper around an easy handle while it's owned by a multi handle.
///
/// Once an easy handle has been added to a multi handle then it can no longer
/// be used via `perform`. This handle is also used to remove the easy handle
/// from the multi handle when desired.
pub struct Easy2Handle<H> {
// Safety: This *must* be before `easy` as it must be dropped first.
guard: DetachGuard,
easy: Easy2<H>,
// This is now effectively bound to a `Multi`, so it is no longer sendable.
_marker: marker::PhantomData<&'static Multi>,
}
/// A guard struct which guarantees that `curl_multi_remove_handle` will be
/// called on an easy handle, either manually or on drop.
struct DetachGuard {
multi: Arc<RawMulti>,
easy: *mut curl_sys::CURL,
}
/// Notification of the events that have happened on a socket.
///
/// This type is passed as an argument to the `action` method on a multi handle
/// to indicate what events have occurred on a socket.
pub struct Events {
bits: c_int,
}
/// Notification of events that are requested on a socket.
///
/// This type is yielded to the `socket_function` callback to indicate what
/// events are requested on a socket.
pub struct SocketEvents {
bits: c_int,
}
/// Raw underlying socket type that the multi handles use
pub type Socket = curl_sys::curl_socket_t;
/// File descriptor to wait on for use with the `wait` method on a multi handle.
pub struct WaitFd {
inner: curl_sys::curl_waitfd,
}
/// A handle that can be used to wake up a thread that's blocked in [Multi::poll].
/// The handle can be passed to and used from any thread.
#[cfg(feature = "poll_7_68_0")]
#[derive(Debug, Clone)]
pub struct MultiWaker {
raw: std::sync::Weak<RawMulti>,
}
#[cfg(feature = "poll_7_68_0")]
unsafe impl Send for MultiWaker {}
#[cfg(feature = "poll_7_68_0")]
unsafe impl Sync for MultiWaker {}
impl Multi {
/// Creates a new multi session through which multiple HTTP transfers can be
/// initiated.
pub fn new() -> Multi {
unsafe {
crate::init();
let ptr = curl_sys::curl_multi_init();
assert!(!ptr.is_null());
Multi {
raw: Arc::new(RawMulti { handle: ptr }),
data: Box::new(MultiData {
socket: Box::new(|_, _, _| ()),
timer: Box::new(|_| true),
}),
}
}
}
/// Set the callback informed about what to wait for
///
/// When the `action` function runs, it informs the application about
/// updates in the socket (file descriptor) status by doing none, one, or
/// multiple calls to the socket callback. The callback gets status updates
/// with changes since the previous time the callback was called. See
/// `action` for more details on how the callback is used and should work.
///
/// The `SocketEvents` parameter informs the callback on the status of the
/// given socket, and the methods on that type can be used to learn about
/// what's going on with the socket.
///
/// The third `usize` parameter is a custom value set by the `assign` method
/// below.
pub fn socket_function<F>(&mut self, f: F) -> Result<(), MultiError>
where
F: FnMut(Socket, SocketEvents, usize) + Send + 'static,
{
self._socket_function(Box::new(f))
}
fn _socket_function(
&mut self,
f: Box<dyn FnMut(Socket, SocketEvents, usize) + Send>,
) -> Result<(), MultiError> {
self.data.socket = f;
let cb: curl_sys::curl_socket_callback = cb;
self.setopt_ptr(
curl_sys::CURLMOPT_SOCKETFUNCTION,
cb as usize as *const c_char,
)?;
let ptr = &*self.data as *const _;
self.setopt_ptr(curl_sys::CURLMOPT_SOCKETDATA, ptr as *const c_char)?;
return Ok(());
// TODO: figure out how to expose `_easy`
extern "C" fn cb(
_easy: *mut curl_sys::CURL,
socket: curl_sys::curl_socket_t,
what: c_int,
userptr: *mut c_void,
socketp: *mut c_void,
) -> c_int {
panic::catch(|| unsafe {
let f = &mut (*(userptr as *mut MultiData)).socket;
f(socket, SocketEvents { bits: what }, socketp as usize)
});
0
}
}
/// Set data to associate with an internal socket
///
/// This function creates an association in the multi handle between the
/// given socket and a private token of the application. This is designed
/// for `action` uses.
///
/// When set, the token will be passed to all future socket callbacks for
/// the specified socket.
///
/// If the given socket isn't already in use by libcurl, this function will
/// return an error.
///
/// libcurl only keeps one single token associated with a socket, so
/// calling this function several times for the same socket will make the
/// last set token get used.
///
/// The idea here being that this association (socket to token) is something
/// that just about every application that uses this API will need and then
/// libcurl can just as well do it since it already has an internal hash
/// table lookup for this.
///
/// # Typical Usage
///
/// In a typical application you allocate a struct or at least use some kind
/// of semi-dynamic data for each socket that we must wait for action on
/// when using the `action` approach.
///
/// When our socket-callback gets called by libcurl and we get to know about
/// yet another socket to wait for, we can use `assign` to point out the
/// particular data so that when we get updates about this same socket
/// again, we don't have to find the struct associated with this socket by
/// ourselves.
pub fn assign(&self, socket: Socket, token: usize) -> Result<(), MultiError> {
unsafe {
cvt(curl_sys::curl_multi_assign(
self.raw.handle,
socket,
token as *mut _,
))?;
Ok(())
}
}
/// Set callback to receive timeout values
///
/// Certain features, such as timeouts and retries, require you to call
/// libcurl even when there is no activity on the file descriptors.
///
/// Your callback function should install a non-repeating timer with the
/// interval specified. Each time that timer fires, call either `action` or
/// `perform`, depending on which interface you use.
///
/// A timeout value of `None` means you should delete your timer.
///
/// A timeout value of 0 means you should call `action` or `perform` (once)
/// as soon as possible.
///
/// This callback will only be called when the timeout changes.
///
/// The timer callback should return `true` on success, and `false` on
/// error. This callback can be used instead of, or in addition to,
/// `get_timeout`.
pub fn timer_function<F>(&mut self, f: F) -> Result<(), MultiError>
where
F: FnMut(Option<Duration>) -> bool + Send + 'static,
{
self._timer_function(Box::new(f))
}
fn _timer_function(
&mut self,
f: Box<dyn FnMut(Option<Duration>) -> bool + Send>,
) -> Result<(), MultiError> {
self.data.timer = f;
let cb: curl_sys::curl_multi_timer_callback = cb;
self.setopt_ptr(
curl_sys::CURLMOPT_TIMERFUNCTION,
cb as usize as *const c_char,
)?;
let ptr = &*self.data as *const _;
self.setopt_ptr(curl_sys::CURLMOPT_TIMERDATA, ptr as *const c_char)?;
return Ok(());
// TODO: figure out how to expose `_multi`
extern "C" fn cb(
_multi: *mut curl_sys::CURLM,
timeout_ms: c_long,
user: *mut c_void,
) -> c_int {
let keep_going = panic::catch(|| unsafe {
let f = &mut (*(user as *mut MultiData)).timer;
if timeout_ms == -1 {
f(None)
} else {
f(Some(Duration::from_millis(timeout_ms as u64)))
}
})
.unwrap_or(false);
if keep_going {
0
} else {
-1
}
}
}
/// Enable or disable HTTP pipelining and multiplexing.
///
/// When http_1 is true, enable HTTP/1.1 pipelining, which means that if
/// you add a second request that can use an already existing connection,
/// the second request will be "piped" on the same connection rather than
/// being executed in parallel.
///
/// When multiplex is true, enable HTTP/2 multiplexing, which means that
/// follow-up requests can re-use an existing connection and send the new
/// request multiplexed over that at the same time as other transfers are
/// already using that single connection.
pub fn pipelining(&mut self, http_1: bool, multiplex: bool) -> Result<(), MultiError> {
let bitmask = if http_1 { curl_sys::CURLPIPE_HTTP1 } else { 0 }
| if multiplex {
curl_sys::CURLPIPE_MULTIPLEX
} else {
0
};
self.setopt_long(curl_sys::CURLMOPT_PIPELINING, bitmask)
}
/// Sets the max number of connections to a single host.
///
/// Pass a long to indicate the max number of simultaneously open connections
/// to a single host (a host being the same as a host name + port number pair).
/// For each new session to a host, libcurl will open up a new connection up to the
/// limit set by the provided value. When the limit is reached, the sessions will
/// be pending until a connection becomes available. If pipelining is enabled,
/// libcurl will try to pipeline if the host is capable of it.
pub fn set_max_host_connections(&mut self, val: usize) -> Result<(), MultiError> {
self.setopt_long(curl_sys::CURLMOPT_MAX_HOST_CONNECTIONS, val as c_long)
}
/// Sets the max simultaneously open connections.
///
/// The set number will be used as the maximum number of simultaneously open
/// connections in total using this multi handle. For each new session,
/// libcurl will open a new connection up to the limit set by the provided
/// value. When the limit is reached, the sessions will be pending until
/// there are available connections. If pipelining is enabled, libcurl will
/// try to pipeline or use multiplexing if the host is capable of it.
pub fn set_max_total_connections(&mut self, val: usize) -> Result<(), MultiError> {
self.setopt_long(curl_sys::CURLMOPT_MAX_TOTAL_CONNECTIONS, val as c_long)
}
/// Set size of connection cache.
///
/// The set number will be used as the maximum amount of simultaneously open
/// connections that libcurl may keep in its connection cache after
/// completed use. By default libcurl will enlarge the size for each added
/// easy handle to make it fit 4 times the number of added easy handles.
///
/// By setting this option, you can prevent the cache size from growing
/// beyond the limit set by you.
///
/// When the cache is full, curl closes the oldest one in the cache to
/// prevent the number of open connections from increasing.
///
/// See [`set_max_total_connections`](#method.set_max_total_connections) for
/// limiting the number of active connections.
pub fn set_max_connects(&mut self, val: usize) -> Result<(), MultiError> {
self.setopt_long(curl_sys::CURLMOPT_MAXCONNECTS, val as c_long)
}
/// Sets the pipeline length.
///
/// This sets the max number that will be used as the maximum amount of
/// outstanding requests in an HTTP/1.1 pipelined connection. This option
/// is only used for HTTP/1.1 pipelining, and not HTTP/2 multiplexing.
pub fn set_pipeline_length(&mut self, val: usize) -> Result<(), MultiError> {
self.setopt_long(curl_sys::CURLMOPT_MAX_PIPELINE_LENGTH, val as c_long)
}
/// Sets the number of max concurrent streams for http2.
///
/// This sets the max number will be used as the maximum number of
/// concurrent streams for a connections that libcurl should support on
/// connections done using HTTP/2. Defaults to 100.
pub fn set_max_concurrent_streams(&mut self, val: usize) -> Result<(), MultiError> {
self.setopt_long(curl_sys::CURLMOPT_MAX_CONCURRENT_STREAMS, val as c_long)
}
fn setopt_long(&mut self, opt: curl_sys::CURLMoption, val: c_long) -> Result<(), MultiError> {
unsafe { cvt(curl_sys::curl_multi_setopt(self.raw.handle, opt, val)) }
}
fn setopt_ptr(
&mut self,
opt: curl_sys::CURLMoption,
val: *const c_char,
) -> Result<(), MultiError> {
unsafe { cvt(curl_sys::curl_multi_setopt(self.raw.handle, opt, val)) }
}
/// Add an easy handle to a multi session
///
/// Adds a standard easy handle to the multi stack. This function call will
/// make this multi handle control the specified easy handle.
///
/// When an easy interface is added to a multi handle, it will use a shared
/// connection cache owned by the multi handle. Removing and adding new easy
/// handles will not affect the pool of connections or the ability to do
/// connection re-use.
///
/// If you have `timer_function` set in the multi handle (and you really
/// should if you're working event-based with `action` and friends), that
/// callback will be called from within this function to ask for an updated
/// timer so that your main event loop will get the activity on this handle
/// to get started.
///
/// The easy handle will remain added to the multi handle until you remove
/// it again with `remove` on the returned handle - even when a transfer
/// with that specific easy handle is completed.
pub fn add(&self, mut easy: Easy) -> Result<EasyHandle, MultiError> {
// Clear any configuration set by previous transfers because we're
// moving this into a `Send+'static` situation now basically.
easy.transfer();
unsafe {
cvt(curl_sys::curl_multi_add_handle(self.raw.handle, easy.raw()))?;
}
Ok(EasyHandle {
guard: DetachGuard {
multi: self.raw.clone(),
easy: easy.raw(),
},
easy,
_marker: marker::PhantomData,
})
}
/// Same as `add`, but works with the `Easy2` type.
pub fn add2<H>(&self, easy: Easy2<H>) -> Result<Easy2Handle<H>, MultiError> {
unsafe {
cvt(curl_sys::curl_multi_add_handle(self.raw.handle, easy.raw()))?;
}
Ok(Easy2Handle {
guard: DetachGuard {
multi: self.raw.clone(),
easy: easy.raw(),
},
easy,
_marker: marker::PhantomData,
})
}
/// Remove an easy handle from this multi session
///
/// Removes the easy handle from this multi handle. This will make the
/// returned easy handle be removed from this multi handle's control.
///
/// When the easy handle has been removed from a multi stack, it is again
/// perfectly legal to invoke `perform` on it.
///
/// Removing an easy handle while being used is perfectly legal and will
/// effectively halt the transfer in progress involving that easy handle.
/// All other easy handles and transfers will remain unaffected.
pub fn remove(&self, mut easy: EasyHandle) -> Result<Easy, MultiError> {
easy.guard.detach()?;
Ok(easy.easy)
}
/// Same as `remove`, but for `Easy2Handle`.
pub fn remove2<H>(&self, mut easy: Easy2Handle<H>) -> Result<Easy2<H>, MultiError> {
easy.guard.detach()?;
Ok(easy.easy)
}
/// Read multi stack informationals
///
/// Ask the multi handle if there are any messages/informationals from the
/// individual transfers. Messages may include informationals such as an
/// error code from the transfer or just the fact that a transfer is
/// completed. More details on these should be written down as well.
pub fn messages<F>(&self, mut f: F)
where
F: FnMut(Message),
{
self._messages(&mut f)
}
fn _messages(&self, f: &mut dyn FnMut(Message)) {
let mut queue = 0;
unsafe {
loop {
let ptr = curl_sys::curl_multi_info_read(self.raw.handle, &mut queue);
if ptr.is_null() {
break;
}
f(Message { ptr, _multi: self })
}
}
}
/// Inform of reads/writes available data given an action
///
/// When the application has detected action on a socket handled by libcurl,
/// it should call this function with the sockfd argument set to
/// the socket with the action. When the events on a socket are known, they
/// can be passed `events`. When the events on a socket are unknown, pass
/// `Events::new()` instead, and libcurl will test the descriptor
/// internally.
///
/// The returned integer will contain the number of running easy handles
/// within the multi handle. When this number reaches zero, all transfers
/// are complete/done. When you call `action` on a specific socket and the
/// counter decreases by one, it DOES NOT necessarily mean that this exact
/// socket/transfer is the one that completed. Use `messages` to figure out
/// which easy handle that completed.
///
/// The `action` function informs the application about updates in the
/// socket (file descriptor) status by doing none, one, or multiple calls to
/// the socket callback function set with the `socket_function` method. They
/// update the status with changes since the previous time the callback was
/// called.
pub fn action(&self, socket: Socket, events: &Events) -> Result<u32, MultiError> {
let mut remaining = 0;
unsafe {
cvt(curl_sys::curl_multi_socket_action(
self.raw.handle,
socket,
events.bits,
&mut remaining,
))?;
Ok(remaining as u32)
}
}
/// Inform libcurl that a timeout has expired and sockets should be tested.
///
/// The returned integer will contain the number of running easy handles
/// within the multi handle. When this number reaches zero, all transfers
/// are complete/done. When you call `action` on a specific socket and the
/// counter decreases by one, it DOES NOT necessarily mean that this exact
/// socket/transfer is the one that completed. Use `messages` to figure out
/// which easy handle that completed.
///
/// Get the timeout time by calling the `timer_function` method. Your
/// application will then get called with information on how long to wait
/// for socket actions at most before doing the timeout action: call the
/// `timeout` method. You can also use the `get_timeout` function to
/// poll the value at any given time, but for an event-based system using
/// the callback is far better than relying on polling the timeout value.
pub fn timeout(&self) -> Result<u32, MultiError> {
let mut remaining = 0;
unsafe {
cvt(curl_sys::curl_multi_socket_action(
self.raw.handle,
curl_sys::CURL_SOCKET_BAD,
0,
&mut remaining,
))?;
Ok(remaining as u32)
}
}
/// Get how long to wait for action before proceeding
///
/// An application using the libcurl multi interface should call
/// `get_timeout` to figure out how long it should wait for socket actions -
/// at most - before proceeding.
///
/// Proceeding means either doing the socket-style timeout action: call the
/// `timeout` function, or call `perform` if you're using the simpler and
/// older multi interface approach.
///
/// The timeout value returned is the duration at this very moment. If 0, it
/// means you should proceed immediately without waiting for anything. If it
/// returns `None`, there's no timeout at all set.
///
/// Note: if libcurl returns a `None` timeout here, it just means that
/// libcurl currently has no stored timeout value. You must not wait too
/// long (more than a few seconds perhaps) before you call `perform` again.
pub fn get_timeout(&self) -> Result<Option<Duration>, MultiError> {
let mut ms = 0;
unsafe {
cvt(curl_sys::curl_multi_timeout(self.raw.handle, &mut ms))?;
if ms == -1 {
Ok(None)
} else {
Ok(Some(Duration::from_millis(ms as u64)))
}
}
}
/// Block until activity is detected or a timeout passes.
///
/// The timeout is used in millisecond-precision. Large durations are
/// clamped at the maximum value curl accepts.
///
/// The returned integer will contain the number of internal file
/// descriptors on which interesting events occured.
///
/// This function is a simpler alternative to using `fdset()` and `select()`
/// and does not suffer from file descriptor limits.
///
/// # Example
///
/// ```
/// use curl::multi::Multi;
/// use std::time::Duration;
///
/// let m = Multi::new();
///
/// // Add some Easy handles...
///
/// while m.perform().unwrap() > 0 {
/// m.wait(&mut [], Duration::from_secs(1)).unwrap();
/// }
/// ```
pub fn wait(&self, waitfds: &mut [WaitFd], timeout: Duration) -> Result<u32, MultiError> {
let timeout_ms = Multi::timeout_i32(timeout);
unsafe {
let mut ret = 0;
cvt(curl_sys::curl_multi_wait(
self.raw.handle,
waitfds.as_mut_ptr() as *mut _,
waitfds.len() as u32,
timeout_ms,
&mut ret,
))?;
Ok(ret as u32)
}
}
fn timeout_i32(timeout: Duration) -> i32 {
let secs = timeout.as_secs();
if secs > (i32::MAX / 1000) as u64 {
// Duration too large, clamp at maximum value.
i32::MAX
} else {
secs as i32 * 1000 + timeout.subsec_nanos() as i32 / 1_000_000
}
}
/// Block until activity is detected or a timeout passes.
///
/// The timeout is used in millisecond-precision. Large durations are
/// clamped at the maximum value curl accepts.
///
/// The returned integer will contain the number of internal file
/// descriptors on which interesting events occurred.
///
/// This function is a simpler alternative to using `fdset()` and `select()`
/// and does not suffer from file descriptor limits.
///
/// While this method is similar to [Multi::wait], with the following
/// distinctions:
/// * If there are no handles added to the multi, poll will honor the
/// provided timeout, while [Multi::wait] returns immediately.
/// * If poll has blocked due to there being no activity on the handles in
/// the Multi, it can be woken up from any thread and at any time before
/// the timeout expires.
///
/// Requires libcurl 7.66.0 or later.
///
/// # Example
///
/// ```
/// use curl::multi::Multi;
/// use std::time::Duration;
///
/// let m = Multi::new();
///
/// // Add some Easy handles...
///
/// while m.perform().unwrap() > 0 {
/// m.poll(&mut [], Duration::from_secs(1)).unwrap();
/// }
/// ```
#[cfg(feature = "poll_7_68_0")]
pub fn poll(&self, waitfds: &mut [WaitFd], timeout: Duration) -> Result<u32, MultiError> {
let timeout_ms = Multi::timeout_i32(timeout);
unsafe {
let mut ret = 0;
cvt(curl_sys::curl_multi_poll(
self.raw.handle,
waitfds.as_mut_ptr() as *mut _,
waitfds.len() as u32,
timeout_ms,
&mut ret,
))?;
Ok(ret as u32)
}
}
/// Returns a new [MultiWaker] that can be used to wake up a thread that's
/// currently blocked in [Multi::poll].
#[cfg(feature = "poll_7_68_0")]
pub fn waker(&self) -> MultiWaker {
MultiWaker::new(Arc::downgrade(&self.raw))
}
/// Reads/writes available data from each easy handle.
///
/// This function handles transfers on all the added handles that need
/// attention in an non-blocking fashion.
///
/// When an application has found out there's data available for this handle
/// or a timeout has elapsed, the application should call this function to
/// read/write whatever there is to read or write right now etc. This
/// method returns as soon as the reads/writes are done. This function does
/// not require that there actually is any data available for reading or
/// that data can be written, it can be called just in case. It will return
/// the number of handles that still transfer data.
///
/// If the amount of running handles is changed from the previous call (or
/// is less than the amount of easy handles you've added to the multi
/// handle), you know that there is one or more transfers less "running".
/// You can then call `info` to get information about each individual
/// completed transfer, and that returned info includes `Error` and more.
/// If an added handle fails very quickly, it may never be counted as a
/// running handle.
///
/// When running_handles is set to zero (0) on the return of this function,
/// there is no longer any transfers in progress.
///
/// # Return
///
/// Before libcurl version 7.20.0: If you receive `is_call_perform`, this
/// basically means that you should call `perform` again, before you select
/// on more actions. You don't have to do it immediately, but the return
/// code means that libcurl may have more data available to return or that
/// there may be more data to send off before it is "satisfied". Do note
/// that `perform` will return `is_call_perform` only when it wants to be
/// called again immediately. When things are fine and there is nothing
/// immediate it wants done, it'll return `Ok` and you need to wait for
/// "action" and then call this function again.
///
/// This function only returns errors etc regarding the whole multi stack.
/// Problems still might have occurred on individual transfers even when
/// this function returns `Ok`. Use `info` to figure out how individual
/// transfers did.
pub fn perform(&self) -> Result<u32, MultiError> {
unsafe {
let mut ret = 0;
cvt(curl_sys::curl_multi_perform(self.raw.handle, &mut ret))?;
Ok(ret as u32)
}
}
/// Extracts file descriptor information from a multi handle
///
/// This function extracts file descriptor information from a given
/// handle, and libcurl returns its `fd_set` sets. The application can use
/// these to `select()` on, but be sure to `FD_ZERO` them before calling
/// this function as curl_multi_fdset only adds its own descriptors, it
/// doesn't zero or otherwise remove any others. The curl_multi_perform
/// function should be called as soon as one of them is ready to be read
/// from or written to.
///
/// If no file descriptors are set by libcurl, this function will return
/// `Ok(None)`. Otherwise `Ok(Some(n))` will be returned where `n` the
/// highest descriptor number libcurl set. When `Ok(None)` is returned it
/// is because libcurl currently does something that isn't possible for
/// your application to monitor with a socket and unfortunately you can
/// then not know exactly when the current action is completed using
/// `select()`. You then need to wait a while before you proceed and call
/// `perform` anyway.
///
/// When doing `select()`, you should use `get_timeout` to figure out
/// how long to wait for action. Call `perform` even if no activity has
/// been seen on the `fd_set`s after the timeout expires as otherwise
/// internal retries and timeouts may not work as you'd think and want.
///
/// If one of the sockets used by libcurl happens to be larger than what
/// can be set in an `fd_set`, which on POSIX systems means that the file
/// descriptor is larger than `FD_SETSIZE`, then libcurl will try to not
/// set it. Setting a too large file descriptor in an `fd_set` implies an out
/// of bounds write which can cause crashes, or worse. The effect of NOT
/// storing it will possibly save you from the crash, but will make your
/// program NOT wait for sockets it should wait for...
pub fn fdset2(
&self,
read: Option<&mut curl_sys::fd_set>,
write: Option<&mut curl_sys::fd_set>,
except: Option<&mut curl_sys::fd_set>,
) -> Result<Option<i32>, MultiError> {
unsafe {
let mut ret = 0;
let read = read.map(|r| r as *mut _).unwrap_or(ptr::null_mut());
let write = write.map(|r| r as *mut _).unwrap_or(ptr::null_mut());
let except = except.map(|r| r as *mut _).unwrap_or(ptr::null_mut());
cvt(curl_sys::curl_multi_fdset(
self.raw.handle,
read,
write,
except,
&mut ret,
))?;
if ret == -1 {
Ok(None)
} else {
Ok(Some(ret))
}
}
}
/// Does nothing and returns `Ok(())`. This method remains for backwards
/// compatibility.
///
/// This method will be changed to take `self` in a future release.
#[doc(hidden)]
#[deprecated(
since = "0.4.30",
note = "cannot close safely without consuming self; \
will be changed or removed in a future release"
)]
pub fn close(&self) -> Result<(), MultiError> {
Ok(())
}
/// Get a pointer to the raw underlying CURLM handle.
pub fn raw(&self) -> *mut curl_sys::CURLM {
self.raw.handle
}
}
impl Drop for RawMulti {
fn drop(&mut self) {
unsafe {
let _ = cvt(curl_sys::curl_multi_cleanup(self.handle));
}
}
}
#[cfg(feature = "poll_7_68_0")]
impl MultiWaker {
/// Creates a new MultiWaker handle.
fn new(raw: std::sync::Weak<RawMulti>) -> Self {
Self { raw }
}
/// Wakes up a thread that is blocked in [Multi::poll]. This method can be
/// invoked from any thread.
///
/// Will return an error if the RawMulti has already been dropped.
///
/// Requires libcurl 7.68.0 or later.
pub fn wakeup(&self) -> Result<(), MultiError> {
if let Some(raw) = self.raw.upgrade() {
unsafe { cvt(curl_sys::curl_multi_wakeup(raw.handle)) }
} else {
// This happens if the RawMulti has already been dropped:
Err(MultiError::new(curl_sys::CURLM_BAD_HANDLE))
}
}
}
fn cvt(code: curl_sys::CURLMcode) -> Result<(), MultiError> {
if code == curl_sys::CURLM_OK {
Ok(())
} else {
Err(MultiError::new(code))
}
}
impl fmt::Debug for Multi {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("Multi").field("raw", &self.raw).finish()
}
}
macro_rules! impl_easy_getters {
() => {
impl_easy_getters! {
time_condition_unmet -> bool,
effective_url -> Option<&str>,
effective_url_bytes -> Option<&[u8]>,
response_code -> u32,
http_connectcode -> u32,
filetime -> Option<i64>,
download_size -> f64,
content_length_download -> f64,
total_time -> Duration,
namelookup_time -> Duration,
connect_time -> Duration,
appconnect_time -> Duration,
pretransfer_time -> Duration,
starttransfer_time -> Duration,
redirect_time -> Duration,
redirect_count -> u32,
redirect_url -> Option<&str>,
redirect_url_bytes -> Option<&[u8]>,
header_size -> u64,
request_size -> u64,
content_type -> Option<&str>,
content_type_bytes -> Option<&[u8]>,
os_errno -> i32,
primary_ip -> Option<&str>,
primary_port -> u16,
local_ip -> Option<&str>,
local_port -> u16,
cookies -> List,
}
};
($($name:ident -> $ret:ty,)*) => {
$(
impl_easy_getters!($name, $ret, concat!(
"Same as [`Easy2::",
stringify!($name),
"`](../easy/struct.Easy2.html#method.",
stringify!($name),
")."
));
)*
};
($name:ident, $ret:ty, $doc:expr) => {
#[doc = $doc]
pub fn $name(&mut self) -> Result<$ret, Error> {
self.easy.$name()
}
};
}
impl EasyHandle {
/// Sets an internal private token for this `EasyHandle`.
///
/// This function will set the `CURLOPT_PRIVATE` field on the underlying
/// easy handle.
pub fn set_token(&mut self, token: usize) -> Result<(), Error> {
unsafe {
crate::cvt(curl_sys::curl_easy_setopt(
self.easy.raw(),
curl_sys::CURLOPT_PRIVATE,
token,
))
}
}
impl_easy_getters!();
/// Unpause reading on a connection.
///
/// Using this function, you can explicitly unpause a connection that was
/// previously paused.
///
/// A connection can be paused by letting the read or the write callbacks
/// return `ReadError::Pause` or `WriteError::Pause`.
///
/// The chance is high that you will get your write callback called before
/// this function returns.
pub fn unpause_read(&self) -> Result<(), Error> {
self.easy.unpause_read()
}
/// Unpause writing on a connection.
///
/// Using this function, you can explicitly unpause a connection that was
/// previously paused.
///
/// A connection can be paused by letting the read or the write callbacks
/// return `ReadError::Pause` or `WriteError::Pause`. A write callback that
/// returns pause signals to the library that it couldn't take care of any
/// data at all, and that data will then be delivered again to the callback
/// when the writing is later unpaused.
pub fn unpause_write(&self) -> Result<(), Error> {
self.easy.unpause_write()
}
/// Get a pointer to the raw underlying CURL handle.
pub fn raw(&self) -> *mut curl_sys::CURL {
self.easy.raw()
}
}
impl fmt::Debug for EasyHandle {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
self.easy.fmt(f)
}
}
impl<H> Easy2Handle<H> {
/// Acquires a reference to the underlying handler for events.
pub fn get_ref(&self) -> &H {
self.easy.get_ref()
}
/// Acquires a reference to the underlying handler for events.
pub fn get_mut(&mut self) -> &mut H {
self.easy.get_mut()
}
/// Same as `EasyHandle::set_token`
pub fn set_token(&mut self, token: usize) -> Result<(), Error> {
unsafe {
crate::cvt(curl_sys::curl_easy_setopt(
self.easy.raw(),
curl_sys::CURLOPT_PRIVATE,
token,
))
}
}
impl_easy_getters!();
/// Unpause reading on a connection.
///
/// Using this function, you can explicitly unpause a connection that was