随着数字化转型的不断深入,数据在企业和个人生活中扮演着日益重要的角色。
在企业方面,数据可以帮助企业更好地了解客户需求、市场趋势和业务表现,从而更好地制定战略和决策。此外,数据还可以帮助企业优化业务流程、提高效率和降低成本。
在个人生活方面,数据也扮演着越来越重要的角色。例如,我们使用智能手机、智能手表等设备时,这些设备会收集我们的健康数据、位置数据等信息,帮助我们更好地管理自己的生活和健康。此外,社交媒体、电子商务等应用程序也会收集我们的数据,以便更好地为我们提供个性化的服务和推荐。
ETL(抽取、转换和加载)是一种数据处理方法,它对于企业和个人来说都具有重要意义。
ETL是一种数据处理方法,它由以下三个步骤组成:
EtlT它拆分了原有ETL和ELT的结构,并力求实时和批量统一在一起处理以满足实时数据仓库和AI应用的需求。
EtlT由以下四个步骤组成:
在EtLT架构下,使用者人群也有了明确的分工:
通常情况下,在大数据的处理基本倾向于前面部分的Etl 环节,而后面的T 环节的数据处理倾向于SQL统计分析,也就是俗称的最后一公里。
.neter 人员大数据处理框架终于来了。
ETL.NET是一个完全用.NET编写的开源框架,可用于多平台使用,并可直接集成到任何.NET应用程序中。可以毫不费力地实现快速的、低内存的和易于维护的数据处理,即使是百万数据也能轻松应对。所有用于规范化、更新插入、查找或连接的工具都大大减少了任何导入和转换目的的工作量。处理跟踪的所有内容,错误跟踪都是为开发人员自动完成的。
ETL.NET是一组.NET库,允许将常规商业智能ETL功能嵌入到任何.NET应用程序中。
缺少什么?没有问题!任何类型的扩展都可以在瞬间实现,以创建新类型的数据源/目标或任何类型的运算符。ETL.NET就是为此而设计的。
ETL.NET支持从多种数据类型和数据源中读取可写入数据,可以满足各种使用场景。
2.1、Read or write any file type and any data source.(读取或写入任何文件类型和任何数据源)
2.2、Read or write files on any source.(在任何源上读取或写入文件)
ETL.NET是一系列的类库,可以方便的通过Nuget包安装集成到任何.NET应用程序即可使用。
实验目标:提取特定格式.zip文件中所有的.csv文件数据,并处理数据写入到指定数据库表中。
dotnet new console -o ConsoleAppEtl --no-https -f net8.0
PS C:\Users\Jeffrey.Chai> dotnet new console -h
控制台应用 (C#)
作者: Microsoft
描述: 用于创建可在 Windows、Linux 和 macOS 上 .NET 上运行的命令行应用程序的项目
用法:
dotnet new console [options] [模板选项]
选项:
-n, --name <name> 正在创建的输出名称。如未指定名称,则使用输出目录的名称。
-o, --output <output> 要放置生成的输出的位置。
--dry-run 如果运行给定命令行将导致模板创建,则显示将发生情况的摘要。
--force 强制生成内容 (即使它会更改现有文件)。
--no-update-check 在实例化模板时,禁用对模板包更新的检查。
--project <project> 应用于上下文评估的项目。
-lang, --language <C#> 指定要实例化的模板语言。
--type <project> 指定要实例化的模板类型。
模板选项:
-f, --framework <net6.0|net7.0|net8.0> 项目的目标框架。
类型: choice
net8.0 目标 net8.0
net7.0 目标 net7.0
net6.0 目标 net6.0
默认: net8.0
--langVersion <langVersion> 在创建的项目文件中设置 LangVersion 属性
类型: text
--no-restore 如果指定,则在创建时跳过项目的自动还原。
类型: bool
默认: false
--use-program-main 是否生成显式程序类和主方法,而不是顶级语句。
类型: bool
默认: false
--aot 是否启用将项目以 native AOT 发布。
类型: bool
默认: false
要查看有关其他模板语言(F#, VB)的帮助,请使用 --language 选项:
dotnet new console -h --language F#
该项目中使用到的nuget包信息如下:
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net8.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Paillave.EtlNet.Core" Version="2.0.47" />
<PackageReference Include="Paillave.EtlNet.FileSystem" Version="2.0.47" />
<PackageReference Include="Paillave.EtlNet.SqlServer" Version="2.0.47" />
<PackageReference Include="Paillave.EtlNet.TextFile" Version="2.0.47" />
<PackageReference Include="Paillave.EtlNet.Zip" Version="2.0.47" />
</ItemGroup>
<ItemGroup>
<Folder Include="Files\Input\" />
</ItemGroup>
</Project>
在编写demo代码之前,先在项目中添加一个文件夹:
下面是一个ETL.NET处理数据的场景:
依据上面的处理流程,在Program.cs中编写如下代码:
using Paillave.Etl.FileSystem;
using Paillave.Etl.Zip;
using Paillave.Etl.TextFile;
using Paillave.Etl.SqlServer;
using System.Data.SqlClient;
using Paillave.Etl.Core;
namespace ConsoleAppEtl;
internal class Program
{
static async Task Main(string[] args)
{
Console.WriteLine("Hello, ETL.NET! https://paillave.github.io/");
var processRunner = StreamProcessRunner.Create<string>(DefineProcess);
processRunner.DebugNodeStream += (sender, e) => {
/* PLACE A CONDITIONAL BREAKPOINT HERE FOR DEBUG ex: e.NodeName == "parse file" */
Console.WriteLine($"NodeName = {e.NodeName},Count = {e.Count},HasError = {e.HasError},ToSequenceId = {e.ToSequenceId},FromSequenceId = {e.FromSequenceId}");
};
string inputFilesPath = @"C:\Users\Jeffrey.Chai\Desktop\test\ConsoleAppEtl\Files\Input";
string connStr = "Data Source=.;User Id=sa;Password=123@etl.net;Initial Catalog=EtlTest;Encrypt=True;TrustServerCertificate=True;Pooling=true;Min Pool Size=1;Max Pool Size=10;";
using (var cnx = new SqlConnection(connStr))
{
cnx.Open();
var executionOptions = new ExecutionOptions<string>
{
Resolver = new SimpleDependencyResolver().Register(cnx)
};
var res = await processRunner.ExecuteAsync(config: inputFilesPath, options: executionOptions);
if (res.Failed && res.ErrorTraceEvent != null)
{
Console.WriteLine($"errors:{res.ErrorTraceEvent.Content.Type},{res.ErrorTraceEvent.Content.Level},{res.ErrorTraceEvent.Content.Message}");
}
Console.WriteLine(res.Failed ? "Failed" : "Succeeded");
}
}
/// <summary>
/// 定义处理流程
/// </summary>
/// <param name="contextStream"></param>
private static void DefineProcess(ISingleStream<string> contextStream)
{
contextStream
.CrossApplyFolderFiles(name: "列出所有的 .zip 文件", pattern: "*.zip", recursive: true)
.CrossApplyZipFiles(name: "从 .zip 解压出 .csv 文件", pattern: "*.csv")
.CrossApplyTextFile(name: "解析 .csv 文件",
args: FlatFileDefinition.Create(item => new Person
{
Email = item.ToColumn("email"),
FirstName = item.ToColumn("first name"),
LastName = item.ToColumn("last name"),
DateOfBirth = item.ToDateColumn("date of birth", "yyyy-MM-dd"),
Reputation = item.ToNumberColumn<int?>("reputation", ".")
}).IsColumnSeparated(','))
.Distinct("email 去重", item => item.Email)
.SqlServerSave("写入 mssql2022 数据库", o => o
.ToTable("dbo.Person")
.SeekOn(p => p.Email)
.DoNotSave(p => p.Id))
.Do("输出到控制台", item => Console.WriteLine($"fullname:{item.FirstName}-{item.LastName},email:{item.Email}"));
}
/// <summary>
/// 数据库表实体模型
/// </summary>
private class Person
{
public int Id { get; set; }
public string Email { get; set; }
public string FirstName { get; set; }
public string LastName { get; set; }
public DateTime DateOfBirth { get; set; }
public int? Reputation { get; set; }
}
}
4.1、创建数据库
创建数据库命名为EtlTest,此处我就使用docker容器化部署一个mssql2022,执行如下命令:
# 搜索镜像
docker search mssql
# 拉取镜像
docker pull mcr.microsoft.com/mssql/server:2022-latest
# 运行容器 mssql2022
docker run -d --name mssql2022 --hostname mssql2022 \
-p 1433:1433 \
-e "ACCEPT_EULA=Y" \
-e "MSSQL_SA_PASSWORD=123@etl.net" \
-e "TZ=Asia/Shanghai" \
-e "MSSQL_PID=Developer" \
-e "MSSQL_COLLATION=Chinese_PRC_BIN" \
mcr.microsoft.com/mssql/server:2022-latest
4.2、测试数据库连接
数据库容器运行成功后,使用数据库工具dbeaver-ce连接测试,看数据库是否能够正常访问,如果正常访问即显示如下信息:
4.3、创建数据库表
依据上面的Person.cs实体模型,创建数据表Person,执行如下sql脚本:
-- mssql
CREATE TABLE "Person" (
"Id" INT identity(1,1) PRIMARY KEY NOT NULL,
"Email" VARCHAR(32) NOT NULL,
"FirstName" VARCHAR(32) NOT NULL,
"LastName" VARCHAR(32) NOT NULL,
"DateOfBirth" DateTime NOT NULL,
"Reputation" INT NULL
);
此时该表的数据为空,改表主要用于从特定文件.csv提取数据,并写入表中保存。
这里我们准备的源数据是后缀为.csv类型的文件,并且压缩为.zip格式的文件夹。
Person.zip文件存放入项目中Files\Input文件夹里面,该压缩文件中,存放了3个.csv的文件,用于模拟分批到处数据的情况。下面3个文件中,分别在每个文件中放入10条数据。
.csv文件的内容格式类似如下:
"id","email","first name","last name","date of birth","reputation"
1,"123@qq.com",hu,pingan,"2023-12-12",10
2,"321@qq.com",ma,liuliu,"2023-11-12",5
3,"132@qq.com",zhan,xiaosan,"2023-12-15",2
说明:Person.zip文件可以存放多个结构相同的.csv文件,这里为了方便测试,模拟3个文件即可。
经过上面的环节,我们已经准备好了项目测试的基础条件。接下来我们就运行项目,启动看下,能否把Person.zip文件中的3个.csv文件给解析出来,并提取到里面的数据写入到提前准备好的数据库表中。
执行完成,控制台输出信息如下:
Hello, ETL.NET! https://paillave.github.io/
fullname:项-栋,email:H6zQXwZ@gmail.com
fullname:谈-震,email:YbjDWRYLT@gmail.com
fullname:滑-超浩,email:NOtywnmL@gmail.com
fullname:慕容-伦,email:iqGQtl8Vf@gmail.com
fullname:司空-群,email:MvDxeaOs@gmail.com
fullname:壤驷-泰,email:OPcQBLZ@gmail.com
fullname:国-才,email:bMIhd6K@gmail.com
fullname:尤-建,email:OCrdarkYy@gmail.com
fullname:公西-毅,email:pYhCZZQ@gmail.com
fullname:邢-飞,email:8FDeFQ@gmail.com
fullname:昝-龙,email:Ybr1jWLv@gmail.com
fullname:麻-军,email:6hPwNdi@gmail.com
fullname:崔-朗,email:gdg2ghvZC@gmail.com
fullname:籍-清,email:y2fSVPoJ@gmail.com
fullname:缑-克,email:SOJ72Ih@gmail.com
fullname:赫连-广,email:WC4NLHXr@gmail.com
fullname:印-信,email:BsGL3fcV@gmail.com
fullname:扈-强,email:BO3P744@gmail.com
fullname:漆雕-波,email:LZA31yMtR@gmail.com
fullname:堵-山,email:kRIjZjmmA@gmail.com
fullname:左-民,email:mg02bO@gmail.com
fullname:乔-言若,email:tp4Nqq@gmail.com
fullname:郁-江,email:LwyEXTrAV@gmail.com
fullname:翟-新利,email:dgWodVMqn@gmail.com
fullname:褚-才,email:AFx57A8@gmail.com
fullname:郁-奇,email:KitTNOc5@gmail.com
fullname:寇-平,email:bwtG8Ipfr@gmail.com
fullname:须-厚,email:DLgJR4s@gmail.com
fullname:宓-奇,email:2wFB27f@gmail.com
fullname:窦-超浩,email:jbbmcenx@gmail.com
NodeName = 列出所有的 .zip 文件,Count = 1,HasError = False,ToSequenceId = 1,FromSequenceId = 1
NodeName = 从 .zip 解压出 .csv 文件,Count = 3,HasError = False,ToSequenceId = 5,FromSequenceId = 3
NodeName = 解析 .csv 文件,Count = 30,HasError = False,ToSequenceId = 123,FromSequenceId = 7
NodeName = email 去重,Count = 30,HasError = False,ToSequenceId = 124,FromSequenceId = 8
NodeName = 写入 mssql2022 数据库,Count = 30,HasError = False,ToSequenceId = 125,FromSequenceId = 9
NodeName = 输出到控制台,Count = 30,HasError = False,ToSequenceId = 126,FromSequenceId = 10
Succeeded
此时我再次查看数据库表信息,是否有把.csv文件的数据提取处理保存到指定的数据库表中。
SELECT Id, Email, FirstName, LastName, DateOfBirth, Reputation
FROM EtlTest.dbo.Person;
数据库表Person查询信息显示如下:
结论:Person.zip文件中的3个.csv文件合计数据量30行,存入数据库表EtlTest.dbo.Person的数据符合预期。