随着数字化转型的不断深入,数据在企业和个人生活中扮演着日益重要的角色。
在企业方面,数据可以帮助企业更好地了解客户需求、市场趋势和业务表现,从而更好地制定战略和决策。此外,数据还可以帮助企业优化业务流程、提高效率和降低成本。
在个人生活方面,数据也扮演着越来越重要的角色。例如,我们使用智能手机、智能手表等设备时,这些设备会收集我们的健康数据、位置数据等信息,帮助我们更好地管理自己的生活和健康。此外,社交媒体、电子商务等应用程序也会收集我们的数据,以便更好地为我们提供个性化的服务和推荐。
ETL(抽取、转换和加载)是一种数据处理方法,它对于企业和个人来说都具有重要意义。
ETL是一种数据处理方法,它由以下三个步骤组成:
EtlT它拆分了原有ETL和ELT的结构,并力求实时和批量统一在一起处理以满足实时数据仓库和AI应用的需求。
EtlT由以下四个步骤组成:
在EtLT架构下,使用者人群也有了明确的分工:
通常情况下,在大数据的处理基本倾向于前面部分的Etl 环节,而后面的T 环节的数据处理倾向于SQL统计分析,也就是俗称的最后一公里。
.neter 人员大数据处理框架终于来了。
ETL.NET是一个完全用.NET编写的开源框架,可用于多平台使用,并可直接集成到任何.NET应用程序中。可以毫不费力地实现快速的、低内存的和易于维护的数据处理,即使是百万数据也能轻松应对。所有用于规范化、更新插入、查找或连接的工具都大大减少了任何导入和转换目的的工作量。处理跟踪的所有内容,错误跟踪都是为开发人员自动完成的。
ETL.NET是一组.NET库,允许将常规商业智能ETL功能嵌入到任何.NET应用程序中。
缺少什么?没有问题!任何类型的扩展都可以在瞬间实现,以创建新类型的数据源/目标或任何类型的运算符。ETL.NET就是为此而设计的。
ETL.NET支持从多种数据类型和数据源中读取可写入数据,可以满足各种使用场景。
2.1、Read or write any file type and any data source.(读取或写入任何文件类型和任何数据源)
2.2、Read or write files on any source.(在任何源上读取或写入文件)
ETL.NET是一系列的类库,可以方便的通过Nuget包安装集成到任何.NET应用程序即可使用。
实验目标:提取特定格式.zip文件中所有的.csv文件数据,并处理数据写入到指定数据库表中。
- dotnet new console -o ConsoleAppEtl --no-https -f net8.0
-
- PS C:\Users\Jeffrey.Chai> dotnet new console -h
- 控制台应用 (C#)
- 作者: Microsoft
- 描述: 用于创建可在 Windows、Linux 和 macOS 上 .NET 上运行的命令行应用程序的项目
-
- 用法:
- dotnet new console [options] [模板选项]
-
- 选项:
- -n, --name <name> 正在创建的输出名称。如未指定名称,则使用输出目录的名称。
- -o, --output <output> 要放置生成的输出的位置。
- --dry-run 如果运行给定命令行将导致模板创建,则显示将发生情况的摘要。
- --force 强制生成内容 (即使它会更改现有文件)。
- --no-update-check 在实例化模板时,禁用对模板包更新的检查。
- --project <project> 应用于上下文评估的项目。
- -lang, --language <C#> 指定要实例化的模板语言。
- --type <project> 指定要实例化的模板类型。
-
- 模板选项:
- -f, --framework <net6.0|net7.0|net8.0> 项目的目标框架。
- 类型: choice
- net8.0 目标 net8.0
- net7.0 目标 net7.0
- net6.0 目标 net6.0
- 默认: net8.0
- --langVersion <langVersion> 在创建的项目文件中设置 LangVersion 属性
- 类型: text
- --no-restore 如果指定,则在创建时跳过项目的自动还原。
- 类型: bool
- 默认: false
- --use-program-main 是否生成显式程序类和主方法,而不是顶级语句。
- 类型: bool
- 默认: false
- --aot 是否启用将项目以 native AOT 发布。
- 类型: bool
- 默认: false
-
- 要查看有关其他模板语言(F#, VB)的帮助,请使用 --language 选项:
- dotnet new console -h --language F#
-
该项目中使用到的nuget包信息如下:
- <Project Sdk="Microsoft.NET.Sdk">
-
- <PropertyGroup>
- <OutputType>Exe</OutputType>
- <TargetFramework>net8.0</TargetFramework>
- <ImplicitUsings>enable</ImplicitUsings>
- <Nullable>enable</Nullable>
- </PropertyGroup>
-
- <ItemGroup>
- <PackageReference Include="Paillave.EtlNet.Core" Version="2.0.47" />
- <PackageReference Include="Paillave.EtlNet.FileSystem" Version="2.0.47" />
- <PackageReference Include="Paillave.EtlNet.SqlServer" Version="2.0.47" />
- <PackageReference Include="Paillave.EtlNet.TextFile" Version="2.0.47" />
- <PackageReference Include="Paillave.EtlNet.Zip" Version="2.0.47" />
- </ItemGroup>
-
- <ItemGroup>
- <Folder Include="Files\Input\" />
- </ItemGroup>
-
- </Project>
-
在编写demo代码之前,先在项目中添加一个文件夹:
下面是一个ETL.NET处理数据的场景:
依据上面的处理流程,在Program.cs中编写如下代码:
- using Paillave.Etl.FileSystem;
- using Paillave.Etl.Zip;
- using Paillave.Etl.TextFile;
- using Paillave.Etl.SqlServer;
- using System.Data.SqlClient;
- using Paillave.Etl.Core;
-
- namespace ConsoleAppEtl;
-
- internal class Program
- {
- static async Task Main(string[] args)
- {
- Console.WriteLine("Hello, ETL.NET! https://paillave.github.io/");
-
- var processRunner = StreamProcessRunner.Create<string>(DefineProcess);
- processRunner.DebugNodeStream += (sender, e) => {
- /* PLACE A CONDITIONAL BREAKPOINT HERE FOR DEBUG ex: e.NodeName == "parse file" */
- Console.WriteLine($"NodeName = {e.NodeName},Count = {e.Count},HasError = {e.HasError},ToSequenceId = {e.ToSequenceId},FromSequenceId = {e.FromSequenceId}");
- };
-
- string inputFilesPath = @"C:\Users\Jeffrey.Chai\Desktop\test\ConsoleAppEtl\Files\Input";
- string connStr = "Data Source=.;User Id=sa;Password=123@etl.net;Initial Catalog=EtlTest;Encrypt=True;TrustServerCertificate=True;Pooling=true;Min Pool Size=1;Max Pool Size=10;";
- using (var cnx = new SqlConnection(connStr))
- {
- cnx.Open();
- var executionOptions = new ExecutionOptions<string>
- {
- Resolver = new SimpleDependencyResolver().Register(cnx)
- };
-
- var res = await processRunner.ExecuteAsync(config: inputFilesPath, options: executionOptions);
- if (res.Failed && res.ErrorTraceEvent != null)
- {
- Console.WriteLine($"errors:{res.ErrorTraceEvent.Content.Type},{res.ErrorTraceEvent.Content.Level},{res.ErrorTraceEvent.Content.Message}");
- }
- Console.WriteLine(res.Failed ? "Failed" : "Succeeded");
- }
- }
-
- /// <summary>
- /// 定义处理流程
- /// </summary>
- /// <param name="contextStream"></param>
- private static void DefineProcess(ISingleStream<string> contextStream)
- {
- contextStream
- .CrossApplyFolderFiles(name: "列出所有的 .zip 文件", pattern: "*.zip", recursive: true)
- .CrossApplyZipFiles(name: "从 .zip 解压出 .csv 文件", pattern: "*.csv")
- .CrossApplyTextFile(name: "解析 .csv 文件",
- args: FlatFileDefinition.Create(item => new Person
- {
- Email = item.ToColumn("email"),
- FirstName = item.ToColumn("first name"),
- LastName = item.ToColumn("last name"),
- DateOfBirth = item.ToDateColumn("date of birth", "yyyy-MM-dd"),
- Reputation = item.ToNumberColumn<int?>("reputation", ".")
- }).IsColumnSeparated(','))
- .Distinct("email 去重", item => item.Email)
- .SqlServerSave("写入 mssql2022 数据库", o => o
- .ToTable("dbo.Person")
- .SeekOn(p => p.Email)
- .DoNotSave(p => p.Id))
- .Do("输出到控制台", item => Console.WriteLine($"fullname:{item.FirstName}-{item.LastName},email:{item.Email}"));
- }
-
- /// <summary>
- /// 数据库表实体模型
- /// </summary>
- private class Person
- {
- public int Id { get; set; }
- public string Email { get; set; }
- public string FirstName { get; set; }
- public string LastName { get; set; }
- public DateTime DateOfBirth { get; set; }
- public int? Reputation { get; set; }
- }
- }
-
4.1、创建数据库
创建数据库命名为EtlTest,此处我就使用docker容器化部署一个mssql2022,执行如下命令:
- # 搜索镜像
- docker search mssql
- # 拉取镜像
- docker pull mcr.microsoft.com/mssql/server:2022-latest
-
- # 运行容器 mssql2022
- docker run -d --name mssql2022 --hostname mssql2022 \
- -p 1433:1433 \
- -e "ACCEPT_EULA=Y" \
- -e "MSSQL_SA_PASSWORD=123@etl.net" \
- -e "TZ=Asia/Shanghai" \
- -e "MSSQL_PID=Developer" \
- -e "MSSQL_COLLATION=Chinese_PRC_BIN" \
- mcr.microsoft.com/mssql/server:2022-latest
-
4.2、测试数据库连接
数据库容器运行成功后,使用数据库工具dbeaver-ce连接测试,看数据库是否能够正常访问,如果正常访问即显示如下信息:
4.3、创建数据库表
依据上面的Person.cs实体模型,创建数据表Person,执行如下sql脚本:
- -- mssql
- CREATE TABLE "Person" (
- "Id" INT identity(1,1) PRIMARY KEY NOT NULL,
- "Email" VARCHAR(32) NOT NULL,
- "FirstName" VARCHAR(32) NOT NULL,
- "LastName" VARCHAR(32) NOT NULL,
- "DateOfBirth" DateTime NOT NULL,
- "Reputation" INT NULL
- );
-
此时该表的数据为空,改表主要用于从特定文件.csv提取数据,并写入表中保存。
这里我们准备的源数据是后缀为.csv类型的文件,并且压缩为.zip格式的文件夹。
Person.zip文件存放入项目中Files\Input文件夹里面,该压缩文件中,存放了3个.csv的文件,用于模拟分批到处数据的情况。下面3个文件中,分别在每个文件中放入10条数据。
.csv文件的内容格式类似如下:
- "id","email","first name","last name","date of birth","reputation"
- 1,"123@qq.com",hu,pingan,"2023-12-12",10
- 2,"321@qq.com",ma,liuliu,"2023-11-12",5
- 3,"132@qq.com",zhan,xiaosan,"2023-12-15",2
-
说明:Person.zip文件可以存放多个结构相同的.csv文件,这里为了方便测试,模拟3个文件即可。
经过上面的环节,我们已经准备好了项目测试的基础条件。接下来我们就运行项目,启动看下,能否把Person.zip文件中的3个.csv文件给解析出来,并提取到里面的数据写入到提前准备好的数据库表中。
执行完成,控制台输出信息如下:
- Hello, ETL.NET! https://paillave.github.io/
- fullname:项-栋,email:H6zQXwZ@gmail.com
- fullname:谈-震,email:YbjDWRYLT@gmail.com
- fullname:滑-超浩,email:NOtywnmL@gmail.com
- fullname:慕容-伦,email:iqGQtl8Vf@gmail.com
- fullname:司空-群,email:MvDxeaOs@gmail.com
- fullname:壤驷-泰,email:OPcQBLZ@gmail.com
- fullname:国-才,email:bMIhd6K@gmail.com
- fullname:尤-建,email:OCrdarkYy@gmail.com
- fullname:公西-毅,email:pYhCZZQ@gmail.com
- fullname:邢-飞,email:8FDeFQ@gmail.com
- fullname:昝-龙,email:Ybr1jWLv@gmail.com
- fullname:麻-军,email:6hPwNdi@gmail.com
- fullname:崔-朗,email:gdg2ghvZC@gmail.com
- fullname:籍-清,email:y2fSVPoJ@gmail.com
- fullname:缑-克,email:SOJ72Ih@gmail.com
- fullname:赫连-广,email:WC4NLHXr@gmail.com
- fullname:印-信,email:BsGL3fcV@gmail.com
- fullname:扈-强,email:BO3P744@gmail.com
- fullname:漆雕-波,email:LZA31yMtR@gmail.com
- fullname:堵-山,email:kRIjZjmmA@gmail.com
- fullname:左-民,email:mg02bO@gmail.com
- fullname:乔-言若,email:tp4Nqq@gmail.com
- fullname:郁-江,email:LwyEXTrAV@gmail.com
- fullname:翟-新利,email:dgWodVMqn@gmail.com
- fullname:褚-才,email:AFx57A8@gmail.com
- fullname:郁-奇,email:KitTNOc5@gmail.com
- fullname:寇-平,email:bwtG8Ipfr@gmail.com
- fullname:须-厚,email:DLgJR4s@gmail.com
- fullname:宓-奇,email:2wFB27f@gmail.com
- fullname:窦-超浩,email:jbbmcenx@gmail.com
- NodeName = 列出所有的 .zip 文件,Count = 1,HasError = False,ToSequenceId = 1,FromSequenceId = 1
- NodeName = 从 .zip 解压出 .csv 文件,Count = 3,HasError = False,ToSequenceId = 5,FromSequenceId = 3
- NodeName = 解析 .csv 文件,Count = 30,HasError = False,ToSequenceId = 123,FromSequenceId = 7
- NodeName = email 去重,Count = 30,HasError = False,ToSequenceId = 124,FromSequenceId = 8
- NodeName = 写入 mssql2022 数据库,Count = 30,HasError = False,ToSequenceId = 125,FromSequenceId = 9
- NodeName = 输出到控制台,Count = 30,HasError = False,ToSequenceId = 126,FromSequenceId = 10
- Succeeded
-
此时我再次查看数据库表信息,是否有把.csv文件的数据提取处理保存到指定的数据库表中。
- SELECT Id, Email, FirstName, LastName, DateOfBirth, Reputation
- FROM EtlTest.dbo.Person;
-
数据库表Person查询信息显示如下:
结论:Person.zip文件中的3个.csv文件合计数据量30行,存入数据库表EtlTest.dbo.Person的数据符合预期。