Skip to content

Commit

Permalink
Fix #489. Remove replication factor from CreateDatabase command
Browse files Browse the repository at this point in the history
  • Loading branch information
jvshahid authored and pauldix committed May 27, 2014
1 parent 09a1f17 commit 72bea47
Show file tree
Hide file tree
Showing 8 changed files with 26 additions and 31 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
### Bugfixes

- Fix a bug in shard logic that caused short term shards to be clobbered with long term shards
- [Issue #489](https://github.com/influxdb/influxdb/issues/489). Remove replication factor from CreateDatabase command

## v0.6.3 [2014-05-13]

Expand Down
7 changes: 3 additions & 4 deletions src/api/http/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,8 +375,7 @@ func (self *HttpServer) writePoints(w libhttp.ResponseWriter, r *libhttp.Request
}

type createDatabaseRequest struct {
Name string `json:"name"`
ReplicationFactor uint8 `json:"replicationFactor"`
Name string `json:"name"`
}

func (self *HttpServer) listDatabases(w libhttp.ResponseWriter, r *libhttp.Request) {
Expand All @@ -400,12 +399,12 @@ func (self *HttpServer) createDatabase(w libhttp.ResponseWriter, r *libhttp.Requ
if err != nil {
return libhttp.StatusBadRequest, err.Error()
}
err = self.coordinator.CreateDatabase(user, createRequest.Name, createRequest.ReplicationFactor)
err = self.coordinator.CreateDatabase(user, createRequest.Name)
if err != nil {
log.Error("Cannot create database %s. Error: %s", createRequest.Name, err)
return errorToStatusCode(err), err.Error()
}
log.Debug("Created database %s with replication factor %d", createRequest.Name, createRequest.ReplicationFactor)
log.Debug("Created database %s", createRequest.Name)
return libhttp.StatusCreated, nil
})
}
Expand Down
8 changes: 4 additions & 4 deletions src/api/http/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,13 +112,13 @@ func (self *MockCoordinator) DeleteSeriesData(_ User, db string, query *parser.D
return nil
}

func (self *MockCoordinator) CreateDatabase(_ User, db string, _ uint8) error {
func (self *MockCoordinator) CreateDatabase(_ User, db string) error {
self.db = db
return nil
}

func (self *MockCoordinator) ListDatabases(_ User) ([]*cluster.Database, error) {
return []*cluster.Database{&cluster.Database{"db1", 1}, &cluster.Database{"db2", 1}}, nil
return []*cluster.Database{&cluster.Database{"db1"}, &cluster.Database{"db2"}}, nil
}

func (self *MockCoordinator) DropDatabase(_ User, db string) error {
Expand Down Expand Up @@ -749,7 +749,7 @@ func (self *ApiSuite) TestDatabasesIndex(c *C) {
c.Assert(err, IsNil)
err = json.Unmarshal(body, &databases)
c.Assert(err, IsNil)
c.Assert(databases, DeepEquals, []*cluster.Database{&cluster.Database{"db1", uint8(1)}, &cluster.Database{"db2", uint8(1)}})
c.Assert(databases, DeepEquals, []*cluster.Database{&cluster.Database{"db1"}, &cluster.Database{"db2"}})
}
}

Expand All @@ -768,7 +768,7 @@ func (self *ApiSuite) TestBasicAuthentication(c *C) {
c.Assert(err, IsNil)
err = json.Unmarshal(body, &databases)
c.Assert(err, IsNil)
c.Assert(databases, DeepEquals, []*cluster.Database{&cluster.Database{"db1", 1}, &cluster.Database{"db2", 1}})
c.Assert(databases, DeepEquals, []*cluster.Database{&cluster.Database{"db1"}, &cluster.Database{"db2"}})
}

func (self *ApiSuite) TestContinuousQueryOperations(c *C) {
Expand Down
17 changes: 8 additions & 9 deletions src/cluster/cluster_configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ const (
*/
type ClusterConfiguration struct {
createDatabaseLock sync.RWMutex
DatabaseReplicationFactors map[string]uint8
DatabaseReplicationFactors map[string]struct{}
usersLock sync.RWMutex
clusterAdmins map[string]*ClusterAdmin
dbUsers map[string]map[string]*DbUser
Expand Down Expand Up @@ -94,8 +94,7 @@ type ContinuousQuery struct {
}

type Database struct {
Name string `json:"name"`
ReplicationFactor uint8 `json:"replicationFactor"`
Name string `json:"name"`
}

func NewClusterConfiguration(
Expand All @@ -104,7 +103,7 @@ func NewClusterConfiguration(
shardStore LocalShardStore,
connectionCreator func(string) ServerConnection) *ClusterConfiguration {
return &ClusterConfiguration{
DatabaseReplicationFactors: make(map[string]uint8),
DatabaseReplicationFactors: make(map[string]struct{}),
clusterAdmins: make(map[string]*ClusterAdmin),
dbUsers: make(map[string]map[string]*DbUser),
continuousQueries: make(map[string][]*ContinuousQuery),
Expand Down Expand Up @@ -261,8 +260,8 @@ func (self *ClusterConfiguration) GetDatabases() []*Database {
defer self.createDatabaseLock.RUnlock()

dbs := make([]*Database, 0, len(self.DatabaseReplicationFactors))
for name, rf := range self.DatabaseReplicationFactors {
dbs = append(dbs, &Database{Name: name, ReplicationFactor: rf})
for name, _ := range self.DatabaseReplicationFactors {
dbs = append(dbs, &Database{Name: name})
}
return dbs
}
Expand All @@ -275,14 +274,14 @@ func (self *ClusterConfiguration) DatabaseExists(name string) bool {
}
}

func (self *ClusterConfiguration) CreateDatabase(name string, replicationFactor uint8) error {
func (self *ClusterConfiguration) CreateDatabase(name string) error {
self.createDatabaseLock.Lock()
defer self.createDatabaseLock.Unlock()

if _, ok := self.DatabaseReplicationFactors[name]; ok {
return common.NewDatabaseExistsError(name)
}
self.DatabaseReplicationFactors[name] = replicationFactor
self.DatabaseReplicationFactors[name] = struct{}{}
return nil
}

Expand Down Expand Up @@ -483,7 +482,7 @@ func (self *ClusterConfiguration) SaveClusterAdmin(u *ClusterAdmin) {
}

type SavedConfiguration struct {
Databases map[string]uint8
Databases map[string]struct{}
Admins map[string]*ClusterAdmin
DbUsers map[string]map[string]*DbUser
Servers []*ClusterServer
Expand Down
9 changes: 4 additions & 5 deletions src/coordinator/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,12 +109,11 @@ func (c *DropDatabaseCommand) Apply(server raft.Server) (interface{}, error) {
}

type CreateDatabaseCommand struct {
Name string `json:"name"`
ReplicationFactor uint8 `json:"replicationFactor"`
Name string `json:"name"`
}

func NewCreateDatabaseCommand(name string, replicationFactor uint8) *CreateDatabaseCommand {
return &CreateDatabaseCommand{name, replicationFactor}
func NewCreateDatabaseCommand(name string) *CreateDatabaseCommand {
return &CreateDatabaseCommand{name}
}

func (c *CreateDatabaseCommand) CommandName() string {
Expand All @@ -123,7 +122,7 @@ func (c *CreateDatabaseCommand) CommandName() string {

func (c *CreateDatabaseCommand) Apply(server raft.Server) (interface{}, error) {
config := server.Context().(*cluster.ClusterConfiguration)
err := config.CreateDatabase(c.Name, c.ReplicationFactor)
err := config.CreateDatabase(c.Name)
return nil, err
}

Expand Down
4 changes: 2 additions & 2 deletions src/coordinator/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -757,7 +757,7 @@ func (self *CoordinatorImpl) ListContinuousQueries(user common.User, db string)
return series, nil
}

func (self *CoordinatorImpl) CreateDatabase(user common.User, db string, replicationFactor uint8) error {
func (self *CoordinatorImpl) CreateDatabase(user common.User, db string) error {
if !user.IsClusterAdmin() {
return common.NewAuthorizationError("Insufficient permissions to create database")
}
Expand All @@ -766,7 +766,7 @@ func (self *CoordinatorImpl) CreateDatabase(user common.User, db string, replica
return fmt.Errorf("%s isn't a valid db name", db)
}

err := self.raftServer.CreateDatabase(db, replicationFactor)
err := self.raftServer.CreateDatabase(db)
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions src/coordinator/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ type Coordinator interface {
// 5. TODO: Aggregation on the nodes
WriteSeriesData(user common.User, db string, series []*protocol.Series) error
DropDatabase(user common.User, db string) error
CreateDatabase(user common.User, db string, replicationFactor uint8) error
CreateDatabase(user common.User, db string) error
ForceCompaction(user common.User) error
ListDatabases(user common.User) ([]*cluster.Database, error)
DeleteContinuousQuery(user common.User, db string, id uint32) error
Expand All @@ -30,7 +30,7 @@ type Coordinator interface {
}

type ClusterConsensus interface {
CreateDatabase(name string, replicationFactor uint8) error
CreateDatabase(name string) error
DropDatabase(name string) error
CreateContinuousQuery(db string, query string) error
DeleteContinuousQuery(db string, id uint32) error
Expand Down
7 changes: 2 additions & 5 deletions src/coordinator/raft_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,11 +179,8 @@ func SendCommandToServer(url string, command raft.Command) (interface{}, error)

}

func (s *RaftServer) CreateDatabase(name string, replicationFactor uint8) error {
if replicationFactor == 0 {
replicationFactor = 1
}
command := NewCreateDatabaseCommand(name, replicationFactor)
func (s *RaftServer) CreateDatabase(name string) error {
command := NewCreateDatabaseCommand(name)
_, err := s.doOrProxyCommand(command)
return err
}
Expand Down

0 comments on commit 72bea47

Please sign in to comment.