Skip to content

Commit

Permalink
Merge pull request #201 from vapor/release
Browse files Browse the repository at this point in the history
mysql 3.0 release
  • Loading branch information
tanner0101 authored Jul 19, 2018
2 parents 0b926de + f806b71 commit 8717e4a
Show file tree
Hide file tree
Showing 37 changed files with 323 additions and 285 deletions.
90 changes: 0 additions & 90 deletions Sources/MySQL/Codable/MySQLQueryEncoder.swift

This file was deleted.

1 change: 0 additions & 1 deletion Sources/MySQL/Connection/MySQLColumn.swift
Original file line number Diff line number Diff line change
Expand Up @@ -68,4 +68,3 @@ extension Dictionary where Key == MySQLColumn {
return self[MySQLColumn(table: table, name: column)]
}
}

38 changes: 20 additions & 18 deletions Sources/MySQL/Connection/MySQLConnection+Connect.swift
Original file line number Diff line number Diff line change
Expand Up @@ -2,32 +2,34 @@ import Crypto

extension MySQLConnection {
/// Connects to a MySQL server using TCP.
public static func connect(
config: MySQLDatabaseConfig,
on worker: Worker,
onError: @escaping (Error) -> ()
) -> Future<MySQLConnection> {
let handler = MySQLConnectionHandler(config: config)
///
/// MySQLConnection.connect(config: .root(database: "vapor"), on: ...)
///
/// - parameters:
/// - config: Connection configuration options.
/// - worker: Event loop to run the connection on.
public static func connect(config: MySQLDatabaseConfig, on worker: Worker) -> Future<MySQLConnection> {
let ready = worker.eventLoop.newPromise(Void.self)
let handler = MySQLConnectionHandler(config: config, ready: ready)
let bootstrap = ClientBootstrap(group: worker.eventLoop)
.channelOption(ChannelOptions.socket(SocketOptionLevel(SOL_SOCKET), SO_REUSEADDR), value: 1)
.channelInitializer { channel in
return channel.pipeline.addMySQLClientHandlers().then {
channel.pipeline.add(handler: handler)
.channelOption(ChannelOptions.socket(SocketOptionLevel(SOL_SOCKET), SO_REUSEADDR), value: 1)
.channelInitializer { channel in
return channel.pipeline.addMySQLClientHandlers().then {
return channel.pipeline.add(handler: handler)
}
}
}
return bootstrap.connect(host: config.hostname, port: config.port).map { channel -> MySQLConnection in
return .init(handler: handler, channel: channel)
}.flatMap { conn in
let rfq = worker.eventLoop.newPromise(Void.self)
handler.readyForQuery = rfq
return rfq.futureResult.map { conn }
let channel = bootstrap.connect(host: config.hostname, port: config.port)
channel.catch { ready.fail(error: $0) }
return channel.flatMap { channel in
let conn = MySQLConnection(handler: handler, channel: channel)
return ready.futureResult.transform(to: conn)
}
}
}

extension ChannelPipeline {
/// Adds MySQL packet encoder and decoder to the channel pipeline.
func addMySQLClientHandlers(first: Bool = false) -> Future<Void> {
public func addMySQLClientHandlers(first: Bool = false) -> Future<Void> {
let session = MySQLPacketState()
return addHandlers(MySQLPacketEncoder(session: session), MySQLPacketDecoder(session: session), first: first)
}
Expand Down
9 changes: 9 additions & 0 deletions Sources/MySQL/Connection/MySQLConnection+Metadata.swift
Original file line number Diff line number Diff line change
@@ -1,21 +1,30 @@
extension MySQLConnection {
/// Query result metadata.
///
/// conn.lastMetadata?.affectedRows
///
public struct Metadata {
/// Root OK packet.
private let ok: MySQLPacket.OK

/// Number of affected rows from the last query.
public var affectedRows: UInt64 {
return ok.affectedRows
}

/// `AUTO_INCREMENT` insert ID from the last query (if exists).
public var lastInsertID: UInt64? {
return ok.lastInsertID
}

/// Casts the `lastInsertID` to a generic `FixedWidthInteger`.
public func lastInsertID<I>(as type: I.Type = I.self) -> I?
where I: FixedWidthInteger
{
return lastInsertID.flatMap(numericCast)
}

/// Creates a new query result metadata.
init(_ ok: MySQLPacket.OK) {
self.ok = ok
}
Expand Down
2 changes: 1 addition & 1 deletion Sources/MySQL/Connection/MySQLConnection+SimpleQuery.swift
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ extension MySQLConnection {
var rows: [[MySQLColumn: MySQLData]] = []
return simpleQuery(string) { row in
rows.append(row)
}.map(to: [[MySQLColumn: MySQLData]].self) {
}.map {
return rows
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import NIOOpenSSL

/// Supported options for MySQL connection TLS.
public struct MySQLTransportConfig {
/// Does not attempt to enable TLS (this is the default).
public static var cleartext: MySQLTransportConfig {
Expand Down
28 changes: 13 additions & 15 deletions Sources/MySQL/Connection/MySQLConnection.swift
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,6 @@ public final class MySQLConnection: BasicWorker, DatabaseConnection, DatabaseQue

/// The channel
private let channel: Channel

/// Currently running `send(...)`.
private var currentSend: Promise<Void>?

/// Close has been requested.
private var isClosing: Bool
Expand All @@ -49,18 +46,19 @@ public final class MySQLConnection: BasicWorker, DatabaseConnection, DatabaseQue
// currently running calls to `send(...)`.
channel.closeFuture.always {
self.isClosed = true
if let current = self.currentSend {
switch handler.state {
// connection is closing, the handler is not going to be ready for query
case .nascent(let ready): ready.fail(error: closeError)
case .callback(let currentSend, _):
if self.isClosing {
// if we're closing, this is the close's current send
// so complete it!
current.succeed()
currentSend.succeed()
} else {
// if currently sending, fail it
current.fail(error: closeError)
currentSend.fail(error: closeError)
}
} else if let rfq = handler.readyForQuery {
// connection is closing, the handler is not going to be ready for query
rfq.fail(error: closeError)
case .waiting: break
}
}
}
Expand All @@ -78,16 +76,13 @@ public final class MySQLConnection: BasicWorker, DatabaseConnection, DatabaseQue
return eventLoop.newFailedFuture(error: closeError)
}

// if currentSend is not nil, previous send has not completed
assert(currentSend == nil, "Attempting to call `send(...)` again before previous invocation has completed.")
switch handler.state {
case .waiting: break
default: assertionFailure("Attempting to call `send(...)` while handler is still: \(handler.state).")
}

// create a new promise and store it
let promise = eventLoop.newPromise(Void.self)
currentSend = promise

handler.state = .callback(promise) { packet in
switch packet {
Expand All @@ -108,8 +103,7 @@ public final class MySQLConnection: BasicWorker, DatabaseConnection, DatabaseQue
channel.flush()

// FIXME: parse metadata from ok packet

promise.futureResult.always { self.currentSend = nil }

return promise.futureResult
}

Expand All @@ -120,7 +114,11 @@ public final class MySQLConnection: BasicWorker, DatabaseConnection, DatabaseQue

/// Closes this client.
public func close(done promise: Promise<Void>?) {
assert(currentSend == nil, "Cannot close while sending.")
switch handler.state {
case .waiting: break
case .nascent: fatalError("Cannot close while still connecting.")
case .callback: fatalError("Cannot close during a query.")
}
self.isClosing = true
let done = send([.quit]) { packet in
return true
Expand Down
Loading

0 comments on commit 8717e4a

Please sign in to comment.