个性化阅读
专注于IT技术分析

在Python中使用PostgreSQL

点击下载

本文概述

在这篇文章中, 你将发现:

  • PostgreSQL和Python教程
  • 使用SQL数据库
  • 用Python连接到数据库
    • 尝试一些复杂的查询
    • 剖析此功能

诸如PostgreSQL之类的数据库需要用户身份验证才能访问, 并且特定于给定的数据库结构。一旦可以访问数据库, 就可以采用类似的技术。

本模块说明了一些基本技术, 可用于连接和使用关系数据库中的数据, 在本例中为PostgreSQL, 它是几个基于SQL的数据库之一。如果你想了解有关关系数据库管理系统(RDBMS)的系统理论方面的更多信息, 那么”数据库系统和概念”是一个很好的资源。

使用SQL数据库

在Python中使用数据库依赖于能够在SQL中编写数据库查询。实际上, 测试Python脚本的一种好方法是首先在第三方程序(例如DBeaver)中测试SQL命令。

SQLZOO是你当前可以练习这些技能的好地方


基础

在SQL语言中, 数据存储在表中。表通常嵌套在称为”模式”的组中。一个数据库中可能有多个架构。

$$ \ text {数据库} \ supset \ text {Schema} \ supset \ text {Table} $$

示例:假设你有一个数据库, 其中包含数据和机器学习问题的结果, 例如预测不良警察事件。结果存储在”结果”模式中。特定的模型指标位于”评估”表中。要查看所有此类结果:

SQL从results.evaluations中选择*;

Decomposing this query, you have several essential elements: `select`. What task you will be doing, `select`. Next, what do you want to return? If everything uses `*`. If you only want one column `your_colunm_name`. Thus `select *` or `select your_column_name`. Next, where are your data stored: `from`? Specify a `schema.table`.

Putting this together, you have the general form: `select * from schema.table;` Note: it is best to end your SQL statements with a "`;`".

Now suppose you want only the results for a specific model id, parameter, and metric, and parameter, you can use a `where` statement to specify:

```sql
select * from results.evaluations
    where model_id=971
    and metric='false negatives@' and parameter='75.0';

用Python连接到数据库

连接数据库的一种简单方法是使用Python。在这里, 你首先需要一个凭证文件, 例如example_psql.py:

PGHOST=your_database_host
PGDATABASE=your_database_name
PGUSER=your_database_username
PGPASSWORD=your_database_secret_password

接下来, 你将需要导入几个软件包:

import psycopg2
import sys, os
import numpy as np
import pandas as pd
import example_psql as creds
import pandas.io.sql as psql

最后, 数据库连接可以相对简单:

## ****** LOAD PSQL DATABASE ***** ##


# Set up a connection to the postgres server.
conn_string = "host="+ creds.PGHOST +" port="+ "5432" +" dbname="+ creds.PGDATABASE +" user=" + creds.PGUSER \
+" password="+ creds.PGPASSWORD
conn=psycopg2.connect(conn_string)
print("Connected!")

# Create a cursor object
cursor = conn.cursor()


def load_data(schema, table):

    sql_command = "SELECT * FROM {}.{};".format(str(schema), str(table))
    print (sql_command)

    # Load the data
    data = pd.read_sql(sql_command, conn)

    print(data.shape)
    return (data)

为了强调此代码的作用, 它使用你的凭据连接到你的数据库, 并返回你查询的数据, 即schema.table中的select *;作为熊猫数据框。然后, 你可以可视化或分析此数据, 就像将任何数据从CSV加载到Pandas中一样。


一个更复杂的例子

除了简单的连接, 你还可以在单​​独的脚本中使用一系列功能来连接数据库。该脚本位于setup / setup_environment.py中:

#!/usr/bin/env python
import os
import yaml
from sqlalchemy import create_engine
import logging

log = logging.getLogger(__name__)


def get_database():
    try:
        engine = get_connection_from_profile()
        log.info("Connected to PostgreSQL database!")
    except IOError:
        log.exception("Failed to get database connection!")
        return None, 'fail'

    return engine


def get_connection_from_profile(config_file_name="default_profile.yaml"):
    """
    Sets up database connection from config file.
    Input:
    config_file_name: File containing PGHOST, PGUSER, PGPASSWORD, PGDATABASE, PGPORT, which are the
                      credentials for the PostgreSQL database
    """

    with open(config_file_name, 'r') as f:
        vals = yaml.load(f)

    if not ('PGHOST' in vals.keys() and
            'PGUSER' in vals.keys() and
            'PGPASSWORD' in vals.keys() and
            'PGDATABASE' in vals.keys() and
            'PGPORT' in vals.keys()):
        raise Exception('Bad config file: ' + config_file_name)

    return get_engine(vals['PGDATABASE'], vals['PGUSER'], vals['PGHOST'], vals['PGPORT'], vals['PGPASSWORD'])


def get_engine(db, user, host, port, passwd):
    """
    Get SQLalchemy engine using credentials.
    Input:
    db: database name
    user: Username
    host: Hostname of the database server
    port: Port number
    passwd: Password for the database
    """

    url = 'postgresql://{user}:{passwd}@{host}:{port}/{db}'.format(
        user=user, passwd=passwd, host=host, port=port, db=db)
    engine = create_engine(url, pool_size = 50)
    return engine

有了此脚本后, 我们可以使用新脚本连接到数据库:

import sys
import os
import pandas as pd
import subprocess
import argparse
import pdb
import pickle
from setup import setup_environment

# Make PostgreSQL Connection
engine = setup_environment.get_database()
try:
    con = engine.raw_connection()
    con.cursor().execute("SET SCHEMA '{}'".format('your_schema_name'))
except:
    pass

注意:在此代码示例中, 你希望将” your_schema_name”替换为架构的特定名称, 例如” models”架构。

尝试一些复杂的查询:

现在, 你已经建立了数据库连接, 你可以尝试一个复杂的查询, 例如为使用Sci-kit Learn构建的模型结果返回pickle文件(一种存储数据的Python方法)。

def get_pickle_best_models(timestamp, metric, parameter=None, number=25, directory="results/"):

    """
    --------------------------------------------------------
    Get the PICKLE FILE of the best models
    by the specified timestamp and given metric
    RETURNS the PICKLE FILE to a DIRECTORY
    --------------------------------------------------------
    ARGUMENTS:
        timestamp:  models run on or after given timestamp
                    example: '2018-09-02'
        metric:     metric to be optimized
                    example: 'precision@'
        parameter:  parameter value or threshold if any
                    default=None
                    example: '10.0'
        number:     maximum number of desired results
                    default = 25
    --------------------------------------------------------
    """

    if parameter is None:
        query = ("SELECT pickle_blob, run_time  FROM \
                    (SELECT evaluations.model_id, run_time \
                        FROM results.evaluations JOIN results.models \
                        ON evaluations.model_id=models.model_id \
                        WHERE run_time >= '{}' \
                        AND value is not null \
                        AND metric = '{}' \
                        ORDER BY value DESC LIMIT {}) \
                    AS top_models \
                    INNER JOIN results.data \
                    ON top_models.model_id=data.model_id ; " ).format(timestamp, metric, number)

    elif parameter is not None:
        query = ("SELECT pickle_blob, run_time  FROM \
                    (SELECT evaluations.model_id, run_time \
                        FROM results.evaluations JOIN results.models \
                        ON evaluations.model_id=models.model_id \
                        WHERE run_time >= '{}' \
                        AND value is not null \
                        AND metric = '{}' \
                        AND parameter = '{}' \
                        ORDER BY value DESC LIMIT {}) \
                    AS top_models \
                    INNER JOIN results.data \
                    ON top_models.model_id=data.model_id ; " ).format(timestamp, metric, parameter, number)



    df_models = pd.read_sql(query, con=con)
    N = len(df_models['pickle_blob'])

    for file_number in range(0, N):
        pickle_file = pickle.loads(df_models['pickle_blob'].iloc[file_number])
        file_name = df_models['run_time'].apply(lambda x: str(x).replace(' ', 'T')).iloc[file_number]
        if parameter is None:
            full_file_name = "police_eis_results_"+"top_"+metric+"any"+"_"+file_name+".pkl"
        elif parameter is not None:
            full_file_name = "police_eis_results_"+"top_"+metric+parameter+"_"+file_name+".pkl"
        file_path = directory+full_file_name
        pickle.dump(pickle_file, open( file_path, "wb" ) )

    return None

剖析此功能:

除去一些细节, 你便具有以下一般流程

def get_pickle_best_models(timestamp, metric, parameter=None, number=25, directory="results/"):

    if parameter is None:
        # Do Query WITHOUT Parameter

    elif parameter is not None:
        # Do Query WITH Parameter

    # Run Query and Store Results as Pandas Data Frame
    df_models = pd.read_sql(query, con=con)

    # Loop Over Dataframe to Save Pickle Files for Different Model Run Times
    # (Code Here)

编写Python查询

如示例所示, 你将SQL查询编写为字符串:

query = ("Long SQL Query as a String")

在这里, 你可以使用.format(var)方法将变量插入查询中。这样, 你可以根据传递给函数的各种参数来系统地更改查询。上述查询之一的完整格式如下:

query = ("SELECT pickle_blob, run_time  FROM \
              (SELECT evaluations.model_id, run_time \
              FROM results.evaluations JOIN results.models \
              ON evaluations.model_id=models.model_id \
              WHERE run_time >= '{}' \
              AND value is not null \
              AND metric = '{}' \
              AND parameter = '{}' \
              ORDER BY value DESC LIMIT {}) \
          AS top_models \
          INNER JOIN results.data \
          ON top_models.model_id=data.model_id ; " ).format(timestamp, metric, parameter, number)

注意查询的格式。在python代码中, 你需要使用\中断查询行, 并使用格式函数'{}’。format(x)将变量插入脚本。格式括号是否需要用单引号引起来, 具体取决于你是否尝试将字符串值或数字值传递给查询。从Python操作中抽象查询, 然后剩下一个SQL查询。下面, 你可以尝试一些示例格式的示例值, 以表示时间戳, 度量, 参数和结果数的Python格式:

SELECT pickle_blob, run_time  FROM
          (SELECT evaluations.model_id, run_time
              FROM results.evaluations JOIN results.models
              ON evaluations.model_id=models.model_id
              WHERE run_time >= '2018-09-02'
              AND value is not null
              AND metric = 'precision@'
              AND parameter = '10.0'
              ORDER BY value DESC LIMIT 10)
          AS top_models
          INNER JOIN results.data
          ON top_models.model_id=data.model_id ;

你还回什么

上面的查询是一个更复杂的示例, 但很有价值。首先, 你要返回什么:

  • pickle_blob
  • 运行

*例如。 SELECT pickle_blob, 运行时

这些数据从哪里来?

  • 通过子查询对results.data表进行联接, 对result.evaluations和results.models进行联接。

在上面的示例中, 你需要模型的pickle_blob和模型的run_time。你还需要根据某些条件获得最佳模型。但是, 所有这些属性都位于结果模式中的离散表中。因此, 你必须对这些表进行几次连接。

从技术上讲, 你正在执行所谓的子查询。请注意主查询SELECT pickle_blob的第一行, run_time FROM, 其中FROM引用括号中的子查询(SELECT … LIMIT 10), 你使用别名AS top_models对其进行引用。

在这里, top_models是一个任意名称, 仅在查询中定义和使用。你可以将top_models替换为x或y或my_best_model_ever。该名称无所谓, 只要它与其他表名和保留的SQL语句(例如select)是离散的即可。它仅引用我们的子查询的结果。

本质上, 子查询返回联接的result.models和results.evaluations表的选择性版本。你可以使用ON的top_models.model_id = data.model_id对top_models子查询使用top_models子查询来进行INNER JOIN。

你的子查询是什么?

SELECT evaluations.model_id, run_time
      FROM results.evaluations JOIN results.models
      ON evaluations.model_id=models.model_id
      WHERE run_time >= '2018-09-02'
      AND value is not null
      AND metric = 'precision@'
      AND parameter = '10.0'
      ORDER BY value DESC LIMIT 10

子查询还对在model_id上联接的result.evaluations和results.models表进行联接。它从联接的表中返回model_id和run_time。

条件:

在这里, 我们只需要符合特定条件的模型:

  • 时间戳记WHERE运行时间> =’2018-09-02′
  • value列上的结果为非null:AND值不为null
  • 指标AND指标=’precision @’
  • 参数AND参数= ’10 .0′
  • 前n个结果ORDER BY值DESC LIMIT 10

如你所见, 尽管此查询包含许多元素, 但是你可以使用Python操纵查询。最后, 你可以利用数据库的功能和Python的功能来构建动态代码。


总结

做得好!你现在知道了Python中的数据库, 而PostgreSQL数据库中的示例很少。此外, 你可以自己进行自我学习并找到学习资源。如果你有兴趣一般地了解有关数据库主题的更多信息, 请阅读srcmini的教程”在SQL中执行Python / R”。

赞(0)
未经允许不得转载:srcmini » 在Python中使用PostgreSQL

评论 抢沙发

评论前必须登录!