Python 数据持久化 - Cassandra 驱动程序

Cassandra 是另一种流行的 NoSQL 数据库。 高可扩展性、一致性和容错性——这些是 Cassandra 的一些重要特性。 这是列存储 数据库。 数据存储在许多商用服务器上。 因此,数据的可用性很高。

Cassandra 是 Apache 软件基金会的产品。 数据以分布式方式存储在多个节点上。 每个节点都是由 keyspace (键空间)组成的单个服务器。 Cassandra 数据库的基本构建块是 keyspace(键空间),它可以被认为类似于数据库。

Cassandra 一个节点中的数据通过对等节点网络复制到其他节点中。 这使 Cassandra 成为一个万无一失的数据库。 该网络称为数据中心。 多个数据中心可以互连形成集群。 通过在创建 keyspace 时设置复制策略和复制因子来配置复制的性质。

一个 keyspace 可能有多个列族——就像一个数据库可能包含多个表一样。 Cassandra 的 keyspace 没有预定义的模式。 Cassandra 表中的每一行可能包含名称不同且编号可变的列。

Cassandra 表

Cassandra 软件还有两个版本:社区版和企业版。 最新的 Cassandra 企业版可在 https://cassandra.apache.org/download/ 下载。 社区版位于 https://academy.datastax.com/planet-cassandra/cassandra

Cassandra 有自己的查询语言,称为 Cassandra 查询语言 (CQL)。 CQL 查询可以从 CQLASH shell 内部执行——类似于 MySQL 或 SQLite shell。 CQL 语法看起来类似于标准 SQL。

Datastax community edition,也自带一个Develcenter IDE,如下图所示 −

Develcenter IDE

用于处理 Cassandra 数据库的 Python 模块称为 Cassandra Driver。 它也是由 Apache 基金会开发的。 该模块包含一个 ORM API,以及一个本质上类似于关系数据库的 DB-API 的核心 API。

使用 pip 实用程序 可以轻松安装 Cassandra 驱动程序。

pip3 install cassandra-driver

与 Cassandra 数据库的交互,是通过 Cluster 对象完成的。 Cassandra.cluster 模块定义了 Cluster 类。 我们首先需要声明 Cluster 对象。

from cassandra.cluster import Cluster
clstr=Cluster()

所有事务,如插入/更新等,都是通过使用 keyspace 启动会话来执行的。

session=clstr.connect()

要创建一个新的键空间,使用会话对象的execute()方法。 execute() 方法接受一个字符串参数,该参数必须是一个查询字符串。 CQL 具有如下 CREATE KEYSPACE 语句。 完整代码如下 −

from cassandra.cluster import Cluster
clstr=Cluster()
session=clstr.connect()
session.execute(“create keyspace mykeyspace with replication={
   'class': 'SimpleStrategy', 'replication_factor' : 3
};”

这里,SimpleStrategyreplication strategy 的值,replication factor 设置为 3。如前所述,键空间包含一个或多个表。 每个表的特点是它的数据类型。 Python 数据类型根据下表自动解析为对应的 CQL 数据类型 −

Python 类型 CQL 类型
None NULL
Bool Boolean
Float float, double
int, long int, bigint, varint, smallint, tinyint, counter
decimal.Decimal Decimal
str, Unicode ascii, varchar, text
buffer, bytearray Blob
Date Date
Datetime Timestamp
Time Time
list, tuple, generator List
set, frozenset Set
dict, OrderedDict Map
uuid.UUID timeuuid, uuid

要创建一个表,使用 session 会话对象来执行创建表的CQL 查询。

from cassandra.cluster import Cluster
clstr=Cluster()
session=clstr.connect('mykeyspace')
qry= '''
create table students (
   studentID int,
   name text,
   age int,
   marks int,
   primary key(studentID)
);'''
session.execute(qry)

这样创建的键空间可以进一步用于插入行。 INSERT 查询的 CQL 版本类似于 SQL Insert 语句。 下面的代码在 students 表中插入一行。

from cassandra.cluster import Cluster
clstr=Cluster()
session=clstr.connect('mykeyspace')
session.execute("insert into students (studentID, name, age, marks) values 
   (1, 'Juhi',20, 200);"

如您所料,SELECT 语句也用于 Cassandra。 如果 execute() 方法包含 SELECT 查询字符串,它会返回一个可以使用循环遍历的结果集对象。

from cassandra.cluster import Cluster
clstr=Cluster()
session=clstr.connect('mykeyspace')
rows=session.execute("select * from students;")
for row in rows:
print (StudentID: {} Name:{} Age:{} price:{} Marks:{}'
   .format(row[0],row[1], row[2], row[3]))

Cassandra 的 SELECT 查询支持使用 WHERE 子句对要获取的结果集应用过滤器。 可以识别像 <, > == 等传统逻辑运算符。 要检索 students 表中年龄>20 的行,execute() 方法中的查询字符串应如下所示 −

rows=session.execute("select * from students WHERE age>20 allow filtering;")

请注意,使用ALLOW FILTERING。 该语句的 ALLOW FILTERING 部分允许明确允许(某些)需要过滤的查询。

Cassandra 驱动程序 API 在其 cassendra.query 模块中定义了以下语句类型类。


SimpleStatement

包含在查询字符串中的简单、未经准备的 CQL 查询。 以上所有示例都是 SimpleStatement 的示例。


BatchStatement

多个查询(如 INSERT、UPDATE 和 DELETE)放在一个批处理中并一次执行。 每行首先转换为 SimpleStatement,然后批量添加。

让我们将要添加的行以元组列表的形式添加到 Students 表中,如下所示 −

studentlist=[(1,'Juhi',20,100), ('2,'dilip',20, 110),(3,'jeevan',24,145)]

要使用 BathStatement 添加以上行,请运行以下脚本 −

from cassandra.query import SimpleStatement, BatchStatement
batch=BatchStatement()
for student in studentlist:
   batch.add(SimpleStatement("INSERT INTO students 
      (studentID, name, age, marks) VALUES
      (%s, %s, %s %s)"), (student[0], student[1],student[2], student[3]))
session.execute(batch)

PreparedStatement

Prepared statement 就像 DB-API 中的参数化查询。 它的查询字符串由 Cassandra 保存以备后用。 Session.prepare() 方法返回一个 PreparedStatement 实例。

对于我们的students表,一个用于INSERT查询的PreparedStatement如下 −

stmt=session.prepare("INSERT INTO students (studentID, name, age, marks) VALUES (?,?,?)")

后续只需要将参数的值传给绑定即可。 例如 −

qry=stmt.bind([1,'Ram', 23,175])

最后,执行上面的绑定语句。

session.execute(qry)

这减少了网络流量和 CPU 利用率,因为 Cassandra 不必每次都重新解析查询。